File: testing.rst

package info (click to toggle)
celery 5.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,008 kB
  • sloc: python: 64,346; sh: 795; makefile: 378
file content (402 lines) | stat: -rw-r--r-- 11,857 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
.. _testing:

================================================================
 Testing with Celery
================================================================

Testing with Celery is divided into two parts:

    * Unit & Integration: Using ``celery.contrib.pytest``.
    * Smoke / Production: Using :pypi:`pytest-celery <pytest-celery>` >= 1.0.0

Installing the pytest-celery plugin will install the ``celery.contrib.pytest`` infrastructure as well,
alongside the pytest plugin infrastructure. The difference is how you use it.

.. warning::

     Both APIs are NOT compatible with each other. The pytest-celery plugin is Docker based
     and the ``celery.contrib.pytest`` is mock based.

To use the ``celery.contrib.pytest`` infrastructure, follow the instructions below.

The pytest-celery plugin has its `own documentation <https://pytest-celery.readthedocs.io/>`_.

Tasks and unit tests
====================

To test task behavior in unit tests the preferred method is mocking.

.. admonition:: Eager mode

    The eager mode enabled by the :setting:`task_always_eager` setting
    is by definition not suitable for unit tests.

    When testing with eager mode you are only testing an emulation
    of what happens in a worker, and there are many discrepancies
    between the emulation and what happens in reality.

    Note that eagerly executed tasks don't write results to backend by default.
    If you want to enable this functionality, have a look at :setting:`task_store_eager_result`.

A Celery task is much like a web view, in that it should only
define how to perform the action in the context of being called as a task.

This means optimally tasks only handle things like serialization, message headers,
retries, and so on, with the actual logic implemented elsewhere.

Say we had a task like this:

.. code-block:: python

    from .models import Product


    @app.task(bind=True)
    def send_order(self, product_pk, quantity, price):
        price = Decimal(price)  # json serializes this to string.

        # models are passed by id, not serialized.
        product = Product.objects.get(product_pk)

        try:
            product.order(quantity, price)
        except OperationalError as exc:
            raise self.retry(exc=exc)


``Note``: A task being `bound <https://docs.celeryq.dev/en/latest/userguide/tasks.html#bound-tasks>`_ means the first
argument to the task will always be the task instance (self). which means you do get a self argument as the
first argument and can use the Task class methods and attributes.

You could write unit tests for this task, using mocking like
in this example:

.. code-block:: python

    from pytest import raises

    from celery.exceptions import Retry

    # for python 2: use mock.patch from `pip install mock`.
    from unittest.mock import patch

    from proj.models import Product
    from proj.tasks import send_order

    class test_send_order:

        @patch('proj.tasks.Product.order')  # < patching Product in module above
        def test_success(self, product_order):
            product = Product.objects.create(
                name='Foo',
            )
            send_order(product.pk, 3, Decimal(30.3))
            product_order.assert_called_with(3, Decimal(30.3))

        @patch('proj.tasks.Product.order')
        @patch('proj.tasks.send_order.retry')
        def test_failure(self, send_order_retry, product_order):
            product = Product.objects.create(
                name='Foo',
            )

            # Set a side effect on the patched methods
            # so that they raise the errors we want.
            send_order_retry.side_effect = Retry()
            product_order.side_effect = OperationalError()

            with raises(Retry):
                send_order(product.pk, 3, Decimal(30.6))

.. _pytest_plugin:

pytest
======

.. versionadded:: 4.0

Celery also makes a :pypi:`pytest` plugin available that adds fixtures that you can
use in your integration (or unit) test suites.

Enabling
--------

Celery initially ships the plugin in a disabled state, to enable it you can either:

    * ``pip install celery[pytest]``
    * ``pip install pytest-celery``
    * or add an environment variable ``PYTEST_PLUGINS=celery.contrib.pytest``
    * or add ``pytest_plugins = ("celery.contrib.pytest", )`` to your root conftest.py


Marks
-----

``celery`` - Set test app configuration.
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The ``celery`` mark enables you to override the configuration
used for a single test case:

.. code-block:: python

    @pytest.mark.celery(result_backend='redis://')
    def test_something():
        ...


or for all the test cases in a class:

.. code-block:: python

    @pytest.mark.celery(result_backend='redis://')
    class test_something:

        def test_one(self):
            ...

        def test_two(self):
            ...

Fixtures
--------

Function scope
^^^^^^^^^^^^^^

``celery_app`` - Celery app used for testing.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This fixture returns a Celery app you can use for testing.

Example:

.. code-block:: python

    def test_create_task(celery_app, celery_worker):
        @celery_app.task
        def mul(x, y):
            return x * y
        
        celery_worker.reload()
        assert mul.delay(4, 4).get(timeout=10) == 16

``celery_worker`` - Embed live worker.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This fixture starts a Celery worker instance that you can use
for integration tests.  The worker will be started in a *separate thread*
and will be shutdown as soon as the test returns.

