File: udp_emitter.py

package info (click to toggle)
python-aws-xray-sdk 0.95-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 792 kB
  • sloc: python: 3,006; makefile: 20
file content (64 lines) | stat: -rw-r--r-- 2,224 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import os
import socket
import logging

from ..exceptions.exceptions import InvalidDaemonAddressException

# Module-level logger named after this module.
log = logging.getLogger(__name__)


# First line of every message sent to the daemon; it is followed by the
# delimiter and then the serialized entity.
PROTOCOL_HEADER = '{"format": "json", "version": 1}'
# Separates the protocol header from the serialized entity.
PROTOCOL_DELIMITER = "\n"
# Environment variable that, when set, takes precedence over the
# daemon address passed to the emitter's constructor.
DAEMON_ADDRESS_KEY = 'AWS_XRAY_DAEMON_ADDRESS'


class UDPEmitter(object):
    """
    The default emitter the X-Ray recorder uses to send segments/subsegments
    to the X-Ray daemon over UDP using a non-blocking socket. If there is an
    exception on the actual data transfer between the socket and the daemon,
    it logs the exception and continues.
    """
    def __init__(self, daemon_address='127.0.0.1:2000'):
        """
        Resolves the daemon address and opens a non-blocking UDP socket.

        :param daemon_address: ``host:port`` to use when the environment
            variable named by ``DAEMON_ADDRESS_KEY`` is not set.
        :raises InvalidDaemonAddressException: if the effective address
            is not a valid ``host:port`` pair.
        """
        address = os.getenv(DAEMON_ADDRESS_KEY, daemon_address)
        # Validate the address before opening the socket so that an
        # invalid address does not leave an unclosed socket behind.
        self._ip, self._port = self._parse_address(address)

        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._socket.setblocking(0)

    def send_entity(self, entity):
        """
        Serializes a segment/subsegment and sends it to the X-Ray daemon
        over UDP. By default it doesn't retry on failures.

        :param entity: a trace entity to send to the X-Ray daemon
        """
        message = "%s%s%s" % (PROTOCOL_HEADER,
                              PROTOCOL_DELIMITER,
                              entity.serialize())

        # Pass the arguments to the logger instead of pre-formatting so the
        # message is only built when debug logging is enabled.
        log.debug("sending: %s to %s:%s.", message, self._ip, self._port)
        self._send_data(message)

    def set_daemon_address(self, address):
        """
        Takes a full address like 127.0.0.1:2000 and parses it into ip address
        and port. Throws an exception if the address has invalid format.
        A falsy address (``None`` or an empty string) is ignored and the
        current address is kept.

        :param address: the new daemon address in ``host:port`` form
        :raises InvalidDaemonAddressException: if the address is invalid
        """
        if address:
            self._ip, self._port = self._parse_address(address)

    def _send_data(self, data):
        """
        Encodes ``data`` as UTF-8 and sends it to the configured daemon
        address. Any exception raised while sending is logged and not
        propagated to the caller.

        :param data: the text payload to send
        """
        try:
            self._socket.sendto(data.encode('utf-8'),
                                (self._ip, self._port))
        except Exception:
            # Deliberately best-effort: a failed send must not raise.
            log.exception('failed to send data to X-Ray daemon.')

    def _parse_address(self, daemon_address):
        """
        Splits a ``host:port`` string into its host and integer port.

        :param daemon_address: the address string to parse
        :return: a ``(host, port)`` tuple with ``port`` as an int
        :raises InvalidDaemonAddressException: if the value is not a
            string of exactly the form ``host:port`` or the port is not
            an integer between 1 and 65535.
        """
        try:
            # Unpacking raises ValueError unless there is exactly one ':'.
            host, port_text = daemon_address.split(':')
            port = int(port_text)
            if not 1 <= port <= 65535:
                raise ValueError('port out of range: %d' % port)
        except (AttributeError, TypeError, ValueError):
            # The 1-tuple keeps the formatting correct even when the bad
            # value is itself a tuple.
            raise InvalidDaemonAddressException(
                'Invalid daemon address %s specified.' % (daemon_address,))
        return host, port