File: connection.rb

package info (click to toggle)
ruby-http-2 1.1.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 13,360 kB
  • sloc: ruby: 6,031; makefile: 4
file content (845 lines) | stat: -rw-r--r-- 30,978 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
# frozen_string_literal: true

module HTTP2
  # Default connection and stream flow control window (64KB).
  DEFAULT_FLOW_WINDOW = 65_535

  # Default header table size
  DEFAULT_HEADER_SIZE = 4096

  # Default stream_limit
  DEFAULT_MAX_CONCURRENT_STREAMS = 100

  # Default values for SETTINGS frame, as defined by the spec.
  SPEC_DEFAULT_CONNECTION_SETTINGS = {
    settings_header_table_size: 4096,
    settings_enable_push: 1, # enabled for servers
    settings_max_concurrent_streams: Framer::MAX_STREAM_ID, # unlimited
    settings_initial_window_size: 65_535,
    settings_max_frame_size: 16_384,
    settings_max_header_list_size: (2 << 30) - 1 # unlimited
  }.freeze

  DEFAULT_CONNECTION_SETTINGS = {
    settings_header_table_size: 4096,
    settings_enable_push: 1, # enabled for servers
    settings_max_concurrent_streams: 100,
    settings_initial_window_size: 65_535,
    settings_max_frame_size: 16_384,
    settings_max_header_list_size: (2 << 30) - 1 # unlimited
  }.freeze

  # Default stream priority (lower values are higher priority).
  DEFAULT_WEIGHT = 16

  # Default connection "fast-fail" preamble string as defined by the spec.
  CONNECTION_PREFACE_MAGIC = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"

  REQUEST_MANDATORY_HEADERS = %w[:scheme :method :authority :path].freeze

  RESPONSE_MANDATORY_HEADERS = %w[:status].freeze

  CONNECTION_FRAME_TYPES = %i[settings ping goaway].freeze

  HEADERS_FRAME_TYPES = %i[headers push_promise].freeze

  STREAM_OPEN_STATES = %i[open half_closed_local].freeze

  # Connection encapsulates all of the connection, stream, flow-control,
  # error management, and other processing logic required for a well-behaved
  # HTTP 2.0 endpoint.
  #
  # Note that this class should not be used directly. Instead, you want to
  # use either Client or Server class to drive the HTTP 2.0 exchange.
  #
  class Connection
    include FlowBuffer
    include Emitter
    include Error
    include BufferUtils

    # Connection state (:new, :closed).
    attr_reader :state

    # Size of current connection flow control window (by default, set to
    # infinity, but is automatically updated on receipt of peer settings).
    attr_reader :local_window
    attr_reader :remote_window, :remote_settings
    alias window local_window

    # Current settings value for local and peer
    attr_reader :local_settings

    # Pending settings value
    #  Sent but not ack'ed settings
    attr_reader :pending_settings

    # Number of active streams between client and server (reserved streams
    # are not counted towards the stream limit).
    attr_accessor :active_stream_count

    # Initializes new connection object.
    #
    def initialize(settings = {})
      @local_settings  = DEFAULT_CONNECTION_SETTINGS.merge(settings)
      @remote_settings = SPEC_DEFAULT_CONNECTION_SETTINGS.dup

      @compressor   = Header::Compressor.new(settings)
      @decompressor = Header::Decompressor.new(settings)

      @active_stream_count = 0
      @last_stream_id = 0
      @streams = {}
      @streams_recently_closed = {}
      @pending_settings = []

      @framer = Framer.new(@local_settings[:settings_max_frame_size])

      @local_window_limit = @local_settings[:settings_initial_window_size]
      @local_window = @local_window_limit
      @remote_window_limit = @remote_settings[:settings_initial_window_size]
      @remote_window = @remote_window_limit

      @recv_buffer = "".b
      @continuation = []
      @error = nil

      @h2c_upgrade = nil
      @closed_since = nil
      @received_frame = false

      # from mixins
      @listeners = Hash.new { |hash, key| hash[key] = [] }
      @send_buffer = FrameBuffer.new
    end

    def closed?
      @state == :closed
    end

    # Allocates new stream for current connection.
    #
    # @param priority [Integer]
    # @param window [Integer]
    # @param parent [Stream]
    def new_stream(**args)
      raise ConnectionClosed if @state == :closed
      raise StreamLimitExceeded if @active_stream_count >= @remote_settings[:settings_max_concurrent_streams]

      connection_error(:protocol_error, msg: "id is smaller than previous") if @stream_id < @last_stream_id

      stream = activate_stream(id: @stream_id, **args)
      @last_stream_id = stream.id

      @stream_id += 2

      stream
    end

    # Sends PING frame to the peer.
    #
    # @param payload [String] optional payload must be 8 bytes long
    # @param blk [Proc] callback to execute when PONG is received
    def ping(payload, &blk)
      send(type: :ping, stream: 0, payload: payload)
      once(:ack, &blk) if blk
    end

    # Sends a GOAWAY frame indicating that the peer should stop creating
    # new streams for current connection.
    #
    # Endpoints MAY append opaque data to the payload of any GOAWAY frame.
    # Additional debug data is intended for diagnostic purposes only and
    # carries no semantic value. Debug data MUST NOT be persistently stored,
    # since it could contain sensitive information.
    #
    # @param error [Symbol]
    # @param payload [String]
    def goaway(error = :no_error, payload = nil)
      send(type: :goaway, last_stream: @last_stream_id,
           error: error, payload: payload)
      @state = :closed
      @closed_since = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end

    # Sends a WINDOW_UPDATE frame to the peer.
    #
    # @param increment [Integer]
    def window_update(increment)
      @local_window += increment
      send(type: :window_update, stream: 0, increment: increment)
    end

    # Sends a connection SETTINGS frame to the peer.
    # The values are reflected when the corresponding ACK is received.
    #
    # @param settings [Array or Hash]
    def settings(payload)
      validate_settings(@local_role, payload)
      @pending_settings << payload
      send(type: :settings, stream: 0, payload: payload)
      @pending_settings << payload
    end

    # Decodes incoming bytes into HTTP 2.0 frames and routes them to
    # appropriate receivers: connection frames are handled directly, and
    # stream frames are passed to appropriate stream objects.
    #
    # @param data [String] Binary encoded string
    def receive(data)
      append_str(@recv_buffer, data)

      # Upon establishment of a TCP connection and determination that
      # HTTP/2.0 will be used by both peers, each endpoint MUST send a
      # connection header as a final confirmation and to establish the
      # initial settings for the HTTP/2.0 connection.
      #
      # Client connection header is 24 byte connection header followed by
      # SETTINGS frame. Server connection header is SETTINGS frame only.
      if @state == :waiting_magic
        if @recv_buffer.size < 24
          raise HandshakeError unless CONNECTION_PREFACE_MAGIC.start_with? @recv_buffer

          return # maybe next time
        elsif read_str(@recv_buffer, 24) == CONNECTION_PREFACE_MAGIC
          # MAGIC is OK.  Send our settings
          @state = :waiting_connection_preface
          payload = @local_settings.reject { |k, v| v == SPEC_DEFAULT_CONNECTION_SETTINGS[k] }
          settings(payload)
        else
          raise HandshakeError
        end
      end

      while (frame = @framer.parse(@recv_buffer))
        # @type var stream_id: Integer
        stream_id = frame[:stream]
        frame_type = frame[:type]

        if is_a?(Client) && !@received_frame
          connection_error(:protocol_error, msg: "didn't receive settings") if frame_type != :settings
          @received_frame = true
        end

        # Implementations MUST discard frames
        # that have unknown or unsupported types.
        if frame_type.nil?
          # However, extension frames that appear in
          # the middle of a header block (Section 4.3) are not permitted; these
          # MUST be treated as a connection error (Section 5.4.1) of type
          # PROTOCOL_ERROR.
          connection_error(:protocol_error) unless @continuation.empty?
          next
        end

        emit(:frame_received, frame)

        # Header blocks MUST be transmitted as a contiguous sequence of frames
        # with no interleaved frames of any other type, or from any other stream.
        unless @continuation.empty?
          connection_error unless frame_type == :continuation && stream_id == @continuation.first[:stream]

          @continuation << frame
          unless frame[:flags].include? :end_headers
            buffered_payload = @continuation.sum { |f| f[:payload].bytesize }
            # prevent HTTP/2 CONTINUATION FLOOD
            # same heuristic as the one from HAProxy: https://www.haproxy.com/blog/haproxy-is-resilient-to-the-http-2-continuation-flood
            # different mitigation (connection closed, instead of 400 response)
            unless buffered_payload < @local_settings[:settings_max_frame_size]
              connection_error(:protocol_error,
                               msg: "too many continuations received")
            end

            next
          end

          payload = @continuation.map { |f| f[:payload] }.join

          frame = @continuation.shift
          frame_type = frame[:type]

          @continuation.clear

          frame.delete(:length)
          frame[:payload] = payload
          frame[:flags] << :end_headers
        end

        # SETTINGS frames always apply to a connection, never a single stream.
        # The stream identifier for a settings frame MUST be zero.  If an
        # endpoint receives a SETTINGS frame whose stream identifier field is
        # anything other than 0x0, the endpoint MUST respond with a connection
        # error (Section 5.4.1) of type PROTOCOL_ERROR.
        if connection_frame?(frame)
          connection_error(:protocol_error) unless stream_id.zero?
          connection_management(frame)
        else
          case frame_type
          when :headers
            # When server receives even-numbered stream identifier,
            # the endpoint MUST respond with a connection error of type PROTOCOL_ERROR.
            connection_error if stream_id.even? && is_a?(Server)

            # The last frame in a sequence of HEADERS/CONTINUATION
            # frames MUST have the END_HEADERS flag set.
            unless frame[:flags].include? :end_headers
              @continuation << frame
              next
            end

            # After sending a GOAWAY frame, the sender can discard frames
            # for new streams.  However, any frames that alter connection
            # state cannot be completely ignored.  For instance, HEADERS,
            # PUSH_PROMISE and CONTINUATION frames MUST be minimally
            # processed to ensure a consistent compression state
            decode_headers(frame)
            return if @state == :closed

            stream = @streams[stream_id]
            if stream.nil?
              verify_pseudo_headers(frame)

              verify_stream_order(stream_id)
              stream = activate_stream(
                id: stream_id,
                weight: frame[:weight] || DEFAULT_WEIGHT,
                dependency: frame[:dependency] || 0,
                exclusive: frame[:exclusive] || false
              )
              emit(:stream, stream)
            end

            stream << frame

          when :push_promise
            # The last frame in a sequence of PUSH_PROMISE/CONTINUATION
            # frames MUST have the END_HEADERS flag set
            unless frame[:flags].include? :end_headers
              @continuation << frame
              return
            end

            decode_headers(frame)
            return if @state == :closed

            # PUSH_PROMISE frames MUST be associated with an existing, peer-
            # initiated stream... A receiver MUST treat the receipt of a
            # PUSH_PROMISE on a stream that is neither "open" nor
            # "half-closed (local)" as a connection error (Section 5.4.1) of
            # type PROTOCOL_ERROR. Similarly, a receiver MUST treat the
            # receipt of a PUSH_PROMISE that promises an illegal stream
            # identifier (Section 5.1.1) (that is, an identifier for a stream
            # that is not currently in the "idle" state) as a connection error
            # (Section 5.4.1) of type PROTOCOL_ERROR, unless the receiver
            # recently sent a RST_STREAM frame to cancel the associated stream.
            parent = @streams[stream_id]
            pid = frame[:promise_stream]

            # if PUSH parent is recently closed, RST_STREAM the push
            if @streams_recently_closed[stream_id]
              send(type: :rst_stream, stream: pid, error: :refused_stream)
              return
            end

            connection_error(msg: "missing parent ID") if parent.nil?

            unless STREAM_OPEN_STATES.include?(parent.state)
              # An endpoint might receive a PUSH_PROMISE frame after it sends
              # RST_STREAM.  PUSH_PROMISE causes a stream to become "reserved".
              # The RST_STREAM does not cancel any promised stream.  Therefore, if
              # promised streams are not desired, a RST_STREAM can be used to
              # close any of those streams.
              if parent.closed == :local_rst
                # We can either (a) 'resurrect' the parent, or (b) RST_STREAM
                # ... sticking with (b), might need to revisit later.
                send(type: :rst_stream, stream: pid, error: :refused_stream)
              else
                connection_error
              end
            end

            _verify_pseudo_headers(frame, REQUEST_MANDATORY_HEADERS)
            verify_stream_order(pid)
            stream = activate_stream(id: pid, parent: parent)
            emit(:promise, stream)
            stream << frame
          else
            if (stream = @streams[stream_id])
              stream << frame
              if frame_type == :data
                update_local_window(frame)
                calculate_window_update(@local_window_limit)
              end
            else
              case frame_type
              # The PRIORITY frame can be sent for a stream in the "idle" or
              # "closed" state. This allows for the reprioritization of a
              # group of dependent streams by altering the priority of an
              # unused or closed parent stream.
              when :priority
                stream = activate_stream(
                  id: stream_id,
                  weight: frame[:weight] || DEFAULT_WEIGHT,
                  dependency: frame[:dependency] || 0,
                  exclusive: frame[:exclusive] || false
                )

                emit(:stream, stream)
                stream << frame

              # WINDOW_UPDATE can be sent by a peer that has sent a frame
              # bearing the END_STREAM flag. This means that a receiver could
              # receive a WINDOW_UPDATE frame on a "half-closed (remote)" or
              # "closed" stream. A receiver MUST NOT treat this as an error
              # (see Section 5.1).
              when :window_update
                unless @streams_recently_closed.key?(stream_id)
                  connection_error(:protocol_error, msg: "sent window update on idle stream")
                end
                stream = @streams_recently_closed[stream_id]
                process_window_update(frame: frame, encode: true)
              # Endpoints MUST ignore
              # WINDOW_UPDATE or RST_STREAM frames received in this state (closed), though
              # endpoints MAY choose to treat frames that arrive a significant
              # time after sending END_STREAM as a connection error.
              when :rst_stream
                unless @streams_recently_closed.key?(stream_id)
                  connection_error(:protocol_error, msg: "sent window update on idle stream")
                end
              else
                # An endpoint that receives an unexpected stream identifier
                # MUST respond with a connection error of type PROTOCOL_ERROR.
                connection_error(msg: "stream does not exist")
              end
            end
          end
        end
      end
    rescue StandardError => e
      raise if e.is_a?(Error::Error)

      connection_error(e: e)
    end

    def <<(data)
      receive(data)
    end

    private

    # Send an outgoing frame. DATA frames are subject to connection flow
    # control and may be split and / or buffered based on current window size.
    # All other frames are sent immediately.
    #
    # @note all frames are currently delivered in FIFO order.
    # @param frame [Hash]
    def send(frame)
      frame_type = frame[:type]

      emit(:frame_sent, frame)
      if frame_type == :data
        send_data(frame, true)

      elsif frame_type == :rst_stream && frame[:error] == :protocol_error
        # An endpoint can end a connection at any time. In particular, an
        # endpoint MAY choose to treat a stream error as a connection error.

        goaway(:protocol_error)
      else
        # HEADERS and PUSH_PROMISE may generate CONTINUATION. Also send
        # RST_STREAM that are not protocol errors
        encode(frame)
      end
    end

    # Applies HTTP 2.0 binary encoding to the frame.
    #
    # @param frame [Hash]
    def encode(frame)
      if HEADERS_FRAME_TYPES.include?(frame[:type])
        encode_headers(frame) # HEADERS and PUSH_PROMISE may create more than one frame
      else
        emit(:frame, @framer.generate(frame))
      end
    end

    # Check if frame is a connection frame: SETTINGS, PING, GOAWAY, and any
    # frame addressed to stream ID = 0.
    #
    # @param frame [Hash]
    # @return [Boolean]
    def connection_frame?(frame)
      frame[:stream].zero? || CONNECTION_FRAME_TYPES.include?(frame[:type])
    end

    # Process received connection frame (stream ID = 0).
    # - Handle SETTINGS updates
    # - Connection flow control (WINDOW_UPDATE)
    # - Emit PONG auto-reply to PING frames
    # - Mark connection as closed on GOAWAY
    #
    # @param frame [Hash]
    def connection_management(frame)
      frame_type = frame[:type]

      case @state
      when :waiting_connection_preface
        # The first frame MUST be a SETTINGS frame at the start of a connection.
        connection_error unless frame[:type] == :settings

        @state = :connected
        connection_settings(frame)

      when :connected
        case frame_type
        when :settings
          connection_settings(frame)
        when :window_update
          process_window_update(frame: frame, encode: true)
        when :ping
          ping_management(frame)
        when :goaway
          # Receivers of a GOAWAY frame MUST NOT open additional streams on
          # the connection, although a new connection can be established
          # for new streams.
          @state = :closed
          @closed_since = Process.clock_gettime(Process::CLOCK_MONOTONIC)
          emit(:goaway, frame[:last_stream], frame[:error], frame[:payload])
        when :altsvc
          origin = frame[:origin]
          # 4.  The ALTSVC HTTP/2 Frame
          # An ALTSVC frame on stream 0 with empty (length 0) "Origin"
          # information is invalid and MUST be ignored.
          emit(:altsvc, frame) if origin && !origin.empty?
        when :origin
          return if @h2c_upgrade || !frame[:flags].empty?

          frame[:payload].each do |orig|
            emit(:origin, orig)
          end
        when :blocked
          emit(:blocked, frame)
        else
          connection_error
        end
      when :closed
        case frame_type
        when :goaway
          connection_error
        when :ping
          ping_management(frame)
        else
          connection_error if (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @closed_since) > 15
        end
      else
        connection_error
      end
    end

    def ping_management(frame)
      if frame[:flags].include? :ack
        emit(:ack, frame[:payload])
      else
        send(type: :ping, stream: 0,
             flags: [:ack], payload: frame[:payload])
      end
    end

    # Validate settings parameters.  See sepc Section 6.5.2.
    #
    # @param role [Symbol] The sender's role: :client or :server
    # @return nil if no error.  Exception object in case of any error.
    def validate_settings(role, settings)
      settings.each do |key, v|
        case key
        when :settings_enable_push
          case role
          when :server
            # Section 8.2
            # Clients MUST reject any attempt to change the
            # SETTINGS_ENABLE_PUSH setting to a value other than 0 by treating the
            # message as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
            next if v.zero?

            connection_error(:protocol_error, msg: "invalid #{key} value")
          when :client
            # Any value other than 0 or 1 MUST be treated as a
            # connection error (Section 5.4.1) of type PROTOCOL_ERROR.
            next if v.zero? || v == 1

            connection_error(:protocol_error, msg: "invalid #{key} value")
          end
        when :settings_initial_window_size
          # Values above the maximum flow control window size of 2^31-1 MUST
          # be treated as a connection error (Section 5.4.1) of type
          # FLOW_CONTROL_ERROR.
          next if v <= 0x7fffffff

          connection_error(:flow_control_error, msg: "invalid #{key} value")
        when :settings_max_frame_size
          # The initial value is 2^14 (16,384) octets.  The value advertised
          # by an endpoint MUST be between this initial value and the maximum
          # allowed frame size (2^24-1 or 16,777,215 octets), inclusive.
          # Values outside this range MUST be treated as a connection error
          # (Section 5.4.1) of type PROTOCOL_ERROR.
          next if v.between?(16_384, 16_777_215)

          connection_error(:protocol_error, msg: "invalid #{key} value")
          # when :settings_max_concurrent_streams
          # Any value is valid
          # when :settings_header_table_size
          # Any value is valid
          # when :settings_max_header_list_size
          # Any value is valid
          # else # ignore unknown settings
        end
      end
    end

    # Update connection settings based on parameters set by the peer.
    #
    # @param frame [Hash]
    def connection_settings(frame)
      # Apply settings.
      #  side =
      #   local: previously sent and pended our settings should be effective
      #   remote: just received peer settings should immediately be effective
      if frame[:flags].include?(:ack)
        # Process pending settings we have sent.
        settings = @pending_settings.shift
        side = :local
      else
        validate_settings(@remote_role, frame[:payload])
        settings = frame[:payload]
        side = :remote
      end

      settings.each do |key, v|
        case side
        when :local
          @local_settings[key] = v
        when :remote
          @remote_settings[key] = v
        end

        case key
        when :settings_max_concurrent_streams
          # Do nothing.
          # The value controls at the next attempt of stream creation.

        when :settings_initial_window_size
          # A change to SETTINGS_INITIAL_WINDOW_SIZE could cause the available
          # space in a flow control window to become negative. A sender MUST
          # track the negative flow control window, and MUST NOT send new flow
          # controlled frames until it receives WINDOW_UPDATE frames that cause
          # the flow control window to become positive.
          case side
          when :local
            @local_window = @local_window - @local_window_limit + v
            @streams.each_value do |stream|
              stream.emit(:local_window, stream.local_window - @local_window_limit + v)
            end

            @local_window_limit = v
          when :remote
            # can adjust the initial window size for new streams by including a
            # value for SETTINGS_INITIAL_WINDOW_SIZE in the SETTINGS frame.
            # The connection flow-control window can only be changed using
            # WINDOW_UPDATE frames.
            @streams.each_value do |stream|
              # Event name is :window, not :remote_window
              stream.emit(:window, stream.remote_window - @remote_window_limit + v)
            end

            @remote_window_limit = v
          end

        when :settings_header_table_size
          # Setting header table size might cause some headers evicted
          case side
          when :local
            @compressor.table_size = v
          when :remote
            @decompressor.table_size = v
          end

        when :settings_enable_push
          # nothing to do

        when :settings_max_frame_size
          @framer.remote_max_frame_size = v

          # else # ignore unknown settings
        end
      end

      case side
      when :local
        # Received a settings_ack.  Notify application layer.
        emit(:settings_ack, frame, @pending_settings.size)
      when :remote
        unless @state == :closed || @h2c_upgrade == :start
          # Send ack to peer
          send(type: :settings, stream: 0, payload: [], flags: [:ack])
          # when initial window size changes, we try to flush any buffered
          # data.
          @streams.each_value(&:flush)
        end
      end
    end

    # Decode headers payload and update connection decompressor state.
    #
    # The receiver endpoint reassembles the header block by concatenating
    # the individual fragments, then decompresses the block to reconstruct
    # the header set - aka, header payloads are buffered until END_HEADERS,
    # or an END_PROMISE flag is seen.
    #
    # @param frame [Hash]
    def decode_headers(frame)
      frame[:payload] = @decompressor.decode(frame[:payload], frame) if frame[:payload].is_a?(String)
    rescue CompressionError => e
      connection_error(:compression_error, e: e)
    rescue ProtocolError => e
      connection_error(:protocol_error, e: e)
    rescue StandardError => e
      connection_error(:internal_error, e: e)
    end

    # Encode headers payload and update connection compressor state.
    #
    # @param headers_frame [Hash]
    def encode_headers(headers_frame)
      payload = headers_frame[:payload]
      begin
        payload = headers_frame[:payload] = @compressor.encode(payload) unless payload.is_a?(String)
      rescue StandardError => e
        connection_error(:compression_error, e: e)
      end

      max_frame_size = @remote_settings[:settings_max_frame_size]

      # if single frame, return immediately
      if payload.bytesize <= max_frame_size
        emit(:frame, @framer.generate(headers_frame))
        return
      end

      # split into multiple CONTINUATION frames
      headers_frame[:flags].delete(:end_headers)
      headers_frame[:payload] = payload.byteslice(0, max_frame_size)
      payload = payload.byteslice(max_frame_size..-1)

      # emit first HEADERS frame
      emit(:frame, @framer.generate(headers_frame))

      loop do
        continuation_frame = headers_frame.merge(
          type: :continuation,
          flags: EMPTY,
          payload: payload.byteslice(0, max_frame_size)
        )

        payload = payload.byteslice(max_frame_size..-1)

        if payload.nil? || payload.empty?
          continuation_frame[:flags] = [:end_headers]
          emit(:frame, @framer.generate(continuation_frame))
          break
        end

        emit(:frame, @framer.generate(continuation_frame))
      end
    end

    # Activates new incoming or outgoing stream and registers appropriate
    # connection managemet callbacks.
    #
    # @param id [Integer]
    # @param priority [Integer]
    # @param window [Integer]
    # @param parent [Stream]
    def activate_stream(id:, **args)
      connection_error(msg: "Stream ID already exists") if @streams.key?(id)

      raise StreamLimitExceeded if @active_stream_count >= @local_settings[:settings_max_concurrent_streams]

      stream = Stream.new(connection: self, id: id, **args)

      stream.once(:close) do
        @streams.delete(id)

        # Store a reference to the closed stream, such that we can respond
        # to any in-flight frames while close is registered on both sides.
        # References to such streams will be purged whenever another stream
        # is closed, with a minimum of 15s RTT time window.
        now = Process.clock_gettime(Process::CLOCK_MONOTONIC)

        _, closed_since = @streams_recently_closed.first

        # forego recently closed recycling if empty or the first element
        # hasn't expired yet (it's ordered).
        if closed_since && (now - closed_since) > 15
          # discards all streams which have closed for a while.
          # TODO: use a drop_while! variant whenever there is one.
          @streams_recently_closed = @streams_recently_closed.drop_while do |_, since|
            (now - since) > 15
          end.to_h
        end

        @streams_recently_closed[id] = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      end

      stream.on(:frame, &method(:send))
      @streams[id] = stream
    end

    def verify_stream_order(id)
      return unless id.odd?

      connection_error(msg: "Stream ID smaller than previous") if @last_stream_id >= id
      @last_stream_id = id
    end

    def _verify_pseudo_headers(frame, mandatory_headers)
      headers = frame[:payload]
      return if headers.is_a?(String)

      pseudo_headers = headers.take_while do |field, value|
        # use this loop to validate pseudo-headers
        connection_error(:protocol_error, msg: "path is empty") if field == ":path" && value.empty?
        field.start_with?(":")
      end.map(&:first)
      return if mandatory_headers.size == pseudo_headers.size &&
                (mandatory_headers - pseudo_headers).empty?

      connection_error(:protocol_error, msg: "invalid pseudo-headers")
    end

    # Emit GOAWAY error indicating to peer that the connection is being
    # aborted, and once sent, raise a local exception.
    #
    # @param error [Symbol]
    # @option error [Symbol] :no_error
    # @option error [Symbol] :internal_error
    # @option error [Symbol] :flow_control_error
    # @option error [Symbol] :stream_closed
    # @option error [Symbol] :frame_too_large
    # @option error [Symbol] :compression_error
    # @param msg [String]
    def connection_error(error = :protocol_error, msg: nil, e: nil)
      goaway(error) unless @state == :closed || @state == :new

      @state = :closed
      @error = error
      msg ||= e ? e.message : "protocol error"
      backtrace = e ? e.backtrace : nil
      raise Error.types[error], msg, backtrace
    end
    alias error connection_error

    def manage_state(_)
      yield
    end
  end
end