1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
|
"""Test IPythonKernel directly"""
import asyncio
import os
import pytest
import zmq
from IPython.core.history import DummyDB
from ipykernel.comm.comm import BaseComm
from ipykernel.ipkernel import IPythonKernel, _create_comm
from .conftest import MockIPyKernel
if os.name == "nt":
pytest.skip("skipping tests on windows", allow_module_level=True)
class user_mod:
__dict__ = {}
async def test_properties(ipkernel: IPythonKernel) -> None:
ipkernel.user_module = user_mod()
ipkernel.user_ns = {}
async def test_direct_kernel_info_request(ipkernel):
reply = await ipkernel.test_shell_message("kernel_info_request", {})
assert reply["header"]["msg_type"] == "kernel_info_reply"
async def test_direct_execute_request(ipkernel: MockIPyKernel) -> None:
reply = await ipkernel.test_shell_message("execute_request", dict(code="hello", silent=False))
assert reply["header"]["msg_type"] == "execute_reply"
reply = await ipkernel.test_shell_message(
"execute_request", dict(code="trigger_error", silent=False)
)
assert reply["content"]["status"] == "aborted"
reply = await ipkernel.test_shell_message("execute_request", dict(code="hello", silent=False))
assert reply["header"]["msg_type"] == "execute_reply"
async def test_direct_execute_request_aborting(ipkernel):
ipkernel._aborting = True
reply = await ipkernel.test_shell_message("execute_request", dict(code="hello", silent=False))
assert reply["header"]["msg_type"] == "execute_reply"
assert reply["content"]["status"] == "aborted"
async def test_complete_request(ipkernel):
reply = await ipkernel.test_shell_message("complete_request", dict(code="hello", cursor_pos=0))
assert reply["header"]["msg_type"] == "complete_reply"
ipkernel.use_experimental_completions = False
reply = await ipkernel.test_shell_message(
"complete_request", dict(code="hello", cursor_pos=None)
)
assert reply["header"]["msg_type"] == "complete_reply"
async def test_inspect_request(ipkernel):
reply = await ipkernel.test_shell_message("inspect_request", dict(code="hello", cursor_pos=0))
assert reply["header"]["msg_type"] == "inspect_reply"
async def test_history_request(ipkernel):
ipkernel.shell.history_manager.db = DummyDB()
reply = await ipkernel.test_shell_message(
"history_request", dict(hist_access_type="", output="", raw="")
)
assert reply["header"]["msg_type"] == "history_reply"
reply = await ipkernel.test_shell_message(
"history_request", dict(hist_access_type="tail", output="", raw="")
)
assert reply["header"]["msg_type"] == "history_reply"
reply = await ipkernel.test_shell_message(
"history_request", dict(hist_access_type="range", output="", raw="")
)
assert reply["header"]["msg_type"] == "history_reply"
reply = await ipkernel.test_shell_message(
"history_request", dict(hist_access_type="search", output="", raw="")
)
assert reply["header"]["msg_type"] == "history_reply"
async def test_comm_info_request(ipkernel):
reply = await ipkernel.test_shell_message("comm_info_request")
assert reply["header"]["msg_type"] == "comm_info_reply"
async def test_direct_interrupt_request(ipkernel):
reply = await ipkernel.test_control_message("interrupt_request", {})
assert reply["header"]["msg_type"] == "interrupt_reply"
assert reply["content"] == {"status": "ok"}
# test failure on interrupt request
def raiseOSError():
msg = "evalue"
raise OSError(msg)
ipkernel._send_interrupt_children = raiseOSError
reply = await ipkernel.test_control_message("interrupt_request", {})
assert reply["header"]["msg_type"] == "interrupt_reply"
assert reply["content"]["status"] == "error"
assert reply["content"]["ename"] == "OSError"
assert reply["content"]["evalue"] == "evalue"
assert len(reply["content"]["traceback"]) > 0
# TODO: this causes deadlock
# async def test_direct_shutdown_request(ipkernel):
# reply = await ipkernel.test_shell_message("shutdown_request", dict(restart=False))
# assert reply["header"]["msg_type"] == "shutdown_reply"
# reply = await ipkernel.test_shell_message("shutdown_request", dict(restart=True))
# assert reply["header"]["msg_type"] == "shutdown_reply"
# TODO: this causes deadlock
# async def test_direct_usage_request(kernel):
# reply = await kernel.test_control_message("usage_request", {})
# assert reply['header']['msg_type'] == 'usage_reply'
async def test_is_complete_request(ipkernel: MockIPyKernel) -> None:
reply = await ipkernel.test_shell_message("is_complete_request", dict(code="hello"))
assert reply["header"]["msg_type"] == "is_complete_reply"
setattr(ipkernel, "shell.input_transformer_manager", None)
reply = await ipkernel.test_shell_message("is_complete_request", dict(code="hello"))
assert reply["header"]["msg_type"] == "is_complete_reply"
def test_do_apply(ipkernel: MockIPyKernel) -> None:
from ipyparallel import pack_apply_message
def hello():
pass
msg = pack_apply_message(hello, (), {})
ipkernel.do_apply(None, msg, "1", {})
ipkernel.do_apply(None, [], "1", {})
async def test_direct_debug_request(ipkernel):
reply = await ipkernel.test_control_message("debug_request", {})
assert reply["header"]["msg_type"] == "debug_reply"
async def test_direct_clear(ipkernel):
ipkernel.do_clear()
async def test_cancel_on_sigint(ipkernel: IPythonKernel) -> None:
future: asyncio.Future = asyncio.Future()
with ipkernel._cancel_on_sigint(future):
pass
future.set_result(None)
def test_dispatch_debugpy(ipkernel: IPythonKernel) -> None:
msg = ipkernel.session.msg("debug_request", {})
msg_list = ipkernel.session.serialize(msg)
ipkernel.dispatch_debugpy([zmq.Message(m) for m in msg_list])
async def test_start(ipkernel: IPythonKernel) -> None:
shell_future: asyncio.Future = asyncio.Future()
def fake_publish_status(status, channel):
if status == "starting" and channel == "shell":
shell_future.set_result(None)
ipkernel._publish_status = fake_publish_status # type:ignore
ipkernel.start()
await shell_future
shell_stream = ipkernel.shell_stream
assert shell_stream is not None
assert not shell_stream.closed()
async def test_start_no_debugpy(ipkernel: IPythonKernel) -> None:
shell_future: asyncio.Future = asyncio.Future()
def fake_publish_status(status, channel):
if status == "starting" and channel == "shell":
shell_future.set_result(None)
ipkernel._publish_status = fake_publish_status # type:ignore
ipkernel.debugpy_stream = None
ipkernel.start()
await shell_future
shell_stream = ipkernel.shell_stream
assert shell_stream is not None
assert not shell_stream.closed()
def test_create_comm():
assert isinstance(_create_comm(), BaseComm)
def test_finish_metadata(ipkernel: IPythonKernel) -> None:
reply_content = dict(status="error", ename="UnmetDependency")
metadata = ipkernel.finish_metadata({}, {}, reply_content)
assert metadata["dependencies_met"] is False
async def test_do_debug_request(ipkernel: IPythonKernel) -> None:
msg = ipkernel.session.msg("debug_request", {})
ipkernel.session.serialize(msg)
await ipkernel.do_debug_request(msg)
|