File: validate_repeat.py

package info (click to toggle)
hisat2 2.2.1-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 19,448 kB
  • sloc: cpp: 97,109; python: 11,075; perl: 7,279; sh: 2,328; ansic: 1,458; makefile: 532; javascript: 273; java: 116
file content (230 lines) | stat: -rwxr-xr-x 5,534 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
#!/usr/bin/python3
import sys, subprocess
import re
from argparse import ArgumentParser, FileType
from collections import defaultdict, Counter

flag_include_N = True

"""
"""
def read_genome(genome_file):
    chr_dic = {}
    chr_name, sequence = "", ""
    for line in genome_file:
        if line.startswith(">"):
            if chr_name and sequence:
                chr_dic[chr_name] = sequence
            chr_name = line.strip().split()[0][1:]
            sequence = ""
        else:
            line = line.strip()
            if not flag_include_N:
                # remove N-bases
                line = line.replace('N', '')

            sequence += line;    

    if chr_name and sequence:
        chr_dic[chr_name] = sequence
    return chr_dic

"""
"""
def reverse_complement(seq):
    result = ""
    for nt in seq:
        base = nt
        if nt == 'A':
            base = 'T'
        elif nt == 'a':
            base = 't'
        elif nt == 'C':
            base = 'G'
        elif nt == 'c':
            base = 'g'
        elif nt == 'G':
            base = 'C'
        elif nt == 'g':
            base = 'c'
        elif nt == 'T':
            base = 'A'
        elif nt == 't':
            base = 'a'

        result = base + result

    return result


"""
"""
def read_snp(snp_file):
    snps = defaultdict(dict)
    for line in snp_file:
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        try:
            snpID, type, chr, pos, data = line.split('\t')
        except ValueError:
            continue

        assert type in ["single", "deletion", "insertion"]
        if type == "deletion":
            data = int(data)

        snps[chr][snpID] = [snpID, type, int(pos), data];

    return snps


def indelCount(snp_list, snp_id_list):
    indel = 0

    for snp_id in snp_id_list:
        snp = snp_list[snp_id]

        if snp[1] == 'deletion':
            indel -= int(snp[3])
        elif snp[1] == 'insertion':
            indel += len(snp[3])

    return indel

def applySNPs(snp_list, ref_sqn, snp_id_list, base_pos):

    ref_pos = 0
    read_pos = 0
    read = "" 

    for snp_id in snp_id_list:
        snp = snp_list[snp_id]

        pos = snp[2] - base_pos;

        while ref_pos < pos:
            read += ref_sqn[ref_pos]
            ref_pos += 1


        if snp[1] == 'single':
            read += snp[3]
            ref_pos += 1
        elif snp[1] == 'deletion':
            ref_pos += int(snp[3])
        elif snp[1] == 'insertion':
            read += snp[3]

        #print snp_id, snp_list[snp_id]

    while ref_pos < len(ref_sqn):
        read += ref_sqn[ref_pos]
        ref_pos += 1

    return read


def main(genome_file, rpt_name):
    # load genome sequeuce
    chr_dic = read_genome(genome_file)

    rpt_fa_name = rpt_name + ".rep.fa"
    rpt_info_name = rpt_name + ".rep.info"
    rpt_snp_name = rpt_name + ".rep.snp"

    # load repeat sequence
    fp = open(rpt_fa_name, 'r')
    rpt_dic = read_genome(fp)
    fp.close()

    # load repeat snp
    fp = open(rpt_snp_name, 'r')
    rpt_snps = read_snp(fp)
    fp.close()

    # Validates
    # load repeat info
    fp = open(rpt_info_name, 'r')
    repeat_sequence = ""
    repeat_length = 0
    snp_cnt = 0
    indel = 0
    snp_id_list = []

    for line in fp:
        line = line.strip()

        if line.startswith('>'):
            line = line[1:]
            fields = line.split()

            #print fields

            name, rpt_seq_name, rpt_pos, rpt_len, pos_cnt, snp_cnt = fields[0:6]

            snp_cnt = int(snp_cnt)
            rpt_pos = int(rpt_pos)
            rpt_len = int(rpt_len)

            if snp_cnt > 0:
                snp_id_list = fields[6].split(',')
            else:
                snp_id_list = []

            #print name, snp_cnt, snp_list

            # make repeat_sequence (with snp)
            

            repeat_sequence = rpt_dic[rpt_seq_name][rpt_pos:rpt_pos + rpt_len]
            indel = 0

            if snp_cnt > 0:
                # apply snps
                repeat_sequence = applySNPs(rpt_snps[rpt_seq_name], repeat_sequence, snp_id_list, rpt_pos)
                # in/del count
                indel = indelCount(rpt_snps[rpt_seq_name], snp_id_list)

            #repeat_length = rpt_len + indel
            repeat_length = len(repeat_sequence)

            #print repeat_sequence

        else:
            coords = line.split()
            for coord in coords:
                chr, pos, strand = coord.split(':')
                pos = int(pos)

                # get string
                seq = chr_dic[chr][pos:pos + repeat_length]
                if strand == '-':
                    seq = reverse_complement(seq)

                if seq != repeat_sequence:
                    print('Mismatch', seq, repeat_sequence, snp_cnt, coord, snp_id_list, repeat_length)
                    
    fp.close()


if __name__ == '__main__':
    parser = ArgumentParser(
        description='Validate repeat files')

    parser.add_argument('genome_file',
                        nargs='?',
                        type=FileType('r'),
                        help='input genome file (e.g. genome.fa)')

    parser.add_argument('-r', 
                        dest='rpt_name',
                        type=str,
                        help='Repeat Name')

    args = parser.parse_args()
    if not args.genome_file or not args.rpt_name:
        parser.print_help()
        exit(1)

    main(args.genome_file, args.rpt_name)