1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
"""
Reading and writing delimited data files (with headers and comments).
"""
from itertools import count
# Sentinel for TableReader's `force_header` argument: when it is passed, a
# non-blank first line is always parsed as the header, before any check for
# a comment prefix is made.
FIRST_LINE_IS_HEADER = object()
class ParseError(Exception):
    """
    Error raised while parsing a line; may carry the offending line number.

    The optional `linenum` keyword argument is stored on the instance and,
    when truthy, appended to the message produced by `str()`.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args)
        self.linenum = kwargs.get("linenum")

    def __str__(self):
        message = super().__str__()
        # No (or zero) line number: report the plain message only
        if not self.linenum:
            return message
        return f"{message} on line {self.linenum!s}"
class TableRow:
    """
    A single data row of a table.

    Fields can be looked up by integer position or, when the owning reader
    has a header, by column name.
    """

    def __init__(self, reader, fields):
        self.reader = reader
        self.fields = fields

    def __getitem__(self, key):
        # Positional access goes straight to the field list
        if isinstance(key, int):
            return self.fields[key]
        if not isinstance(key, str):
            raise TypeError("field indices must be integers or strings")
        # Name access needs the reader's header to translate name -> column
        header = self.reader.header
        if not header:
            raise TypeError("column names only supported for files with headers")
        column = header.field_to_column[key]
        return self.fields[column]

    @property
    def fieldnames(self):
        """Column names taken from the reader's header."""
        header = self.reader.header
        return header.fields

    def __str__(self):
        separator = "\t"
        return separator.join(self.fields)
class Header:
    """
    Header of a table -- contains column names and a mapping from them
    to column indexes
    """

    def __init__(self, fields):
        self.set_fields(fields)

    def set_fields(self, fields):
        """
        Store `fields` and rebuild the column-name -> column-index mapping.

        If a name occurs more than once, the index of its last occurrence
        is the one kept in the mapping.
        """
        self.fields = fields
        self.field_to_column = {name: index for index, name in enumerate(fields)}

    def __getitem__(self, key):
        """
        Return the column name for `key`.

        An integer key returns the name at that position; a string key
        returns itself if it is a known column name.

        Raises KeyError for a string that is not a column name and
        TypeError for any other key type.
        """
        if isinstance(key, int):
            return self.fields[key]
        if isinstance(key, str):
            if key in self.field_to_column:
                return key
            # Previously fell through and returned None for unknown names
            raise KeyError(key)
        raise TypeError("field indices must be integers or strings")

    def __str__(self):
        return "#" + "\t".join(self.fields)
class Comment:
    """
    A comment line; `str()` always yields text that begins with "#".
    """

    def __init__(self, line):
        self.line = line

    def __str__(self):
        text = self.line
        # Only prepend the marker when the stored text lacks one
        return text if text.startswith("#") else "#" + text
class TableReader:
    """
    Reader for iterating tabular data

    Each step of the iteration consumes input lines until one is to be
    returned, and yields whatever `parse_header`, `parse_comment` or
    `parse_row` produce for it (by default `Header`, `Comment` and
    `TableRow` objects).

    Arguments:
      input -- iterable of lines; trailing "\\r" / "\\n" characters are
        stripped from each line
      return_header -- when false the header line is still parsed and
        stored in `self.header`, but is not yielded
      return_comments -- when false, comment lines and blank lines are
        skipped instead of being yielded
      force_header -- initial value of `self.header`. `None` means the
        first line is taken as the header only if it starts with a comment
        prefix; `FIRST_LINE_IS_HEADER` means a non-blank first line is
        always the header; any other value is kept as the header and the
        first line gets no special treatment
      comment_lines_startswith -- iterable of prefixes marking comment
        lines; defaults to `["#"]`
    """

    def __init__(
        self, input, return_header=True, return_comments=True, force_header=None, comment_lines_startswith=None
    ):
        # Build the default per instance so readers never share (and cannot
        # accidentally mutate) one list object
        if comment_lines_startswith is None:
            comment_lines_startswith = ["#"]
        self.input = input
        self.return_comments = return_comments
        self.return_header = return_header
        self.input_iter = iter(input)
        self.linenum = 0
        self.header = force_header
        self.comment_lines_startswith = comment_lines_startswith

    def __iter__(self):
        return self

    def __next__(self):
        """
        Return the next header, comment or row.

        Raises StopIteration when the input is exhausted. A ParseError
        raised by `parse_row` is re-raised with `linenum` set to the
        current line number.
        """
        # Loop rather than recurse over skipped lines, so that a long run of
        # blank/comment lines cannot exhaust the interpreter recursion limit
        while True:
            line = next(self.input_iter)
            self.linenum += 1
            line = line.rstrip("\r\n")
            # Catch blank lines (throw a warning?)
            # This will end up adding a '#' at the beginning of blank lines
            if line == "":
                if self.return_comments:
                    return Comment(line)
                continue
            # Force header?
            if self.header is FIRST_LINE_IS_HEADER and self.linenum == 1:
                self.header = self.parse_header(line)
                if self.return_header:
                    return self.header
                continue
            # Is it a comment line?
            if any(line.startswith(prefix) for prefix in self.comment_lines_startswith):
                # If a comment and the first line we assume it is a header
                if self.header is None and self.linenum == 1:
                    self.header = self.parse_header(line)
                    if self.return_header:
                        return self.header
                elif self.return_comments:
                    return self.parse_comment(line)
                continue
            # Not a comment, must be an interval
            try:
                return self.parse_row(line)
            except ParseError as e:
                e.linenum = self.linenum
                # Bare raise keeps the original traceback intact
                raise

    def parse_header(self, line):
        """Split a header line on tabs, dropping one leading "#" if present."""
        if line.startswith("#"):
            fields = line[1:].split("\t")
        else:
            fields = line.split("\t")
        return Header(fields)

    def parse_comment(self, line):
        """Wrap a comment line in a `Comment`."""
        return Comment(line)

    def parse_row(self, line):
        """Split a data line on tabs and wrap it in a `TableRow` bound to this reader."""
        return TableRow(self, line.split("\t"))
|