# File: change-streams.yml
# Provenance (from Debian source browser): package ruby-mongo 2.23.0-1,
# area main, suites forky/sid. The HTML line-number gutter from the page
# extraction has been removed; the YAML document follows.
---
# Unified test format spec file: verifies that timeoutMS (client-side
# operation timeout) behaves correctly for change streams.
description: "timeoutMS behaves correctly for change streams"

schemaVersion: "1.9"

# Change streams need a replica set or sharded cluster; 4.4 is the minimum
# server version these tests target.
runOnRequirements:
  - minServerVersion: "4.4"
    topologies: ["replicaset", "sharded"]

createEntities:
  # Dedicated client for configuring fail points, so fail point commands are
  # not subject to the monitoring configuration of the test client below.
  - client:
      id: &failPointClient failPointClient
      useMultipleMongoses: false
  - client:
      id: &client client
      useMultipleMongoses: false
      observeEvents:
        - commandStartedEvent
      # Drivers are not required to execute killCursors during resume attempts, so it should be ignored for command
      # monitoring assertions.
      ignoreCommandMonitoringEvents: ["killCursors"]
  - database:
      id: &database database
      client: *client
      databaseName: &databaseName test
  - collection:
      id: &collection collection
      database: *database
      collectionName: &collectionName coll

# The collection starts empty: the tests below rely on command timing via
# fail points rather than on actual change events.
initialData:
  - collectionName: *collectionName
    databaseName: *databaseName
    documents: []

