File: uds.py

package info (click to toggle)
python-podman 5.4.0.1-3
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,140 kB
  • sloc: python: 7,532; makefile: 82; sh: 75
file content (179 lines) | stat: -rw-r--r-- 6,000 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
"""Specialized Transport Adapter for UNIX domain sockets."""

import collections
import functools
import logging
import socket
from typing import Optional, Union
from urllib.parse import unquote, urlparse

import urllib3
import urllib3.connection

from requests.adapters import DEFAULT_POOLBLOCK, DEFAULT_POOLSIZE, DEFAULT_RETRIES, HTTPAdapter

from ..errors import APIError

from .adapter_utils import _key_normalizer

logger = logging.getLogger("podman.uds_adapter")


class UDSSocket(socket.socket):
    """Specialization of socket.socket for a UNIX domain socket."""

    def __init__(self, uds: str):
        """Initialize UDSSocket.

        Args:
            uds: Full address of a Podman service UNIX domain socket.

        Examples:
            UDSSocket("http+unix:///run/podman/podman.sock")
        """
        super().__init__(socket.AF_UNIX, socket.SOCK_STREAM)
        self.uds = uds

    def connect(self, **kwargs):  # pylint: disable=unused-argument
        """Returns socket for UNIX domain socket."""
        netloc = unquote(urlparse(self.uds).netloc)
        try:
            super().connect(netloc)
        except Exception as e:
            raise APIError(f"Unable to make connection to UDS '{netloc}'") from e


class UDSConnection(urllib3.connection.HTTPConnection):
    """Specialization of HTTPConnection to use a UNIX domain sockets."""

    def __init__(
        self,
        host: str,
        port: int,
        timeout: Union[float, urllib3.Timeout, None] = None,
        strict=False,
        **kwargs,  # pylint: disable=unused-argument
    ):
        """Initialize connection to UNIX domain socket for HTTP client.

        Args:
            host: Ignored.
            port: Ignored.
            timeout: Time to allow for operation.
            strict: Ignored.

        Keyword Args:
            uds: Full address of a Podman service UNIX domain socket. Required.
        """
        connection_kwargs = kwargs.copy()
        self.sock: Optional[socket.socket] = None

        if timeout is not None:
            if isinstance(timeout, urllib3.Timeout):
                try:
                    connection_kwargs["timeout"] = float(timeout.total)
                except TypeError:
                    pass
            connection_kwargs["timeout"] = timeout

        self.uds = connection_kwargs.pop("uds")
        super().__init__(host, **connection_kwargs)

    def connect(self) -> None:
        """Connect to Podman service via UNIX domain socket."""
        sock = UDSSocket(self.uds)
        sock.settimeout(self.timeout)
        sock.connect()
        self.sock = sock


class UDSConnectionPool(urllib3.HTTPConnectionPool):
    """Specialization of HTTPConnectionPool for holding UNIX domain sockets."""

    ConnectionCls = UDSConnection  # pylint: disable=invalid-name


class UDSPoolManager(urllib3.PoolManager):
    """Specialized PoolManager for tracking UNIX domain socket connections."""

    # pylint's special handling for namedtuple does not cover this usage
    # pylint: disable=invalid-name
    _PoolKey = collections.namedtuple(
        "_PoolKey", urllib3.poolmanager.PoolKey._fields + ("key_uds",)
    )

    # Map supported schemes to Pool Classes
    _pool_classes_by_scheme = {
        "http": UDSConnectionPool,
        "http+ssh": UDSConnectionPool,
    }

    # Map supported schemes to Pool Key index generator
    _key_fn_by_scheme = {
        "http": functools.partial(_key_normalizer, _PoolKey),
        "http+ssh": functools.partial(_key_normalizer, _PoolKey),
    }

    def __init__(self, num_pools=10, headers=None, **kwargs):
        """Initialize UDSPoolManager.

        Args:
            num_pools: Number of UDS Connection pools to maintain.
            headers: Additional headers to add to operations.
        """
        super().__init__(num_pools, headers, **kwargs)
        self.pool_classes_by_scheme = UDSPoolManager._pool_classes_by_scheme
        self.key_fn_by_scheme = UDSPoolManager._key_fn_by_scheme


class UDSAdapter(HTTPAdapter):
    """Specialization of requests transport adapter for UNIX domain sockets."""

    def __init__(
        self,
        uds: str,
        pool_connections=DEFAULT_POOLSIZE,
        pool_maxsize=DEFAULT_POOLSIZE,
        max_retries=DEFAULT_RETRIES,
        pool_block=DEFAULT_POOLBLOCK,
        **kwargs,
    ):  # pylint: disable=too-many-positional-arguments
        """Initialize UDSAdapter.

        Args:
            uds: Full address of a Podman service UNIX domain socket.
                Format, http+unix:///run/podman/podman.sock
            max_retries: The maximum number of retries each connection should attempt.
            pool_block: Whether the connection pool should block for connections.
            pool_connections: The number of connection pools to cache.
            pool_maxsize: The maximum number of connections to save in the pool.

        Keyword Args:
            timeout (float): Time in seconds to wait for response

        Examples:
            requests.Session.mount(
                "http://", UDSAdapter("http+unix:///run/user/1000/podman/podman.sock"))
        """
        self.poolmanager: Optional[UDSPoolManager] = None

        self._pool_kwargs = {"uds": uds}

        if "timeout" in kwargs:
            self._pool_kwargs["timeout"] = kwargs.get("timeout")

        super().__init__(pool_connections, pool_maxsize, max_retries, pool_block)

    def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK, **kwargs):
        """Initialize UDS Pool Manager.

        Args:
            connections: The number of urllib3 connection pools to cache.
            maxsize: The maximum number of connections to save in the pool.
            block: Block when no free connections are available.
        """
        pool_kwargs = kwargs.copy()
        pool_kwargs.update(self._pool_kwargs)
        self.poolmanager = UDSPoolManager(
            num_pools=connections, maxsize=maxsize, block=block, **pool_kwargs
        )