File: server_stream.rb

package info (click to toggle)
ruby-gapic-common 1.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 392 kB
  • sloc: ruby: 2,081; makefile: 4
file content (101 lines) | stat: -rw-r--r-- 3,443 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require "json"

module Gapic
  module Rest
    ##
    # A class to provide the Enumerable interface to the response of a REST server-streaming dmethod.
    #
    # ServerStream provides the enumerations over the individual response messages within the stream.
    #
    # @example normal iteration over resources.
    #   server_stream.each { |response| puts response }
    #
    class ServerStream
      include Enumerable

      ##
      # Initializes ServerStream object.
      #
      # @param message_klass [Class]
      # @param json_enumerator [Enumerator<String>]
      def initialize message_klass, json_enumerator
        @json_enumerator = json_enumerator
        @obj = ""
        @message_klass = message_klass
        @ready_objs = [] # List of strings
      end

      ##
      # Iterate over JSON objects in the streamed response.
      #
      # @yield [Object] Gives one complete Message object.
      #
      # @return [Enumerator] if no block is provided
      #
      def each
        return enum_for :each unless block_given?

        loop do
          while @ready_objs.empty?
            begin
              chunk = @json_enumerator.next
              next unless chunk
              next_json! chunk
            rescue StopIteration
              dangling_content = @obj.strip
              error_expl = "Dangling content left after iterating through the stream. " \
                           "This means that not all content was received or parsed correctly. " \
                           "It is likely a result of server or network error."
              error_text = "#{error_expl}\n Content left unparsed: #{dangling_content}"

              raise Gapic::Common::Error, error_text unless dangling_content.empty?
              return
            end
          end
          yield @message_klass.decode_json @ready_objs.shift, ignore_unknown_fields: true
        end
      end

      private

      ##
      # Builds the next JSON object of the server stream from chunk.
      #
      # @param chunk [String] Contains (partial) JSON object
      #
      def next_json! chunk
        chunk.chars.each do |char|
          # Invariant: @obj is always either a part of a single JSON object or the entire JSON object.
          # Hence, it's safe to strip whitespace, commans and array brackets. These characters
          # are only added before @obj is a complete JSON object and essentially can be flushed.
          next if @obj.empty? && char != "{"
          @obj += char
          next unless char == "}"
          begin
            # Two choices here: append a Ruby object into
            # ready_objs or a string. Going with the latter here.
            JSON.parse @obj
            @ready_objs.append @obj
            @obj = ""
          rescue JSON::ParserError
            next
          end
        end
      end
    end
  end
end