1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
|
# -*- coding: utf-8 -*-
'''
pytestsalt.fixtures.log_server_tornado
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Tornado Log Server Fixture
'''
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import logging
import threading
# Import 3rd-party libs
import msgpack
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.tcpserver import TCPServer
from tornado.iostream import StreamClosedError
log = logging.getLogger(__name__)
class LogServer(TCPServer):
@gen.coroutine
def handle_stream(self, stream, address):
unpacker = msgpack.Unpacker(raw=False)
while True:
try:
wire_bytes = yield stream.read_bytes(1024, partial=True)
if not wire_bytes:
break
try:
unpacker.feed(wire_bytes)
except msgpack.exceptions.BufferFull:
# Start over loosing some data?!
unpacker = msgpack.Unpacker(raw=False)
unpacker.feed(wire_bytes)
for record_dict in unpacker:
record = logging.makeLogRecord(record_dict)
logger = logging.getLogger(record.name)
logger.handle(record)
except (EOFError, KeyboardInterrupt, SystemExit, StreamClosedError):
break
except Exception as exc: # pylint: disable=broad-except
log.exception(exc)
def log_server_tornado(log_server_port):
'''
Starts a log server.
'''
def process_logs(port):
server = LogServer()
server.listen(port, address='127.0.0.1')
try:
IOLoop.current().start()
except KeyboardInterrupt:
pass
server.stop()
process_queue_thread = threading.Thread(target=process_logs, args=(log_server_port,))
process_queue_thread.daemon = True
process_queue_thread.start()
|