1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
|
"""Common code for event loop implementations."""
import logging
import signal
import sys
import threading
from abc import ABC, abstractmethod
from typing import Any, Callable, List, Optional, Union
if sys.version_info < (3, 8):
from typing_extensions import Literal
else:
from typing import Literal
# Module-level logger, plus short aliases for its most frequently used methods.
logger = logging.getLogger(__name__)
debug = logger.debug
info = logger.info
warn = logger.warning

# The thread that was current when this module was imported. The event loop
# compares the calling thread against it before touching signal handlers.
main_thread = threading.current_thread()

# When signals are restored, the event loop library may reset SIGINT to
# SIG_DFL, which exits the program. Keep a reference to the handler that was
# installed at import time so the Python interpreter can be put back into its
# default state afterwards.
default_int_handler = signal.getsignal(signal.SIGINT)

# The transport names accepted as the first argument of an event loop.
TTransportType = Union[
    Literal['stdio'], Literal['socket'], Literal['tcp'], Literal['child'],
]
# TODO: Since pynvim now supports python 3, the only available backend of the
# msgpack_rpc BaseEventLoop is the built-in asyncio (see #294). We will have
# to remove some unnecessary abstractions as well as greenlet. See also #489
class BaseEventLoop(ABC):
    """Abstract base class for all event loops.

    Event loops act as the bottom layer for Nvim sessions created by this
    library. They hide system/transport details behind a simple interface for
    reading/writing bytes to the connected Nvim instance.

    A lifecycle of event loop is as follows: (1. -> [2. -> 3.]* -> 4.)

    1. initialization (__init__): connection to Nvim is established.
    2. run(data_cb): run the event loop (blocks until the loop stops).
       Requests are sent to the remote neovim by calling send(), and
       responses (messages) from the remote neovim will be passed to the
       given `data_cb` callback function while the event loop is running.
       Note that run() may be called multiple times.
    3. stop(): stop the event loop.
    4. close(): close the event loop, destroying all the internal resources.

    This class exposes public methods for interacting with the underlying
    event loop and delegates implementation-specific work to the following
    abstract methods, which subclasses must implement:

    - `_connect_tcp(address, port)`: connect to Nvim using tcp/ip
    - `_connect_socket(path)`: Same as tcp, but use a UNIX domain socket or
      named pipe.
    - `_connect_stdio()`: Use stdin/stdout as the connection to Nvim
    - `_connect_child(argv)`: Use the argument vector `argv` to spawn an
      embedded Nvim that has its stdin/stdout connected to the event loop.
    - `_start_reading()`: Called after any of _connect_* methods. Can be used
      to perform any post-connection setup or validation.
    - `_send(data)`: Send `data` (byte array) to Nvim (usually RPC request).
    - `_threadsafe_call(fn)`: Implementation behind `threadsafe_call()`.
    - `_run()`: Runs the event loop until stopped or the connection is closed.
    - `_stop()`: Stop the event loop.
    - `_close()`: Implementation behind `close()`.

    Subclasses may additionally override these hooks (no-ops by default):

    - `_setup_signals(signals)`: Add implementation-specific listeners for
      `signals`, which is a list of OS-specific signal numbers.
    - `_teardown_signals()`: Removes signal listeners set by `_setup_signals`

    The following methods can be called upon some events by the event loop:

    - `_on_data(data)`: When Nvim sends some data (usually RPC response).
    - `_on_signal(signum)`: When a signal is received.
    - `_on_error(exc)`: When a non-recoverable error occurs (e.g:
      connection lost, or any other OSError)
    - `_on_interrupt()`: Stops the loop.

    Note that these _on_{data,signal,error} methods are not 'final', may be
    changed around an execution of run(). The subclasses are expected to
    handle any early messages arriving while _on_data is not yet set.
    """

    def __init__(self, transport_type: TTransportType, *args: Any, **kwargs: Any):
        """Initialize and connect the event loop instance.

        The only arguments are the transport type and transport-specific
        configuration, like this:

        >>> BaseEventLoop('tcp', '127.0.0.1', 7450)
        >>> BaseEventLoop('socket', '/tmp/nvim-socket')
        >>> BaseEventLoop('stdio')
        >>> BaseEventLoop('child', ['nvim', '--embed', '--headless', '-u', 'NONE'])

        Implementation-specific initialization should be made in the __init__
        constructor of the subclass, which must call the constructor of the
        super class (BaseEventLoop). This constructor calls one of the
        `_connect_*` methods (selected by `transport_type`, receiving `args`
        and `kwargs`) and then `_start_reading()`.

        If the `_connect_*` call raises an `Exception`, `close()` is called
        and the exception is re-raised.
        """
        self._transport_type = transport_type
        # Reverse lookup (signal number -> name) used to build messages.
        # It is derived from the `signal.Signals` enum instead of every `SIG*`
        # attribute of the `signal` module: constants such as SIG_DFL, SIG_IGN
        # or SIG_SETMASK are not signals, yet their integer values collide
        # with real signal numbers and could shadow the real names.
        self._signames = {
            sig: sig.name for sig in signal.Signals
            if sig.name.startswith('SIG')
        }
        self._on_data: Optional[Callable[[bytes], None]] = None
        self._error: Optional[BaseException] = None

        try:
            getattr(self, f'_connect_{transport_type}')(*args, **kwargs)
        except Exception:
            self.close()
            raise  # bare raise keeps the original traceback intact
        self._start_reading()

    @abstractmethod
    def _start_reading(self) -> None:
        """Hook called by `__init__` once the `_connect_*` call returned."""
        raise NotImplementedError()

    @abstractmethod
    def _send(self, data: bytes) -> None:
        """Implementation of `send()`."""
        raise NotImplementedError()

    def connect_tcp(self, address: str, port: int) -> None:
        """Connect to tcp/ip `address`:`port`. Delegated to `_connect_tcp`."""
        self._connect_tcp(address, port)

    @abstractmethod
    def _connect_tcp(self, address: str, port: int) -> None:
        """Implementation of `connect_tcp()`."""
        raise NotImplementedError()

    def connect_socket(self, path: str) -> None:
        """Connect to socket at `path`. Delegated to `_connect_socket`."""
        self._connect_socket(path)

    @abstractmethod
    def _connect_socket(self, path: str) -> None:
        """Implementation of `connect_socket()`."""
        raise NotImplementedError()

    def connect_stdio(self) -> None:
        """Connect using stdin/stdout. Delegated to `_connect_stdio`."""
        self._connect_stdio()

    @abstractmethod
    def _connect_stdio(self) -> None:
        """Implementation of `connect_stdio()`."""
        raise NotImplementedError()

    def connect_child(self, argv: List[str]) -> None:
        """Connect a new Nvim instance. Delegated to `_connect_child`."""
        self._connect_child(argv)

    @abstractmethod
    def _connect_child(self, argv: List[str]) -> None:
        """Implementation of `connect_child()`."""
        raise NotImplementedError()

    def send(self, data: bytes) -> None:
        """Queue `data` for sending to Nvim. Delegated to `_send`."""
        self._send(data)

    def threadsafe_call(self, fn: Callable[[], Any]) -> None:
        """Call a function in the event loop thread.

        This is the only safe way to interact with a session from other
        threads. Delegated to `_threadsafe_call`.
        """
        self._threadsafe_call(fn)

    @abstractmethod
    def _threadsafe_call(self, fn: Callable[[], Any]) -> None:
        """Implementation of `threadsafe_call()`."""
        raise NotImplementedError()

    def run(self, data_cb: Callable[[bytes], None]) -> None:
        """Run the event loop, and receives response messages to a callback.

        `data_cb` is stored as `_on_data` for the duration of the run and is
        cleared again afterwards. When called from the thread that imported
        this module, listeners for SIGINT and SIGTERM are requested through
        `_setup_signals()` before `_run()`; afterwards `_teardown_signals()`
        is called and the SIGINT handler saved at import time is restored.
        This cleanup also happens when `_run()` raises.

        Raises:
            BaseException: the error recorded by a previous `_on_signal()` or
                `_on_error()` call, if any; the loop is not started then. A
                recorded `KeyboardInterrupt` is cleared before being raised,
                any other error stays recorded.
        """
        pending = self._error
        if pending is not None:
            if isinstance(pending, KeyboardInterrupt):
                # KeyboardInterrupt is not destructive (it may be used in
                # the REPL). Clean up the _error field before throwing so
                # the loop may be started again.
                self._error = None
            raise pending

        # data_cb: e.g., MsgpackStream._on_data
        self._on_data = data_cb
        in_main_thread = threading.current_thread() == main_thread
        try:
            if in_main_thread:
                self._setup_signals([signal.SIGINT, signal.SIGTERM])
            try:
                self._run()
            finally:
                # Undo the signal setup even if _run() raised, so a failed
                # run does not leave the loop's handlers installed.
                if in_main_thread:
                    self._teardown_signals()
                    signal.signal(signal.SIGINT, default_int_handler)
        finally:
            self._on_data = None

    @abstractmethod
    def _run(self) -> None:
        """Implementation of the blocking part of `run()`."""
        raise NotImplementedError()

    def stop(self) -> None:
        """Stop the event loop. Delegated to `_stop`."""
        self._stop()

    @abstractmethod
    def _stop(self) -> None:
        """Implementation of `stop()`."""
        raise NotImplementedError()

    def close(self) -> None:
        """Close the event loop. Delegated to `_close`."""
        self._close()

    @abstractmethod
    def _close(self) -> None:
        """Implementation of `close()`."""
        raise NotImplementedError()

    def _on_signal(self, signum: signal.Signals) -> None:
        """Record the error matching `signum` and stop the loop.

        SIGINT is ignored for the 'stdio' transport; otherwise it is recorded
        as `KeyboardInterrupt`. Any other signal is recorded as an
        `Exception` whose message names the signal. The recorded error is
        raised by the next `run()` call.
        """
        # Fall back to the raw number for signals missing from the table
        # instead of failing with KeyError inside a signal callback.
        signame = self._signames.get(signum, signum)
        msg = f'Received signal {signame}'

        if signum == signal.SIGINT:
            if self._transport_type == 'stdio':
                # When the transport is stdio, we are probably running as a
                # Nvim child process. In that case, we don't want to be
                # killed by ctrl+C
                return
            self._error = KeyboardInterrupt()
        else:
            self._error = Exception(msg)
        self.stop()

    def _on_error(self, exc: Exception) -> None:
        """Record `exc` (raised by the next `run()` call) and stop the loop."""
        self._error = exc
        self.stop()

    def _on_interrupt(self) -> None:
        """Stop the loop without recording an error."""
        self.stop()

    def _setup_signals(self, signals: List[signal.Signals]) -> None:
        """Install listeners for `signals`. No-op by default."""

    def _teardown_signals(self) -> None:
        """Remove the listeners set by `_setup_signals`. No-op by default."""
|