File: bulk_import.py

"""
Helper module for testing bulk imports.

See testing.rst
"""

import time
from functools import wraps

import tablib
from memory_profiler import memory_usage

from import_export import resources
from import_export.instance_loaders import CachedInstanceLoader

from core.models import Book  # isort:skip

# The number of rows to be created on each profile run.
# Increase this value for greater load testing.
NUM_ROWS = 10000


class _BookResource(resources.ModelResource):
    class Meta:
        model = Book
        fields = ("id", "name", "author_email", "price")
        use_bulk = True
        batch_size = 1000
        skip_unchanged = True
        # skip_diff = True
        # force_init_instance can speed up imports but cannot be used
        # when performing updates:
        # force_init_instance = True
        # CachedInstanceLoader loads all candidate instances in a single
        # query and caches them, avoiding one lookup query per row.
        instance_loader_class = CachedInstanceLoader


def profile_duration(fn):
    @wraps(fn)
    def inner(*args, **kwargs):
        # Measure wall-clock duration of the wrapped call.
        t = time.perf_counter()
        retval = fn(*args, **kwargs)
        elapsed = time.perf_counter() - t
        print(f"Time {elapsed:.4f}s")
        return retval

    return inner


def profile_mem(fn):
    @wraps(fn)
    def inner(*args, **kwargs):
        # Measure memory growth while the wrapped call runs
        # (memory_usage() samples are reported in MiB).
        mem, retval = memory_usage(
            (fn, args, kwargs), retval=True, timeout=200, interval=1e-7
        )
        print(f"Memory {max(mem) - min(mem):.2f} MiB")
        return retval

    return inner
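
# Duration and memory are profiled in separate runs (see do_create() below):
# the fine-grained memory sampling above would likely add enough overhead to
# distort a combined timing measurement.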


@profile_duration
def do_import_duration(resource, dataset):
    resource.import_data(dataset)


@profile_mem
def do_import_mem(resource, dataset):
    resource.import_data(dataset)


def do_create():
    class _BookResource(resources.ModelResource):
        class Meta:
            model = Book
            fields = ("id", "name", "author_email", "price")
            use_bulk = True
            batch_size = 1000
            skip_unchanged = True
            skip_diff = True
            force_init_instance = True
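
    # force_init_instance=True skips the per-row lookup for an existing
    # instance, so this run profiles the pure bulk_create path.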

    print("\ndo_create()")
    # Clear down existing objects with a single raw DELETE
    # (_raw_delete() bypasses signals and cascade handling).
    books = Book.objects.all()
    books._raw_delete(books.db)

    rows = [("", "Some new book", "email@example.com", "10.25")] * NUM_ROWS
    dataset = tablib.Dataset(*rows, headers=["id", "name", "author_email", "price"])

    book_resource = _BookResource()
    do_import_duration(book_resource, dataset)
    do_import_mem(book_resource, dataset)

    # Book objects are created once for the 'duration' run,
    # and once for the 'memory' run
    assert Book.objects.count() == NUM_ROWS * 2
    books._raw_delete(books.db)


def do_update():
    print("\ndo_update()")

    # clearing down existing objects
    books = Book.objects.all()
    books._raw_delete(books.db)

    rows = [("", "Some new book", "email@example.com", "10.25")] * NUM_ROWS
    books = [Book(name=r[1], author_email=r[2], price=r[3]) for r in rows]

    # For an 'update' run there must be existing rows in the DB
    # which can be updated.
    Book.objects.bulk_create(books)
    assert NUM_ROWS == Book.objects.count()

    # find the ids, so that we can perform the update
    all_books = Book.objects.all()
    rows = [(b.id, b.name, b.author_email, b.price) for b in all_books]
    dataset = tablib.Dataset(*rows, headers=["id", "name", "author_email", "price"])
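
    # Note: the re-imported values are identical to what is stored, so with
    # skip_unchanged=True this run largely profiles instance loading and
    # diffing rather than the bulk update itself.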

    book_resource = _BookResource()
    do_import_duration(book_resource, dataset)
    do_import_mem(book_resource, dataset)

    assert NUM_ROWS == Book.objects.count()
    books = Book.objects.all()
    books._raw_delete(books.db)


def do_delete():
    class _BookResource(resources.ModelResource):
        def for_delete(self, row, instance):
            # Flag every imported row for deletion.
            return True

        class Meta:
            model = Book
            fields = ("id", "name", "author_email", "price")
            use_bulk = True
            batch_size = 1000
            skip_diff = True
            instance_loader_class = CachedInstanceLoader

    print("\ndo_delete()")

    # clearing down existing objects
    books = Book.objects.all()
    books._raw_delete(books.db)

    rows = [("", "Some new book", "email@example.com", "10.25")] * NUM_ROWS
    books = [Book(name=r[1], author_email=r[2], price=r[3]) for r in rows]

    # For a 'delete' run there must be existing rows in the DB
    # which can be deleted.
    Book.objects.bulk_create(books)
    assert NUM_ROWS == Book.objects.count()

    all_books = Book.objects.all()
    rows = [(b.id, b.name, b.author_email, b.price) for b in all_books]
    dataset = tablib.Dataset(*rows, headers=["id", "name", "author_email", "price"])

    book_resource = _BookResource()
    do_import_duration(book_resource, dataset)

    assert 0 == Book.objects.count()

    # recreate rows which have just been deleted
    Book.objects.bulk_create(books)
    assert NUM_ROWS == Book.objects.count()

    all_books = Book.objects.all()
    rows = [(b.id, b.name, b.author_email, b.price) for b in all_books]
    dataset = tablib.Dataset(*rows, headers=["id", "name", "author_email", "price"])
    do_import_mem(book_resource, dataset)
    assert 0 == Book.objects.count()


def run(*args):
    if len(args) > 0:
        arg = args[0].lower()
        if arg == "create":
            do_create()
        elif arg == "update":
            do_update()
        elif arg == "delete":
            do_delete()
    else:
        do_create()
        do_update()
        do_delete()