File: fasta.pyx

package info (click to toggle)
obitools 3.0.1~b26%2Bdfsg-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 26,788 kB
  • sloc: ansic: 24,299; python: 657; sh: 27; makefile: 20
file content (173 lines) | stat: -rwxr-xr-x 4,437 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#cython: language_level=3

'''
Created on 30 mars 2016

@author: coissac
'''

import types

from obitools3.dms.obiseq cimport Nuc_Seq


# def fastaIterator(lineiterator, 
#                   int skip=0,
#                   only=None,
#                   firstline=None,
#                   int buffersize=100000000
#                  ):
#     cdef str        ident
#     cdef str        definition
#     cdef dict       tags
#     cdef list       s
#     cdef bytes      sequence
#     cdef int        skipped, ionly, read
#     
#     if only is None:
#         ionly=-1
#     else:
#         ionly=int(only)
# 
#     if isinstance(lineiterator, (str, bytes)):
#         lineiterator=uopen(lineiterator)        
#     if isinstance(lineiterator, LineBuffer):
#         iterator = iter(lineiterator)
#     else:
#         if hasattr(lineiterator, "readlines"):
#             iterator = iter(LineBuffer(lineiterator, buffersize))
#         elif hasattr(lineiterator, '__next__'):
#             iterator = lineiterator
#         else:
#             raise Exception("Invalid line iterator")
#     
#     skipped = 0
#     i = iterator
#     
#     if firstline is None:
#         line = next(i)
#     else:
#         line = firstline
#         
#     while True:
#         
#         if ionly >= 0 and read >= ionly:
#             break
#                 
#         while skipped < skip :
#             line = next(i)
#             try:
#                 while line[0]!='>':
#                     line = next(i)
#             except StopIteration:
#                 pass
#             skipped += 1
# 
#         ident,tags,definition = parseHeader(line)
#         s = []
#         line = next(i)
#             
#         try:
#             while line[0]!='>':
#                 s.append(str2bytes(line)[0:-1])
#                 line = next(i)
#         
#         except StopIteration:
#             pass
#         
#         sequence  = b"".join(s)
# 
#         yield { "id"         : ident,
#                 "definition" : definition,
#                 "sequence"   : sequence,
#                 "quality"    : None,
#                 "offset"     : None,
#                 "tags"       : tags,
#                 "annotation" : {}
#               }
#         
#         read+=1

    
def fastaNucIterator(lineiterator, 
                     int skip=0,
                     only=None,
                     firstline=None,
                     int buffersize=100000000,
                     bytes nastring=b"NA"
                    ):
    '''
    Generator yielding one Nuc_Seq per FASTA record read from a line source.

    Lines are handled as bytes: a record starts on a line whose first byte
    is b'>' and runs until the next such line or the end of the input.

    @param lineiterator: the source of lines. A str/bytes value is passed to
                         uopen() (presumably a file name or URL - confirm
                         against uopen); a LineBuffer is iterated directly;
                         an object with a `readlines` attribute is wrapped in
                         a LineBuffer; any other object with `__next__` is
                         used as is.
    @param skip: number of leading records to discard before yielding.
    @param only: maximum number of records to yield, or None for no limit.
    @param firstline: header line already consumed by the caller; when given
                      it is used instead of reading the first line from the
                      iterator.
    @param buffersize: size passed to LineBuffer when one has to be created.
    @param nastring: forwarded unchanged to parseHeader().

    @raise Exception: if `lineiterator` is none of the supported kinds.

    Running out of input (empty source, `skip` larger than the number of
    records, header without sequence lines at the very end) terminates the
    generator cleanly instead of letting StopIteration escape from its body.
    '''
    
    cdef bytes      ident
    cdef bytes      definition
    cdef dict       tags
    cdef list       s
    cdef bytes      sequence
    cdef int        skipped, ionly, read
    cdef Nuc_Seq    seq
    cdef bint       stop
    
    # A negative ionly means "no limit on the number of records".
    if only is None:
        ionly = -1
    else:
        ionly = int(only)

    if isinstance(lineiterator, (str, bytes)):
        lineiterator = uopen(lineiterator)
    if isinstance(lineiterator, LineBuffer):
        iterator = iter(lineiterator)
    elif hasattr(lineiterator, "readlines"):
        iterator = iter(LineBuffer(lineiterator, buffersize))
    elif hasattr(lineiterator, '__next__'):
        iterator = lineiterator
    else:
        raise Exception("Invalid line iterator")

    skipped = 0
    read = 0
    
    if firstline is None:
        try:
            line = next(iterator)
        except StopIteration:
            # Empty input: nothing to yield.
            return
    else:
        line = firstline
    
    stop = False
    while not stop:
                
        # Checked before skipping so that only=0 consumes no further line.
        if ionly >= 0 and read >= ionly:
            break
                
        # `line` holds the header of the current record; discarding a record
        # means advancing to the next header line.
        while skipped < skip:
            try:
                line = next(iterator)
                while line[:1] != b'>':
                    line = next(iterator)
            except StopIteration:
                # Input exhausted while skipping: no record is left, and
                # `line` is not a header that could be parsed.
                return
            skipped += 1

        ident, tags, definition = parseHeader(line, nastring=nastring)
        s = []
    
        # The first read is inside the try block as well, so that a header
        # sitting on the very last line still produces an (empty) record.
        try:
            line = next(iterator)
            while line[:1] != b'>':
                # Strip only the line terminator: a final line without a
                # trailing newline keeps its last residue, and CRLF input
                # does not leave a stray b'\r' in the sequence.
                s.append(line.rstrip(b"\r\n"))
                line = next(iterator)
        except StopIteration:
            # Last record of the input: yield it, then leave the main loop.
            stop = True
        
        sequence = b"".join(s)
        
        seq = Nuc_Seq(ident,
                      sequence,
                      definition=definition,
                      quality=None,
                      offset=-1,
                      tags=tags)
            
        yield seq
        
        read += 1