File: Inser_snp_in_ref.py

package info (click to toggle)
mindthegap 2.3.0-5
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 4,076 kB
  • sloc: cpp: 4,482; python: 917; sh: 419; makefile: 5
file content (93 lines) | stat: -rw-r--r-- 2,748 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import sys
import csv
from Bio.Seq import Seq
from Bio import SeqIO
from collections import defaultdict
import getopt
csv.field_size_limit(100000000)


def main():
    print(sys.argv[1:])
    try:
        opts, args = getopt.getopt(sys.argv[1:], "s:g:o:", ["snp=", "genome=", "genome_altered="])
    except getopt.GetoptError:
        # print help information and exit:
		#print ('error')  # will print something like "option -a not recognized"
        sys.exit(2)
    print('start')
    genome_parser=""
    vcf_reader=""
    out_m=csv.writer(open(sys.argv[3],"w"),delimiter="\n")
    dic_snp={}
	
    for opt, arg in opts:
        print(opt, arg)
        if opt in ('-s', "--snp"):
            vcf_reader = arg
	#print(i)
        elif opt in ('-g', "--genome"):
            genome_parser = arg
	#print(r)
        elif opt in ('-o', "--genome_altered"):
            out_m = arg
        else:
	        assert False, "unhandled option"
    print('start')
    
    dic_snp=extract_snp(vcf_reader)	
    alter_genome(genome_parser,dic_snp,out_m)
	
def is_valid(inser) :
	allowed="ATCGatcg"
	if all(c in allowed for c in inser ) :
		return True
	else : 
		return False
	
def insert_snp(genome,chromosome,dic_snp) :
	nb_error=0
	#print (chromosome)
	#print ("len before", len(genome))
	if chromosome in dic_snp :
		for listing in dic_snp[chromosome] :
			if listing[1]==genome[listing[0]-1] :
				genome[listing[0]-1]=listing[2]
			else :
				print( "Error SNP: in genome ",genome[listing[0]-2:listing[0]+2], "in vcf ", listing[1],"at position", listing[0])
				nb_error+=1
	print (" nb SNP substitution failed",nb_error)
	return genome

def extract_snp(vcf_reader):
    dic_snp={}
    vcf_readers=csv.reader(open(vcf_reader,'r'),delimiter='\t')
    count_snp=0
    for elits in vcf_readers :
        if '#' not in elits[0] and '@' not in elits[0] :
            if len(elits[3])==1 and len(elits[4])==1  and is_valid(elits[3])==True and is_valid(elits[4])==True:
                dic_snp.setdefault(elits[0],[]).append((int(elits[1]),elits[3],elits[4]))
                count_snp+=1
    print("total_snp in vcf : ", count_snp)
    return dic_snp
			
def alter_genome(genome_parser,dic_snp,out_m):
	genome_parsers=SeqIO.parse(genome_parser, "fasta")
	out_ms=csv.writer(open(out_m,"w"),delimiter="\n")
	for record in genome_parsers :
			elts=str(record.description)
			head=">" + str(record.description)
			old_sequence=list(str(record.seq).upper())
			old_length=len(old_sequence)
			sequence=""
			unmasked_seq=""
			num_exon=0
			total_length=0
			work_seq=insert_snp(old_sequence,elts,dic_snp)
			finals=''.join(work_seq)
			print( 'final', len(finals))
			out_m_lists=[head,finals]
			out_ms.writerow(out_m_lists)

if __name__ == "__main__":
    main()