File: genbank.pyx

package info (click to toggle)
obitools 3.0.1~b26%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 26,756 kB
  • sloc: ansic: 24,299; python: 657; sh: 27; makefile: 21
file content (204 lines) | stat: -rwxr-xr-x 6,086 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#cython: language_level=3

'''
Created on June 12th 2018

@author: coissac/mercier
'''


import types
import re
import sys
import os
import glob

from obitools3.files.universalopener cimport uopen
from obitools3.utils cimport tostr
from obitools3.dms.obiseq cimport Nuc_Seq
from .embl_genbank_features import extractTaxon

from libc.stdlib  cimport free, malloc, realloc
from libc.string cimport strcpy, strlen


_featureMatcher = re.compile(b'^FEATURES.+\n(?=ORIGIN(\s*))',re.DOTALL + re.M)

_headerMatcher = re.compile(b'^LOCUS.+(?=\nFEATURES)', re.DOTALL + re.M)
_seqMatcher    = re.compile(b'^ORIGIN.+(?=//\n)', re.DOTALL + re.M)
_cleanSeq1     = re.compile(b'ORIGIN(\s*)\n')
_cleanSeq2     = re.compile(b'[ \n0-9]+')
_acMatcher     = re.compile(b'(?<=^ACCESSION   ).+',re.M)
_deMatcher     = re.compile(b'(?<=^DEFINITION  ).+\n( .+\n)*',re.M)
_cleanDe       = re.compile(b'\n *')


def genbankParser(bytes text):
    
    cdef Nuc_Seq seq
        
    try:
        header = _headerMatcher.search(text).group()
        
        ft     = _featureMatcher.search(text).group()
        
        s      = _seqMatcher.search(text).group()
        s      = _cleanSeq1.sub(b'', s)
        s      = _cleanSeq2.sub(b'', s)
        
        acs    = _acMatcher.search(text).group()
        acs    = acs.split()
        ac     = acs[0]
        acs    = acs[1:]
        
        de     = _deMatcher.search(header).group()
        de     = _cleanDe.sub(b' ',de).strip().strip(b'.')
    
        tags = {}
        extractTaxon(ft, tags)
            
        seq = Nuc_Seq(ac,
                      s,
                      definition=de,
                      quality=None,
                      offset=-1,
                      tags=tags)
    
    except Exception as e:
        print("\nCould not import sequence id:", text.split()[1], "(error raised:", e, ")")
        # Do not raise any Exception if you need the possibility to resume the generator
        # (Python generators can't resume after any exception is raised)
        return None

    return seq


def genbankIterator_file(lineiterator, 
                         int skip=0,
                         only=None,
                         firstline=None,
                         int buffersize=100000000
                         ):
    cdef int        lines_to_skip, ionly, read
    cdef Nuc_Seq    seq
    cdef char*      entry = NULL
    cdef size_t     entry_buffer_size 
    cdef int        entry_len
    cdef int        line_len   
    
    entry_buffer_size = 2048
    
    entry = <char*> malloc(entry_buffer_size*sizeof(char)) 

    if only is None:
        ionly = -1
    else:
        ionly = int(only)

    if isinstance(lineiterator, (str, bytes)):
        lineiterator=uopen(lineiterator)        
    if isinstance(lineiterator, LineBuffer):
        iterator = iter(lineiterator)
    else:
        if hasattr(lineiterator, "readlines"):
            iterator = iter(LineBuffer(lineiterator, buffersize))
        elif hasattr(lineiterator, '__next__'):
            iterator = lineiterator
        else:
            raise Exception("Invalid line iterator")

    skipped = 0
    read = 0
    
    if firstline is None:
        line = next(iterator)
    else:
        line = firstline       
            
    while True:
        
        if ionly >= 0 and read >= ionly-1:
            break
                
        while skipped < skip:
            line = next(iterator)
            try:
                while line[:2] != b'//':
                    line = next(iterator)
                line = next(iterator)
            except StopIteration:
                break
            skipped += 1

        try:
            entry_len = 0
            while line[:2] != b'//':
                line_len = strlen(line)
                while (entry_len + line_len) >= entry_buffer_size:
                    entry_buffer_size*=2
                    entry = <char*>realloc(entry, entry_buffer_size)
                strcpy(entry+entry_len, line)
                entry_len+=line_len
                line = next(iterator)
            # Add last line too because need the // flag to parse
            line_len = strlen(line)
            while (entry_len + line_len) >= entry_buffer_size:
                entry_buffer_size*=2
                entry = <char*>realloc(entry, entry_buffer_size)
            strcpy(entry+entry_len, line)
            line = next(iterator)
        except StopIteration:
            break
        
        seq = genbankParser(entry)

        yield seq
        read+=1
    
    # Last sequence if not empty lines
    if entry.strip():
        seq = genbankParser(entry)
        yield seq
    
    free(entry)


def genbankIterator_dir(dir_path, 
                        int skip=0,
                        only=None,
                        firstline=None,
                        int buffersize=100000000
                       ):
    path = dir_path
    read = 0
    read_files = 0
    files = [filename for filename in glob.glob(os.path.join(path, b'*.gbff*'))]
    files.extend([filename for filename in glob.glob(os.path.join(path, b'*.seq*'))])  # new genbank extension
    files = list(set(files))
    for filename in files:
        if read==only:
            return
        print("Parsing file %s (%d/%d)" % (tostr(filename), read_files+1, len(files)))
        f = uopen(filename)
        if only is not None:
            only_f = only-read
        else:
            only_f = None
        for seq in genbankIterator_file(f, skip=skip, only=only_f, buffersize=buffersize):
            yield seq
            read+=1
        read_files+=1
        

def genbankIterator(obj, 
                    int skip=0,
                    only=None,
                    firstline=None,
                    int buffersize=100000000
                   ):
    if type(obj) == bytes or type(obj) == str :
        return genbankIterator_dir(obj, skip=skip, only=only, firstline=firstline, buffersize=buffersize)
    else:
        return genbankIterator_file(obj, skip=skip, only=only, firstline=firstline, buffersize=buffersize)