"""Unit tests for ``celery.worker.state``."""
import os
import pickle
import sys
from importlib import import_module
from time import time
from unittest.mock import Mock, patch
import pytest
from celery import uuid
from celery.exceptions import WorkerShutdown, WorkerTerminate
from celery.platforms import EX_OK
from celery.utils.collections import LimitedSet
from celery.worker import state
@pytest.fixture
def reset_state():
    """Clear the shared containers on ``celery.worker.state`` after a test.

    Nothing is done before the test body runs.  Once it has finished,
    ``active_requests``, ``revoked``, ``revoked_stamps`` and ``total_count``
    are emptied, in that order.
    """
    yield
    # Teardown phase: everything below runs after the test has completed.
    for container in (
        state.active_requests,
        state.revoked,
        state.revoked_stamps,
        state.total_count,
    ):
        container.clear()
class MockShelve(dict):
    """Dictionary that records how it was opened, synced and closed.

    It offers ``open``/``sync``/``close`` methods so it can be used as the
    ``storage`` of a persistence object without touching the file system.
    """

    # Class-level defaults; the methods below shadow them on the instance.
    closed = False
    in_sync = False
    filename = None

    def open(self, filename, **kwargs):
        """Remember *filename* and hand back this same object.

        Any extra keyword options are accepted and ignored.
        """
        self.filename = filename
        return self

    def close(self):
        """Flag the store as closed."""
        self.closed = True

    def sync(self):
        """Flag the store as synced."""
        self.in_sync = True
class MyPersistent(state.Persistent):
    """``state.Persistent`` variant whose storage is a ``MockShelve``."""

    # NOTE: this single MockShelve instance is created once, at class
    # definition time, and is therefore shared by every MyPersistent object.
    storage = MockShelve()
class test_maybe_shutdown:
    """Checks for ``state.maybe_shutdown`` under various flag values."""

    def teardown_method(self):
        """Put both module-level shutdown flags back to ``None``."""
        state.should_stop = None
        state.should_terminate = None

    def test_should_stop(self):
        """Walk ``should_stop`` through raising and non-raising values."""
        # Values for which a WorkerShutdown is expected.
        for flag in (True, 0):
            state.should_stop = flag
            with pytest.raises(WorkerShutdown):
                state.maybe_shutdown()

        # Values for which no exit may be requested at all.
        for flag in (False, None):
            state.should_stop = flag
            try:
                state.maybe_shutdown()
            except SystemExit:
                raise RuntimeError('should not have exited')

        # Numeric values must come back as the exit code of the SystemExit.
        for exit_code in (0, 303):
            state.should_stop = exit_code
            try:
                state.maybe_shutdown()
            except SystemExit as exc:
                assert exc.code == exit_code
            else:
                raise RuntimeError('should have exited')

    @pytest.mark.parametrize('should_stop', (None, False, True, EX_OK))
    def test_should_terminate(self, should_stop):
        """``should_terminate`` raises WorkerTerminate for each ``should_stop``."""
        state.should_terminate = True
        state.should_stop = should_stop
        with pytest.raises(WorkerTerminate):
            state.maybe_shutdown()
@pytest.mark.usefixtures('reset_state')
class test_Persistent:
    """Tests for ``state.Persistent``, run through ``MyPersistent``."""

    @pytest.fixture
    def p(self):
        """Provide a ``MyPersistent`` bound to the ``state`` module."""
        persistent = MyPersistent(state, filename='celery-state')
        return persistent

    def test_close_twice(self, p):
        """``close()`` must complete when ``_is_open`` is already False."""
        p._is_open = False
        p.close()

    def test_constructor(self, p):
        """A new object exposes an empty db opened under its own filename."""
        assert p.db == {}
        assert p.db.filename == p.filename

    def test_save(self, p):
        """``save()`` leaves the db flagged as both synced and closed."""
        p.db['foo'] = 'bar'
        p.save()
        assert p.db.in_sync
        assert p.db.closed

    def add_revoked(self, p, *ids):
        """Helper: add every given id to the ``'revoked'`` entry of ``p.db``."""
        for task_id in ids:
            bucket = p.db.setdefault('revoked', LimitedSet())
            bucket.add(task_id)

    def test_merge(self, p, data=['foo', 'bar', 'baz']):
        """Ids already in ``state.revoked`` are still there after ``merge()``."""
        state.revoked.update(data)
        p.merge()
        lost = [item for item in data if item not in state.revoked]
        assert not lost

    def test_merge_dict(self, p):
        """``_merge_with`` adjusts the clock and loads the revoked ids."""
        clock = Mock()
        clock.adjust.return_value = 626
        p.clock = clock

        stored = {'revoked': {'abc': time()}, 'clock': 313}
        p._merge_with(stored)

        p.clock.adjust.assert_called_with(313)
        assert stored['clock'] == 626
        assert 'abc' in state.revoked

    def test_sync_clock_and_purge(self, p):
        """``_sync_with`` purges revoked, forwards the clock, stores zrevoked."""
        # Serialisation and compression are replaced with an identity
        # function so the stored value can be compared by identity below.
        identity = Mock(side_effect=lambda value: value)
        with patch('celery.worker.state.revoked') as revoked:
            target = {'clock': 0}
            clock = Mock()
            clock.forward.return_value = 627
            p.clock = clock
            p._dumps = identity
            p.compress = identity

            p._sync_with(target)

            revoked.purge.assert_called_with()
            assert target['clock'] == 627
            assert 'revoked' not in target
            assert target['zrevoked'] is revoked

    def test_sync(self, p,
                  data1=['foo', 'bar', 'baz'], data2=['baz', 'ini', 'koz']):
        """After ``sync()`` the ``'zrevoked'`` entry contains every live id."""
        self.add_revoked(p, *data1)
        for task_id in data2:
            state.revoked.add(task_id)

        p.sync()

        assert p.db['zrevoked']
        payload = p.decompress(p.db['zrevoked'])
        assert payload
        # The payload was produced by this very test, so unpickling is safe.
        restored = pickle.loads(payload)
        absent = [task_id for task_id in data2 if task_id not in restored]
        assert not absent
