# Copyright 2017 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""MongoDB documentation examples in Python."""
from __future__ import annotations

import asyncio
import datetime
import functools
import sys
import threading
import time
from test.asynchronous.helpers import ConcurrentRunner

sys.path[0:0] = [""]

from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest
from test.utils_shared import async_wait_until

import pymongo
from pymongo.asynchronous.helpers import anext
from pymongo.errors import ConnectionFailure, OperationFailure
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference
from pymongo.server_api import ServerApi
from pymongo.write_concern import WriteConcern

_IS_SYNC = False


class TestSampleShellCommands(AsyncIntegrationTest):
    async def asyncSetUp(self):
        await super().asyncSetUp()
        await self.db.inventory.drop()

    async def asyncTearDown(self):
        # Run after every test.
        await self.db.inventory.drop()
        await self.client.drop_database("pymongo_test")

    async def test_first_three_examples(self):
        db = self.db

        # Start Example 1
        await db.inventory.insert_one(
            {
                "item": "canvas",
                "qty": 100,
                "tags": ["cotton"],
                "size": {"h": 28, "w": 35.5, "uom": "cm"},
            }
        )
        # End Example 1

        self.assertEqual(await db.inventory.count_documents({}), 1)

        # Start Example 2
        cursor = db.inventory.find({"item": "canvas"})
        # End Example 2

        self.assertEqual(len(await cursor.to_list()), 1)

        # Start Example 3
        await db.inventory.insert_many(
            [
                {
                    "item": "journal",
                    "qty": 25,
                    "tags": ["blank", "red"],
                    "size": {"h": 14, "w": 21, "uom": "cm"},
                },
                {
                    "item": "mat",
                    "qty": 85,
                    "tags": ["gray"],
                    "size": {"h": 27.9, "w": 35.5, "uom": "cm"},
                },
                {
                    "item": "mousepad",
                    "qty": 25,
                    "tags": ["gel", "blue"],
                    "size": {"h": 19, "w": 22.85, "uom": "cm"},
                },
            ]
        )
        # End Example 3

        self.assertEqual(await db.inventory.count_documents({}), 4)

    async def test_query_top_level_fields(self):
        db = self.db

        # Start Example 6
        await db.inventory.insert_many(
            [
                {
                    "item": "journal",
                    "qty": 25,
                    "size": {"h": 14, "w": 21, "uom": "cm"},
                    "status": "A",
                },
                {
                    "item": "notebook",
                    "qty": 50,
                    "size": {"h": 8.5, "w": 11, "uom": "in"},
                    "status": "A",
                },
                {
                    "item": "paper",
                    "qty": 100,
                    "size": {"h": 8.5, "w": 11, "uom": "in"},
                    "status": "D",
                },
                {
                    "item": "planner",
                    "qty": 75,
                    "size": {"h": 22.85, "w": 30, "uom": "cm"},
                    "status": "D",
                },
                {
                    "item": "postcard",
                    "qty": 45,
                    "size": {"h": 10, "w": 15.25, "uom": "cm"},
                    "status": "A",
                },
            ]
        )
        # End Example 6

        self.assertEqual(await db.inventory.count_documents({}), 5)

        # Start Example 7
        cursor = db.inventory.find({})
        # End Example 7

        self.assertEqual(len(await cursor.to_list()), 5)

        # Start Example 9
        cursor = db.inventory.find({"status": "D"})
        # End Example 9

        self.assertEqual(len(await cursor.to_list()), 2)

        # Start Example 10
        cursor = db.inventory.find({"status": {"$in": ["A", "D"]}})
        # End Example 10

        self.assertEqual(len(await cursor.to_list()), 5)

        # Start Example 11
        cursor = db.inventory.find({"status": "A", "qty": {"$lt": 30}})
        # End Example 11

        self.assertEqual(len(await cursor.to_list()), 1)

        # Start Example 12
        cursor = db.inventory.find({"$or": [{"status": "A"}, {"qty": {"$lt": 30}}]})
        # End Example 12

        self.assertEqual(len(await cursor.to_list()), 3)

        # Start Example 13
        cursor = db.inventory.find(
            {"status": "A", "$or": [{"qty": {"$lt": 30}}, {"item": {"$regex": "^p"}}]}
        )
        # End Example 13

        self.assertEqual(len(await cursor.to_list()), 2)

    async def test_query_embedded_documents(self):
        db = self.db

        # Start Example 14
        await db.inventory.insert_many(
            [
                {
                    "item": "journal",
                    "qty": 25,
                    "size": {"h": 14, "w": 21, "uom": "cm"},
                    "status": "A",
                },
                {
                    "item": "notebook",
                    "qty": 50,
                    "size": {"h": 8.5, "w": 11, "uom": "in"},
                    "status": "A",
                },
                {
                    "item": "paper",
                    "qty": 100,
                    "size": {"h": 8.5, "w": 11, "uom": "in"},
                    "status": "D",
                },
                {
                    "item": "planner",
                    "qty": 75,
                    "size": {"h": 22.85, "w": 30, "uom": "cm"},
                    "status": "D",
                },
                {
                    "item": "postcard",
                    "qty": 45,
                    "size": {"h": 10, "w": 15.25, "uom": "cm"},
                    "status": "A",
                },
            ]
        )
        # End Example 14

        # Start Example 15
        cursor = db.inventory.find({"size": {"h": 14, "w": 21, "uom": "cm"}})
        # End Example 15

        self.assertEqual(len(await cursor.to_list()), 1)

        # Start Example 16
        cursor = db.inventory.find({"size": {"w": 21, "h": 14, "uom": "cm"}})
        # End Example 16

        self.assertEqual(len(await cursor.to_list()), 0)

        # Start Example 17
        cursor = db.inventory.find({"size.uom": "in"})
        # End Example 17

        self.assertEqual(len(await cursor.to_list()), 2)

        # Start Example 18
        cursor = db.inventory.find({"size.h": {"$lt": 15}})
        # End Example 18

        self.assertEqual(len(await cursor.to_list()), 4)

        # Start Example 19
        cursor = db.inventory.find({"size.h": {"$lt": 15}, "size.uom": "in", "status": "D"})
        # End Example 19

        self.assertEqual(len(await cursor.to_list()), 1)

    async def test_query_arrays(self):
        db = self.db

        # Start Example 20
        await db.inventory.insert_many(
            [
                {"item": "journal", "qty": 25, "tags": ["blank", "red"], "dim_cm": [14, 21]},
                {"item": "notebook", "qty": 50, "tags": ["red", "blank"], "dim_cm": [14, 21]},
                {
                    "item": "paper",
                    "qty": 100,
                    "tags": ["red", "blank", "plain"],
                    "dim_cm": [14, 21],
                },
                {"item": "planner", "qty": 75, "tags": ["blank", "red"], "dim_cm": [22.85, 30]},
                {"item": "postcard", "qty": 45, "tags": ["blue"], "dim_cm": [10, 15.25]},
            ]
        )
        # End Example 20

        # Start Example 21
        cursor = db.inventory.find({"tags": ["red", "blank"]})
        # End Example 21

        self.assertEqual(len(await cursor.to_list()), 1)

        # Start Example 22
        cursor = db.inventory.find({"tags": {"$all": ["red", "blank"]}})
        # End Example 22

        self.assertEqual(len(await cursor.to_list()), 4)

        # Start Example 23
        cursor = db.inventory.find({"tags": "red"})
        # End Example 23

        self.assertEqual(len(await cursor.to_list()), 4)

        # Start Example 24
        cursor = db.inventory.find({"dim_cm": {"$gt": 25}})
        # End Example 24

        self.assertEqual(len(await cursor.to_list()), 1)

        # Start Example 25
        cursor = db.inventory.find({"dim_cm": {"$gt": 15, "$lt": 20}})
        # End Example 25

        self.assertEqual(len(await cursor.to_list()), 4)

        # Start Example 26
        cursor = db.inventory.find({"dim_cm": {"$elemMatch": {"$gt": 22, "$lt": 30}}})
        # End Example 26

        self.assertEqual(len(await cursor.to_list()), 1)

        # Start Example 27
        cursor = db.inventory.find({"dim_cm.1": {"$gt": 25}})
        # End Example 27

        self.assertEqual(len(await cursor.to_list()), 1)

        # Start Example 28
        cursor = db.inventory.find({"tags": {"$size": 3}})
        # End Example 28

        self.assertEqual(len(await cursor.to_list()), 1)

    async def test_query_array_of_documents(self):
        db = self.db

        # Start Example 29
        await db.inventory.insert_many(
            [
                {
                    "item": "journal",
                    "instock": [
                        {"warehouse": "A", "qty": 5},
                        {"warehouse": "C", "qty": 15},
                    ],
                },
                {"item": "notebook", "instock": [{"warehouse": "C", "qty": 5}]},
                {
                    "item": "paper",
                    "instock": [
                        {"warehouse": "A", "qty": 60},
                        {"warehouse": "B", "qty": 15},
                    ],
                },
                {
                    "item": "planner",
                    "instock": [
                        {"warehouse": "A", "qty": 40},
                        {"warehouse": "B", "qty": 5},
                    ],
                },
                {
                    "item": "postcard",
                    "instock": [
                        {"warehouse": "B", "qty": 15},
                        {"warehouse": "C", "qty": 35},
                    ],
                },
            ]
        )
        # End Example 29

        # Start Example 30
        cursor = db.inventory.find({"instock": {"warehouse": "A", "qty": 5}})
        # End Example 30

        self.assertEqual(len(await cursor.to_list()), 1)

        # Start Example 31
        cursor = db.inventory.find({"instock": {"qty": 5, "warehouse": "A"}})
        # End Example 31

        self.assertEqual(len(await cursor.to_list()), 0)

        # Start Example 32
        cursor = db.inventory.find({"instock.0.qty": {"$lte": 20}})
        # End Example 32

        self.assertEqual(len(await cursor.to_list()), 3)

        # Start Example 33
        cursor = db.inventory.find({"instock.qty": {"$lte": 20}})
        # End Example 33

        self.assertEqual(len(await cursor.to_list()), 5)

        # Start Example 34
        cursor = db.inventory.find({"instock": {"$elemMatch": {"qty": 5, "warehouse": "A"}}})
        # End Example 34

        self.assertEqual(len(await cursor.to_list()), 1)

        # Start Example 35
        cursor = db.inventory.find({"instock": {"$elemMatch": {"qty": {"$gt": 10, "$lte": 20}}}})
        # End Example 35

        self.assertEqual(len(await cursor.to_list()), 3)

        # Start Example 36
        cursor = db.inventory.find({"instock.qty": {"$gt": 10, "$lte": 20}})
        # End Example 36

        self.assertEqual(len(await cursor.to_list()), 4)

        # Start Example 37
        cursor = db.inventory.find({"instock.qty": 5, "instock.warehouse": "A"})
        # End Example 37

        self.assertEqual(len(await cursor.to_list()), 2)

    async def test_query_null(self):
        db = self.db

        # Start Example 38
        await db.inventory.insert_many([{"_id": 1, "item": None}, {"_id": 2}])
        # End Example 38

        # Start Example 39
        cursor = db.inventory.find({"item": None})
        # End Example 39

        self.assertEqual(len(await cursor.to_list()), 2)

        # Start Example 40
        cursor = db.inventory.find({"item": {"$type": 10}})
        # End Example 40

        self.assertEqual(len(await cursor.to_list()), 1)

        # Start Example 41
        cursor = db.inventory.find({"item": {"$exists": False}})
        # End Example 41

        self.assertEqual(len(await cursor.to_list()), 1)

    async def test_projection(self):
        db = self.db

        # Start Example 42
        await db.inventory.insert_many(
            [
                {
                    "item": "journal",
                    "status": "A",
                    "size": {"h": 14, "w": 21, "uom": "cm"},
                    "instock": [{"warehouse": "A", "qty": 5}],
                },
                {
                    "item": "notebook",
                    "status": "A",
                    "size": {"h": 8.5, "w": 11, "uom": "in"},
                    "instock": [{"warehouse": "C", "qty": 5}],
                },
                {
                    "item": "paper",
                    "status": "D",
                    "size": {"h": 8.5, "w": 11, "uom": "in"},
                    "instock": [{"warehouse": "A", "qty": 60}],
                },
                {
                    "item": "planner",
                    "status": "D",
                    "size": {"h": 22.85, "w": 30, "uom": "cm"},
                    "instock": [{"warehouse": "A", "qty": 40}],
                },
                {
                    "item": "postcard",
                    "status": "A",
                    "size": {"h": 10, "w": 15.25, "uom": "cm"},
                    "instock": [{"warehouse": "B", "qty": 15}, {"warehouse": "C", "qty": 35}],
                },
            ]
        )
        # End Example 42

        # Start Example 43
        cursor = db.inventory.find({"status": "A"})
        # End Example 43

        self.assertEqual(len(await cursor.to_list()), 3)

        # Start Example 44
        cursor = db.inventory.find({"status": "A"}, {"item": 1, "status": 1})
        # End Example 44

        async for doc in cursor:
            self.assertIn("_id", doc)
            self.assertIn("item", doc)
            self.assertIn("status", doc)
            self.assertNotIn("size", doc)
            self.assertNotIn("instock", doc)

        # Start Example 45
        cursor = db.inventory.find({"status": "A"}, {"item": 1, "status": 1, "_id": 0})
        # End Example 45

        async for doc in cursor:
            self.assertNotIn("_id", doc)
            self.assertIn("item", doc)
            self.assertIn("status", doc)
            self.assertNotIn("size", doc)
            self.assertNotIn("instock", doc)

        # Start Example 46
        cursor = db.inventory.find({"status": "A"}, {"status": 0, "instock": 0})
        # End Example 46

        async for doc in cursor:
            self.assertIn("_id", doc)
            self.assertIn("item", doc)
            self.assertNotIn("status", doc)
            self.assertIn("size", doc)
            self.assertNotIn("instock", doc)

        # Start Example 47
        cursor = db.inventory.find({"status": "A"}, {"item": 1, "status": 1, "size.uom": 1})
        # End Example 47

        async for doc in cursor:
            self.assertIn("_id", doc)
            self.assertIn("item", doc)
            self.assertIn("status", doc)
            self.assertIn("size", doc)
            self.assertNotIn("instock", doc)
            size = doc["size"]
            self.assertIn("uom", size)
            self.assertNotIn("h", size)
            self.assertNotIn("w", size)

        # Start Example 48
        cursor = db.inventory.find({"status": "A"}, {"size.uom": 0})
        # End Example 48

        async for doc in cursor:
            self.assertIn("_id", doc)
            self.assertIn("item", doc)
            self.assertIn("status", doc)
            self.assertIn("size", doc)
            self.assertIn("instock", doc)
            size = doc["size"]
            self.assertNotIn("uom", size)
            self.assertIn("h", size)
            self.assertIn("w", size)

        # Start Example 49
        cursor = db.inventory.find({"status": "A"}, {"item": 1, "status": 1, "instock.qty": 1})
        # End Example 49

        async for doc in cursor:
            self.assertIn("_id", doc)
            self.assertIn("item", doc)
            self.assertIn("status", doc)
            self.assertNotIn("size", doc)
            self.assertIn("instock", doc)
            for subdoc in doc["instock"]:
                self.assertNotIn("warehouse", subdoc)
                self.assertIn("qty", subdoc)

        # Start Example 50
        cursor = db.inventory.find(
            {"status": "A"}, {"item": 1, "status": 1, "instock": {"$slice": -1}}
        )
        # End Example 50

        async for doc in cursor:
            self.assertIn("_id", doc)
            self.assertIn("item", doc)
            self.assertIn("status", doc)
            self.assertNotIn("size", doc)
            self.assertIn("instock", doc)
            self.assertEqual(len(doc["instock"]), 1)

    async def test_update_and_replace(self):
        db = self.db

        # Start Example 51
        await db.inventory.insert_many(
            [
                {
                    "item": "canvas",
                    "qty": 100,
                    "size": {"h": 28, "w": 35.5, "uom": "cm"},
                    "status": "A",
                },
                {
                    "item": "journal",
                    "qty": 25,
                    "size": {"h": 14, "w": 21, "uom": "cm"},
                    "status": "A",
                },
                {
                    "item": "mat",
                    "qty": 85,
                    "size": {"h": 27.9, "w": 35.5, "uom": "cm"},
                    "status": "A",
                },
                {
                    "item": "mousepad",
                    "qty": 25,
                    "size": {"h": 19, "w": 22.85, "uom": "cm"},
                    "status": "P",
                },
                {
                    "item": "notebook",
                    "qty": 50,
                    "size": {"h": 8.5, "w": 11, "uom": "in"},
                    "status": "P",
                },
                {
                    "item": "paper",
                    "qty": 100,
                    "size": {"h": 8.5, "w": 11, "uom": "in"},
                    "status": "D",
                },
                {
                    "item": "planner",
                    "qty": 75,
                    "size": {"h": 22.85, "w": 30, "uom": "cm"},
                    "status": "D",
                },
                {
                    "item": "postcard",
                    "qty": 45,
                    "size": {"h": 10, "w": 15.25, "uom": "cm"},
                    "status": "A",
                },
                {
                    "item": "sketchbook",
                    "qty": 80,
                    "size": {"h": 14, "w": 21, "uom": "cm"},
                    "status": "A",
                },
                {
                    "item": "sketch pad",
                    "qty": 95,
                    "size": {"h": 22.85, "w": 30.5, "uom": "cm"},
                    "status": "A",
                },
            ]
        )
        # End Example 51

        # Start Example 52
        await db.inventory.update_one(
            {"item": "paper"},
            {"$set": {"size.uom": "cm", "status": "P"}, "$currentDate": {"lastModified": True}},
        )
        # End Example 52

        async for doc in db.inventory.find({"item": "paper"}):
            self.assertEqual(doc["size"]["uom"], "cm")
            self.assertEqual(doc["status"], "P")
            self.assertIn("lastModified", doc)

        # Start Example 53
        await db.inventory.update_many(
            {"qty": {"$lt": 50}},
            {"$set": {"size.uom": "in", "status": "P"}, "$currentDate": {"lastModified": True}},
        )
        # End Example 53

        async for doc in db.inventory.find({"qty": {"$lt": 50}}):
            self.assertEqual(doc["size"]["uom"], "in")
            self.assertEqual(doc["status"], "P")
            self.assertIn("lastModified", doc)

        # Start Example 54
        await db.inventory.replace_one(
            {"item": "paper"},
            {
                "item": "paper",
                "instock": [{"warehouse": "A", "qty": 60}, {"warehouse": "B", "qty": 40}],
            },
        )
        # End Example 54

        async for doc in db.inventory.find({"item": "paper"}, {"_id": 0}):
            self.assertEqual(len(doc.keys()), 2)
            self.assertIn("item", doc)
            self.assertIn("instock", doc)
            self.assertEqual(len(doc["instock"]), 2)

    async def test_delete(self):
        db = self.db

        # Start Example 55
        await db.inventory.insert_many(
            [
                {
                    "item": "journal",
                    "qty": 25,
                    "size": {"h": 14, "w": 21, "uom": "cm"},
                    "status": "A",
                },
                {
                    "item": "notebook",
                    "qty": 50,
                    "size": {"h": 8.5, "w": 11, "uom": "in"},
                    "status": "P",
                },
                {
                    "item": "paper",
                    "qty": 100,
                    "size": {"h": 8.5, "w": 11, "uom": "in"},
                    "status": "D",
                },
                {
                    "item": "planner",
                    "qty": 75,
                    "size": {"h": 22.85, "w": 30, "uom": "cm"},
                    "status": "D",
                },
                {
                    "item": "postcard",
                    "qty": 45,
                    "size": {"h": 10, "w": 15.25, "uom": "cm"},
                    "status": "A",
                },
            ]
        )
        # End Example 55

        self.assertEqual(await db.inventory.count_documents({}), 5)

        # Start Example 57
        await db.inventory.delete_many({"status": "A"})
        # End Example 57

        self.assertEqual(await db.inventory.count_documents({}), 3)

        # Start Example 58
        await db.inventory.delete_one({"status": "D"})
        # End Example 58

        self.assertEqual(await db.inventory.count_documents({}), 2)

        # Start Example 56
        await db.inventory.delete_many({})
        # End Example 56

        self.assertEqual(await db.inventory.count_documents({}), 0)

    @async_client_context.require_change_streams
    async def test_change_streams(self):
        db = self.db
        done = False

        async def insert_docs():
            nonlocal done
            while not done:
                await db.inventory.insert_one({"username": "alice"})
                await db.inventory.delete_one({"username": "alice"})
                await asyncio.sleep(0.005)

        t = ConcurrentRunner(target=insert_docs)
        await t.start()

        try:
            # 1. The database for reactive, real-time applications
            # Start Changestream Example 1
            cursor = await db.inventory.watch()
            await anext(cursor)
            # End Changestream Example 1
            await cursor.close()

            # Start Changestream Example 2
            cursor = await db.inventory.watch(full_document="updateLookup")
            await anext(cursor)
            # End Changestream Example 2
            await cursor.close()

            # Start Changestream Example 3
            resume_token = cursor.resume_token
            cursor = await db.inventory.watch(resume_after=resume_token)
            await anext(cursor)
            # End Changestream Example 3
            await cursor.close()

            # Start Changestream Example 4
            pipeline = [
                {"$match": {"fullDocument.username": "alice"}},
                {"$addFields": {"newField": "this is an added field!"}},
            ]
            cursor = await db.inventory.watch(pipeline=pipeline)
            await anext(cursor)
            # End Changestream Example 4
            await cursor.close()
        finally:
            done = True
            await t.join()

    async def test_aggregate_examples(self):
        db = self.db

        # Start Aggregation Example 1
        await db.sales.aggregate([{"$match": {"items.fruit": "banana"}}, {"$sort": {"date": 1}}])
        # End Aggregation Example 1

        # Start Aggregation Example 2
        await db.sales.aggregate(
            [
                {"$unwind": "$items"},
                {"$match": {"items.fruit": "banana"}},
                {
                    "$group": {
                        "_id": {"day": {"$dayOfWeek": "$date"}},
                        "count": {"$sum": "$items.quantity"},
                    }
                },
                {"$project": {"dayOfWeek": "$_id.day", "numberSold": "$count", "_id": 0}},
                {"$sort": {"numberSold": 1}},
            ]
        )
        # End Aggregation Example 2

        # Start Aggregation Example 3
        await db.sales.aggregate(
            [
                {"$unwind": "$items"},
                {
                    "$group": {
                        "_id": {"day": {"$dayOfWeek": "$date"}},
                        "items_sold": {"$sum": "$items.quantity"},
                        "revenue": {"$sum": {"$multiply": ["$items.quantity", "$items.price"]}},
                    }
                },
                {
                    "$project": {
                        "day": "$_id.day",
                        "revenue": 1,
                        "items_sold": 1,
                        "discount": {
                            "$cond": {"if": {"$lte": ["$revenue", 250]}, "then": 25, "else": 0}
                        },
                    }
                },
            ]
        )
        # End Aggregation Example 3

        # Start Aggregation Example 4
        await db.air_alliances.aggregate(
            [
                {
                    "$lookup": {
                        "from": "air_airlines",
                        "let": {"constituents": "$airlines"},
                        "pipeline": [{"$match": {"$expr": {"$in": ["$name", "$$constituents"]}}}],
                        "as": "airlines",
                    }
                },
                {
                    "$project": {
                        "_id": 0,
                        "name": 1,
                        "airlines": {
                            "$filter": {
                                "input": "$airlines",
                                "as": "airline",
                                "cond": {"$eq": ["$$airline.country", "Canada"]},
                            }
                        },
                    }
                },
            ]
        )
        # End Aggregation Example 4

    @async_client_context.require_version_min(4, 4)
    async def test_aggregate_projection_example(self):
        db = self.db

        # Start Aggregation Projection Example 1
        db.inventory.find(
            {},
            {
                "_id": 0,
                "item": 1,
                "status": {
                    "$switch": {
                        "branches": [
                            {"case": {"$eq": ["$status", "A"]}, "then": "Available"},
                            {"case": {"$eq": ["$status", "D"]}, "then": "Discontinued"},
                        ],
                        "default": "No status found",
                    }
                },
                "area": {
                    "$concat": [
                        {"$toString": {"$multiply": ["$size.h", "$size.w"]}},
                        " ",
                        "$size.uom",
                    ]
                },
                "reportNumber": {"$literal": 1},
            },
        )

        # End Aggregation Projection Example 1

    async def test_commands(self):
        db = self.db
        await db.restaurants.insert_one({})

        # Start runCommand Example 1
        await db.command("buildInfo")
        # End runCommand Example 1

        # Start runCommand Example 2
        await db.command("count", "restaurants")
        # End runCommand Example 2

    async def test_index_management(self):
        db = self.db

        # Start Index Example 1
        await db.records.create_index("score")
        # End Index Example 1

        # Start Index Example 1
        await db.restaurants.create_index(
            [("cuisine", pymongo.ASCENDING), ("name", pymongo.ASCENDING)],
            partialFilterExpression={"rating": {"$gt": 5}},
        )
        # End Index Example 1

    @async_client_context.require_replica_set
    async def test_misc(self):
        # Marketing examples
        client = self.client
        self.addAsyncCleanup(client.drop_database, "test")
        self.addAsyncCleanup(client.drop_database, "my_database")

        # 2. Tunable consistency controls
        collection = client.my_database.my_collection
        async with client.start_session() as session:
            await collection.insert_one({"_id": 1}, session=session)
            await collection.update_one({"_id": 1}, {"$set": {"a": 1}}, session=session)
            async for _doc in collection.find({}, session=session):
                pass

        # 3. Exploiting the power of arrays
        collection = client.test.array_updates_test
        await collection.update_one(
            {"_id": 1}, {"$set": {"a.$[i].b": 2}}, array_filters=[{"i.b": 0}]
        )


