File: statsd.py

package info (click to toggle)
python-urllib3 2.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 2,340 kB
  • sloc: python: 26,167; makefile: 122; javascript: 92; sh: 11
file content (26 lines) | stat: -rw-r--r-- 769 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from __future__ import annotations

import asyncio
from typing import Optional

from ..config import Config
from ..statsd import StatsdLogger as Base


class _DummyProto(asyncio.DatagramProtocol):
    pass


class StatsdLogger(Base):
    def __init__(self, config: Config) -> None:
        super().__init__(config)
        self.address = config.statsd_host.rsplit(":", 1)
        self.transport: Optional[asyncio.BaseTransport] = None

    async def _socket_send(self, message: bytes) -> None:
        if self.transport is None:
            self.transport, _ = await asyncio.get_event_loop().create_datagram_endpoint(
                _DummyProto, remote_addr=(self.address[0], int(self.address[1]))
            )

        self.transport.sendto(message)  # type: ignore