1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
|
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module Gapic
##
# Manages requests for an input stream and holds the stream open until {#close} is called.
#
class StreamInput
##
# Create a new input stream object to manage streaming requests and hold the stream open until {#close} is called.
#
# @param requests [Object]
#
def initialize *requests
@queue = Queue.new
# Push initial requests into the queue
requests.each { |request| @queue.push request }
end
##
# Adds a request object to the stream.
#
# @param request [Object]
#
# @return [StreamInput] Returns self.
#
def push request
@queue.push request
self
end
alias << push
alias append push
##
# Closes the stream.
#
# @return [StreamInput] Returns self.
#
def close
@queue.push self
self
end
##
# @private
# Iterates the requests given to the stream.
#
# @yield [request] The block for accessing each request.
# @yieldparam [Object] request The request object.
#
# @return [Enumerator] An Enumerator is returned if no block is given.
#
def to_enum
return enum_for :to_enum unless block_given?
loop do
request = @queue.pop
break if request.equal? self
yield request
end
end
end
end
|