File: change_stream.rb

package info (click to toggle)
ruby-mongo 2.5.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,332 kB
  • sloc: ruby: 45,579; makefile: 5
file content (173 lines) | stat: -rw-r--r-- 6,243 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# Copyright (C) 2017 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'mongo/collection/view/change_stream/retryable'

module Mongo
  class Collection
    class View

      # Provides behaviour around a `$changeStream` pipeline stage in the
      # aggregation framework. Specifying this stage allows users to request that
      # notifications are sent for all changes to a particular collection or database.
      #
      # @note Only available in server versions 3.6 and higher.
      # @note ChangeStreams do not work properly with JRuby because of the issue documented
      #  here: https://github.com/jruby/jruby/issues/4212
      #  Namely, JRuby eagerly evaluates #next on an Enumerator in a background green thread.
      #  So calling #next on the change stream will cause getmores to be called in a loop in the background.
      #
      #
      # @since 2.5.0
      class ChangeStream < Aggregation
        include Retryable

        # @return [ String ] The fullDocument option default value.
        #
        # @since 2.5.0
        FULL_DOCUMENT_DEFAULT = 'default'.freeze

        # @return [ BSON::Document ] The change stream options.
        #
        # @since 2.5.0
        attr_reader :options

        # Initialize the change stream for the provided collection view, pipeline
        # and options.
        #
        # @example Create the new change stream view.
        #   ChangeStream.new(view, pipeline, options)
        #
        # @param [ Collection::View ] view The collection view.
        # @param [ Array<Hash> ] pipeline The pipeline of operators to filter the change notifications.
        # @param [ Hash ] options The change stream options.
        #
        # @option options [ String ] :full_document Allowed values: ‘default’, ‘updateLookup’. Defaults to ‘default’.
        #   When set to ‘updateLookup’, the change notification for partial updates will include both a delta
        #   describing the changes to the document, as well as a copy of the entire document that was changed
        #   from some time after the change occurred.
        # @option options [ BSON::Document, Hash ] :resume_after Specifies the logical starting point for the
        #   new change stream.
        # @option options [ Integer ] :max_await_time_ms The maximum amount of time for the server to wait
        #   on new documents to satisfy a change stream query.
        # @option options [ Integer ] :batch_size The number of documents to return per batch.
        # @option options [ BSON::Document, Hash ] :collation The collation to use.
        #
        # @since 2.5.0
        def initialize(view, pipeline, options = {})
          @view = view
          @change_stream_filters = pipeline && pipeline.dup
          @options = options && options.dup.freeze
          @resume_token = @options[:resume_after]
          read_with_one_retry { create_cursor! }
        end

        # Iterate through documents returned by the change stream.
        #
        # @example Iterate through the stream of documents.
        #   stream.each do |document|
        #     p document
        #   end
        #
        # @return [ Enumerator ] The enumerator.
        #
        # @since 2.5.0
        #
        # @yieldparam [ BSON::Document ] Each change stream document.
        def each
          raise StopIteration.new if closed?
          begin
            @cursor.each do |doc|
              cache_resume_token(doc)
              yield doc
            end if block_given?
            @cursor.to_enum
          rescue => e
            close
            if retryable?(e)
              create_cursor!
              retry
            end
            raise
          end
        end

        # Close the change stream.
        #
        # @example Close the change stream.
        #   stream.close
        #
        # @return [ nil ] nil.
        #
        # @since 2.5.0
        def close
          unless closed?
            begin; @cursor.send(:kill_cursors); rescue; end
            @cursor = nil
          end
        end

        # Is the change stream closed?
        #
        # @example Determine whether the change stream is closed.
        #   stream.closed?
        #
        # @return [ true, false ] If the change stream is closed.
        #
        # @since 2.5.0
        def closed?
          @cursor.nil?
        end

        # Get a formatted string for use in inspection.
        #
        # @example Inspect the change stream object.
        #   stream.inspect
        #
        # @return [ String ] The change stream inspection.
        #
        # @since 2.5.0
        def inspect
          "#<Mongo::Collection::View:ChangeStream:0x#{object_id} filters=#{@change_stream_filters} " +
            "options=#{@options} resume_token=#{@resume_token}>"
        end

        private

        def cache_resume_token(doc)
          unless @resume_token = (doc[:_id] && doc[:_id].dup)
            raise Error::MissingResumeToken.new
          end
        end

        def create_cursor!
          session = client.send(:get_session, @options)
          server = server_selector.select_server(cluster, false)
          result = send_initial_query(server, session)
          @cursor = Cursor.new(view, result, server, disable_retry: true, session: session)
        end

        def pipeline
          change_doc = { fullDocument: ( @options[:full_document] || FULL_DOCUMENT_DEFAULT ) }
          change_doc[:resumeAfter] = @resume_token if @resume_token
          [{ '$changeStream' => change_doc }] + @change_stream_filters
        end

        def send_initial_query(server, session)
          initial_query_op(session).execute(server)
        end
      end
    end
  end
end