File: eval_disco_one_snp_per_locus.py

package info (click to toggle)
discosnp 1%3A2.6.2-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,656 kB
  • sloc: python: 5,893; sh: 2,966; cpp: 2,692; makefile: 14
file content (193 lines) | stat: -rw-r--r-- 7,545 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# 

import sys
import getopt

# Calcule un recall locus-based : vcf disco filtré avec 1 SNP par cluster
# recall-locus = # locus avec un VP / # locus reference
# locus-dup = # locus de la ref qui sont présent au moins 2 fois dans le vcf disco
# compare SNP du génome simulé (argv[1] --> vcf de référence)
# avec ceux d'un vcf DiscoSnp (arg[2])
# 1 prediction  = chr_pos_allele_pair, VP si pos et allele pair identique

def index_reference(ref_vcf):

    ##construit un index avec le vcf de ref, sour la forme :  {"chr3R" : {"12957641" : ["A_T","A_G"]}}  + un index SNP_to_locus {"chr1_12957641" : "loci56681"} + un dico des locus {"loci56681" : 0}
    ## gere les sites multi-alleliques
    ## alleles sortés
    
    #chr3R    12957641    1    T    A    .    loci56681;128    .    ./.
    
    SNP_index = {}
    SNP_to_locus = {}  # for each distinct chr_pos, stores the locus, key is "chr_pos"
    all_loci = {}
    filin = open(ref_vcf, 'r')
    nb_total_loci = 0
   
    while True:
        line = filin.readline()
        if not line: break
        if line.startswith("#"): continue

        chr = line.split("\t")[0]
        pos = line.split("\t")[1]
        ref = line.split("\t")[3].upper()
        alt = line.split("\t")[4].upper()
        
        locus = line.split("\t")[6].split(";")[0]

        if ref<alt:
            bases = '_'.join([ref,alt])
        else:
            bases = '_'.join([alt,ref])
      
        ##initialise index si pas fait
        if chr not in SNP_index:
            SNP_index[chr] = {}
        if pos not in SNP_index[chr]:
            SNP_index[chr][pos] = []
            SNP_to_locus["{0}_{1}".format(chr, pos)] = locus
        if bases not in SNP_index[chr][pos]:
            SNP_index[chr][pos].append(bases)

        if locus not in all_loci:
            all_loci[locus] = 0
            nb_total_loci += 1

    filin.close()
    #print(SNP_index) 
    return SNP_index, SNP_to_locus, all_loci, nb_total_loci

def extract_multiple_pos(chr,pos,XA_field):
    dict_pos = {}
    #Ty=SNP;Rk=0.025231;UL=1;UR=1;CL=.;CR=.;Genome=.;Sd=1;XA=chr2L_+5176594,chr2L_+5177242,chr2L_+5177566

    ## ajouter position primaire
    dict_pos[chr]=[]
    dict_pos[chr].append(pos)
    for i in XA_field.split(","):
        pos = i.split("_")[-1]
        chr = i.split("_")[0]
        if chr not in dict_pos: dict_pos[chr] = []
        dict_pos[chr].append(pos)

    return dict_pos



def comp_disco_vcf(vcf_disco, index, SNP_to_locus, all_loci):

    cpt=0
    nb_total_prediction = 0 # pour vérifier : nb_TP_prediction+nb_FP=nb_total_prediction
    nb_locus_duplicated = 0 # compte le nb de vrais loci qui apparaissent + d'une fois
    nb_locus_recall = 0 # compte le nb de vrais locis trouvés (au moins 1 fois)
    
    filin = open(vcf_disco, 'r')
    while True:
        #random_genome	49694	9997	G	T	.	MULTIPLE	Ty=SNP;Rk=1;UL=83;UR=4;CL=83;CR=4;Genome=G;Sd=1	GT:DP:PL:AD:HQ
        line = filin.readline()
        if not line: break
        if line.startswith("#"): continue
 
        # WARNING : enleve ce test pour la compatibilité avec Stacks, dans nos tests on ne demande pas d'INDEL a disco, outes les lignes sont des ty=SNP.
        #thistype = line.split("\t")[7].split(";")[0].split("=")[1].strip()
        #print(thistype)
        #if thistype != "SNP": continue

        nb_total_prediction += 1
        
        chr = line.split("\t")[0]
        pos = line.split("\t")[1]
        id = line.split("\t")[2]
        ref = line.split("\t")[3].upper()
        alt = line.split("\t")[4].upper()

        if ref<alt:
            bases = '_'.join([ref,alt])
        else:
            bases = '_'.join([alt,ref])
    
        #print("chr"+str(chr)+" pos "+str(pos)+" alleles "+ref+"-"+alt+" geno"+str(pred_geno))
        
