1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
|
#! /usr/bin/env python
##############################################################################
## DendroPy Phylogenetic Computing Library.
##
## Copyright 2010-2015 Jeet Sukumaran and Mark T. Holder.
## All rights reserved.
##
## See "LICENSE.rst" for terms and conditions of usage.
##
## If you use this work or any portion thereof in published work,
## please cite it as:
##
## Sukumaran, J. and M. T. Holder. 2010. DendroPy: a Python library
## for phylogenetic computing. Bioinformatics 26: 1569-1571.
##
##############################################################################
"""
Implementation of PHYLIP-format data reader.
"""
import re
from dendropy.dataio import ioservice
from dendropy.utility import filesys
from dendropy.utility import error
class PhylipReader(ioservice.DataReader):
    """
    Implements the DataReader interface for parsing PHYLIP files.

    The reader handles the four combinations of label layout (strict or
    relaxed) and sequence arrangement (sequential or interleaved); the
    combination is selected through the ``strict`` and ``interleaved``
    keyword arguments of the constructor.
    """
    # NOTE(review): the tables below are disabled legacy code that restricted
    # the accepted data/matrix types; they refer to a ``dataobject`` module
    # that this file does not import. Retained for reference only.
    # supported_data_types = ['dna', 'rna', 'protein', 'standard', 'restriction', 'infinite']
    # supported_matrix_types = [dataobject.DnaCharacterMatrix,
    # dataobject.RnaCharacterMatrix,
    # dataobject.ProteinCharacterMatrix,
    # dataobject.StandardCharacterMatrix,
    # dataobject.RestrictionSitesCharacterMatrix,
    # dataobject.InfiniteSitesCharacterMatrix]
class PhylipStrictSequentialError(error.DataParseError):
    """
    Data parse error associated with the strict, sequential PHYLIP mode.

    Construction is inherited unchanged from ``error.DataParseError``: all
    positional and keyword arguments go straight to the parent class.
    """
class PhylipStrictInterleavedError(error.DataParseError):
    """
    Data parse error associated with the strict, interleaved PHYLIP mode.

    Construction is inherited unchanged from ``error.DataParseError``: all
    positional and keyword arguments go straight to the parent class.
    """
class PhylipRelaxedSequentialError(error.DataParseError):
    """
    Data parse error associated with the relaxed, sequential PHYLIP mode.

    Construction is inherited unchanged from ``error.DataParseError``: all
    positional and keyword arguments go straight to the parent class.
    """
class PhylipRelaxedInterleavedError(error.DataParseError):
    """
    Data parse error associated with the relaxed, interleaved PHYLIP mode.

    Construction is inherited unchanged from ``error.DataParseError``: all
    positional and keyword arguments go straight to the parent class.
    """
def __init__(self, **kwargs):
    """
    Keyword Arguments
    -----------------
    data_type: str
        When reading into a |DataSet| object, the type of data must be
        specified: "dna", "rna", "protein", "restriction", "infinite",
        "standard", or "continuous".
    default_state_alphabet: |StateAlphabet| instance
        A |StateAlphabet| object to be used to manage the alphabet of the
        characters (|StandardCharacterMatrix| **only**). If given without
        ``data_type``, the data type is set to "standard"; combining it
        with any other data type raises ``ValueError``.
    strict : bool
        If |True|, then data is given in 'strict' format, where the first
        10 characters are the taxon label and the remaining characters are
        the sequence. Default is |False|: relaxed format, where taxon
        labels are of arbitrary length and are separated from the sequence
        by one or more (if ``multispace_delimiter`` is |False|) or two or
        more (if ``multispace_delimiter`` is |True|) spaces.
    interleaved : bool
        If |True|, then data is in interleaved format.
        Default is |False|: data is non-interleaved.
    multispace_delimiter: bool
        If |True| (and ``strict`` is |False|), then at least two spaces are
        required to delimit taxon label and associated sequence. Default is
        |False|: one or more spaces delimit taxon label and associated
        sequence.
    underscores_to_spaces: bool
        If |True|, then underscores in taxon labels are converted to
        spaces. Default is |False|: underscores are not converted.
    ignore_invalid_chars : bool
        If |True| then any invalid characters in sequences will be ignored.
        Default is |False|: invalid characters result in errors.
    ignore_unrecognized_keyword_arguments : boolean, default: |False|
        If |True|, then unsupported or unrecognized keyword arguments will
        not result in an error. Default is |False|: unsupported keyword
        arguments will result in an error.

    Raises
    ------
    ValueError
        If ``default_state_alphabet`` is combined with a ``data_type``
        other than "standard".
    """
    ioservice.DataReader.__init__(self)

    # Every recognised option is popped so that whatever remains in
    # ``kwargs`` can be reported as unused further down.
    self.data_type = kwargs.pop("data_type", None)
    self.strict = kwargs.pop("strict", False)
    self.interleaved = kwargs.pop("interleaved", False)
    self.multispace_delimiter = kwargs.pop("multispace_delimiter", False)
    self.underscores_to_spaces = kwargs.pop("underscores_to_spaces", False)
    self.ignore_invalid_chars = kwargs.pop("ignore_invalid_chars", False)
    self.default_state_alphabet = kwargs.pop("default_state_alphabet", None)

    # A custom alphabet only makes sense for "standard" data: infer the
    # data type when it was omitted, reject any other explicit choice.
    if self.default_state_alphabet is not None:
        if self.data_type is None:
            self.data_type = "standard"
        elif self.data_type != "standard":
            raise ValueError("Cannot specify 'default_state_alphabet' with data type of '{}'".format(self.data_type))
    self.check_for_unused_keyword_arguments(kwargs)

    # Per-read state; mirrors what ``reset()`` clears. ``stream`` is
    # initialised here as well so that ``_data_parse_error`` can be used
    # before the first read without hitting an AttributeError.
    self.ntax = None
    self.nchar = None
    self.char_matrix = None
    self.taxon_namespace = None
    self.stream = None
