1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
# Copyright 2019-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test compliance with the connections survive primary step down spec."""
from __future__ import annotations
import sys
from test.asynchronous.utils import async_ensure_all_connected
sys.path[0:0] = [""]
from test.asynchronous import (
AsyncIntegrationTest,
async_client_context,
unittest,
)
from test.asynchronous.helpers import async_repl_set_step_down
from test.utils_shared import (
CMAPListener,
)
from bson import SON
from pymongo import monitoring
from pymongo.asynchronous.collection import AsyncCollection
from pymongo.errors import NotPrimaryError
from pymongo.write_concern import WriteConcern
_IS_SYNC = False
class TestAsyncConnectionsSurvivePrimaryStepDown(AsyncIntegrationTest):
    """Integration tests for the "connections survive primary step down" spec.

    Every test runs against a replica set with a dedicated client that has a
    ``CMAPListener`` attached, so the number of ``PoolClearedEvent``
    notifications published during a scenario can be asserted.
    """

    listener: CMAPListener
    coll: AsyncCollection

    @async_client_context.require_replica_set
    async def asyncSetUp(self):
        """Create the monitored client and a freshly created ``step-down`` collection."""
        await super().asyncSetUp()

        self.listener = CMAPListener()
        self.client = await self.async_rs_or_single_client(
            event_listeners=[self.listener],
            retryWrites=False,
            heartbeatFrequencyMS=500,
        )

        # Ensure connections to all servers in replica set. This is to test
        # that the is_writable flag is properly updated for connections that
        # survive a replica set election.
        await async_ensure_all_connected(self.client)

        # Note that all ops use same write-concern as self.db (majority).
        namespace = "step-down"
        majority = WriteConcern("majority")
        self.db = self.client.get_database(namespace, write_concern=majority)
        self.coll = self.db.get_collection(namespace, write_concern=majority)

        # Start from an empty, explicitly created collection.
        await self.db.drop_collection(namespace)
        await self.db.create_collection(namespace)

        # Discard the CMAP events recorded while setting up so that each test
        # only counts the events it triggers itself.
        self.listener.reset()

    async def set_fail_point(self, command_args):
        """Run ``configureFailPoint: failCommand`` on the admin database.

        :param command_args: Extra fields merged into the command document
            after the leading ``configureFailPoint`` key (for example
            ``mode`` and ``data``).
        """
        # The command name must be the first key, hence the ordered SON.
        fail_point = SON([("configureFailPoint", "failCommand")])
        fail_point.update(command_args)
        await self.client.admin.command(fail_point)

    def verify_pool_cleared(self):
        """Assert that exactly one ``PoolClearedEvent`` has been recorded."""
        cleared = self.listener.event_count(monitoring.PoolClearedEvent)
        self.assertEqual(cleared, 1)

    def verify_pool_not_cleared(self):
        """Assert that no ``PoolClearedEvent`` has been recorded."""
        cleared = self.listener.event_count(monitoring.PoolClearedEvent)
        self.assertEqual(cleared, 0)

    @async_client_context.require_version_min(4, 2, -1)
    async def test_get_more_iteration(self):
        """Iterate a cursor across a forced step-down without a pool clear."""
        # Insert 5 documents with WC majority.
        documents = [{"data": index} for index in range(5)]
        await self.coll.insert_many(documents)

        # Start a find operation and retrieve the first batch of results.
        per_batch = 2
        cursor = self.coll.find(batch_size=per_batch)
        for _ in range(per_batch):
            await cursor.next()

        # Force step-down the primary.
        await async_repl_set_step_down(self.client, replSetStepDown=5, force=True)

        # Get the next batch of results.
        for _ in range(per_batch):
            await cursor.next()

        # Verify pool not cleared.
        self.verify_pool_not_cleared()

        # Attempt insertion to mark server description as stale and prevent a
        # NotPrimaryError on the subsequent operation.
        try:
            await self.coll.insert_one({})
        except NotPrimaryError:
            pass

        # Next insert should succeed on the new primary without clearing pool.
        await self.coll.insert_one({})
        self.verify_pool_not_cleared()

    async def run_scenario(self, error_code, retry, pool_status_checker):
        """Make one insert fail with ``error_code`` and check the pool state.

        :param error_code: Server error code the fail point injects into the
            next ``insert`` command.
        :param retry: When true, perform one additional insert *before*
            ``pool_status_checker`` is invoked.
        :param pool_status_checker: Zero-argument callable asserting the
            expected pool state, e.g. ``self.verify_pool_cleared``.
        """
        # Set fail point: fail exactly one insert with the requested code.
        fail_once = {
            "mode": {"times": 1},
            "data": {"failCommands": ["insert"], "errorCode": error_code},
        }
        await self.set_fail_point(fail_once)
        self.addAsyncCleanup(self.set_fail_point, {"mode": "off"})

        # Insert record and verify failure.
        with self.assertRaises(NotPrimaryError) as exc:
            await self.coll.insert_one({"test": 1})
        reported_code = exc.exception.details["code"]  # type: ignore[call-overload]
        self.assertEqual(reported_code, error_code)

        # Retry before the CMAPListener assertion if ``retry`` is true.
        if retry:
            await self.coll.insert_one({"test": 1})

        # Verify pool cleared/not cleared.
        pool_status_checker()

        # Always retry here to ensure discovery of new primary.
        await self.coll.insert_one({"test": 1})

    @async_client_context.require_version_min(4, 2, -1)
    @async_client_context.require_test_commands
    async def test_not_primary_keep_connection_pool(self):
        """Error code 10107 must leave the connection pool intact."""
        await self.run_scenario(10107, True, self.verify_pool_not_cleared)

    @async_client_context.require_version_min(4, 2, 0)
    @async_client_context.require_test_commands
    async def test_shutdown_in_progress(self):
        """Error code 91 must clear the connection pool exactly once."""
        await self.run_scenario(91, False, self.verify_pool_cleared)

    @async_client_context.require_version_min(4, 2, 0)
    @async_client_context.require_test_commands
    async def test_interrupted_at_shutdown(self):
        """Error code 11600 must clear the connection pool exactly once."""
        await self.run_scenario(11600, False, self.verify_pool_cleared)
# Allow running this test module directly as a script; unittest.main()
# is only called when the module is executed, not when it is imported.
if __name__ == "__main__":
    unittest.main()
|