#        if line.startswith("SNP"): ##si non mappé
#            nb_FP += 1
#            filout.write("{0}\t{1}\n".format(id, "FP"))
#            continue

        #### cas des alignements multiples ####
        splitXA = line.split("\t")[7].split("XA=")
        if line.split("\t")[6].strip() == "MULTIPLE" and len(splitXA)>1  :

            SNP_trouve = 0
            XA_field = splitXA[1].split(";")[0]
            dict_multiple_pos = extract_multiple_pos(chr,pos,XA_field)
            for chr in dict_multiple_pos:
                if SNP_trouve:
                    break
                if chr not in index:
                    continue

                for pos_mult in dict_multiple_pos[chr]:
                    if SNP_trouve:
                        break # sort de la boucle sur les positions multiples
                    if pos_mult in index[chr]:
                        if len(index[chr][pos_mult])>1 or bases == index[chr][pos_mult][0]: # si multi-allelique compte comme TP sans checker les bases
                            SNP_trouve = 1
                            snp_found="{0}_{1}".format(chr, pos_mult)
                            locus_found = SNP_to_locus[snp_found]
                            if all_loci[locus_found] == 0:
                                nb_locus_recall +=1
                                all_loci[locus_found] = 1
                            elif all_loci[locus_found] == 1:
                                nb_locus_duplicated += 1
                                all_loci[locus_found] = 2

            continue ##on passe à la ligne suivante

        ### cas des alignements uniques ou multiples sans champ XA ###

        if chr in index:
            #print("SNP sur chr indexé :" + chr + "_" + pos)
            if not pos in index[chr]: 
                continue

            if len(index[chr][pos])>1 or bases == index[chr][pos][0]: # si multi-allelique compte comme TP sans checker les bases
                snp_found="{0}_{1}".format(chr, pos)
                locus_found = SNP_to_locus[snp_found]
                if all_loci[locus_found] == 0:
                    nb_locus_recall +=1
                    all_loci[locus_found] = 1
                elif all_loci[locus_found] == 1:
                    nb_locus_duplicated += 1
                    all_loci[locus_found] = 2

    filin.close()
    return nb_locus_recall, nb_locus_duplicated

def main(ref,disco):
    index, SNP_to_locus, all_loci, nb_total_loci = index_reference(ref)
    #print("indexing done\n")
    nb_locus_recall, nb_locus_duplicated = comp_disco_vcf(disco, index, SNP_to_locus, all_loci)

    locus_recall = 100*float(nb_locus_recall)/float(nb_total_loci)
    duplicated_ratio1 = 100*float(nb_locus_duplicated)/float(nb_total_loci)
    duplicated_ratio2 = 100*float(nb_locus_duplicated)/float(nb_locus_recall)
    
    print("#############################################")

    print("#############################################")
    print("Locus Recall on position-allele only. (All multiple predictions at the same position are counted)")
    print(f"Total nb of loci (true vcf) : {nb_total_loci}")
    print(f"Nb loci with a TP SNP : {nb_locus_recall}")
    print(f"Nb duplicated loci (with 2 or more TP SNPs) : {nb_locus_duplicated}")
    print(f"Locus recall : {locus_recall:.2f}")
    print(f"Locus duplication ratio (over all loci) : {duplicated_ratio1:.2f}")
    print(f"Locus duplication ratio (over found loci) : {duplicated_ratio2:.2f}")

if __name__ == "__main__":
    main(sys.argv[1], sys.argv[2])