"""Tests for ``celery.worker.heartbeat.Heart`` using stub timer and dispatcher objects."""
from unittest.mock import Mock
from celery.worker.heartbeat import Heart
class MockDispatcher:
    """Stand-in event dispatcher that records every message given to ``send``.

    Nothing is transmitted anywhere: each call is appended to ``self.sent``
    so a test can inspect what was sent and with which keyword fields.
    """

    # Optional object exposing a ``_shutdown`` event.  While it is set,
    # ``send`` counts its calls and sets that event once the count exceeds 10.
    heart = None
    # Number of ``send`` calls counted while ``heart`` was set.
    next_iter = 0

    def __init__(self):
        # ``(msg, fields)`` tuples in the order they were sent.
        self.sent = []
        self.on_enabled = set()
        self.on_disabled = set()
        self.enabled = True

    def send(self, msg, **_fields):
        """Record ``msg`` together with its keyword fields.

        Args:
            msg: The message (event type) being sent.
            **_fields: Arbitrary keyword fields; stored as a dict next to
                ``msg`` in ``self.sent``.
        """
        self.sent.append((msg, _fields))
        if self.heart:
            # The check happens before the increment, so the shutdown event
            # is first set on the 12th counted call (counter value 11).
            if self.next_iter > 10:
                self.heart._shutdown.set()
            self.next_iter += 1
class MockTimer:
    """Stand-in timer that hands back cancellable entries without scheduling."""

    def call_repeatedly(self, secs, fun, args=(), kwargs=None):
        """Build an entry describing a repeated call; ``fun`` is never invoked.

        Args:
            secs: Interval stored as the first element of the entry.
            fun: Callable stored as the second element of the entry.
            args: Positional arguments stored as the third element.
            kwargs: Keyword arguments stored as the fourth element.  When
                omitted, a fresh empty dict is used for each call.

        Returns:
            A tuple subclass ``(secs, fun, args, kwargs)`` with a ``canceled``
            flag and a ``cancel()`` method that sets the flag.
        """
        # A ``{}`` default would be one dict shared by every entry created
        # without explicit kwargs; allocate a new one per call instead.
        if kwargs is None:
            kwargs = {}

        class entry(tuple):
            canceled = False

            def cancel(self):
                self.canceled = True

        return entry((secs, fun, args, kwargs))

    def cancel(self, entry):
        """Cancel ``entry`` by delegating to its own ``cancel()`` method."""
        entry.cancel()
class test_Heart:
    """Tests for ``Heart`` driven by ``MockTimer`` and ``MockDispatcher``."""

    def test_start_stop(self):
        """start() sets ``tref``; stop() clears it and may be called twice."""
        timer = MockTimer()
        eventer = MockDispatcher()
        h = Heart(timer, eventer, interval=1)
        h.start()
        assert h.tref
        h.stop()
        assert h.tref is None
        # A second stop() on an already stopped heart must not raise.
        h.stop()

    def test_send_sends_signal(self):
        """_send() calls ``_send_sent_signal`` with ``sender=h`` when it is set."""
        h = Heart(MockTimer(), MockDispatcher(), interval=1)
        # With the signal hook set to None, _send() must still work.
        h._send_sent_signal = None
        h._send('worker-heartbeat')
        h._send_sent_signal = Mock(name='send_sent_signal')
        h._send('worker')
        h._send_sent_signal.assert_called_with(sender=h)

    def test_start_when_disabled(self):
        """start() with a disabled dispatcher sets no ``tref`` and sends nothing."""
        timer = MockTimer()
        eventer = MockDispatcher()
        eventer.enabled = False
        h = Heart(timer, eventer)
        h.start()
        assert not h.tref
        assert not eventer.sent

    def test_stop_when_disabled(self):
        """stop() with a disabled dispatcher sends nothing."""
        timer = MockTimer()
        eventer = MockDispatcher()
        eventer.enabled = False
        h = Heart(timer, eventer)
        h.stop()
        assert not eventer.sent

    def test_message_retries(self):
        """Online and heartbeat messages are sent with retry; offline without."""
        timer = MockTimer()
        eventer = MockDispatcher()
        eventer.enabled = True
        h = Heart(timer, eventer, interval=1)

        h.start()
        assert eventer.sent[-1][0] == "worker-online"

        # Invoke a heartbeat by calling the function stored in the timer
        # entry, whose layout is (secs, fun, args, kwargs).
        _secs, fun, args, kwargs = h.tref
        fun(*args, **kwargs)
        assert eventer.sent[-1][0] == "worker-heartbeat"
        assert eventer.sent[-1][1]["retry"]

        h.stop()
        assert eventer.sent[-1][0] == "worker-offline"
        assert not eventer.sent[-1][1]["retry"]
|