1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458
|
# kate: syntax Python;
from cpython.mem cimport PyMem_Malloc, PyMem_Free
from collections import Counter
from cython.view cimport array as cvarray
import cython
from ._codons import GENETIC_CODE
DEF START_WITHIN_SEQ1 = 1
DEF START_WITHIN_SEQ2 = 2
DEF STOP_WITHIN_SEQ1 = 4
DEF STOP_WITHIN_SEQ2 = 8
DEF SEMIGLOBAL = 15
DEF ALLOW_WILDCARD_SEQ1 = 1
DEF ALLOW_WILDCARD_SEQ2 = 2
DEF INSERTION_COST = 1
DEF DELETION_COST = 1
DEF MATCH_COST = 0
DEF MISMATCH_COST = 1
DEF WILDCARD_CHAR = b'N'
# structure for a DP matrix entry
ctypedef struct ScoreEntry:
int score
int backtrace
# insertion means: inserted into seq1 (does not appear in seq2)
DEF GAPCHAR = b'\0'
@cython.boundscheck(False)
def edit_distance(s, t, int maxdiff=-1):
"""
Return the edit distance between the strings s and t.
The edit distance is the sum of the numbers of insertions, deletions,
and mismatches that is minimally necessary to transform one string
into the other.
If maxdiff is not -1, then a banded alignment is performed. In that case,
the true edit distance is returned if and only if it is maxdiff or less.
Otherwise, a value is returned that is guaranteed to be greater than
maxdiff, but which is not necessarily the true edit distance.
"""
cdef int m = len(s) # index: i
cdef int n = len(t) # index: j
cdef int e = maxdiff
cdef int i, j, start, stop, c, prev, smallest
cdef bint match
cdef bytes s_bytes, t_bytes
cdef char* sv
cdef char* tv
# Return early if string lengths are too different
if e != -1 and abs(m - n) > e:
return abs(m - n)
s_bytes = s.encode() if isinstance(s, unicode) else s
t_bytes = t.encode() if isinstance(t, unicode) else t
sv = s_bytes
tv = t_bytes
# Skip identical prefixes
while m > 0 and n > 0 and sv[0] == tv[0]:
sv += 1
tv += 1
m -= 1
n -= 1
# Skip identical suffixes
while m > 0 and n > 0 and sv[m-1] == tv[n-1]:
m -= 1
n -= 1
cdef int[:] costs = cvarray(shape=(m+1,), itemsize=sizeof(int), format="i")
if e == -1:
# Regular (unbanded) global alignment
with nogil:
for i in range(m + 1):
costs[i] = i
# compute columns of the alignment matrix (using unit costs)
prev = 0
for j in range(1, n+1):
prev = costs[0]
costs[0] += 1
for i in range(1, m+1):
match = sv[i-1] == tv[j-1]
c = min(
prev + 1 - match,
costs[i] + 1,
costs[i-1] + 1)
prev = costs[i]
costs[i] = c
else:
# Banded alignment
with nogil:
for i in range(m + 1):
costs[i] = i
smallest = 0
for j in range(1, n + 1):
stop = min(j + e + 1, m + 1)
if j <= e:
prev = costs[0]
costs[0] += 1
smallest = costs[0]
start = 1
else:
start = j - e
prev = costs[start - 1]
smallest = maxdiff + 1
for i in range(start, stop):
match = sv[i-1] == tv[j-1]
c = min(
prev + 1 - match,
costs[i] + 1,
costs[i-1] + 1)
prev = costs[i]
costs[i] = c
smallest = min(smallest, c)
if smallest > maxdiff:
break
if smallest > maxdiff:
return smallest
return costs[m]
#@cython.boundscheck(False)
def globalalign(char* s1, char* s2, int flags=0, int match=1, int mismatch=-2, int insertion=-2, int deletion=-2):
"""
Compute an optimal global or semiglobal alignment between strings s1 and s2.
An alignment is optimal if it has maximal score.
The optimal score is not returned. Instead, the number of errors is computed
and returned.
Return ... -> (r1, r2, start1, stop1, start2, stop2, errors)
TODO
This is a direct translation of the C code and should be re-written to make
it more readable. (Use Cython's memoryview for the matrix, avoid pointer-like
access to p1 and p2.)
FIXME THE REMAINDER OF THIS DOCSTRING
Return a tuple (row1, row2, start1, stop1, start2, stop2, errors)
where row1 and row2 are strings of the same length containing the alignment
(an INDEL is marked by a null byte ('\\0').
start1 is the position within row1 at which the part of s1, that is aligned, starts.
start2 is the position within row1 at which the part of s1, that is aligned, ends.
The same holds for start2, stop2.
It is always the case that at least one of start1 and start2 is zero.
It is always the case that either stop1==len(row1) or stop2==len(row2) or both
(note that len(row1)==len(row2)). This is a property of semiglobal alignments.
errors is the number of errors in the alignment.
For example, globalalign("SISSI", "MISSISSIPPI") returns:
row1 = [ 0, 0, 0, 'S', 'I', 'S', 'S', 'I', 0, 0, 0]
row2 = [ 'M', 'I', 'S', 'S', 'I', 'S', 'S', 'I', 'P', 'P', 'I']
start1, stop1 = 0, 5
start2, stop2 = 3, 8
errors = 0
This corresponds to the following alignment:
SISSI
|||||
MISSISSIPPI
"""
cdef int m = len(s1)
cdef int n = len(s2)
# DP Matrix:
# s2 (j)
# ----------> n
# |
# s1 (i) |
# |
# V
# m
# direction constants for backtrace table
cdef int LEFT = 1, UP = 2, DIAG = 3
# the DP matrix is stored column-major
cdef ScoreEntry[:,:] columns = cvarray(shape=(m+1, n+1), itemsize=sizeof(ScoreEntry), format="ii")
cdef int i, j, bt, score, tmp
# initialize first column
for i in range(m + 1):
columns[i, 0].score = 0 if (flags & START_WITHIN_SEQ1) else i * deletion
columns[i, 0].backtrace = UP
# initialize first row
for j in range(n + 1):
columns[0, j].score = 0 if (flags & START_WITHIN_SEQ2) else j * insertion
columns[0, j].backtrace = LEFT
# fill the entire DP matrix
# outer loop goes over columns
for j in range(1, n+1):
for i in range(1, m+1):
bt = DIAG
score = columns[i-1,j-1].score + (match if (s1[i-1] == s2[j-1]) else mismatch)
tmp = columns[i-1,j].score + insertion
if tmp > score:
bt = UP
score = tmp
tmp = columns[i,j-1].score + deletion
if tmp > score:
bt = LEFT
score = tmp
columns[i,j].score = score
columns[i,j].backtrace = bt
# initialize best score and its position to the bottomright cell
cdef int best_i = m # also: s1stop
cdef int best_j = n # also: s2stop
cdef int best = columns[m,n].score
if flags & STOP_WITHIN_SEQ2:
# search also in last row
for j in range(n + 1):
if columns[m,j].score >= best:
best = columns[m,j].score
best_i = m
best_j = j
cdef ScoreEntry* last_column
if flags & STOP_WITHIN_SEQ1:
# search also in last column
#last_column = &(columns[0,n])
for i in range(m + 1):
if columns[i,n].score >= best:
best_i = i
best_j = n
best = columns[i,n].score
# trace back
cdef char* alignment1 = <char*>PyMem_Malloc((m+n+4)*sizeof(char))
if not alignment1:
raise MemoryError()
cdef char* alignment2 = <char*>PyMem_Malloc((m+n+4)*sizeof(char))
if not alignment2:
PyMem_Free(alignment2)
raise MemoryError()
cdef char* p1 = alignment1
cdef char* p2 = alignment2
i = m
j = n
# first, walk from the lower right corner to the
# position where we found the maximum score
cdef int errors = 0
cdef int gaps_are_errors # if gaps are currently errors, this is 1, otherwise it's 0
gaps_are_errors = 0 if (flags & STOP_WITHIN_SEQ2) else 1
if i == best_i: # we are in the last row
while j > best_j:
p1[0] = GAPCHAR
j -= 1
p2[0] = s2[j]
p1 += 1
p2 += 1
errors += gaps_are_errors
else: # we are in the last column
gaps_are_errors = 0 if (flags & STOP_WITHIN_SEQ1) else 1
while i > best_i:
i -= 1
p1[0] = s1[i]
p2[0] = GAPCHAR
p1 += 1
p2 += 1
errors += gaps_are_errors
assert i == best_i and j == best_j
# the actual backtracing
# The alignments are constructed in reverse
# and this is undone afterwards.
cdef int direction
while i > 0 and j > 0:
direction = columns[i,j].backtrace
if direction == DIAG:
i -= 1
j -= 1
if s1[i] != s2[j]:
errors += 1
p1[0] = s1[i]
p2[0] = s2[j]
p1 += 1
p2 += 1
elif direction == LEFT:
errors += 1
p1[0] = GAPCHAR
j -= 1
p2[0] = s2[j]
p1 += 1
p2 += 1
elif direction == UP:
i -= 1
p1[0] = s1[i]
p2[0] = GAPCHAR
errors += 1
p1 += 1
p2 += 1
else:
assert False, 'DP table corrupt'
cdef int start1 = i if (flags & START_WITHIN_SEQ1) else 0
cdef int start2 = j if (flags & START_WITHIN_SEQ2) else 0
errors += (i - start1) + (j - start2)
while j > 0:
p1[0] = GAPCHAR
j -= 1
p2[0] = s2[j]
p1 += 1
p2 += 1
while i > 0:
i -= 1
p1[0] = s1[i]
p2[0] = GAPCHAR
p1 += 1
p2 += 1
assert i == 0 and j == 0
align1 = alignment1[:(p1-alignment1)]
align2 = alignment2[:(p2-alignment2)]
align1 = align1[::-1]
align2 = align2[::-1]
PyMem_Free(alignment1)
PyMem_Free(alignment2)
return (align1, align2, start1, best_i, start2, best_j, errors)
def byte_frequencies(bytes s):
"""Faster replacement for collections.Counter(s) for the case when s is a bytes object.
Speed advantage depends on the length of the bytes object. When the length is less
than 10, speedup is at least 2x. For length 100, speedup is 14x. For length 1000, speedup is
approx 100x.
"""
cdef int[256] frequencies
cdef int i
cdef unsigned char c
for i in range(256):
frequencies[i] = 0
for c in s:
frequencies[c] += 1
counter = Counter()
for i in range(256):
if frequencies[i] > 0:
counter[i] = frequencies[i]
return counter
def expected_errors(str qualities, int base=33):
cdef int i, q
cdef bytes quals = qualities.encode()
cdef char* cq = quals
cdef double e = 0.0
for i in range(len(qualities)):
q = cq[i] - base
e += 10 ** (-q / 10)
return e
def hamming_distance(unicode s, unicode t):
"""
Compute hamming distance between two strings. The two strings must have the
same length.
Return the number of differences between the strings.
"""
cdef Py_ssize_t m = len(s)
cdef Py_ssize_t n = len(t)
if m != n:
raise IndexError("sequences must have the same length")
cdef Py_ssize_t e = 0
cdef Py_ssize_t i
for i in range(m):
if s[i] != t[i]:
e += 1
return e
# Lookup table that maps nucleotides to their 2-bit representation
# and everything else to 255.
def _make_nt_trans():
table = bytearray([255]*256)
for frm, to in zip(b'ACGTacgt', b'\x00\x01\x02\x03\x00\x01\x02\x03'):
table[frm] = to
return bytes(table)
cdef bytes _nt_trans = _make_nt_trans()
# Lookup table that maps 6-bit encoded codons to amino acids
def _make_codon_array(stop_aa='*'):
triples = bytearray([ord(stop_aa)]*64)
for codon, aa in GENETIC_CODE.items():
b = codon.encode().translate(_nt_trans)
index = b[0] * 16 + b[1] * 4 + b[2]
triples[index] = ord(aa)
return bytes(triples)
cdef bytes _codon_array = _make_codon_array()
def nt_to_aa(s: str):
"""
Translate a sequence of nucleotides to a sequence of amino acids,
using the genetic code.
>>> nt_to_aa('AAA')
'K'
>>> nt_to_aa('AAATGATGG)
'K*W'
"""
cdef int i = 0
cdef int j = 0
cdef int v = 0
cdef unsigned char nt0, nt1, nt2
cdef char* nt_trans_ptr = _nt_trans
cdef char* codon_array_ptr = _codon_array
cdef bytes s_bytes = s.encode()
cdef char* b = s_bytes
cdef bytearray result = bytearray([0]*((len(s)+2)//3))
cdef char* c = result
cdef int n = len(b)
for i in range(0, n-2, 3):
v = 0
nt0 = nt_trans_ptr[b[i]]
nt1 = nt_trans_ptr[b[i+1]]
nt2 = nt_trans_ptr[b[i+2]]
if nt0 > 3 or nt1 > 3 or nt2 > 3:
raise ValueError("Encountered non-nucleotide character in codon {!r}".format(s[i:i+3]))
v = nt0 * 16 + nt1 * 4 + nt2
c[j] = codon_array_ptr[v]
j += 1
if i < n:
c[j] = '*'
return result.decode()
|