class SimpleReq:
    """Bare-bones request object carrying only an ``id`` and a ``name``."""

    def __init__(self, name):
        """Store *name* and give the request a newly generated ``uuid()`` id."""
        self.name = name
        self.id = uuid()
@pytest.mark.usefixtures('reset_state')
class test_state:
    """Tests for the request bookkeeping helpers on ``celery.worker.state``."""

    def test_accepted(self, requests=[SimpleReq('foo'),
                                      SimpleReq('bar'),
                                      SimpleReq('baz'),
                                      SimpleReq('baz')]):
        """Accepted requests become active and are counted per task name."""
        for req in requests:
            state.task_accepted(req)

        untracked = [
            req for req in requests if req not in state.active_requests
        ]
        assert not untracked

        # Expected totals: one 'foo', one 'bar' and two 'baz' requests.
        for task_name, expected in (('foo', 1), ('bar', 1), ('baz', 2)):
            assert state.total_count[task_name] == expected

    def test_ready(self, requests=[SimpleReq('foo'),
                                   SimpleReq('bar')]):
        """Requests stop being active once they are reported as ready."""
        steps = (
            (state.task_accepted, 2),  # both requests active after accepting
            (state.task_ready, 0),     # none left after marking them ready
        )
        for transition, expected_active in steps:
            for req in requests:
                transition(req)
            assert len(state.active_requests) == expected_active
class test_state_configuration():
    """Check the limits ``celery.worker.state`` picks up from the environment.

    Each test imports a fresh copy of the module, so that the values of
    ``REVOKES_MAX``, ``SUCCESSFUL_MAX``, ``REVOKE_EXPIRES`` and
    ``SUCCESSFUL_EXPIRES`` reflect the environment in effect during that
    import.
    """

    # Overrides applied by the custom-configuration test.  The keys are also
    # the variables the default-configuration test has to strip, so that it
    # does not depend on whatever the surrounding environment has set.
    _custom_env = {
        'CELERY_WORKER_REVOKES_MAX': '50001',
        'CELERY_WORKER_SUCCESSFUL_MAX': '1001',
        'CELERY_WORKER_REVOKE_EXPIRES': '10801',
        'CELERY_WORKER_SUCCESSFUL_EXPIRES': '10801',
    }

    @staticmethod
    def import_state():
        """Import and return a fresh ``celery.worker.state`` module object.

        ``sys.modules`` is restored on exit, so the freshly imported copy is
        not left behind in the module cache.
        """
        with patch.dict(sys.modules):
            # pop() rather than del: the module may not have been imported
            # yet, in which case there is simply nothing to evict.
            sys.modules.pop('celery.worker.state', None)
            return import_module('celery.worker.state')

    @patch.dict(os.environ, _custom_env)
    def test_custom_configuration(self):
        """Values set through the environment override the defaults."""
        state = self.import_state()
        assert state.REVOKES_MAX == 50001
        assert state.SUCCESSFUL_MAX == 1001
        assert state.REVOKE_EXPIRES == 10801
        assert state.SUCCESSFUL_EXPIRES == 10801

    def test_default_configuration(self):
        """Without any override variables the built-in defaults apply."""
        # patch.dict restores os.environ on exit, so removing the variables
        # here cannot leak into other tests.
        with patch.dict(os.environ):
            for name in self._custom_env:
                os.environ.pop(name, None)
            state = self.import_state()
        assert state.REVOKES_MAX == 50000
        assert state.SUCCESSFUL_MAX == 1000
        assert state.REVOKE_EXPIRES == 10800
        assert state.SUCCESSFUL_EXPIRES == 10800
|