File: asyncsender.py

package info (click to toggle)
python-fluent-logger 0.11.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 216 kB
  • sloc: python: 1,804; makefile: 3
file content (149 lines) | stat: -rw-r--r-- 4,186 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import threading
from queue import Empty, Full, Queue

from fluent import sender
from fluent.sender import EventTime

__all__ = ["EventTime", "FluentSender"]

DEFAULT_QUEUE_MAXSIZE = 100
DEFAULT_QUEUE_CIRCULAR = False

_TOMBSTONE = object()

_global_sender = None


def _set_global_sender(sender):  # pragma: no cover
    """[For testing] Function to set global sender directly"""
    global _global_sender
    _global_sender = sender


def setup(tag, **kwargs):  # pragma: no cover
    global _global_sender
    _global_sender = FluentSender(tag, **kwargs)


def get_global_sender():  # pragma: no cover
    return _global_sender


def close():  # pragma: no cover
    get_global_sender().close()


class FluentSender(sender.FluentSender):
    def __init__(
        self,
        tag,
        host="localhost",
        port=24224,
        bufmax=1 * 1024 * 1024,
        timeout=3.0,
        verbose=False,
        buffer_overflow_handler=None,
        nanosecond_precision=False,
        msgpack_kwargs=None,
        queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
        queue_circular=DEFAULT_QUEUE_CIRCULAR,
        queue_overflow_handler=None,
        **kwargs,
    ):
        """
        :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
        """
        super().__init__(
            tag=tag,
            host=host,
            port=port,
            bufmax=bufmax,
            timeout=timeout,
            verbose=verbose,
            buffer_overflow_handler=buffer_overflow_handler,
            nanosecond_precision=nanosecond_precision,
            msgpack_kwargs=msgpack_kwargs,
            **kwargs,
        )
        self._queue_maxsize = queue_maxsize
        self._queue_circular = queue_circular
        if queue_circular and queue_overflow_handler:
            self._queue_overflow_handler = queue_overflow_handler
        else:
            self._queue_overflow_handler = self._queue_overflow_handler_default

        self._thread_guard = (
            threading.Event()
        )  # This ensures visibility across all variables
        self._closed = False

        self._queue = Queue(maxsize=queue_maxsize)
        self._send_thread = threading.Thread(
            target=self._send_loop, name="AsyncFluentSender %d" % id(self)
        )
        self._send_thread.daemon = True
        self._send_thread.start()

    def close(self, flush=True):
        with self.lock:
            if self._closed:
                return
            self._closed = True
            if not flush:
                while True:
                    try:
                        self._queue.get(block=False)
                    except Empty:
                        break
            self._queue.put(_TOMBSTONE)
            self._send_thread.join()

    @property
    def queue_maxsize(self):
        return self._queue_maxsize

    @property
    def queue_blocking(self):
        return not self._queue_circular

    @property
    def queue_circular(self):
        return self._queue_circular

    def _send(self, bytes_):
        with self.lock:
            if self._closed:
                return False
            if self._queue_circular and self._queue.full():
                # discard oldest
                try:
                    discarded_bytes = self._queue.get(block=False)
                except Empty:  # pragma: no cover
                    pass
                else:
                    self._queue_overflow_handler(discarded_bytes)
            try:
                self._queue.put(bytes_, block=(not self._queue_circular))
            except Full:  # pragma: no cover
                return False  # this actually can't happen

            return True

    def _send_loop(self):
        send_internal = super()._send_internal

        try:
            while True:
                bytes_ = self._queue.get(block=True)
                if bytes_ is _TOMBSTONE:
                    break

                send_internal(bytes_)
        finally:
            self._close()

    def _queue_overflow_handler_default(self, discarded_bytes):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()