File: WorkerProcess.py

package info
kineticstools 0.6.1+git20220223.1326a4d+dfsg-6
  • area: main
  • in suites: forky, sid, trixie
  • size: 22,188 kB
  • sloc: python: 3,508; makefile: 200; ansic: 104; sh: 55; xml: 19

import cProfile
import logging
import os
import copy
from multiprocessing import Process
from multiprocessing.process import current_process
import warnings

from urllib.parse import urlparse
import numpy as np

from pbcore.io import AlignmentSet
from pbcore.io.opener import (openAlignmentFile, openIndexedAlignmentFile)


# FIXME this should ultimately go somewhere else.  actually, so should the
# rest of this module.
def _openFiles(self, refFile=None, sharedIndices=None):
    """
    Hack to enable sharing of indices (but not filehandles!) between dataset
    instances.  If given, sharedIndices must be ordered like
    self.externalResources: entry k is applied to resource k.
    """
    log = logging.getLogger()
    log.debug("Opening resources")
    for k, extRes in enumerate(self.externalResources):
        location = urlparse(extRes.resourceId).path
        sharedIndex = None
        if sharedIndices is not None:
            sharedIndex = sharedIndices[k]
        try:
            resource = openIndexedAlignmentFile(
                location,
                referenceFastaFname=refFile,
                sharedIndex=sharedIndex)
        except (IOError, ValueError):
            log.info("pbi file missing for {f}, operating with "
                     "reduced speed and functionality".format(
                         f=location))
            resource = openAlignmentFile(location,
                                         referenceFastaFname=refFile)
        if len(resource) == 0:
            log.warning("{f} has no mapped reads".format(f=location))
        else:
            self._openReaders.append(resource)
    if len(self._openReaders) == 0:
        raise IOError("No mapped reads found")
    log.debug("Done opening resources")


def _reopen(self):
    """
    Force re-opening of underlying alignment files, preserving the
    reference and indices if present, and return a copy of the
    AlignmentSet.  This is a workaround to allow us to share the index
    file(s) already loaded in memory while avoiding multiprocessing
    problems related to .bam files.
    """
    refFile = self._referenceFile
    newSet = copy.deepcopy(self)
    newSet._referenceFastaFname = refFile
    indices = [f.index for f in self.resourceReaders()]
    self.close()
    _openFiles(newSet, refFile=refFile, sharedIndices=indices)
    return newSet
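

# Usage sketch (illustrative only, not part of the upstream module): the
# parent process opens one AlignmentSet, and each worker calls _reopen() on
# it.  This deep-copies the dataset, closes the parent's file handles, and
# reopens the copy against the pbi indices already in memory, so indices are
# shared while handles are not.  The filename below is a placeholder.
#
#     sharedSet = AlignmentSet("mapped.alignmentset.xml")
#     workerSet = _reopen(sharedSet)   # fresh handles, shared indices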


class WorkerProcess(Process):

    """
    Base class for worker processes that read reference coordinates
    from the task queue, run the per-window analysis, then push results
    back to another queue, to be written to a GFF file by another
    process.

    Work items arrive as (chunkId, datum) tuples; a None sentinel tells
    a worker to forward the sentinel to the results queue and exit.

    All tasks that are O(genome length * coverage depth) should be
    distributed to Worker processes, leaving the ResultCollector
    process only O(genome length) work to do.
    """

    def __init__(self, options, workQueue, resultsQueue,
                 sharedAlignmentSet=None):
        Process.__init__(self)
        self.options = options
        self.daemon = True
        self._workQueue = workQueue
        self._resultsQueue = resultsQueue
        self._sharedAlignmentSet = sharedAlignmentSet

    def _run(self):
        logging.info("Worker %s (PID=%d) started running" %
                     (self.name, self.pid))
        if self._sharedAlignmentSet is not None:
            # XXX this creates an entirely new AlignmentSet object, while
            # keeping any indices already loaded into memory
            self.caseAlignments = _reopen(self._sharedAlignmentSet)
            # self._sharedAlignmentSet.close()
            self._sharedAlignmentSet = None
        else:
            warnings.warn("Shared AlignmentSet not used")
            self.caseAlignments = AlignmentSet(self.options.infile,
                                               referenceFastaFname=self.options.reference)

        self.controlAlignments = None
        if self.options.control is not None:
            self.controlAlignments = AlignmentSet(self.options.control,
                                                  referenceFastaFname=self.options.reference)

        # Default to a fixed seed for reproducibility when no explicit seed
        # was supplied.
        if self.options.randomSeed is None:
            np.random.seed(42)
        self.onStart()

        while True:
            if self.isTerminated():
                break

            chunkDesc = self._workQueue.get()
            if chunkDesc is None:
                # Sentinel indicating end of input.  Place a sentinel
                # on the results queue and end this worker process.
                self._resultsQueue.put(None)
                self._workQueue.task_done()
                break
            else:
                (chunkId, datum) = chunkDesc
                logging.info("Got chunk: (%s, %s) -- Process: %s" %
                             (chunkId, str(datum), current_process()))
                result = self.onChunk(datum)  # pylint: disable=assignment-from-none

                logging.debug("Process %s: putting result." %
                              current_process())
                self._resultsQueue.put((chunkId, result))
                self._workQueue.task_done()

        self.onFinish()

        logging.info("Process %s (PID=%d) done; exiting." %
                     (self.name, self.pid))

    def run(self):
        # Run the workers at lower priority so the results writer, which is
        # single-threaded and could become the bottleneck, tends to win.
        self._lowPriority()

        if self.options.doProfiling:
            cProfile.runctx("self._run()",
                            globals=globals(),
                            locals=locals(),
                            filename="profile-%s.out" % self.name)
        else:
            self._run()

    # ==
    # Begin overridable interface
    # ==
    def onStart(self):
        pass

    def onChunk(self, target):
        """
        This function is the heart of the matter.

        referenceWindow, alnHits -> result
        """
        return None

    def onFinish(self):
        pass

    def isTerminated(self):
        return False

    def _lowPriority(self):
        """
        Set the priority of the process to below-normal.
        """
        os.nice(10)
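

# Illustrative sketch (not part of the upstream module): workers are used by
# subclassing WorkerProcess and overriding the hooks above; the class below
# is hypothetical.  The parent enqueues (chunkId, datum) tuples on the work
# queue plus one None sentinel per worker to shut the pool down cleanly.
class _EchoWorker(WorkerProcess):

    def onChunk(self, target):
        # A real worker would analyze the reference window described by
        # `target` against self.caseAlignments; this stub just echoes the
        # work item back so it lands on the results queue unchanged.
        return target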