1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
|
#!/usr/bin/env python
"""Provides some classes for treating files as sequences of records.
Typically more useful as subclasses. Covers the four main types of records:
DelimitedRecordFinder: Records demarcated by an end line, e.g. '\\'
LabeledRecordFinder: Records demarcated by a start line, e.g. '>label'
LineGrouper: Records consisting of a certain number of lines.
TailedRecordFinder: Records demarcated by an end mark, e.g. 'blah.'
The first three classes ignore/delete blank lines and strip leading and
trailing whitespace. The TailedRecordFinder is functionally similar to
DelimitedRecordFinder except that it accepts an is_tail function instead of a
str. Note that its default constructor is rstrip instead of strip.
"""
from cogent.parse.record import RecordError, FieldError
from string import strip, rstrip
#Module metadata: authorship, licensing, version and release status.
__author__ = "Rob Knight"
__copyright__ = "Copyright 2007-2012, The Cogent Project"
__credits__ = ["Rob Knight", "Gavin Huttley", "Zongzhi Liu"]
__license__ = "GPL"
__version__ = "1.5.3"
__maintainer__ = "Rob Knight"
__email__ = "rob@spot.colorado.edu"
__status__ = "Production"
def is_empty(line):
    """Returns True for empty lines and lines made up only of whitespace.

    Any falsy value (e.g. '' or None) counts as empty; otherwise the answer
    is whatever line.isspace() reports.
    """
    if not line:
        return True
    return line.isspace()
def never_ignore(line):
    """Returns False for every line, so that no line is ever skipped.

    Has the same one-argument signature as is_empty, so it can be passed as
    the ignore argument of the record finders in this module.
    """
    return False
def DelimitedRecordFinder(delimiter, constructor=strip, ignore=is_empty,
                          keep_delimiter=True, strict=True):
    """Returns function that returns successive delimited records from file.

    The returned parser takes an iterable of lines and yields each record as
    a list of the relevant lines.

    delimiter: a record ends at each line that, after constructor has been
    applied, compares equal to delimiter.

    constructor: applied to every line before any other test. Default is
    string.strip, but can supply another constructor to transform lines
    and/or coerce into correct type. If constructor is None, passes along
    the lines without alteration.

    ignore: skips any lines for which ignore(line) evaluates True (default
    is to skip whitespace). Tested after constructor has been applied.

    keep_delimiter: keep delimiter line at the end of each record if True
    (default), otherwise discard delimiter line. Note that when it is
    discarded, a delimiter with no preceding lines yields an empty list.

    strict: when lines are found after the last delimiter -- raise
    RecordError if True (default), otherwise yield those lines silently as
    a final record.
    """
    def parser(lines):
        curr = []
        for line in lines:
            if constructor:
                line = constructor(line)
            #skip lines the caller asked to ignore (blank lines by default)
            if ignore(line):
                continue
            #if we find the delimiter, return the record; otherwise, keep it
            if line == delimiter:
                if keep_delimiter:
                    curr.append(line)
                yield curr
                curr = []
            else:
                curr.append(line)
        #anything still collected here was never closed by a delimiter
        if curr:
            if strict:
                #call form of raise: valid in Python 2 and 3 alike, and
                #matches the style TailedRecordFinder uses
                raise RecordError(
                    "Found additional data after records: %s" % (curr,))
            else:
                yield curr
    return parser
#GbFinder is an example of the parsers DelimitedRecordFinder returns: it
#groups lines into records that each end with a '//' line.
GbFinder = DelimitedRecordFinder(delimiter='//')
def TailedRecordFinder(is_tail_line, constructor=rstrip, ignore=is_empty,
                       strict=True):
    """Returns function that returns successive tailed records from lines.

    The returned parser yields each record as a list of lines, with the tail
    line included as the last item.

    is_tail_line: called on each kept line; a true result closes the record.

    constructor: a modifier applied to each line first. Default is
    string.rstrip, which removes the newline and trailing spaces. If
    constructor is None, lines are passed along unaltered.

    ignore: lines for which ignore(line) evaluates True are skipped (default
    skips empty lines). The test sees the line as modified by constructor.

    strict: if True (default), raise RecordError when the input does not end
    with a tail line; otherwise yield the leftover lines as a last record.
    """
    def parser(lines):
        record = []
        for raw in lines:
            item = constructor(raw) if constructor else raw
            if ignore(item):
                continue
            record.append(item)
            #a tail line completes the record collected so far
            if is_tail_line(item):
                yield record
                record = []
        #nothing left over: the input ended cleanly on a tail line
        if not record:
            return
        #leftover lines were never closed by a tail line
        if not strict:
            yield record
            return
        raise RecordError('lines exist after the last tail_line '
                          'or no tail_line at all')
    return parser
def LabeledRecordFinder(is_label_line, constructor=strip, ignore=is_empty):
    """Returns function that returns successive labeled records from file.

    The returned parser yields each record as a list of lines, starting with
    its label line. Lines seen before the first label line are yielded
    together as a record of their own.

    is_label_line: called on each kept line; a true result starts a new
    record.

    constructor: applied to each line first. Default is string.strip, but
    another constructor can be supplied to transform lines and/or coerce
    them into the correct type. If constructor is None, lines are passed
    along without alteration.

    ignore: lines for which ignore(line) evaluates True are skipped (default
    is to skip empty lines).

    NOTE: Does _not_ raise an exception if the last line is a label line: for
    some formats, this is acceptable. It is the responsibility of whatever is
    parsing the sets of lines returned into records to complain if a record
    is incomplete.
    """
    def parser(lines):
        record = []
        for raw in lines:
            item = constructor(raw) if constructor else raw
            if ignore(item):
                continue
            #a label line closes the record in progress, if there is one
            if is_label_line(item) and record:
                yield record
                record = []
            record.append(item)
        #the final record has no following label to flush it
        if record:
            yield record
    return parser
def is_fasta_label(x):
    """Returns True if x begins with '>', the mark of a FASTA label line."""
    label_mark = '>'
    return x.startswith(label_mark)
#FastaFinder is an example of the parsers LabeledRecordFinder returns: it
#starts a new record at every line accepted by is_fasta_label.
FastaFinder = LabeledRecordFinder(is_label_line=is_fasta_label)
def LineGrouper(num, constructor=strip, ignore=is_empty):
    """Returns function that returns num lines at a time.

    The returned parser takes an iterable of lines and yields successive
    lists of exactly num lines, stripping and ignoring blanks by default.

    num: number of kept lines in each record.

    constructor: applied to every line before any other test. Default is
    string.strip, but can supply another constructor to transform lines
    and/or coerce into correct type. If constructor is None, passes along
    the lines without alteration.

    ignore: skips over any lines for which ignore(line) evaluates True:
    default is to skip whitespace lines. Ignored lines do not count
    towards num.

    Raises RecordError once the input is exhausted if the kept lines do not
    divide evenly into groups of num.
    """
    def parser(lines):
        curr = []
        for l in lines:
            if constructor:
                line = constructor(l)
            else:
                line = l
            if ignore(line):
                continue
            curr.append(line)
            #a full group is ready: hand it over and start the next one
            if len(curr) == num:
                yield curr
                curr = []
        #leftover lines mean the final group came up short
        if curr:
            #call form of raise: valid in Python 2 and 3 alike, and
            #matches the style TailedRecordFinder uses
            raise RecordError(
                "Non-blank lines not even multiple of %s" % num)
    return parser
|