Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
pparser = parser.add_argument_group('Proportional sampling options')
pparser.add_argument('-P', '--proportional', default=False, action='store_true',
help='Activate proportional sampling, thus keeping depth variations of the pileup.')
pparser.add_argument('-S', '--seed', default=None, type=int,
help='Random seed for proportional downsampling of reads.')
args = parser.parse_args()
if args.threads == -1:
args.threads = multiprocessing.cpu_count()
with pysam.AlignmentFile(args.bam) as bam:
ref_lengths = dict(zip(bam.references, bam.lengths))
if args.regions is not None:
regions = parse_regions(args.regions, ref_lengths=ref_lengths)
else:
regions = [Region(ref_name=r, start=0, end=ref_lengths[r]) for r in bam.references]
if args.proportional:
worker = functools.partial(subsample_region_proportionally, args=args)
else:
worker = functools.partial(subsample_region_uniformly, args=args)
enough_depth = []
with ProcessPoolExecutor(max_workers=args.threads) as executor:
for res in executor.map(worker, regions):
enough_depth.append(res)
if args.any_fail and not all(enough_depth):
raise RuntimeError('Insufficient read coverage for one or more requested regions.')
if args.all_fail and all([not x for x in enough_depth]):
description='Calculate read coverage depth from a bam.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('bam', help='.fasta/fastq file.')
parser.add_argument('-r', '--regions', nargs='+', help='Only process given regions.')
parser.add_argument('-p', '--prefix', help='Prefix for output, defaults to basename of bam.')
parser.add_argument('-s', '--stride', type=int, default=1000, help='Stride in genomic coordinate.')
parser.add_argument('--summary_only', action='store_true',
help='Output only the depth_summary.txt file')
args = parser.parse_args()
bam = pysam.AlignmentFile(args.bam)
ref_lengths = dict(zip(bam.references, bam.lengths))
if args.regions is not None:
regions = parse_regions(args.regions, ref_lengths=ref_lengths)
else:
regions = [Region(ref_name=r, start=0, end=ref_lengths[r]) for r in bam.references]
summary = {}
for region in regions:
# write final depth
prefix = args.prefix
if prefix is None:
prefix = os.path.splitext(os.path.basename(args.bam))[0]
region_str = '{}_{}_{}'.format(region.ref_name, region.start, region.end)
df = coverage_of_region(region, args.bam, args.stride)
summary[region_str] = df['depth'].describe()
if not args.summary_only: