1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
|
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2022-2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
/// Wraps a NIO ``Channel`` object into a form suitable for use in Swift Concurrency.
///
/// ``NIOAsyncChannel`` abstracts the notion of a NIO ``Channel`` into something that
/// can safely be used in a structured concurrency context. In particular, this exposes
/// the following functionality:
///
/// - reads are presented as an `AsyncSequence`
/// - writes are performed through async functions on a writer, which provides back pressure
/// - channels can be closed seamlessly
///
/// This type does not replace the full complexity of NIO's ``Channel``. In particular, it
/// does not expose the following functionality:
///
/// - user events
/// - traditional NIO back pressure such as writability signals and the ``Channel/read()`` call
///
/// Users are encouraged to separate their ``ChannelHandler``s into those that implement
/// protocol-specific logic (such as parsers and encoders) and those that implement business
/// logic. Protocol-specific logic should be implemented as a ``ChannelHandler``, while business
/// logic should use ``NIOAsyncChannel`` to consume and produce data to the network.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
/// The settings used when wrapping a ``Channel`` into a ``NIOAsyncChannel``.
public struct Configuration: Sendable {
    /// Back pressure strategy applied to ``NIOAsyncChannel/inbound``.
    public var backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark

    /// Whether outbound half closure is enabled. When enabled, half closure is triggered once
    /// the ``NIOAsyncChannelOutboundWriter`` is either finished or deinitialized.
    public var isOutboundHalfClosureEnabled: Bool

    /// The type of the messages of ``NIOAsyncChannel/inbound``.
    public var inboundType: Inbound.Type

    /// The type of the messages of ``NIOAsyncChannel/outbound``.
    public var outboundType: Outbound.Type

    /// Creates a new ``NIOAsyncChannel/Configuration``.
    ///
    /// - Parameters:
    ///   - backPressureStrategy: Back pressure strategy applied to ``NIOAsyncChannel/inbound``.
    ///     Defaults to a watermarked strategy (lowWatermark: 2, highWatermark: 10).
    ///   - isOutboundHalfClosureEnabled: Whether outbound half closure is enabled. When enabled, half
    ///     closure is triggered once the ``NIOAsyncChannelOutboundWriter`` is either finished or
    ///     deinitialized. Defaults to `false`.
    ///   - inboundType: The type of the messages of ``NIOAsyncChannel/inbound``. Defaults to `Inbound.self`.
    ///   - outboundType: The type of the messages of ``NIOAsyncChannel/outbound``. Defaults to `Outbound.self`.
    public init(
        backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark = .init(lowWatermark: 2, highWatermark: 10),
        isOutboundHalfClosureEnabled: Bool = false,
        inboundType: Inbound.Type = Inbound.self,
        outboundType: Outbound.Type = Outbound.self
    ) {
        // Message types first, then the behavioural knobs; the assignments are independent.
        self.inboundType = inboundType
        self.outboundType = outboundType
        self.isOutboundHalfClosureEnabled = isOutboundHalfClosureEnabled
        self.backPressureStrategy = backPressureStrategy
    }
}
/// The ``Channel`` that this ``NIOAsyncChannel`` wraps.
public let channel: Channel

/// The inbound message stream.
///
/// - Important: `inbound` is a unicast `AsyncSequence`; only a single iterator can be created from it.
@available(*, deprecated, message: "Use the executeThenClose scoped method instead.")
public var inbound: NIOAsyncChannelInboundStream<Inbound> {
    get {
        return self._inbound
    }
}

/// The writer used to send outbound messages.
@available(*, deprecated, message: "Use the executeThenClose scoped method instead.")
public var outbound: NIOAsyncChannelOutboundWriter<Outbound> {
    get {
        return self._outbound
    }
}

// Backing storage for `inbound`; `@usableFromInline` so the `@inlinable` members can reach it.
@usableFromInline
let _inbound: NIOAsyncChannelInboundStream<Inbound>

