# Unit tests for celery.app.task.Context.
from celery.app.task import Context
# Retrieve the values of all context attributes as a
# dictionary in an implementation-agnostic manner.
def get_context_as_dict(ctx, getter=getattr):
    """Return the public, non-callable attributes of *ctx* as a dict.

    Attribute names are discovered with ``dir(ctx)``; names starting with
    an underscore are skipped. Each remaining value is fetched through
    ``getter(ctx, name)`` and dropped if it is callable.

    :param ctx: object whose attributes are collected.
    :param getter: two-argument callable ``(obj, name)`` used to read each
        attribute; defaults to :func:`getattr`.
    :returns: mapping of attribute name to the value returned by *getter*.
    """
    # Pseudo-private names are filtered out before the getter is invoked.
    public_names = (name for name in dir(ctx) if not name.startswith('_'))
    # Lazily pair each name with its value so the getter runs once per name.
    fetched = ((name, getter(ctx, name)) for name in public_names)
    # Methods and other callables are not treated as plain values.
    return {name: value for name, value in fetched if not callable(value)}
# Reference snapshot of the public, non-callable attributes of a freshly
# constructed Context, captured once when this module is imported. The
# tests use it as the baseline to compare other Context instances against.
default_context = get_context_as_dict(Context())
class test_Context:
    """Tests for attribute handling on ``celery.app.task.Context``."""

    def test_default_context(self):
        # Somewhat circular: the baseline snapshot was produced by the
        # same constructor that is being checked here.
        baseline = dict(default_context)
        baseline['children'] = []
        assert get_context_as_dict(Context()) == baseline

    def test_updated_context(self):
        overrides = {
            'id': 'unique id',
            'args': ['some', 1],
            'wibble': 'wobble',
        }
        wanted = dict(default_context, **overrides)
        context = Context()
        context.update(overrides)
        assert get_context_as_dict(context) == wanted
        # A new instance must still match the baseline snapshot.
        assert get_context_as_dict(Context()) == default_context

    def test_modified_context(self):
        context = Context()
        wanted = dict(default_context, id='unique id', args=['some', 1])
        # Plain attribute assignment instead of update().
        context.id = 'unique id'
        context.args = ['some', 1]
        assert get_context_as_dict(context) == wanted
        assert get_context_as_dict(Context()) == default_context

    def test_cleared_context(self):
        context = Context()
        context.update({
            'id': 'unique id',
            'args': ['some', 1],
            'wibble': 'wobble',
        })
        context.clear()
        baseline = dict(default_context)
        baseline['children'] = []
        # After clear() the instance is compared with the baseline, and so
        # is a brand-new instance.
        assert get_context_as_dict(context) == baseline
        assert get_context_as_dict(Context()) == baseline

    def test_context_get(self):
        overrides = {
            'id': 'unique id',
            'args': ['some', 1],
            'wibble': 'wobble',
        }
        wanted = dict(default_context, **overrides)
        context = Context()
        context.update(overrides)
        # Read every attribute through Context.get rather than getattr.
        via_get = get_context_as_dict(context, getter=Context.get)
        assert via_get == wanted
        assert get_context_as_dict(Context()) == default_context

    def test_extract_headers(self):
        # Custom headers should be extracted from the request dict.
        message = {
            'task': 'test.test_task',
            'id': 'e16eeaee-1172-49bb-9098-5437a509ffd9',
            'custom-header': 'custom-value',
        }
        context = Context(message)
        assert context.headers == {'custom-header': 'custom-value'}

    def test_dont_override_headers(self):
        # Headers already defined in the request must not be overridden.
        message = {
            'task': 'test.test_task',
            'id': 'e16eeaee-1172-49bb-9098-5437a509ffd9',
            'headers': {'custom-header': 'custom-value'},
            'custom-header-2': 'custom-value-2',
        }
        context = Context(message)
        assert context.headers == {'custom-header': 'custom-value'}
|