File: benchmark_models.py

package info (click to toggle)
python-django-celery-results 2.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 696 kB
  • sloc: python: 2,373; makefile: 312; sh: 7; sql: 2
file content (70 lines) | stat: -rw-r--r-- 2,256 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import time
from datetime import timedelta

import pytest
from celery import uuid
from django.test import TransactionTestCase

from django_celery_results.models import TaskResult
from django_celery_results.utils import now

# Number of TaskResult rows the benchmark inserts before timing the delete.
RECORDS_COUNT = 100_000


@pytest.fixture()
def use_benchmark(request, benchmark):
    """Attach the ``benchmark`` fixture to the requesting test class.

    The fixture object is stored as a class attribute so that test methods
    can reach it as ``self.benchmark`` instead of taking it as an argument.
    NOTE(review): ``benchmark`` is presumably the pytest-benchmark fixture;
    confirm against the project's test requirements.
    """
    request.cls.benchmark = benchmark


@pytest.mark.usefixtures('use_benchmark')
@pytest.mark.usefixtures('depends_on_current_app')
class benchmark_Models(TransactionTestCase):
    """Benchmark ``TaskResult.objects.delete_expired`` on a large table."""

    @pytest.fixture(autouse=True)
    def setup_app(self, app):
        """Keep the ``app`` fixture on the instance and configure results.

        Sets the result serializer to pickle and the result backend to the
        django_celery_results database backend.
        """
        self.app = app
        self.app.conf.result_serializer = 'pickle'
        self.app.conf.result_backend = (
            'django_celery_results.backends:DatabaseBackend')

    def create_many_task_result(self, count):
        """Bulk-create ``count`` task results and print the time spent.

        :param count: number of ``TaskResult`` instances to build and insert.
        :returns: the value returned by ``TaskResult.objects.bulk_create``.
        """
        # perf_counter() is monotonic, so the reported durations cannot be
        # distorted by wall-clock adjustments as time.time() deltas can.
        start = time.perf_counter()
        draft_results = [TaskResult(task_id=uuid()) for _ in range(count)]
        drafted = time.perf_counter()
        results = TaskResult.objects.bulk_create(draft_results)
        done_creating = time.perf_counter()

        print((
            'drafting time: {drafting:.2f}\n'
            'bulk_create time: {done:.2f}\n'
            '------'
        ).format(drafting=drafted - start, done=done_creating - drafted))
        return results

    def setup_records_to_delete(self):
        """Create ``RECORDS_COUNT`` results and back-date the upper half.

        Every record whose id is at or above the id of the middle record
        (ordered by id) gets ``date_done`` set to ten days in the past.
        """
        self.create_many_task_result(count=RECORDS_COUNT)
        # Integer division keeps the slice index an int without a float
        # round-trip.
        mid_point = TaskResult.objects.order_by('id')[RECORDS_COUNT // 2]
        todelete = TaskResult.objects.filter(id__gte=mid_point.id)
        todelete.update(date_done=now() - timedelta(days=10))

    def test_taskresult_delete_expired(self):
        """Time one ``delete_expired`` call over the prepared records.

        Asserts that half of the records remain afterwards and that the
        slowest measured round stays below 1 (presumably seconds, as
        reported by the benchmark fixture's stats -- confirm).
        """
        start = time.perf_counter()
        self.setup_records_to_delete()
        after_setup = time.perf_counter()
        # A single round with a single iteration: the call deletes rows, so
        # repeating it would not measure the same work again.
        self.benchmark.pedantic(
            TaskResult.objects.delete_expired,
            args=(self.app.conf.result_expires,),
            iterations=1,
            rounds=1,
        )
        done = time.perf_counter()
        assert TaskResult.objects.count() == RECORDS_COUNT // 2

        # The separator gets its own line; previously it ran straight into
        # the "setup time" text.
        print((
            '------\n'
            'setup time: {setup:.2f}\n'
            'bench time: {bench:.2f}\n'
        ).format(setup=after_setup - start, bench=done - after_setup))
        assert self.benchmark.stats.stats.max < 1