// Backing storage for `outbound`; `@usableFromInline` so the `@inlinable` members can reach it.
@usableFromInline
let _outbound: NIOAsyncChannelOutboundWriter<Outbound>
/// Wraps a ``Channel`` in a new ``NIOAsyncChannel``.
///
/// - Important: This **must** be called on the channel's event loop, otherwise this init will crash. The
///   handlers have to be installed before any other event happens in the pipeline, otherwise reads
///   might be dropped.
///
/// - Parameters:
///   - channel: The ``Channel`` to wrap.
///   - configuration: The configuration of the ``NIOAsyncChannel``.
/// - Throws: Any error thrown while adding the async handlers for `channel`.
@inlinable
public init(
    wrappingChannelSynchronously channel: Channel,
    configuration: Configuration = .init()
) throws {
    channel.eventLoop.preconditionInEventLoop()

    // The explicit tuple type pins the generic parameters of `_syncAddAsyncHandlers`.
    let (stream, writer): (NIOAsyncChannelInboundStream<Inbound>, NIOAsyncChannelOutboundWriter<Outbound>) =
        try channel._syncAddAsyncHandlers(
            backPressureStrategy: configuration.backPressureStrategy,
            isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled,
            closeOnDeinit: false
        )

    self.channel = channel
    self._inbound = stream
    self._outbound = writer
}
/// Wraps a ``Channel`` in a new ``NIOAsyncChannel`` whose outbound type is `Never`.
///
/// The ``NIOAsyncChannel/outbound`` writer is finished immediately by this initializer.
///
/// - Important: This **must** be called on the channel's event loop, otherwise this init will crash. The
///   handlers have to be installed before any other event happens in the pipeline, otherwise reads
///   might be dropped.
///
/// - Parameters:
///   - channel: The ``Channel`` to wrap.
///   - configuration: The configuration of the ``NIOAsyncChannel``.
/// - Throws: Any error thrown while adding the async handlers for `channel`.
@inlinable
public init(
    wrappingChannelSynchronously channel: Channel,
    configuration: Configuration = .init()
) throws where Outbound == Never {
    channel.eventLoop.preconditionInEventLoop()

    // The explicit tuple type pins the generic parameters of `_syncAddAsyncHandlers`.
    let (stream, writer): (NIOAsyncChannelInboundStream<Inbound>, NIOAsyncChannelOutboundWriter<Outbound>) =
        try channel._syncAddAsyncHandlers(
            backPressureStrategy: configuration.backPressureStrategy,
            isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled,
            closeOnDeinit: false
        )

    self.channel = channel
    self._inbound = stream
    self._outbound = writer

    // Nothing can ever be written when `Outbound == Never`, so finish the writer right away.
    self._outbound.finish()
}
/// Wraps a ``Channel`` in a new ``NIOAsyncChannel``.
///
/// Unlike `init(wrappingChannelSynchronously:configuration:)`, this deprecated variant passes
/// `closeOnDeinit: true` when creating the stream and the writer.
///
/// - Important: This **must** be called on the channel's event loop, otherwise this init will crash. The
///   handlers have to be installed before any other event happens in the pipeline, otherwise reads
///   might be dropped.
///
/// - Parameters:
///   - channel: The ``Channel`` to wrap.
///   - configuration: The configuration of the ``NIOAsyncChannel``.
/// - Throws: Any error thrown while adding the async handlers for `channel`.
@available(*, deprecated, renamed: "init(wrappingChannelSynchronously:configuration:)", message: "This method has been deprecated since it defaults to deinit based resource teardown")
@inlinable
public init(
    synchronouslyWrapping channel: Channel,
    configuration: Configuration = .init()
) throws {
    channel.eventLoop.preconditionInEventLoop()

    // The explicit tuple type pins the generic parameters of `_syncAddAsyncHandlers`.
    let (stream, writer): (NIOAsyncChannelInboundStream<Inbound>, NIOAsyncChannelOutboundWriter<Outbound>) =
        try channel._syncAddAsyncHandlers(
            backPressureStrategy: configuration.backPressureStrategy,
            isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled,
            closeOnDeinit: true
        )

    self.channel = channel
    self._inbound = stream
    self._outbound = writer
}
/// Wraps a ``Channel`` in a new ``NIOAsyncChannel`` whose outbound type is `Never`.
///
/// The ``NIOAsyncChannel/outbound`` writer is finished immediately by this initializer. Unlike
/// `init(wrappingChannelSynchronously:configuration:)`, this deprecated variant passes
/// `closeOnDeinit: true` when creating the stream and the writer.
///
/// - Important: This **must** be called on the channel's event loop, otherwise this init will crash. The
///   handlers have to be installed before any other event happens in the pipeline, otherwise reads
///   might be dropped.
///
/// - Parameters:
///   - channel: The ``Channel`` to wrap.
///   - configuration: The configuration of the ``NIOAsyncChannel``.
/// - Throws: Any error thrown while adding the async handlers for `channel`.
@inlinable
@available(*, deprecated, renamed: "init(wrappingChannelSynchronously:configuration:)", message: "This method has been deprecated since it defaults to deinit based resource teardown")
public init(
    synchronouslyWrapping channel: Channel,
    configuration: Configuration = .init()
) throws where Outbound == Never {
    channel.eventLoop.preconditionInEventLoop()

    // The explicit tuple type pins the generic parameters of `_syncAddAsyncHandlers`.
    let (stream, writer): (NIOAsyncChannelInboundStream<Inbound>, NIOAsyncChannelOutboundWriter<Outbound>) =
        try channel._syncAddAsyncHandlers(
            backPressureStrategy: configuration.backPressureStrategy,
            isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled,
            closeOnDeinit: true
        )

    self.channel = channel
    self._inbound = stream
    self._outbound = writer

    // Nothing can ever be written when `Outbound == Never`, so finish the writer right away.
    self._outbound.finish()
}
/// Memberwise initializer assembling a ``NIOAsyncChannel`` from an already created stream and writer.
///
/// - Important: Crashes when called off the channel's event loop.
///
/// - Parameters:
///   - channel: The wrapped ``Channel``.
///   - inboundStream: The stream backing ``NIOAsyncChannel/inbound``.
///   - outboundWriter: The writer backing ``NIOAsyncChannel/outbound``.
@inlinable
internal init(
    channel: Channel,
    inboundStream: NIOAsyncChannelInboundStream<Inbound>,
    outboundWriter: NIOAsyncChannelOutboundWriter<Outbound>
) {
    channel.eventLoop.preconditionInEventLoop()
    self._inbound = inboundStream
    self._outbound = outboundWriter
    self.channel = channel
}
/// Used only by our server bootstrap so that the child channel initializer runs at the right moment.
///
/// The stream and writer are created with `closeOnDeinit: true`, and the writer is finished before
/// the wrapped channel is returned.
///
/// - Important: This is not considered stable API and should not be used.
@inlinable
@available(*, deprecated, message: "This method has been deprecated since it defaults to deinit based resource teardown")
public static func _wrapAsyncChannelWithTransformations(
    synchronouslyWrapping channel: Channel,
    backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
    isOutboundHalfClosureEnabled: Bool = false,
    channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture<Inbound>
) throws -> NIOAsyncChannel<Inbound, Outbound> where Outbound == Never {
    channel.eventLoop.preconditionInEventLoop()

    let (stream, writer): (NIOAsyncChannelInboundStream<Inbound>, NIOAsyncChannelOutboundWriter<Outbound>) =
        try channel._syncAddAsyncHandlersWithTransformations(
            backPressureStrategy: backPressureStrategy,
            isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
            closeOnDeinit: true,
            channelReadTransformation: channelReadTransformation
        )

    // `Outbound == Never`: there is nothing to write, so the writer is finished up front.
    writer.finish()

    return NIOAsyncChannel<Inbound, Outbound>(
        channel: channel,
        inboundStream: stream,
        outboundWriter: writer
    )
}
/// Used only by our server bootstrap so that the child channel initializer runs at the right moment.
///
/// The stream and writer are created with `closeOnDeinit: false`, and the writer is finished before
/// the wrapped channel is returned.
///
/// - Important: This is not considered stable API and should not be used.
@inlinable
public static func _wrapAsyncChannelWithTransformations(
    wrappingChannelSynchronously channel: Channel,
    backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
    isOutboundHalfClosureEnabled: Bool = false,
    channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture<Inbound>
) throws -> NIOAsyncChannel<Inbound, Outbound> where Outbound == Never {
    channel.eventLoop.preconditionInEventLoop()

    let (stream, writer): (NIOAsyncChannelInboundStream<Inbound>, NIOAsyncChannelOutboundWriter<Outbound>) =
        try channel._syncAddAsyncHandlersWithTransformations(
            backPressureStrategy: backPressureStrategy,
            isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
            closeOnDeinit: false,
            channelReadTransformation: channelReadTransformation
        )

    // `Outbound == Never`: there is nothing to write, so the writer is finished up front.
    writer.finish()

    return NIOAsyncChannel<Inbound, Outbound>(
        channel: channel,
        inboundStream: stream,
        outboundWriter: writer
    )
}
/// Provides scoped access to the inbound and outbound side of the underlying ``Channel``.
///
/// Once `body` has finished, whether by returning or by throwing, the outbound writer is finished
/// and a close of the underlying ``Channel`` is awaited.
///
/// - Important: After this method returns, closing the underlying ``Channel`` has been attempted.
///
/// - Parameter body: A closure that gets scoped access to the inbound and outbound.
/// - Returns: The value returned by `body`.
/// - Throws: The error thrown by `body`; an error from closing the channel never replaces it. If `body`
///   succeeds, any error from closing the channel other than `ChannelError.alreadyClosed` is thrown.
public func executeThenClose<Result>(
    _ body: (_ inbound: NIOAsyncChannelInboundStream<Inbound>, _ outbound: NIOAsyncChannelOutboundWriter<Outbound>) async throws -> Result
) async throws -> Result {
    let result: Result
    do {
        result = try await body(self._inbound, self._outbound)
    } catch let bodyError {
        // Best-effort teardown: the error from `body` wins over any failure to close.
        self._outbound.finish()
        _ = try? await self.channel.close().get()
        throw bodyError
    }

    self._outbound.finish()
    do {
        try await self.channel.close().get()
    } catch let closeError as ChannelError where closeError == .alreadyClosed {
        // The channel being closed already is the state we wanted; not a failure.
    }
    return result
}
/// Provides scoped access to the inbound side of the underlying ``Channel``.
///
/// This is the `Outbound == Never` convenience: it forwards to the two-argument
/// `executeThenClose` and hands only the inbound stream to `body`.
///
/// - Important: After this method returns, closing the underlying ``Channel`` has been attempted.
///
/// - Parameter body: A closure that gets scoped access to the inbound.
/// - Returns: The value returned by `body`.
/// - Throws: Whatever the two-argument `executeThenClose` throws.
public func executeThenClose<Result>(
    _ body: (_ inbound: NIOAsyncChannelInboundStream<Inbound>) async throws -> Result
) async throws -> Result where Outbound == Never {
    return try await self.executeThenClose { stream, _ in
        try await body(stream)
    }
}
}
extension Channel {
    /// Creates the inbound stream and the outbound writer that back a ``NIOAsyncChannel`` for this channel.
    ///
    /// The inbound stream is created before the outbound writer. Asserts that the caller is on
    /// this channel's event loop.
    ///
    /// - Parameters:
    ///   - backPressureStrategy: Back pressure strategy forwarded to the inbound stream.
    ///   - isOutboundHalfClosureEnabled: Forwarded to the outbound writer.
    ///   - closeOnDeinit: Forwarded to both the inbound stream and the outbound writer.
    /// - Returns: The inbound stream and the outbound writer, in that order.
    /// - Throws: Any error thrown while creating the stream or the writer.
    @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
    @inlinable
    func _syncAddAsyncHandlers<Inbound: Sendable, Outbound: Sendable>(
        backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
        isOutboundHalfClosureEnabled: Bool,
        closeOnDeinit: Bool
    ) throws -> (NIOAsyncChannelInboundStream<Inbound>, NIOAsyncChannelOutboundWriter<Outbound>) {
        self.eventLoop.assertInEventLoop()

        // Inbound side first, outbound side second; keep this order.
        let stream: NIOAsyncChannelInboundStream<Inbound> = try .makeWrappingHandler(
            channel: self,
            backPressureStrategy: backPressureStrategy,
            closeOnDeinit: closeOnDeinit
        )
        let outboundWriter: NIOAsyncChannelOutboundWriter<Outbound> = try .init(
            channel: self,
            isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
            closeOnDeinit: closeOnDeinit
        )

        return (stream, outboundWriter)
    }

