File: apat_pattern.pyx

package info (click to toggle)
obitools 3.0.1~b26%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 26,756 kB
  • sloc: ansic: 24,299; python: 657; sh: 27; makefile: 21
file content (150 lines) | stat: -rwxr-xr-x 5,813 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#cython: language_level=3

from libc.stdint cimport uintptr_t

from obitools3.dms.capi.apat cimport SeqPtr, \
                                     PatternPtr, \
                                     ecoseq2apatseq, \
                                     ManberAll, \
                                     delete_apatseq, \
                                     buildPattern, \
                                     complementPattern


cdef class One_primer_search_result:
    
    cdef inline SeqPtr pointer(self) :
        return <SeqPtr>(self._pointer)        

    @staticmethod
    cdef new(SeqPtr apat_seq_p, int pattern_ref, int hit_count) :   # not __init__ method because need to pass pointer        
        result = One_primer_search_result()
        result._pointer = apat_seq_p
        result.pattern_ref = pattern_ref
        result.hit_count = hit_count
        return result
    
    cpdef first_encountered(self):
        cdef int i
        cdef int first
        cdef int first_index
        cdef int hit_pos
        cdef int error_count
        cdef SeqPtr pointer
        
        pointer = self.pointer()
        first = -1
        
        for i in range(self.hit_count):
            hit_pos = pointer.hitpos[self.pattern_ref].val[i]
            if first == -1 or hit_pos < first:
                first = hit_pos
                first_index = i
        error_count = pointer.hiterr[self.pattern_ref].val[first_index]
        return error_count, first


cdef class Primer_search:
   
    def __init__(self, list primers, int error_max) :
        cdef PatternPtr p1
        cdef PatternPtr p2
        cdef PatternPtr p3
        cdef PatternPtr p4   
        
        cdef PatternPtr test
        cdef uintptr_t test_i
        cdef bytes test_b
             
        self.apat_seq_p = NULL
        self.direct_primers = []
        self.revcomp_primers = []
        for i in range(len(primers)):
            p1 = buildPattern(primers[i][0], error_max)
            p2 = buildPattern(primers[i][1], error_max)
            p3 = complementPattern(p1)
            p4 = complementPattern(p2)
            self.direct_primers.append((<uintptr_t>p1, <uintptr_t>p2))
            self.revcomp_primers.append((<uintptr_t>p3, <uintptr_t>p4))
            

    cpdef One_primer_search_result search_one_primer(self, bytes sequence, 
                                                     int primer_pair_index, int primer_index, 
                                                     bint reverse_comp=False, 
                                                     bint same_sequence=False, 
                                                     int pattern_ref=0,
                                                     int begin=0) :
        '''
            begin = start of direct pattern if it was already searched + its length
        '''
        cdef One_primer_search_result result = None
        cdef PatternPtr primer_p
        cdef SeqPtr apat_seq_p
        cdef int hit_count
        
        if not same_sequence:
            self.apat_seq_p = <SeqPtr>(ecoseq2apatseq(sequence, self.apat_seq_p, 0))

        apat_seq_p = <SeqPtr>(self.apat_seq_p)
                
        if reverse_comp:
            primer_p = <PatternPtr>(<uintptr_t>(self.revcomp_primers[primer_pair_index][primer_index]))
        else:
            primer_p = <PatternPtr>(<uintptr_t>(self.direct_primers[primer_pair_index][primer_index]))
        
        begin = begin
        seqlen = (apat_seq_p.seqlen) - begin
                        
        hit_count = ManberAll(apat_seq_p, primer_p, pattern_ref, begin, seqlen)
                
        if hit_count:
            result = One_primer_search_result.new(apat_seq_p, pattern_ref, hit_count)
        else:
            result = None
        return result
    
    
    cpdef free(self):
        delete_apatseq(self.apat_seq_p)






#         min_error_count = -1
#         best_hit = -1
#         for i in range(hit_count):
#             error_count = apat_seq_p.hiterr[pattern_ref].val[i]
#             hit_pos = apat_seq_p.hitpos[pattern_ref].val[i]
#             if min_error_count < 0 or error_count < min_error_count:
#                 self.min_error_count = error_count
#                 self.best_hit = i

#     def __call__(self, bytes sequence):
#         # TODO declare things
#         # Search ALL primers in the direct direction
#         p1 = search_one_primer(self, sequence, True, False, same_sequence=False)
#         p2 = search_one_primer(self, sequence, False, False, same_sequence=True) 
#         
#         # Search each primer in both directions
#         # 1st direction
#         p1 = search_one_primer(self, sequence, True, False, same_sequence=False)
#         p2_revcomp = search_one_primer(self, sequence, False, True, same_sequence=True)
#         # 2nd direction
#         p2 = search_one_primer(self, sequence, False, False, same_sequence=True)
#         p1_revcomp = search_one_primer(self, sequence, True, True, same_sequence=True)
#         # Choose best hit (best score for direction and longest amplicon if multiple hits in one direction)
#         if p1.min_error_count + p2_revcomp.min_error_count < p2.min_error_count + p1_revcomp.min_error_count:
#             direct_hit = True
#             if p1.hit_count > 1:
#                 pos1 = min((pos, idx) for (idx, pos) in enumerate(p1.hit_pos))[0]
#             if p2_revcomp.hit_count > 1:
#                 pos2 = max((pos, idx) for (idx, pos) in enumerate(p2_revcomp.hit_pos))[0]
#         else:
#             direct_hit = False
#             if p2.hit_count > 1:
#                 pos1 = min((pos, idx) for (idx, pos) in enumerate(p2.hit_pos))[0]
#             if p2_revcomp.hit_count > 1:
#                 pos2 = max((pos, idx) for (idx, pos) in enumerate(p2_revcomp.hit_pos))[0]