class TestTransactionExamples(AsyncIntegrationTest):
    @async_client_context.require_transactions
    async def test_transactions(self):
        # Transaction examples
        client = self.client
        self.addAsyncCleanup(client.drop_database, "hr")
        self.addAsyncCleanup(client.drop_database, "reporting")

        employees = client.hr.employees
        events = client.reporting.events
        await employees.insert_one({"employee": 3, "status": "Active"})
        await events.insert_one({"employee": 3, "status": {"new": "Active", "old": None}})

        # Start Transactions Intro Example 1

        async def update_employee_info(session):
            employees_coll = session.client.hr.employees
            events_coll = session.client.reporting.events

            async with await session.start_transaction(
                read_concern=ReadConcern("snapshot"), write_concern=WriteConcern(w="majority")
            ):
                await employees_coll.update_one(
                    {"employee": 3}, {"$set": {"status": "Inactive"}}, session=session
                )
                await events_coll.insert_one(
                    {"employee": 3, "status": {"new": "Inactive", "old": "Active"}}, session=session
                )

                while True:
                    try:
                        # Commit uses write concern set at transaction start.
                        await session.commit_transaction()
                        print("Transaction committed.")
                        break
                    except (ConnectionFailure, OperationFailure) as exc:
                        # Can retry commit
                        if exc.has_error_label("UnknownTransactionCommitResult"):
                            print("UnknownTransactionCommitResult, retrying commit operation ...")
                            continue
                        else:
                            print("Error during commit ...")
                            raise

        # End Transactions Intro Example 1

        async with client.start_session() as session:
            await update_employee_info(session)

        employee = await employees.find_one({"employee": 3})
        assert employee is not None
        self.assertIsNotNone(employee)
        self.assertEqual(employee["status"], "Inactive")

        # Start Transactions Retry Example 1
        async def run_transaction_with_retry(txn_func, session):
            while True:
                try:
                    await txn_func(session)  # performs transaction
                    break
                except (ConnectionFailure, OperationFailure) as exc:
                    print("Transaction aborted. Caught exception during transaction.")

                    # If transient error, retry the whole transaction
                    if exc.has_error_label("TransientTransactionError"):
                        print("TransientTransactionError, retrying transaction ...")
                        continue
                    else:
                        raise

        # End Transactions Retry Example 1

        async with client.start_session() as session:
            await run_transaction_with_retry(update_employee_info, session)

        employee = await employees.find_one({"employee": 3})
        assert employee is not None
        self.assertIsNotNone(employee)
        self.assertEqual(employee["status"], "Inactive")

        # Start Transactions Retry Example 2
        async def commit_with_retry(session):
            while True:
                try:
                    # Commit uses write concern set at transaction start.
                    await session.commit_transaction()
                    print("Transaction committed.")
                    break
                except (ConnectionFailure, OperationFailure) as exc:
                    # Can retry commit
                    if exc.has_error_label("UnknownTransactionCommitResult"):
                        print("UnknownTransactionCommitResult, retrying commit operation ...")
                        continue
                    else:
                        print("Error during commit ...")
                        raise

        # End Transactions Retry Example 2

        # Test commit_with_retry from the previous examples
        async def _insert_employee_retry_commit(session):
            async with await session.start_transaction():
                await employees.insert_one({"employee": 4, "status": "Active"}, session=session)
                await events.insert_one(
                    {"employee": 4, "status": {"new": "Active", "old": None}}, session=session
                )

                await commit_with_retry(session)

        async with client.start_session() as session:
            await run_transaction_with_retry(_insert_employee_retry_commit, session)

        employee = await employees.find_one({"employee": 4})
        assert employee is not None
        self.assertIsNotNone(employee)
        self.assertEqual(employee["status"], "Active")

        # Start Transactions Retry Example 3

        async def run_transaction_with_retry(txn_func, session):
            while True:
                try:
                    await txn_func(session)  # performs transaction
                    break
                except (ConnectionFailure, OperationFailure) as exc:
                    # If transient error, retry the whole transaction
                    if exc.has_error_label("TransientTransactionError"):
                        print("TransientTransactionError, retrying transaction ...")
                        continue
                    else:
                        raise

        async def commit_with_retry(session):
            while True:
                try:
                    # Commit uses write concern set at transaction start.
                    await session.commit_transaction()
                    print("Transaction committed.")
                    break
                except (ConnectionFailure, OperationFailure) as exc:
                    # Can retry commit
                    if exc.has_error_label("UnknownTransactionCommitResult"):
                        print("UnknownTransactionCommitResult, retrying commit operation ...")
                        continue
                    else:
                        print("Error during commit ...")
                        raise

        # Updates two collections in a transactions

        async def update_employee_info(session):
            employees_coll = session.client.hr.employees
            events_coll = session.client.reporting.events

            async with await session.start_transaction(
                read_concern=ReadConcern("snapshot"),
                write_concern=WriteConcern(w="majority"),
                read_preference=ReadPreference.PRIMARY,
            ):
                await employees_coll.update_one(
                    {"employee": 3}, {"$set": {"status": "Inactive"}}, session=session
                )
                await events_coll.insert_one(
                    {"employee": 3, "status": {"new": "Inactive", "old": "Active"}}, session=session
                )

                await commit_with_retry(session)

        # Start a session.
        async with client.start_session() as session:
            try:
                await run_transaction_with_retry(update_employee_info, session)
            except Exception:
                # Do something with error.
                raise

        # End Transactions Retry Example 3

        employee = await employees.find_one({"employee": 3})
        assert employee is not None
        self.assertIsNotNone(employee)
        self.assertEqual(employee["status"], "Inactive")

        async def MongoClient(_):
            return await self.async_rs_client()

        uriString = None

        # Start Transactions withTxn API Example 1

        # For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
        # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
        # For a sharded cluster, connect to the mongos instances; e.g.
        # uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'

        client = await MongoClient(uriString)
        wc_majority = WriteConcern("majority", wtimeout=1000)

        # Prereq: Create collections.
        await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
        await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})

        # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
        async def callback(session):
            collection_one = session.client.mydb1.foo
            collection_two = session.client.mydb2.bar

            # Important:: You must pass the session to the operations.
            await collection_one.insert_one({"abc": 1}, session=session)
            await collection_two.insert_one({"xyz": 999}, session=session)

        # Step 2: Start a client session.
        async with client.start_session() as session:
            # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
            await session.with_transaction(callback)

        # End Transactions withTxn API Example 1


