1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
|
#cython: language_level=3
import re
from obitools3.parsers.fasta import fastaNucIterator
from obitools3.parsers.fastq import fastqIterator
from obitools3.parsers.tab import tabIterator
from obitools3.parsers.ngsfilter import ngsfilterIterator
from obitools3.parsers.embl import emblIterator
from obitools3.parsers.genbank import genbankIterator
# A non-empty run of IUPAC nucleotide codes (case-insensitive).
oligore = re.compile(b"^[ACGTRYSWKMBDHVN]+$",re.I)

# A tag field: an oligo or "-", optionally followed by either ":<oligo>" or "-".
# NOTE(review): as written the alternation is (":<oligo>" | "-"), so "x:-" is
# NOT accepted while "x-" is -- confirm this is the intended grammar.
tagre = re.compile(b"^([ACGTRYSWKMBDHVN]+|-)(:([ACGTRYSWKMBDHVN]+)|-)?$",re.I)


def is_ngsfilter_line(line):
    """Return True when *line* looks like a record of an ngsfilter file.

    The line is split on whitespace and the following fields are checked:

    * field 3 (index 2) must match ``tagre``;
    * fields 4 and 5 (indexes 3 and 4) must match ``oligore``;
    * field 6 (index 5) must be exactly ``b"F"`` or ``b"T"``.

    :param line: the candidate line, expected to be a ``bytes`` object.
    :return: ``True`` if every check passes, ``False`` otherwise (including
             when the line is too short or is not a ``bytes`` object).
    """
    try:
        parts = line.split()
        # Boolean `and` is required here: the regex matches are Match/None
        # objects (no `&` support), and `==` binds looser than `|`, so the
        # previous bitwise formulation always raised and returned False.
        return bool(tagre.match(parts[2])
                    and oligore.match(parts[3])
                    and oligore.match(parts[4])
                    and parts[5] in (b"F", b"T"))
    except (AttributeError, IndexError, TypeError):
        # Too few fields, or `line` is not bytes (e.g. "" for an empty input):
        # this is a best-effort sniffing helper, so simply answer "no".
        return False
