1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351
|
"""Test GatewayClient"""
import os
import json
import uuid
from datetime import datetime
from io import StringIO
from unittest.mock import patch
from tornado.web import HTTPError
from tornado.httpclient import HTTPRequest, HTTPResponse
from notebook.gateway.managers import GatewayClient
from notebook.utils import maybe_future
from .launchnotebook import NotebookTestBase
def generate_kernelspec(name):
    """Return a mocked kernelspec entry for the kernelspec called ``name``.

    The entry launches ``ipykernel_launcher`` and uses ``name`` both as the
    kernelspec name and as its display name. The launch details sit under a
    nested ``'spec'`` -> ``'spec'`` mapping.
    """
    launch_command = ['python', '-m', 'ipykernel_launcher', '-f', '{connection_file}']
    launch_details = dict(
        argv=launch_command,
        env={},
        display_name=name,
        language='python',
        interrupt_mode='signal',
        metadata={},
    )
    return {'name': name, 'spec': {'spec': launch_details}, 'resources': {}}
# Kernelspec catalogue served by the mocked gateway: two kernelspecs,
# kspec_foo (the default) and kspec_bar.
kernelspecs = {
    'default': 'kspec_foo',
    'kernelspecs': {
        spec_name: generate_kernelspec(spec_name)
        for spec_name in ('kspec_foo', 'kspec_bar')
    },
}

# Registry of the kernels the mock expects to be running.
# Key = kernel_id, Value = model.
running_kernels = {}
def generate_model(name):
    """Generate a mocked kernel model for a kernel named ``name``.

    The model gets a fresh random UUID as its ``id`` and the current UTC time,
    formatted as an ISO-8601 string with a trailing ``'Z'``, as its
    ``last_activity``. The caller is responsible for adding the model to the
    ``running_kernels`` dictionary.
    """
    from datetime import timezone

    # datetime.utcnow() is deprecated (Python 3.12+). Take an aware UTC
    # timestamp and drop the tzinfo so isoformat() emits no '+00:00' offset
    # ahead of the 'Z' suffix.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return {
        'id': str(uuid.uuid4()),
        'name': name,
        'last_activity': now_utc.isoformat() + 'Z',
        'execution_state': 'idle',
        'connections': 1,
    }
async def _mock_response(request, code, payload=None):
    """Build the mocked HTTPResponse for ``request`` with status ``code``.

    When ``payload`` is given it is JSON-encoded and attached as the body.
    """
    if payload is None:
        return await maybe_future(HTTPResponse(request, code))
    response_buf = StringIO(json.dumps(payload))
    return await maybe_future(HTTPResponse(request, code, buffer=response_buf))


