File: async_session.py

package info (click to toggle)
python-pynvim 0.5.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 432 kB
  • sloc: python: 3,040; makefile: 4
file content (153 lines) | stat: -rw-r--r-- 5,441 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""Asynchronous msgpack-rpc handling in the event loop pipeline."""
import logging
from traceback import format_exc
from typing import Any, AnyStr, Callable, Dict

from pynvim.msgpack_rpc.msgpack_stream import MsgpackStream

logger = logging.getLogger(__name__)
# Module-level shorthands for the logger's debug/info/warning methods.
debug = logger.debug
info = logger.info
warn = logger.warning


# A response callback is invoked with two positional arguments:
# (err, return_value).
ResponseCallback = Callable[..., None]


class AsyncSession:

    """Asynchronous msgpack-rpc layer that wraps a msgpack stream.

    This wraps the msgpack stream interface for reading/writing msgpack
    documents and exposes an interface for sending and receiving msgpack-rpc
    requests and notifications.

    The first element of every message is its type code as used by this
    class: 0 for a request, 1 for a response and 2 for a notification.
    """

    def __init__(self, msgpack_stream: MsgpackStream):
        """Wrap `msgpack_stream` on a msgpack-rpc interface."""
        self._msgpack_stream = msgpack_stream
        # Ids for outgoing requests are handed out sequentially from 1.
        self._next_request_id = 1
        # Callbacks of sent requests still waiting for a response, by id.
        self._pending_requests: Dict[int, ResponseCallback] = {}
        # Both callbacks are only installed for the duration of `run()`.
        self._request_cb = self._notification_cb = None
        # Dispatch table keyed on the message type code (msg[0]).
        self._handlers = {
            0: self._on_request,
            1: self._on_response,
            2: self._on_notification
        }
        self.loop = msgpack_stream.loop

    def threadsafe_call(self, fn):
        """Wrapper around `MsgpackStream.threadsafe_call`."""
        self._msgpack_stream.threadsafe_call(fn)

    def request(self, method: AnyStr, args: Any,
                response_cb: ResponseCallback) -> None:
        """Send a msgpack-rpc request to Nvim.

        A msgpack-rpc with method `method` and argument `args` is sent to
        Nvim. The `response_cb` function is called with `(err, return_value)`
        when the response is available.
        """
        request_id = self._next_request_id
        self._next_request_id = request_id + 1
        self._msgpack_stream.send([0, request_id, method, args])
        # Registered only after a successful send, so a failing send does
        # not leave a callback behind that can never be resolved.
        self._pending_requests[request_id] = response_cb

    def notify(self, method, args):
        """Send a msgpack-rpc notification to Nvim.

        A msgpack-rpc with method `method` and argument `args` is sent to
        Nvim. This will have the same effect as a request, but no response
        will be received.
        """
        self._msgpack_stream.send([2, method, args])

    def run(self, request_cb, notification_cb):
        """Run the event loop to receive requests and notifications from Nvim.

        While the event loop is running, `request_cb` and `notification_cb`
        will be called whenever requests or notifications are respectively
        available. `request_cb` receives `(method, args, response)` and
        `notification_cb` receives `(name, args)`.

        Both callbacks are uninstalled again when the underlying stream's
        `run` returns or raises.
        """
        self._request_cb = request_cb
        self._notification_cb = notification_cb
        try:
            self._msgpack_stream.run(self._on_message)
        finally:
            # Never leave stale callbacks behind, even if the loop raised.
            self._request_cb = None
            self._notification_cb = None

    def stop(self):
        """Stop the event loop."""
        self._msgpack_stream.stop()

    def close(self):
        """Close the event loop."""
        self._msgpack_stream.close()

    def _on_message(self, msg):
        """Dispatch an incoming message to the handler for its type code.

        Unknown type codes go to `_on_invalid_message`. Any exception raised
        while handling the message is reported back on the stream as an
        error response with id 0 instead of propagating to the caller.
        """
        try:
            self._handlers.get(msg[0], self._on_invalid_message)(msg)
        except Exception:
            # Traceback limited to the 5 innermost frames.
            err_str = format_exc(5)
            self._msgpack_stream.send([1, 0, err_str, None])

    def _on_request(self, msg):
        """Forward an incoming request to the installed request callback."""
        # request
        #   - msg[1]: id
        #   - msg[2]: method name
        #   - msg[3]: arguments
        assert self._request_cb is not None
        self._request_cb(msg[2], msg[3],
                         Response(self._msgpack_stream, msg[1]))

    def _on_response(self, msg):
        """Resolve the pending request that this response answers.

        Raises `KeyError` when no request with the given id is pending.
        """
        # response to a previous request:
        #   - msg[1]: the id
        #   - msg[2]: error(if any)
        #   - msg[3]: result(if not errored)
        self._pending_requests.pop(msg[1])(msg[2], msg[3])

    def _on_notification(self, msg):
        """Forward an incoming notification to the notification callback."""
        # notification/event
        #   - msg[1]: event name
        #   - msg[2]: arguments
        assert self._notification_cb is not None
        self._notification_cb(msg[1], msg[2])

    def _on_invalid_message(self, msg):
        """Report a message with an unknown type code as an error response."""
        # Wrap in a 1-tuple so a tuple-valued `msg` is formatted as a whole
        # instead of being unpacked as multiple `%` arguments.
        error = 'Received invalid message %s' % (msg,)
        self._msgpack_stream.send([1, 0, error, None])


class Response:
    """Handle used to reply to a msgpack-rpc request received from Nvim.

    One instance is created per incoming request; it keeps the stream and
    the request id so that the reply can be written later.
    """

    def __init__(self, msgpack_stream: MsgpackStream, request_id: int):
        """Store the stream and the id of the request being answered."""
        self._request_id = request_id
        self._msgpack_stream = msgpack_stream

    def send(self, value, error=False):
        """Write the reply for the stored request id to the stream.

        `value` is placed in the error slot when `error` is truthy and in
        the result slot otherwise; the other slot is set to None.
        """
        err, result = (value, None) if error else (None, value)
        self._msgpack_stream.send([1, self._request_id, err, result])