    /// Creates a transforming inbound stream and a `Never` outbound writer for this channel.
    ///
    /// The inbound stream is created before the outbound writer. Asserts that the caller is on
    /// this channel's event loop.
    ///
    /// - Parameters:
    ///   - backPressureStrategy: Back pressure strategy forwarded to the inbound stream.
    ///   - isOutboundHalfClosureEnabled: Forwarded to the outbound writer.
    ///   - closeOnDeinit: Forwarded to both the inbound stream and the outbound writer.
    ///   - channelReadTransformation: Transformation forwarded to the inbound stream; it maps a
    ///     ``Channel`` to a future `ChannelReadResult`.
    /// - Returns: The inbound stream and the outbound writer, in that order.
    /// - Throws: Any error thrown while creating the stream or the writer.
    @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
    @inlinable
    func _syncAddAsyncHandlersWithTransformations<ChannelReadResult>(
        backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
        isOutboundHalfClosureEnabled: Bool,
        closeOnDeinit: Bool,
        channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture<ChannelReadResult>
    ) throws -> (NIOAsyncChannelInboundStream<ChannelReadResult>, NIOAsyncChannelOutboundWriter<Never>) {
        self.eventLoop.assertInEventLoop()

        // Inbound side first, outbound side second; keep this order.
        let stream: NIOAsyncChannelInboundStream<ChannelReadResult> = try .makeTransformationHandler(
            channel: self,
            backPressureStrategy: backPressureStrategy,
            closeOnDeinit: closeOnDeinit,
            channelReadTransformation: channelReadTransformation
        )
        let outboundWriter: NIOAsyncChannelOutboundWriter<Never> = try .init(
            channel: self,
            isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
            closeOnDeinit: closeOnDeinit
        )

        return (stream, outboundWriter)
    }
}
|