1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
|
import os
import sys
import time
from celery import Celery
os.environ.update(
NOSETPS='yes',
USE_FAST_LOCALS='yes',
)
DEFAULT_ITS = 40000
BROKER_TRANSPORT = os.environ.get('BROKER', 'librabbitmq://')
if hasattr(sys, 'pypy_version_info'):
BROKER_TRANSPORT = 'pyamqp://'
app = Celery('bench_worker')
app.conf.update(
broker_url=BROKER_TRANSPORT,
broker_pool_limit=10,
worker_pool='solo',
worker_prefetch_multiplier=0,
task_default_delivery_mode=1,
task_queues={
'bench.worker': {
'exchange': 'bench.worker',
'routing_key': 'bench.worker',
'no_ack': True,
'exchange_durable': False,
'queue_durable': False,
'auto_delete': True,
}
},
task_serializer='json',
task_default_queue='bench.worker',
result_backend=None,
),
def tdiff(then):
return time.monotonic() - then
@app.task(cur=0, time_start=None, queue='bench.worker', bare=True)
def it(_, n):
# use internal counter, as ordering can be skewed
# by previous runs, or the broker.
i = it.cur
if i and not i % 5000:
print(f'({i} so far: {tdiff(it.subt)}s)', file=sys.stderr)
it.subt = time.monotonic()
if not i:
it.subt = it.time_start = time.monotonic()
elif i > n - 2:
total = tdiff(it.time_start)
print(f'({i} so far: {tdiff(it.subt)}s)', file=sys.stderr)
print('-- process {} tasks: {}s total, {} tasks/s'.format(
n, total, n / (total + .0),
))
import os
os._exit(0)
it.cur += 1
def bench_apply(n=DEFAULT_ITS):
time_start = time.monotonic()
task = it._get_current_object()
with app.producer_or_acquire() as producer:
[task.apply_async((i, n), producer=producer) for i in range(n)]
print(f'-- apply {n} tasks: {time.monotonic() - time_start}s')
def bench_work(n=DEFAULT_ITS, loglevel='CRITICAL'):
loglevel = os.environ.get('BENCH_LOGLEVEL') or loglevel
if loglevel:
app.log.setup_logging_subsystem(loglevel=loglevel)
worker = app.WorkController(concurrency=15,
queues=['bench.worker'])
try:
print('-- starting worker')
worker.start()
except SystemExit:
assert sum(worker.state.total_count.values()) == n + 1
raise
def bench_both(n=DEFAULT_ITS):
bench_apply(n)
bench_work(n)
def main(argv=sys.argv):
n = DEFAULT_ITS
if len(argv) < 2:
print(f'Usage: {os.path.basename(argv[0])} [apply|work|both] [n=20k]')
return sys.exit(1)
try:
n = int(argv[2])
except IndexError:
pass
return {'apply': bench_apply,
'work': bench_work,
'both': bench_both}[argv[1]](n=n)
if __name__ == '__main__':
main()
|