File: epo.py

package info (click to toggle)
python-bx 0.13.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,000 kB
  • sloc: python: 17,136; ansic: 2,326; makefile: 24; sh: 8
file content (327 lines) | stat: -rw-r--r-- 12,048 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
"""Classes and utilities for mutliple alignments from the EPO pipeline"""

import logging
import os
import pickle as cPickle
import re
from collections import namedtuple

from ._epo import (  # noqa: F401
    bed_union,
    cumulative_intervals,
    fastLoadChain,
    rem_dash,
)

log = logging.getLogger(__name__)


class Chain(namedtuple("Chain", "score tName tSize tStrand tStart tEnd qName qSize qStrand qStart qEnd id")):
    """A Chain header as in http://genome.ucsc.edu/goldenPath/help/chain.html

    chain coordinates are with respect to the strand, so for example tStart on the + strand is the
    distance from the leftmost position; tStart on the - strand is the distance from the rightmost position."""

    __slots__ = ()

    def __str__(self):
        return "chain {score} {tName} {tSize} {tStrand} {tStart} {tEnd} {qName} {qSize} {qStrand} {qStart} {qEnd} {id}".format(
            **self._asdict()
        )

    @classmethod
    def _strfactory(cls, line):
        """factory class method for Chain

        :param line: header of a chain (in .chain format)
        """

        assert isinstance(line, str), "this is a factory from string"

        line = line.rstrip().split()[1:]  # the first component is the keyword "chain"
        tup = [t[0](t[1]) for t in zip([int, str, int, str, int, int, str, int, str, int, int, str], line)]
        return tuple.__new__(cls, tup)

    @classmethod
    def _make_from_epo(cls, trg_comp, qr_comp, trg_chrom_sizes, qr_chrom_sizes):
        """crate a chain of collinear rings from the given  components.

        The target of the chain will always be on the forward strand.
        This is done to avoid confusion when mapping psl files. So,
        if trg_comp.strand=-, qr_comp.strand=- (resp. +) the
        chain header will have tStrand=+, qStrand=+ (resp. -). No strand
        changes on the other cases.

        :param trg_comp: target (i.e, the first) component
        :type trg_comp: L{EPOitem}
        :param qr_comp: query (i.e, the second) component
        :type qr_comp: L{EPOitem}
        :param trg_chrom_sizes: chromosome sizes of the target
        :type trg_chrom_sizes: dictionary of the type (chrom) --> size
        :param qr_chrom_sizes: chromosome sizes of the query
        :type qr_chrom_sizes: dictionary of the type (chrom) --> size
        :return: A L{Chain} instance"""

        # size, target, query arrays
        S, T, Q = [], [], []

        # the target strand of the chain must be on the forward strand
        trg_intervals = trg_comp.intervals(reverse=trg_comp.strand == "-")
        qr_intervals = qr_comp.intervals(reverse=trg_comp.strand == "-")
        if len(trg_intervals) == 0 or len(qr_intervals) == 0:
            log.warning("deletion/insertion only intervals")
            return None
        A, B = rem_dash(trg_intervals, qr_intervals)
        # correct for when cigar starts/ends with dashes (in number of bases)
        tr_start_correction = max(B[0][0] - A[0][0], 0)
        tr_end_correction = max(A[-1][1] - B[-1][1], 0)
        qr_start_correction = max(A[0][0] - B[0][0], 0)
        qr_end_correction = max(B[-1][1] - A[-1][1], 0)

        a, b = A.pop(0), B.pop(0)

        # intervals are 0-base, halfo-open => lengths = coordinate difference
        while A or B:
            if a[1] < b[1]:
                T.append(0)
                Q.append(A[0][0] - a[1])
                S.append(min(a[1], b[1]) - max(a[0], b[0]))
                a = A.pop(0)
            elif b[1] < a[1]:
                Q.append(0)
                T.append(B[0][0] - b[1])
                S.append(min(a[1], b[1]) - max(a[0], b[0]))
                b = B.pop(0)
            elif A and B:
                assert 1 > 2, "there are dash columns"
            else:
                break
        S.append(min(a[1], b[1]) - max(a[0], b[0]))
        assert len(T) == len(Q) == len(S) - 1, "(S, T, Q) = (%d, %d, %d)" % tuple(map(len, (S, T, Q)))

        tSize = trg_chrom_sizes[trg_comp.chrom]
        qSize = qr_chrom_sizes[qr_comp.chrom]
        # UCSC coordinates are 0-based, half-open and e! coordinates are 1-base, closed
        # chain_start = epo_start - 1 and chain_end = epo_end
        if qr_comp.strand == "+":
            chain = Chain(
                0,
                trg_comp.chrom,
                tSize,
                "+",
                (trg_comp.start - 1) + tr_start_correction,
                trg_comp.end - tr_end_correction,
                qr_comp.chrom,
                qSize,
                (qr_comp.strand == trg_comp.strand and "+" or "-"),
                (qr_comp.start - 1) + qr_start_correction,
                qr_comp.end - qr_end_correction,
                qr_comp.gabid,
            )
        else:
            chain = Chain(
                0,
                trg_comp.chrom,
                tSize,
                "+",
                (trg_comp.start - 1) + tr_start_correction,
                trg_comp.end - tr_end_correction,
                qr_comp.chrom,
                qSize,
                (qr_comp.strand == trg_comp.strand and "+" or "-"),
                (qr_comp.start - 1) + qr_end_correction,
                qr_comp.end - qr_start_correction,
                qr_comp.gabid,
            )

        # strand correction. in UCSC coordinates this is: size - coord
        if chain.qStrand == "-":
            chain = chain._replace(qEnd=chain.qSize - chain.qStart, qStart=chain.qSize - chain.qEnd)

        assert chain.tEnd - chain.tStart == sum(S) + sum(T), "[%s] %d != %d" % (
            str(chain),
            chain.tEnd - chain.tStart,
            sum(S) + sum(T),
        )
        assert chain.qEnd - chain.qStart == sum(S) + sum(Q), "[%s] %d != %d" % (
            str(chain),
            chain.qEnd - chain.qStart,
            sum(S) + sum(Q),
        )
        return chain, S, T, Q

    def slice(self, who):
        "return the slice entry (in a bed6 format), AS IS in the chain header"

        assert who in ("t", "q"), "who should be 't' or 'q'"

        if who == "t":
            return (self.tName, self.tStart, self.tEnd, self.id, self.score, self.tStrand)
        else:
            return (self.qName, self.qStart, self.qEnd, self.id, self.score, self.qStrand)

    def bedInterval(self, who):
        "return a BED6 entry, thus DOES coordinate conversion for minus strands"

        if who == "t":
            st, en = self.tStart, self.tEnd
            if self.tStrand == "-":
                st, en = self.tSize - en, self.tSize - st
            return (self.tName, st, en, self.id, self.score, self.tStrand)
        else:
            st, en = self.qStart, self.qEnd
            if self.qStrand == "-":
                st, en = self.qSize - en, self.qSize - st
                assert en - st == self.qEnd - self.qStart
            return (self.qName, st, en, self.id, self.score, self.qStrand)

    @classmethod
    def _parse_file(cls, path, pickle=False):
        """parse a .chain file into a list of the type [(L{Chain}, arr, arr, arr) ...]

        :param path: path of the file"""

        fname = path
        if fname.endswith(".gz"):
            fname = path[:-3]

        if fname.endswith(".pkl"):
            # you asked for the pickled file. I'll give it to you
            log.debug("loading pickled file %s ...", fname)
            with open(fname, "rb") as f:
                return cPickle.load(f)

        fname_pkl = f"{fname}.pkl"
        if os.path.isfile(fname_pkl):
            # there is a cached version I can give to you
            log.info("loading pickled file %s ...", fname_pkl)
            if os.stat(path).st_mtime > os.stat(fname_pkl).st_mtime:
                log.critical("*** pickled file %s is not up to date ***", fname_pkl)
            try:
                with open(fname_pkl, "rb") as f:
                    return cPickle.load(f)
            except Exception:
                log.warning("Loading pickled file %s failed", fname_pkl)

        data = fastLoadChain(path, cls._strfactory)
        if pickle and not os.path.isfile(fname_pkl):
            log.info("pickling to %s", fname_pkl)
            with open(fname_pkl, "wb") as f:
                cPickle.dump(data, f)
        return data


