import cProfile
import logging
import os
import copy
from multiprocessing import Process
from multiprocessing.process import current_process
import warnings
from urllib.parse import urlparse
import numpy as np
from pbcore.io import AlignmentSet
from pbcore.io.opener import (openAlignmentFile, openIndexedAlignmentFile)
# FIXME: this should ultimately go somewhere else; in fact, so should the
# rest of this module.
def _openFiles(self, refFile=None, sharedIndices=None):
"""
Hack to enable sharing of indices (but not filehandles!) between dataset
instances.
"""
log = logging.getLogger()
log.debug("Opening resources")
for k, extRes in enumerate(self.externalResources):
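        # resourceId is typically a file:// URI (or a bare path); urlparse
        # reduces either form to a local filesystem path.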
location = urlparse(extRes.resourceId).path
sharedIndex = None
if sharedIndices is not None:
sharedIndex = sharedIndices[k]
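        # Prefer the indexed reader; it needs the .pbi companion file (or a
        # shared, already-loaded index) for random access.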
try:
resource = openIndexedAlignmentFile(
location,
referenceFastaFname=refFile,
sharedIndex=sharedIndex)
except (IOError, ValueError):
log.info("pbi file missing for {f}, operating with "
"reduced speed and functionality".format(
f=location))
resource = openAlignmentFile(location,
referenceFastaFname=refFile)
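        # Keep only readers that actually contain mapped reads.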
if len(resource) == 0:
log.warn("{f} has no mapped reads".format(f=location))
else:
self._openReaders.append(resource)
if len(self._openReaders) == 0:
raise IOError("No mapped reads found")
log.debug("Done opening resources")
def _reopen(self):
"""
Force re-opening of underlying alignment files, preserving the
reference and indices if present, and return a copy of the
AlignmentSet. This is a workaround to allow us to share the index
file(s) already loaded in memory while avoiding multiprocessing
problems related to .bam files.
"""
refFile = self._referenceFile
newSet = copy.deepcopy(self)
newSet._referenceFastaFname = refFile
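    # Capture the already-loaded .pbi indices (before closing the old
    # readers) so the copy can reuse them without re-reading from disk.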
indices = [f.index for f in self.resourceReaders()]
self.close()
_openFiles(newSet, refFile=refFile, sharedIndices=indices)
return newSet
class WorkerProcess(Process):
"""
Base class for worker processes that read reference coordinates
from the task queue, perform variant calling, then push results
back to another queue, to be written to a GFF file by another
process.
All tasks that are O(genome length * coverage depth) should be
distributed to Worker processes, leaving the ResultCollector
process only O(genome length) work to do.
"""
def __init__(self, options, workQueue, resultsQueue,
sharedAlignmentSet=None):
Process.__init__(self)
self.options = options
self.daemon = True
self._workQueue = workQueue
self._resultsQueue = resultsQueue
self._sharedAlignmentSet = sharedAlignmentSet
def _run(self):
logging.info("Worker %s (PID=%d) started running" %
(self.name, self.pid))
if self._sharedAlignmentSet is not None:
            # XXX this creates an entirely new AlignmentSet object, while
            # keeping any indices already loaded into memory
            self.caseAlignments = _reopen(self._sharedAlignmentSet)
            self._sharedAlignmentSet = None
else:
warnings.warn("Shared AlignmentSet not used")
self.caseAlignments = AlignmentSet(self.options.infile,
referenceFastaFname=self.options.reference)
self.controlAlignments = None
        if self.options.control is not None:
self.controlAlignments = AlignmentSet(self.options.control,
referenceFastaFname=self.options.reference)
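        # Seed numpy deterministically when no explicit seed was requested;
        # an explicit seed is assumed to be applied upstream by the driver.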
if self.options.randomSeed is None:
np.random.seed(42)
self.onStart()
while True:
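            # Main loop: consume chunk descriptors until the None sentinel
            # arrives, pushing (chunkId, result) pairs to the results queue.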
if self.isTerminated():
break
chunkDesc = self._workQueue.get()
if chunkDesc is None:
# Sentinel indicating end of input. Place a sentinel
# on the results queue and end this worker process.
self._resultsQueue.put(None)
self._workQueue.task_done()
break
else:
(chunkId, datum) = chunkDesc
logging.info("Got chunk: (%s, %s) -- Process: %s" %
(chunkId, str(datum), current_process()))
result = self.onChunk( # pylint: disable=assignment-from-none
datum)
logging.debug("Process %s: putting result." %
current_process())
self._resultsQueue.put((chunkId, result))
self._workQueue.task_done()
self.onFinish()
logging.info("Process %s (PID=%d) done; exiting." %
(self.name, self.pid))
def run(self):
        # Run the workers at lower priority so the results writer, which is
        # single-threaded and could become the bottleneck, tends to win the
        # CPU.
self._lowPriority()
if self.options.doProfiling:
cProfile.runctx("self._run()",
globals=globals(),
locals=locals(),
filename="profile-%s.out" % self.name)
else:
self._run()
# ==
# Begin overridable interface
# ==
def onStart(self):
pass
def onChunk(self, target):
"""
This function is the heart of the matter.
referenceWindow, alnHits -> result
"""
return None
def onFinish(self):
pass
def isTerminated(self):
return False
def _lowPriority(self):
"""
        Set the priority of the process to below-normal via os.nice,
        which is POSIX-only.
"""
os.nice(10)
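# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not invoked by the pipeline):
# shows how a WorkerProcess subclass might be wired to the two queues.
# EchoWorker, the SimpleNamespace options, and the "aligned.bam" /
# "ref.fasta" paths are hypothetical stand-ins for what the real driver
# provides; a fork-based multiprocessing start method (the Linux default)
# is assumed.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from multiprocessing import JoinableQueue
    from types import SimpleNamespace

    class EchoWorker(WorkerProcess):
        def onChunk(self, datum):
            # Trivial stand-in for variant calling on one reference window.
            return datum

    options = SimpleNamespace(doProfiling=False, randomSeed=0, control=None,
                              infile="aligned.bam", reference="ref.fasta")
    workQueue, resultsQueue = JoinableQueue(), JoinableQueue()
    sharedSet = AlignmentSet(options.infile,
                             referenceFastaFname=options.reference)
    worker = EchoWorker(options, workQueue, resultsQueue,
                        sharedAlignmentSet=sharedSet)
    worker.start()
    for chunkDesc in enumerate(["chr1:0-1000", "chr1:1000-2000"]):
        workQueue.put(chunkDesc)         # (chunkId, datum) pairs
    workQueue.put(None)                  # sentinel: no more work
    while True:
        result = resultsQueue.get()      # worker echoes a sentinel back
        if result is None:
            break
        print(result)
    workQueue.join()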