File: _decoders.py

package info (click to toggle)
python-py-zipkin 1.2.8-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 300 kB
  • sloc: python: 1,556; makefile: 3
file content (37 lines) | stat: -rw-r--r-- 1,215 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import logging
from typing import List

from py_zipkin.encoding._helpers import Span
from py_zipkin.encoding._types import Encoding
from py_zipkin.exception import ZipkinError

log = logging.getLogger("py_zipkin.encoding")


def get_decoder(encoding: Encoding) -> "IDecoder":
    """Creates encoder object for the given encoding.
    :param encoding: desired output encoding protocol
    :type encoding: Encoding
    :return: corresponding IDecoder object
    :rtype: IDecoder
    """
    if encoding == Encoding.V1_THRIFT:
        raise NotImplementedError(f"{encoding} decoding no longer supported")
    if encoding == Encoding.V1_JSON:
        raise NotImplementedError(f"{encoding} decoding not yet implemented")
    if encoding == Encoding.V2_JSON:
        raise NotImplementedError(f"{encoding} decoding not yet implemented")
    raise ZipkinError(f"Unknown encoding: {encoding}")


class IDecoder:
    """Decoder interface."""

    def decode_spans(self, spans: bytes) -> List[Span]:
        """Decodes an encoded list of spans.
        :param spans: encoded list of spans
        :type spans: bytes
        :return: list of spans
        :rtype: list of Span
        """
        raise NotImplementedError()