File: universal.pyx

package info (click to toggle)
obitools 3.0.1~b26%2Bdfsg-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 26,788 kB
  • sloc: ansic: 24,299; python: 657; sh: 27; makefile: 20
file content (154 lines) | stat: -rwxr-xr-x 5,600 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#cython: language_level=3

import re 
from obitools3.parsers.fasta import fastaNucIterator
from obitools3.parsers.fastq import fastqIterator
from obitools3.parsers.tab import tabIterator
from obitools3.parsers.ngsfilter import ngsfilterIterator
from obitools3.parsers.embl import emblIterator
from obitools3.parsers.genbank import genbankIterator


# One or more nucleotide codes from the alphabet ACGTRYSWKMBDHVN
# (case-insensitive), and nothing else.
oligore = re.compile(b"^[ACGTRYSWKMBDHVN]+$",re.I)

# Either a single item (a nucleotide string or "-"), or a "left:right" pair
# in which each side is a nucleotide string or "-".
# The right-hand alternation is grouped *inside* the ":" group so that
# b"acgt:-" is accepted (the previous pattern only accepted b"acgt-").
tagre   = re.compile(b"^([ACGTRYSWKMBDHVN]+|-)(:([ACGTRYSWKMBDHVN]+|-))?$",re.I)

def is_ngsfilter_line(line):
    """
    Tell whether a line looks like an ngsfilter record.

    The line is split on whitespace and must have at least six fields where:
      - field 3 matches ``tagre``;
      - fields 4 and 5 match ``oligore``;
      - field 6 is exactly b"F" or b"T".

    :param line: the line to test, expected to be a ``bytes`` object.
    :return: ``True`` if every check above passes, ``False`` otherwise
             (including when the line is too short or is not ``bytes``).
    """
    try:
        fields = line.split()
        if len(fields) < 6:
            return False
        # Plain boolean logic: match objects do not support the bitwise
        # operators, which made the former `&=` / `|` version always fail.
        return (tagre.match(fields[2]) is not None
                and oligore.match(fields[3]) is not None
                and oligore.match(fields[4]) is not None
                and fields[5] in (b"F", b"T"))
    except (AttributeError, TypeError):
        # Not a bytes line (e.g. None or str): it cannot be matched against
        # the bytes patterns, so it is not an ngsfilter record.
        return False

def entryIteratorFactory(lineiterator, 
                          int skip=0,
                          only=None,
                          bytes seqtype=b'nuc',
                          int offset=-1,
                          bint noquality=False,
                          bint skiperror=True,
                          bint header=False,
                          bytes sep=None,
                          bytes dec=b'.',
                          bytes nastring=b"NA",
                          bint stripwhite=True,
                          bint blanklineskip=True,
                          bytes commentchar=b"#",
                          int buffersize=100000000):

    if isinstance(lineiterator, (str, bytes)):
        lineiterator=uopen(lineiterator)        
    if isinstance(lineiterator, LineBuffer):
        iterator = iter(lineiterator)
    else:
        if hasattr(lineiterator, "readlines"):
            iterator = iter(LineBuffer(lineiterator, buffersize))
        elif hasattr(lineiterator, '__next__'):
            iterator = lineiterator
        else:
            raise Exception("Invalid line iterator")

    i = iterator
        
    try:
        first=next(i)
    except StopIteration:
        first=""
        pass

    format=b"tabular"
    
    try: 
        if first[:1]==b">":
            format=b"fasta"
        if first[:1]==b"@":
            format=b"fastq"
        elif first[0:3]==b'ID ':
            format=b"embl"
        elif first[0:6]==b'LOCUS ':
            format=b"genbank"
        elif first[0:8]==b'#@ecopcr':
            format=b"ecopcrfile"
        elif is_ngsfilter_line(first):
            format=b"ngsfilter"
    except IndexError:
        pass
            
    # TODO Temporary fix
    first=None
    lineiterator.seek(0)
    
    if format==b'fasta':
        if seqtype == b'nuc':
            return (fastaNucIterator(lineiterator,
                                    skip=skip,only=only,
                                    firstline=first,
                                    buffersize=buffersize,
                                    nastring=nastring),
                    Nuc_Seq,
                    format)
        else:
            raise NotImplementedError()
    elif format==b'fastq':
            return (fastqIterator(lineiterator,
                                 skip=skip,only=only,
                                 offset=offset,
                                 noquality=noquality,
                                 firstline=first,
                                 buffersize=buffersize,
                                 nastring=nastring),
                    Nuc_Seq,
                    format)
    elif format==b'tabular':
            return (tabIterator(lineiterator,
                                header = header,
                                sep = sep,
                                dec = dec,
                                stripwhite = stripwhite,
                                blanklineskip = blanklineskip,
                                commentchar = commentchar,
                                nastring=nastring,
                                skip = skip,
                                only = only,
                                firstline=first,
                                buffersize=buffersize),
                    dict,
                    format)
    elif format==b'ngsfilter':
            return (ngsfilterIterator(lineiterator,
                                      sep = sep,
                                      dec = dec,
                                      stripwhite = stripwhite,
                                      blanklineskip = blanklineskip,
                                      commentchar = commentchar,
                                      nastring=nastring,
                                      skip = skip,
                                      only = only,
                                      firstline=first,
                                      buffersize=buffersize),
                    dict,
                    format)

    elif format==b'embl':
            return (emblIterator(lineiterator, 
                                 skip=skip,
                                 only=only,
                                 firstline=first,
                                 buffersize=buffersize),
                    Nuc_Seq,
                    format)

    elif format==b'genbank':
            return (genbankIterator(lineiterator, 
                                    skip=skip,
                                    only=only,
                                    firstline=first,
                                    buffersize=buffersize),
                    Nuc_Seq,
                    format)
    
    raise NotImplementedError('File format iterator not implemented yet')