File: AsynchronousSender.py

package info (click to toggle)
python-applicationinsights 0.11.10-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 876 kB
  • sloc: python: 5,948; makefile: 151; sh: 77
file content (147 lines) | stat: -rw-r--r-- 5,438 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from .SenderBase import SenderBase, DEFAULT_ENDPOINT_URL
from threading import Lock, Thread

class AsynchronousSender(SenderBase):
    """An asynchronous sender that works in conjunction with the :class:`AsynchronousQueue`. The sender object will
    start a worker thread that will pull items from the :func:`queue`. The thread will be created when the client
    calls :func:`start` and will check for queue items every :func:`send_interval` seconds. The worker thread can
    also be forced to check the queue by setting the :func:`flush_notification` event.

    - If no items are found, the thread will go back to sleep.
    - If items are found, the worker thread will send items to the specified service in batches of :func:`send_buffer_size`.

    If no queue items are found for :func:`send_time` seconds, the worker thread will shut down (and :func:`start` will
    need  to be called again).
    """
    def __init__(self, service_endpoint_uri=None):
        """Initializes a new instance of the class.

        Args:
            sender (String) service_endpoint_uri the address of the service to send telemetry data to.
        """
        self._send_interval = 1.0
        self._send_remaining_time = 0
        self._send_time = 3.0
        self._lock_send_remaining_time = Lock()
        SenderBase.__init__(self, service_endpoint_uri or DEFAULT_ENDPOINT_URL)

    @property
    def send_interval(self):
        """The time span in seconds at which the the worker thread will check the :func:`queue` for items (defaults to: 1.0).

        Args:
            value (int) the interval in seconds.

        Returns:
            int. the interval in seconds.
        """
        return self._send_interval

    @send_interval.setter
    def send_interval(self, value):
        """The time span in seconds at which the the worker thread will check the :func:`queue` for items (defaults to: 1.0).

        Args:
            value (int) the interval in seconds.

        Returns:
            int. the interval in seconds.
        """
        self._send_interval = value

    @property
    def send_time(self):
        """The time span in seconds at which the the worker thread will check the :func:`queue` for items (defaults to: 1.0).

        Args:
            value (int) the interval in seconds.

        Returns:
            int. the interval in seconds.
        """
        return self._send_time

    @send_time.setter
    def send_time(self, value):
        """The time span in seconds at which the the worker thread will check the :func:`queue` for items (defaults to: 1.0).

        Args:
            value (int) the interval in seconds.

        Returns:
            int. the interval in seconds.
        """
        self._send_time = value

    def start(self):
        """Starts a new sender thread if none is not already there
        """
        with self._lock_send_remaining_time:
            if self._send_remaining_time <= 0.0:
                local_send_interval = self._send_interval
                if self._send_interval < 0.1:
                    local_send_interval = 0.1
                self._send_remaining_time = self._send_time
                if self._send_remaining_time < local_send_interval:
                    self._send_remaining_time = local_send_interval
                thread = Thread(target=self._run)
                thread.daemon = True
                thread.start()

    def stop(self):
        """Gracefully stops the sender thread if one is there.
        """
        with self._lock_send_remaining_time:
            self._send_remaining_time = 0.0

    def _run(self):
        # save the queue locally
        local_queue = self._queue
        if not local_queue:
            self.stop()
            return

        # fix up the send interval (can't be lower than 100ms)
        local_send_interval = self._send_interval
        if self._send_interval < 0.1:
            local_send_interval = 0.1
        local_send_time = self._send_time
        if local_send_time < local_send_interval:
            local_send_time = local_send_interval
        while True:
            while True:
                # get at most send_buffer_size items from the queue
                counter = self._send_buffer_size
                data = []
                while counter > 0:
                    item = local_queue.get()
                    if not item:
                        break
                    data.append(item)
                    counter -= 1

                # if we didn't get any items from the queue, we're done here
                if len(data) == 0:
                    break

                # reset the send time
                with self._lock_send_remaining_time:
                    self._send_remaining_time = local_send_time

                # finally send the data
                self.send(data)

            # wait at most send_interval (or until we get signalled)
            result = local_queue.flush_notification.wait(local_send_interval)
            if result:
                local_queue.flush_notification.clear()
                continue

            # decrement the remaining time
            local_remaining_time = 0
            with self._lock_send_remaining_time:
                self._send_remaining_time -= local_send_interval
                local_remaining_time = self._send_remaining_time

            if local_remaining_time <= 0:
                break