File: bench_worker.py

package info (click to toggle)
celery 5.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,008 kB
  • sloc: python: 64,346; sh: 795; makefile: 378
file content (110 lines) | stat: -rw-r--r-- 2,816 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import os
import sys
import time

from celery import Celery

# Environment switches for the benchmark run; both are set to 'yes' in
# this process's environment.
# NOTE(review): nothing in this file reads them back -- presumably they are
# consulted by celery or its dependencies; confirm they are still honoured.
os.environ.update({
    'NOSETPS': 'yes',
    'USE_FAST_LOCALS': 'yes',
})


# Number of tasks published/consumed when no count is given on the
# command line.
DEFAULT_ITS = 40000

# Broker URL: PyPy always gets pyamqp; every other interpreter uses the
# BROKER environment variable, falling back to librabbitmq.
# NOTE(review): presumably the librabbitmq C client is not usable on
# PyPy -- confirm.
BROKER_TRANSPORT = (
    'pyamqp://' if hasattr(sys, 'pypy_version_info')
    else os.environ.get('BROKER', 'librabbitmq://')
)

# Benchmark application: everything is routed to one non-durable,
# auto-deleted 'bench.worker' queue consumed with no_ack, and no result
# backend is configured.
app = Celery('bench_worker')
app.conf.update(
    broker_url=BROKER_TRANSPORT,
    broker_pool_limit=10,
    worker_pool='solo',
    # NOTE(review): per the Celery docs a multiplier of 0 lets the worker
    # consume as many messages as it wants -- confirm this is intended.
    worker_prefetch_multiplier=0,
    # NOTE(review): presumably 1 == transient (non-persistent) delivery.
    task_default_delivery_mode=1,
    task_queues={
        'bench.worker': {
            'exchange': 'bench.worker',
            'routing_key': 'bench.worker',
            'no_ack': True,
            'exchange_durable': False,
            'queue_durable': False,
            'auto_delete': True,
        },
    },
    task_serializer='json',
    task_default_queue='bench.worker',
    result_backend=None,
)


def tdiff(then):
    """Return the seconds elapsed on the monotonic clock since *then*.

    *then* is expected to be an earlier ``time.monotonic()`` reading.
    """
    now = time.monotonic()
    return now - then


@app.task(cur=0, time_start=None, queue='bench.worker', bare=True)
def it(_, n):
    """Count one consumed benchmark message and report throughput.

    Progress is tracked on the task object itself: ``it.cur`` is the
    number of messages handled so far, ``it.time_start`` the monotonic
    time of the first message and ``it.subt`` the start of the current
    5000-message lap.

    :param _: ignored (``bench_apply`` passes the publish index here).
    :param n: total number of messages in the run.

    A lap time is written to stderr every 5000 messages.  When the last
    message is reached (``i > n - 2``) the totals are printed to stdout
    and the whole process is terminated with ``os._exit(0)``; this
    function does not return in that case.
    """
    # use internal counter, as ordering can be skewed
    # by previous runs, or the broker.
    i = it.cur
    if i and not i % 5000:
        print(f'({i} so far: {tdiff(it.subt)}s)', file=sys.stderr)
        it.subt = time.monotonic()
    if not i:
        # First message: start both the overall and the lap clock.
        it.subt = it.time_start = time.monotonic()
    elif i > n - 2:
        total = tdiff(it.time_start)
        print(f'({i} so far: {tdiff(it.subt)}s)', file=sys.stderr)
        # os._exit() skips the interpreter's normal shutdown, including
        # flushing stdio buffers, so flush explicitly or the summary is
        # lost whenever stdout is not a terminal.
        print(
            f'-- process {n} tasks: {total}s total, {n / total} tasks/s',
            flush=True,
        )
        sys.stderr.flush()
        os._exit(0)
    it.cur += 1


def bench_apply(n=DEFAULT_ITS):
    """Publish *n* ``it`` task messages and print the time it took.

    All messages are sent through one producer obtained from
    ``app.producer_or_acquire()``.  Each message carries ``(index, n)``
    as its arguments.

    :param n: number of task messages to publish.
    """
    started = time.monotonic()
    bench_task = it._get_current_object()
    with app.producer_or_acquire() as producer:
        for seq in range(n):
            bench_task.apply_async((seq, n), producer=producer)
    elapsed = time.monotonic() - started
    print(f'-- apply {n} tasks: {elapsed}s')


def bench_work(n=DEFAULT_ITS, loglevel='CRITICAL'):
    """Start a worker that consumes the 'bench.worker' queue.

    :param n: number of tasks the run is expected to process; only used
        for the sanity check performed when the worker exits.
    :param loglevel: log level used unless the ``BENCH_LOGLEVEL``
        environment variable is set to a non-empty value.

    If ``worker.start()`` raises ``SystemExit`` the total of the worker's
    per-task counters is asserted to equal ``n + 1`` before the exception
    is re-raised.
    NOTE(review): the ``it`` task ends the process with ``os._exit()``,
    which does not raise ``SystemExit``, so this check presumably only
    runs when the worker is stopped some other way -- confirm.
    """
    level = os.environ.get('BENCH_LOGLEVEL') or loglevel
    if level:
        app.log.setup_logging_subsystem(loglevel=level)
    controller = app.WorkController(
        concurrency=15,
        queues=['bench.worker'],
    )

    print('-- starting worker')
    try:
        controller.start()
    except SystemExit:
        processed = sum(controller.state.total_count.values())
        assert processed == n + 1
        raise


def bench_both(n=DEFAULT_ITS):
    """Publish *n* tasks with ``bench_apply``, then run ``bench_work``."""
    for stage in (bench_apply, bench_work):
        stage(n)


def main(argv=sys.argv):
    """Command-line entry point: ``<script> apply|work|both [n]``.

    :param argv: argument vector; ``argv[1]`` selects the benchmark and
        the optional ``argv[2]`` is the number of tasks (defaults to
        ``DEFAULT_ITS``).
    :returns: whatever the selected benchmark function returns.
    :raises SystemExit: with status 1, after printing the usage line,
        when the command is missing or unknown or the task count is not
        an integer.
    """
    commands = {
        'apply': bench_apply,
        'work': bench_work,
        'both': bench_both,
    }
    # Show the real default instead of a hard-coded figure that can drift.
    usage = (f'Usage: {os.path.basename(argv[0])} '
             f'[apply|work|both] [n={DEFAULT_ITS}]')

    if len(argv) < 2 or argv[1] not in commands:
        print(usage)
        sys.exit(1)

    n = DEFAULT_ITS
    if len(argv) > 2:
        try:
            n = int(argv[2])
        except ValueError:
            print(usage)
            sys.exit(1)

    return commands[argv[1]](n=n)


if __name__ == '__main__':
    main()