def describe_mode(self):
    """
    Return a short human-readable description of the parsing mode.

    The result is ``"<strictness>, <layout>"`` where strictness is
    "strict" or "relaxed" (from ``self.strict``) and layout is
    "interleaved" or "sequential" (from ``self.interleaved``).
    """
    strictness = "strict" if self.strict else "relaxed"
    layout = "interleaved" if self.interleaved else "sequential"
    return ", ".join((strictness, layout))
def reset(self):
    """
    Clear all per-read state so the reader can be used on a new source.

    Sets the declared dimensions, the matrix under construction, its taxon
    namespace and the current stream back to ``None``.
    """
    # Declared dimensions from the description line.
    self.ntax = self.nchar = None
    # Objects built or held during a read.
    self.char_matrix = self.taxon_namespace = None
    self.stream = None
def _read(self,
        stream,
        taxon_namespace_factory=None,
        tree_list_factory=None,
        char_matrix_factory=None,
        state_alphabet_factory=None,
        global_annotations_target=None):
    """
    Parse PHYLIP data from ``stream`` into a single character matrix.

    Parameters
    ----------
    stream : file-like
        Source of the data; its lines are obtained via
        ``filesys.get_lines``.
    taxon_namespace_factory : callable
        Called as ``taxon_namespace_factory(label=None)`` to obtain the
        taxon namespace for the matrix.
    tree_list_factory : callable
        Not used by this reader.
    char_matrix_factory : callable
        Called with the data type, ``label``, ``taxon_namespace`` and, for
        "standard" data with a user-supplied alphabet,
        ``default_state_alphabet``.
    state_alphabet_factory : callable
        Called to build a "0-9" alphabet for "standard" data when no
        default alphabet was supplied.
    global_annotations_target : object
        Not used by this reader.

    Returns
    -------
    ``self.Product`` holding the parsed matrix in ``char_matrices``.

    Raises
    ------
    TypeError
        If no data type was specified.
    error.DataSourceError
        If the source is empty or declares zero taxa/characters.
    error.DataParseError
        If the source is too short, the description line is invalid, or
        the body cannot be parsed (mode-specific subclass).
    """
    self.reset()
    self.stream = stream
    self.taxon_namespace = taxon_namespace_factory(label=None)
    if self.data_type is None:
        raise TypeError("Data type must be specified for this schema")

    if self.data_type == "standard" and self.default_state_alphabet is not None:
        self.char_matrix = char_matrix_factory(
                self.data_type,
                label=None,
                taxon_namespace=self.taxon_namespace,
                default_state_alphabet=self.default_state_alphabet,
                )
    else:
        self.char_matrix = char_matrix_factory(
                self.data_type,
                label=None,
                taxon_namespace=self.taxon_namespace)
        # Standard data without a user-supplied alphabet gets a generic
        # digit alphabet attached to the freshly created matrix.
        if self.data_type == "standard":
            state_alphabet = state_alphabet_factory(
                fundamental_states="0123456789",
                no_data_symbol="?",
                gap_symbol="-",
                case_sensitive=False)
            self.char_matrix.state_alphabets.append(state_alphabet)

    lines = filesys.get_lines(stream)
    num_lines = len(lines)
    if num_lines == 0:
        raise error.DataSourceError("No data in source", stream=self.stream)
    elif num_lines < 2:
        # A description line plus one data line is the smallest possible
        # source. The check used to be ``<= 2``, which rejected exactly
        # that case even though the message asks for "at least 2 lines".
        raise error.DataParseError("Expecting at least 2 lines in PHYLIP format data source", stream=self.stream)

    desc_line = lines[0]
    lines = lines[1:]
    # Raw string: ``\s`` and ``\d`` are regex escapes, not string escapes.
    m = re.match(r'\s*(\d+)\s+(\d+)\s*$', desc_line)
    if m is None:
        raise self._data_parse_error("Invalid data description line: '%s'" % desc_line)
    self.ntax = int(m.group(1))
    self.nchar = int(m.group(2))
    if self.ntax == 0 or self.nchar == 0:
        raise error.DataSourceError("No data in source", stream=self.stream)

    if self.interleaved:
        self._parse_interleaved(lines)
    else:
        self._parse_sequential(lines)
    product = self.Product(
            taxon_namespaces=None,
            tree_lists=None,
            char_matrices=[self.char_matrix])
    return product
