Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
self.file=pysam.Fastafile( "ex1.fa" )
"""Test that the one hot coded sequences match."""
for gi in range(2):
# read sequence coordinates
seqs_bed_file = '%s/sequences%d.bed' % (self.out_dir, gi)
seq_coords = read_seq_coords(seqs_bed_file)
# read one hot coding from TF Records
train_tfrs_str = '%s/tfrecords/train-%d-0.tfr' % (self.out_dir, gi)
seqs_1hot, _, genomes = self.read_tfrecords(train_tfrs_str)
# check genome
self.assertEqual(len(np.unique(genomes)), 1)
self.assertEqual(genomes[0], gi)
# open FASTA
fasta_open = pysam.Fastafile(self.fasta_files[gi])
# check random sequences
seq_indexes = random.sample(range(seqs_1hot.shape[0]), 32)
for si in seq_indexes:
sc = seq_coords[si]
seq_fasta = fasta_open.fetch(sc.chr, sc.start, sc.end).upper()
seq_1hot_dna = hot1_dna(seqs_1hot[si])
self.assertEqual(seq_fasta, seq_1hot_dna)
def test_binned_pad_wg():
expected = stat_coverage_binned_refimpl(
Samfile('fixture/test.bam'),
Fastafile('fixture/ref.fa'))
actual = pysamstats.stat_coverage_binned(Samfile('fixture/test.bam'),
Fastafile('fixture/ref.fa'))
_compare_iterators(expected, actual)
kwargs = {'window_size': 200,
'window_offset': 100}
for f, needs_ref in binned_functions:
debug(f.__name__)
if needs_ref:
a = f(Samfile('fixture/test.bam'), Fastafile('fixture/ref.fa'),
**kwargs)
else:
a = f(Samfile('fixture/test.bam'), **kwargs)
assert sorted(set(a['chrom'])) == [b'Pf3D7_01_v3', b'Pf3D7_02_v3',
b'Pf3D7_03_v3']
eq_(100, a[a['chrom'] == b'Pf3D7_01_v3']['pos'][0])
eq_(50100, a[a['chrom'] == b'Pf3D7_01_v3']['pos'][-1])
eq_(100, a[a['chrom'] == b'Pf3D7_02_v3']['pos'][0])
eq_(60100, a[a['chrom'] == b'Pf3D7_02_v3']['pos'][-1])
def main(args):
'''
this is here for testing/debugging
'''
reffile = None
if not args.noref:
if not args.refFasta:
raise ValueError("no reference given and --noref not set")
reffile = pysam.Fastafile(args.refFasta)
(chr,coords) = args.regionString.split(':')
(start,end) = coords.split('-')
start = re.sub(',','',start)
end = re.sub(',','',end)
start = int(start)
end = int(end)
if not os.path.exists(args.tmpdir):
os.mkdir(args.tmpdir)
print "INFO\t" + now() + "\tcreated tmp directory: " + args.tmpdir
contigs = asm(chr, start, end, args.bamFileName, reffile, int(args.kmersize), args.tmpdir)
maxlen = 0
maxeid = None
import pysam
parser = argparse.ArgumentParser("Convert genotyped BreakDancer output to VCF",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--sv_file", metavar="sv_file", help="SV file", required=False, default="-")
parser.add_argument("--reference", metavar="reference", help="Reference file", required=True)
parser.add_argument("--sample", metavar="Sample", help="Sample name", required=True)
parser.add_argument("--sort", action="store_true", help="Sort the input")
args = parser.parse_args()
input_handle = sys.stdin if args.sv_file == "-" else open(args.sv_file)
fasta_handle = pysam.Fastafile(args.reference)
def get_contigs(fai_filename):
fai_file = open(fai_filename)
contigs = {}
contigs_order = {}
linenum = 0
for line in fai_file.readlines():
line = line.strip()
line_items = line.split("\t")
name, length = line_items[0:2]
name = name.split(" ")[0]
contigs[name] = int(length)
contigs_order[name] = linenum
linenum += 1
fai_file.close()
def bam_minorallele(bam_fname, ref_fname, min_qual=0, min_count=0, num_alleles=0, name=None, min_ci_low=None):
bam = pysam.Samfile(bam_fname, "rb")
ref = pysam.Fastafile(ref_fname)
if not name:
name = os.path.basename(bam_fname)
if num_alleles:
print "# %s" % num_alleles
sys.stdout.write('\t'.join("chrom pos refbase altbase total refcount altcount background refback altback".split()))
if num_alleles and rscript:
sys.stdout.write("\tci_low\tci_high\tallele_lowt\tallele_high")
sys.stdout.write('\n')
for pileup in bam_pileup_iter(bam, mask=1540):
chrom = bam.getrname(pileup.tid)
counts = {'A': 0, 'C': 0, 'G': 0, 'T': 0}
paired reads are treated as separate, individual reads.
This MUST maintain seq_i to name and read_i to name mappings between iterations, so that a single
name always maintains the same index from one iteration to the next. One result of this requirement
is that the various matrices can always get larger in a later t, but never smaller (as reads or seqs are added)
"""
if self._VERBOSE:
sys.stderr.write("Reading bam file %s at %s...\n"%(bam_filename, ctime()))
start_time = time()
initial_iteration = self.iteration_i < 0 # this is initial iteration
self.current_bam_filename = bam_filename
self.current_reference_fasta_filename = reference_fasta_filename
self.fastafile = pysam.Fastafile(self.current_reference_fasta_filename)
for d in [self.sequence_name2sequence_i, self.sequence_i2sequence_name,
self.read_name2read_i, self.read_i2read_name]:
if initial_iteration:
d.append({})
else:
d.append(d[-1].copy()) # get same name mappings from previous round, add to them if new reads or seqs
if len(d) > 2:
trash = d.pop(0) # no longer care about t-2
del trash
# start new seq_i's or read_i's at next integer if there is a dictionary from the previous iteration.
if initial_iteration:
seq_i = 0
read_i = 0
else:
def singleprocess_permutation(info):
bed_list, mut_df, opts = info
current_chrom = bed_list[0].chrom
logger.info('Working on chromosome: {0} . . .'.format(current_chrom))
num_iterations = opts['num_iterations']
gene_fa = pysam.Fastafile(opts['input'])
gs = GeneSequence(gene_fa, nuc_context=opts['context'])
# go through each gene to perform simulation
result = []
for bed in bed_list:
# compute context counts and somatic bases for each context
gene_tuple = mc.compute_mutation_context(bed, gs, mut_df, opts)
context_cts, context_to_mutations, mutations_df, gs, sc = gene_tuple
if context_to_mutations:
## get information about observed non-silent counts
if opts['summary'] and not num_iterations:
tmp_mut_info = mc.get_aa_mut_info(mutations_df['Coding Position'],
mutations_df['Tumor_Allele'].tolist(),
gs)
# calc mutation info summarizing observed mutations
def get_qual_counter(args):
normal_bam = pysam.Samfile(args.normal_bam, 'rb')
tumour_bam = pysam.Samfile(args.tumour_bam, 'rb')
ref_genome = pysam.Fastafile(args.reference_genome)
counter = JointBinaryQualityCounter(normal_bam, tumour_bam, ref_genome)
if args.positions_file is not None:
counter = PositionsCounter(counter, args.positions_file)
return counter
def get_sequence(sequence, ch, ss, es, reverse=False, complement=False, rna=False, ex=0, strand=None):
import pysam
sequence = pysam.Fastafile(sequence)
if not ch:
seq = sequence.fetch(max(0, ss - ex), es + ex)
else:
seq = sequence.fetch(ch, max(0, ss-ex), es + ex)
seq = seq.upper()
if strand == "-" and not reverse and not complement:
reverse = True
complement = True
if complement:
t = {"A":"T", "T":"A", "C":"G", "G":"C", "N":"N" }
ns = ""
for s in seq: ns += t[s]
seq = ns