File: callbacks.rb

package info (click to toggle)
ruby-httpx 1.7.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,816 kB
  • sloc: ruby: 12,209; makefile: 4
file content (141 lines) | stat: -rw-r--r-- 4,027 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# frozen_string_literal: true

module HTTPX
  module Plugins
    #
    # This plugin adds suppoort for callbacks around the request/response lifecycle.
    #
    # https://gitlab.com/os85/httpx/-/wikis/Events
    #
    module Callbacks
      CALLBACKS = %i[
        connection_opened connection_closed
        request_error
        request_started request_body_chunk request_completed
        response_started response_body_chunk response_completed
      ].freeze

      # connection closed user-space errors happen after errors can be surfaced to requests,
      # so they need to pierce through the scheduler, which is only possible by simulating an
      # interrupt.
      class CallbackError < Exception; end # rubocop:disable Lint/InheritException

      module InstanceMethods
        include HTTPX::Callbacks

        CALLBACKS.each do |meth|
          class_eval(<<-MOD, __FILE__, __LINE__ + 1)
            def on_#{meth}(&blk)   # def on_connection_opened(&blk)
              on(:#{meth}, &blk)   #   on(:connection_opened, &blk)
              self                 #   self
            end                    # end
          MOD
        end

        def plugin(*args, &blk)
          super(*args).tap do |sess|
            CALLBACKS.each do |cb|
              next unless callbacks_for?(cb)

              sess.callbacks(cb).concat(callbacks(cb))
            end

            sess.wrap(&blk) if blk
          end
        end

        private

        def branch(options, &blk)
          super(options).tap do |sess|
            CALLBACKS.each do |cb|
              next unless callbacks_for?(cb)

              sess.callbacks(cb).concat(callbacks(cb))
            end
            sess.wrap(&blk) if blk
          end
        end

        def do_init_connection(connection, selector)
          super
          connection.on(:open) do
            next unless connection.current_session == self

            emit_or_callback_error(:connection_opened, connection.origin, connection.io.socket)
          end
          connection.on(:callback_connection_closed) do
            next unless connection.current_session == self

            emit_or_callback_error(:connection_closed, connection.origin) if connection.used?
          end

          connection
        end

        def set_request_callbacks(request)
          super

          request.on(:headers) do
            emit_or_callback_error(:request_started, request)
          end
          request.on(:body_chunk) do |chunk|
            emit_or_callback_error(:request_body_chunk, request, chunk)
          end
          request.on(:done) do
            emit_or_callback_error(:request_completed, request)
          end

          request.on(:response_started) do |res|
            if res.is_a?(Response)
              emit_or_callback_error(:response_started, request, res)
              res.on(:chunk_received) do |chunk|
                emit_or_callback_error(:response_body_chunk, request, res, chunk)
              end
            else
              emit_or_callback_error(:request_error, request, res.error)
            end
          end
          request.on(:response) do |res|
            emit_or_callback_error(:response_completed, request, res) if res.is_a?(Response)
          end
        end

        def emit_or_callback_error(*args)
          emit(*args)
        rescue StandardError => e
          ex = CallbackError.new(e.message)
          ex.set_backtrace(e.backtrace)
          raise ex
        end

        def receive_requests(*)
          super
        rescue CallbackError => e
          raise e.cause
        end

        def close(*)
          super
        rescue CallbackError => e
          raise e.cause
        end
      end

      module ConnectionMethods
        private

        def disconnect
          return if @exhausted

          return unless @current_session && @current_selector

          emit(:callback_connection_closed)

          super
        end
      end
    end
    register_plugin :callbacks, Callbacks
  end
end