def _parse_taxon_from_line(self, line, line_index):
if self.strict:
seq_label = line[:10].strip()
line = line[10:]
else:
if self.multispace_delimiter:
parts = re.split('[ \t]{2,}', line, maxsplit=1)
else:
parts = re.split('[ \t]{1,}', line, maxsplit=1)
seq_label = parts[0]
if len(parts) < 2:
line = ''
else:
line = parts[1]
seq_label = seq_label.strip()
if not seq_label:
raise self._data_parse_error("Expecting taxon label", line_index=line_index)
if self.underscores_to_spaces:
seq_label = seq_label.replace('_', ' ')
current_taxon = self.char_matrix.taxon_namespace.require_taxon(label=seq_label)
if current_taxon not in self.char_matrix:
self.char_matrix[current_taxon] = self.char_matrix.new_sequence(taxon=current_taxon)
else:
if len(self.char_matrix[current_taxon]) >= self.nchar:
raise self._data_parse_error("Cannot add characters to sequence for taxon '%s': already has declared number of characters (%d)" \
% (current_taxon.label, self.char_matrix[current_taxon]), line_index=line_index)
return current_taxon, line
def _parse_sequence_from_line(self, current_taxon, line, line_index):
if self.data_type == "continuous":
for c in line.split():
if not c:
continue
try:
state = float(c)
except ValueError:
if not self.ignore_invalid_chars:
raise self._data_parse_error("Invalid state for taxon '%s': '%s'" % (current_taxon.label, c),
line_index=line_index)
else:
self.char_matrix[current_taxon].append(state)
else:
for c in line:
if c in [' ', '\t']:
continue
try:
state = self.char_matrix.default_state_alphabet[c]
except KeyError:
if not self.ignore_invalid_chars:
raise self._data_parse_error("Invalid state symbol for taxon '%s': '%s'" % (current_taxon.label, c),
line_index=line_index)
else:
self.char_matrix[current_taxon].append(state)
def _parse_sequential(self, lines, line_num_start=1):
seq_labels = []
current_taxon = None
for line_index, line in enumerate(lines):
line = line.rstrip()
if line == '':
continue
if current_taxon is None:
seq_label = None
current_taxon, line = self._parse_taxon_from_line(line, line_index)
# if current_taxon not in self.char_matrix and len(self.char_matrix.taxon_namespace) >= self.ntax:
# raise self._data_parse_error("Cannot add new sequence %s: declared number of sequences (%d) already defined" \
# % (current_taxon, len(self.char_matrix.taxon_namespace)), line_index=line_index)
self._parse_sequence_from_line(current_taxon, line, line_index)
if len(self.char_matrix[current_taxon]) >= self.nchar:
current_taxon = None
def _parse_interleaved(self, lines, line_num_start=1):
seq_labels = []
current_taxon = None
paged = False
paged_row = -1
for line_index, line in enumerate(lines):
current_taxon = None
line = line.rstrip()
if line == '':
continue
paged_row += 1
if paged_row >= self.ntax:
paged_row = 0
if paged:
current_taxon = self.char_matrix.taxon_namespace[paged_row]
else:
current_taxon, line = self._parse_taxon_from_line(line, line_index)
if len(self.char_matrix.taxon_namespace) == self.ntax:
paged = True
paged_row = -1
self._parse_sequence_from_line(current_taxon, line, line_index)
def _data_parse_error(self, message, line_index=None):
if line_index is None:
row = None
else:
row = line_index + 2
if self.strict and self.interleaved:
error_type = PhylipReader.PhylipStrictInterleavedError
elif self.strict:
error_type = PhylipReader.PhylipStrictSequentialError
elif self.interleaved:
error_type = PhylipReader.PhylipRelaxedInterleavedError
else:
error_type = PhylipReader.PhylipStrictSequentialError
return error_type(message, line_num=row, stream=self.stream)
|