By default the fixture will wait up to 10 seconds for the worker to complete
outstanding tasks and will raise an exception if the time limit is exceeded.
The timeout can be customized by setting the ``shutdown_timeout`` key in the
dictionary returned by the :func:`celery_worker_parameters` fixture.

Example:

.. code-block:: python

    # Put this in your conftest.py
    @pytest.fixture(scope='session')
    def celery_config():
        return {
            'broker_url': 'amqp://',
            'result_backend': 'redis://'
        }

    def test_add(celery_worker):
        mytask.delay()


    # If you wish to override some setting in one test cases
    # only - you can use the ``celery`` mark:
    @pytest.mark.celery(result_backend='rpc')
    def test_other(celery_worker):
        ...

Heartbeats are disabled by default which means that the test worker doesn't
send events for ``worker-online``, ``worker-offline`` and ``worker-heartbeat``.
To enable heartbeats modify the :func:`celery_worker_parameters` fixture:

.. code-block:: python

    # Put this in your conftest.py
    @pytest.fixture(scope="session")
    def celery_worker_parameters():
        return {"without_heartbeat": False}
        ...



Session scope
^^^^^^^^^^^^^

``celery_config`` - Override to setup Celery test app configuration.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
You can redefine this fixture to configure the test Celery app.

The config returned by your fixture will then be used
to configure the :func:`celery_app`, and :func:`celery_session_app` fixtures.

Example:

.. code-block:: python

    @pytest.fixture(scope='session')
    def celery_config():
        return {
            'broker_url': 'amqp://',
            'result_backend': 'rpc',
        }


``celery_parameters`` - Override to setup Celery test app parameters.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can redefine this fixture to change the ``__init__`` parameters of test
Celery app. In contrast to :func:`celery_config`, these are directly passed to
when instantiating :class:`~celery.Celery`.

The config returned by your fixture will then be used
to configure the :func:`celery_app`, and :func:`celery_session_app` fixtures.

Example:

.. code-block:: python

    @pytest.fixture(scope='session')
    def celery_parameters():
        return {
            'task_cls':  my.package.MyCustomTaskClass,
            'strict_typing': False,
        }

``celery_worker_parameters`` - Override to setup Celery worker parameters.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can redefine this fixture to change the ``__init__`` parameters of test
Celery workers. These are directly passed to
:class:`~celery.worker.WorkController` when it is instantiated.

The config returned by your fixture will then be used
to configure the :func:`celery_worker`, and :func:`celery_session_worker`
fixtures.

Example:

.. code-block:: python

    @pytest.fixture(scope='session')
    def celery_worker_parameters():
        return {
            'queues':  ('high-prio', 'low-prio'),
            'exclude_queues': ('celery'),
        }


``celery_enable_logging`` - Override to enable logging in embedded workers.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This is a fixture you can override to enable logging in embedded workers.

Example:

.. code-block:: python

    @pytest.fixture(scope='session')
    def celery_enable_logging():
        return True

``celery_includes`` - Add additional imports for embedded workers.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
You can override fixture to include modules when an embedded worker starts.

You can have this return a list of module names to import,
which can be task modules, modules registering signals, and so on.

Example:

.. code-block:: python

    @pytest.fixture(scope='session')
    def celery_includes():
        return [
            'proj.tests.tasks',
            'proj.tests.celery_signal_handlers',
        ]

``celery_worker_pool`` - Override the pool used for embedded workers.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
You can override fixture to configure the execution pool used for embedded
workers.

Example:

.. code-block:: python

    @pytest.fixture(scope='session')
    def celery_worker_pool():
        return 'prefork'

.. warning::

    You cannot use the gevent/eventlet pools, that is unless your whole test
    suite is running with the monkeypatches enabled.

``celery_session_worker`` - Embedded worker that lives throughout the session.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This fixture starts a worker that lives throughout the testing session
(it won't be started/stopped for every test).

Example:

.. code-block:: python

    # Add this to your conftest.py
    @pytest.fixture(scope='session')
    def celery_config():
        return {
            'broker_url': 'amqp://',
            'result_backend': 'rpc',
        }

    # Do this in your tests.
    def test_add_task(celery_session_worker):
        assert add.delay(2, 2).get() == 4

.. warning::

    It's probably a bad idea to mix session and ephemeral workers...

``celery_session_app`` - Celery app used for testing (session scope).
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This can be used by other session scoped fixtures when they need to refer
to a Celery app instance.

``use_celery_app_trap`` - Raise exception on falling back to default app.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This is a fixture you can override in your ``conftest.py``, to enable the "app trap":
if something tries to access the default or current_app, an exception
is raised.

Example:

.. code-block:: python

    @pytest.fixture(scope='session')
    def use_celery_app_trap():
        return True


If a test wants to access the default app, you would have to mark it using
the ``depends_on_current_app`` fixture:

.. code-block:: python

    @pytest.mark.usefixtures('depends_on_current_app')
    def test_something():
        something()