File: test_worker.py

package info (click to toggle)
celery 5.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,008 kB
  • sloc: python: 64,346; sh: 795; makefile: 378
file content (59 lines) | stat: -rw-r--r-- 1,972 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import pytest

# this import adds a @shared_task, which uses connect_on_app_finalize
# to install the celery.ping task that the test lib uses
import celery.contrib.testing.tasks  # noqa
from celery import Celery
from celery.contrib.testing.worker import TestWorkController, start_worker


class test_worker:
    """Exercise ``start_worker`` against an app using in-memory transports."""

    def setup_method(self):
        """Create a fresh app with an ``add`` task and an always-failing task."""
        app = Celery(
            'celerytest',
            broker='memory://',
            backend='cache+memory://',
        )
        self.app = app

        def add(x, y):
            return x + y

        def error_task():
            raise NotImplementedError()

        # Register the plain functions as tasks; the function names are kept
        # as ``add`` / ``error_task`` because the task names derive from them.
        self.add = app.task(add)
        self.error_task = app.task(error_task)

        app.config_from_object(dict(worker_hijack_root_logger=False))

        # The root logger level would otherwise be changed to ERROR; to
        # avoid that, both app.log.loglevel and the ``loglevel`` argument
        # given to start_worker must be 0
        # (see celery.app.log.setup_logging_subsystem).
        app.log.loglevel = 0

    def test_start_worker(self):
        """A task sent while the worker is running yields its result."""
        with start_worker(app=self.app, loglevel=0):
            total = self.add.s(1, 2).apply_async().get(timeout=5)
        assert total == 3

    def test_start_worker_with_exception(self):
        """start_worker must not hang when a task raises an exception."""
        with pytest.raises(NotImplementedError), \
                start_worker(app=self.app, loglevel=0):
            self.error_task.apply_async().get(timeout=5)

    def test_start_worker_with_hostname_config(self):
        """A custom hostname can be passed through to the TestWorkController."""
        expected_hostname = 'test_name@test_host'
        worker_cm = start_worker(
            app=self.app, loglevel=0, hostname=expected_hostname,
        )
        with worker_cm as worker:
            assert isinstance(worker, TestWorkController)
            assert worker.hostname == expected_hostname

            total = self.add.s(1, 2).apply_async().get(timeout=5)
        assert total == 3