1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
|
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package integration
import (
"sync"
"testing"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/event"
"go.mongodb.org/mongo-driver/internal/testutil/assert"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
"go.mongodb.org/mongo-driver/mongo/options"
)
// Server error codes that the tests in this file inject through the failCommand fail point and
// then expect to see on the returned mongo.CommandError. Listed in ascending numeric order.
const (
	// errorShutdownInProgress is the code used by the "shutdown in progress" test case.
	errorShutdownInProgress int32 = 91
	// errorNotPrimary is the code used by the "notPrimary" test cases.
	errorNotPrimary int32 = 10107
	// errorInterruptedAtShutdown is the code used by the "interrupted at shutdown" test case.
	errorInterruptedAtShutdown int32 = 11600
)
// testPoolMonitor exposes an *event.PoolMonitor and collects all events logged to that
// *event.PoolMonitor. It is intended to be used from multiple concurrent goroutines: the collected
// events are guarded by mu. Create instances with newTestPoolMonitor, which installs the
// collecting Event callback.
type testPoolMonitor struct {
// PoolMonitor is the embedded monitor; newTestPoolMonitor sets its Event callback to append
// every received event to events.
*event.PoolMonitor
// events holds the pool events received so far, in the order the callback appended them.
events []*event.PoolEvent
// mu guards events.
mu sync.RWMutex
}
// newTestPoolMonitor builds a testPoolMonitor whose embedded *event.PoolMonitor records every
// event it receives. The recording callback takes the write lock, so it may be invoked from
// several goroutines at once.
func newTestPoolMonitor() *testPoolMonitor {
	monitor := &testPoolMonitor{events: make([]*event.PoolEvent, 0)}

	// record appends a single pool event to the monitor's event log under the write lock.
	record := func(poolEvt *event.PoolEvent) {
		monitor.mu.Lock()
		defer monitor.mu.Unlock()

		monitor.events = append(monitor.events, poolEvt)
	}
	monitor.PoolMonitor = &event.PoolMonitor{Event: record}

	return monitor
}
// Events returns a copy of the events collected by the testPoolMonitor. Filters can optionally be
// applied to the returned events set and are applied using AND logic (i.e. all filters must return
// true to include the event in the result). With no filters, every collected event is returned.
//
// The returned slice is newly allocated on each call, but its elements are the same
// *event.PoolEvent pointers that were recorded.
//
// The read lock is acquired before tpm.events is touched at all, including the len() used to
// size the result, so that this method never reads the slice header while the Event callback is
// appending to it from another goroutine.
func (tpm *testPoolMonitor) Events(filters ...func(*event.PoolEvent) bool) []*event.PoolEvent {
	tpm.mu.RLock()
	defer tpm.mu.RUnlock()

	filtered := make([]*event.PoolEvent, 0, len(tpm.events))
	for _, evt := range tpm.events {
		keep := true
		for _, filter := range filters {
			if !filter(evt) {
				// One rejecting filter is enough to drop the event; skip the remaining filters.
				keep = false
				break
			}
		}
		if keep {
			filtered = append(filtered, evt)
		}
	}

	return filtered
}
// IsPoolCleared reports whether at least one of the events recorded by the testPoolMonitor has
// type event.PoolCleared.
func (tpm *testPoolMonitor) IsPoolCleared() bool {
	// onlyCleared keeps just the pool-cleared events.
	onlyCleared := func(poolEvt *event.PoolEvent) bool {
		return poolEvt.Type == event.PoolCleared
	}

	return len(tpm.Events(onlyCleared)) != 0
}
// poolChan buffers up to 100 pool events published by poolMonitor. It is shared package-level
// state: tests drain it with clearPoolChan and inspect it with isPoolCleared.
var poolChan = make(chan *event.PoolEvent, 100)

