1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
|
# Copyright 2006-2016 by Peter Cock. All rights reserved.
# Revisions copyright 2021 by Michiel de Hoon.
#
# This file is part of the Biopython distribution and governed by your
# choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
# Please see the LICENSE file that should have been included as part of this
# package.
"""Bio.Align support for the "nexus" file format.
You are expected to use this module via the Bio.Align functions.
See also the Bio.Nexus module (which this code calls internally),
as this offers more than just accessing the alignment or its
sequences as SeqRecord objects.
"""
from io import StringIO
from Bio.Align import Alignment
from Bio.Align import interfaces
from Bio.SeqRecord import SeqRecord
from Bio.Nexus import Nexus
class AlignmentWriter(interfaces.AlignmentWriter):
"""Nexus alignment writer.
Note that Nexus files are only expected to hold ONE alignment
matrix.
You are expected to call this class via Bio.Align.write().
"""
def write_file(self, alignments):
"""Write a file with the alignments, and return the number of alignments.
alignments - A list or iterator returning Alignment objects
"""
count = super().write_file(alignments)
if count != 1:
raise ValueError("Expected to write 1 alignment; wrote %d" % count)
return count
def write_header(self, alignments):
"""Use this to write the file header."""
stream = self.stream
stream.write("#NEXUS\n")
def format_alignment(self, alignment, interleave=None):
"""Return a string with a single alignment in the Nexus format.
Creates an empty Nexus object, adds the sequences
and then gets Nexus to prepare the output.
Default interleave behavior: Interleave if columns > 1000
--> Override with interleave=[True/False]
"""
nseqs, length = alignment.shape
if nseqs == 0:
raise ValueError("Must have at least one sequence")
if length == 0:
raise ValueError("Non-empty sequences are required")
rows, columns = alignment.shape
if rows == 0:
raise ValueError("Must have at least one sequence")
if columns == 0:
raise ValueError("Non-empty sequences are required")
datatype = self._classify_mol_type_for_nexus(alignment)
minimal_record = (
"begin data; dimensions ntax=0 nchar=0; format datatype=%s; end;" % datatype
)
n = Nexus.Nexus(minimal_record)
for record, aligned_sequence in zip(alignment.sequences, alignment):
# Sanity test sequences (should this be even stricter?)
if datatype == "dna" and "U" in record.seq:
raise ValueError(f"{record.id} contains U, but DNA alignment")
elif datatype == "rna" and "T" in record.seq:
raise ValueError(f"{record.id} contains T, but RNA alignment")
n.add_sequence(record.id, aligned_sequence)
# Note: MrBayes may choke on large alignments if not interleaved
if interleave is None:
interleave = columns > 1000
stream = StringIO()
n.write_nexus_data(stream, interleave=interleave)
stream.seek(0)
return stream.read()
def _classify_mol_type_for_nexus(self, alignment):
"""Return 'protein', 'dna', or 'rna' based on records' molecule type (PRIVATE).
All the records must have a molecule_type annotation, and they must
agree.
Raises an exception if this is not possible.
"""
values = {
sequence.annotations.get("molecule_type", None)
for sequence in alignment.sequences
}
if all(_ and "DNA" in _ for _ in values):
return "dna" # could have been a mix of "DNA" and "gDNA"
elif all(_ and "RNA" in _ for _ in values):
return "rna" # could have been a mix of "RNA" and "mRNA"
elif all(_ and "protein" in _ for _ in values):
return "protein"
else:
raise ValueError("Need the molecule type to be defined")
class AlignmentIterator(interfaces.AlignmentIterator):
"""Nexus alignment iterator."""
def __init__(self, source):
"""Create an AlignmentIterator object.
Arguments:
- source - input data or file name
"""
super().__init__(source, mode="t", fmt="Nexus")
def _read_header(self, stream):
try:
line = next(stream)
except StopIteration:
raise ValueError("Empty file.") from None
if line.strip() != "#NEXUS":
raise ValueError("File does not start with NEXUS header.")
def _read_next_alignment(self, stream):
# NOTE - We only expect ONE alignment matrix per Nexus file.
n = Nexus.Nexus(stream)
if not n.matrix:
# No alignment found
return
# Bio.Nexus deals with duplicated names by adding a '.copy' suffix.
# The original names and the modified names are kept in these two lists:
assert len(n.unaltered_taxlabels) == len(n.taxlabels)
# TODO - Can we extract any annotation too?
if n.datatype in ("dna", "nucleotide"):
annotations = {"molecule_type": "DNA"}
elif n.datatype == "rna":
annotations = {"molecule_type": "RNA"}
elif n.datatype == "protein":
annotations = {"molecule_type": "protein"}
else:
annotations = None
aligned_seqs = [str(n.matrix[new_name]) for new_name in n.taxlabels]
records = [
SeqRecord(
n.matrix[new_name].replace("-", ""),
id=old_name,
annotations=annotations,
)
for old_name, new_name in zip(n.unaltered_taxlabels, n.taxlabels)
]
coordinates = Alignment.infer_coordinates(aligned_seqs)
alignment = Alignment(records, coordinates)
self._close()
return alignment
|