File: AsyncChannelInboundStream.swift

package info (click to toggle)
swiftlang 6.1.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,791,532 kB
  • sloc: cpp: 9,901,743; ansic: 2,201,431; asm: 1,091,827; python: 308,252; objc: 82,166; f90: 80,126; lisp: 38,358; pascal: 25,559; sh: 20,429; ml: 5,058; perl: 4,745; makefile: 4,484; awk: 3,535; javascript: 3,018; xml: 918; fortran: 664; cs: 573; ruby: 396
file content (197 lines) | stat: -rw-r--r-- 7,446 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2022-2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

/// The inbound message asynchronous sequence of a ``NIOAsyncChannel``.
///
/// This is a unicast async sequence that allows a single iterator to be created.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct NIOAsyncChannelInboundStream<Inbound: Sendable>: Sendable {
    @usableFromInline
    typealias Producer = NIOThrowingAsyncSequenceProducer<Inbound, Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, NIOAsyncChannelInboundStreamChannelHandlerProducerDelegate>

    /// A source used for driving a ``NIOAsyncChannelInboundStream`` during tests.
    public struct TestSource {
        @usableFromInline
        internal let continuation: AsyncThrowingStream<Inbound, Error>.Continuation

        @inlinable
        init(continuation: AsyncThrowingStream<Inbound, Error>.Continuation) {
            self.continuation = continuation
        }

        /// Yields the element to the inbound stream.
        ///
        /// - Parameter element: The element to yield to the inbound stream.
        @inlinable
        public func yield(_ element: Inbound) {
            self.continuation.yield(element)
        }

        /// Finished the inbound stream.
        ///
        /// - Parameter error: The error to throw, or nil, to finish normally.
        @inlinable
        public func finish(throwing error: Error? = nil) {
            self.continuation.finish(throwing: error)
        }
    }

    @usableFromInline
    enum _Backing: Sendable {
        case asyncStream(AsyncThrowingStream<Inbound, Error>)
        case producer(Producer)
    }

    /// The underlying async sequence.
    @usableFromInline
    let _backing: _Backing

    /// Creates a new stream with a source for testing.
    ///
    /// This is useful for writing unit tests where you want to drive a ``NIOAsyncChannelInboundStream``.
    ///
    /// - Returns: A tuple containing the input stream and a test source to drive it.
    @inlinable
    public static func makeTestingStream() -> (Self, TestSource) {
        var continuation: AsyncThrowingStream<Inbound, Error>.Continuation!
        let stream = AsyncThrowingStream<Inbound, Error> { continuation = $0 }
        let source = TestSource(continuation: continuation)
        let inputStream = Self(stream: stream)
        return (inputStream, source)
    }

    @inlinable
    init(stream: AsyncThrowingStream<Inbound, Error>) {
        self._backing = .asyncStream(stream)
    }

    @inlinable
    init<HandlerInbound: Sendable>(
        channel: Channel,
        backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
        closeOnDeinit: Bool,
        handler: NIOAsyncChannelInboundStreamChannelHandler<HandlerInbound, Inbound>
    ) throws {
        channel.eventLoop.preconditionInEventLoop()
        let strategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark

        if let userProvided = backPressureStrategy {
            strategy = userProvided
        } else {
            // Default strategy. These numbers are fairly arbitrary, but they line up with the default value of
            // maxMessagesPerRead.
            strategy = .init(lowWatermark: 2, highWatermark: 10)
        }

        let sequence = Producer.makeSequence(
            backPressureStrategy: strategy,
            finishOnDeinit: closeOnDeinit,
            delegate: NIOAsyncChannelInboundStreamChannelHandlerProducerDelegate(handler: handler)
        )
        handler.source = sequence.source
        try channel.pipeline.syncOperations.addHandler(handler)
        self._backing = .producer(sequence.sequence)
    }

    /// Creates a new ``NIOAsyncChannelInboundStream`` which is used when the pipeline got synchronously wrapped.
    @inlinable
    static func makeWrappingHandler(
        channel: Channel,
        backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
        closeOnDeinit: Bool
    ) throws -> NIOAsyncChannelInboundStream {
        let handler = NIOAsyncChannelInboundStreamChannelHandler<Inbound, Inbound>.makeHandler(
            eventLoop: channel.eventLoop
        )

        return try .init(
            channel: channel,
            backPressureStrategy: backPressureStrategy,
            closeOnDeinit: closeOnDeinit,
            handler: handler
        )
    }

    /// Creates a new ``NIOAsyncChannelInboundStream`` which has hooks for transformations.
    @inlinable
    static func makeTransformationHandler(
        channel: Channel,
        backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
        closeOnDeinit: Bool,
        channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture<Inbound>
    ) throws -> NIOAsyncChannelInboundStream {
        let handler = NIOAsyncChannelInboundStreamChannelHandler<Channel, Inbound>.makeHandlerWithTransformations(
            eventLoop: channel.eventLoop,
            channelReadTransformation: channelReadTransformation
        )

        return try .init(
            channel: channel,
            backPressureStrategy: backPressureStrategy,
            closeOnDeinit: closeOnDeinit,
            handler: handler
        )
    }
}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension NIOAsyncChannelInboundStream: AsyncSequence {
    public typealias Element = Inbound

    public struct AsyncIterator: AsyncIteratorProtocol {
        @usableFromInline
        enum _Backing {
            case asyncStream(AsyncThrowingStream<Inbound, Error>.Iterator)
            case producer(Producer.AsyncIterator)
        }

        @usableFromInline var _backing: _Backing

        @inlinable
        init(_ backing: NIOAsyncChannelInboundStream<Inbound>._Backing) {
            switch backing {
            case .asyncStream(let asyncStream):
                self._backing = .asyncStream(asyncStream.makeAsyncIterator())
            case .producer(let producer):
                self._backing = .producer(producer.makeAsyncIterator())
            }
        }

        @inlinable
        public mutating func next() async throws -> Element? {
            switch self._backing {
            case .asyncStream(var iterator):
                defer {
                    self._backing = .asyncStream(iterator)
                }
                let value = try await iterator.next()
                return value

            case .producer(let iterator):
                return try await iterator.next()
            }
        }
    }

    @inlinable
    public func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator(self._backing)
    }
}

/// The ``NIOAsyncChannelInboundStream/AsyncIterator`` MUST NOT be shared across `Task`s. With marking this as
/// unavailable we are explicitly declaring this.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@available(*, unavailable)
extension NIOAsyncChannelInboundStream.AsyncIterator: Sendable {}