class TestCausalConsistencyExamples(AsyncIntegrationTest):
    @async_client_context.require_secondaries_count(1)
    async def test_causal_consistency(self):
        # Causal consistency examples
        client = self.client
        self.addAsyncCleanup(client.drop_database, "test")
        await client.test.drop_collection("items")
        await client.test.items.insert_one(
            {"sku": "111", "name": "Peanuts", "start": datetime.datetime.today()}
        )

        # Start Causal Consistency Example 1
        async with client.start_session(causal_consistency=True) as s1:
            current_date = datetime.datetime.today()
            items = client.get_database(
                "test",
                read_concern=ReadConcern("majority"),
                write_concern=WriteConcern("majority", wtimeout=1000),
            ).items
            await items.update_one(
                {"sku": "111", "end": None}, {"$set": {"end": current_date}}, session=s1
            )
            await items.insert_one(
                {"sku": "nuts-111", "name": "Pecans", "start": current_date}, session=s1
            )
        # End Causal Consistency Example 1

        assert s1.cluster_time is not None
        assert s1.operation_time is not None

        # Start Causal Consistency Example 2
        async with client.start_session(causal_consistency=True) as s2:
            s2.advance_cluster_time(s1.cluster_time)
            s2.advance_operation_time(s1.operation_time)

            items = client.get_database(
                "test",
                read_preference=ReadPreference.SECONDARY,
                read_concern=ReadConcern("majority"),
                write_concern=WriteConcern("majority", wtimeout=1000),
            ).items
            async for item in items.find({"end": None}, session=s2):
                print(item)
        # End Causal Consistency Example 2