class EPOitem(namedtuple("Epo_item", "species gabid chrom start end strand cigar")):
    "this format is how alignments are delivered from e!"

    __slots__ = ()

    cigar_pattern = re.compile(r"(\d*)([MD])")

    def __repr__(self):
        return str(self)

    def __str__(self):
        c = self.cigar[:5] + "..." + self.cigar[-5:]
        return "(%s %s %s %d %d %s %s)" % tuple(self[:6] + (c,))

    @classmethod
    def _strfactory(cls, line):
        """factory method for an EPOitem

        :param line: a line of input"""

        cmp = line.rstrip().split()
        chrom = cmp[2]
        if not chrom.startswith("chr"):
            chrom = f"chr{chrom}"
        instance = tuple.__new__(
            cls, (cmp[0], cmp[1], chrom, int(cmp[3]), int(cmp[4]), {"1": "+", "-1": "-"}[cmp[5]], cmp[6])
        )
        span = instance.end - instance.start + 1
        m_num = sum((t[1] == "M" and [t[0]] or [0])[0] for t in instance.cigar_iter(False))
        if span != m_num:
            log.warning(
                "[%s] %s.%s:%s-%s.(span) %d != %d (matches)",
                instance.gabid,
                instance.species,
                instance.chrom,
                instance.start,
                instance.end,
                span,
                m_num,
            )
            return None
        return instance

    @classmethod
    def _parse_epo(cls, fname):
        """Load an entire file in the EPO format into a dictionary of the type {gab_id => [Epoitem, ...]}

        :param fname: file name"""

        data = {}
        with open(fname) as fd:
            for el in (cls._strfactory(_) for _ in fd):
                if el:
                    data.setdefault(el.gabid, []).append(el)
        log.info("parsed %d elements from %s", len(data), fname)
        return data

    def cigar_iter(self, reverse):
        """self.cigar => [(length, type) ... ] iterate the cigar

        :param reverse: whether to iterate in the reverse direction (right-to-left)
        :type reverse: boolean

        :return a list of pairs of the type [(length, M/D) ..]
        """

        l = 0
        P = self.cigar_pattern

        data = []
        cigar = self.cigar
        parsed_cigar = re.findall(P, cigar)
        if reverse:
            parsed_cigar = parsed_cigar[::-1]
        for _l, t in parsed_cigar:
            # 1M is encoded as M
            l = _l and int(_l) or 1  # int(_l) cannot be 0
            data.append((l, t))
        return data

    def intervals(self, reverse, thr=0):
        """return a list of (0-based half-open) intervals representing the match regions of the cigar

        for example 4MD4M2DM with reverse=False will produce [(0,4), (5,9), (11,12)]
        4MD4M2DM with reverse=True will produce [(0,1), (3,7), (8,12)] (= 12 - previous interval)

        :param reverse: whether to iterate in the reverse direction (right-to-left) (this is passed as is to self.cigar_iter)
        :type reverse: boolean
        :param thr: shift all intervals by this much
        :type thr: integer

        :return: list of pairs"""

        d = [(thr, thr)]
        dl = 0
        for tup in self.cigar_iter(reverse):
            if tup[1] == "D":
                dl = tup[0]
            else:
                s = d[-1][1] + dl
                d.append((s, s + tup[0]))

        assert d[0] == (thr, thr)
        # assert that nr. of Ms in the interval == sum of produced intervals
        assert sum(t[0] for t in self.cigar_iter(False) if t[1] == "M") == sum(t[1] - t[0] for t in d)

        d_sum = sum(t[1] - t[0] for t in d)
        assert self.end - self.start + 1 == d_sum, "[ (%d, %d) = %d ] != %d" % (
            self.start,
            self.end,
            self.end - self.start + 1,
            d_sum,
        )
        return d[1:]  # clip the (thr, thr) entry