File: test_canvas.py

package info (click to toggle)
celery 5.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 8,008 kB
  • sloc: python: 64,346; sh: 795; makefile: 378
file content (33 lines) | stat: -rw-r--r-- 989 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import uuid


class test_Canvas:

    def test_freeze_reply_to(self):
        # Tests that Canvas.freeze() correctly
        # creates reply_to option

        @self.app.task
        def test_task(a, b):
            return

        s = test_task.s(2, 2)
        s.freeze()

        from concurrent.futures import ThreadPoolExecutor

        def foo():
            s = test_task.s(2, 2)
            s.freeze()
            return self.app.thread_oid, s.options['reply_to']
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(foo)
        t_reply_to_app, t_reply_to_opt = future.result()

        assert uuid.UUID(s.options['reply_to'])
        assert uuid.UUID(t_reply_to_opt)
        # reply_to must be equal to thread_oid of Application
        assert self.app.thread_oid == s.options['reply_to']
        assert t_reply_to_app == t_reply_to_opt
        # reply_to must be thread-relative.
        assert t_reply_to_opt != s.options['reply_to']