File: ReferenceUtils.py

package info (click to toggle)
kineticstools 0.6.1%2Bgit20220223.1326a4d%2Bdfsg-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 22,188 kB
  • sloc: python: 3,508; makefile: 200; ansic: 104; sh: 55; xml: 19
file content (131 lines) | stat: -rw-r--r-- 4,365 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# FIXME all of this belongs somewhere else (probably either pbcore.io.dataset
# or a future base module for resequencing apps)

from collections import namedtuple
import itertools
import math
import re
import os

from pbcore.io import ReferenceSet

# FIXME pbcore keys contigs by name, but kineticsTools usually keys by ID
ReferenceWindow = namedtuple(
    "ReferenceWindow", ["refId", "refName", "start", "end"])


def loadReferenceContigs(referencePath, alignmentSet, windows=None):
    # FIXME we should get rid of this entirely, but I think it requires
    # fixing the inconsistency in how contigs are referenced here versus in
    # pbcore.io
    """
    Load the reference contigs, and tag each one with the ref.alignmentID it
    was assigned in the alignment file(s).  Return a list of contigs,
    which are used to set up IpdModel.
    """

    # Read contigs from FASTA file (or XML dataset)
    refReader = ReferenceSet(referencePath)
    contigs = []
    if windows is not None:
        refNames = set([rw.refName for rw in windows])
        for contig in refReader:
            if contig.id in refNames:
                contigs.append(contig)
    else:
        contigs.extend([x for x in refReader])
    contigDict = dict([(x.id, x) for x in contigs])

    # initially each contig has an id of None -- this will be overwritten with the id from the alignments if there are any
    # reads mapped to it.
    for x in contigs:
        x.alignmentID = None

    # Mark each contig with it's ID from the alignments - match them up using
    # MD5s
    for x in alignmentSet.referenceInfoTable:
        if x.FullName in contigDict:
            contigDict[x.FullName].alignmentID = x.ID

    return contigs


def referenceWindowsFromAlignment(ds, refInfoLookup):
    return [ReferenceWindow(refId=refInfoLookup(w[0]).ID,
                            refName=w[0],
                            start=w[1],
                            end=w[2]) for w in ds.refWindows]


def parseReferenceWindow(s, refInfoLookup):
    if s is None:
        return None
    m = re.match("(.*):(.*)-(.*)", s)
    if m:
        refContigInfo = refInfoLookup(m.group(1))
        refId = refContigInfo.ID
        refName = refContigInfo.Name
        refStart = int(m.group(2))
        refEnd = min(int(m.group(3)), refContigInfo.Length)
    else:
        refContigInfo = refInfoLookup(s)
        refId = refContigInfo.ID
        refName = refContigInfo.Name
        refStart = 0
        refEnd = refContigInfo.Length
    return ReferenceWindow(refId=refId, refName=refName, start=refStart,
                           end=refEnd)


def createReferenceWindows(refInfo):
    return [ReferenceWindow(refId=r.ID,
                            refName=r.Name,
                            start=0,
                            end=r.Length) for r in refInfo]


def enumerateChunks(referenceStride, referenceWindow):
    """
    Enumerate all work chunks on this reference contig (restricted to
    the windows, if provided).
    """
    def intersection(int1, int2):
        s1, e1 = int1
        s2, e2 = int2
        si, ei = max(s1, s2), min(e1, e2)
        if si < ei:
            return (si, ei)
        else:
            return None

    def enumerateIntervals(bounds, stride):
        """
        Enumerate windows of size "stride", attempting to align window
        boundaries on multiple of stride.
        """
        def alignDown(chunk, x):
            return (x // chunk) * chunk

        def alignUp(chunk, x):
            return int(math.ceil(float(x) / chunk) * chunk)

        start, end = bounds
        roundStart = alignDown(stride, start)
        roundEnd = alignUp(stride, end)

        for s in range(roundStart, roundEnd, stride):
            roundWin = (s, s + stride)
            yield intersection(bounds, roundWin)

    for (s, e) in enumerateIntervals((referenceWindow.start,
                                      referenceWindow.end), referenceStride):
        yield ReferenceWindow(refId=referenceWindow.refId,
                              refName=referenceWindow.refName,
                              start=s, end=e)


def loadAlignmentChemistry(alignmentSet):
    chems = alignmentSet.sequencingChemistry
    chemCounts = {k: len(list(v)) for k, v in itertools.groupby(chems)}
    majorityChem = max(chemCounts, key=chemCounts.get)
    return majorityChem