File: server.py

package info (click to toggle)
debusine 0.14.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 15,056 kB
  • sloc: python: 193,072; sh: 848; javascript: 335; makefile: 116
file content (106 lines) | stat: -rw-r--r-- 3,524 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Debusine server controller."""

import hashlib
import logging

import requests
import yaml

from utils import common
from utils.common import Configuration, RunResult
from utils.waiter import Waiter

logger = logging.getLogger(__name__)


class DebusineServer:
    """Interacts with the debusine-server: execute debusine-admin commands."""

    @staticmethod
    def wait_for_server_ready() -> bool:
        """
        Wait up to 30 seconds for the server to be ready.

        Return True/False depending on if the server is ready or not.
        """

        def is_ready() -> bool:
            """Return True if {self._api_url}/ is available."""
            response = requests.get(f"{Configuration.get_base_url()}/")
            return response.status_code < 400

        return Waiter.wait_for_success(30, is_ready)

    @staticmethod
    def restart() -> RunResult:
        """Restart (via systemctl) debusine-server."""
        return common.run(["systemctl", "restart", "debusine-server"])

    @staticmethod
    def execute_command(
        command: str,
        *args: str,
        user: str = Configuration.DEBUSINE_SERVER_USER,
        stdin: str | None = None,
    ) -> RunResult:
        """Execute a debusine server management command."""
        cmd = ["debusine-admin"] + [command] + [*args]
        result = common.run_as(user, cmd, stdin=stdin)

        if result.stdout:
            logger.info("stdout: %s", result.stdout)
        if result.stderr:
            logger.info("stderr: %s", result.stderr)

        return result

    @classmethod
    def verify_worker(
        cls,
        token: str,
        connected: bool,
        enabled: bool,
        list_workers: str | None = None,
    ) -> bool:
        """
        Return True if ``worker list`` has token with the specified state.

        :param token: token that is being verified
        :param connected: True if it is expected that the worker is connected
        :param enabled: True if it is expected that the worker's token enabled
        :param list_workers: output of the command ``worker list --yaml`` or
          None.  If None it executes ``worker list --yaml``
        :return: True if a token is connected and enabled as per connected
          and enabled parameters
        """
        if list_workers is None:
            list_workers_output = cls.execute_command(
                "worker", "list", "--yaml"
            ).stdout

        token_hash = hashlib.sha256(token.encode()).hexdigest()
        for worker in yaml.safe_load(list_workers_output):
            if worker["hash"] == token_hash:
                return (
                    bool(worker["connected"]) == connected
                    and worker["enabled"] == enabled
                )

        return False

    @classmethod
    def wait_for_worker_connected(cls, token: str) -> bool:
        """Wait up to 30 seconds for the worker to be connected."""

        def token_is_connected() -> bool:
            return cls.verify_worker(token, connected=True, enabled=True)

        return Waiter.wait_for_success(30, token_is_connected)