1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
import asyncio
import json
import os
import socket
import subprocess
import sys
import threading
import time
import pytest
import websockets
NUM_CLIENTS = 2
NUM_REQUESTS = 5
TEST_PORT = 5102
HOST = "127.0.0.1"
MAX_STARTUP_SECONDS = 5.0
CHECK_INTERVAL = 0.1
@pytest.fixture(scope="module", autouse=True)
def ws_server_subprocess():
cmd = [
sys.executable,
"-m",
"pylsp.__main__",
"--ws",
"--host",
HOST,
"--port",
str(TEST_PORT),
]
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=os.environ.copy(),
)
deadline = time.time() + MAX_STARTUP_SECONDS
while True:
try:
with socket.create_connection(
("127.0.0.1", TEST_PORT), timeout=CHECK_INTERVAL
):
break
except (ConnectionRefusedError, OSError):
if time.time() > deadline:
proc.kill()
out, err = proc.communicate(timeout=1)
raise RuntimeError(
f"Server didn’t start listening on port {TEST_PORT} in time.\n"
f"STDOUT:\n{out.decode()}\nSTDERR:\n{err.decode()}"
)
time.sleep(CHECK_INTERVAL)
yield # run the tests
proc.terminate()
try:
proc.wait(timeout=2)
except subprocess.TimeoutExpired:
proc.kill()
TEST_DOC = """\
def test():
'''Test documentation'''
test()
"""
def test_concurrent_ws_requests():
errors = set()
lock = threading.Lock()
def thread_target(i: int):
async def do_initialize(idx):
uri = f"ws://{HOST}:{TEST_PORT}"
async with websockets.connect(uri) as ws:
# send initialize
init_request = {
"jsonrpc": "2.0",
"id": 4 * idx,
"method": "initialize",
"params": {},
}
did_open_request = {
"jsonrpc": "2.0",
"id": 4 * (idx + 1),
"method": "textDocument/didOpen",
"params": {
"textDocument": {
"uri": "test.py",
"languageId": "python",
"version": 0,
"text": TEST_DOC,
}
},
}
async def send_request(request: dict):
await asyncio.wait_for(
ws.send(json.dumps(request, ensure_ascii=False)), timeout=5
)
async def get_json_reply():
raw = await asyncio.wait_for(ws.recv(), timeout=60)
obj = json.loads(raw)
return obj
try:
await send_request(init_request)
await get_json_reply()
await send_request(did_open_request)
await get_json_reply()
requests = []
for i in range(NUM_REQUESTS):
hover_request = {
"jsonrpc": "2.0",
"id": 4 * (idx + 2 + i),
"method": "textDocument/definition",
"params": {
"textDocument": {
"uri": "test.py",
},
"position": {
"line": 3,
"character": 2,
},
},
}
requests.append(send_request(hover_request))
# send many requests in parallel
await asyncio.gather(*requests)
# collect replies
for i in range(NUM_REQUESTS):
hover = await get_json_reply()
assert hover
except (json.JSONDecodeError, asyncio.TimeoutError) as e:
return e
return None
error = asyncio.run(do_initialize(i))
with lock:
errors.add(error)
# launch threads
threads = []
for i in range(1, NUM_CLIENTS + 1):
t = threading.Thread(target=thread_target, args=(i,))
t.start()
threads.append(t)
# wait for them all
for t in threads:
t.join(timeout=50)
assert not t.is_alive(), f"Worker thread {t} hung!"
assert not any(filter(bool, errors))
|