async def mock_gateway_request(url, **kwargs):
    """Mocked gateway endpoint, patched in for ``notebook.gateway.managers.gateway_request``.

    Routes on the tail of ``url`` and on ``kwargs['method']`` (``'GET'`` when
    absent or empty) and answers from the module-level ``kernelspecs`` and
    ``running_kernels`` fixtures:

    * ``GET  .../api/kernelspecs``              -> 200, all kernelspecs
    * ``GET  .../api/kernelspecs/<name>``       -> 200, the named kernelspec
    * ``POST .../api/kernels``                  -> 201, new model (registered as running)
    * ``GET  .../api/kernels``                  -> 200, list of running models
    * ``POST .../api/kernels/<id>/interrupt``   -> 204
    * ``POST .../api/kernels/<id>/restart``     -> 200, the kernel's model
    * ``DELETE .../api/kernels/<id>``           -> 204 (kernel removed from running set)
    * ``GET  .../api/kernels/<id>``             -> 200, the kernel's model

    Raises:
        HTTPError: 404 for an unknown kernelspec, an unknown kernel id, or an
            action other than ``interrupt``/``restart``.
        AssertionError: if a create request's ``name`` does not match the
            ``KERNEL_KSPEC_NAME`` value in its ``env``.

    Returns ``None`` for any endpoint/method combination not listed above.
    """
    # .get() keeps the 'GET' default reachable when the caller omits 'method'.
    method = kwargs.get('method') or 'GET'
    request = HTTPRequest(url=url, **kwargs)
    endpoint = str(url)

    # Fetch all kernelspecs
    if endpoint.endswith('/api/kernelspecs') and method == 'GET':
        return await _mock_response(request, 200, kernelspecs)

    # Fetch named kernelspec
    if '/api/kernelspecs/' in endpoint and method == 'GET':
        requested_kernelspec = endpoint.rpartition('/')[2]
        kspecs = kernelspecs.get('kernelspecs')
        if requested_kernelspec not in kspecs:
            raise HTTPError(404, message=f'Kernelspec does not exist: {requested_kernelspec}')
        return await _mock_response(request, 200, kspecs[requested_kernelspec])

    # Create kernel
    if endpoint.endswith('/api/kernels') and method == 'POST':
        json_body = json.loads(kwargs['body'])
        name = json_body.get('name')
        # A body without 'env' should fail the propagation check below rather
        # than blow up with an AttributeError on None.
        env = json_body.get('env') or {}
        kspec_name = env.get('KERNEL_KSPEC_NAME')
        assert name == kspec_name  # Ensure that KERNEL_ env values get propagated
        model = generate_model(name)
        running_kernels[model['id']] = model  # Register model as a running kernel
        return await _mock_response(request, 201, model)

    # Fetch list of running kernels
    if endpoint.endswith('/api/kernels') and method == 'GET':
        return await _mock_response(request, 200, list(running_kernels.values()))

    # Interrupt or restart existing kernel
    if '/api/kernels/' in endpoint and method == 'POST':
        requested_kernel_id, _, action = endpoint.rpartition('/api/kernels/')[2].rpartition('/')
        if action not in ('interrupt', 'restart'):
            raise HTTPError(404, message=f'Bad action detected: {action}')
        if requested_kernel_id not in running_kernels:
            raise HTTPError(404, message=f'Kernel does not exist: {requested_kernel_id}')
        if action == 'interrupt':
            return await _mock_response(request, 204)
        # The restart reply carries the kernel model, so it is reported as 200:
        # a 204 (No Content) response must not have a body.
        return await _mock_response(request, 200, running_kernels[requested_kernel_id])

    # Shutdown existing kernel
    if '/api/kernels/' in endpoint and method == 'DELETE':
        requested_kernel_id = endpoint.rpartition('/')[2]
        # Simulate shutdown by removing the kernel from the running set. An
        # unknown id is a 404 like the other kernel endpoints, not a KeyError.
        if running_kernels.pop(requested_kernel_id, None) is None:
            raise HTTPError(404, message=f'Kernel does not exist: {requested_kernel_id}')
        return await _mock_response(request, 204)

    # Fetch existing kernel
    if '/api/kernels/' in endpoint and method == 'GET':
        requested_kernel_id = endpoint.rpartition('/')[2]
        if requested_kernel_id not in running_kernels:
            raise HTTPError(404, message=f'Kernel does not exist: {requested_kernel_id}')
        return await _mock_response(request, 200, running_kernels[requested_kernel_id])

    # Nothing matched: this mock does not model the endpoint, so the caller
    # receives None.
    return None
