File: request_callback.rb

package info (click to toggle)
ruby-aws-sdk-core 3.235.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,288 kB
  • sloc: ruby: 17,870; makefile: 4
file content (141 lines) | stat: -rw-r--r-- 4,590 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# frozen_string_literal: true

require 'pathname'
require 'forwardable'

module Seahorse
  module Client
    module Plugins

      # Wraps a request body IO and reports read progress to a callback.
      #
      # Every chunk returned by {#read} (and by `readpartial`, when the wrapped
      # IO provides it) is handed to the callback together with the running
      # number of bytes read and the total size of the wrapped IO (or nil when
      # the wrapped IO does not respond to `size`).
      #
      # @api private
      class ReadCallbackIO
        # @return [Object] the wrapped IO-like object
        attr_reader :io

        # @param io [#read] the IO-like object to wrap
        # @param on_read [Proc, nil] called as
        #   `on_read.call(chunk, bytes_read, total_size)` after each read;
        #   any value that is not a Proc is ignored
        def initialize(io, on_read = nil)
          @io = io
          @on_read = on_read.is_a?(Proc) ? on_read : nil
          @bytes_read = 0

          # Only expose #size when the wrapped IO has one. Otherwise callers
          # that probe with respond_to?(:size) before calling it would be
          # told "yes" and then fail with NoMethodError on the wrapped IO.
          if @io.respond_to?(:size)
            def self.size
              @io.size
            end
          end

          # Some IO objects support readpartial - IO.copy_stream used by the
          # request will call readpartial if available, so define a wrapper
          # for it if the underlying IO supports it.
          if @io.respond_to?(:readpartial)
            def self.readpartial(*args)
              @io.readpartial(*args).tap { |chunk| handle_chunk(chunk) }
            end
          end
        end

        # Reads from the wrapped IO, forwarding all arguments, and reports the
        # result to the callback.
        #
        # @return [String, nil] whatever the wrapped IO's `read` returned
        def read(*args)
          @io.read(*args).tap { |chunk| handle_chunk(chunk) }
        end

        private

        # Updates the running byte count and notifies the callback, if any.
        # The callback is also invoked for nil chunks (e.g. end of stream).
        def handle_chunk(chunk)
          # nil does not respond to bytesize, so no separate nil check is needed.
          @bytes_read += chunk.bytesize if chunk.respond_to?(:bytesize)
          return unless @on_read

          total_size = @io.respond_to?(:size) ? @io.size : nil
          @on_read.call(chunk, @bytes_read, total_size)
        end
      end

      # @api private
      class RequestCallback < Plugin

        # Declares the :on_chunk_sent option (nil by default). The heredoc is
        # the option's docstring; `<<-` only allows the terminator to be
        # indented, so the text itself is kept flush-left on purpose.
        option(
          :on_chunk_sent,
           default: nil,
           doc_type: 'Proc',
           docstring: <<-DOCS)
When a Proc object is provided, it will be used as callback when each chunk 
of the request body is sent. It provides three arguments: the chunk,
the number of bytes read from the body, and the total number of 
bytes in the body.
          DOCS

        # Declares the :on_chunk_received option (nil by default), the
        # response-side counterpart of :on_chunk_sent.
        option(:on_chunk_received,
          default: nil,
          doc_type: 'Proc',
          docstring: <<-DOCS)
When a Proc object is provided, it will be used as callback when each chunk 
of the response body is received. It provides three arguments: the chunk,
the number of bytes received, and the total number of 
bytes in the response (or nil if the server did not send a `content-length`).
        DOCS

        # Resolves the `:on_chunk_sent` and `:on_chunk_received` callbacks for
        # a request. A callback given in the request params takes precedence
        # over the one configured on the client, and the keys are removed from
        # the params so they are not treated as operation input.
        #
        # @api private
        class OptionHandler < Client::Handler
          # Stores the send callback on the context (under `:on_chunk_sent`),
          # registers the receive callback on the HTTP response, then passes
          # the context to the next handler.
          #
          # @param context the request context
          # @return whatever the next handler returns
          def call(context)
            on_chunk_sent = callback_option(context, :on_chunk_sent)
            context[:on_chunk_sent] = on_chunk_sent if on_chunk_sent

            on_chunk_received = callback_option(context, :on_chunk_received)
            add_response_events(on_chunk_received, context) if on_chunk_received

            @handler.call(context)
          end

          # Registers listeners on the HTTP response that invoke
          # `on_chunk_received` with each chunk, the number of bytes received
          # so far, and the value of the `content-length` header (nil when the
          # header is absent).
          #
          # @param on_chunk_received [#call] the callback to notify
          # @param context the request context
          def add_response_events(on_chunk_received, context)
            progress = { bytes_received: 0 }

            context.http_response.on_headers do |_status, headers|
              # A new set of headers marks the start of a new response, so the
              # running count starts over and stays comparable with this
              # response's content-length.
              # NOTE(review): presumably headers fire again when the request is
              # retried on the same response object - confirm.
              progress[:bytes_received] = 0
              progress[:content_length] = headers['content-length']&.to_i
            end

            context.http_response.on_data do |chunk|
              # nil does not respond to bytesize, so no separate nil check is needed.
              progress[:bytes_received] += chunk.bytesize if chunk.respond_to?(:bytesize)
              on_chunk_received.call(chunk, progress[:bytes_received], progress[:content_length])
            end
          end

          private

          # Removes `name` from the request params (when params is a Hash that
          # contains the key) and returns its value, falling back to the
          # client configuration when the param is missing, nil or false.
          # The key is removed even when its value is falsy so that it never
          # stays behind in the params.
          def callback_option(context, name)
            params = context.params
            value = params.delete(name) if params.is_a?(Hash) && params.key?(name)
            value || context.config.public_send(name)
          end
        end

        # Wraps the request body in a ReadCallbackIO for the duration of the
        # downstream handlers when an `:on_chunk_sent` callback is present on
        # the context, and restores the original body afterwards.
        #
        # @api private
        class ReadCallbackHandler < Client::Handler
          # @param context the request context
          # @return whatever the next handler returns
          def call(context)
            callback = context[:on_chunk_sent]
            return @handler.call(context) unless callback

            context.http_request.body = ReadCallbackIO.new(
              context.http_request.body,
              callback
            )
            begin
              @handler.call(context)
            ensure
              # Restore the original body even when a downstream handler
              # raises; otherwise a later call would wrap the wrapper again
              # and the callback would fire twice per chunk.
              unwrap_callback_body(context)
            end
          end

          # Replaces a ReadCallbackIO request body with the IO it wraps.
          # Bodies of any other type are left untouched.
          #
          # @param context the request context
          def unwrap_callback_body(context)
            body = context.http_request.body
            context.http_request.body = body.io if body.is_a?(ReadCallbackIO)
          end
        end

        # OptionHandler is needed to remove :on_chunk_sent and
        # :on_chunk_received from the params before build
        handler(OptionHandler, step: :initialize)

        # ReadCallbackHandler needs to go late in the call stack because
        # other plugins, including Sigv4 and content_md5, read the request
        # body and rewind it
        handler(ReadCallbackHandler, step: :sign, priority: 0)
      end
    end
  end
end