// poolMonitor forwards every pool event it receives to poolChan. The send is a plain channel
// send, so it blocks once poolChan already holds 100 undrained events.
//
// TODO(GODRIVER-2068): Replace all uses of poolMonitor with individual instances of testPoolMonitor.
var poolMonitor = &event.PoolMonitor{
	Event: func(poolEvt *event.PoolEvent) {
		poolChan <- poolEvt
	},
}
// isPoolCleared consumes events from poolChan until it either finds one of type
// event.PoolCleared (returns true) or the channel is empty (returns false). Events read along
// the way are discarded; events queued behind a PoolCleared event are left in the channel.
func isPoolCleared() bool {
	for {
		if len(poolChan) == 0 {
			return false
		}
		if poolEvt := <-poolChan; poolEvt.Type == event.PoolCleared {
			return true
		}
	}
}
// clearPoolChan discards buffered events from poolChan until the channel reports a length of
// zero.
func clearPoolChan() {
	for pending := len(poolChan); pending > 0; pending = len(poolChan) {
		<-poolChan
	}
}
// TestConnectionsSurvivePrimaryStepDown checks, on replica set topologies, whether the connection
// pool is cleared after a forced primary step down and after server errors injected with the
// failCommand fail point. Pool events are observed through the package-level poolMonitor, which
// feeds poolChan, so every subtest drains poolChan with clearPoolChan before it starts.
func TestConnectionsSurvivePrimaryStepDown(t *testing.T) {
// The top-level mtest.T is configured with CreateClient(false); the subtests pass their own
// options to RunOpts.
mt := mtest.New(t, mtest.NewOptions().Topologies(mtest.ReplicaSet).CreateClient(false))
defer mt.Close()
// Client options shared by the subtests: retryable writes are disabled and poolMonitor is
// attached so pool events end up in poolChan.
// NOTE(review): retryable writes are presumably disabled so the injected insert failure is
// returned to the caller instead of being retried — confirm.
clientOpts := options.Client().
ApplyURI(mtest.ClusterURI()).
SetRetryWrites(false).
SetPoolMonitor(poolMonitor)
// The getMore subtest only runs against servers 4.2 and newer.
getMoreOpts := mtest.NewOptions().MinServerVersion("4.2").ClientOptions(clientOpts)
mt.RunOpts("getMore iteration", getMoreOpts, func(mt *mtest.T) {
clearPoolChan()
// initCollection is defined elsewhere; presumably it inserts the documents iterated below.
initCollection(mt, mt.Coll)
// Open a cursor with a batch size of 2 and read the first document before stepping down.
cur, err := mt.Coll.Find(mtest.Background, bson.D{}, options.Find().SetBatchSize(2))
assert.Nil(mt, err, "Find error: %v", err)
defer cur.Close(mtest.Background)
assert.True(mt, cur.Next(mtest.Background), "expected Next true, got false")
// replSetStepDown can fail with transient errors, so we use executeAdminCommandWithRetry to handle them and
// retry until a timeout is hit.
stepDownCmd := bson.D{
{"replSetStepDown", 5},
{"force", true},
}
stepDownOpts := options.RunCmd().SetReadPreference(mtest.PrimaryRp)
executeAdminCommandWithRetry(mt, mt.Client, stepDownCmd, stepDownOpts)
// After the step down, the cursor must still yield a document and no PoolCleared event may
// have been published.
// NOTE(review): with a batch size of 2 and only one document consumed so far, this Next may be
// served from the already-fetched first batch rather than a getMore — confirm the test really
// issues a getMore after the step down.
assert.True(mt, cur.Next(mtest.Background), "expected Next true, got false")
assert.False(mt, isPoolCleared(), "expected pool to not be cleared but was")
})
mt.RunOpts("server errors", noClientOpts, func(mt *mtest.T) {
// Set the heartbeat interval to defaultHeartbeatInterval (defined elsewhere; presumably a short
// interval) so the Client will quickly recover when using failpoints that cause SDAM state
// changes. This mutates the shared clientOpts, which every test case below reuses.
clientOpts.SetHeartbeatInterval(defaultHeartbeatInterval)
// Each case injects errCode into a single insert and states whether a PoolCleared event is
// expected. An empty minVersion or maxVersion means no bound is applied on that side.
testCases := []struct {
name string
minVersion, maxVersion string
errCode int32
poolCleared bool
}{
{"notPrimary keep pool", "4.2", "", errorNotPrimary, false},
{"notPrimary reset pool", "4.0", "4.0", errorNotPrimary, true},
{"shutdown in progress reset pool", "4.0", "", errorShutdownInProgress, true},
{"interrupted at shutdown reset pool", "4.0", "", errorInterruptedAtShutdown, true},
}
for _, tc := range testCases {
// Build per-case options on top of the shared clientOpts, adding only the version bounds the
// case specifies.
tcOpts := mtest.NewOptions().ClientOptions(clientOpts)
if tc.minVersion != "" {
tcOpts.MinServerVersion(tc.minVersion)
}
if tc.maxVersion != "" {
tcOpts.MaxServerVersion(tc.maxVersion)
}
mt.RunOpts(tc.name, tcOpts, func(mt *mtest.T) {
clearPoolChan()
// Make exactly one insert command fail with the error code under test.
mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 1,
},
Data: mtest.FailPointData{
FailCommands: []string{"insert"},
ErrorCode: tc.errCode,
},
})
// The first insert must fail with a mongo.CommandError carrying the injected code.
_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"test", 1}})
assert.NotNil(mt, err, "expected InsertOne error, got nil")
cerr, ok := err.(mongo.CommandError)
assert.True(mt, ok, "expected error type %v, got %v", mongo.CommandError{}, err)
assert.Equal(mt, tc.errCode, cerr.Code, "expected error code %v, got %v", tc.errCode, cerr.Code)
if tc.poolCleared {
// Cases that expect a reset only need to observe a PoolCleared event.
assert.True(mt, isPoolCleared(), "expected pool to be cleared but was not")
return
}
// if pool shouldn't be cleared, another operation should succeed
_, err = mt.Coll.InsertOne(mtest.Background, bson.D{{"test", 1}})
assert.Nil(mt, err, "InsertOne error: %v", err)
assert.False(mt, isPoolCleared(), "expected pool to not be cleared but was")
})
}
})
}
|