# Patcher that swaps notebook.gateway.managers.gateway_request for the mock
# above; the tests enter it with ``with mocked_gateway:``.
mocked_gateway = patch(
    'notebook.gateway.managers.gateway_request',
    new=mock_gateway_request,
)
class TestGateway(NotebookTestBase):
    """Notebook server tests run with a (mocked) gateway configured.

    The gateway URL and connect timeout are supplied through the environment
    (see ``get_patch_env``); the request timeout and HTTP user through
    command-line arguments (see ``get_argv``). Requests that need the gateway
    are issued inside ``with mocked_gateway:`` so that they are answered by
    ``mock_gateway_request``.
    """

    mock_gateway_url = 'http://mock-gateway-server:8889'
    mock_http_user = 'alice'

    @classmethod
    def setup_class(cls):
        """Drop any existing GatewayClient singleton before the base class setup runs."""
        GatewayClient.clear_instance()
        super().setup_class()

    @classmethod
    def teardown_class(cls):
        """Drop the GatewayClient singleton, then run the base class teardown."""
        GatewayClient.clear_instance()
        super().teardown_class()

    @classmethod
    def get_patch_env(cls):
        """Extend the base test environment with the gateway URL and connect timeout."""
        test_env = super().get_patch_env()
        test_env.update({'JUPYTER_GATEWAY_URL': TestGateway.mock_gateway_url,
                         'JUPYTER_GATEWAY_CONNECT_TIMEOUT': '44.4'})
        return test_env

    @classmethod
    def get_argv(cls):
        """Extend the base argv with the gateway request timeout and HTTP user."""
        argv = super().get_argv()
        argv.extend(['--GatewayClient.request_timeout=96.0',
                     '--GatewayClient.http_user=' + TestGateway.mock_http_user])
        return argv

    def setUp(self):
        """Have the GatewayClient singleton (re)load its connection args before each test."""
        GatewayClient.instance().load_connection_args()
        super().setUp()

    def test_gateway_options(self):
        """Validate that env- and argv-supplied gateway options reach the gateway config."""
        gateway_config = self.notebook.gateway_config
        assert gateway_config.gateway_enabled
        assert gateway_config.url == TestGateway.mock_gateway_url
        assert gateway_config.http_user == TestGateway.mock_http_user
        # (A former check compared connect_timeout with itself and could never
        # fail; the value check below is the meaningful assertion.)
        assert gateway_config.connect_timeout == 44.4
        assert gateway_config.request_timeout == 96.0
        assert os.environ['KERNEL_LAUNCH_TIMEOUT'] == str(96)  # Ensure KLT gets set from request-timeout

    def test_gateway_class_mappings(self):
        """Ensure appropriate class mappings are in place."""
        assert self.notebook.kernel_manager_class.__name__ == 'GatewayKernelManager'
        assert self.notebook.session_manager_class.__name__ == 'GatewaySessionManager'
        assert self.notebook.kernel_spec_manager_class.__name__ == 'GatewayKernelSpecManager'

    def test_gateway_get_kernelspecs(self):
        """Validate that kernelspecs come from gateway."""
        with mocked_gateway:
            response = self.request('GET', '/api/kernelspecs')
            self.assertEqual(response.status_code, 200)
            content = json.loads(response.content.decode('utf-8'))
            kspecs = content.get('kernelspecs')
            assert len(kspecs) == 2
            assert kspecs.get('kspec_bar').get('name') == 'kspec_bar'

    def test_gateway_get_named_kernelspec(self):
        """Validate that a specific kernelspec can be retrieved from gateway."""
        with mocked_gateway:
            response = self.request('GET', '/api/kernelspecs/kspec_foo')
            assert response.status_code == 200
            kspec_foo = json.loads(response.content.decode('utf-8'))
            assert kspec_foo.get('name') == 'kspec_foo'

            response = self.request('GET', '/api/kernelspecs/no_such_spec')
            assert response.status_code == 404

    def test_gateway_session_lifecycle(self):
        """Validate session lifecycle functions; create, interrupt, restart and delete."""
        # create
        session_id, kernel_id = self.create_session('kspec_foo')

        # ensure kernel still considered running
        self.assertTrue(self.is_kernel_running(kernel_id))

        # interrupt
        self.interrupt_kernel(kernel_id)

        # ensure kernel still considered running
        self.assertTrue(self.is_kernel_running(kernel_id))

        # restart
        self.restart_kernel(kernel_id)

        # ensure kernel still considered running
        self.assertTrue(self.is_kernel_running(kernel_id))

        # delete
        self.delete_session(session_id)
        self.assertFalse(self.is_kernel_running(kernel_id))

    def test_gateway_kernel_lifecycle(self):
        """Validate kernel lifecycle functions; create, interrupt, restart and delete."""
        # create
        kernel_id = self.create_kernel('kspec_bar')

        # ensure kernel still considered running
        self.assertTrue(self.is_kernel_running(kernel_id))

        # interrupt
        self.interrupt_kernel(kernel_id)

        # ensure kernel still considered running
        self.assertTrue(self.is_kernel_running(kernel_id))

        # restart
        self.restart_kernel(kernel_id)

        # ensure kernel still considered running
        self.assertTrue(self.is_kernel_running(kernel_id))

        # delete
        self.delete_kernel(kernel_id)
        self.assertFalse(self.is_kernel_running(kernel_id))

    def create_session(self, kernel_name):
        """Creates a session for a kernel. The session is created against the notebook server
        which then uses the gateway for kernel management.

        Returns the ``(session_id, kernel_id)`` pair of the new session.
        """
        with mocked_gateway:
            nb_path = os.path.join(self.notebook_dir, 'testgw.ipynb')
            body = {'path': nb_path, 'type': 'notebook', 'kernel': {'name': kernel_name}}

            # add a KERNEL_ value to the current env and we'll ensure that that value exists in the mocked method
            os.environ['KERNEL_KSPEC_NAME'] = kernel_name
            try:
                # Create the kernel... (also tests get_kernel)
                response = self.request('POST', '/api/sessions', json=body)
                self.assertEqual(response.status_code, 201)
                model = json.loads(response.content.decode('utf-8'))
                self.assertEqual(model.get('path'), nb_path)

                kernel_id = model.get('kernel').get('id')
                # ensure it's in the running_kernels and name matches.
                self.assertIn(kernel_id, running_kernels)
                running_kernel = running_kernels[kernel_id]
                self.assertEqual(kernel_id, running_kernel.get('id'))
                self.assertEqual(model.get('kernel').get('name'), running_kernel.get('name'))
                session_id = model.get('id')
            finally:
                # restore env, even when one of the assertions above fails, so
                # the value cannot leak into later tests
                os.environ.pop('KERNEL_KSPEC_NAME', None)

        return session_id, kernel_id

    def delete_session(self, session_id):
        """Deletes a session corresponding to the given session id."""
        with mocked_gateway:
            # Delete the session (and kernel)
            response = self.request('DELETE', '/api/sessions/' + session_id)
            self.assertEqual(response.status_code, 204)
            self.assertEqual(response.reason, 'No Content')

    def is_kernel_running(self, kernel_id):
        """Issues request to get the set of running kernels and reports whether
        ``kernel_id`` is among them.
        """
        with mocked_gateway:
            # Get list of running kernels
            response = self.request('GET', '/api/kernels')
            self.assertEqual(response.status_code, 200)
            kernels = json.loads(response.content.decode('utf-8'))
            self.assertEqual(len(kernels), len(running_kernels))
            return any(model.get('id') == kernel_id for model in kernels)

    def create_kernel(self, kernel_name):
        """Issues request to create a kernel from the given kernelspec name.

        Returns the id of the new kernel.
        """
        with mocked_gateway:
            body = {'name': kernel_name}

            # add a KERNEL_ value to the current env and we'll ensure that that value exists in the mocked method
            os.environ['KERNEL_KSPEC_NAME'] = kernel_name
            try:
                response = self.request('POST', '/api/kernels', json=body)
                self.assertEqual(response.status_code, 201)
                model = json.loads(response.content.decode('utf-8'))

                kernel_id = model.get('id')
                # ensure it's in the running_kernels and name matches.
                self.assertIn(kernel_id, running_kernels)
                running_kernel = running_kernels[kernel_id]
                self.assertEqual(kernel_id, running_kernel.get('id'))
                self.assertEqual(model.get('name'), kernel_name)
            finally:
                # restore env, even when one of the assertions above fails, so
                # the value cannot leak into later tests
                os.environ.pop('KERNEL_KSPEC_NAME', None)

        return kernel_id

    def interrupt_kernel(self, kernel_id):
        """Issues request to interrupt the given kernel."""
        with mocked_gateway:
            response = self.request('POST', '/api/kernels/' + kernel_id + '/interrupt')
            self.assertEqual(response.status_code, 204)
            self.assertEqual(response.reason, 'No Content')

    def restart_kernel(self, kernel_id):
        """Issues request to restart the given kernel."""
        with mocked_gateway:
            response = self.request('POST', '/api/kernels/' + kernel_id + '/restart')
            self.assertEqual(response.status_code, 200)
            model = json.loads(response.content.decode('utf-8'))

            restarted_kernel_id = model.get('id')
            # ensure it's in the running_kernels and name matches.
            self.assertIn(restarted_kernel_id, running_kernels)
            running_kernel = running_kernels[restarted_kernel_id]
            self.assertEqual(restarted_kernel_id, running_kernel.get('id'))
            self.assertEqual(model.get('name'), running_kernel.get('name'))

    def delete_kernel(self, kernel_id):
        """Deletes kernel corresponding to the given kernel id."""
        with mocked_gateway:
            # Delete the kernel
            response = self.request('DELETE', '/api/kernels/' + kernel_id)
            self.assertEqual(response.status_code, 204)
            self.assertEqual(response.reason, 'No Content')
|