File: transport.py

package info (click to toggle)
python-py-zipkin 1.2.8-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 300 kB
  • sloc: python: 1,556; makefile: 3
file content (114 lines) | stat: -rw-r--r-- 4,333 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from typing import Optional
from typing import Tuple
from typing import Union
from urllib.request import Request
from urllib.request import urlopen

from py_zipkin.encoding import detect_span_version_and_encoding
from py_zipkin.encoding import Encoding


class BaseTransportHandler:
    def get_max_payload_bytes(self) -> Optional[int]:  # pragma: no cover
        """Returns the maximum payload size for this transport.

        Most transports have a maximum packet size that can be sent. For example,
        UDP has a 65507 bytes MTU.
        py_zipkin automatically batches collected spans for performance reasons.
        The batch size is going to be the minimum between `get_max_payload_bytes`
        and `max_span_batch_size` from `zipkin_span`.

        If you don't want to enforce a max payload size, return None.

        :returns: max payload size in bytes or None.
        """
        raise NotImplementedError("get_max_payload_bytes is not implemented")

    def send(self, payload: Union[bytes, str]) -> None:  # pragma: no cover
        """Sends the encoded payload over the transport.

        :argument payload: encoded list of spans.
        """
        raise NotImplementedError("send is not implemented")

    def __call__(self, payload: Union[bytes, str]) -> None:
        """Internal wrapper around `send`. Do not override.

        Mostly used to keep backward compatibility with older transports
        implemented as functions. However decoupling the function developers
        override and what's internally called by py_zipkin will allow us to add
        extra logic here in the future without having the users update their
        code every time.
        """
        self.send(payload)


class UnknownEncoding(Exception):
    """Exception class for when encountering an unknown Encoding"""


class SimpleHTTPTransport(BaseTransportHandler):
    def __init__(self, address: str, port: int) -> None:
        """A simple HTTP transport for zipkin.

        This is not production ready (not async, no retries) but
        it's helpful for tests or people trying out py-zipkin.

        .. code-block:: python

            with zipkin_span(
                service_name='my_service',
                span_name='home',
                sample_rate=100,
                transport_handler=SimpleHTTPTransport('localhost', 9411),
                encoding=Encoding.V2_JSON,
            ):
                pass

        :param address: zipkin server address.
        :type address: str
        :param port: zipkin server port.
        :type port: int
        """
        super().__init__()
        self.address = address
        self.port = port

    def get_max_payload_bytes(self) -> Optional[int]:
        return None

    def _get_path_content_type(self, payload: Union[str, bytes]) -> Tuple[str, str]:
        """Choose the right api path and content type depending on the encoding.

        This is not something you'd need to do generally when writing your own
        transport since in that case you'd know which encoding you're using.
        Since this is a generic transport, we need to make it compatible with
        any encoding instead.
        """
        encoded_payload = (
            payload.encode("utf-8") if isinstance(payload, str) else payload
        )
        encoding = detect_span_version_and_encoding(encoded_payload)

        if encoding == Encoding.V1_JSON:
            return "/api/v1/spans", "application/json"
        elif encoding == Encoding.V1_THRIFT:
            return "/api/v1/spans", "application/x-thrift"
        elif encoding == Encoding.V2_JSON:
            return "/api/v2/spans", "application/json"
        elif encoding == Encoding.V2_PROTO3:
            return "/api/v2/spans", "application/x-protobuf"
        else:  # pragma: nocover
            raise UnknownEncoding(f"Unknown encoding: {encoding}")

    def send(self, payload: Union[str, bytes]) -> None:
        encoded_payload = (
            payload.encode("utf-8") if isinstance(payload, str) else payload
        )
        path, content_type = self._get_path_content_type(encoded_payload)
        url = f"http://{self.address}:{self.port}{path}"

        req = Request(url, encoded_payload, {"Content-Type": content_type})
        response = urlopen(req)

        assert response.getcode() == 202