Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
interval = Interval('chr1', 4, 14)
seq = variant_seq_extractor.extract(interval, variants, anchor=4)
assert len(seq) == interval.end - interval.start
assert seq == 'GAACGTAACG'
interval = Interval('chr1', 2, 5)
seq = variant_seq_extractor.extract(interval, variants, anchor=3)
assert len(seq) == interval.end - interval.start
assert seq == 'GCG'
interval = Interval('chr1', 24, 34)
seq = variant_seq_extractor.extract(interval, variants, anchor=27)
assert len(seq) == interval.end - interval.start
assert seq == 'TGATAACGTA'
interval = Interval('chr1', 25, 35)
seq = variant_seq_extractor.extract(interval, variants, anchor=34)
assert len(seq) == interval.end - interval.start
assert seq == 'TGATAACGTA'
interval = Interval('chr1', 34, 44)
seq = variant_seq_extractor.extract(interval, variants, anchor=37)
assert len(seq) == interval.end - interval.start
assert seq == 'AACGTAACGT'
interval = Interval('chr1', 34, 44)
seq = variant_seq_extractor.extract(interval, variants, anchor=100)
assert len(seq) == interval.end - interval.start
assert seq == 'AACGTAACGT'
interval = Interval('chr1', 5, 11, strand='+')
seq = variant_seq_extractor.extract(
import pytest
from conftest import vcf_file, sample_5kb_fasta_file
from kipoiseq.dataclasses import Variant, Interval
from kipoiseq.extractors.vcf_query import NumberVariantQuery
from kipoiseq.extractors.vcf import MultiSampleVCF
# Alias for the conftest-provided `sample_5kb_fasta_file`.
fasta_file = sample_5kb_fasta_file

# Shared chr1 intervals, built from (start, end) coordinate pairs.
intervals = [
    Interval('chr1', start, end)
    for start, end in ((3, 10), (4, 30), (19, 30))
]
@pytest.fixture
def multi_sample_vcf():
    """Provide a `MultiSampleVCF` opened on the conftest `vcf_file`."""
    vcf = MultiSampleVCF(vcf_file)
    return vcf
def test_MultiSampleVCF__next__(multi_sample_vcf):
    """The first variant yielded by `next()` is chr1:4 T>C."""
    first = next(multi_sample_vcf)

    # Checked field by field, in order, so a failure names the exact field.
    expected_fields = (
        ('chrom', 'chr1'),
        ('pos', 4),
        ('ref', 'T'),
        ('alt', 'C'),
    )
    for field, expected in expected_fields:
        assert getattr(first, field) == expected
def test_single_seq_vcf_seq_extract(single_seq_vcf_seq_extractor):
    """Extracting chr1:2-9 with anchor 3 yields the expected 7-base string."""
    region = Interval('chr1', 2, 9)
    extracted = single_seq_vcf_seq_extractor.extract(region, anchor=3)
    assert extracted == 'GCGAACG'
interval.chrom = 'asd'
with pytest.raises(AttributeError):
interval.start = 10
with pytest.raises(AttributeError):
interval.end = 300
with pytest.raises(AttributeError):
interval.strand = '+'
assert interval.strand == '-'
# non-fixed arguments
interval.name = 'asd'
interval.score = 10
assert interval.unstrand().strand == '.'
assert interval == Interval.from_pybedtools(interval.to_pybedtools())
assert isinstance(interval.to_pybedtools(), pybedtools.Interval)
i2 = interval.shift(10, use_strand=False)
# original unchanged
assert interval.start == 10
assert interval.end == 20
assert i2.start == 20
assert i2.end == 30
i2 = interval.shift(10) # use_strand = True by default
assert i2.start == 0
assert i2.end == 10
assert not interval.shift(20, use_strand=True).is_valid()
def test_MultiSampleVCF_fetch_variant(multi_sample_vcf):
    """`fetch_variants` returns the expected number of variants per query."""

    def variant_count(*query):
        # `query` is (interval,) or (interval, sample_id), forwarded as-is.
        return len(list(multi_sample_vcf.fetch_variants(*query)))

    # Region with variants: two overall, one for NA00003, none for NA00001.
    region = Interval('chr1', 3, 5)
    assert variant_count(region) == 2
    assert variant_count(region, 'NA00003') == 1
    assert variant_count(region, 'NA00001') == 0

    # Region without any variants, with and without a sample filter.
    region = Interval('chr1', 7, 12)
    assert variant_count(region) == 0
    assert variant_count(region, 'NA00003') == 0
def test_MultiSampleVCF_fetch_variant(multi_sample_vcf):
    """Check variant counts from `fetch_variants` for two chr1 regions.

    NOTE(review): this function name is defined more than once in this
    file; in a single module the later definition shadows the earlier one.
    """
    # Each case is (extra positional args after the interval, expected count).
    region = Interval('chr1', 3, 5)
    for sample_args, expected in (((), 2), (('NA00003',), 1), (('NA00001',), 0)):
        found = list(multi_sample_vcf.fetch_variants(region, *sample_args))
        assert len(found) == expected

    region = Interval('chr1', 7, 12)
    for sample_args, expected in (((), 0), (('NA00003',), 0)):
        found = list(multi_sample_vcf.fetch_variants(region, *sample_args))
        assert len(found) == expected
def restore(self, sequence: Sequence):
    """Replace every `Interval` element of this container with sequence data.

    Each element that is an `Interval` is overwritten in place with the
    slice of `sequence` that covers it; all other elements are left as
    they are.

    Args:
      sequence: `pyfaidx.Sequence` from which the intervals are cut.
        Interval coordinates are shifted by `sequence.start` before
        slicing (assumes both use the same coordinate system — TODO
        confirm against callers).
    """
    for i, item in enumerate(self):
        # Only intervals need restoring; checking first also avoids
        # touching `.start` / `.end` on elements that may not have them.
        if not isinstance(item, Interval):
            continue

        # interval.start can be bigger than interval.end; such an interval
        # is treated as empty instead of getting a negative length.
        interval_len = max(0, item.end - item.start)

        start = item.start - sequence.start
        end = start + interval_len
        self[i] = sequence[start:end]
def _upstream_builder(up_variants, interval, anchor, iend):
    """Assemble the pieces running from `anchor` up to `iend`.

    Walks the `(ref, alt)` pairs in `up_variants` and alternates between
    the reference interval preceding each variant and the variant's `alt`.
    Iteration stops at the first pair whose `ref.start` is at or past
    `iend`; the remaining reference stretch up to `iend` is appended last.
    (Presumably `up_variants` is ordered by position — verify with caller.)

    Args:
      up_variants: iterable of `(ref, alt)` pairs; `ref` exposes
        `start` and `end`.
      interval: only its `chrom` is used, for the intervals created here.
      anchor: coordinate at which the first reference piece starts.
      iend: coordinate at which the last reference piece ends.

    Returns:
      `IntervalSeqBuilder` holding `Interval` objects and `alt` entries.
    """
    chrom = interval.chrom
    builder = IntervalSeqBuilder()
    cursor = anchor

    for ref, alt in up_variants:
        if not ref.start < iend:
            break
        # Reference gap before the variant, then the variant itself.
        gap = Interval(chrom, cursor, ref.start)
        builder.append(gap)
        builder.append(alt)
        cursor = ref.end

    # Remaining reference stretch after the last consumed variant.
    tail = Interval(chrom, cursor, iend)
    builder.append(tail)
    return builder
# Arguments:
variant: variant object or variant id as string.
# Returns
Variant object.
# Example
```python
>>> MultiSampleVCF(vcf_path).get_variant("chr1:4:T:['C']")
```
"""
if type(variant) == str:
variant = Variant.from_str(variant)
variants = self.fetch_variants(
Interval(variant.chrom, variant.pos - 1, variant.pos))
for v in variants:
if v.ref == variant.ref and v.alt == variant.alt:
return v
raise KeyError('Variant %s not found in vcf file.' % str(variant))
def _regions_from_variants(self, variants, variant_gap=150):
    """Merge nearby variants into per-chromosome regions.

    Variants are grouped by chromosome with `_group_variants_by_chrom`
    and their positions sorted. Consecutive positions stay in the same
    region as long as the next one is at most `variant_gap` past the
    previous one; a larger jump closes the region and opens a new one.

    Args:
      variants: variants to cover; each must expose `pos`.
      variant_gap: largest distance between consecutive variant positions
        that still keeps them in one region. Defaults to 150.

    Returns:
      List of `Interval(chrom, first_pos - 1, last_pos)`, one per region
      (presumably mapping 1-based positions to 0-based half-open
      intervals — TODO confirm).
    """
    regions = []

    for chrom, vs in self._group_variants_by_chrom(variants).items():
        starts = sorted(v.pos for v in vs)
        if not starts:
            # Nothing to cover for this chromosome.
            continue

        start_i = prev_i = starts[0]
        for i in starts[1:]:
            # Honour the caller-supplied gap instead of a fixed 150.
            if prev_i + variant_gap < i:
                regions.append(Interval(chrom, start_i - 1, prev_i))
                start_i = i
            prev_i = i

        # Close the region that is still open for this chromosome.
        regions.append(Interval(chrom, start_i - 1, prev_i))

    return regions