1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
"""Asynchronous msgpack-rpc handling in the event loop pipeline."""
import logging
from traceback import format_exc
from typing import Any, AnyStr, Callable, Dict
from pynvim.msgpack_rpc.msgpack_stream import MsgpackStream
# Module-level logger, plus short aliases for its debug/info/warning methods.
logger = logging.getLogger(__name__)
debug, info, warn = (logger.debug, logger.info, logger.warning,)
# A response callback is invoked with two positional arguments:
# (err, return_value).
ResponseCallback = Callable[..., None]
class AsyncSession:
    """Asynchronous msgpack-rpc layer that wraps a msgpack stream.

    This wraps the msgpack stream interface for reading/writing msgpack
    documents and exposes an interface for sending and receiving msgpack-rpc
    requests and notifications.
    """

    def __init__(self, msgpack_stream: MsgpackStream):
        """Wrap `msgpack_stream` on a msgpack-rpc interface."""
        self._msgpack_stream = msgpack_stream
        self._next_request_id = 1
        # Callbacks of sent requests that are still awaiting a response,
        # keyed by request id.
        self._pending_requests: Dict[int, ResponseCallback] = {}
        # Installed by `run` and cleared again when `run` exits.
        self._request_cb = self._notification_cb = None
        # Dispatch table keyed by the first element of an incoming message.
        self._handlers = {
            0: self._on_request,
            1: self._on_response,
            2: self._on_notification,
        }
        self.loop = msgpack_stream.loop

    def threadsafe_call(self, fn):
        """Wrapper around `MsgpackStream.threadsafe_call`."""
        self._msgpack_stream.threadsafe_call(fn)

    def request(self, method: AnyStr, args: Any,
                response_cb: ResponseCallback) -> None:
        """Send a msgpack-rpc request to Nvim.

        A msgpack-rpc with method `method` and argument `args` is sent to
        Nvim. The `response_cb` function is called with `(err, result)`
        when the response is available.
        """
        request_id = self._next_request_id
        self._next_request_id = request_id + 1
        self._msgpack_stream.send([0, request_id, method, args])
        self._pending_requests[request_id] = response_cb

    def notify(self, method, args):
        """Send a msgpack-rpc notification to Nvim.

        A msgpack-rpc with method `method` and argument `args` is sent to
        Nvim. This will have the same effect as a request, but no response
        will be received.
        """
        self._msgpack_stream.send([2, method, args])

    def run(self, request_cb, notification_cb):
        """Run the event loop to receive requests and notifications from Nvim.

        While the event loop is running, `request_cb` and `notification_cb`
        will be called whenever requests or notifications are respectively
        available. Both callbacks are cleared again when the loop exits,
        including when it exits by raising an exception.
        """
        self._request_cb = request_cb
        self._notification_cb = notification_cb
        try:
            self._msgpack_stream.run(self._on_message)
        finally:
            # Never leave stale callbacks installed, even if the loop raised.
            self._request_cb = None
            self._notification_cb = None

    def stop(self):
        """Stop the event loop."""
        self._msgpack_stream.stop()

    def close(self):
        """Close the event loop."""
        self._msgpack_stream.close()

    def _on_message(self, msg):
        """Dispatch an incoming message to the handler for its type.

        Any exception raised while handling the message is caught and a
        truncated traceback is sent back as an error response with id 0.
        """
        try:
            self._handlers.get(msg[0], self._on_invalid_message)(msg)
        except Exception:
            err_str = format_exc(5)
            self._msgpack_stream.send([1, 0, err_str, None])

    def _on_request(self, msg):
        # request
        #   - msg[1]: id
        #   - msg[2]: method name
        #   - msg[3]: arguments
        assert self._request_cb is not None
        self._request_cb(msg[2], msg[3],
                         Response(self._msgpack_stream, msg[1]))

    def _on_response(self, msg):
        # response to a previous request:
        #   - msg[1]: the id
        #   - msg[2]: error(if any)
        #   - msg[3]: result(if not errored)
        # An unknown id raises KeyError, which `_on_message` reports.
        callback = self._pending_requests.pop(msg[1])
        callback(msg[2], msg[3])

    def _on_notification(self, msg):
        # notification/event
        #   - msg[1]: event name
        #   - msg[2]: arguments
        assert self._notification_cb is not None
        self._notification_cb(msg[1], msg[2])

    def _on_invalid_message(self, msg):
        """Reply with an error response for a message of unknown type."""
        # Wrap `msg` in a 1-tuple so a tuple-shaped message is formatted as
        # a single value instead of being unpacked as the argument tuple.
        error = 'Received invalid message %s' % (msg,)
        self._msgpack_stream.send([1, 0, error, None])
class Response:
    """Handle used to answer a msgpack-rpc request received from Nvim.

    Each incoming request gets its own instance, which keeps the stream and
    the request id that the eventual reply has to be written with.
    """

    def __init__(self, msgpack_stream: MsgpackStream, request_id: int):
        """Store the stream to reply on and the id of the request."""
        self._request_id = request_id
        self._msgpack_stream = msgpack_stream

    def send(self, value, error=False):
        """Write the reply for this request to the stream.

        `value` goes into the error slot of the response when `error` is
        truthy and into the result slot otherwise; the other slot is None.
        """
        # Response layout: [1, request id, error, result]
        err_and_result = (value, None) if error else (None, value)
        self._msgpack_stream.send([1, self._request_id, *err_and_result])
|