1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
|
# frozen_string_literal: true
module HTTPX
class StreamResponse
attr_reader :request
def initialize(request, session)
@request = request
@options = @request.options
@session = session
@response_enum = nil
@buffered_chunks = []
end
def each(&block)
return enum_for(__method__) unless block
if (response_enum = @response_enum)
@response_enum = nil
# streaming already started, let's finish it
while (chunk = @buffered_chunks.shift)
block.call(chunk)
end
# consume enum til the end
begin
while (chunk = response_enum.next)
block.call(chunk)
end
rescue StopIteration
return
end
end
@request.stream = self
begin
@on_chunk = block
response = @session.request(@request)
response.raise_for_status
ensure
@on_chunk = nil
end
end
def each_line
return enum_for(__method__) unless block_given?
line = "".b
each do |chunk|
line << chunk
while (idx = line.index("\n"))
yield line.byteslice(0..(idx - 1))
line = line.byteslice((idx + 1)..-1)
end
end
yield line unless line.empty?
end
# This is a ghost method. It's to be used ONLY internally, when processing streams
def on_chunk(chunk)
raise NoMethodError unless @on_chunk
@on_chunk.call(chunk)
end
# :nocov:
def inspect
"#<#{self.class}:#{object_id}>"
end
# :nocov:
def to_s
if @request.response
@request.response.to_s
else
@buffered_chunks.join
end
end
private
def response
@request.response || begin
response_enum = each
while (chunk = response_enum.next)
@buffered_chunks << chunk
break if @request.response
end
@response_enum = response_enum
@request.response
end
end
def respond_to_missing?(meth, include_private)
if (response = @request.response)
response.respond_to_missing?(meth, include_private)
else
@options.response_class.method_defined?(meth) || (include_private && @options.response_class.private_method_defined?(meth))
end || super
end
def method_missing(meth, *args, **kwargs, &block)
return super unless response.respond_to?(meth)
response.__send__(meth, *args, **kwargs, &block)
end
end
module Plugins
#
# This plugin adds support for streaming a response (useful for i.e. "text/event-stream" payloads).
#
# https://gitlab.com/os85/httpx/wikis/Stream
#
module Stream
STREAM_REQUEST_OPTIONS = { timeout: { read_timeout: Float::INFINITY, operation_timeout: 60 }.freeze }.freeze
def self.extra_options(options)
options.merge(
stream: false,
timeout: { read_timeout: Float::INFINITY, operation_timeout: 60 },
stream_response_class: Class.new(StreamResponse, &Options::SET_TEMPORARY_NAME).freeze
)
end
# adds support for the following options:
#
# :stream :: whether the request to process should be handled as a stream (defaults to <tt>false</tt>).
# :stream_response_class :: Class used to build the stream response object.
module OptionsMethods
def option_stream(val)
val
end
def option_stream_response_class(value)
value
end
def extend_with_plugin_classes(pl)
return super unless defined?(pl::StreamResponseMethods)
@stream_response_class = @stream_response_class.dup
Options::SET_TEMPORARY_NAME[@stream_response_class, pl]
@stream_response_class.__send__(:include, pl::StreamResponseMethods) if defined?(pl::StreamResponseMethods)
super
end
end
module InstanceMethods
def request(*args, **options)
if args.first.is_a?(Request)
requests = args
request = requests.first
unless request.options.stream && !request.stream
if options[:stream]
warn "passing `stream: true` with a request object is not supported anymore. " \
"You can instead build the request object with `stream :true`"
end
return super
end
else
return super unless options[:stream]
requests = build_requests(*args, options)
request = requests.first
end
raise Error, "only 1 response at a time is supported for streaming requests" unless requests.size == 1
@options.stream_response_class.new(request, self)
end
def build_request(verb, uri, params = EMPTY_HASH, options = @options)
return super unless params[:stream]
super(verb, uri, params, options.merge(STREAM_REQUEST_OPTIONS.merge(stream: true)))
end
end
module RequestMethods
attr_accessor :stream
end
module ResponseMethods
def stream
request = @request.root_request if @request.respond_to?(:root_request)
request ||= @request
request.stream
end
end
module ResponseBodyMethods
def initialize(*)
super
@stream = @response.stream
end
def write(chunk)
return super unless @stream
return 0 if chunk.empty?
chunk = decode_chunk(chunk)
@stream.on_chunk(chunk.dup)
chunk.bytesize
end
private
def transition(*)
return if @stream
super
end
end
end
register_plugin :stream, Stream
end
end
|