1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
from mock import MagicMock
from flower.api.control import ControlHandler
from tests import AsyncHTTPTestCase
class UnknownWorkerControlTests(AsyncHTTPTestCase):
def test_unknown_worker(self):
r = self.post('/api/worker/shutdown/test', body={})
self.assertEqual(404, r.code)
class WorkerControlTests(AsyncHTTPTestCase):
def setUp(self):
AsyncHTTPTestCase.setUp(self)
self.is_worker = ControlHandler.is_worker
ControlHandler.is_worker = lambda *args: True
def tearDown(self):
AsyncHTTPTestCase.tearDown(self)
ControlHandler.is_worker = self.is_worker
def test_shutdown(self):
celery = self._app.capp
celery.control.broadcast = MagicMock()
r = self.post('/api/worker/shutdown/test', body={})
self.assertEqual(200, r.code)
celery.control.broadcast.assert_called_once_with('shutdown',
destination=['test'])
def test_pool_restart(self):
celery = self._app.capp
celery.control.broadcast = MagicMock(return_value=[{'test': 'ok'}])
r = self.post('/api/worker/pool/restart/test', body={})
self.assertEqual(200, r.code)
celery.control.broadcast.assert_called_once_with(
'pool_restart',
arguments={'reload': False},
destination=['test'],
reply=True,
)
def test_pool_grow(self):
celery = self._app.capp
celery.control.pool_grow = MagicMock(return_value=[{'test': 'ok'}])
r = self.post('/api/worker/pool/grow/test', body={'n': 3})
self.assertEqual(200, r.code)
celery.control.pool_grow.assert_called_once_with(
n=3, reply=True, destination=['test'])
def test_pool_shrink(self):
celery = self._app.capp
celery.control.pool_shrink = MagicMock(return_value=[{'test': 'ok'}])
r = self.post('/api/worker/pool/shrink/test', body={})
self.assertEqual(200, r.code)
celery.control.pool_shrink.assert_called_once_with(
n=1, reply=True, destination=['test'])
def test_pool_autoscale(self):
celery = self._app.capp
celery.control.broadcast = MagicMock(return_value=[{'test': 'ok'}])
r = self.post('/api/worker/pool/autoscale/test',
body={'min': 2, 'max': 5})
self.assertEqual(200, r.code)
celery.control.broadcast.assert_called_once_with(
'autoscale',
reply=True, destination=['test'],
arguments={'min': 2, 'max': 5})
def test_add_consumer(self):
celery = self._app.capp
celery.control.broadcast = MagicMock(
return_value=[{'test': {'ok': ''}}])
r = self.post('/api/worker/queue/add-consumer/test',
body={'queue': 'foo'})
self.assertEqual(200, r.code)
celery.control.broadcast.assert_called_once_with(
'add_consumer',
reply=True, destination=['test'],
arguments={'queue': 'foo'})
def test_cancel_consumer(self):
celery = self._app.capp
celery.control.broadcast = MagicMock(
return_value=[{'test': {'ok': ''}}])
r = self.post('/api/worker/queue/cancel-consumer/test',
body={'queue': 'foo'})
self.assertEqual(200, r.code)
celery.control.broadcast.assert_called_once_with(
'cancel_consumer',
reply=True, destination=['test'],
arguments={'queue': 'foo'})
def test_task_timeout(self):
celery = self._app.capp
celery.control.time_limit = MagicMock(
return_value=[{'foo': {'ok': ''}}])
r = self.post(
'/api/task/timeout/celery.map',
body={'workername': 'foo', 'hard': 3.1, 'soft': 1.2}
)
self.assertEqual(200, r.code)
celery.control.time_limit.assert_called_once_with(
'celery.map', hard=3.1, soft=1.2, destination=['foo'],
reply=True)
def test_task_ratelimit(self):
celery = self._app.capp
celery.control.rate_limit = MagicMock(
return_value=[{'foo': {'ok': ''}}])
r = self.post('/api/task/rate-limit/celery.map',
body={'workername': 'foo', 'ratelimit': 20})
self.assertEqual(200, r.code)
celery.control.rate_limit.assert_called_once_with(
'celery.map', '20', destination=['foo'], reply=True)
def test_task_ratelimit_non_integer(self):
celery = self._app.capp
celery.control.rate_limit = MagicMock(
return_value=[{'foo': {'ok': ''}}])
r = self.post('/api/task/rate-limit/celery.map',
body={'workername': 'foo', 'ratelimit': '11/m'})
self.assertEqual(200, r.code)
celery.control.rate_limit.assert_called_once_with(
'celery.map', '11/m', destination=['foo'], reply=True)
class TaskControlTests(AsyncHTTPTestCase):
def test_revoke(self):
celery = self._app.capp
celery.control.revoke = MagicMock()
r = self.post('/api/task/revoke/test', body={})
self.assertEqual(200, r.code)
celery.control.revoke.assert_called_once_with('test',
terminate=False)
def test_terminate(self):
celery = self._app.capp
celery.control.revoke = MagicMock()
r = self.post('/api/task/revoke/test', body={'terminate': True})
self.assertEqual(200, r.code)
celery.control.revoke.assert_called_once_with('test',
terminate=True)
|