def entryIteratorFactory(lineiterator,
                         int skip=0,
                         only=None,
                         bytes seqtype=b'nuc',
                         int offset=-1,
                         bint noquality=False,
                         bint skiperror=True,
                         bint header=False,
                         bytes sep=None,
                         bytes dec=b'.',
                         bytes nastring=b"NA",
                         bint stripwhite=True,
                         bint blanklineskip=True,
                         bytes commentchar=b"#",
                         int buffersize=100000000):
    """Guess the format of an input from its first line and build a parser.

    :param lineiterator: a file name (``str``/``bytes``, opened with
        ``uopen``), a ``LineBuffer``, an object exposing ``readlines``
        (wrapped in a ``LineBuffer``) or an object exposing ``__next__``.
    :param skip: forwarded to every parser.
    :param only: forwarded to every parser.
    :param seqtype: only ``b'nuc'`` is supported for fasta input.
    :param offset: forwarded to the fastq parser.
    :param noquality: forwarded to the fastq parser.
    :param skiperror: not used by this function.
    :param header: forwarded to the tabular parser.
    :param sep: forwarded to the tabular and ngsfilter parsers.
    :param dec: forwarded to the tabular and ngsfilter parsers.
    :param nastring: forwarded to the fasta, fastq, tabular and ngsfilter
        parsers.
    :param stripwhite: forwarded to the tabular and ngsfilter parsers.
    :param blanklineskip: forwarded to the tabular and ngsfilter parsers.
    :param commentchar: forwarded to the tabular and ngsfilter parsers.
    :param buffersize: size given to ``LineBuffer`` and to every parser.

    :return: a tuple ``(parser, entry type, format name)`` where the entry
        type is ``Nuc_Seq`` or ``dict`` and the format name is a ``bytes``
        label such as ``b"fasta"`` or ``b"tabular"``.

    :raise Exception: if ``lineiterator`` is none of the accepted kinds.
    :raise NotImplementedError: for fasta input with a ``seqtype`` other
        than ``b'nuc'``, or for a detected format that has no parser here
        (``b"ecopcrfile"``).
    """
    if isinstance(lineiterator, (str, bytes)):
        lineiterator = uopen(lineiterator)

    if isinstance(lineiterator, LineBuffer):
        iterator = iter(lineiterator)
    elif hasattr(lineiterator, "readlines"):
        iterator = iter(LineBuffer(lineiterator, buffersize))
    elif hasattr(lineiterator, '__next__'):
        iterator = lineiterator
    else:
        raise Exception("Invalid line iterator")

    # Peek at the first line; an empty input falls through to "tabular".
    try:
        first = next(iterator)
    except StopIteration:
        first = ""

    fmt = b"tabular"
    try:
        # A single if/elif chain: once a signature is recognised, no later
        # test may override it (previously a fasta header was still passed
        # through the remaining tests, including the ngsfilter sniffing).
        if first[:1] == b">":
            fmt = b"fasta"
        elif first[:1] == b"@":
            fmt = b"fastq"
        elif first[0:3] == b'ID ':
            fmt = b"embl"
        elif first[0:6] == b'LOCUS ':
            fmt = b"genbank"
        elif first[0:8] == b'#@ecopcr':
            fmt = b"ecopcrfile"
        elif is_ngsfilter_line(first):
            fmt = b"ngsfilter"
    except IndexError:
        pass

    # TODO Temporary fix
    # The peeked line is dropped and the input rewound so that the parser
    # re-reads it from the start.
    # NOTE(review): this requires `lineiterator` to expose seek(); an input
    # accepted above only because it has __next__ will fail here -- confirm
    # how the parsers consume `firstline` before relaxing this.
    first = None
    lineiterator.seek(0)

    if fmt == b'fasta':
        if seqtype != b'nuc':
            raise NotImplementedError()
        return (fastaNucIterator(lineiterator,
                                 skip=skip, only=only,
                                 firstline=first,
                                 buffersize=buffersize,
                                 nastring=nastring),
                Nuc_Seq,
                fmt)

    if fmt == b'fastq':
        return (fastqIterator(lineiterator,
                              skip=skip, only=only,
                              offset=offset,
                              noquality=noquality,
                              firstline=first,
                              buffersize=buffersize,
                              nastring=nastring),
                Nuc_Seq,
                fmt)

    if fmt == b'tabular':
        return (tabIterator(lineiterator,
                            header=header,
                            sep=sep,
                            dec=dec,
                            stripwhite=stripwhite,
                            blanklineskip=blanklineskip,
                            commentchar=commentchar,
                            nastring=nastring,
                            skip=skip,
                            only=only,
                            firstline=first,
                            buffersize=buffersize),
                dict,
                fmt)

    if fmt == b'ngsfilter':
        return (ngsfilterIterator(lineiterator,
                                  sep=sep,
                                  dec=dec,
                                  stripwhite=stripwhite,
                                  blanklineskip=blanklineskip,
                                  commentchar=commentchar,
                                  nastring=nastring,
                                  skip=skip,
                                  only=only,
                                  firstline=first,
                                  buffersize=buffersize),
                dict,
                fmt)

    if fmt == b'embl':
        return (emblIterator(lineiterator,
                             skip=skip,
                             only=only,
                             firstline=first,
                             buffersize=buffersize),
                Nuc_Seq,
                fmt)

    if fmt == b'genbank':
        return (genbankIterator(lineiterator,
                                skip=skip,
                                only=only,
                                firstline=first,
                                buffersize=buffersize),
                Nuc_Seq,
                fmt)

    # Reached for formats that are detected but have no parser wired in.
    raise NotImplementedError('File format iterator not implemented yet')
|