File: test_ingress.py

package info (click to toggle)
python-aiohasupervisor 0.4.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 948 kB
  • sloc: python: 4,843; sh: 37; makefile: 3
file content (69 lines) | stat: -rw-r--r-- 2,536 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""Test ingress component client."""

from aioresponses import aioresponses
import pytest
from yarl import URL

from aiohasupervisor import SupervisorClient
from aiohasupervisor.models import CreateSessionOptions

from . import load_fixture
from .const import SUPERVISOR_URL


async def test_panels(
    responses: aioresponses, supervisor_client: SupervisorClient
) -> None:
    """Test panels API."""
    responses.get(
        f"{SUPERVISOR_URL}/ingress/panels",
        status=200,
        body=load_fixture("ingress_panels.json"),
    )
    result = await supervisor_client.ingress.panels()
    assert "core_ssh" in result
    assert result["core_ssh"].title == "Terminal"
    assert result["core_ssh"].icon == "mdi:console"
    assert result["core_ssh"].admin is True
    assert result["core_ssh"].enable is False
    assert "a0d7b954_vscode" in result
    assert result["a0d7b954_vscode"].title == "Studio Code Server"
    assert result["a0d7b954_vscode"].icon == "mdi:microsoft-visual-studio-code"
    assert result["a0d7b954_vscode"].admin is True
    assert result["a0d7b954_vscode"].enable is True


@pytest.mark.parametrize(
    ("options", "expected_body"),
    [(None, None), (CreateSessionOptions(user_id="test"), {"user_id": "test"})],
)
async def test_create_session(
    responses: aioresponses,
    supervisor_client: SupervisorClient,
    options: CreateSessionOptions | None,
    expected_body: dict[str, str] | None,
) -> None:
    """Test create session API."""
    session_url = f"{SUPERVISOR_URL}/ingress/session"
    responses.post(session_url, status=200, body=load_fixture("create_session.json"))
    result = await supervisor_client.ingress.create_session(options)
    assert result == "abc123"

    assert responses.requests.keys() == {("POST", URL(session_url))}
    session_calls = responses.requests[("POST", URL(session_url))]
    assert len(session_calls) == 1
    assert session_calls[0].kwargs["json"] == expected_body


async def test_validate_session(
    responses: aioresponses, supervisor_client: SupervisorClient
) -> None:
    """Test validate session API."""
    validate_url = f"{SUPERVISOR_URL}/ingress/validate_session"
    responses.post(validate_url, status=200)
    assert await supervisor_client.ingress.validate_session("abc123") is None

    assert responses.requests.keys() == {("POST", URL(validate_url))}
    validate_calls = responses.requests[("POST", URL(validate_url))]
    assert len(validate_calls) == 1
    assert validate_calls[0].kwargs["json"] == {"session": "abc123"}