File: record_finder.py

package info (click to toggle)
python-cogent 1.4.1-1.2
  • links: PTS, VCS
  • area: non-free
  • in suites: squeeze
  • size: 13,260 kB
  • ctags: 20,087
  • sloc: python: 116,163; ansic: 732; makefile: 74; sh: 9
file content (194 lines) | stat: -rw-r--r-- 6,861 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
#!/usr/bin/env python
"""Provides some classes for treating files as sequences of records.

Typically more useful as subclasses. Covers the four main types of records:
    
    DelimitedRecordFinder:  Records demarcated by an end line, e.g. '\\'
    LabeledRecordFinder:    Records demarcated by a start line, e.g. '>label'
    LineGrouper:            Records consisting of a certain number of lines.
    TailedRecordFinder:     Records demarcated by an end mark, e.g. 'blah.'

The first three ignore/delete blank lines and strip leading and trailing
whitespace.  TailedRecordFinder is functionally similar to
DelimitedRecordFinder except that it accepts an is_tail_line function instead
of a str.  Note that its default constructor is rstrip instead of strip.
"""
from cogent.parse.record import RecordError, FieldError
from string import strip, rstrip

__author__ = "Rob Knight"
__copyright__ = "Copyright 2007-2009, The Cogent Project"
__credits__ = ["Rob Knight", "Gavin Huttley", "Zongzhi Liu"]
__license__ = "GPL"
__version__ = "1.4.1"
__maintainer__ = "Rob Knight"
__email__ = "rob@spot.colorado.edu"
__status__ = "Production"

def is_empty(line):
    """Returns True for empty lines and lines consisting only of whitespace.

    Any false value (e.g. '' or None) counts as empty; otherwise the result is
    whatever line.isspace() reports.
    """
    if not line:
        return True
    return line.isspace()

def never_ignore(line):
    """Ignore-predicate that rejects nothing: returns False for every line."""
    return False

def DelimitedRecordFinder(delimiter, constructor=strip, ignore=is_empty,
        keep_delimiter=True, strict=True):
    """Returns function that returns successive delimited records from file.

    The returned parser is a generator function: given an iterable of lines it
    yields each record as a list of the relevant lines.

    delimiter: value that marks the end of a record. Every line is compared to
    it with == after the constructor has been applied, so with the default
    constructor the delimiter must match the stripped line.

    constructor: applied to each line before anything else. Default is
    string.strip, but can supply another constructor to transform lines and/or
    coerce into correct type. If constructor is None, passes along the lines
    without alteration.

    ignore: skips any lines for which ignore(line) evaluates True (default is
    to skip empty and whitespace-only lines). The test is applied to the line
    as returned by the constructor.

    keep_delimiter: keep the delimiter line at the end of each record if True
    (default), otherwise discard the delimiter line.

    strict: when lines are found after the last delimiter -- raise RecordError
    if True (default), otherwise yield those lines silently as a final record.

    NOTE: a delimiter line with no preceding lines still produces a record
    ([delimiter] or [] depending on keep_delimiter).
    """
    def parser(lines):
        curr = []
        for line in lines:
            if constructor:
                line = constructor(line)
            #ignore blank lines
            if ignore(line):
                continue
            #the delimiter closes the current record; anything else extends it
            if line == delimiter:
                if keep_delimiter:
                    curr.append(line)
                yield curr
                #start a fresh list: the caller now owns the yielded one
                curr = []
            else:
                curr.append(line)
        #leftover lines mean the input did not end with a delimiter
        if curr:
            if strict:
                #call form of raise is valid on both Python 2 and Python 3
                raise RecordError(
                        "Found additional data after records: %s" % (curr,))
            else:
                yield curr
    return parser

#Example of the kind of parser DelimitedRecordFinder returns: GbFinder(lines)
#yields lists of stripped, non-blank lines, each ending with its '//' line.
#With the default strict=True, non-blank lines after the last '//' raise
#RecordError.
GbFinder = DelimitedRecordFinder('//')

