File: session.rb

package info (click to toggle)
ruby-httpx 1.7.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,816 kB
  • sloc: ruby: 12,209; makefile: 4
file content (602 lines) | stat: -rw-r--r-- 20,606 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
# frozen_string_literal: true

module HTTPX
  # Class implementing the APIs being used publicly.
  #
  #   HTTPX.get(..) #=> delegating to an internal HTTPX::Session object.
  #   HTTPX.plugin(..).get(..) #=> creating an intermediate HTTPX::Session with plugin, then sending the GET request
  class Session
    include Loggable
    include Chainable

    # initializes the session with a set of +options+, which will be shared by all
    # requests sent from it.
    #
    # When passed a block, it yields itself to it, then closes after the block is evaluated.
    def initialize(options = EMPTY_HASH, &blk)
      # session options are the class-level defaults overlaid with the caller's +options+.
      @options = self.class.default_options.merge(options)
      @persistent = @options.persistent
      @pool = @options.pool_class.new(@options.pool_options)
      @wrapped = false
      @closing = false

      # persistent sessions which opted into close_on_fork are tracked so that
      # Session.after_fork can close them; INSTANCES is nil where fork is unavailable.
      trackable = @persistent && @options.close_on_fork && INSTANCES
      INSTANCES[self] = self if trackable

      return unless blk

      wrap(&blk)
    end

    # Yields itself to the block, then closes itself after the block is evaluated.
    #
    #   session.wrap do |http|
    #     http.get("https://wikipedia.com")
    #   end # wikipedia connection closes here
    def wrap
      outer_wrapped = @wrapped
      @wrapped = true
      created_selector = false

      # reuse the selector already bound to this session in the current thread, or
      # create and bind a fresh one (remembering that it has to be unbound afterwards).
      active_selector = get_current_selector do
        Selector.new.tap do |fresh|
          set_current_selector(fresh)
          created_selector = true
        end
      end

      begin
        yield self
      ensure
        # only the outermost #wrap call releases the connections.
        unless outer_wrapped
          @persistent ? deactivate(active_selector) : close(active_selector)
        end
        @wrapped = outer_wrapped
        set_current_selector(nil) if created_selector
      end
    end

    # closes all the active connections from the session.
    #
    # when called directly without specifying +selector+, all available connections
    # will be picked up from the connection pool and closed. Connections in use
    # by other sessions, or same session in a different thread, will not be reaped.
    def close(selector = Selector.new)
      # resolvers held by the pool are discarded first.
      @pool.reset_resolvers

      # drain the pool, registering every connection which isn't closed yet into the
      # selector, so that terminating the selector below takes care of them.
      while (conn = @pool.pop_connection)
        select_connection(conn, selector) unless conn.state == :closed
      end

      selector_close(selector)
    end

    # performs one, or multiple requests; it accepts:
    #
    # 1. one or multiple HTTPX::Request objects;
    # 2. an HTTP verb, then a sequence of URIs or URI/options tuples;
    # 3. one or multiple HTTP verb / uri / (optional) options tuples;
    #
    # when present, the set of +options+ kwargs is applied to all of the
    # sent requests.
    #
    # respectively returns a single HTTPX::Response response, or all of them in an Array, in the same order.
    #
    #  resp1 = session.request(req1)
    #  resp1, resp2 = session.request(req1, req2)
    #  resp1 = session.request("GET", "https://server.org/a")
    #  resp1, resp2 = session.request("GET", ["https://server.org/a", "https://server.org/b"])
    #  resp1, resp2 = session.request(["GET", "https://server.org/a"], ["GET", "https://server.org/b"])
    #  resp1 = session.request("POST", "https://server.org/a", form: { "foo" => "bar" })
    #  resp1, resp2 = session.request(["POST", "https://server.org/a", form: { "foo" => "bar" }], ["GET", "https://server.org/b"])
    #  resp1, resp2 = session.request("GET", ["https://server.org/a", "https://server.org/b"], headers: { "x-api-token" => "TOKEN" })
    #
    def request(*args, **params)
      raise ArgumentError, "must perform at least one request" if args.empty?

      # ready-made request objects are sent as they are; anything else is treated as
      # a verb/uri description which has to be turned into request objects first.
      requests =
        if args.first.is_a?(Request)
          args
        else
          build_requests(*args, params)
        end

      responses = send_requests(*requests)

      # a single request yields a bare response rather than a one-element array.
      responses.size == 1 ? responses.first : responses
    end

    # returns a HTTPX::Request instance built from the HTTP +verb+, the request +uri+, and
    # the optional set of request-specific +options+. This request **must** be sent through
    # the same session it was built from.
    #
    #   req = session.build_request("GET", "https://server.com")
    #   resp = session.request(req)
    def build_request(verb, uri, params = EMPTY_HASH, options = @options)
      # the request class comes from the options, so it may differ from the default one.
      options.request_class.new(verb, uri, options, params).tap do |req|
        req.persistent = @persistent
        set_request_callbacks(req)
      end
    end

    # binds +connection+ to this session and to +selector+, then registers it into
    # the +selector+.
    def select_connection(connection, selector)
      pin(connection, selector)
      connection.log(level: 2) { "registering into selector##{selector.object_id}" }
      selector.register(connection)
    end

    # marks +conn_or_resolver+ as currently owned by this session and by +selector+.
    # Evaluates to +selector+.
    def pin(conn_or_resolver, selector)
      conn_or_resolver.current_session = self
      conn_or_resolver.current_selector = selector
      selector
    end

    # resolvers are pinned and registered into a selector the same way connections
    # are, hence #select_resolver is an alias of #select_connection.
    alias_method :select_resolver, :select_connection

    # removes +connection+ from +selector+ and checks it back into the pool.
    #
    # the pool check-in is skipped when +cloned+ is true, or when the session is in
    # the middle of closing and the connection is already closed.
    def deselect_connection(connection, selector, cloned = false)
      connection.log(level: 2) { "deregistering connection##{connection.object_id}(#{connection.state}) from selector##{selector.object_id}" }
      selector.deregister(connection)

      return if cloned || (@closing && connection.state == :closed)

      connection.log(level: 2) do
        "check-in connection##{connection.object_id}(#{connection.state}) in pool##{@pool.object_id}"
      end
      @pool.checkin_connection(connection)
    end

    # removes +resolver+ from +selector+ and checks it back into the pool, unless the
    # session is in the middle of closing and the resolver is already closed.
    def deselect_resolver(resolver, selector)
      resolver.log(level: 2) { "deregistering resolver##{resolver.object_id}(#{resolver.state}) from selector##{selector.object_id}" }
      selector.deregister(resolver)

      discard = @closing && resolver.closed?
      return if discard

      resolver.log(level: 2) do
        "check-in resolver##{resolver.object_id}(#{resolver.state}) in pool##{@pool.object_id}"
      end
      @pool.checkin_resolver(resolver)
    end

    # returns +connection+ itself when it is (or can be) bound to the given address
    # +family+; otherwise builds a sibling connection for that family, with the same
    # origin and options, initializes it and returns it.
    def try_clone_connection(connection, selector, family)
      # a connection without a family yet simply adopts the requested one.
      connection.family ||= family

      return connection if connection.family == family

      connection.class.new(connection.origin, connection.options).tap do |clone|
        clone.family = family
        connection.sibling = clone
        do_init_connection(clone, selector)
      end
    end

    # returns the HTTPX::Connection through which the +request+ should be sent through.
    def find_connection(request_uri, selector, options)
      log(level: 2) { "finding connection for #{request_uri}..." }

      # first choice: a connection already registered in the selector.
      registered = selector.find_connection(request_uri, options)
      if registered
        registered.idling if registered.state == :closed
        registered.log(level: 2) do
          "found connection##{registered.object_id}(#{registered.state}) in selector##{selector.object_id}"
        end
        return registered
      end

      # otherwise take one from the pool, and prepare it according to its state.
      pooled = @pool.checkout_connection(request_uri, options)

      pooled.log(level: 2) do
        "found connection##{pooled.object_id}(#{pooled.state}) in pool##{@pool.object_id}"
      end

      case pooled.state
      when :idle
        do_init_connection(pooled, selector)
      when :open
        options.io ? select_connection(pooled, selector) : pin(pooled, selector)
      when :closing, :closed
        pooled.idling
        # if addresses expired, resolve again
        pooled.addresses? ? select_connection(pooled, selector) : resolve_connection(pooled, selector)
      else
        pin(pooled, selector)
      end

      pooled
    end

    # the instance methods defined below this marker are private to the session.
    private

    # terminates +selector+, flagging the session as closing for the duration of the
    # call (the deselect callbacks check that flag before checking anything back in).
    def selector_close(selector)
      @closing = true
      selector.terminate
    ensure
      @closing = false
    end

    # tries deactivating connections in the +selector+, deregistering the ones that have been deactivated.
    def deactivate(selector)
      # iterate over a snapshot of the registered connections
      # (presumably because deactivating may deregister them from the selector - confirm).
      connections = selector.each_connection.to_a
      connections.each { |conn| conn.deactivate }
    end

    # callback executed when an HTTP/2 promise frame has been received.
    def on_promise(_, stream)
      log(level: 2) do
        "#{stream.id}: refusing stream!"
      end
      # promised streams are always refused here.
      stream.refuse
    end

    # returns the corresponding HTTPX::Response to the given +request+ if it has been received.
    def fetch_response(request, _selector, _options)
      candidate = request.response

      # only a finished response counts; otherwise nil is returned.
      if candidate && candidate.finished?
        log(level: 2) { "response fetched" }
        candidate
      end
    end

    # sends the +request+ to the corresponding HTTPX::Connection
    def send_request(request, selector, options = request.options)
      # the outcome is either whatever sending returned, a value thrown via
      # :resolve_error, or the StandardError raised along the way.
      outcome =
        begin
          catch(:resolve_error) do
            find_connection(request.uri, selector, options).send(request)
          end
        rescue StandardError => e
          e
        end

      # a non-exception outcome needs no further handling here.
      return unless outcome.is_a?(Exception)

      # only library errors are turned into error responses; anything else propagates.
      raise outcome unless outcome.is_a?(Error)

      error_response = ErrorResponse.new(request, outcome)
      request.response = error_response
      request.emit(:response, error_response)
    end

    # returns a set of HTTPX::Request objects built from the given +args+ and +options+.
    def build_requests(*args, params)
      # builds one request, layering the per-request +overrides+ (if any) over +params+.
      build_with_overrides = lambda do |verb, uri, overrides|
        merged = overrides.empty? ? params : params.merge(overrides)
        build_request(verb, uri, merged)
      end

      requests =
        if args.size == 1
          # a list of [verb, uri, (params)] tuples
          args.first.map { |verb, uri, ps = EMPTY_HASH| build_with_overrides.call(verb, uri, ps) }
        else
          verb, uris = args
          if uris.respond_to?(:each)
            # one verb applied to a list of uris, or of [uri, params] tuples
            uris.enum_for(:each).map { |uri, ps = EMPTY_HASH| build_with_overrides.call(verb, uri, ps) }
          else
            # one verb, one uri
            [build_request(verb, uris, params)]
          end
        end

      raise ArgumentError, "wrong number of URIs (given 0, expect 1..+1)" if requests.empty?

      requests
    end

    # subscribes the session's #on_promise handler to the +request+ :promise event.
    def set_request_callbacks(request)
      promise_handler = method(:on_promise)
      request.on(:promise, &promise_handler)
    end

    # kicks off resolution for +connection+, unless it already has an address family set.
    def do_init_connection(connection, selector)
      return if connection.family

      resolve_connection(connection, selector)
    end

    # sends an array of HTTPX::Request +requests+, returns the respective array of HTTPX::Response objects.
    def send_requests(*requests)
      # use the selector bound to the session in this thread, or a throwaway one.
      active_selector = get_current_selector { Selector.new }

      begin
        _send_requests(requests, active_selector)
        receive_requests(requests, active_selector)
      ensure
        if @wrapped
          # inside a #wrap block, which releases the connections itself when it ends.
        elsif @persistent
          deactivate(active_selector)
        else
          close(active_selector)
        end
      end
    end

    # sends an array of HTTPX::Request objects
    def _send_requests(requests, selector)
      # every request goes through #send_request, in the order given.
      requests.each { |req| send_request(req, selector) }
    end

    # returns the array of HTTPX::Response objects corresponding to the array of HTTPX::Request +requests+.
    def receive_requests(requests, selector)
      # NOTE: +requests+ is consumed (shifted) as responses are collected.
      responses = [] # : Array[response]

      # guarantee ordered responses
      loop do
        request = requests.first

        return responses unless request

        # tick the selector until the request at the head of the queue has a response.
        catch(:coalesced) { selector.next_tick } until (response = fetch_response(request, selector, request.options))
        request.complete!(response)

        responses << response
        requests.shift

        break if requests.empty?

        next unless selector.empty?

        # in some cases, the pool of connections might have been drained because there was some
        # handshake error, and the error responses have already been emitted, but there was no
        # opportunity to traverse the requests, hence we're returning only a fraction of the errors
        # we were supposed to. This effectively fetches the existing responses and return them.
        exit_from_loop = true

        requests_to_remove = [] # : Array[Request]

        requests.each do |req|
          # each pending request is looked up with its OWN options (as in the main loop above),
          # not with the options of the request which has just been completed.
          response = fetch_response(req, selector, req.options)

          if exit_from_loop && response
            req.complete!(response)
            responses << response
            requests_to_remove << req
          else
            # fetch_response may resend requests. when that happens, we need to go back to the initial
            # loop and process the selector. we still do a pass-through on the remainder of requests, so
            # that every request that need to be resent, is resent.
            exit_from_loop = false

            raise Error, "something went wrong, responses not found and requests not resent" if selector.empty?
          end
        end

        break if exit_from_loop

        # requests already answered in the pass above must not be waited for again.
        requests -= requests_to_remove
      end
      responses
    end

    # makes sure +connection+ gets addresses to connect to: either proceeds directly
    # when no name resolution is necessary, or delegates to a resolver.
    def resolve_connection(connection, selector)
      #
      # there are two cases in which we want to activate initialization of
      # connection immediately:
      #
      # 1. when the connection already has addresses, i.e. it doesn't need to
      #    resolve a name (not the same as name being an IP, yet)
      # 2. when the connection is initialized with an external already open IO.
      #
      resolution_needed = !(connection.addresses? || connection.open?)

      if resolution_needed
        resolver = find_resolver_for(connection, selector)

        pin(connection, selector)
        # falls back to the lazy resolution when the early one yields nothing.
        return resolver.early_resolve(connection) || resolver.lazy_resolve(connection)
      end

      on_resolver_connection(connection, selector)
      nil
    end

    # called once +connection+ is ready to be used: looks for a connection it could be
    # merged with (first in the +selector+, then in the pool) and tries coalescing them;
    # when there is none, +connection+ is registered into the +selector+ as is.
    def on_resolver_connection(connection, selector)
      candidate = selector.find_mergeable_connection(connection)
      from_pool = !candidate

      if from_pool
        connection.log(level: 2) { "try finding a mergeable connection in pool##{@pool.object_id}" }
        candidate = @pool.checkout_mergeable_connection(connection)
      end

      return select_connection(connection, selector) unless candidate

      connection.log(level: 2) do
        "try coalescing from #{from_pool ? "pool##{@pool.object_id}" : "selector##{selector.object_id}"} " \
          "(connection##{candidate.object_id}[#{candidate.origin}])"
      end

      coalesce_connections(candidate, connection, selector, from_pool)
    end

    # returns the resolver to be used for +connection+: the one already registered in
    # the +selector+ for the connection options, or else one checked out from the pool
    # (which then gets pinned to this session and +selector+).
    def find_resolver_for(connection, selector)
      registered = selector.find_resolver(connection.options)

      if registered
        registered.log(level: 2) do
          "found resolver##{registered.object_id}(#{registered.state}) in selector##{selector.object_id}"
        end
        return registered
      end

      @pool.checkout_resolver(connection.options).tap do |pooled|
        pooled.log(level: 2) { "found resolver##{pooled.object_id}(#{pooled.state}) in pool##{@pool.object_id}" }
        pin(pooled, selector)
      end
    end

    # coalesces +conn2+ into +conn1+. if +conn1+ was loaded from the connection pool
    # (it is known via +from_pool+), then it adds it to the +selector+.
    def coalesce_connections(conn1, conn2, selector, from_pool)
      if conn1.coalescable?(conn2)
        conn2.log(level: 2) { "coalescing with connection##{conn1.object_id}[#{conn1.origin}])" }
        select_connection(conn1, selector) if from_pool
        conn2.coalesce!(conn1)
        conn2.disconnect
      else
        # no merge possible: +conn2+ goes on by itself, and +conn1+ is handed back
        # to the pool in case it was taken from there.
        conn2.log(level: 2) { "not coalescing with conn##{conn1.object_id}[#{conn1.origin}])" }
        select_connection(conn2, selector)
        if from_pool
          conn1.log(level: 2) { "check-in connection##{conn1.object_id}(#{conn1.state}) in pool##{@pool.object_id}" }
          @pool.checkin_connection(conn1)
        end
        nil
      end
    end

    # returns the selector bound to this session in the current thread's store; when
    # there is none, returns the value of the given block (or nil without a block).
    def get_current_selector
      bound = selector_store[self]
      return bound if bound

      yield if block_given?
    end

    # binds +selector+ to this session in the current thread's store; a nil/false
    # +selector+ removes the binding instead.
    def set_current_selector(selector)
      store = selector_store
      return store.delete(self) unless selector

      store[self] = selector
    end

    # returns the session-to-selector store of the current thread, lazily creating it
    # (as an identity-keyed hash kept in a thread variable) on first use.
    def selector_store
      thread = Thread.current

      existing = thread_selector_store(thread)
      return existing if existing

      fresh = {}.compare_by_identity
      thread.thread_variable_set(:httpx_persistent_selector_store, fresh)
      fresh
    end

    # reads the selector store kept in the thread variables of +th+ (nil when unset).
    def thread_selector_store(th)
      store_key = :httpx_persistent_selector_store
      th.thread_variable_get(store_key)
    end

    # class-level state of the base Session class:
    # the Options class object itself is frozen, and so is the default options instance
    # every session starts from; .plugin replaces @default_options with an extended
    # (and re-frozen) copy rather than mutating it.
    Options.freeze
    @default_options = Options.new
    @default_options.freeze
    # plugin modules loaded into this session class.
    @plugins = []

    class << self
      # the options which sessions of this class merge the user-supplied options into
      # (see Session#initialize).
      attr_reader :default_options

      # hands the class-level state over to the subclass +klass+: the default options
      # are shared as they are, whereas plugins and callbacks are copied, so that
      # changing them in the subclass does not affect the parent class.
      def inherited(klass)
        super
        plugins_copy = @plugins.dup
        callbacks_copy = @callbacks.dup
        klass.instance_variable_set(:@default_options, @default_options)
        klass.instance_variable_set(:@plugins, plugins_copy)
        klass.instance_variable_set(:@callbacks, callbacks_copy)
      end

      # returns a new HTTPX::Session instance, with the plugin pointed by +pl+ loaded.
      #
      #   session_with_retries = session.plugin(:retries)
      #   session_with_custom = session.plugin(CustomPlugin)
      #
      def plugin(pl, options = nil, &block)
        # keep what the caller passed in: when it is a Symbol, it is needed further down
        # to look up subplugins registered under that name.
        label = pl
        pl = Plugins.load_plugin(pl) if pl.is_a?(Symbol)
        raise ArgumentError, "Invalid plugin type: #{pl.class.inspect}" unless pl.is_a?(Module)

        if !@plugins.include?(pl)
          # first time this plugin is loaded into this class.
          @plugins << pl
          pl.load_dependencies(self, &block) if pl.respond_to?(:load_dependencies)

          # work on a copy; the current default options object is left untouched.
          @default_options = @default_options.dup

          # mix the plugin's instance- and class-level behaviour into this session class.
          include(pl::InstanceMethods) if defined?(pl::InstanceMethods)
          extend(pl::ClassMethods) if defined?(pl::ClassMethods)

          opts = @default_options
          opts.extend_with_plugin_classes(pl)

          if defined?(pl::OptionsMethods)
            # when a class gets dup'ed, the #initialize_dup callback isn't triggered.
            # moreover, and because #method_added does not get triggered on mixin include,
            # the callback is also forcefully manually called here.
            opts.options_class.instance_variable_set(:@options_names, opts.options_class.options_names.dup)
            (pl::OptionsMethods.instance_methods + pl::OptionsMethods.private_instance_methods - Object.instance_methods).each do |meth|
              opts.options_class.method_added(meth)
            end
            # rebuild the default options as an instance of the extended options class.
            @default_options = opts.options_class.new(opts)
          end

          # plugin-provided defaults come first; the options given by the caller are merged on top.
          @default_options = pl.extra_options(@default_options) if pl.respond_to?(:extra_options)
          @default_options = @default_options.merge(options) if options

          if pl.respond_to?(:subplugins)
            # subplugins are declared as main plugin name => subplugin; the names are resolved
            # into plugin modules before being compared with what has been loaded.
            pl.subplugins.transform_keys(&Plugins.method(:load_plugin)).each do |main_pl, sub_pl|
              # in case the main plugin has already been loaded, then apply subplugin functionality
              # immediately
              next unless @plugins.include?(main_pl)

              plugin(sub_pl, options, &block)
            end
          end

          pl.configure(self, &block) if pl.respond_to?(:configure)

          if label.is_a?(Symbol)
            # in case an already-loaded plugin complements functionality of
            # the plugin currently being loaded, load it now
            @plugins.each do |registered_pl|
              next if registered_pl == pl

              next unless registered_pl.respond_to?(:subplugins)

              sub_pl = registered_pl.subplugins[label]

              next unless sub_pl

              plugin(sub_pl, options, &block)
            end
          end

          @default_options.freeze
          set_temporary_name("#{superclass}/#{pl}") if respond_to?(:set_temporary_name) # Module#set_temporary_name only exists in newer rubies (3.3+)
        elsif options
          # this can happen when two plugins are loaded, and one of them calls the other under the hood,
          # albeit changing some default.
          @default_options = pl.extra_options(@default_options) if pl.respond_to?(:extra_options)
          @default_options = @default_options.merge(options) if options

          @default_options.freeze
        end

        # the class itself is returned, which allows chaining.
        self
      end
    end

    # setup of the support for close_on_fork sessions.
    # adapted from https://github.com/mperham/connection_pool/blob/main/lib/connection_pool.rb#L48
    if Process.respond_to?(:fork)
      # registry of the sessions to be closed after a fork (populated in #initialize for
      # persistent sessions with the close_on_fork option). Being a WeakMap, it does not
      # keep the registered sessions from being garbage collected.
      INSTANCES = ObjectSpace::WeakMap.new
      private_constant :INSTANCES

      # closes all the registered sessions. Always returns nil.
      def self.after_fork
        INSTANCES.each_value(&:close)
        nil
      end

      # Process._fork, where available, is the hook used for intercepting forks.
      if ::Process.respond_to?(:_fork)
        module ForkTracker
          def _fork
            pid = super
            # a pid of 0 means this code is running in the forked child process.
            Session.after_fork if pid.zero?
            pid
          end
        end
        Process.singleton_class.prepend(ForkTracker)
      end
    else
      # no fork support in this platform: nothing to track (Session#initialize checks
      # INSTANCES for nil before registering), and nothing to do after a fork.
      INSTANCES = nil
      private_constant :INSTANCES

      def self.after_fork
        # noop
      end
    end
  end

  # short alias of the Session class. The session may be overridden by certain adapters
  # (presumably by pointing this constant to another class - confirm against the adapters).
  S = Session
end