pos_chunker = chunker(c.pileups[0].positions)
if c.labels is None:
    labels_gen = itertools.repeat(None)
else:
    labels_gen = chunker(c.labels)
if c.ref_seq is None:
    ref_seq_gen = itertools.repeat(None)
else:
    ref_seq_gen = chunker(c.ref_seq)
for pos in pos_chunker:
    chunk_labels = next(labels_gen)
    chunk_ref_seq = next(ref_seq_gen)
    chunk_pileups = tuple(Pileup(p.bam, p.ref_name, next(rc), pos) for p, rc in zip(c.pileups, read_chunkers))
    msg = "Chunking positions {}:{} ({} bases)"
    logger.debug(msg.format(pos['major'][0], pos['major'][-1], len(pos)))
    yield LabelledPileup(pileups=chunk_pileups, labels=chunk_labels, ref_seq=chunk_ref_seq)
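# The chunker() generator consumed in lockstep above is not shown in this
# snippet. A minimal sketch, assuming simple fixed-length, non-overlapping
# chunks along the first axis; the name chunk_len and its default are
# illustrative only, not the codebase's actual signature:
def chunker(array, chunk_len=1000):
    """Yield successive chunk_len-sized slices along the first axis (sketch)."""
    for start in range(0, len(array), chunk_len):
        yield array[start:start + chunk_len]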
array, along with some meta information.

:param ref_name: name of reference to which reads are aligned.
:param start: start position in reference coordinates.
:param end: end position in reference coordinates.

:returns: a tuple of `Pileup` tuples and the reference sequence.
"""
# TODO: caching the reindexed regions
pileups, ref_seqs = zip(*(tv.pileup(ref_name, start=start, end=end) for tv in self.tviews))
if self.n_inputs > 1:
    positions = get_common_index([p.positions for p in pileups])
    # all pileups should be aligned to the same ref, so just use the first
    ref_p = Pileup(reads=ref_seqs[0], positions=pileups[0].positions)
    ref_seq = reindex_labels(ref_p, positions)
    pileups = (reindex_pileup(p, positions, self._minor_gaps) for p in pileups)
else:
    ref_seq = ref_seqs[0]
pileups = tuple(pileups)
return pileups, ref_seq
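# Hedged usage sketch for the method documented above, assuming it is exposed
# as pileup(ref_name, start, end) on a multi-view object; the variable name
# multi_view, the reference name and the coordinates are illustrative only.
pileups, ref_seq = multi_view.pileup('chr1', start=10000, end=20000)
for p in pileups:
    # each Pileup carries its source bam, reference name, read matrix and positions
    print(p.bam, p.ref_name, p.reads.shape, len(p.positions))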
if 'ref_seq' not in hdf[chunk_str]:
    # the reference sequence is optional in the file
    ref_seq = None
else:
    ref_seq = np.char.decode(hdf[chunk_str]['ref_seq'][()])
if data_types is None:
    # default to every group in the chunk whose name starts with 'datatype'
    data_types = [d for d in hdf[chunk_str] if d.startswith('datatype')]
pileups = []
for dtype in data_types:
    grp = hdf[chunk_str][dtype]
    ref_name = grp.attrs['rname']
    bam = grp.attrs['bam']
    reads = grp['data'][()]
    positions = grp['positions'][()]
    pileups.append(Pileup(bam, ref_name, reads, positions))
yield LabelledPileup(pileups=pileups, labels=labels, ref_seq=ref_seq)
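# The HDF5 layout implied by the reader above: per-chunk groups containing one
# 'datatype*' subgroup per input, each with 'rname'/'bam' attributes and
# 'data'/'positions' datasets, plus an optional 'ref_seq' dataset. The writer
# below is an illustrative sketch only; the file name, chunk name, shapes and
# values are made up for the example.
import h5py
import numpy as np

with h5py.File('training_data.hdf5', 'w') as hdf:
    chunk_str = 'chunk_00000'  # illustrative chunk group name
    grp = hdf.create_group('{}/datatype_0'.format(chunk_str))
    grp.attrs['rname'] = 'chr1'
    grp.attrs['bam'] = 'reads.bam'
    grp.create_dataset('data', data=np.zeros((10, 100), dtype='float32'))
    grp.create_dataset('positions', data=np.zeros(100, dtype=[('major', int), ('minor', int)]))
    hdf[chunk_str].create_dataset('ref_seq', data=np.char.encode(np.array(['A'] * 100)))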
is_minor_pos = new_positions['minor'] > 0
for i, row in enumerate(reindexed):
    if minor_gaps:
        is_minor_pos_and_nan = np.logical_and(is_minor_pos, np.isnan(row))
        reindexed[i, np.where(is_minor_pos_and_nan)] = encoding[_ref_gap_]
    # fill all remaining np.nan with _gap_
    reindexed[i, np.where(np.isnan(reindexed[i]))] = encoding[_gap_]
# convert back to pileup.dtype (we used nan above)
reindexed = reindexed.astype(pileup.reads.dtype)
# join up / expand read separators (relies on equality, hence done after casting back)
for i, row in enumerate(reindexed):
    reindexed[i] = join_read_seps(reindexed[i], encoding[_read_sep_], encoding[_gap_])
return Pileup(pileup.bam, pileup.ref_name, reindexed, new_positions)
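# Toy illustration of the masked fill used above: positions flagged as minor
# (inserted columns) receive the reference-gap value where a row has no data,
# and any remaining missing values receive the ordinary gap value. The
# encodings here (5 for a reference gap, 0 for a gap) are made up for the
# example and are not the codebase's real encoding.
import numpy as np

row = np.array([1.0, np.nan, 4.0, np.nan])
is_minor_pos = np.array([False, True, False, False])
row[np.logical_and(is_minor_pos, np.isnan(row))] = 5  # _ref_gap_ stand-in
row[np.isnan(row)] = 0                                # _gap_ stand-in
print(row)  # [1. 5. 4. 0.]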
columns = int(self._columns_per_base * (end - start))
tview = self._run_tview(ref_name, start, columns)
actual_end = tview.end  # actually this is the last number in the header line
self._columns_per_base = self._column_pad * columns / float(actual_end - start)
self.logger.debug("Grabbing tview region required {} iterations.".format(it))
t0 = now()
reads, positions, ref_seq = self._tview_to_numpy(tview, end)
t1 = now()
self.logger.debug(
    "Parsed tview output for {}:{}-{}. Shape: {}.\t({:.3f}s)".format(
        self._bam, start, end, reads.shape, t1 - t0)
)
assert len(ref_seq) == len(positions)
self._ref_seq = ref_seq
self._pileup = Pileup(self._bam, ref_name, reads, positions)
return self
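# Worked example of the adaptive column estimate near the top of the snippet
# above: if tview rendered fewer reference bases than requested, the
# columns-per-base ratio is scaled up (with padding) so a subsequent request
# covers the full region. All values below are illustrative only.
start, end = 1000, 2000                 # requested region (1000 bases)
columns_per_base, column_pad = 1.0, 1.1
columns = int(columns_per_base * (end - start))  # 1000 columns requested
actual_end = 1800                                # tview only reached this base
columns_per_base = column_pad * columns / float(actual_end - start)  # -> 1.375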