-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathvcf_var_rel_pos.py
executable file
·60 lines (51 loc) · 2.16 KB
/
vcf_var_rel_pos.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#!/usr/bin/env python
"""Converts variant positions in given annotated vcf (SNPeff) into
relative CDS positions
"""
import sys
from math import ceil
import vcf
def main(ann_vcf, scale=1000):
"""main function: takes annotated vcf file as input and spits out
relative variant positions
"""
print "#CHROM\tPOS\tRELPOS(MAX={})\tIMPACT\tGENE".format(scale)
if ann_vcf == "-":
vcfr = vcf.VCFReader(sys.stdin)
else:
vcfr = vcf.VCFReader(filename=ann_vcf)
idx = dict()
for (key, pattern) in [('cds', ' CDS.pos / CDS.length '),
('impact', ' Annotation_Impact '),
('gene', ' Gene_ID ')]:
try:
idx[key] = dict(vcfr.infos)['ANN'].desc.split('|').index(pattern)
#sys.stderr.write("DEBUG: idx[{}]={}\n".format(key, idx[key]))
except:
sys.stderr.write(
"{} annotation in ANN INFO field of {} not found".format(key, ann_vcf))
raise
for var in vcfr:
if not var.INFO.has_key('ANN'):
sys.stderr.write(
"WARNING: {}:{} has no annotation key in INFO field. Skipping...\n".format(
var.CHROM, var.POS, ann_vcf))
continue
#if len(var.INFO['ANN'])>1: sys.stderr.write("DEBUG: >1 ANN for {}".format(var))
# printed once per annotation
for ann in var.INFO['ANN']:
info = dict()
for k in idx.keys():
info[k] = ann.split('|')[idx[k]]
#sys.stderr.write("DEBUG: info[{}]={}\n".format(k, info[k]))
if not info['cds']:
if not 'intergenic_region' in ann:
sys.stderr.write(
"No CDS info found for var {}:{} but not intergenic\n".format(
var.CHROM, var.POS))
else:
pos, length = [int(x) for x in info['cds'].split('/')]
fake_pos = int(ceil(pos / float(length) * scale))
print "{}\t{}\t{}\t{}\t{}".format(var.CHROM, var.POS, fake_pos, info['impact'], info['gene'])
if __name__ == "__main__":
main(sys.argv[1])