1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393
|
"""Test kernel subshells."""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
from __future__ import annotations
import platform
import time
from collections import Counter
from queue import Empty
import pytest
from jupyter_client.blocking.client import BlockingKernelClient
from .utils import (
TIMEOUT,
assemble_output,
flush_channels,
get_replies,
get_reply,
new_kernel,
wait_for_idle,
)
# Helpers
def create_subshell_helper(kc: BlockingKernelClient):
msg = kc.session.msg("create_subshell_request")
kc.control_channel.send(msg)
msg_id = msg["header"]["msg_id"]
reply = get_reply(kc, msg_id, TIMEOUT, channel="control")
return reply["content"]
def delete_subshell_helper(kc: BlockingKernelClient, subshell_id: str):
msg = kc.session.msg("delete_subshell_request", {"subshell_id": subshell_id})
kc.control_channel.send(msg)
msg_id = msg["header"]["msg_id"]
reply = get_reply(kc, msg_id, TIMEOUT, channel="control")
return reply["content"]
def list_subshell_helper(kc: BlockingKernelClient):
msg = kc.session.msg("list_subshell_request")
kc.control_channel.send(msg)
msg_id = msg["header"]["msg_id"]
reply = get_reply(kc, msg_id, TIMEOUT, channel="control")
return reply["content"]
def execute_request(
kc: BlockingKernelClient, code: str, subshell_id: str | None, silent: bool = False
):
msg = kc.session.msg("execute_request", {"code": code, "silent": silent})
msg["header"]["subshell_id"] = subshell_id
kc.shell_channel.send(msg)
return msg
def execute_request_subshell_id(
kc: BlockingKernelClient, code: str, subshell_id: str | None, terminator: str = "\n"
):
msg = execute_request(kc, code, subshell_id)
msg_id = msg["header"]["msg_id"]
stdout, _ = assemble_output(kc.get_iopub_msg, None, msg_id)
return stdout.strip()
def execute_thread_count(kc: BlockingKernelClient) -> int:
code = "print(threading.active_count())"
return int(execute_request_subshell_id(kc, code, None))
def execute_thread_ids(kc: BlockingKernelClient, subshell_id: str | None = None) -> tuple[str, str]:
code = "print(threading.get_ident(), threading.main_thread().ident)"
return execute_request_subshell_id(kc, code, subshell_id).split()
# Tests
def test_no_subshells():
with new_kernel() as kc:
# Test operation of separate channel thread without using any subshells.
execute_request_subshell_id(kc, "a = 2*3", None)
res = execute_request_subshell_id(kc, "print(a)", None)
assert res == "6"
def test_supported():
with new_kernel() as kc:
msg_id = kc.kernel_info()
reply = get_reply(kc, msg_id, TIMEOUT)
assert "supported_features" in reply["content"]
assert "kernel subshells" in reply["content"]["supported_features"]
def test_subshell_id_lifetime():
with new_kernel() as kc:
assert list_subshell_helper(kc)["subshell_id"] == []
subshell_id = create_subshell_helper(kc)["subshell_id"]
assert list_subshell_helper(kc)["subshell_id"] == [subshell_id]
delete_subshell_helper(kc, subshell_id)
assert list_subshell_helper(kc)["subshell_id"] == []
def test_thread_counts():
with new_kernel() as kc:
execute_request_subshell_id(kc, "import threading", None)
nthreads = execute_thread_count(kc)
subshell_id = create_subshell_helper(kc)["subshell_id"]
nthreads2 = execute_thread_count(kc)
assert nthreads2 > nthreads
delete_subshell_helper(kc, subshell_id)
nthreads3 = execute_thread_count(kc)
assert nthreads3 == nthreads
def test_thread_ids():
with new_kernel() as kc:
execute_request_subshell_id(kc, "import threading", None)
subshell_id = create_subshell_helper(kc)["subshell_id"]
thread_id, main_thread_id = execute_thread_ids(kc)
assert thread_id == main_thread_id
thread_id, main_thread_id = execute_thread_ids(kc, subshell_id) # This is the problem
assert thread_id != main_thread_id
delete_subshell_helper(kc, subshell_id)
@pytest.mark.parametrize("are_subshells", [(False, True), (True, False), (True, True)])
@pytest.mark.parametrize("overlap", [True, False])
def test_run_concurrently_sequence(are_subshells, overlap, request):
if request.config.getvalue("--cov"):
pytest.skip("Skip time-sensitive subshell tests if measuring coverage")
with new_kernel() as kc:
subshell_ids = [
create_subshell_helper(kc)["subshell_id"] if is_subshell else None
for is_subshell in are_subshells
]
# Import time module before running time-sensitive subshell code
# and use threading.Barrier to synchronise start of subshell code.
execute_request_subshell_id(
kc, "import threading as t, time; b=t.Barrier(2); print('ok')", None
)
sleep = 0.5
if overlap:
codes = [
f"b.wait(); start0=True; end0=False; time.sleep({sleep}); end0=True",
f"b.wait(); time.sleep({sleep / 2}); assert start0; assert not end0; time.sleep({sleep}); assert end0",
]
else:
codes = [
f"b.wait(); start0=True; end0=False; time.sleep({sleep}); assert end1",
f"b.wait(); time.sleep({sleep / 2}); assert start0; assert not end0; end1=True",
]
msgs = []
for subshell_id, code in zip(subshell_ids, codes, strict=False):
msg = kc.session.msg("execute_request", {"code": code})
msg["header"]["subshell_id"] = subshell_id
kc.shell_channel.send(msg)
msgs.append(msg)
replies = get_replies(kc, [msg["msg_id"] for msg in msgs], timeout=None)
for subshell_id in subshell_ids:
if subshell_id:
delete_subshell_helper(kc, subshell_id)
for reply in replies:
assert reply["content"]["status"] == "ok", reply
def test_create_while_execute():
with new_kernel() as kc:
# Send request to execute code on main subshell.
msg = kc.session.msg("execute_request", {"code": "import time; time.sleep(0.05)"})
kc.shell_channel.send(msg)
# Create subshell via control channel.
control_msg = kc.session.msg("create_subshell_request")
kc.control_channel.send(control_msg)
control_reply = get_reply(kc, control_msg["header"]["msg_id"], TIMEOUT, channel="control")
subshell_id = control_reply["content"]["subshell_id"]
control_date = control_reply["header"]["date"]
# Get result message from main subshell.
shell_date = get_reply(kc, msg["msg_id"])["header"]["date"]
delete_subshell_helper(kc, subshell_id)
assert control_date < shell_date
@pytest.mark.skipif(
platform.python_implementation() == "PyPy",
reason="does not work on PyPy",
)
def test_shutdown_with_subshell():
# Based on test_kernel.py::test_shutdown
with new_kernel() as kc:
km = kc.parent
subshell_id = create_subshell_helper(kc)["subshell_id"]
assert list_subshell_helper(kc)["subshell_id"] == [subshell_id]
kc.shutdown()
for _ in range(100): # 10 s timeout
if km.is_alive():
time.sleep(0.1)
else:
break
assert not km.is_alive()
@pytest.mark.parametrize("are_subshells", [(False, True), (True, False), (True, True)])
def test_execute_stop_on_error(are_subshells):
# Based on test_message_spec.py::test_execute_stop_on_error, testing that exception
# in one subshell aborts execution queue in that subshell but not others.
with new_kernel() as kc:
subshell_ids = [
create_subshell_helper(kc)["subshell_id"] if is_subshell else None
for is_subshell in are_subshells
]
msg_ids = []
msg = execute_request(
kc, "import asyncio; await asyncio.sleep(1); raise ValueError()", subshell_ids[0]
)
msg_ids.append(msg["header"]["msg_id"])
msg = execute_request(kc, "print('hello')", subshell_ids[0])
msg_ids.append(msg["header"]["msg_id"])
msg = execute_request(kc, "print('goodbye')", subshell_ids[0])
msg_ids.append(msg["header"]["msg_id"])
msg = execute_request(kc, "import time; time.sleep(1.5)", subshell_ids[1])
msg_ids.append(msg["header"]["msg_id"])
msg = execute_request(kc, "print('other')", subshell_ids[1])
msg_ids.append(msg["header"]["msg_id"])
replies = get_replies(kc, msg_ids)
assert replies[0]["parent_header"]["subshell_id"] == subshell_ids[0]
assert replies[1]["parent_header"]["subshell_id"] == subshell_ids[0]
assert replies[2]["parent_header"]["subshell_id"] == subshell_ids[0]
assert replies[3]["parent_header"]["subshell_id"] == subshell_ids[1]
assert replies[4]["parent_header"]["subshell_id"] == subshell_ids[1]
assert replies[0]["content"]["status"] == "error"
assert replies[1]["content"]["status"] == "aborted"
assert replies[2]["content"]["status"] == "aborted"
assert replies[3]["content"]["status"] == "ok"
assert replies[4]["content"]["status"] == "ok"
# Check abort is cleared.
msg = execute_request(kc, "print('check')", subshell_ids[0])
reply = get_reply(kc, msg["msg_id"])
assert reply["parent_header"]["subshell_id"] == subshell_ids[0]
assert reply["content"]["status"] == "ok"
# Cleanup
for subshell_id in subshell_ids:
if subshell_id:
delete_subshell_helper(kc, subshell_id)
@pytest.mark.parametrize("are_subshells", [(False, True), (True, False), (True, True)])
def test_idle_message_parent_headers(are_subshells):
with new_kernel() as kc:
# import time module on main shell.
msg = kc.session.msg("execute_request", {"code": "import time"})
kc.shell_channel.send(msg)
subshell_ids = [
create_subshell_helper(kc)["subshell_id"] if is_subshell else None
for is_subshell in are_subshells
]
# Wait for all idle status messages to be received.
for _ in range(1 + sum(are_subshells)):
wait_for_idle(kc)
msg_ids = []
for subshell_id in subshell_ids:
msg = execute_request(kc, "time.sleep(0.5)", subshell_id)
msg_ids.append(msg["msg_id"])
# Expect 4 status messages (2 busy, 2 idle) on iopub channel for the two execute_requests
statuses = []
timeout = TIMEOUT # Combined timeout to receive all the status messages
t0 = time.time()
while True:
status = kc.get_iopub_msg(timeout=timeout)
if status["msg_type"] != "status" or status["parent_header"]["msg_id"] not in msg_ids:
continue
statuses.append(status)
if len(statuses) == 4:
break
t1 = time.time()
timeout -= t1 - t0
t0 = t1
execution_states = Counter(msg["content"]["execution_state"] for msg in statuses)
assert execution_states["busy"] == 2
assert execution_states["idle"] == 2
parent_msg_ids = Counter(msg["parent_header"]["msg_id"] for msg in statuses)
assert parent_msg_ids[msg_ids[0]] == 2
assert parent_msg_ids[msg_ids[1]] == 2
parent_subshell_ids = Counter(msg["parent_header"].get("subshell_id") for msg in statuses)
assert parent_subshell_ids[subshell_ids[0]] == 2
assert parent_subshell_ids[subshell_ids[1]] == 2
# Cleanup
for subshell_id in subshell_ids:
if subshell_id:
delete_subshell_helper(kc, subshell_id)
def test_silent_flag_in_subshells():
"""Verifies that the 'silent' flag suppresses output in main and subshell contexts."""
with new_kernel() as kc:
subshell_id = None
try:
flush_channels(kc)
# Test silent execution in main shell
msg_main_silent = execute_request(kc, "a=1", None, silent=True)
reply_main_silent = get_reply(kc, msg_main_silent["header"]["msg_id"])
assert reply_main_silent["content"]["status"] == "ok"
# Test silent execution in subshell
subshell_id = create_subshell_helper(kc)["subshell_id"]
msg_sub_silent = execute_request(kc, "b=2", subshell_id, silent=True)
reply_sub_silent = get_reply(kc, msg_sub_silent["header"]["msg_id"])
assert reply_sub_silent["content"]["status"] == "ok"
# Check for no iopub messages (other than status) from the silent requests
for msg_id in [msg_main_silent["header"]["msg_id"], msg_sub_silent["header"]["msg_id"]]:
while True:
try:
msg = kc.get_iopub_msg(timeout=0.2)
if msg["header"]["msg_type"] == "status":
continue
pytest.fail(
f"Silent execution produced unexpected IOPub message: {msg['header']['msg_type']}"
)
except Empty:
break
# Test concurrent silent and non-silent execution
msg_silent = execute_request(
kc, "import time; time.sleep(0.5); c=3", subshell_id, silent=True
)
msg_noisy = execute_request(kc, "print('noisy')", None, silent=False)
# Wait for both replies
get_replies(kc, [msg_silent["header"]["msg_id"], msg_noisy["header"]["msg_id"]])
# Verify that we only receive stream output from the noisy message
stdout, stderr = assemble_output(
kc.get_iopub_msg, parent_msg_id=msg_noisy["header"]["msg_id"]
)
assert "noisy" in stdout
assert not stderr
# Verify there is no output from the concurrent silent message
while True:
try:
msg = kc.get_iopub_msg(timeout=0.2)
if (
msg["header"]["msg_type"] == "status"
and msg["parent_header"].get("msg_id") == msg_silent["header"]["msg_id"]
):
continue
if msg["parent_header"].get("msg_id") == msg_silent["header"]["msg_id"]:
pytest.fail(
"Silent execution in concurrent setting produced unexpected IOPub message"
)
except Empty:
break
finally:
# Ensure subshell is always deleted
if subshell_id:
delete_subshell_helper(kc, subshell_id)
|