.. currentmodule:: motor.motor_tornado

.. _bulk-write-tutorial:

Bulk Write Operations
=====================

.. warning:: Motor will be deprecated on May 14th, 2026, one year after the production release of the PyMongo Async driver. Critical bug fixes will be made until May 14th, 2027.
  We strongly recommend that Motor users migrate to the PyMongo Async driver while Motor is still supported.
  To learn more, see `the migration guide <https://www.mongodb.com/docs/languages/python/pymongo-driver/current/reference/migration/>`_.

.. testsetup::

  from motor.motor_tornado import MotorClient
  from tornado.ioloop import IOLoop

  client = MotorClient()
  db = client.test_database
  IOLoop.current().run_sync(db.test.drop)

This tutorial explains how to take advantage of Motor's bulk
write operation features. Executing write operations in batches
reduces the number of network round trips, increasing write
throughput.

This example describes using Motor with Tornado. Beginning in
version 0.5, Motor can also integrate with asyncio instead of Tornado.

Bulk Insert
-----------

A batch of documents can be inserted by passing a list or generator
to the :meth:`~MotorCollection.insert_many` method. Motor
will automatically split the batch into smaller sub-batches based on
the maximum message size accepted by MongoDB, supporting very large
bulk insert operations.

.. doctest::

  >>> async def f():
  ...     await db.test.insert_many(({"i": i} for i in range(10000)))
  ...     count = await db.test.count_documents({})
  ...     print("Final count: %d" % count)
  ...
  >>> IOLoop.current().run_sync(f)
  Final count: 10000

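
As a rough mental model of the sub-batch splitting described above, the sketch below greedily packs documents into sub-batches whose combined encoded size stays under a limit. It is a simplified illustration under stated assumptions, not Motor's or PyMongo's actual algorithm: the real driver measures BSON size and also honors the server's ``maxWriteBatchSize``, whereas the hypothetical ``encoded_size`` helper here uses compact JSON length as a stand-in for BSON size, and ``max_bytes`` is an arbitrary illustrative limit.

.. code-block:: python

  import json
  from typing import Any, Iterable, Iterator


  def encoded_size(doc: dict[str, Any]) -> int:
      # Hypothetical stand-in for BSON size: bytes in the compact JSON encoding.
      return len(json.dumps(doc, separators=(",", ":")).encode("utf-8"))


  def split_into_batches(
      docs: Iterable[dict[str, Any]], max_bytes: int
  ) -> Iterator[list[dict[str, Any]]]:
      """Greedily pack documents, in order, into sub-batches of at most max_bytes."""
      batch: list[dict[str, Any]] = []
      batch_bytes = 0
      for doc in docs:
          size = encoded_size(doc)
          if size > max_bytes:
              # A single document that can never fit is an error, not a new batch.
              raise ValueError(f"document of {size} bytes exceeds max_bytes={max_bytes}")
          if batch and batch_bytes + size > max_bytes:
              # Adding this document would overflow the current sub-batch: emit it.
              yield batch
              batch, batch_bytes = [], 0
          batch.append(doc)
          batch_bytes += size
      if batch:
          yield batch


  # Like insert_many, this accepts a generator and consumes it lazily.
  batches = list(split_into_batches(({"i": i} for i in range(10000)), max_bytes=48_000))
  print(len(batches), "sub-batches,", sum(len(b) for b in batches), "documents")

In this model each sub-batch would be sent as one message, which is why the caller can hand ``insert_many`` an arbitrarily long iterable without tracking message limits itself.
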
Mixed Bulk Write Operations
---------------------------

Motor also supports executing mixed bulk write operations. A batch
of insert, update, and remove operations can be executed together with a
single call to :meth:`~MotorCollection.bulk_write`.

.. _ordered_bulk:

Ordered Bulk Write Operations
.............................

Ordered bulk write operations are batched and sent to the server in the
order provided for serial execution. The return value is an instance of
:class:`~pymongo.results.BulkWriteResult` describing the type and count
of operations performed.

.. doctest::
  :options: +NORMALIZE_WHITESPACE

  >>> from pprint import pprint
  >>> from pymongo import InsertOne, DeleteMany, ReplaceOne, UpdateOne
  >>> async def f():
  ...     result = await db.test.bulk_write(
  ...         [
  ...             DeleteMany({}),  # Remove all documents from the previous example.
  ...             InsertOne({"_id": 1}),
  ...             InsertOne({"_id": 2}),
  ...             InsertOne({"_id": 3}),
  ...             UpdateOne({"_id": 1}, {"$set": {"foo": "bar"}}),
  ...             UpdateOne({"_id": 4}, {"$inc": {"j": 1}}, upsert=True),
  ...             ReplaceOne({"j": 1}, {"j": 2}),
  ...         ]
  ...     )
  ...     pprint(result.bulk_api_result)
  ...
  >>> IOLoop.current().run_sync(f)
  {'nInserted': 3,
   'nMatched': 2,
   'nModified': 2,
   'nRemoved': 10000,
   'nUpserted': 1,
   'upserted': [{'_id': 4, 'index': 5}],
   'writeConcernErrors': [],
   'writeErrors': []}

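
Each entry in ``upserted`` pairs the ``index`` of the request that caused the upsert with the ``_id`` of the document it created; :class:`~pymongo.results.BulkWriteResult` exposes the same information as its ``upserted_ids`` property, a dict keyed by request index. The sketch below rebuilds that mapping from the plain ``bulk_api_result`` dict printed above and uses it to find the responsible request. The helper name ``upserts_by_request`` is hypothetical, and the requests are plain strings standing in for the PyMongo request objects so that the sketch runs without a server.

.. code-block:: python

  from typing import Any


  def upserts_by_request(bulk_api_result: dict[str, Any]) -> dict[int, Any]:
      """Map each upserting request's index to the _id of the document it created."""
      return {entry["index"]: entry["_id"] for entry in bulk_api_result["upserted"]}


  # Copy of the bulk_api_result printed above, so the sketch runs on its own.
  bulk_api_result = {
      "nInserted": 3,
      "nMatched": 2,
      "nModified": 2,
      "nRemoved": 10000,
      "nUpserted": 1,
      "upserted": [{"_id": 4, "index": 5}],
      "writeConcernErrors": [],
      "writeErrors": [],
  }
  # Plain strings standing in for the request objects passed to bulk_write above.
  requests = [
      "DeleteMany({})",
      "InsertOne({'_id': 1})",
      "InsertOne({'_id': 2})",
      "InsertOne({'_id': 3})",
      "UpdateOne({'_id': 1}, {'$set': {'foo': 'bar'}})",
      "UpdateOne({'_id': 4}, {'$inc': {'j': 1}}, upsert=True)",
      "ReplaceOne({'j': 1}, {'j': 2})",
  ]
  for index, new_id in upserts_by_request(bulk_api_result).items():
      print(f"request {index} upserted _id={new_id}: {requests[index]}")

Here the mapping is ``{5: 4}``: the sixth request, the ``UpdateOne`` with ``upsert=True``, created the document with ``_id`` 4.
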
The first write failure that occurs (e.g., a duplicate key error) aborts the
remaining operations, and Motor raises
:class:`~pymongo.errors.BulkWriteError`. The :attr:`details` attribute of
the exception instance provides the execution results up until the failure
occurred, along with details about the failure, including the operation that
caused it.

.. doctest::
  :options: +NORMALIZE_WHITESPACE

  >>> from pymongo import InsertOne, DeleteOne, ReplaceOne
  >>> from pymongo.errors import BulkWriteError
  >>> async def f():
  ...     requests = [
  ...         ReplaceOne({"j": 2}, {"i": 5}),
  ...         InsertOne({"_id": 4}),  # Violates the unique key constraint on _id.
  ...         DeleteOne({"i": 5}),
  ...     ]
  ...     try:
  ...         await db.test.bulk_write(requests)
  ...     except BulkWriteError as bwe:
  ...         pprint(bwe.details)
  ...
  >>> IOLoop.current().run_sync(f)
  {'nInserted': 0,
   'nMatched': 1,
   'nModified': 1,
   'nRemoved': 0,
   'nUpserted': 0,
   'upserted': [],
   'writeConcernErrors': [],
   'writeErrors': [{'code': 11000,
                    'errmsg': '... duplicate key error ...',
                    'index': 1,
                    'keyPattern': {'_id': 1},
                    'keyValue': {'_id': 4},
                    'op': {'_id': 4}}]}

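
Because an ordered bulk write stops at the first write error, the ``index`` of that error splits the request list into three parts: requests that ran, the request that failed, and requests that were never attempted. The sketch below is a hypothetical helper, ``split_after_ordered_failure``, that performs this split on a ``details`` dict such as the one above, for example so an application could fix the failed request and resubmit the rest. It assumes the ``details`` came from an *ordered* bulk write; with ``ordered=False`` every request is attempted and this split does not apply. Plain strings stand in for the request objects.

.. code-block:: python

  from typing import Any, Sequence


  def split_after_ordered_failure(
      requests: Sequence[Any], details: dict[str, Any]
  ) -> tuple[list[Any], Any, list[Any]]:
      """Return (succeeded, failed, not_attempted) for an ordered bulk write.

      Assumes ``details`` is BulkWriteError.details from an ordered bulk write,
      which stops at the first write error, so writeErrors has at most one entry.
      """
      write_errors = details["writeErrors"]
      if not write_errors:
          # No write error (e.g. only a write concern error): every request ran.
          return list(requests), None, []
      index = write_errors[0]["index"]
      return list(requests[:index]), requests[index], list(requests[index + 1 :])


  # Abbreviated copy of the details printed above; strings stand in for requests.
  details = {
      "nMatched": 1,
      "nModified": 1,
      "writeErrors": [{"code": 11000, "index": 1, "op": {"_id": 4}}],
  }
  requests = ['ReplaceOne({"j": 2}, {"i": 5})', 'InsertOne({"_id": 4})', 'DeleteOne({"i": 5})']
  done, failed, remaining = split_after_ordered_failure(requests, details)
  print("failed:", failed)
  print("never attempted:", remaining)
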
.. _unordered_bulk:

Unordered Bulk Write Operations
...............................

Unordered bulk write operations are batched and sent to the server in
**arbitrary order** where they may be executed in parallel. Any errors
that occur are reported after all operations are attempted. Pass
``ordered=False`` to :meth:`~MotorCollection.bulk_write` to request
unordered execution.

In the next example, the first and third operations fail because of the
unique constraint on ``_id``. Since execution is unordered, the second
and fourth operations still succeed.

.. doctest::
  :options: +NORMALIZE_WHITESPACE

  >>> async def f():
  ...     requests = [
  ...         InsertOne({"_id": 1}),
  ...         DeleteOne({"_id": 2}),
  ...         InsertOne({"_id": 3}),
  ...         ReplaceOne({"_id": 4}, {"i": 1}),
  ...     ]
  ...     try:
  ...         await db.test.bulk_write(requests, ordered=False)
  ...     except BulkWriteError as bwe:
  ...         pprint(bwe.details)
  ...
  >>> IOLoop.current().run_sync(f)
  {'nInserted': 0,
   'nMatched': 1,
   'nModified': 1,
   'nRemoved': 1,
   'nUpserted': 0,
   'upserted': [],
   'writeConcernErrors': [],
   'writeErrors': [{'code': 11000,
                    'errmsg': '... duplicate key error ...',
                    'index': 0,
                    'keyPattern': {'_id': 1},
                    'keyValue': {'_id': 1},
                    'op': {'_id': 1}},
                   {'code': 11000,
                    'errmsg': '... duplicate key error ...',
                    'index': 2,
                    'keyPattern': {'_id': 1},
                    'keyValue': {'_id': 3},
                    'op': {'_id': 3}}]}

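
To make the difference between the two modes concrete, the sketch below simulates only ``InsertOne`` requests against an in-memory set of existing ``_id`` values. It is a toy model of the semantics described in this section, not how Motor or the server executes writes: it processes requests sequentially in list order, whereas a real unordered bulk write may run them in any order. The function ``simulate_inserts`` and its return shape are hypothetical; the error entries mimic an abbreviated ``writeErrors`` entry.

.. code-block:: python

  from typing import Any


  def simulate_inserts(
      new_ids: list[Any], existing_ids: set[Any], ordered: bool
  ) -> tuple[list[Any], list[dict[str, Any]]]:
      """Toy model: return (inserted_ids, write_errors) for a batch of inserts."""
      stored = set(existing_ids)  # Copy so the caller's set is not mutated.
      inserted: list[Any] = []
      write_errors: list[dict[str, Any]] = []
      for index, _id in enumerate(new_ids):
          if _id in stored:
              # Duplicate key: record an abbreviated writeErrors-style entry.
              write_errors.append({"code": 11000, "index": index, "op": {"_id": _id}})
              if ordered:
                  break  # Ordered: the first failure aborts the remaining requests.
              continue  # Unordered: record the error and keep going.
          stored.add(_id)
          inserted.append(_id)
      return inserted, write_errors


  existing = {1, 3}
  for ordered in (True, False):
      inserted, errors = simulate_inserts([1, 5, 3, 6], existing, ordered)
      print(f"ordered={ordered}: inserted={inserted}, failed at {[e['index'] for e in errors]}")

With these inputs the ordered run inserts nothing because it stops at index 0, while the unordered run inserts ``_id`` 5 and 6 and reports failures at indexes 0 and 2, mirroring the pattern in the example above.
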
Write Concern
.............

Bulk operations are executed with the
:attr:`~pymongo.collection.Collection.write_concern` of the collection they
are executed against. Write concern errors (e.g., ``wtimeout``) are reported
after all operations are attempted, regardless of execution order.

.. Standalone MongoDB raises "can't use w>1" with this example, so skip it.

.. doctest::
  :options: +SKIP

  >>> from pymongo import WriteConcern
  >>> async def f():
  ...     coll = db.get_collection("test", write_concern=WriteConcern(w=4, wtimeout=1))
  ...     try:
  ...         await coll.bulk_write([InsertOne({"a": i}) for i in range(4)])
  ...     except BulkWriteError as bwe:
  ...         pprint(bwe.details)
  ...
  >>> IOLoop.current().run_sync(f)
  {'nInserted': 4,
   'nMatched': 0,
   'nModified': 0,
   'nRemoved': 0,
   'nUpserted': 0,
   'upserted': [],
   'writeConcernErrors': [{'code': 64,
                           'errInfo': {'wtimeout': True},
                           'errmsg': 'waiting for replication timed out'}],
   'writeErrors': []}

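
A ``BulkWriteError`` can therefore carry two different kinds of failure. Entries in ``writeErrors`` are requests the server rejected, while entries in ``writeConcernErrors`` mean the writes were performed (``nInserted`` is 4 above) but were not confirmed at the requested level before ``wtimeout`` expired. The hypothetical helper ``classify_bulk_failure`` below inspects a ``details`` dict so that application code can branch on the kind of failure; how to react to each kind (retry, log, alert) is left to the application and is not prescribed here.

.. code-block:: python

  from typing import Any


  def classify_bulk_failure(details: dict[str, Any]) -> str:
      """Classify BulkWriteError.details as 'write', 'write_concern', 'both', or 'none'."""
      has_write_errors = bool(details.get("writeErrors"))
      has_wc_errors = bool(details.get("writeConcernErrors"))
      if has_write_errors and has_wc_errors:
          return "both"
      if has_write_errors:
          return "write"
      if has_wc_errors:
          return "write_concern"
      return "none"


  # Copy of the details printed by the write concern example above.
  details = {
      "nInserted": 4,
      "nMatched": 0,
      "nModified": 0,
      "nRemoved": 0,
      "nUpserted": 0,
      "upserted": [],
      "writeConcernErrors": [
          {
              "code": 64,
              "errInfo": {"wtimeout": True},
              "errmsg": "waiting for replication timed out",
          }
      ],
      "writeErrors": [],
  }
  print(classify_bulk_failure(details))  # prints "write_concern"
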