"""Tests for the watch stream returned by ``Query.watch()``."""
import json
from unittest.mock import MagicMock
import pytest
from pykube import Pod
from pykube.exceptions import HTTPError
from pykube.query import Query
@pytest.fixture
def api():
    """Provide a ``MagicMock`` that stands in for the API client in each test."""
    mock_client = MagicMock()
    return mock_client
def test_watch_response_exists(api):
    """A new watch has a ``response`` attribute that is ``None`` before iteration."""
    watch = Query(api, Pod).watch()

    # The watch has not been executed yet, so there is no response to expose.
    assert hasattr(watch, "response")
    assert watch.response is None
def test_watch_response_is_readonly(api):
    """Assigning to ``response`` on a watch raises ``AttributeError``."""
    watch = Query(api, Pod).watch()
    replacement = object()

    with pytest.raises(AttributeError):
        watch.response = replacement
def test_watch_response_is_set_on_iter(api):
    """Pulling the first event stores the response returned by ``api.get``.

    Also checks that exactly one GET was made, with ``stream=True`` and a URL
    containing ``watch=true``.
    """
    event = {"type": "ADDED", "object": {}}
    fake_response = MagicMock()
    fake_response.iter_lines.return_value = [json.dumps(event).encode("utf-8")]
    api.get.return_value = fake_response

    watch = Query(api, Pod).watch()
    events = iter(watch)
    next(events)

    assert watch.response is fake_response

    assert api.get.call_count == 1
    # Keyword arguments of the first (and only) recorded ``api.get`` call.
    request_kwargs = api.get.call_args_list[0][1]
    assert request_kwargs["stream"] is True
    assert "watch=true" in request_kwargs["url"]
def test_watch_exception_raised_on_failure(api):
    """A ``Status``/``Failure`` line in the stream raises ``HTTPError``.

    The response is still recorded on the watch, and the request was a single
    streaming GET whose URL contains ``watch=true``.
    """
    failure_status = {
        "kind": "Status",
        "apiVersion": "meta.k8s.io/v1",
        "metadata": {},
        "status": "Failure",
        "message": "unexpected EOF",
        "code": 500,
    }
    fake_response = MagicMock()
    fake_response.iter_lines.return_value = [
        json.dumps(failure_status).encode("utf-8")
    ]
    api.get.return_value = fake_response

    watch = Query(api, Pod).watch()
    with pytest.raises(HTTPError):
        next(iter(watch))

    # These checks sit outside the ``raises`` block so that they actually run.
    assert watch.response is fake_response

    assert api.get.call_count == 1
    request_kwargs = api.get.call_args_list[0][1]
    assert request_kwargs["stream"] is True
    assert "watch=true" in request_kwargs["url"]
def test_watch_params_are_passed_through(api):
    """Parameters given to ``watch(params=...)`` appear in the request URL."""
    event = {"type": "ADDED", "object": {}}
    fake_response = MagicMock()
    fake_response.iter_lines.return_value = [json.dumps(event).encode("utf-8")]
    api.get.return_value = fake_response

    extra_params = {"timeoutSeconds": 123, "arbitraryParam": 456}
    watch = Query(api, Pod).watch(params=extra_params)
    events = iter(watch)
    next(events)

    assert api.get.call_count == 1
    # URL keyword argument of the single recorded ``api.get`` call.
    requested_url = api.get.call_args_list[0][1]["url"]
    assert "timeoutSeconds=123" in requested_url
    assert "arbitraryParam=456" in requested_url
|