File: align.py

package info (click to toggle)
python-sqt 0.8.0-9
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 824 kB
  • sloc: python: 5,964; sh: 38; makefile: 10
file content (197 lines) | stat: -rw-r--r-- 6,070 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import subprocess
from collections import Counter
from io import StringIO

from .io.fasta import FastaReader
from .utils import available_cpu_count

from sqt._helpers import globalalign, hamming_distance


class GlobalAlignment:
	"""A global alignment between two strings"""

	def __init__(self, s, t, semiglobal=False):
		if type(s) is str:
			s = s.encode('ascii')
		if type(t) is str:
			t = t.encode('ascii')
		flags = 15 if semiglobal else 0  # TODO this constant shouldn't be here
		# globalalign uses scores. Set match to 0 to get the same result as with
		# edit distance unless we compute semiglobal alignments, where edit
		# distance does not work.
		match = 1 if semiglobal else 0
		(row1, row2, start1, stop1, start2, stop2, errors) = globalalign(s, t, flags=flags, match=match)
		assert semiglobal or start1 == start2 == 0
		assert semiglobal or stop1 == len(s)
		assert semiglobal or stop2 == len(t)
		assert len(row1) == len(row2)
		self.row1 = row1.decode('ascii')
		self.row2 = row2.decode('ascii')
		self.start1 = start1
		self.stop1 = stop1
		self.start2 = start2
		self.stop2 = stop2
		self.errors = errors
		# Overlap start and stop in alignment coordinates.
		# row1[overlap_start:overlap_stop] is the overlapping alignment.
		self.overlap_start = max(start1, start2)
		self.overlap_stop = len(row1) - max(len(s) - stop1, len(t) - stop2)

	def _replace_gaps(self, row, gap_char):
		start = self.overlap_start
		stop = self.overlap_stop
		front = row[:start].replace('\0', ' ')
		middle = row[start:stop].replace('\0', gap_char)
		end = row[stop:].replace('\0', ' ')
		return front + middle + end

	def print(self, width=100, gap_char='-'):
		"""multi-line output and parameters, therefore not called __str__"""
		row1 = self._replace_gaps(self.row1, gap_char)
		row2 = self._replace_gaps(self.row2, gap_char)
		for i in range(0, len(row1), width):
			top = row1[i:i+width]
			bottom = row2[i:i+width]
			m = []
			for c, d in zip(top, bottom):
				if c == d:
					m.append('|')
				elif c == ' ' or d == ' ':
					m.append(' ')
				else:
					m.append('X')
			middle = ''.join(m)
			if i > 0:
				print()
			print(top)
			print(middle)
			print(bottom)

	def __str__(self):
		return 'GA(row1={}, row2={}, errors={})'.format(
			self.row1, self.row2, self.errors)


def edit_distance(s, t, maxdiff=-1):
	"""
	Return the edit distance between the strings s and t.
	The edit distance is the sum of the numbers of insertions, deletions,
	and mismatches that is minimally necessary to transform one string
	into the other.

	If maxdiff is not -1, then a banded alignment is performed. In that case,
	the true edit distance is returned if and only if it is maxdiff or less.
	Otherwise, a value is returned that is guaranteed to be greater than
	maxdiff, but which is not necessarily the true edit distance.
	"""
	m = len(s) # index: i
	n = len(t) # index: j

	e = maxdiff
	if e != -1 and abs(m - n) > e:
		return abs(m - n)

	# dynamic programming "table" (just a single column)
	# note that using an array('h', ...) here is not faster
	costs = list(range(m+1))

	# calculate alignment (using unit costs)
	for j in range(1, n+1):
		start = 1
		stop = m + 1
		if e != -1: # banded
			stop = min(stop, j + e + 1)
			if j <= e:
				prev = costs[0]
				costs[0] += 1
				start = 1
			else:
				start = j - e
				prev = costs[start-1]
		else:
			prev = costs[0]
			costs[0] += 1
		for i in range(start, stop):
			c = min(
				prev + int(s[i-1] != t[j-1]),
				costs[i] + 1,
				costs[i-1] + 1)
			prev = costs[i]
			costs[i] = c

	return costs[-1]


try:
	from sqt._helpers import edit_distance
except:
	pass


def multialign(sequences, program='mafft', threads=available_cpu_count()):
	"""
	Wrapper for multiple sequence alignment tools. Currently supported are
	* ClustalO (http://www.clustal.org/omega/),
	* MAFFT (http://mafft.cbrc.jp/alignment/software/)
	* MUSCLE (http://www.drive5.com/muscle/)

	A package using the libclustalo library directly also exists:
	https://github.com/benchling/clustalo-python/blob/master/clustalo.c
	It is not for Python 3.

	sequences -- a dictionary mapping names to sequences.
		Use an OrderedDict if order matters.

	program -- must be 'clustalo', 'mafft', 'muscle', 'muscle-medium',
		'muscle-fast'. The latter calls MUSCLE with parameters that make it run
		faster (but less accurate). 'muscle-medium' is in between muscle and
		muscle-fast.

	threads -- number of threads to use for those programs that support it.
		By default, set to the number of processors.
	"""
	if program == 'mafft':
		args = ['mafft', '--quiet', '--thread', str(threads), '-']
	elif program == 'clustalo':
		args = ['clustalo', '--threads='+str(threads), '--infile=-']
	elif program == 'muscle':
		args = ['muscle', '-quiet', '-in', '-', '-out', '-']
	elif program == 'muscle-fast':
		args = ['muscle', '-quiet', '-maxiters', '1', '-diags', '-in', '-', '-out', '-']
	elif program == 'muscle-medium':
		args = ['muscle', '-quiet', '-maxiters', '2', '-diags', '-in', '-', '-out', '-']
	else:
		raise ValueError('program {!r} not supported'.format(program))

	fasta_data = ''.join('>{}\n{}\n'.format(name, seq) for name, seq in sequences.items())
	result = subprocess.check_output(args, input=fasta_data, universal_newlines=True)
	aligned = list(FastaReader(StringIO(result)))

	return { record.name: record.sequence.upper() for record in aligned }


def consensus(aligned, threshold=0.7, ambiguous='N', keep_gaps=False):
	"""
	Compute a consensus from multialign() output.

	Idea taken from BioPython’s SummaryInfo.dumb_consensus function.

	aligned -- a dict mapping names to sequences or a list of sequences
	keep_gaps -- whether the returned sequence contains gaps (-)
	"""
	result = []
	n = len(aligned)
	if hasattr(aligned, 'values'):
		sequences = aligned.values()
	else:
		sequences = aligned
	ambiguous = 'N'
	for chars in zip(*sequences):
		char, freq = Counter(chars).most_common(1)[0]
		if freq / n >= threshold:
			if keep_gaps or char != '-':
				result.append(char)
		else:
			result.append(ambiguous)
	return ''.join(result)