1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
|
# frozen_string_literal: true
module HTTPX
module Plugins
#
# This plugin adds support for callbacks around the request/response lifecycle.
#
# https://gitlab.com/os85/httpx/-/wikis/Events
#
module Callbacks
# Names of the lifecycle events a session can subscribe to. Each entry gets a
# matching +on_<event>+ registration method on the session.
CALLBACKS = %i[
  connection_opened
  connection_closed
  request_error
  request_started
  request_body_chunk
  request_completed
  response_started
  response_body_chunk
  response_completed
].freeze
# Errors raised by user-supplied "connection closed" callbacks happen after the point
# where errors can still be surfaced to requests. They therefore have to pierce through
# the scheduler, which is only possible by simulating an interrupt; that is why this
# class derives from Exception directly rather than from StandardError.
class CallbackError < Exception # rubocop:disable Lint/InheritException
end
module InstanceMethods
include HTTPX::Callbacks
# Defines one registration method per entry in CALLBACKS, named on_<event>.
# Each generated method forwards the given block to `on` under the event's name
# and returns self, so registrations can be chained.
CALLBACKS.each do |meth|
# The methods are built from a string; passing __FILE__ and __LINE__ + 1 makes
# backtraces from the generated code point at the heredoc lines below.
class_eval(<<-MOD, __FILE__, __LINE__ + 1)
def on_#{meth}(&blk) # def on_connection_opened(&blk)
on(:#{meth}, &blk) # on(:connection_opened, &blk)
self # self
end # end
MOD
end
# Builds the derived session through the parent implementation and appends every
# callback registered on this session to the corresponding list of the new one.
# When a block is given, it is handed to the new session's +wrap+.
#
# Returns the session produced by the parent implementation.
#
# NOTE(review): in Ruby, `super(*args)` still forwards the caller's block implicitly,
# and the same block is then passed to +wrap+ below. Confirm the parent implementation
# does not run it a second time.
def plugin(*args, &blk)
  derived = super(*args)

  CALLBACKS.each do |event|
    derived.callbacks(event).concat(callbacks(event)) if callbacks_for?(event)
  end

  derived.wrap(&blk) if blk
  derived
end
private
# Creates the branched session through the parent implementation and appends every
# callback registered on this session to the corresponding list of the new one.
# When a block is given, it is handed to the new session's +wrap+.
#
# Returns the session produced by the parent implementation.
#
# NOTE(review): in Ruby, `super(options)` still forwards the caller's block implicitly,
# and the same block is then passed to +wrap+ below. Confirm the parent implementation
# does not run it a second time.
def branch(options, &blk)
  derived = super(options)

  CALLBACKS.each do |event|
    derived.callbacks(event).concat(callbacks(event)) if callbacks_for?(event)
  end

  derived.wrap(&blk) if blk
  derived
end
# Runs the parent initialisation, then subscribes to two connection events so they are
# re-emitted as session-level callbacks:
#
# * :open                       -> :connection_opened (origin, underlying socket)
# * :callback_connection_closed -> :connection_closed (origin), only if the connection was used
#
# Both handlers do nothing unless the connection's current session is this session.
# Returns the connection.
def do_init_connection(connection, selector)
  super

  connection.on(:open) do
    if connection.current_session == self
      emit_or_callback_error(:connection_opened, connection.origin, connection.io.socket)
    end
  end

  connection.on(:callback_connection_closed) do
    if connection.current_session == self && connection.used?
      emit_or_callback_error(:connection_closed, connection.origin)
    end
  end

  connection
end
# Runs the parent setup, then subscribes to the request's events so they are re-emitted
# as session-level callbacks:
#
# * :headers          -> :request_started (request)
# * :body_chunk       -> :request_body_chunk (request, chunk)
# * :done             -> :request_completed (request)
# * :response_started -> :response_started (request, response) for a Response, which also
#                        subscribes to the response's :chunk_received event and re-emits it
#                        as :response_body_chunk (request, response, chunk); anything else
#                        is reported as :request_error (request, error)
# * :response         -> :response_completed (request, response), only for a Response
def set_request_callbacks(request)
  super

  request.on(:headers) { emit_or_callback_error(:request_started, request) }
  request.on(:body_chunk) { |chunk| emit_or_callback_error(:request_body_chunk, request, chunk) }
  request.on(:done) { emit_or_callback_error(:request_completed, request) }

  request.on(:response_started) do |response|
    # anything that is not a Response is treated as an error carrier
    next emit_or_callback_error(:request_error, request, response.error) unless response.is_a?(Response)

    emit_or_callback_error(:response_started, request, response)
    response.on(:chunk_received) do |chunk|
      emit_or_callback_error(:response_body_chunk, request, response, chunk)
    end
  end

  request.on(:response) do |response|
    next unless response.is_a?(Response)

    emit_or_callback_error(:response_completed, request, response)
  end
end
# Calls +emit+ with the given arguments and returns its result. Any StandardError raised
# while emitting is re-raised as a CallbackError with the same message and backtrace.
# Because the new error is raised from inside the rescue, Ruby records the original
# error as its +cause+.
def emit_or_callback_error(*args)
  emit(*args)
rescue StandardError => e
  raise CallbackError, e.message, e.backtrace
end
# Delegates to the parent implementation with the same arguments. A CallbackError that
# escapes from it is replaced by its cause, i.e. the error it wraps.
def receive_requests(*)
  super
rescue CallbackError => callback_error
  original = callback_error.cause
  raise original
end
# Delegates to the parent implementation with the same arguments. A CallbackError that
# escapes from it is replaced by its cause, i.e. the error it wraps.
def close(*)
  super
rescue CallbackError => callback_error
  original = callback_error.cause
  raise original
end
end
# Connection-side hook that announces a disconnect before it happens.
module ConnectionMethods
  private

  # Emits :callback_connection_closed and then defers to the parent implementation.
  # Returns nil without doing anything when the connection is flagged as exhausted, or
  # when it has no current session or no current selector.
  def disconnect
    detached = @exhausted || !@current_session || !@current_selector
    return if detached

    emit(:callback_connection_closed)
    super
  end
end
end
register_plugin :callbacks, Callbacks
end
end
|