class TestVersionedApiExamples(AsyncIntegrationTest):
    @async_client_context.require_version_min(4, 7)
    async def test_versioned_api(self):
        # Versioned API examples
        async def MongoClient(_, server_api):
            return await self.async_rs_client(server_api=server_api, connect=False)

        uri = None

        # Start Versioned API Example 1
        from pymongo.server_api import ServerApi

        await MongoClient(uri, server_api=ServerApi("1"))
        # End Versioned API Example 1

        # Start Versioned API Example 2
        await MongoClient(uri, server_api=ServerApi("1", strict=True))
        # End Versioned API Example 2

        # Start Versioned API Example 3
        await MongoClient(uri, server_api=ServerApi("1", strict=False))
        # End Versioned API Example 3

        # Start Versioned API Example 4
        await MongoClient(uri, server_api=ServerApi("1", deprecation_errors=True))
        # End Versioned API Example 4

    @unittest.skip("PYTHON-3167 count has been added to API version 1")
    @async_client_context.require_version_min(4, 7)
    async def test_versioned_api_migration(self):
        # SERVER-58785
        if await async_client_context.is_topology_type(
            ["sharded"]
        ) and not async_client_context.version.at_least(5, 0, 2):
            self.skipTest("This test needs MongoDB 5.0.2 or newer")

        client = await self.async_rs_client(server_api=ServerApi("1", strict=True))
        await client.db.sales.drop()

        # Start Versioned API Example 5
        def strptime(s):
            return datetime.datetime.strptime(s, "%Y-%m-%dT%H:%M:%SZ")

        await client.db.sales.insert_many(
            [
                {
                    "_id": 1,
                    "item": "abc",
                    "price": 10,
                    "quantity": 2,
                    "date": strptime("2021-01-01T08:00:00Z"),
                },
                {
                    "_id": 2,
                    "item": "jkl",
                    "price": 20,
                    "quantity": 1,
                    "date": strptime("2021-02-03T09:00:00Z"),
                },
                {
                    "_id": 3,
                    "item": "xyz",
                    "price": 5,
                    "quantity": 5,
                    "date": strptime("2021-02-03T09:05:00Z"),
                },
                {
                    "_id": 4,
                    "item": "abc",
                    "price": 10,
                    "quantity": 10,
                    "date": strptime("2021-02-15T08:00:00Z"),
                },
                {
                    "_id": 5,
                    "item": "xyz",
                    "price": 5,
                    "quantity": 10,
                    "date": strptime("2021-02-15T09:05:00Z"),
                },
                {
                    "_id": 6,
                    "item": "xyz",
                    "price": 5,
                    "quantity": 5,
                    "date": strptime("2021-02-15T12:05:10Z"),
                },
                {
                    "_id": 7,
                    "item": "xyz",
                    "price": 5,
                    "quantity": 10,
                    "date": strptime("2021-02-15T14:12:12Z"),
                },
                {
                    "_id": 8,
                    "item": "abc",
                    "price": 10,
                    "quantity": 5,
                    "date": strptime("2021-03-16T20:20:13Z"),
                },
            ]
        )
        # End Versioned API Example 5

        with self.assertRaisesRegex(
            OperationFailure,
            "Provided apiStrict:true, but the command count is not in API Version 1",
        ):
            await client.db.command("count", "sales", query={})
        # Start Versioned API Example 6
        # pymongo.errors.OperationFailure: Provided apiStrict:true, but the command count is not in API Version 1, full error: {'ok': 0.0, 'errmsg': 'Provided apiStrict:true, but the command count is not in API Version 1', 'code': 323, 'codeName': 'APIStrictError'}
        # End Versioned API Example 6

        # Start Versioned API Example 7
        await client.db.sales.count_documents({})
        # End Versioned API Example 7

        # Start Versioned API Example 8
        # 8
        # End Versioned API Example 8


