File: FullRequestResponse.swift

package info (click to toggle)
swiftlang 6.0.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,519,992 kB
  • sloc: cpp: 9,107,863; ansic: 2,040,022; asm: 1,135,751; python: 296,500; objc: 82,456; f90: 60,502; lisp: 34,951; pascal: 19,946; sh: 18,133; perl: 7,482; ml: 4,937; javascript: 4,117; makefile: 3,840; awk: 3,535; xml: 914; fortran: 619; cs: 573; ruby: 573
file content (132 lines) | stat: -rw-r--r-- 5,062 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

// THIS FILE IS MOSTLY COPIED FROM swift-nio-extras

import NIO
import NIOHTTP1

/// Outbound handler that expands a bare `HTTPRequestHead` into a complete, body-less HTTP request.
///
/// Every `HTTPRequestHead` written through this handler is forwarded as two `HTTPClientRequestPart`s:
/// `.head(...)` carrying the request head, immediately followed by `.end(nil)` (no trailers).
public final class MakeFullRequestHandler: ChannelOutboundHandler {
    public typealias OutboundIn = HTTPRequestHead
    public typealias OutboundOut = HTTPClientRequestPart

    /// Forwards `data` (an `HTTPRequestHead`) as a `.head` part followed by an `.end` part.
    ///
    /// - parameters:
    ///    - context: The context used to forward both parts.
    ///    - data: The wrapped `HTTPRequestHead` to send.
    ///    - promise: Attached to the write of the `.end` part only; the `.head` part is written without a promise.
    public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
        let headPart = HTTPClientRequestPart.head(self.unwrapOutboundIn(data))
        let endPart = HTTPClientRequestPart.end(nil)

        // Only the final part carries the caller's promise.
        context.write(self.wrapOutboundOut(headPart), promise: nil)
        context.write(self.wrapOutboundOut(endPart), promise: promise)
    }
}

/// `RequestResponseHandler` receives a `Request` alongside an `EventLoopPromise<Response>` from the `Channel`'s
/// outbound side. It will fulfill the promise with the `Response` once it's received from the `Channel`'s inbound
/// side.
///
/// `RequestResponseHandler` does support pipelining `Request`s and it will send them pipelined further down the
/// `Channel`. Should `RequestResponseHandler` receive an error from the `Channel`, it will fail all promises meant for
/// the outstanding `Response`s and close the `Channel`. All requests enqueued after an error occurred will be immediately
/// failed with the first error the channel received. Likewise, once the `Channel` has become inactive, all outstanding
/// and all subsequently enqueued requests are failed with `ChannelError.eof`.
///
/// `RequestResponseHandler` requires that the `Response`s arrive on `Channel` in the same order as the `Request`s
/// were submitted. A `Response` that arrives while no `Request` is outstanding is dropped and reported to the rest of
/// the pipeline as an `UnsolicitedResponseError`.
public final class RequestResponseHandler<Request, Response>: ChannelDuplexHandler {
    public typealias InboundIn = Response
    public typealias InboundOut = Never
    public typealias OutboundIn = (Request, EventLoopPromise<Response>)
    public typealias OutboundOut = Request

    /// Lifecycle of the handler.
    private enum State {
        /// Requests are forwarded and their promises are queued until the matching response arrives.
        case operational
        /// Terminal state. Carries the error with which every later request is failed. The promise buffer is
        /// always empty while in this state.
        case error(Error)

        var isOperational: Bool {
            switch self {
            case .operational:
                return true
            case .error:
                return false
            }
        }
    }

    private var state: State = .operational

    /// Promises of the requests that were forwarded but not answered yet, oldest first.
    private var promiseBuffer: CircularBuffer<EventLoopPromise<Response>>


    /// Create a new `RequestResponseHandler`.
    ///
    /// - parameters:
    ///    - initialBufferCapacity: `RequestResponseHandler` saves the promises for all outstanding responses in a
    ///          buffer. `initialBufferCapacity` is the initial capacity for this buffer. You usually do not need to set
    ///          this parameter unless you intend to pipeline very deeply and don't want the buffer to resize.
    public init(initialBufferCapacity: Int = 4) {
        self.promiseBuffer = CircularBuffer(initialCapacity: initialBufferCapacity)
    }

    /// Fails every outstanding response promise with `ChannelError.eof` and forwards the event.
    public func channelInactive(context: ChannelHandlerContext) {
        switch self.state {
        case .error:
            // We failed any outstanding promises when we entered the error state and will fail any
            // new promises in write.
            assert(self.promiseBuffer.isEmpty)
        case .operational:
            // Leave the operational state *before* failing anything, so that a request which reaches `write`
            // from here on (for example one enqueued by a failure callback of the promises below) is failed
            // right away instead of being parked in a buffer that nobody will ever drain.
            self.state = .error(ChannelError.eof)
            let outstanding = self.promiseBuffer
            self.promiseBuffer.removeAll()
            for promise in outstanding {
                promise.fail(ChannelError.eof)
            }
        }
        context.fireChannelInactive()
    }

    /// Succeeds the oldest outstanding response promise with the received `Response`.
    ///
    /// Responses received in the error state are ignored. A response received while no request is outstanding is
    /// dropped and an `UnsolicitedResponseError` is fired through the pipeline.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        guard self.state.isOperational else {
            // we're in an error state, ignore further responses
            assert(self.promiseBuffer.isEmpty)
            return
        }

        let response = self.unwrapInboundIn(data)

        guard !self.promiseBuffer.isEmpty else {
            // `removeFirst()` on an empty buffer would trap; a misbehaving peer must not be able to crash us
            // by sending a response nobody asked for.
            context.fireErrorCaught(UnsolicitedResponseError())
            return
        }

        let promise = self.promiseBuffer.removeFirst()
        promise.succeed(response)
    }

    /// Enters the error state on the first error: closes the `Channel` and fails every outstanding response promise
    /// with `error`. Errors received after that are ignored.
    public func errorCaught(context: ChannelHandlerContext, error: Error) {
        guard self.state.isOperational else {
            assert(self.promiseBuffer.isEmpty)
            return
        }
        // Update the state and empty the buffer first: the promises are failed from a copy so that anything
        // re-entering this handler already observes the error state and an empty buffer.
        self.state = .error(error)
        let outstanding = self.promiseBuffer
        self.promiseBuffer.removeAll()
        context.close(promise: nil)
        for promise in outstanding {
            promise.fail(error)
        }
    }

    /// Forwards `Request` and queues its response promise, or fails both promises if the handler is in the error
    /// state.
    ///
    /// - parameters:
    ///    - context: The context used to forward the request.
    ///    - data: The wrapped `(Request, EventLoopPromise<Response>)` pair.
    ///    - promise: The promise for the write itself; forwarded along with the request, or failed with the stored
    ///          error when in the error state.
    public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
        let (request, responsePromise) = self.unwrapOutboundIn(data)
        switch self.state {
        case .error(let error):
            assert(self.promiseBuffer.isEmpty)
            responsePromise.fail(error)
            promise?.fail(error)
        case .operational:
            self.promiseBuffer.append(responsePromise)
            context.write(self.wrapOutboundOut(request), promise: promise)
        }
    }
}

/// Fired through the `ChannelPipeline` by `RequestResponseHandler` when a response arrives although no request is
/// waiting for one.
public struct UnsolicitedResponseError: Error, Equatable {
    public init() {}
}