1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
|
# Copyright (C) 2017 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'mongo/collection/view/change_stream/retryable'
module Mongo
class Collection
class View
# Provides behaviour around a `$changeStream` pipeline stage in the
# aggregation framework. Specifying this stage allows users to request that
# notifications are sent for all changes to a particular collection or database.
#
# @note Only available in server versions 3.6 and higher.
# @note ChangeStreams do not work properly with JRuby because of the issue documented
# here: https://github.com/jruby/jruby/issues/4212
# Namely, JRuby eagerly evaluates #next on an Enumerator in a background green thread.
# So calling #next on the change stream will cause getmores to be called in a loop in the background.
#
#
# @since 2.5.0
class ChangeStream < Aggregation
include Retryable
# @return [ String ] The fullDocument option default value.
#
# @since 2.5.0
FULL_DOCUMENT_DEFAULT = 'default'.freeze
# @return [ BSON::Document ] The change stream options.
#
# @since 2.5.0
attr_reader :options
# Initialize the change stream for the provided collection view, pipeline
# and options.
#
# @example Create the new change stream view.
# ChangeStream.new(view, pipeline, options)
#
# @param [ Collection::View ] view The collection view.
# @param [ Array<Hash> ] pipeline The pipeline of operators to filter the change notifications.
# @param [ Hash ] options The change stream options.
#
# @option options [ String ] :full_document Allowed values: ‘default’, ‘updateLookup’. Defaults to ‘default’.
# When set to ‘updateLookup’, the change notification for partial updates will include both a delta
# describing the changes to the document, as well as a copy of the entire document that was changed
# from some time after the change occurred.
# @option options [ BSON::Document, Hash ] :resume_after Specifies the logical starting point for the
# new change stream.
# @option options [ Integer ] :max_await_time_ms The maximum amount of time for the server to wait
# on new documents to satisfy a change stream query.
# @option options [ Integer ] :batch_size The number of documents to return per batch.
# @option options [ BSON::Document, Hash ] :collation The collation to use.
#
# @since 2.5.0
def initialize(view, pipeline, options = {})
@view = view
@change_stream_filters = pipeline && pipeline.dup
@options = options && options.dup.freeze
@resume_token = @options[:resume_after]
read_with_one_retry { create_cursor! }
end
# Iterate through documents returned by the change stream.
#
# @example Iterate through the stream of documents.
# stream.each do |document|
# p document
# end
#
# @return [ Enumerator ] The enumerator.
#
# @since 2.5.0
#
# @yieldparam [ BSON::Document ] Each change stream document.
def each
raise StopIteration.new if closed?
begin
@cursor.each do |doc|
cache_resume_token(doc)
yield doc
end if block_given?
@cursor.to_enum
rescue => e
close
if retryable?(e)
create_cursor!
retry
end
raise
end
end
# Close the change stream.
#
# @example Close the change stream.
# stream.close
#
# @return [ nil ] nil.
#
# @since 2.5.0
def close
unless closed?
begin; @cursor.send(:kill_cursors); rescue; end
@cursor = nil
end
end
# Is the change stream closed?
#
# @example Determine whether the change stream is closed.
# stream.closed?
#
# @return [ true, false ] If the change stream is closed.
#
# @since 2.5.0
def closed?
@cursor.nil?
end
# Get a formatted string for use in inspection.
#
# @example Inspect the change stream object.
# stream.inspect
#
# @return [ String ] The change stream inspection.
#
# @since 2.5.0
def inspect
"#<Mongo::Collection::View:ChangeStream:0x#{object_id} filters=#{@change_stream_filters} " +
"options=#{@options} resume_token=#{@resume_token}>"
end
private
def cache_resume_token(doc)
unless @resume_token = (doc[:_id] && doc[:_id].dup)
raise Error::MissingResumeToken.new
end
end
def create_cursor!
session = client.send(:get_session, @options)
server = server_selector.select_server(cluster, false)
result = send_initial_query(server, session)
@cursor = Cursor.new(view, result, server, disable_retry: true, session: session)
end
def pipeline
change_doc = { fullDocument: ( @options[:full_document] || FULL_DOCUMENT_DEFAULT ) }
change_doc[:resumeAfter] = @resume_token if @resume_token
[{ '$changeStream' => change_doc }] + @change_stream_filters
end
def send_initial_query(server, session)
initial_query_op(session).execute(server)
end
end
end
end
end
|