1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
|
# frozen_string_literal: true
require 'pathname'
require 'forwardable'
module Seahorse
module Client
module Plugins
# Wraps an IO-like object and reports every chunk read from it to an
# optional callback, together with the running byte count and the total
# size of the underlying IO (when it exposes one).
#
# @api private
class ReadCallbackIO
  extend Forwardable

  # Expose the size of the wrapped IO directly.
  def_delegators :@io, :size

  # The wrapped IO object, used to restore the original request body.
  attr_reader :io

  # @param io the IO-like object to wrap
  # @param on_read [Proc, nil] called with (chunk, bytes_read, total_size)
  #   after each read; anything that is not a Proc is ignored
  def initialize(io, on_read = nil)
    @io = io
    if on_read.is_a?(Proc)
      @on_read = on_read
    end
    @bytes_read = 0

    # Some IO objects support readpartial - IO.copy_stream used by the
    # request will call readpartial if available, so a wrapper is defined
    # on this instance only when the underlying IO supports it.
    return unless @io.respond_to?(:readpartial)

    def self.readpartial(*args)
      data = @io.readpartial(*args)
      handle_chunk(data)
      data
    end
  end

  # Reads from the wrapped IO, notifies the callback, and returns the
  # chunk unchanged (which may be nil at end of stream).
  def read(*args)
    data = @io.read(*args)
    handle_chunk(data)
    data
  end

  private

  # Updates the byte counter and invokes the callback for one chunk.
  def handle_chunk(chunk)
    countable = chunk && chunk.respond_to?(:bytesize)
    @bytes_read += chunk.bytesize if countable
    # The total is looked up on every chunk; nil when the IO has no #size.
    total_size = @io.size if @io.respond_to?(:size)
    @on_read.call(chunk, @bytes_read, total_size) if @on_read
  end
end
# @api private
class RequestCallback < Plugin
# Declares the :on_chunk_sent option (a Proc, nil by default). The heredoc
# below is the option's docstring.
option(
:on_chunk_sent,
default: nil,
doc_type: 'Proc',
docstring: <<-DOCS)
When a Proc object is provided, it will be used as callback when each chunk
of the request body is sent. It provides three arguments: the chunk,
the number of bytes read from the body, and the total number of
bytes in the body.
DOCS
# Declares the :on_chunk_received option (a Proc, nil by default). The
# heredoc below is the option's docstring.
option(:on_chunk_received,
default: nil,
doc_type: 'Proc',
docstring: <<-DOCS)
When a Proc object is provided, it will be used as callback when each chunk
of the response body is received. It provides three arguments: the chunk,
the number of bytes received, and the total number of
bytes in the response (or nil if the server did not send a `content-length`).
DOCS
# Resolves the :on_chunk_sent and :on_chunk_received callbacks for a
# request. A truthy value in the operation params wins (and is removed
# from the params); otherwise the client configuration value is used.
#
# @api private
class OptionHandler < Client::Handler
  def call(context)
    sent_callback = take_param(context, :on_chunk_sent)
    sent_callback = context.config.on_chunk_sent if sent_callback.nil?
    # Stash the callback on the context for a later handler to pick up.
    context[:on_chunk_sent] = sent_callback if sent_callback

    received_callback = take_param(context, :on_chunk_received)
    received_callback = context.config.on_chunk_received if received_callback.nil?
    add_response_events(received_callback, context) if received_callback

    @handler.call(context)
  end

  # Registers header and data listeners on the HTTP response so that
  # on_chunk_received is called with (chunk, bytes_received, content_length)
  # for every chunk of response data.
  def add_response_events(on_chunk_received, context)
    progress = { bytes_received: 0 }
    response = context.http_response

    response.on_headers do |_status, headers|
      # Stays nil when the response has no content-length header.
      declared_length = headers['content-length']
      progress[:content_length] = declared_length&.to_i
    end

    response.on_data do |chunk|
      if chunk && chunk.respond_to?(:bytesize)
        progress[:bytes_received] += chunk.bytesize
      end
      on_chunk_received.call(
        chunk,
        progress[:bytes_received],
        progress[:content_length]
      )
    end
  end

  private

  # Removes and returns params[name] when params is a Hash holding a
  # truthy value under that key; returns nil otherwise.
  def take_param(context, name)
    params = context.params
    return nil unless params.is_a?(Hash) && params[name]

    params.delete(name)
  end
end
# Wraps the request body in a ReadCallbackIO for the duration of the
# downstream handler call when an :on_chunk_sent callback is present on
# the context, then restores the original body.
#
# @api private
class ReadCallbackHandler < Client::Handler
  # @param context the request context; context[:on_chunk_sent] holds the
  #   callback, if any
  # @return whatever the next handler in the stack returns
  def call(context)
    callback = context[:on_chunk_sent]
    return @handler.call(context) unless callback

    context.http_request.body = ReadCallbackIO.new(
      context.http_request.body,
      callback
    )
    begin
      @handler.call(context)
    ensure
      # Restore the original body even when the downstream handler raises.
      # Otherwise a later pass through this handler would wrap the body a
      # second time and invoke the callback twice per chunk.
      unwrap_callback_body(context)
    end
  end

  # Puts the original IO back on the request if it is currently wrapped.
  def unwrap_callback_body(context)
    body = context.http_request.body
    context.http_request.body = body.io if body.is_a?(ReadCallbackIO)
  end
end
# OptionHandler is needed to remove :on_chunk_sent and :on_chunk_received
# from the params before build
handler(OptionHandler, step: :initialize)
# ReadCallbackHandler needs to go late in the call stack:
# other plugins, including Sigv4 and content_md5, read the request body
# and rewind it
handler(ReadCallbackHandler, step: :sign, priority: 0)
end
end
end
end
|