1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
|
import gc
import sys
import time
from celery.utils.dispatch import Signal
# Interpreter detection is done once at import time; the flags select the
# extra work needed after a collection on non-CPython runtimes.
_RUNNING_ON_JVM = sys.platform.startswith('java')
_RUNNING_ON_PYPY = hasattr(sys, 'pypy_version_info')


def garbage_collect():
    """Force a garbage collection pass suited to the running interpreter."""
    gc.collect()
    if _RUNNING_ON_JVM:
        # Some JVM GCs run finalizers in a different thread, so give them
        # a moment to finish before the caller looks for their effects.
        time.sleep(0.1)
    elif _RUNNING_ON_PYPY:
        # Collecting weak references can take two collections on PyPy.
        gc.collect()
def receiver_1_arg(val, **_extra):
    """Echo receiver: hand back ``val`` and ignore any other keywords."""
    return val
class Callable:
    """Receiver helper usable either as a callable instance or via ``a``."""

    def __call__(self, val, **_extra):
        """Return ``val`` unchanged; additional keywords are ignored."""
        return val

    def a(self, val, **_extra):
        """Return ``val`` unchanged; additional keywords are ignored."""
        return val
# Module-level signal used by the tests in this file; it declares a single
# ``val`` argument and is created with receiver caching turned off.
a_signal = Signal(providing_args=['val'], use_caching=False)
class test_Signal:
    """Test suite for dispatcher (barely started)"""

    def _testIsClean(self, signal):
        """Assert that everything has been cleaned up automatically"""
        assert not signal.has_listeners()
        assert signal.receivers == []

    def test_exact(self):
        """A receiver connected for this sender gets the sent value."""
        a_signal.connect(receiver_1_arg, sender=self)
        try:
            expected = [(receiver_1_arg, 'test')]
            result = a_signal.send(sender=self, val='test')
            assert result == expected
        finally:
            a_signal.disconnect(receiver_1_arg, sender=self)
        self._testIsClean(a_signal)

    def test_ignored_sender(self):
        """A receiver connected without a sender still gets the signal."""
        a_signal.connect(receiver_1_arg)
        try:
            expected = [(receiver_1_arg, 'test')]
            result = a_signal.send(sender=self, val='test')
            assert result == expected
        finally:
            a_signal.disconnect(receiver_1_arg)
        self._testIsClean(a_signal)

    def test_garbage_collected(self):
        """A bound-method receiver goes away once its instance is collected."""
        a = Callable()
        a_signal.connect(a.a, sender=self)
        expected = []
        # Drop the only strong reference; no explicit disconnect is done.
        del a
        garbage_collect()
        result = a_signal.send(sender=self, val='test')
        assert result == expected
        self._testIsClean(a_signal)

    def test_multiple_registration(self):
        """Connecting the same receiver repeatedly registers it only once."""
        a = Callable()
        result = None
        try:
            a_signal.connect(a)
            a_signal.connect(a)
            a_signal.connect(a)
            a_signal.connect(a)
            a_signal.connect(a)
            a_signal.connect(a)
            result = a_signal.send(sender=self, val='test')
            assert len(result) == 1
            assert len(a_signal.receivers) == 1
        finally:
            # ``result`` holds a reference to the receiver as well, so both
            # names are dropped before collecting.
            del a
            del result
            garbage_collect()
        self._testIsClean(a_signal)

    def test_uid_registration(self):
        """Two receivers sharing a ``dispatch_uid`` yield a single entry."""

        def uid_based_receiver_1(**kwargs):
            pass

        def uid_based_receiver_2(**kwargs):
            pass

        a_signal.connect(uid_based_receiver_1, dispatch_uid='uid')
        try:
            a_signal.connect(uid_based_receiver_2, dispatch_uid='uid')
            assert len(a_signal.receivers) == 1
        finally:
            a_signal.disconnect(dispatch_uid='uid')
        self._testIsClean(a_signal)

    def test_robust(self):
        """Sending must not propagate an exception raised by a receiver."""

        def fails(val, **kwargs):
            raise ValueError('this')

        a_signal.connect(fails)
        try:
            a_signal.send(sender=self, val='test')
        finally:
            a_signal.disconnect(fails)
        self._testIsClean(a_signal)

    def test_disconnection(self):
        """Receivers leave by explicit disconnect or by being collected."""
        receiver_1 = Callable()
        receiver_2 = Callable()
        receiver_3 = Callable()
        try:
            try:
                a_signal.connect(receiver_1)
                a_signal.connect(receiver_2)
                a_signal.connect(receiver_3)
            finally:
                a_signal.disconnect(receiver_1)
            # receiver_2 is never disconnected explicitly; deleting it and
            # collecting is expected to remove it.
            del receiver_2
            garbage_collect()
        finally:
            a_signal.disconnect(receiver_3)
        self._testIsClean(a_signal)

    def test_retry(self):
        """A ``retry=True`` receiver that fails once is called again."""

        class non_local:
            counter = 1

        def succeeds_eventually(val, **kwargs):
            # Raises on the first call (counter == 2), succeeds on the
            # second (counter == 3).
            non_local.counter += 1
            if non_local.counter < 3:
                raise ValueError('this')
            return val

        a_signal.connect(succeeds_eventually, sender=self, retry=True)
        try:
            result = a_signal.send(sender=self, val='test')
            assert non_local.counter == 3
            assert result[0][1] == 'test'
        finally:
            a_signal.disconnect(succeeds_eventually, sender=self)
        self._testIsClean(a_signal)

    def test_retry_with_dispatch_uid(self):
        """With ``retry=True`` the receiver is still keyed by its uid."""
        uid = 'abc123'
        a_signal.connect(receiver_1_arg, sender=self, retry=True,
                         dispatch_uid=uid)
        try:
            assert a_signal.receivers[0][0][0] == uid
        finally:
            # Always disconnect: ``a_signal`` is shared module state, and a
            # receiver left behind by a failed assertion would break the
            # cleanliness checks of the other tests.
            a_signal.disconnect(receiver_1_arg, sender=self, dispatch_uid=uid)
        self._testIsClean(a_signal)

    def test_boundmethod(self):
        """A bound method stays connected while its instance is alive."""
        a = Callable()
        a_signal.connect(a.a, sender=self)
        expected = [(a.a, 'test')]
        # A collection while ``a`` is still referenced must not drop it.
        garbage_collect()
        result = a_signal.send(sender=self, val='test')
        assert result == expected
        # ``result`` and ``expected`` also reference the bound method.
        del a, result, expected
        garbage_collect()
        self._testIsClean(a_signal)

    def test_disconnect_retryable_decorator(self):
        """A retrying receiver connected as a decorator can be removed."""
        # Regression test for https://github.com/celery/celery/issues/9119

        @a_signal.connect(sender=self, retry=True)
        def succeeds_eventually(val, **kwargs):
            return val

        try:
            a_signal.send(sender=self, val='test')
        finally:
            a_signal.disconnect(succeeds_eventually, sender=self)
        self._testIsClean(a_signal)
|