class TestSnapshotQueryExamples(AsyncIntegrationTest):
    @async_client_context.require_version_min(5, 0)
    async def test_snapshot_query(self):
        client = self.client

        if not await async_client_context.is_topology_type(["replicaset", "sharded"]):
            self.skipTest("Must be a sharded or replicaset")

        self.addAsyncCleanup(client.drop_database, "pets")
        db = client.pets
        await db.drop_collection("cats")
        await db.drop_collection("dogs")
        await db.cats.insert_one(
            {"name": "Whiskers", "color": "white", "age": 10, "adoptable": True}
        )
        await db.dogs.insert_one(
            {"name": "Pebbles", "color": "Brown", "age": 10, "adoptable": True}
        )

        async def predicate_one():
            return await self.check_for_snapshot(db.cats)

        async def predicate_two():
            return await self.check_for_snapshot(db.dogs)

        await async_wait_until(predicate_two, "success")
        await async_wait_until(predicate_one, "success")

        # Start Snapshot Query Example 1

        db = client.pets
        async with client.start_session(snapshot=True) as s:
            adoptablePetsCount = (
                await (
                    await db.cats.aggregate(
                        [{"$match": {"adoptable": True}}, {"$count": "adoptableCatsCount"}],
                        session=s,
                    )
                ).next()
            )["adoptableCatsCount"]

            adoptablePetsCount += (
                await (
                    await db.dogs.aggregate(
                        [{"$match": {"adoptable": True}}, {"$count": "adoptableDogsCount"}],
                        session=s,
                    )
                ).next()
            )["adoptableDogsCount"]

        print(adoptablePetsCount)

        # End Snapshot Query Example 1
        db = client.retail
        self.addAsyncCleanup(client.drop_database, "retail")
        await db.drop_collection("sales")

        saleDate = datetime.datetime.now()
        await db.sales.insert_one({"shoeType": "boot", "price": 30, "saleDate": saleDate})

        async def predicate_three():
            return await self.check_for_snapshot(db.sales)

        await async_wait_until(predicate_three, "success")

        # Start Snapshot Query Example 2
        db = client.retail
        async with client.start_session(snapshot=True) as s:
            _ = (
                await (
                    await db.sales.aggregate(
                        [
                            {
                                "$match": {
                                    "$expr": {
                                        "$gt": [
                                            "$saleDate",
                                            {
                                                "$dateSubtract": {
                                                    "startDate": "$$NOW",
                                                    "unit": "day",
                                                    "amount": 1,
                                                }
                                            },
                                        ]
                                    }
                                }
                            },
                            {"$count": "totalDailySales"},
                        ],
                        session=s,
                    )
                ).next()
            )["totalDailySales"]

        # End Snapshot Query Example 2

    async def check_for_snapshot(self, collection):
        """Wait for snapshot reads to become available to prevent this error:
        [246:SnapshotUnavailable]: Unable to read from a snapshot due to pending collection catalog changes; please retry the operation. Snapshot timestamp is Timestamp(1646666892, 4). Collection minimum is Timestamp(1646666892, 5) (on localhost:27017, modern retry, attempt 1)
        From https://github.com/mongodb/mongo-ruby-driver/commit/7c4117b58e3d12e237f7536f7521e18fc15f79ac
        """
        async with self.client.start_session(snapshot=True) as s:
            try:
                if await collection.find_one(session=s):
                    return True
                return False
            except OperationFailure as e:
                # Retry them as the server demands...
                if e.code == 246:  # SnapshotUnavailable
                    return False
                raise


if __name__ == "__main__":
    unittest.main()
