// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package integration
import (
	"context"
	"sync"
	"time"

	"go.mongodb.org/mongo-driver/event"
	"go.mongodb.org/mongo-driver/internal/testutil/assert"
	"go.mongodb.org/mongo-driver/mongo/address"
	"go.mongodb.org/mongo-driver/mongo/description"
	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
	"go.mongodb.org/mongo-driver/mongo/readpref"
	"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
)
// Helper functions for the operations in the unified spec test runner that require assertions about SDAM and connection
// pool events.
var (
	poolEventTypesMap = map[string]string{
		"PoolClearedEvent": event.PoolCleared,
	}
	defaultCallbackTimeout = 10 * time.Second
)
// unifiedRunnerEventMonitor counts connection pool events and SDAM ServerMarkedUnknown events. Each counter is
// guarded by its own mutex.
type unifiedRunnerEventMonitor struct {
	poolEventCount               map[string]int
	poolEventCountLock           sync.Mutex
	sdamMonitor                  *event.ServerMonitor
	serverMarkedUnknownCount     int
	serverMarkedUnknownCountLock sync.Mutex
}
func newUnifiedRunnerEventMonitor() *unifiedRunnerEventMonitor {
	urem := unifiedRunnerEventMonitor{
		poolEventCount: make(map[string]int),
	}
	urem.sdamMonitor = &event.ServerMonitor{
		ServerDescriptionChanged: func(e *event.ServerDescriptionChangedEvent) {
			urem.serverMarkedUnknownCountLock.Lock()
			defer urem.serverMarkedUnknownCountLock.Unlock()

			// Spec tests only ever handle ServerMarkedUnknown ServerDescriptionChangedEvents
			// for the time being.
			if e.NewDescription.Kind == description.Unknown {
				urem.serverMarkedUnknownCount++
			}
		},
	}
	return &urem
}
// handlePoolEvent can be used as the event handler for a connection pool monitor.
func (u *unifiedRunnerEventMonitor) handlePoolEvent(evt *event.PoolEvent) {
	u.poolEventCountLock.Lock()
	defer u.poolEventCountLock.Unlock()

	u.poolEventCount[evt.Type]++
}
// getPoolEventCount returns the number of pool events recorded for the given spec test event name, or 0 if no
// events were recorded. The name is translated to a driver event type through poolEventTypesMap, so a name that is
// missing from the map also yields 0.
func (u *unifiedRunnerEventMonitor) getPoolEventCount(eventType string) int {
	u.poolEventCountLock.Lock()
	defer u.poolEventCountLock.Unlock()

	mappedType := poolEventTypesMap[eventType]
	return u.poolEventCount[mappedType]
}
// getServerMarkedUnknownCount returns the number of ServerMarkedUnknownEvents, or 0 if none were recorded.
func (u *unifiedRunnerEventMonitor) getServerMarkedUnknownCount() int {
	u.serverMarkedUnknownCountLock.Lock()
	defer u.serverMarkedUnknownCountLock.Unlock()

	return u.serverMarkedUnknownCount
}
// waitForEvent polls every 100ms until at least the expected number of events of the given type has been recorded.
// The test fails if that does not happen within defaultCallbackTimeout.
func waitForEvent(mt *mtest.T, test *testCase, op *operation) {
	eventType := op.Arguments.Lookup("event").StringValue()
	expectedCount := int(op.Arguments.Lookup("count").Int32())

	callback := func() {
		for {
			var count int
			// Spec tests only ever wait for ServerMarkedUnknown SDAM events for the time being.
			if eventType == "ServerMarkedUnknownEvent" {
				count = test.monitor.getServerMarkedUnknownCount()
			} else {
				count = test.monitor.getPoolEventCount(eventType)
			}

			if count >= expectedCount {
				return
			}
			time.Sleep(100 * time.Millisecond)
		}
	}

	assert.Soon(mt, callback, defaultCallbackTimeout)
}
// assertEventCount asserts that exactly the expected number of events of the given type has been recorded so far.
func assertEventCount(mt *mtest.T, testCase *testCase, op *operation) {
	eventType := op.Arguments.Lookup("event").StringValue()
	expectedCount := int(op.Arguments.Lookup("count").Int32())

	var gotCount int
	// Spec tests only ever assert ServerMarkedUnknown SDAM events for the time being.
	if eventType == "ServerMarkedUnknownEvent" {
		gotCount = testCase.monitor.getServerMarkedUnknownCount()
	} else {
		gotCount = testCase.monitor.getPoolEventCount(eventType)
	}
	assert.Equal(mt, expectedCount, gotCount, "expected count %d for event %s, got %d", expectedCount, eventType,
		gotCount)
}
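// recordPrimary stores the address of the current primary on the test case so that waitForPrimaryChange can later
// detect a change.
func recordPrimary(mt *mtest.T, testCase *testCase) {
	testCase.recordedPrimary = getPrimaryAddress(mt, testCase.testTopology, true)
}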
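// waitForPrimaryChange repeatedly selects the primary until its address differs from the one stored by
// recordPrimary. The test fails if no change is seen within the operation's timeoutMS argument.
func waitForPrimaryChange(mt *mtest.T, testCase *testCase, op *operation) {
	callback := func() {
		for {
			if getPrimaryAddress(mt, testCase.testTopology, false) != testCase.recordedPrimary {
				return
			}
		}
	}

	timeout := convertValueToMilliseconds(mt, op.Arguments.Lookup("timeoutMS"))
	assert.Soon(mt, callback, timeout)
}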
// getPrimaryAddress returns the address of the current primary. If failFast is true, the server selection fast path
// is used and the function will fail if the fast path doesn't return a server.
func getPrimaryAddress(mt *mtest.T, topo *topology.Topology, failFast bool) address.Address {
	mt.Helper()

	ctx, cancel := context.WithCancel(mtest.Background)
	defer cancel()
	if failFast {
		// Cancelling the context before selection restricts SelectServer to its fast path.
		cancel()
	}

	primary, err := topo.SelectServer(ctx, description.ReadPrefSelector(readpref.Primary()))
	assert.Nil(mt, err, "SelectServer error: %v", err)
	return primary.(*topology.SelectedServer).Description().Addr
}