1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
from eventlet import Timeout, monkey_patch, spawn_n
from eventlet.event import Event
from eventlet.queue import LightQueue
# Module-level side effect: eventlet's monkey_patch() runs as soon as this
# module is imported, right after the eventlet imports above.
monkey_patch()
class Receipt:
    """Handle for a single task queued on a producer pool.

    The producer that publishes the task calls :meth:`finished` with the
    publish result; other greenthreads can block in :meth:`wait` until
    that happens.

    Arguments:
        callback: Optional callable invoked with the result from inside
            :meth:`finished`.  Falsy values disable the callback.
    """

    #: Value handed to :meth:`finished`; ``None`` until then.
    result = None

    def __init__(self, callback=None):
        self.callback = callback
        self.ready = Event()

    def finished(self, result):
        """Store ``result``, run the callback and wake up all waiters.

        Arguments:
            result: Value to record on the receipt and deliver to
                :meth:`wait` callers.

        Raises:
            Whatever the callback raises; the ready event is still
            signalled in that case.
        """
        self.result = result
        try:
            if self.callback:
                self.callback(result)
        finally:
            # Signal even when the callback fails so that waiters are not
            # left blocked forever, and pass the result along so that
            # wait() returns it instead of None.
            self.ready.send(result)

    def wait(self, timeout=None):
        """Block until :meth:`finished` has been called.

        Arguments:
            timeout: Seconds handed to eventlet's ``Timeout``; ``None``
                (the default) waits without a time limit.

        Returns:
            The value sent on the ready event, i.e. the result passed
            to :meth:`finished`.
        """
        with Timeout(timeout):
            return self.ready.wait()
class ProducerPool:
    """Pool of greenlets that publish queued tasks.

    Calls to :meth:`apply_async` put jobs on a shared queue; the
    producer greenlets take jobs off that queue, publish each task and
    complete the matching receipt.  The greenlets are started on the
    first :meth:`apply_async` call.

    Example::

        >>> app = Celery(broker='amqp://')
        >>> ProducerPool(app)

    Arguments:
        app: Application object; every producer greenlet enters its
            ``producer_or_acquire()`` context to obtain a producer.
        size: Number of producer greenlets to start.
    """

    #: Receipt class instantiated for every queued task.
    Receipt = Receipt

    def __init__(self, app, size=20):
        self.app = app
        self.size = size
        self.inqueue = LightQueue()
        # Both stay None until the first apply_async() call.
        self._running = None
        self._producers = None

    def apply_async(self, task, args, kwargs, callback=None, **options):
        """Queue ``task`` for publishing and return its receipt.

        Arguments:
            task: Object whose ``apply_async`` is called by a producer.
            args: Positional arguments forwarded to ``task.apply_async``.
            kwargs: Keyword arguments forwarded to ``task.apply_async``.
            callback: Optional callable handed to the receipt.
            **options: Extra keyword options for ``task.apply_async``.

        Returns:
            The receipt created for this job.
        """
        not_started = self._running is None
        if not_started:
            self._running = spawn_n(self._run)
        receipt = self.Receipt(callback)
        job = (task, args, kwargs, options, receipt)
        self.inqueue.put(job)
        return receipt

    def _run(self):
        """Spawn ``self.size`` producer greenlets and remember them."""
        workers = [spawn_n(self._producer) for _ in range(self.size)]
        self._producers = workers

    def _producer(self):
        """Publish jobs from the queue forever using one producer."""
        pending = self.inqueue
        with self.app.producer_or_acquire() as publisher:
            while True:
                task, args, kwargs, options, receipt = pending.get()
                receipt.finished(
                    task.apply_async(
                        args, kwargs, producer=publisher, **options
                    )
                )
|