1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
|
import os
from threading import Thread, Lock
from time import sleep, time
from sentry_sdk._compat import queue, check_thread_support
from sentry_sdk.utils import logger
from sentry_sdk._types import MYPY
if MYPY:
from queue import Queue
from typing import Any
from typing import Optional
from typing import Callable
_TERMINATOR = object()
class BackgroundWorker(object):
def __init__(self):
# type: () -> None
check_thread_support()
self._queue = queue.Queue(30) # type: Queue[Any]
self._lock = Lock()
self._thread = None # type: Optional[Thread]
self._thread_for_pid = None # type: Optional[int]
@property
def is_alive(self):
# type: () -> bool
if self._thread_for_pid != os.getpid():
return False
if not self._thread:
return False
return self._thread.is_alive()
def _ensure_thread(self):
# type: () -> None
if not self.is_alive:
self.start()
def _timed_queue_join(self, timeout):
# type: (float) -> bool
deadline = time() + timeout
queue = self._queue
real_all_tasks_done = getattr(
queue, "all_tasks_done", None
) # type: Optional[Any]
if real_all_tasks_done is not None:
real_all_tasks_done.acquire()
all_tasks_done = real_all_tasks_done # type: Optional[Any]
elif queue.__module__.startswith("eventlet."):
all_tasks_done = getattr(queue, "_cond", None)
else:
all_tasks_done = None
try:
while queue.unfinished_tasks:
delay = deadline - time()
if delay <= 0:
return False
if all_tasks_done is not None:
all_tasks_done.wait(timeout=delay)
else:
# worst case, we just poll the number of remaining tasks
sleep(0.1)
return True
finally:
if real_all_tasks_done is not None:
real_all_tasks_done.release()
def start(self):
# type: () -> None
with self._lock:
if not self.is_alive:
self._thread = Thread(
target=self._target, name="raven-sentry.BackgroundWorker"
)
self._thread.setDaemon(True)
self._thread.start()
self._thread_for_pid = os.getpid()
def kill(self):
# type: () -> None
"""
Kill worker thread. Returns immediately. Not useful for
waiting on shutdown for events, use `flush` for that.
"""
logger.debug("background worker got kill request")
with self._lock:
if self._thread:
try:
self._queue.put_nowait(_TERMINATOR)
except queue.Full:
logger.debug("background worker queue full, kill failed")
self._thread = None
self._thread_for_pid = None
def flush(self, timeout, callback=None):
# type: (float, Optional[Any]) -> None
logger.debug("background worker got flush request")
with self._lock:
if self.is_alive and timeout > 0.0:
self._wait_flush(timeout, callback)
logger.debug("background worker flushed")
def _wait_flush(self, timeout, callback):
# type: (float, Optional[Any]) -> None
initial_timeout = min(0.1, timeout)
if not self._timed_queue_join(initial_timeout):
pending = self._queue.qsize()
logger.debug("%d event(s) pending on flush", pending)
if callback is not None:
callback(pending, timeout)
self._timed_queue_join(timeout - initial_timeout)
def submit(self, callback):
# type: (Callable[[], None]) -> None
self._ensure_thread()
try:
self._queue.put_nowait(callback)
except queue.Full:
logger.debug("background worker queue full, dropping event")
def _target(self):
# type: () -> None
while True:
callback = self._queue.get()
try:
if callback is _TERMINATOR:
break
try:
callback()
except Exception:
logger.error("Failed processing job", exc_info=True)
finally:
self._queue.task_done()
sleep(0)
|