Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_read_gtf():
gr = pr.read_gtf("tests/test_data/ensembl.gtf", full=True)
assert len(gr.columns) == 28
df = gr.df
transcript = df.iloc[1]
assert transcript['tag'] == 'basic'
exon = df[df['exon_id'] == 'ENSE00003812156'].iloc[0]
assert exon['tag'] == 'basic'
gr = pr.read_gtf("tests/test_data/ensembl.gtf",
full=True, duplicate_attr=True)
assert len(gr.columns) == 28
df = gr.df
transcript = df.iloc[1]
assert transcript['tag'] == 'basic'
exon = df[df['exon_id'] == 'ENSE00003812156'].iloc[0]
assert exon['tag'] == 'CCDS,basic'
# assert list(gr.df.columns[:4]) == "Chromosome Start End Strand".split()
def __init__(self, gtf_file, fasta_file):
"""Protein sequences in the genome
"""
self.gtf_file = str(gtf_file)
self.fasta_file = str(fasta_file)
self.fae = FastaStringExtractor(self.fasta_file, use_strand=False)
df = pr.read_gtf(self.gtf_file, output_df=True)
biotype_str = 'transcript_biotype' if 'transcript_biotype' in df else 'gene_biotype'
self.cds = (df
.query("{} == 'protein_coding'".format(biotype_str))
.query("(Feature == 'CDS') | (Feature == 'CCDS')")
.query("(tag == 'basic') | (tag == 'CCDS')")
.set_index('transcript_id'))
# filter valid transcripts
if 'transcript_support_level' in self.cds:
self.cds = self.cds[~self.cds.transcript_support_level.isnull()]
self.cds = self.cds[self.cds.transcript_support_level != 'NA']
self.cds.transcript_support_level = self.cds.transcript_support_level.astype(int)
self.cds = self.cds[~self.cds.transcript_support_level.isna()]
else:
print("Transcript support level not in gtf. Skipping the associated filters.")
self.cds_exons = pr.PyRanges(self.cds.reset_index())
self.transcripts = self.cds.index.unique()
def test_BaseVariantMatcher__read_intervals():
pranges = pyranges.read_gtf(gtf_file)
with pytest.raises(ValueError):
pr = BaseVariantMatcher._read_intervals(
pranges=pranges, gtf_path=gtf_file)
with pytest.raises(ValueError):
pr = BaseVariantMatcher._read_intervals(
intervals=intervals, interval_attrs=['gene_id'])
pr = BaseVariantMatcher._read_intervals(gtf_path=gtf_file)
assert pr.Chromosome.tolist() == ['chr1'] * 5
assert pr.Start.tolist() == [200, 200, 200, 1049, 3029]
assert pr.End.tolist() == [4230, 4230, 402, 1340, 4230]
# assert len(pr.intervals.tolist()) == 5
pr = BaseVariantMatcher._read_intervals(bed_path=example_intervals_bed)
def test_pyranges_to_intervals():
pranges = pyranges.read_gtf(gtf_file)
intervals = list(pyranges_to_intervals(pranges, interval_attrs=[
'gene_id', 'gene_name', 'transcript_id', 'exon_id']))
assert len(intervals) == 5
assert intervals[4].attrs['gene_id'] == 'ENSG00000012048'
assert intervals[4].attrs['gene_name'] == 'BRCA1'
assert intervals[4].attrs['transcript_id'] == 'ENST00000357654'
assert intervals[4].attrs['exon_id'] == 'ENSE00003510592'
pranges = pyranges.read_bed(example_intervals_bed)
intervals = list(pyranges_to_intervals(pranges))
assert len(intervals) == 4
assert intervals[0].start == 2
assert pranges.Chromosome.tolist() == ['chr1'] * 4
>>> # | chr1 | HAVANA | exon | 11868 | 12227 | . | + | . | ENSG00000223972.5 | ... |
>>> # | chr1 | HAVANA | exon | 12612 | 12721 | . | + | . | ENSG00000223972.5 | ... |
>>> # | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
>>> # | chr1 | HAVANA | exon | 1430549 | 1430662 | . | - | . | ENSG00000225285.1 | ... |
>>> # | chr1 | HAVANA | transcript | 1430663 | 1434520 | . | - | . | ENSG00000225285.1 | ... |
>>> # | chr1 | HAVANA | exon | 1434177 | 1434520 | . | - | . | ENSG00000225285.1 | ... |
>>> # | chr1 | HAVANA | exon | 1430663 | 1430954 | . | - | . | ENSG00000225285.1 | ... |
>>> # +--------------+------------+--------------+-----------+-----------+------------+--------------+------------+-------------------+-------+
>>> # Stranded PyRanges object has 4,995 rows and 24 columns from 1 chromosomes.
>>> # For printing, the PyRanges was sorted on Chromosome and Strand.
>>> # 15 hidden columns: gene_type, gene_name, level, havana_gene, transcript_id, transcript_type, transcript_name, transcript_support_level, tag, ... (+ 6 more.)
"""
full_path = get_example_path("gencode_human.gtf.gz")
return pr.read_gtf(full_path)
filter_tag=False,
duplicate_attr=None,
on_error_warn=True,
):
"""
Read, extract and filter valid cds from the given gtf_file
:param gtf_file: path to the GTF file
"""
import pyranges
if duplicate_attr == None:
# One row in the GTF file can have multiple tags;
# therefore, to filter them we have to allow duplicate attrs.
duplicate_attr = filter_tag
df = pyranges.read_gtf(gtf_file, as_df=True, duplicate_attr=duplicate_attr)
cds = CDSFetcher.get_cds_from_gtf(
df,
filter_valid_transcripts=filter_valid_transcripts,
filter_biotype=filter_biotype,
filter_tag=filter_tag,
on_error_warn=on_error_warn
)
cds = cds.set_index("transcript_id")
return cds
gtf_file,
feature_type="5UTR",
infer_from_cds=False,
on_error_warn=True,
) -> pd.DataFrame:
"""
Read, extract and filter valid UTRs from the given gtf_file
:param gtf_file: path to the GTF file
:param feature_type: type of the feature that will be filtered for. In general '5UTR' or '3UTR'.
:param infer_from_cds: Substract the CDS from the exon regions to infer the UTR regions.
Will use 'feature_type' to decide whether '5UTR' or '3UTR' should be returned.
:param on_error_warn: Do not break on error; instead throw warning.
"""
import pyranges
df = pyranges.read_gtf(gtf_file, as_df=True)
utr_df = UTRFetcher.get_utr_from_gtf(
df,
feature_type=feature_type,
infer_from_cds=infer_from_cds,
on_error_warn=on_error_warn
)
utr_df = utr_df.set_index("transcript_id")
return utr_df
def _read_cds(gtf_file, duplicate_attr=False):
"""
Read, extract and filter valid cds from the given gtf_file
:param gtf_file:
"""
df = pyranges.read_gtf(gtf_file, as_df=True,
duplicate_attr=duplicate_attr)
cds = CDSFetcher._get_cds_from_gtf(df)
cds = CDSFetcher._filter_valid_transcripts(cds)
return cds
def _read_intervals(gtf_path=None, bed_path=None, pranges=None,
intervals=None, interval_attrs=None, duplicate_attr=False):
alternatives = [bed_path, pranges, intervals, gtf_path]
if sum(i is not None for i in alternatives) != 1:
raise ValueError('only one of `gth_path`, `bed_path`, `pranges`,'
'`intervals` or should given as input.')
if gtf_path:
import pyranges
pranges = pyranges.read_gtf(gtf_path, duplicate_attr=duplicate_attr)
elif bed_path:
import pyranges
pranges = pyranges.read_bed(bed_path)
elif intervals:
if interval_attrs is not None:
raise ValueError(
'`interval_attrs` is not valid with `intervals`')
pranges = intervals_to_pyranges(intervals)
pranges.intervals = intervals
return pranges
>>> # | 1 | havana | exon | 11868 | 12227 | . | + | . | transcribed_unprocessed_pseudogene | ... |
>>> # | 1 | havana | exon | 12612 | 12721 | . | + | . | transcribed_unprocessed_pseudogene | ... |
>>> # | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
>>> # | 1 | havana | gene | 1173055 | 1179555 | . | - | . | lncRNA | ... |
>>> # | 1 | havana | transcript | 1173055 | 1179555 | . | - | . | lncRNA | ... |
>>> # | 1 | havana | exon | 1179364 | 1179555 | . | - | . | lncRNA | ... |
>>> # | 1 | havana | exon | 1173055 | 1176396 | . | - | . | lncRNA | ... |
>>> # +--------------+------------+--------------+-----------+-----------+------------+--------------+------------+------------------------------------+-------+
>>> # Stranded PyRanges object has 2,446 rows and 28 columns from 1 chromosomes.
>>> # For printing, the PyRanges was sorted on Chromosome and Strand.
>>> # 19 hidden columns: gene_id, gene_name, gene_source, gene_version, tag, transcript_biotype, transcript_id, transcript_name, transcript_source, transcript_support_level, ... (+ 9 more.)
"""
full_path = get_example_path("ensembl_human.gtf.gz")
return pr.read_gtf(full_path)