1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
|
"""Utilities for working with FASTA-formatted sequences.
This module uses Martel-based parsing to speed up the parsing process.
Classes:
Record Holds FASTA sequence data.
Iterator Iterates over sequence data in a FASTA file.
Dictionary Accesses a FASTA file using a dictionary interface.
RecordParser Parses FASTA sequence data into a Record object.
SequenceParser Parses FASTA sequence data into a Sequence object.
Functions:
index_file Index a FASTA file for a Dictionary.
"""
import os
import cStringIO
from Bio import Seq
from Bio import SeqRecord
from Bio import Alphabet
from Bio import Mindy
from Bio.Mindy import SimpleSeqRecord
from Bio.expressions import fasta
from Martel.LAX import LAX
class Record:
"""Holds information from a FASTA record.
Members:
title Title line ('>' character not included).
sequence The sequence.
"""
def __init__(self, colwidth=60):
"""__init__(self, colwidth=60)
Create a new Record. colwidth specifies the number of residues
to put on each line when generating FASTA format.
"""
self.title = ''
self.sequence = ''
self._colwidth = colwidth
def __str__(self):
s = []
s.append('>%s' % self.title)
i = 0
while i < len(self.sequence):
s.append(self.sequence[i:i+self._colwidth])
i = i + self._colwidth
return os.linesep.join(s)
class Iterator:
"""Returns one record at a time from a FASTA file.
"""
def __init__(self, handle, parser = None, debug = 0):
"""Initialize a new iterator.
"""
it_builder = fasta.format.make_iterator("record", debug_level = debug)
self._parser = parser
self._iterator = it_builder.iterateFile(handle,
LAX(fields = ['bioformat:sequence', 'bioformat:description']))
def __iter__(self):
return iter(self.next, None)
def next(self):
try:
result = self._iterator.next()
except StopIteration:
return None
if self._parser is None:
# return a string formatted result comparable to the original
parser = RecordParser()
rec = parser.convert_lax(result)
return str(rec) + "\n"
else:
return self._parser.convert_lax(result)
class _MartelBaseFastaParser:
"""Base class to provide a old-Biopython style to Martel parsing.
"""
def __init__(self, debug):
self._it_builder = fasta.format.make_iterator("record",
debug_level = debug)
def parse(self, handle):
"""Parse a single FASTA record in an file handle.
Internally, this parses the record into a dictionary
and then passes that on to the convert_lax function defined
in a derived class.
"""
iterator = self._it_builder.iterateFile(handle,
LAX(fields = ['bioformat:sequence', 'bioformat:description']))
return self.convert_lax(iterator.next())
def parse_str(self, str):
return self.parse(cStringIO.StringIO(str))
def convert_lax(self, result):
raise NotImplementedError("Derived class must implement")
class RecordParser(_MartelBaseFastaParser):
"""Parses FASTA sequence data into a Fasta.Record object.
"""
def __init__(self, debug = 0):
_MartelBaseFastaParser.__init__(self, debug)
def convert_lax(self, result):
"""Convert a dictionary LAX parsing result into a Record object.
"""
rec = Record()
rec.title = "".join(result['bioformat:description'])
if 'bioformat:sequence' in result:
rec.sequence = "".join(result['bioformat:sequence'])
else:
rec.sequence = ""
return rec
class SequenceParser(_MartelBaseFastaParser):
"""Parses FASTA sequence data into a SeqRecord object.
"""
def __init__(self, alphabet = Alphabet.generic_alphabet, title2ids = None,
debug = 0):
"""Initialize a Scanner and Sequence Consumer.
Arguments:
o alphabet - The alphabet of the sequences to be parsed. If not
passed, this will be set as generic_alphabet.
o title2ids - A function that, when given the title of the FASTA
file (without the beginning >), will return the id, name and
description (in that order) for the record. If this is not given,
then the entire title line will be used as the description.
"""
_MartelBaseFastaParser.__init__(self, debug)
self.alphabet = alphabet
self.title2ids = title2ids
def convert_lax(self, result):
"""Convert a dictionary LAX parsing result into a Sequence object.
"""
seq = Seq.Seq("".join(result['bioformat:sequence']), self.alphabet)
rec = SeqRecord.SeqRecord(seq)
title = "".join(result['bioformat:description'])
if self.title2ids:
seq_id, name, descr = self.title2ids(title)
rec.id = seq_id
rec.name = name
rec.description = descr
else:
rec.description = title
return rec
class Dictionary(dict):
"""Accesses an indexed FASTA file using a dictionary interface.
"""
def __init__(self, indexname, parser=None, filename = None):
"""Open a Fasta Dictionary.
indexname is the name of the index for the dictionary. The index should
have been created using the index_file function.
parser is an optional Parser object to change the results into another
form. If set to None, then the raw contents of the file will be returned.
filename specifies the name of the file that this index references.
This is useful in cases where the file has been moved since indexing.
If no filename is supplied (the default) the filename stored in the
index will be used. XXX This is no longer supported -- use symbolic
links in the filesystem.
"""
# we can't support finding the index file name if we want to follow
# standard open-bio fetching protocols.
if filename is not None:
raise AttributeError("Specifying filenames is no longer supported")
self._index = Mindy.open(indexname)
self._parser = parser
primary_key_retriever = self._index['id']
for k in primary_key_retriever.keys():
dict.__setitem__(self,k,None)
def _load_seq(self,key):
try:
seqs = self._index.lookup(id = key)
# if we can't do that, we have to try and fetch by alias
except KeyError:
seqs = self._index.lookup(aliases = key)
if len(seqs) == 1:
seq = seqs[0]
else:
raise KeyError("Multiple sequences found for %s" % key)
if self._parser:
handle = cStringIO.StringIO(seq.text)
self[key] = self._parser.parse(handle)
else:
self[key] = seq.text
def __getitem__(self, key):
if self.has_key(key) and dict.__getitem__(self,key) is None:
self._load_seq(key)
return dict.__getitem__(self,key)
def index_file(filename, indexname, rec2key = None, use_berkeley = 0):
"""Index a FASTA file.
filename is the name of the file to index.
indexname is the name of the dictionary to be created. This can be
just the name of the index, in which case the index information will
be created in a directory of the given index name in the current
directory, or a full pathname to a directory to save the indexing
information.
rec2key is an optional callback fuction that takes a Fasta Record and
generates a unique key (e.g. the accession number) for the record.
Optionally, it can also return 3 items, to be used as the id (unique key)
name, and aliases for the index. If not specified, the sequence title
will be used.
use_berkeley specifies whether to use the BerkeleyDB indexer, which
uses the bsddb3 wrappers around the embedded database Berkeley DB. By
default, the standard flat file (non-Berkeley) indexes are used.
"""
if rec2key:
indexer = _FastaFunctionIndexer(rec2key)
else:
indexer = _FastaTitleIndexer()
if use_berkeley:
SimpleSeqRecord.create_berkeleydb([filename], indexname, indexer)
else:
SimpleSeqRecord.create_flatdb([filename], indexname, indexer)
class _FastaTitleIndexer(SimpleSeqRecord.BaseSeqRecordIndexer):
"""Simple indexer to index by the title of a FASTA record.
This doesn't do anything fancy, just gets the title and uses that as the
identifier.
"""
def __init__(self):
SimpleSeqRecord.BaseSeqRecordIndexer.__init__(self)
def primary_key_name(self):
return "id"
def secondary_key_names(self):
return ["name", "aliases"]
def get_id_dictionary(self, seq_record):
sequence_id = seq_record.description
id_info = {"id" : [sequence_id],
"name" : [],
"aliases" : []}
return id_info
class _FastaFunctionIndexer(SimpleSeqRecord.BaseSeqRecordIndexer):
"""Indexer to index based on values returned by a function.
This class is passed a function to parse description titles from a Fasta
title. It needs to return either one item, which is an id from the title,
or three items which are (in order), the id, a list of names, and a list
of aliases.
This indexer allows indexing to be completely flexible based on passed
functions.
"""
def __init__(self, index_function):
SimpleSeqRecord.BaseSeqRecordIndexer.__init__(self)
self.index_function = index_function
def primary_key_name(self):
return "id"
def secondary_key_names(self):
return ["name", "aliases"]
def get_id_dictionary(self, seq_record):
# make a FASTA record to make this compatible with previous Biopython
# code
tmp_rec = Record()
tmp_rec.title = seq_record.description
tmp_rec.sequence = seq_record.seq.data
items = self.index_function(tmp_rec)
if type(items) is not type([]) and type(items) is not type(()):
items = [items]
if len(items) == 1:
seq_id = items[0]
name = []
aliases = []
elif len(items) == 3:
seq_id, name, aliases = items
else:
raise ValueError("Unexpected items from index function: %s" %
(items))
return {"id" : [seq_id],
"name" : name,
"aliases" : aliases}
|