tests:
  # maxAwaitTimeMS must be strictly less than timeoutMS; a larger value is
  # rejected with a client-side error.
  - description: "error if maxAwaitTimeMS is greater than timeoutMS"
    operations:
      - name: createChangeStream
        object: *collection
        arguments:
          pipeline: []
          timeoutMS: 5
          maxAwaitTimeMS: 10
        expectError:
          isClientError: true

  # Equality is rejected as well: maxAwaitTimeMS must be strictly smaller
  # than timeoutMS, so an equal value also produces a client-side error.
  - description: "error if maxAwaitTimeMS is equal to timeoutMS"
    operations:
      - name: createChangeStream
        object: *collection
        arguments:
          pipeline: []
          timeoutMS: 5
          maxAwaitTimeMS: 5
        expectError:
          isClientError: true

  # The initial aggregate that creates the change stream is subject to
  # timeoutMS: the fail point blocks it for 55ms, longer than the 50ms
  # timeout, so creation must fail with a timeout error. The aggregate
  # command must still carry a maxTimeMS field.
  - description: "timeoutMS applied to initial aggregate"
    operations:
      - name: failPoint
        object: testRunner
        arguments:
          client: *failPointClient
          failPoint:
            configureFailPoint: failCommand
            mode: { times: 1 }
            data:
              failCommands: ["aggregate"]
              blockConnection: true
              blockTimeMS: 55
      - name: createChangeStream
        object: *collection
        arguments:
          pipeline: []
          timeoutMS: 50
        expectError:
          isTimeoutError: true
    expectEvents:
      - client: *client
        events:
          - commandStartedEvent:
              commandName: aggregate
              databaseName: *databaseName
              command:
                aggregate: *collectionName
                maxTimeMS: { $$type: ["int", "long"] }

  # If maxAwaitTimeMS is not set, timeoutMS should be refreshed for the getMore and the getMore should not have a
  # maxTimeMS field. This test requires a high timeout because the server applies a default 1000ms maxAwaitTime. To
  # ensure that the driver is refreshing the timeout between commands, the test blocks aggregate and getMore commands
  # for 30ms each and creates/iterates a change stream with timeoutMS=1050. The initial aggregate will block for 30ms
  # and the getMore will block for 1030ms.
  - description: "timeoutMS is refreshed for getMore if maxAwaitTimeMS is not set"
    operations:
      - name: failPoint
        object: testRunner
        arguments:
          client: *failPointClient
          failPoint:
            configureFailPoint: failCommand
            mode: { times: 2 }
            data:
              failCommands: ["aggregate", "getMore"]
              blockConnection: true
              blockTimeMS: 30
      - name: createChangeStream
        object: *collection
        arguments:
          pipeline: []
          timeoutMS: 1050
        saveResultAsEntity: &changeStream changeStream
      # A single iteration issues one getMore; per the expected events below
      # it must not include a maxTimeMS field.
      - name: iterateOnce
        object: *changeStream
    expectEvents:
      - client: *client
        events:
          - commandStartedEvent:
              commandName: aggregate
              databaseName: *databaseName
              command:
                aggregate: *collectionName
                maxTimeMS: { $$type: ["int", "long"] }
          - commandStartedEvent:
              commandName: getMore
              databaseName: *databaseName
              command:
                getMore: { $$type: ["int", "long"] }
                collection: *collectionName
                maxTimeMS: { $$exists: false }

  # If maxAwaitTimeMS is set, timeoutMS should still be refreshed for the getMore and the getMore command should have a
  # maxTimeMS field.
  - description: "timeoutMS is refreshed for getMore if maxAwaitTimeMS is set"
    operations:
      - name: failPoint
        object: testRunner
        arguments:
          client: *failPointClient
          failPoint:
            configureFailPoint: failCommand
            mode: { times: 2 }
            data:
              failCommands: ["aggregate", "getMore"]
              blockConnection: true
              # was 15, changed to 30 to account for jruby driver latency.
              blockTimeMS: 30
      - name: createChangeStream
        object: *collection
        arguments:
          pipeline: []
          # was 20, changed to 29 to account for native ruby driver latency.
          # Changed again to 59 to account for additional jruby driver latency.
          # The idea for this test is that each operation is delayed by 15ms
          # (by failpoint). The timeout for each operation is set to (originally)
          # 20ms, because if timeoutMS was not refreshed for getMore, it would time out.
          # However, we're tickling the 20ms timeout because the driver itself
          # is taking more than 5ms to do its thing.
          #
          # Changing the blockTimeMS in the failpoint to 30ms, and then bumping
          # the timeout to almost twice that (59ms) should give us the same
          # effect in the test.
          timeoutMS: 59
          batchSize: 2
          maxAwaitTimeMS: 1
        saveResultAsEntity: &changeStream changeStream
      - name: iterateOnce
        object: *changeStream
    expectEvents:
      - client: *client
        events:
          - commandStartedEvent:
              commandName: aggregate
              databaseName: *databaseName
              command:
                aggregate: *collectionName
                maxTimeMS: { $$type: ["int", "long"] }
          - commandStartedEvent:
              commandName: getMore
              databaseName: *databaseName
              command:
                getMore: { $$type: ["int", "long"] }
                collection: *collectionName
                maxTimeMS: 1

  # The timeout should be applied to the entire resume attempt, not individually to each command. The test creates a
  # change stream with timeoutMS=20 which returns an empty initial batch and then sets a fail point to block both
  # getMore and aggregate for 12ms each and fail with a resumable error. When the resume attempt happens, the getMore
  # and aggregate block for longer than 20ms total, so it times out. (The original 20ms/12ms values were later bumped
  # to 99ms/50ms for the Ruby/JRuby drivers -- see the inline comments below.)
  - description: "timeoutMS applies to full resume attempt in a next call"
    operations:
      - name: createChangeStream
        object: *collection
        arguments:
          pipeline: []
          # Originally set to 20, but the Ruby driver was too-often taking
          # that much time, and causing the timing of the test to fail. Instead,
          # bumped the timeout to 23ms, which is just less than twice the
          # blockTimeMS for the failpoint. It still failed on jruby, so the
          # timeout (and blockTimeMS) were drastically increased to accommodate
          # JRuby. This tests the same thing, but gives the driver a bit more
          # breathing space.
          timeoutMS: 99
        saveResultAsEntity: &changeStream changeStream
      - name: failPoint
        object: testRunner
        arguments:
          client: *failPointClient
          failPoint:
            configureFailPoint: failCommand
            mode: { times: 2 }
            data:
              failCommands: ["getMore", "aggregate"]
              blockConnection: true
              # Originally 12, bumped it to 50 to give the jruby driver a bit
              # more breathing room.
              blockTimeMS: 50
              errorCode: 7 # HostNotFound - resumable but does not require an SDAM state change.
              # failCommand doesn't correctly add the ResumableChangeStreamError by default. It needs to be specified
              # manually here so the error is considered resumable. The failGetMoreAfterCursorCheckout fail point
              # would add the label without this, but it does not support blockConnection functionality.
              errorLabels: ["ResumableChangeStreamError"]
      - name: iterateUntilDocumentOrError
        object: *changeStream
        expectError:
          isTimeoutError: true
    expectEvents:
      - client: *client
        events:
          - commandStartedEvent:
              commandName: aggregate
              databaseName: *databaseName
              command:
                aggregate: *collectionName
                maxTimeMS: { $$type: ["int", "long"] }
          - commandStartedEvent:
              commandName: getMore
              databaseName: *databaseName
              command:
                getMore: { $$type: ["int", "long"] }
                collection: *collectionName
                maxTimeMS: { $$exists: false }
          - commandStartedEvent:
              commandName: aggregate
              databaseName: *databaseName
              command:
                aggregate: *collectionName
                maxTimeMS: { $$type: ["int", "long"] }

  # After a timed-out iteration the change stream must remain usable: the
  # next iteration re-creates the cursor with a new aggregate instead of
  # failing permanently.
  - description: "change stream can be iterated again if previous iteration times out"
    operations:
      - name: createChangeStream
        object: *collection
        arguments:
          pipeline: []
          # Specify a short maxAwaitTimeMS because otherwise the getMore on the new cursor will wait for 1000ms and
          # time out.
          maxAwaitTimeMS: 1
          timeoutMS: 100
        saveResultAsEntity: &changeStream changeStream
      # Block getMore for 150ms to force the next() call to time out.
      - name: failPoint
        object: testRunner
        arguments:
          client: *failPointClient
          failPoint:
            configureFailPoint: failCommand
            mode: { times: 1 }
            data:
              failCommands: ["getMore"]
              blockConnection: true
              blockTimeMS: 150
      # The original aggregate didn't return any events so this should do a getMore and return a timeout error.
      - name: iterateUntilDocumentOrError
        object: *changeStream
        expectError:
          isTimeoutError: true
      # The previous iteration attempt timed out so this should re-create the change stream. We use iterateOnce rather
      # than iterateUntilDocumentOrError because there haven't been any events and we only want to assert that the
      # cursor was re-created.
      - name: iterateOnce
        object: *changeStream
    expectEvents:
      - client: *client
        events:
          - commandStartedEvent:
              commandName: aggregate
              databaseName: *databaseName
              command:
                aggregate: *collectionName
                maxTimeMS: { $$type: ["int", "long"] }
          # The iterateUntilDocumentOrError operation should send a getMore.
          - commandStartedEvent:
              commandName: getMore
              databaseName: *databaseName
              command:
                getMore: { $$type: ["int", "long"] }
                collection: *collectionName
          # The iterateOnce operation should re-create the cursor via an aggregate and then send a getMore to iterate
          # the new cursor.
          - commandStartedEvent:
              commandName: aggregate
              databaseName: *databaseName
              command:
                aggregate: *collectionName
                maxTimeMS: { $$type: ["int", "long"] }
          - commandStartedEvent:
              commandName: getMore
              databaseName: *databaseName
              command:
                getMore: { $$type: ["int", "long"] }
                collection: *collectionName

  # The timeoutMS value should be refreshed for getMores. This is a failure test. The createChangeStream operation
  # sets a timeoutMS and the getMore blocks for longer than it (originally timeoutMS=10 and blockTimeMS=15; both
  # since doubled for the Ruby driver), causing iteration to fail with a timeout error.
  - description: "timeoutMS is refreshed for getMore - failure"
    operations:
      - name: failPoint
        object: testRunner
        arguments:
          client: *failPointClient
          failPoint:
            configureFailPoint: failCommand
            mode: { times: 1 }
            data:
              failCommands: ["getMore"]
              blockConnection: true
              # was 15; increased to accommodate Ruby driver latency.
              blockTimeMS: 30
      - name: createChangeStream
        object: *collection
        arguments:
          pipeline: []
          # was 10; increased to accommodate Ruby driver latency.
          timeoutMS: 20
        saveResultAsEntity: &changeStream changeStream
      # The first iteration should do a getMore
      - name: iterateUntilDocumentOrError
        object: *changeStream
        expectError:
          isTimeoutError: true
    expectEvents:
      - client: *client
        events:
          - commandStartedEvent:
              commandName: aggregate
              databaseName: *databaseName
              command:
                aggregate: *collectionName
                maxTimeMS: { $$type: ["int", "long"] }
          # The iterateUntilDocumentOrError operation should send a getMore.
          - commandStartedEvent:
              commandName: getMore
              databaseName: *databaseName
              command:
                getMore: { $$type: ["int", "long"] }
                collection: *collectionName