1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
|
import socket
from .base import StatsClientBase, PipelineBase
class StreamPipeline(PipelineBase):
    """Pipeline that flushes its queued stats as one newline-joined string."""

    def _send(self):
        """Hand all queued stats to the client, then empty the queue.

        The entries in ``self._stats`` are joined with ``'\\n'`` and passed
        to the owning client's ``_after`` hook (defined outside this
        module); the queue is cleared afterwards.
        """
        deliver = self._client._after
        deliver('\n'.join(self._stats))
        self._stats.clear()
class StreamClientBase(StatsClientBase):
    """Shared behaviour for clients that talk over a stream socket.

    Subclasses must provide ``connect()`` and are expected to initialise
    ``self._sock`` (the subclasses in this module set it to ``None`` in
    ``__init__``).
    """

    def connect(self):
        """Open the underlying socket; must be overridden by subclasses."""
        raise NotImplementedError()

    def close(self):
        """Close the current socket, if any, and forget it."""
        sock = self._sock
        closable = sock and hasattr(sock, 'close')
        if closable:
            sock.close()
        # Always drop the reference so the next _send() reconnects.
        self._sock = None

    def reconnect(self):
        """Drop the current connection and establish a new one."""
        self.close()
        self.connect()

    def pipeline(self):
        """Return a new ``StreamPipeline`` bound to this client."""
        return StreamPipeline(self)

    def _send(self, data):
        """Send data to statsd, connecting first if there is no socket."""
        needs_connection = not self._sock
        if needs_connection:
            self.connect()
        self._do_send(data)

    def _do_send(self, data):
        """Write ``data`` as ASCII bytes followed by a newline."""
        send_all = self._sock.sendall
        send_all(data.encode('ascii') + b'\n')
class TCPStatsClient(StreamClientBase):
    """TCP version of StatsClient."""

    def __init__(self, host='localhost', port=8125, prefix=None,
                 timeout=None, ipv6=False):
        """Create a new client.

        No connection is opened here; ``connect()`` does that.

        :param host: host name or address passed to ``socket.getaddrinfo``.
        :param port: port passed to ``socket.getaddrinfo``.
        :param prefix: stored as ``_prefix``; it is consumed outside this
            class (presumably by the base client -- confirm there).
        :param timeout: value passed to ``settimeout()`` on the socket
            before connecting.
        :param ipv6: resolve with ``AF_INET6`` when true, else ``AF_INET``.
        """
        self._host = host
        self._port = port
        self._ipv6 = ipv6
        self._timeout = timeout
        self._prefix = prefix
        self._sock = None

    def connect(self):
        """Resolve the configured host/port and open a TCP connection.

        Only the first ``getaddrinfo`` result is tried. ``self._sock`` is
        assigned only after the connection succeeds; on failure the
        half-built socket is closed and the original exception propagates,
        so ``self._sock`` keeps its previous value and a later ``_send()``
        can retry the connection.

        :raises OSError: from address resolution or from the connection
            attempt (including timeouts).
        """
        fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET
        family, _, _, _, addr = socket.getaddrinfo(
            self._host, self._port, fam, socket.SOCK_STREAM)[0]
        sock = socket.socket(family, socket.SOCK_STREAM)
        try:
            sock.settimeout(self._timeout)
            sock.connect(addr)
        except BaseException:
            # Do not leak the descriptor or leave an unconnected socket
            # on the client; clean up and let the caller see the error.
            sock.close()
            raise
        self._sock = sock
class UnixSocketStatsClient(StreamClientBase):
    """Unix domain socket version of StatsClient."""

    def __init__(self, socket_path, prefix=None, timeout=None):
        """Create a new client.

        No connection is opened here; ``connect()`` does that.

        :param socket_path: address passed to ``connect()`` on the
            ``AF_UNIX`` socket.
        :param prefix: stored as ``_prefix``; it is consumed outside this
            class (presumably by the base client -- confirm there).
        :param timeout: value passed to ``settimeout()`` on the socket
            before connecting.
        """
        self._socket_path = socket_path
        self._timeout = timeout
        self._prefix = prefix
        self._sock = None

    def connect(self):
        """Open a stream connection to the configured Unix socket path.

        ``self._sock`` is assigned only after the connection succeeds; on
        failure the half-built socket is closed and the original exception
        propagates, so ``self._sock`` keeps its previous value and a later
        ``_send()`` can retry the connection.

        :raises OSError: from the connection attempt (including timeouts).
        """
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.settimeout(self._timeout)
            sock.connect(self._socket_path)
        except BaseException:
            # Do not leak the descriptor or leave an unconnected socket
            # on the client; clean up and let the caller see the error.
            sock.close()
            raise
        self._sock = sock
|