File: io.py

package info (click to toggle)
python-bx 0.13.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 5,000 kB
  • sloc: python: 17,136; ansic: 2,326; makefile: 24; sh: 8
file content (156 lines) | stat: -rw-r--r-- 4,501 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""
Reading and writing delimited data files (with headers and comments).
"""

from itertools import count

# Sentinel for TableReader's `force_header` argument. TableReader.__next__
# compares the stored header against this object by identity: when it matches
# on line 1 (and that line is not blank), the line is parsed as the header
# before any comment-prefix check is made.
FIRST_LINE_IS_HEADER = object()


class ParseError(Exception):
    """
    Error raised for a line that cannot be parsed.

    Positional arguments are passed straight to `Exception`. The optional
    `linenum` keyword stores the line number the error refers to; any other
    keyword arguments are accepted and ignored.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args)
        # A missing keyword leaves the line number unset (None).
        self.linenum = kwargs.get("linenum")

    def __str__(self):
        message = super().__str__()
        # Any falsy line number (None, 0) counts as "not set".
        if not self.linenum:
            return message
        return message + " on line " + str(self.linenum)


class TableRow:
    """
    One data row of a table.

    Keeps the row's field values together with the reader that produced
    them, so that fields can be addressed by column name through the
    reader's header.
    """

    def __init__(self, reader, fields):
        self.fields = fields
        self.reader = reader

    def __getitem__(self, key):
        """
        Return a field by integer position or by column name.

        Raises TypeError for a name lookup when the reader has no header,
        and for keys that are neither integers nor strings.
        """
        if isinstance(key, int):
            return self.fields[key]
        if not isinstance(key, str):
            raise TypeError("field indices must be integers or strings")
        header = self.reader.header
        if not header:
            raise TypeError("column names only supported for files with headers")
        # Translate the column name into a position via the header mapping.
        return self.fields[header.field_to_column[key]]

    @property
    def fieldnames(self):
        """Column names, as stored on the reader's header."""
        header = self.reader.header
        return header.fields

    def __str__(self):
        # Rows are rendered tab-delimited.
        separator = "\t"
        return separator.join(self.fields)


class Header:
    """
    Header of a table: the ordered column names plus a mapping from each
    name to its column index.
    """

    def __init__(self, fields):
        self.set_fields(fields)

    def set_fields(self, fields):
        """Store `fields` and rebuild the name -> column index mapping."""
        self.fields = fields
        # When a name occurs more than once, the right-most column wins.
        self.field_to_column = {name: index for index, name in enumerate(fields)}

    def __getitem__(self, key):
        """
        Return the column name for `key`.

        An integer selects the name at that position. A string is returned
        unchanged when it is a known column name, otherwise the result is
        None. Any other key type raises TypeError.
        """
        if isinstance(key, int):
            return self.fields[key]
        if isinstance(key, str):
            return key if key in self.field_to_column else None
        raise TypeError("field indices must be integers or strings")

    def __str__(self):
        # Header lines are written as '#' followed by tab-separated names.
        joined = "\t".join(self.fields)
        return "#" + joined


class Comment:
    """
    A comment line of a table file, stored without its line terminator.
    """

    def __init__(self, line):
        self.line = line

    def __str__(self):
        # Make sure the rendered text starts with '#', adding one if needed.
        text = self.line
        return text if text.startswith("#") else "#" + text


class TableReader:
    """
    Reader for iterating tabular data

    Iterating yields, line by line, the header, comments and rows of a
    tab-delimited input. Subclasses can customise the produced objects by
    overriding `parse_header`, `parse_comment` and `parse_row`.

    Parameters:
      input -- any iterable of text lines (it is wrapped with `iter`)
      return_header -- when false the header is parsed and stored on
          `self.header` but not yielded
      return_comments -- when false comment lines and blank lines are
          silently skipped
      force_header -- initial value of `self.header`; pass
          FIRST_LINE_IS_HEADER to parse line 1 as the header even if it
          does not start with a comment prefix
      comment_lines_startswith -- prefixes that mark a comment line;
          defaults to a new `["#"]` list for each reader
    """

    def __init__(
        self, input, return_header=True, return_comments=True, force_header=None, comment_lines_startswith=None
    ):
        self.input = input
        self.return_comments = return_comments
        self.return_header = return_header
        self.input_iter = iter(input)
        self.linenum = 0
        self.header = force_header
        # Build the default per instance: a list literal in the signature
        # would be one object shared (and mutable) across all readers.
        if comment_lines_startswith is None:
            comment_lines_startswith = ["#"]
        self.comment_lines_startswith = comment_lines_startswith

    def __iter__(self):
        return self

    def __next__(self):
        """
        Return the next header, comment or row.

        Raises StopIteration when the input is exhausted. A ParseError
        raised by `parse_row` is re-raised with `linenum` set to the
        current line.
        """
        # Skipped lines are handled by looping, not by recursion, so an
        # arbitrarily long run of ignored lines cannot overflow the stack.
        while True:
            line = next(self.input_iter)
            self.linenum += 1
            line = line.rstrip("\r\n")
            # Catch blank lines (throw a warning?)
            # This will end up adding a '#' at the beginning of blank lines
            if line == "":
                if self.return_comments:
                    return Comment(line)
                continue
            # Force header?
            if self.header is FIRST_LINE_IS_HEADER and self.linenum == 1:
                self.header = self.parse_header(line)
                if self.return_header:
                    return self.header
                continue
            # Is it a comment line?
            if any(line.startswith(prefix) for prefix in self.comment_lines_startswith):
                # If a comment and the first line we assume it is a header
                if self.header is None and self.linenum == 1:
                    self.header = self.parse_header(line)
                    if self.return_header:
                        return self.header
                elif self.return_comments:
                    return self.parse_comment(line)
                # Header or comment the caller did not ask for: keep reading.
                continue
            # Not a comment, must be an interval
            try:
                return self.parse_row(line)
            except ParseError as e:
                e.linenum = self.linenum
                # Bare raise keeps the original traceback intact.
                raise

    def parse_header(self, line):
        """Build a Header from `line`, dropping one leading '#' if present."""
        if line.startswith("#"):
            fields = line[1:].split("\t")
        else:
            fields = line.split("\t")
        return Header(fields)

    def parse_comment(self, line):
        """Wrap a comment line in a Comment object."""
        return Comment(line)

    def parse_row(self, line):
        """Split `line` on tabs and wrap the fields in a TableRow."""
        return TableRow(self, line.split("\t"))