1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
|
"""Classes and utilities for mutliple alignments from the EPO pipeline"""
import logging
import os
import pickle as cPickle
import re
from collections import namedtuple
from ._epo import ( # noqa: F401
bed_union,
cumulative_intervals,
fastLoadChain,
rem_dash,
)
log = logging.getLogger(__name__)
class Chain(namedtuple("Chain", "score tName tSize tStrand tStart tEnd qName qSize qStrand qStart qEnd id")):
"""A Chain header as in http://genome.ucsc.edu/goldenPath/help/chain.html
chain coordinates are with respect to the strand, so for example tStart on the + strand is the
distance from the leftmost position; tStart on the - strand is the distance from the rightmost position."""
__slots__ = ()
def __str__(self):
return "chain {score} {tName} {tSize} {tStrand} {tStart} {tEnd} {qName} {qSize} {qStrand} {qStart} {qEnd} {id}".format(
**self._asdict()
)
@classmethod
def _strfactory(cls, line):
"""factory class method for Chain
:param line: header of a chain (in .chain format)
"""
assert isinstance(line, str), "this is a factory from string"
line = line.rstrip().split()[1:] # the first component is the keyword "chain"
tup = [t[0](t[1]) for t in zip([int, str, int, str, int, int, str, int, str, int, int, str], line)]
return tuple.__new__(cls, tup)
@classmethod
def _make_from_epo(cls, trg_comp, qr_comp, trg_chrom_sizes, qr_chrom_sizes):
"""crate a chain of collinear rings from the given components.
The target of the chain will always be on the forward strand.
This is done to avoid confusion when mapping psl files. So,
if trg_comp.strand=-, qr_comp.strand=- (resp. +) the
chain header will have tStrand=+, qStrand=+ (resp. -). No strand
changes on the other cases.
:param trg_comp: target (i.e, the first) component
:type trg_comp: L{EPOitem}
:param qr_comp: query (i.e, the second) component
:type qr_comp: L{EPOitem}
:param trg_chrom_sizes: chromosome sizes of the target
:type trg_chrom_sizes: dictionary of the type (chrom) --> size
:param qr_chrom_sizes: chromosome sizes of the query
:type qr_chrom_sizes: dictionary of the type (chrom) --> size
:return: A L{Chain} instance"""
# size, target, query arrays
S, T, Q = [], [], []
# the target strand of the chain must be on the forward strand
trg_intervals = trg_comp.intervals(reverse=trg_comp.strand == "-")
qr_intervals = qr_comp.intervals(reverse=trg_comp.strand == "-")
if len(trg_intervals) == 0 or len(qr_intervals) == 0:
log.warning("deletion/insertion only intervals")
return None
A, B = rem_dash(trg_intervals, qr_intervals)
# correct for when cigar starts/ends with dashes (in number of bases)
tr_start_correction = max(B[0][0] - A[0][0], 0)
tr_end_correction = max(A[-1][1] - B[-1][1], 0)
qr_start_correction = max(A[0][0] - B[0][0], 0)
qr_end_correction = max(B[-1][1] - A[-1][1], 0)
a, b = A.pop(0), B.pop(0)
# intervals are 0-base, halfo-open => lengths = coordinate difference
while A or B:
if a[1] < b[1]:
T.append(0)
Q.append(A[0][0] - a[1])
S.append(min(a[1], b[1]) - max(a[0], b[0]))
a = A.pop(0)
elif b[1] < a[1]:
Q.append(0)
T.append(B[0][0] - b[1])
S.append(min(a[1], b[1]) - max(a[0], b[0]))
b = B.pop(0)
elif A and B:
assert 1 > 2, "there are dash columns"
else:
break
S.append(min(a[1], b[1]) - max(a[0], b[0]))
assert len(T) == len(Q) == len(S) - 1, "(S, T, Q) = (%d, %d, %d)" % tuple(map(len, (S, T, Q)))
tSize = trg_chrom_sizes[trg_comp.chrom]
qSize = qr_chrom_sizes[qr_comp.chrom]
# UCSC coordinates are 0-based, half-open and e! coordinates are 1-base, closed
# chain_start = epo_start - 1 and chain_end = epo_end
if qr_comp.strand == "+":
chain = Chain(
0,
trg_comp.chrom,
tSize,
"+",
(trg_comp.start - 1) + tr_start_correction,
trg_comp.end - tr_end_correction,
qr_comp.chrom,
qSize,
(qr_comp.strand == trg_comp.strand and "+" or "-"),
(qr_comp.start - 1) + qr_start_correction,
qr_comp.end - qr_end_correction,
qr_comp.gabid,
)
else:
chain = Chain(
0,
trg_comp.chrom,
tSize,
"+",
(trg_comp.start - 1) + tr_start_correction,
trg_comp.end - tr_end_correction,
qr_comp.chrom,
qSize,
(qr_comp.strand == trg_comp.strand and "+" or "-"),
(qr_comp.start - 1) + qr_end_correction,
qr_comp.end - qr_start_correction,
qr_comp.gabid,
)
# strand correction. in UCSC coordinates this is: size - coord
if chain.qStrand == "-":
chain = chain._replace(qEnd=chain.qSize - chain.qStart, qStart=chain.qSize - chain.qEnd)
assert chain.tEnd - chain.tStart == sum(S) + sum(T), "[%s] %d != %d" % (
str(chain),
chain.tEnd - chain.tStart,
sum(S) + sum(T),
)
assert chain.qEnd - chain.qStart == sum(S) + sum(Q), "[%s] %d != %d" % (
str(chain),
chain.qEnd - chain.qStart,
sum(S) + sum(Q),
)
return chain, S, T, Q
def slice(self, who):
"return the slice entry (in a bed6 format), AS IS in the chain header"
assert who in ("t", "q"), "who should be 't' or 'q'"
if who == "t":
return (self.tName, self.tStart, self.tEnd, self.id, self.score, self.tStrand)
else:
return (self.qName, self.qStart, self.qEnd, self.id, self.score, self.qStrand)
def bedInterval(self, who):
"return a BED6 entry, thus DOES coordinate conversion for minus strands"
if who == "t":
st, en = self.tStart, self.tEnd
if self.tStrand == "-":
st, en = self.tSize - en, self.tSize - st
return (self.tName, st, en, self.id, self.score, self.tStrand)
else:
st, en = self.qStart, self.qEnd
if self.qStrand == "-":
st, en = self.qSize - en, self.qSize - st
assert en - st == self.qEnd - self.qStart
return (self.qName, st, en, self.id, self.score, self.qStrand)
@classmethod
def _parse_file(cls, path, pickle=False):
"""parse a .chain file into a list of the type [(L{Chain}, arr, arr, arr) ...]
:param path: path of the file"""
fname = path
if fname.endswith(".gz"):
fname = path[:-3]
if fname.endswith(".pkl"):
# you asked for the pickled file. I'll give it to you
log.debug("loading pickled file %s ...", fname)
with open(fname, "rb") as f:
return cPickle.load(f)
fname_pkl = f"{fname}.pkl"
if os.path.isfile(fname_pkl):
# there is a cached version I can give to you
log.info("loading pickled file %s ...", fname_pkl)
if os.stat(path).st_mtime > os.stat(fname_pkl).st_mtime:
log.critical("*** pickled file %s is not up to date ***", fname_pkl)
try:
with open(fname_pkl, "rb") as f:
return cPickle.load(f)
except Exception:
log.warning("Loading pickled file %s failed", fname_pkl)
data = fastLoadChain(path, cls._strfactory)
if pickle and not os.path.isfile(fname_pkl):
log.info("pickling to %s", fname_pkl)
with open(fname_pkl, "wb") as f:
cPickle.dump(data, f)
return data
class EPOitem(namedtuple("Epo_item", "species gabid chrom start end strand cigar")):
"this format is how alignments are delivered from e!"
__slots__ = ()
cigar_pattern = re.compile(r"(\d*)([MD])")
def __repr__(self):
return str(self)
def __str__(self):
c = self.cigar[:5] + "..." + self.cigar[-5:]
return "(%s %s %s %d %d %s %s)" % tuple(self[:6] + (c,))
@classmethod
def _strfactory(cls, line):
"""factory method for an EPOitem
:param line: a line of input"""
cmp = line.rstrip().split()
chrom = cmp[2]
if not chrom.startswith("chr"):
chrom = f"chr{chrom}"
instance = tuple.__new__(
cls, (cmp[0], cmp[1], chrom, int(cmp[3]), int(cmp[4]), {"1": "+", "-1": "-"}[cmp[5]], cmp[6])
)
span = instance.end - instance.start + 1
m_num = sum((t[1] == "M" and [t[0]] or [0])[0] for t in instance.cigar_iter(False))
if span != m_num:
log.warning(
"[%s] %s.%s:%s-%s.(span) %d != %d (matches)",
instance.gabid,
instance.species,
instance.chrom,
instance.start,
instance.end,
span,
m_num,
)
return None
return instance
@classmethod
def _parse_epo(cls, fname):
"""Load an entire file in the EPO format into a dictionary of the type {gab_id => [Epoitem, ...]}
:param fname: file name"""
data = {}
with open(fname) as fd:
for el in (cls._strfactory(_) for _ in fd):
if el:
data.setdefault(el.gabid, []).append(el)
log.info("parsed %d elements from %s", len(data), fname)
return data
def cigar_iter(self, reverse):
"""self.cigar => [(length, type) ... ] iterate the cigar
:param reverse: whether to iterate in the reverse direction (right-to-left)
:type reverse: boolean
:return a list of pairs of the type [(length, M/D) ..]
"""
l = 0
P = self.cigar_pattern
data = []
cigar = self.cigar
parsed_cigar = re.findall(P, cigar)
if reverse:
parsed_cigar = parsed_cigar[::-1]
for _l, t in parsed_cigar:
# 1M is encoded as M
l = _l and int(_l) or 1 # int(_l) cannot be 0
data.append((l, t))
return data
def intervals(self, reverse, thr=0):
"""return a list of (0-based half-open) intervals representing the match regions of the cigar
for example 4MD4M2DM with reverse=False will produce [(0,4), (5,9), (11,12)]
4MD4M2DM with reverse=True will produce [(0,1), (3,7), (8,12)] (= 12 - previous interval)
:param reverse: whether to iterate in the reverse direction (right-to-left) (this is passed as is to self.cigar_iter)
:type reverse: boolean
:param thr: shift all intervals by this much
:type thr: integer
:return: list of pairs"""
d = [(thr, thr)]
dl = 0
for tup in self.cigar_iter(reverse):
if tup[1] == "D":
dl = tup[0]
else:
s = d[-1][1] + dl
d.append((s, s + tup[0]))
assert d[0] == (thr, thr)
# assert that nr. of Ms in the interval == sum of produced intervals
assert sum(t[0] for t in self.cigar_iter(False) if t[1] == "M") == sum(t[1] - t[0] for t in d)
d_sum = sum(t[1] - t[0] for t in d)
assert self.end - self.start + 1 == d_sum, "[ (%d, %d) = %d ] != %d" % (
self.start,
self.end,
self.end - self.start + 1,
d_sum,
)
return d[1:] # clip the (thr, thr) entry
|