File: causal_consistency_test.go

package info (click to toggle)
golang-mongodb-mongo-driver 1.8.1%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 18,500 kB
  • sloc: perl: 533; ansic: 491; python: 432; makefile: 187; sh: 72
file content (287 lines) | stat: -rw-r--r-- 11,416 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package integration

import (
	"testing"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/internal/testutil/assert"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.mongodb.org/mongo-driver/mongo/readconcern"
	"go.mongodb.org/mongo-driver/mongo/writeconcern"
)

// set of operations that support read concerns taken from read/write concern spec.
// the spec also lists "count" but that has been deprecated and removed from the driver.
var readConcernOperations = map[string]struct{}{
	"Aggregate": {},
	"Distinct":  {},
	"Find":      {},
}

func TestCausalConsistency_Supported(t *testing.T) {
	mt := mtest.New(t, mtest.NewOptions().MinServerVersion("3.6").Topologies(mtest.ReplicaSet, mtest.Sharded).CreateClient(false))
	defer mt.Close()

	mt.Run("operation time nil", func(mt *mtest.T) {
		// when a ClientSession is first created, the operation time is nil

		sess, err := mt.Client.StartSession()
		assert.Nil(mt, err, "StartSession error: %v", err)
		defer sess.EndSession(mtest.Background)
		assert.Nil(mt, sess.OperationTime(), "expected nil operation time, got %v", sess.OperationTime())
	})
	mt.Run("no cluster time on first command", func(mt *mtest.T) {
		// first read in a causally consistent session must not send afterClusterTime to the server

		ccOpts := options.Session().SetCausalConsistency(true)
		_ = mt.Client.UseSessionWithOptions(mtest.Background, ccOpts, func(sc mongo.SessionContext) error {
			_, _ = mt.Coll.Find(sc, bson.D{})
			return nil
		})

		evt := mt.GetStartedEvent()
		assert.Equal(mt, "find", evt.CommandName, "expected 'find' event, got '%v'", evt.CommandName)
		checkOperationTime(mt, evt.Command, false)
	})
	mt.Run("operation time updated", func(mt *mtest.T) {
		// first read or write on a ClientSession should update the operationTime of the session, even if there is an error

		sess, err := mt.Client.StartSession()
		assert.Nil(mt, err, "StartSession error: %v", err)
		defer sess.EndSession(mtest.Background)

		_ = mongo.WithSession(mtest.Background, sess, func(sc mongo.SessionContext) error {
			_, _ = mt.Coll.Find(sc, bson.D{})
			return nil
		})

		evt := mt.GetSucceededEvent()
		assert.Equal(mt, "find", evt.CommandName, "expected 'find' event, got '%v'", evt.CommandName)
		serverT, serverI := evt.Reply.Lookup("operationTime").Timestamp()
		serverTs := &primitive.Timestamp{serverT, serverI}
		sessionTs := sess.OperationTime()
		assert.NotNil(mt, sessionTs, "expected session operation time, got nil")
		assert.True(mt, serverTs.Equal(*sessionTs), "expected operation time %v, got %v", serverTs, sessionTs)
	})
	mt.RunOpts("operation time sent", noClientOpts, func(mt *mtest.T) {
		// findOne followed by another read operation should include operationTime returned by server for the first
		// operation as the afterClusterTime field of the second operation

		for _, sf := range createFunctionsSlice() {
			// skip write operations
			if _, ok := readConcernOperations[sf.fnName]; !ok {
				continue
			}

			mt.Run(sf.name, func(mt *mtest.T) {
				sess, err := mt.Client.StartSession()
				assert.Nil(mt, err, "StartSession error: %v", err)
				defer sess.EndSession(mtest.Background)

				_ = mongo.WithSession(mtest.Background, sess, func(sc mongo.SessionContext) error {
					_ = mt.Coll.FindOne(sc, bson.D{})
					return nil
				})
				currOptime := sess.OperationTime()
				assert.NotNil(mt, currOptime, "expected session operation time, got nil")

				mt.ClearEvents()
				_ = sf.execute(mt, sess)
				_, sentOptime := getReadConcernFields(mt, mt.GetStartedEvent().Command)
				assert.NotNil(mt, sentOptime, "expected operation time on command, got nil")
				assert.True(mt, currOptime.Equal(*sentOptime), "expected operation time %v, got %v", currOptime, sentOptime)
			})
		}
	})
	mt.RunOpts("write then read", noClientOpts, func(mt *mtest.T) {
		// any write operation followed by a findOne should include operationTime of the first operation as afterClusterTime
		// in the second operation

		for _, sf := range createFunctionsSlice() {
			// skip read operations
			if _, ok := readConcernOperations[sf.fnName]; ok {
				continue
			}

			mt.Run(sf.name, func(mt *mtest.T) {
				sess, err := mt.Client.StartSession()
				assert.Nil(mt, err, "StartSession error: %v", err)
				defer sess.EndSession(mtest.Background)

				_ = sf.execute(mt, sess)
				currOptime := sess.OperationTime()
				assert.NotNil(mt, currOptime, "expected session operation time, got nil")

				mt.ClearEvents()
				_ = mongo.WithSession(mtest.Background, sess, func(sc mongo.SessionContext) error {
					_ = mt.Coll.FindOne(sc, bson.D{})
					return nil
				})
				_, sentOptime := getReadConcernFields(mt, mt.GetStartedEvent().Command)
				assert.NotNil(mt, sentOptime, "expected operation time on command, got nil")
				assert.True(mt, currOptime.Equal(*sentOptime), "expected operation time %v, got %v", currOptime, sentOptime)
			})
		}
	})
	mt.Run("non-consistent read", func(mt *mtest.T) {
		// a read operation in a non causally-consistent session should not include afterClusterTime

		sessOpts := options.Session().SetCausalConsistency(false)
		_ = mt.Client.UseSessionWithOptions(mtest.Background, sessOpts, func(sc mongo.SessionContext) error {
			_, _ = mt.Coll.Find(sc, bson.D{})
			mt.ClearEvents()
			_, _ = mt.Coll.Find(sc, bson.D{})
			return nil
		})
		evt := mt.GetStartedEvent()
		assert.Equal(mt, "find", evt.CommandName, "expected 'find' command, got '%v'", evt.CommandName)
		checkOperationTime(mt, evt.Command, false)
	})
	mt.Run("default read concern", func(mt *mtest.T) {
		// when using the deafult server read concern, the readConcern parameter in the command sent to the server should
		// not include a level field

		sess, err := mt.Client.StartSession()
		assert.Nil(mt, err, "StartSession error: %v", err)
		defer sess.EndSession(mtest.Background)

		_ = mongo.WithSession(mtest.Background, sess, func(sc mongo.SessionContext) error {
			_ = mt.Coll.FindOne(sc, bson.D{})
			return nil
		})
		currOptime := sess.OperationTime()
		mt.ClearEvents()
		_ = mongo.WithSession(mtest.Background, sess, func(sc mongo.SessionContext) error {
			_ = mt.Coll.FindOne(sc, bson.D{})
			return nil
		})

		level, sentOptime := getReadConcernFields(mt, mt.GetStartedEvent().Command)
		assert.Equal(mt, "", level, "expected command to not have read concern level, got %s", level)
		assert.NotNil(mt, sentOptime, "expected operation time on command, got nil")
		assert.True(mt, currOptime.Equal(*sentOptime), "expected operation time %v, got %v", currOptime, sentOptime)
	})
	localRcOpts := options.Client().SetReadConcern(readconcern.Local())
	mt.RunOpts("custom read concern", mtest.NewOptions().ClientOptions(localRcOpts), func(mt *mtest.T) {
		sess, err := mt.Client.StartSession()
		assert.Nil(mt, err, "StartSession error: %v", err)
		defer sess.EndSession(mtest.Background)

		_ = mongo.WithSession(mtest.Background, sess, func(sc mongo.SessionContext) error {
			_ = mt.Coll.FindOne(sc, bson.D{})
			return nil
		})
		currOptime := sess.OperationTime()
		mt.ClearEvents()
		_ = mongo.WithSession(mtest.Background, sess, func(sc mongo.SessionContext) error {
			_ = mt.Coll.FindOne(sc, bson.D{})
			return nil
		})

		level, sentOptime := getReadConcernFields(mt, mt.GetStartedEvent().Command)
		assert.Equal(mt, "local", level, "expected read concern level 'local', got %s", level)
		assert.NotNil(mt, sentOptime, "expected operation time on command, got nil")
		assert.True(mt, currOptime.Equal(*sentOptime), "expected operation time %v, got %v", currOptime, sentOptime)
	})
	unackWcOpts := options.Collection().SetWriteConcern(writeconcern.New(writeconcern.W(0)))
	mt.RunOpts("unacknowledged write", mtest.NewOptions().CollectionOptions(unackWcOpts), func(mt *mtest.T) {
		// unacknowledged write should not update the operationTime of a session

		sess, err := mt.Client.StartSession()
		assert.Nil(mt, err, "StartSession error: %v", err)
		defer sess.EndSession(mtest.Background)

		_ = mongo.WithSession(mtest.Background, sess, func(sc mongo.SessionContext) error {
			_, _ = mt.Coll.InsertOne(sc, bson.D{{"x", 1}})
			return nil
		})
		optime := sess.OperationTime()
		assert.Nil(mt, optime, "expected operation time nil, got %v", optime)
	})
	mt.Run("clusterTime included", func(mt *mtest.T) {
		// $clusterTime should be included in commands if the deployment supports cluster times

		_ = mt.Coll.FindOne(mtest.Background, bson.D{})
		evt := mt.GetStartedEvent()
		assert.Equal(mt, "find", evt.CommandName, "expected command 'find', got '%v'", evt.CommandName)
		_, err := evt.Command.LookupErr("$clusterTime")
		assert.Nil(mt, err, "expected $clusterTime in command, got nil")
	})
}

