# Read the first three lines of the Prokka txt summary
first_line = f['f'].readline()
contigs_line = f['f'].readline()
bases_line = f['f'].readline()
if not all((first_line.startswith("organism:"),
            contigs_line.startswith("contigs:"),
            bases_line.startswith("bases:"))):
    return

# Get organism and sample name from the first line.
# Assumes the organism name consists of exactly two words,
# i.e. 'Genusname speciesname', and that the remaining
# text on the organism line is the sample name.
try:
    organism = " ".join(first_line.strip().split(":", 1)[1].split()[:2])
    s_name = self.clean_s_name(" ".join(first_line.split()[3:]), f['root'])
except KeyError:
    organism = first_line.strip().split(":", 1)[1]
    s_name = f['s_name']

# Don't try to guess the sample name if requested in the config
if getattr(config, 'prokka_fn_snames', False):
    s_name = f['s_name']

if s_name in self.prokka:
    log.debug("Duplicate sample name found! Overwriting: {}".format(s_name))
self.prokka[s_name] = dict()
self.prokka[s_name]['organism'] = organism
self.prokka[s_name]['contigs'] = int(contigs_line.split(":")[1])
self.prokka[s_name]['bases'] = int(bases_line.split(":")[1])

# Get additional info from remaining lines
for line in f['f']:
    try:
        # Split on the first colon only, so values containing ':' still parse
        description, value = line.split(":", 1)
        self.prokka[s_name][description] = int(value)
    except ValueError:
        log.warning("Unable to parse line: '%s'", line)
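# A minimal standalone sketch of the same parsing logic, run against an
# illustrative Prokka txt summary (sample name and values invented for the
# example, not from any real run):
example = """organism: Escherichia coli sample1
contigs: 15
bases: 4641652
CDS: 4313"""
lines = example.splitlines()
organism = " ".join(lines[0].split(":", 1)[1].split()[:2])
sample = " ".join(lines[0].split()[3:])
stats = {desc: int(val) for desc, val in (l.split(":", 1) for l in lines[1:])}
print(organism, sample, stats)
# -> Escherichia coli sample1 {'contigs': 15, 'bases': 4641652, 'CDS': 4313}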
def htseq_stats_table(self):
    """ Take the parsed stats from the HTSeq Count report and add them to the
    basic stats table at the top of the report """
    headers = OrderedDict()
    headers['percent_assigned'] = {
        'title': '% Assigned',
        'description': '% Assigned reads',
        'max': 100,
        'min': 0,
        'suffix': '%',
        'scale': 'RdYlGn'
    }
    headers['assigned'] = {
        'title': '{} Assigned'.format(config.read_count_prefix),
        'description': 'Assigned Reads ({})'.format(config.read_count_desc),
        'min': 0,
        'scale': 'PuBu',
        'modify': lambda x: float(x) * config.read_count_multiplier,
        'shared_key': 'read_count'
    }
    self.general_stats_addcols(self.htseq_data, headers)
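# Hedged sketch of how the 'modify' / 'shared_key' settings behave: with the
# read-count multiplier configured for millions (an assumption for this
# example), raw counts are rescaled before display, and every column sharing
# the 'read_count' key is shown on the same scale in the table.
read_count_multiplier = 0.000001  # e.g. config.read_count_multiplier for 'M'
raw_assigned = 12345678
print("{:.1f} M reads assigned".format(raw_assigned * read_count_multiplier))
# -> 12.3 M reads assigned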

# File-content search: scan a file line by line against a single search
# pattern. The opening lines (file open and loop) are restored from context;
# fn_matched and contents_matched are initialised earlier in the function.
try:
    with io.open(os.path.join(f['root'], f['fn']), 'r', encoding='utf-8') as fh:
        l = 1
        for line in fh:
            # Search by file contents (plain string)
            if pattern.get('contents') is not None:
                if pattern['contents'] in line:
                    contents_matched = True
                    # A contents-only pattern matches as soon as the string is found
                    if pattern.get('fn') is None and pattern.get('fn_re') is None:
                        return True
                    break
            # Search by file contents (regex)
            elif pattern.get('contents_re') is not None:
                if re.search(repattern, line):
                    contents_matched = True
                    if pattern.get('fn') is None and pattern.get('fn_re') is None:
                        return True
                    break
            # Break if we've searched enough lines for this pattern
            if pattern.get('num_lines') and l >= pattern.get('num_lines'):
                break
            l += 1
except (IOError, OSError, ValueError, UnicodeDecodeError):
    if config.report_readerrors:
        logger.debug("Couldn't read file when looking for output: {}".format(f['fn']))
    return False
return fn_matched and contents_matched
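# Hedged sketch of the search pattern dicts this function consumes, based on
# the keys used above (the exact schema is an assumption): a pattern can match
# on filename, on file contents (plain or regex), or both, and can cap how
# many lines are scanned.
example_pattern = {
    'fn': '*.txt',            # glob on the filename
    'contents': 'organism:',  # substring that must appear in the file
    'num_lines': 10,          # only scan the first 10 lines
    'shared': False,          # don't offer the file to other modules once matched
}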

#!/usr/bin/env python
""" MultiQC code to export data to MegaQC / flat JSON files """

from __future__ import print_function
import gzip
import io
import json
import os
import requests

from multiqc import config

log = config.logger


# Custom encoder to handle lambda functions
class MQCJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if callable(obj):
            try:
                # Call the lambda with a dummy value to get a serialisable result
                return obj(1)
            except Exception:
                return None
        return json.JSONEncoder.default(self, obj)
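# Usage sketch: the encoder lets json.dumps() serialise header dicts that
# contain 'modify' lambdas (like those above) by calling each lambda with a
# dummy value of 1 and storing the result in their place.
print(json.dumps({'modify': lambda x: x * 100.0}, cls=MQCJSONEncoder))
# -> {"modify": 100.0}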
def multiqc_dump_json(report):
    exported_data = dict()
    export_vars = {
        'report': [
            'data_sources',

# Per-file checks from the file search loop: f is a dict describing the
# candidate file, root / fn give its location, spatterns the search patterns.
if not os.path.isfile(os.path.join(root, fn)):
    return None

# Check that we don't want to ignore this file
i_matches = [n for n in config.fn_ignore_files if fnmatch.fnmatch(fn, n)]
if len(i_matches) > 0:
    logger.debug("Ignoring file as matched an ignore pattern: {}".format(fn))
    return None

# Limit search to small files, to avoid 30GB FastQ files etc.
try:
    f['filesize'] = os.path.getsize(os.path.join(root, fn))
except (IOError, OSError, ValueError, UnicodeDecodeError):
    logger.debug("Couldn't read file when checking filesize: {}".format(fn))
else:
    if f['filesize'] > config.log_filesize_limit:
        return False

# Test file for each search pattern
for patterns in spatterns:
    for key, sps in patterns.items():
        for sp in sps:
            if search_file(sp, f, key):
                # Check that we shouldn't exclude this file
                if not exclude_file(sp, f):
                    # Looks good! Remember this file
                    files[key].append(f)
                # Don't keep searching this file for other modules
                if not sp.get('shared', False):
                    return
                # Don't look at other patterns for this module
                break
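# Hedged sketch of the 'spatterns' structure iterated above, inferred from the
# nested loop (module keys and patterns invented for illustration): a list of
# {module_key: [search_pattern, ...]} dicts.
spatterns_example = [
    {'prokka': [{'fn': '*.txt', 'contents': 'organism:'}]},
    {'htseq': [{'fn': '*htseq.txt'}]},
]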

# Picard coverage columns for the general stats table. The key for this first
# entry is truncated in the source fragment; 'MEAN_COVERAGE' is assumed here.
self.general_stats_headers['MEAN_COVERAGE'] = {
    'title': 'Mean Coverage',
    'description': 'The mean coverage in bases of the genome territory, after all filters are applied.',
    'min': 0,
    'suffix': 'X',
    'scale': 'GnBu',
    'hidden': True,
}
self.general_stats_headers['SD_COVERAGE'] = {
    'title': 'SD Coverage',
    'description': 'The standard deviation of coverage in bases of the genome territory, after all filters are applied.',
    'min': 0,
    'suffix': 'X',
    'scale': 'GnBu',
    'hidden': True,
}
# User-configurable coverage levels
try:
    covs = config.picard_config['general_stats_target_coverage']
    assert isinstance(covs, list)
    assert len(covs) > 0
    covs = [str(i) for i in covs]
    log.debug("Custom Picard coverage thresholds: {}".format(", ".join(covs)))
except (AttributeError, KeyError, TypeError, AssertionError):
    # Fall back to a single 30X threshold if nothing valid is configured
    covs = ['30']
for c in covs:
    self.general_stats_headers['PCT_{}X'.format(c)] = {
        'id': 'picard_target_bases_{}X'.format(c),
        'title': 'Bases ≥ {}X'.format(c),
        'description': 'Percent of target bases with coverage ≥ {}X'.format(c),
        'max': 100,
        'min': 0,
        'suffix': '%',
        'format': '{:,.0f}',
        'scale': 'RdYlGn',
    }
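# Sketch of how a user could override the thresholds above. In practice this
# is set in the MultiQC config file; shown here as the equivalent Python:
config.picard_config = {'general_stats_target_coverage': [10, 30, 100]}
# -> adds PCT_10X, PCT_30X and PCT_100X columns to the general stats table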

# Fastp module: general stats table headers. The key and opening lines of
# this first entry are truncated in the source; 'pct_duplication' is assumed.
headers['pct_duplication'] = {
    'title': '% Duplication',
    'description': 'Duplication rate before filtering',
    'max': 100,
    'min': 0,
    'suffix': '%',
    'scale': 'RdYlGn-rev'
}
headers['after_filtering_q30_rate'] = {
    'title': '% > Q30',
    'description': 'Percentage of reads > Q30 after filtering',
    'min': 0,
    'max': 100,
    'modify': lambda x: x * 100.0,
    'scale': 'GnBu',
    'suffix': '%',
    'hidden': True
}
headers['after_filtering_q30_bases'] = {
    'title': '{} Q30 bases'.format(config.base_count_prefix),
    'description': 'Bases > Q30 after filtering ({})'.format(config.base_count_desc),
    'min': 0,
    'modify': lambda x: x * config.base_count_multiplier,
    'scale': 'GnBu',
    'shared_key': 'base_count',
    'hidden': True
}
headers['after_filtering_gc_content'] = {
    'title': 'GC content',
    'description': 'GC content after filtering',
    'max': 100,
    'min': 0,
    'suffix': '%',
    'scale': 'Blues',
    'modify': lambda x: x * 100.0
}

# FastQ Screen module: f is a found log file dict and parsed_data the result
# of parsing it, inside the loop over discovered reports.
if f['s_name'] in self.fq_screen_data:
    log.debug("Duplicate sample name found! Overwriting: {}".format(f['s_name']))
self.add_data_source(f)
self.fq_screen_data[f['s_name']] = parsed_data

# Filter to strip out ignored sample names
self.fq_screen_data = self.ignore_samples(self.fq_screen_data)
if len(self.fq_screen_data) == 0:
    raise UserWarning
log.info("Found {} reports".format(len(self.fq_screen_data)))

# Section 1 - Alignment Profiles
# The posh plot only works for around 20 samples / 8 organisms,
# so fall back to a flat plot beyond that.
if len(self.fq_screen_data) * self.num_orgs <= 160 and not config.plots_force_flat and not getattr(config, 'fastqscreen_simpleplot', False):
    self.add_section(content=self.fqscreen_plot())
else:
    # Use a simpler plot that works with many samples
    self.add_section(plot=self.fqscreen_simple_plot())

# Write the total counts and percentages to files
self.write_data_file(self.parse_csv(), 'multiqc_fastq_screen')
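# Worked example of the plot-choice threshold above: with 25 samples and 8
# organisms, 25 * 8 = 200 > 160, so the simpler flat plot is chosen.
num_samples, num_orgs = 25, 8
print('simple plot' if num_samples * num_orgs > 160 else 'posh plot')  # simple plot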