1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
# Copyright 2017-2020 Palantir Technologies, Inc.
# Copyright 2021- Python Language Server Contributors.
import logging
import threading
try:
import ujson as json
except Exception: # pylint: disable=broad-except
import json
log = logging.getLogger(__name__)
class JsonRpcStreamReader:
def __init__(self, rfile):
self._rfile = rfile
def close(self):
self._rfile.close()
def listen(self, message_consumer):
"""Blocking call to listen for messages on the rfile.
Args:
message_consumer (fn): function that is passed each message as it is read off the socket.
"""
while not self._rfile.closed:
try:
request_str = self._read_message()
except ValueError:
if self._rfile.closed:
return
log.exception("Failed to read from rfile")
if request_str is None:
break
try:
message_consumer(json.loads(request_str.decode('utf-8')))
except ValueError:
log.exception("Failed to parse JSON message %s", request_str)
continue
def _read_message(self):
"""Reads the contents of a message.
Returns:
body of message if parsable else None
"""
line = self._rfile.readline()
if not line:
return None
content_length = self._content_length(line)
# Blindly consume all header lines
while line and line.strip():
line = self._rfile.readline()
if not line:
return None
# Grab the body
return self._rfile.read(content_length)
@staticmethod
def _content_length(line):
"""Extract the content length from an input line."""
if line.startswith(b'Content-Length: '):
_, value = line.split(b'Content-Length: ')
value = value.strip()
try:
return int(value)
except ValueError as e:
raise ValueError(f"Invalid Content-Length header: {value}") from e
return None
class JsonRpcStreamWriter:
def __init__(self, wfile, **json_dumps_args):
self._wfile = wfile
self._wfile_lock = threading.Lock()
self._json_dumps_args = json_dumps_args
def close(self):
with self._wfile_lock:
self._wfile.close()
def write(self, message):
with self._wfile_lock:
if self._wfile.closed:
return
try:
body = json.dumps(message, **self._json_dumps_args)
# Ensure we get the byte length, not the character length
content_length = len(body) if isinstance(body, bytes) else len(body.encode('utf-8'))
response = (
f"Content-Length: {content_length}\r\n"
f"Content-Type: application/vscode-jsonrpc; charset=utf8\r\n\r\n"
f"{body}"
)
self._wfile.write(response.encode('utf-8'))
self._wfile.flush()
except Exception: # pylint: disable=broad-except
log.exception("Failed to write message to output file %s", message)
|