def TailedRecordFinder(is_tail_line, constructor=rstrip, ignore=is_empty,
        strict=True):
    """Builds a parser that groups lines into records closed by a tail line.

    The returned parser is a generator function over an iterable of lines.
    Each record it yields is a list of lines whose final element is the line
    for which is_tail_line(line) evaluated True.

    constructor: applied to each line first; default is string.rstrip, which
    removes the newline and trailing spaces. A false value (e.g. None) leaves
    the lines untouched.

    ignore: lines for which ignore(line) evaluates True are dropped (default
    drops empty and whitespace-only lines). The test sees the line as returned
    by the constructor.

    strict: if True (default), raises RecordError when lines remain after the
    last tail line, or when there was no tail line at all; otherwise those
    remaining lines are yielded as a final record.
    """
    def parser(lines):
        record = []
        for raw in lines:
            if constructor:
                item = constructor(raw)
            else:
                item = raw
            if ignore(item):
                continue

            record.append(item)
            #a tail line closes the record it belongs to
            if is_tail_line(item):
                yield record
                record = []

        #nothing pending: input ended right after a tail line (or was empty)
        if not record:
            return
        if not strict:
            yield record
            return
        raise RecordError('lines exist after the last tail_line '
                'or no tail_line at all')

    return parser

def LabeledRecordFinder(is_label_line, constructor=strip, ignore=is_empty):
    """Builds a parser that groups lines into records opened by a label line.

    The returned parser is a generator function over an iterable of lines.
    Each record it yields is a list of lines; a new record begins at every
    line for which is_label_line(line) evaluates True, and that label line is
    the first element of its record. Lines seen before the first label are
    yielded as a record of their own.

    constructor: applied to each line first; default is string.strip. Supply
    another callable to transform lines and/or coerce them into the correct
    type. A false value (e.g. None) leaves the lines untouched.

    ignore: lines for which ignore(line) evaluates True are dropped (default
    drops empty and whitespace-only lines).

    NOTE: Does _not_ raise an exception if the last line is a label line: for
    some formats, this is acceptable. Whatever turns the yielded lists into
    records is responsible for complaining about an incomplete record.
    """
    def parser(lines):
        record = []
        for raw in lines:
            line = constructor(raw) if constructor else raw
            if ignore(line):
                continue
            #a label opens a new record, so hand back whatever came before it
            if is_label_line(line) and record:
                yield record
                record = []
            record.append(line)
        #the final record has no following label to flush it
        if record:
            yield record
    return parser

def is_fasta_label(x):
    """Returns True if x begins with '>', i.e. looks like a FASTA label."""
    label_marker = '>'
    return x.startswith(label_marker)
#Example of the kind of parser LabeledRecordFinder returns: FastaFinder(lines)
#yields lists of stripped, non-blank lines, starting a new list at every line
#that begins with '>'.
FastaFinder = LabeledRecordFinder(is_fasta_label)

def LineGrouper(num, constructor=strip, ignore=is_empty):
    """Returns num lines at a time, stripping and ignoring blanks.

    The returned parser is a generator function: given an iterable of lines it
    yields successive lists of exactly num lines.

    constructor: applied to each line first. Default is string.strip, but can
    supply another constructor to transform lines and/or coerce into correct
    type. If constructor is None, passes along the lines without alteration.

    ignore: skips over any lines for which ignore(line) evaluates True:
    default is to skip empty and whitespace-only lines. Ignored lines do not
    count towards num.

    Raises RecordError if the number of kept lines is not a multiple of num.
    Because the parser is a generator, the error only surfaces after every
    complete group has been yielded.
    """
    def parser(lines):
        curr = []
        for l in lines:
            if constructor:
                line = constructor(l)
            else:
                line = l
            if ignore(line):
                continue
            curr.append(line)
            if len(curr) == num:
                yield curr
                #start a fresh list: the caller now owns the yielded one
                curr = []
        #a partial group is left over: the input was truncated or malformed
        if curr:
            #call form of raise is valid on both Python 2 and Python 3
            raise RecordError(
                    "Non-blank lines not even multiple of %s" % num)
    return parser