1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
from .SenderBase import SenderBase, DEFAULT_ENDPOINT_URL
from threading import Lock, Thread
class AsynchronousSender(SenderBase):
    """An asynchronous sender that works in conjunction with the :class:`AsynchronousQueue`.

    The sender object starts a daemon worker thread that pulls items from the :func:`queue`. The thread is
    created when the client calls :func:`start` and checks for queue items every :func:`send_interval` seconds.
    The worker thread can also be forced to check the queue by setting the queue's :func:`flush_notification`
    event.

    - If no items are found, the thread goes back to sleep.
    - If items are found, the worker thread sends them to the specified service in batches of at most
      :func:`send_buffer_size` items.

    If no queue items are found for :func:`send_time` seconds, the worker thread shuts down (and :func:`start`
    needs to be called again).
    """

    def __init__(self, service_endpoint_uri=None):
        """Initializes a new instance of the class.

        Args:
            service_endpoint_uri (str). the address of the service to send telemetry data to. When omitted
                (or falsy), DEFAULT_ENDPOINT_URL is used.
        """
        self._send_interval = 1.0
        # a value <= 0 means "no worker thread is considered to be running"
        self._send_remaining_time = 0.0
        self._send_time = 3.0
        self._lock_send_remaining_time = Lock()
        SenderBase.__init__(self, service_endpoint_uri or DEFAULT_ENDPOINT_URL)

    @property
    def send_interval(self):
        """The time span in seconds at which the worker thread will check the :func:`queue` for items
        (defaults to: 1.0). Values below 0.1 are treated as 0.1 when a worker thread starts.

        Args:
            value (float). the interval in seconds.

        Returns:
            float. the interval in seconds.
        """
        return self._send_interval

    @send_interval.setter
    def send_interval(self, value):
        """The time span in seconds at which the worker thread will check the :func:`queue` for items
        (defaults to: 1.0). Values below 0.1 are treated as 0.1 when a worker thread starts.

        Args:
            value (float). the interval in seconds.

        Returns:
            float. the interval in seconds.
        """
        self._send_interval = value

    @property
    def send_time(self):
        """The time span in seconds that the worker thread will keep running without finding any items in the
        :func:`queue` before it shuts down (defaults to: 3.0). Values below the effective :func:`send_interval`
        are raised to that interval when a worker thread starts.

        Args:
            value (float). the idle time in seconds.

        Returns:
            float. the idle time in seconds.
        """
        return self._send_time

    @send_time.setter
    def send_time(self, value):
        """The time span in seconds that the worker thread will keep running without finding any items in the
        :func:`queue` before it shuts down (defaults to: 3.0). Values below the effective :func:`send_interval`
        are raised to that interval when a worker thread starts.

        Args:
            value (float). the idle time in seconds.

        Returns:
            float. the idle time in seconds.
        """
        self._send_time = value

    def _clamped_timings(self):
        """Returns the effective ``(send_interval, send_time)`` pair used by the worker thread.

        The interval can't be lower than 100ms and the idle time can't be lower than the interval.

        Returns:
            tuple. the effective polling interval and idle time, both in seconds.
        """
        interval = max(self._send_interval, 0.1)
        return interval, max(self._send_time, interval)

    def _next_batch(self, local_queue):
        """Pulls at most :func:`send_buffer_size` items from the given queue.

        Args:
            local_queue. the queue to pull items from; its ``get`` is expected to return a falsy value
                when no item is available.

        Returns:
            list. the items that were pulled (empty if the queue had nothing to offer).
        """
        limit = self._send_buffer_size
        batch = []
        while len(batch) < limit:
            item = local_queue.get()
            if not item:
                break
            batch.append(item)
        return batch

    def start(self):
        """Starts a new sender thread if one is not already running.
        """
        with self._lock_send_remaining_time:
            # a positive remaining time means a worker thread is still alive
            if self._send_remaining_time > 0.0:
                return
            _, idle_time = self._clamped_timings()
            self._send_remaining_time = idle_time
            thread = Thread(target=self._run)
            thread.daemon = True
            thread.start()

    def stop(self):
        """Gracefully stops the sender thread if one is there.

        The worker thread notices the request the next time its wait on the queue times out; it is not
        interrupted in the middle of a send.
        """
        with self._lock_send_remaining_time:
            self._send_remaining_time = 0.0

    def _run(self):
        """Worker thread body: sends queued items in batches until the queue has been idle for the idle time.
        """
        try:
            # save the queue locally
            local_queue = self._queue
            if not local_queue:
                self.stop()
                return

            # the timings are sampled once; later changes apply to the next worker thread
            local_send_interval, local_send_time = self._clamped_timings()

            while True:
                # drain the queue, one batch at a time
                while True:
                    data = self._next_batch(local_queue)

                    # if we didn't get any items from the queue, we're done here
                    if not data:
                        break

                    # reset the send time
                    with self._lock_send_remaining_time:
                        self._send_remaining_time = local_send_time

                    # finally send the data
                    self.send(data)

                # wait at most send_interval (or until we get signalled)
                if local_queue.flush_notification.wait(local_send_interval):
                    local_queue.flush_notification.clear()
                    continue

                # decrement the remaining time
                with self._lock_send_remaining_time:
                    self._send_remaining_time -= local_send_interval
                    local_remaining_time = self._send_remaining_time

                if local_remaining_time <= 0:
                    break
        except Exception:
            # The thread is about to die; without this reset the remaining time would stay positive and
            # every later call to start() would silently do nothing.
            self.stop()
            raise
|