File: test_pool.py

package info (click to toggle)
celery 5.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,008 kB
  • sloc: python: 64,346; sh: 795; makefile: 378
file content (72 lines) | stat: -rw-r--r-- 1,837 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import itertools
import time

import pytest
from billiard.einfo import ExceptionInfo

# Skip this module when the ``multiprocessing`` module cannot be imported.
pytest.importorskip('multiprocessing')


def do_something(i):
    """Return *i* multiplied by itself."""
    squared = i * i
    return squared


def long_something():
    """Block the caller for one second; returns nothing."""
    pause_seconds = 1
    time.sleep(pause_seconds)


def raise_something(i):
    """Return an ``ExceptionInfo`` built while handling a ``KeyError``.

    The argument *i* is accepted but never used.  The error is raised and
    caught inside this function, so nothing propagates to the caller.
    """
    try:
        raise KeyError('FOO EXCEPTION')
    except KeyError:
        # Build the info object while the KeyError is still being handled.
        captured = ExceptionInfo()
    return captured


class test_TaskPool:
    """Tests for ``celery.concurrency.prefork.TaskPool``."""

    def setup_method(self):
        """Import ``TaskPool`` and keep it on the instance for the tests."""
        from celery.concurrency.prefork import TaskPool
        self.TaskPool = TaskPool

    def test_attrs(self):
        """A pool built with limit 2 reports that limit and has no ``_pool``."""
        p = self.TaskPool(2)
        assert p.limit == 2
        assert p._pool is None

    def x_apply(self):
        """Submit jobs to a started pool and check results and callbacks.

        The method name does not start with ``test_``.
        NOTE(review): presumably this keeps the runner from collecting it --
        confirm why before renaming it.

        The pool is stopped in a ``finally`` clause so that a failing
        assertion or a failing submission does not leave a started pool
        behind.
        """
        scratchpad = {}
        proc_counter = itertools.count()

        def mycallback(ret_value):
            # Entries are keyed by the order in which callbacks ran.
            process = next(proc_counter)
            scratchpad[process] = {'ret_value': ret_value}

        myerrback = mycallback

        p = self.TaskPool(2)
        p.start()
        try:
            res = p.apply_async(do_something, args=[10], callback=mycallback)
            # NOTE(review): confirm that the pool's ``apply_async`` accepts
            # an ``errback`` keyword.
            res2 = p.apply_async(
                raise_something, args=[10], errback=myerrback,
            )
            res3 = p.apply_async(do_something, args=[20], callback=mycallback)

            assert res.get() == 100
            time.sleep(0.5)
            assert scratchpad[0]['ret_value'] == 100

            assert isinstance(res2.get(), ExceptionInfo)
            # Wait before looking at the scratchpad, as the other checks do.
            time.sleep(1)
            assert 1 in scratchpad
            assert isinstance(scratchpad[1]['ret_value'], ExceptionInfo)
            assert scratchpad[1]['ret_value'].exception.args == (
                'FOO EXCEPTION',
            )

            assert res3.get() == 400
            time.sleep(0.5)
            assert scratchpad[2]['ret_value'] == 400

            res3 = p.apply_async(do_something, args=[30], callback=mycallback)

            assert res3.get() == 900
            time.sleep(0.5)
            assert scratchpad[3]['ret_value'] == 900
        finally:
            p.stop()