1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
|
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package integration
import (
"context"
"fmt"
"strings"
"testing"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/internal/assert"
"go.mongodb.org/mongo-driver/internal/require"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
"go.mongodb.org/mongo-driver/mongo/options"
)
func TestLoadBalancerSupport(t *testing.T) {
mt := mtest.New(t, mtest.NewOptions().Topologies(mtest.LoadBalanced).CreateClient(false))
mt.Run("RunCommandCursor pins to a connection", func(mt *mtest.T) {
// The LB spec tests cover the behavior for cursors created by CRUD operations, but RunCommandCursor is
// Go-specific so there is no spec test coverage for it.
initCollection(mt, mt.Coll)
findCmd := bson.D{
{"find", mt.Coll.Name()},
{"filter", bson.D{}},
{"batchSize", 2},
}
cursor, err := mt.DB.RunCommandCursor(context.Background(), findCmd)
assert.Nil(mt, err, "RunCommandCursor error: %v", err)
defer func() {
_ = cursor.Close(context.Background())
}()
assert.True(mt, cursor.ID() > 0, "expected cursor ID to be non-zero")
assert.Equal(mt, 1, mt.NumberConnectionsCheckedOut(),
"expected one connection to be checked out, got %d", mt.NumberConnectionsCheckedOut())
})
mt.RunOpts("wait queue timeout errors include extra information", noClientOpts, func(mt *mtest.T) {
// There are spec tests to assert this behavior, but they rely on the waitQueueTimeoutMS Client option, which is
// not supported in Go, so we have to skip them. These prose tests make the same assertions, but use context
// deadlines to force wait queue timeout errors.
assertErrorHasInfo := func(mt *mtest.T, err error, numCursorConns, numTxnConns, numOtherConns int) {
mt.Helper()
assert.NotNil(mt, err, "expected wait queue timeout error, got nil")
expectedMsg := fmt.Sprintf("maxPoolSize: 1, "+
"connections in use by cursors: %d, "+
"connections in use by transactions: %d, "+
"connections in use by other operations: %d",
numCursorConns, numTxnConns, numOtherConns,
)
assert.True(mt, strings.Contains(err.Error(), expectedMsg),
"expected error %q to contain substring %q", err, expectedMsg)
}
maxPoolSizeMtOpts := mtest.NewOptions().
ClientOptions(options.Client().SetMaxPoolSize(1))
mt.RunOpts("cursors", maxPoolSizeMtOpts, func(mt *mtest.T) {
initCollection(mt, mt.Coll)
findOpts := options.Find().SetBatchSize(2)
cursor, err := mt.Coll.Find(context.Background(), bson.M{}, findOpts)
assert.Nil(mt, err, "Find error: %v", err)
defer func() {
_ = cursor.Close(context.Background())
}()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
defer cancel()
_, err = mt.Coll.InsertOne(ctx, bson.M{"x": 1})
assertErrorHasInfo(mt, err, 1, 0, 0)
})
mt.RunOpts("transactions", maxPoolSizeMtOpts, func(mt *mtest.T) {
{
sess, err := mt.Client.StartSession()
assert.Nil(mt, err, "StartSession error: %v", err)
defer sess.EndSession(context.Background())
ctx := mongo.NewSessionContext(context.Background(), sess)
// Start a transaction and perform one transactional operation to pin a connection.
err = sess.StartTransaction()
assert.Nil(mt, err, "StartTransaction error: %v", err)
_, err = mt.Coll.InsertOne(ctx, bson.M{"x": 1})
assert.Nil(mt, err, "InsertOne error: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
defer cancel()
_, err := mt.Coll.InsertOne(ctx, bson.M{"x": 1})
assertErrorHasInfo(mt, err, 0, 1, 0)
})
// GODRIVER-2867: Test that connections are unpinned from transactions
// when the transaction session is ended. Create a Client with
// maxPoolSize=1 and expect that it can start and commit 5 transactions
// with that 1 connection.
mt.RunOpts("transaction connections are unpinned", maxPoolSizeMtOpts, func(mt *mtest.T) {
{
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
for i := 0; i < 5; i++ {
sess, err := mt.Client.StartSession()
require.NoError(mt, err, "StartSession error")
err = sess.StartTransaction()
require.NoError(mt, err, "StartTransaction error")
ctx := mongo.NewSessionContext(ctx, sess)
_, err = mt.Coll.InsertOne(ctx, bson.M{"x": 1})
assert.NoError(mt, err, "InsertOne error")
err = sess.CommitTransaction(ctx)
assert.NoError(mt, err, "CommitTransaction error")
sess.EndSession(ctx)
}
}
})
})
}
|