1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
|
"""Msgpack handling in the event loop pipeline."""
import logging
from msgpack import Packer, Unpacker
from pynvim.compat import unicode_errors_default
from pynvim.msgpack_rpc.event_loop.base import BaseEventLoop
# Module-level logger, plus short aliases for the log methods used in
# this module.
logger = logging.getLogger(__name__)
debug = logger.debug
info = logger.info
warn = logger.warning
class MsgpackStream:
    """Two-way msgpack stream that wraps an event loop byte stream.

    This wraps the event loop interface for reading/writing bytes and
    exposes an interface for reading/writing msgpack documents.
    """

    def __init__(self, event_loop: "BaseEventLoop") -> None:
        """Wrap `event_loop` on a msgpack-aware interface."""
        self.loop = event_loop
        self._packer = Packer(unicode_errors=unicode_errors_default)
        self._unpacker = Unpacker(unicode_errors=unicode_errors_default)
        # Callback receiving decoded messages; only set while `run` is active.
        self._message_cb = None

    def threadsafe_call(self, fn):
        """Wrapper around `BaseEventLoop.threadsafe_call`."""
        self.loop.threadsafe_call(fn)

    def send(self, msg):
        """Queue `msg` for sending to Nvim.

        `msg` is serialized with the stream's packer and the resulting bytes
        are handed to the wrapped event loop's `send`.
        """
        self.loop.send(self._packer.pack(msg))

    def run(self, message_cb):
        """Run the event loop to receive messages from Nvim.

        While the event loop is running, `message_cb` will be called whenever
        a message has been successfully parsed from the input stream.

        The callback is unregistered when the loop returns, including when
        it exits by raising, so a failed run never leaves a stale callback
        attached to the stream.
        """
        self._message_cb = message_cb
        try:
            self.loop.run(self._on_data)
        finally:
            self._message_cb = None

    def stop(self):
        """Stop the event loop."""
        self.loop.stop()

    def close(self):
        """Close the event loop."""
        self.loop.close()

    def _on_data(self, data: bytes) -> None:
        """Feed `data` to the unpacker and dispatch each decoded message.

        Every message the unpacker can yield after the feed is passed, in
        order, to the callback registered by `run`.
        """
        self._unpacker.feed(data)
        # Iterate the unpacker directly instead of wrapping `next()` and the
        # callback in one `try/except StopIteration`: only exhaustion of the
        # unpacker may end the loop. A StopIteration raised by the callback
        # must propagate rather than be mistaken for "needs more data".
        for msg in self._unpacker:
            assert self._message_cb is not None
            self._message_cb(msg)  # type: ignore[unreachable]
|