File: sdam_prose_test.go

package info (click to toggle)
golang-mongodb-mongo-driver 1.8.4%2Bds1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bookworm-backports
  • size: 18,520 kB
  • sloc: perl: 533; ansic: 491; python: 432; makefile: 187; sh: 72
file content (133 lines) | stat: -rw-r--r-- 5,244 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package integration

import (
	"runtime"
	"testing"
	"time"

	"go.mongodb.org/mongo-driver/internal"
	"go.mongodb.org/mongo-driver/internal/testutil/assert"
	"go.mongodb.org/mongo-driver/mongo/description"
	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// TestSDAMProse runs two groups of subtests through the mtest harness:
//
//   - "heartbeats processed more frequently" checks that a client configured
//     with a 500ms heartbeat interval sends a number of monitoring messages
//     that falls within a computed range over a ~2s window.
//   - "rtt tests" checks that server descriptions carry an average RTT and
//     that the average rises once hello commands are artificially delayed.
func TestSDAMProse(t *testing.T) {
	mt := mtest.New(t)
	defer mt.Close()

	// Server limits non-streaming heartbeats and explicit server transition checks to at most one
	// per 500ms. Set the test interval to 500ms to minimize the difference between the behavior of
	// streaming and non-streaming heartbeat intervals.
	heartbeatInterval := 500 * time.Millisecond
	heartbeatIntervalClientOpts := options.Client().
		SetHeartbeatInterval(heartbeatInterval)
	heartbeatIntervalMtOpts := mtest.NewOptions().
		ClientOptions(heartbeatIntervalClientOpts).
		CreateCollection(false).
		ClientType(mtest.Proxy)
	mt.RunOpts("heartbeats processed more frequently", heartbeatIntervalMtOpts, func(mt *mtest.T) {
		// Test that setting heartbeat interval to 500ms causes the client to process heartbeats
		// approximately every 500ms instead of the default 10s. Note that a Client doesn't
		// guarantee that it will process heartbeats exactly every 500ms, just that it will wait at
		// least 500ms between heartbeats (and should process heartbeats more frequently for shorter
		// interval settings).
		//
		// For number of nodes N, interval I, and duration D, a Client should process at most X
		// operations:
		//
		//   X = (N * (2 handshakes + D/I heartbeats + D/I RTTs))
		//
		// Assert that a Client processes the expected number of operations for heartbeats sent at
		// an interval between I and 2*I to account for different actual heartbeat intervals under
		// different runtime conditions.

		// Measure the actual amount of time between the start of the test and when we inspect the
		// sent messages. The sleep duration will be at least the specified duration but
		// possibly longer, which could lead to extra heartbeat messages, so account for that in
		// the assertions.
		start := time.Now()
		time.Sleep(2 * time.Second)
		messages := mt.GetProxiedMessages()
		duration := time.Since(start)

		// The node count is taken from the hosts listed in the cluster URI.
		numNodes := len(options.Client().ApplyURI(mtest.ClusterURI()).Hosts)
		// Upper bound: one heartbeat and one RTT check per interval I, plus 2 handshakes per node.
		maxExpected := numNodes * (2 + 2*int(duration/heartbeatInterval))
		// Lower bound: same formula, assuming heartbeats are spaced 2*I apart.
		minExpected := numNodes * (2 + 2*int(duration/(heartbeatInterval*2)))

		assert.True(
			mt,
			len(messages) >= minExpected && len(messages) <= maxExpected,
			"expected number of messages to be in range [%d, %d], got %d"+
				" (num nodes = %d, duration = %v, interval = %v)",
			minExpected,
			maxExpected,
			len(messages),
			numNodes,
			duration,
			heartbeatInterval)
	})

	mt.RunOpts("rtt tests", noClientOpts, func(mt *mtest.T) {
		// The app name set here is also used by the failpoint below, so that only commands from
		// this test's client are blocked.
		clientOpts := options.Client().
			SetHeartbeatInterval(heartbeatInterval).
			SetAppName("streamingRttTest")
		mtOpts := mtest.NewOptions().
			MinServerVersion("4.4").
			ClientOptions(clientOpts)
		mt.RunOpts("rtt is continuously updated", mtOpts, func(mt *mtest.T) {
			// Test that the RTT monitor updates the RTT for server descriptions.

			// The server has been discovered by the create command issued by mtest. Sleep for two seconds to allow
			// multiple heartbeats to finish.
			testTopology := getTopologyFromClient(mt.Client)
			time.Sleep(2 * time.Second)
			for _, serverDesc := range testTopology.Description().Servers {
				assert.NotEqual(mt, description.Unknown, serverDesc.Kind, "server %v is Unknown", serverDesc)
				assert.True(mt, serverDesc.AverageRTTSet, "AverageRTTSet for server description %v is false", serverDesc)

				if runtime.GOOS != "windows" {
					// Windows has a lower time resolution than other platforms, which causes the reported RTT to be
					// 0 if it's below some threshold. The assertion above already confirms that the RTT is set to
					// a value, so we can skip this assertion on Windows.
					assert.True(mt, serverDesc.AverageRTT > 0, "server description %v has 0 RTT", serverDesc)
				}
			}

			// Force hello requests to block for 500ms and wait until a server's average RTT goes over 250ms.
			mt.SetFailPoint(mtest.FailPoint{
				ConfigureFailPoint: "failCommand",
				Mode: mtest.FailPointMode{
					Times: 1000,
				},
				Data: mtest.FailPointData{
					FailCommands:    []string{internal.LegacyHello, "hello"},
					BlockConnection: true,
					BlockTimeMS:     500,
					AppName:         "streamingRttTest",
				},
			})
			callback := func() {
				for {
					// We don't know which server received the failpoint command, so we wait until any of the server
					// RTTs cross the threshold.
					for _, serverDesc := range testTopology.Description().Servers {
						if serverDesc.AverageRTT > 250*time.Millisecond {
							return
						}
					}

					// The next update will be in ~500ms (one heartbeat interval).
					time.Sleep(heartbeatInterval)
				}
			}
			// Report a timeout against this subtest (mt), not the outermost test: failing a parent
			// test from a subtest's goroutine misattributes the failure and violates the testing
			// package's requirement that FailNow run on the goroutine of the test being failed.
			assert.Soon(mt, callback, defaultCallbackTimeout)
		})
	})
}