func TestCausalConsistency_NotSupported(t *testing.T) {
	// use RunOnBlock instead of mtest.NewOptions().MaxServerVersion("3.4").Topologies(mtest.Single) because
	// these tests should be run on servers <= 3.4 OR standalones
	rob := []mtest.RunOnBlock{
		{MaxServerVersion: "3.4"},
		{Topology: []mtest.TopologyKind{mtest.Single}},
	}
	mt := mtest.New(t, mtest.NewOptions().RunOn(rob...).CreateClient(false))
	defer mt.Close()

	mt.Run("afterClusterTime not included", func(mt *mtest.T) {
		// a read in a causally consistent session does not include afterClusterTime in a deployment that does not
		// support cluster times

		sessOpts := options.Session().SetCausalConsistency(true)
		_ = mt.Client.UseSessionWithOptions(mtest.Background, sessOpts, func(sc mongo.SessionContext) error {
			_, _ = mt.Coll.Find(sc, bson.D{})
			return nil
		})

		evt := mt.GetStartedEvent()
		assert.Equal(mt, "find", evt.CommandName, "expected command 'find', got '%v'", evt.CommandName)
		checkOperationTime(mt, evt.Command, false)
	})
	mt.Run("clusterTime not included", func(mt *mtest.T) {
		// $clusterTime should not be included in commands if the deployment does not support cluster times

		_ = mt.Coll.FindOne(mtest.Background, bson.D{})
		evt := mt.GetStartedEvent()
		assert.Equal(mt, "find", evt.CommandName, "expected command 'find', got '%v'", evt.CommandName)
		_, err := evt.Command.LookupErr("$clusterTime")
		assert.NotNil(mt, err, "expected $clusterTime to not be sent, but was")
	})
}

func checkOperationTime(mt *mtest.T, cmd bson.Raw, shouldInclude bool) {
	mt.Helper()

	_, optime := getReadConcernFields(mt, cmd)
	if shouldInclude {
		assert.NotNil(mt, optime, "expected operation time, got nil")
		return
	}
	assert.Nil(mt, optime, "did not expect operation time, got %v", optime)
}

func getReadConcernFields(mt *mtest.T, cmd bson.Raw) (string, *primitive.Timestamp) {
	mt.Helper()

	rc, err := cmd.LookupErr("readConcern")
	if err != nil {
		return "", nil
	}
	rcDoc := rc.Document()

	var level string
	var clusterTime *primitive.Timestamp

	if levelVal, err := rcDoc.LookupErr("level"); err == nil {
		level = levelVal.StringValue()
	}
	if ctVal, err := rcDoc.LookupErr("afterClusterTime"); err == nil {
		t, i := ctVal.Timestamp()
		clusterTime = &primitive.Timestamp{t, i}
	}
	return level, clusterTime
}