File: devices.py

package info (click to toggle)
python-pyflume 0.8.7-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 332 kB
  • sloc: python: 857; makefile: 6
file content (66 lines) | stat: -rw-r--r-- 1,676 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
"""Retrieve Devices from Flume API."""

from requests import Session

from .constants import API_DEVICES_URL, DEFAULT_TIMEOUT  # noqa: WPS300
from .utils import configure_logger, flume_response_error  # noqa: WPS300

# Module-level logger shared by everything in this file; it is built by the
# package's configure_logger helper, which receives this module's __name__.
LOGGER = configure_logger(__name__)


class FlumeDeviceList:
    """Fetch and hold the device list returned by the Flume API."""

    def __init__(
        self,
        flume_auth,
        http_session=None,
        timeout=DEFAULT_TIMEOUT,
    ):
        """
        Store the connection settings and load the device list.

        Args:
            flume_auth: Authentication object; its ``user_id`` and
                ``authorization_header`` attributes are read when requesting
                devices.
            http_session: Session used for the HTTP call. A new requests
                Session() is created when None is given.
            timeout: Value forwarded as the ``timeout`` of the request.

        """
        self._flume_auth = flume_auth
        self._timeout = timeout
        self._http_session = (
            Session() if http_session is None else http_session
        )

        # The list is fetched right away, so constructing this object
        # already performs one request through the session.
        self.device_list = self.get_devices()

    def get_devices(self):
        """
        Request the devices of the authenticated user from the Flume API.

        Returns:
            The value stored under the "data" key of the JSON response.

        """
        devices_url = API_DEVICES_URL.format(user_id=self._flume_auth.user_id)
        request_options = {
            "headers": self._flume_auth.authorization_header,
            "params": {"user": "true", "location": "true"},
            "timeout": self._timeout,
        }

        api_response = self._http_session.request(
            "GET",
            devices_url,
            **request_options,
        )

        LOGGER.debug("get_devices Response: %s", api_response.text)  # noqa: WPS323

        # Hand the response to the shared error checker before decoding it.
        # NOTE(review): presumably this raises on a failed response; confirm
        # against flume_response_error in utils.
        flume_response_error("Impossible to retreive devices", api_response)

        payload = api_response.json()
        return payload["data"]