"""
A log destination for use by Twisted applications.

Runs in a thread, so that we don't do blocking I/O in the event loop thread.
"""

import threading
from queue import SimpleQueue

from twisted.application.service import Service
from twisted.internet.threads import deferToThreadPool

from . import addDestination, removeDestination

# Sentinel placed on the queue to tell the writer thread to exit.
_STOP = object()


class ThreadedWriter(Service):
    """
    A non-blocking Eliot log destination that wraps a blocking
    destination, writing log messages to the latter in a managed thread.

    @ivar _thread: C{None}, or a L{threading.Thread} running L{_reader}.
    """

    name = "Eliot Log Writer"

    def __init__(self, destination, reactor):
        """
        @param destination: The underlying destination for log messages. This
            will be called from a non-reactor thread.

        @param reactor: The main reactor.
        """
        self._destination = destination
        self._queue = SimpleQueue()
        self._mainReactor = reactor
        self._thread = None

    def startService(self):
        """
        Start the writer thread.
        """
        Service.startService(self)
        self._thread = threading.Thread(target=self._reader)
        self._thread.start()
        addDestination(self)

    def stopService(self):
        """
        Stop the writer thread, wait for it to finish.

        @return: A L{Deferred} that fires once the writer thread has exited.
        """
        Service.stopService(self)
        removeDestination(self)
        self._queue.put(_STOP)
        # Join in the reactor's thread pool so the reactor thread never blocks.
        return deferToThreadPool(
            self._mainReactor, self._mainReactor.getThreadPool(), self._thread.join
        )

    def __call__(self, data):
        """
        Add the data to the queue, to be passed to the wrapped destination by
        the writer thread.

        @param data: The log message to hand to the wrapped destination.
        """
        self._queue.put(data)

    def _reader(self):
        """
        Runs in a thread, reads messages from a queue and writes them to
        the wrapped destination.
        """
        while True:
            msg = self._queue.get()
            if msg is _STOP:
                return
            try:
                self._destination(msg)
            except Exception:
                # Lower-level destination blew up, nothing we can do, so
                # just drop the message.
                pass
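

# Illustrative sketch, not part of the original module: the queue-and-sentinel
# handoff that ThreadedWriter._reader relies on, reduced to the standard
# library so it can be run without Twisted or Eliot. The names _demo_handoff
# and _DEMO_STOP are hypothetical and exist only for this example.
import threading
from queue import SimpleQueue

_DEMO_STOP = object()


def _demo_handoff(messages, destination):
    """
    Pass each item of C{messages} to C{destination} on a worker thread, then
    stop the worker with a sentinel and wait for it to exit.
    """
    queue = SimpleQueue()

    def reader():
        while True:
            msg = queue.get()
            if msg is _DEMO_STOP:
                return
            try:
                destination(msg)
            except Exception:
                # Same policy as _reader: a failing destination loses the
                # message but does not kill the worker.
                pass

    thread = threading.Thread(target=reader)
    thread.start()
    for msg in messages:
        queue.put(msg)
    # The sentinel is queued last, so every earlier message is handled first.
    queue.put(_DEMO_STOP)
    thread.join()


# Possible use: collect messages in a list instead of writing them to a file.
#
#     received = []
#     _demo_handoff([{"n": 1}, {"n": 2}], received.append)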