1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
|
#!/usr/bin/env python
__version__ = "$Revision: 1.14 $"
import os
import sys
from Bio.SeqIO.FASTA import FastaReader, FastaWriter
try:
import poly
_NamedTemporaryFile = poly.NamedTemporaryFile
except ImportError:
import tempfile
try:
_NamedTemporaryFile = tempfile.NamedTemporaryFile
except AttributeError: # no NamedTemporaryFile on 2.2, stuck without it
_NamedTemporaryFile = tempfile.TemporaryFile
def _build_align_cmdline(cmdline, pair, output_filename, kbyte=None, force_type=None, quiet=False):
"""
>>> os.environ["WISE_KBYTE"]="300000"
>>> _build_align_cmdline(["dnal"], ("seq1.fna", "seq2.fna"), "/tmp/output", kbyte=100000)
'dnal -kbyte 100000 seq1.fna seq2.fna > /tmp/output'
>>> _build_align_cmdline(["psw"], ("seq1.faa", "seq2.faa"), "/tmp/output_aa")
'psw -kbyte 300000 seq1.faa seq2.faa > /tmp/output_aa'
"""
cmdline = cmdline[:]
### XXX: force_type ignored
if kbyte is None:
try:
cmdline.extend(("-kbyte", os.environ["WISE_KBYTE"]))
except KeyError:
pass
else:
cmdline.extend(("-kbyte", str(kbyte)))
if not os.isatty(sys.stderr.fileno()):
cmdline.append("-quiet")
cmdline.extend(pair)
cmdline.extend((">", output_filename))
if quiet:
cmdline.extend(("2>", "/dev/null"))
cmdline_str = ' '.join(cmdline)
return cmdline_str
def align(cmdline, pair, kbyte=None, force_type=None, dry_run=False, quiet=False, debug=False):
"""
Returns a filehandle
"""
assert len(pair) == 2
output_file = _NamedTemporaryFile(mode='r')
input_files = _NamedTemporaryFile(mode="w"), _NamedTemporaryFile(mode="w")
if dry_run:
print _build_align_cmdline(cmdline,
pair,
output_file.name,
kbyte,
force_type,
quiet)
return
try:
for filename, input_file in zip(pair, input_files):
input_file.close()
FastaWriter(file(input_file.name, "w")).write(FastaReader(file(filename)).next())
input_file_names = [input_file.name for input_file in input_files]
cmdline_str = _build_align_cmdline(cmdline,
input_file_names,
output_file.name,
kbyte,
force_type,
quiet)
if debug:
print >>sys.stderr, cmdline_str
status = os.system(cmdline_str) >> 8
if status > 1:
if kbyte != 0: # possible memory problem; could be None
print >>sys.stderr, "INFO trying again with the linear model"
return align(cmdline, pair, 0, force_type, dry_run, quiet, debug)
else:
raise OSError, "%s returned %s" % (" ".join(cmdline), status)
finally:
for input_file in input_files:
os.remove(input_file.name)
return output_file
def all_pairs(singles):
"""
Generate pairs list for all-against-all alignments
>>> all_pairs(range(4))
[(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]
"""
pairs = []
singles = list(singles)
while singles:
suitor = singles.pop(0) # if sorted, stay sorted
pairs.extend([(suitor, single) for single in singles])
return pairs
def main():
pass
def _test(*args, **keywds):
import doctest, sys
doctest.testmod(sys.modules[__name__], *args, **keywds)
if __name__ == "__main__":
if __debug__:
_test()
main()
|