1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
|
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2022 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
extension EventLoopFuture {
/// When the current `EventLoopFuture<Value>` is fulfilled, run the provided callback,
/// which will provide a new `EventLoopFuture` alongside the `EventLoop` associated with this future.
///
/// This allows you to dynamically dispatch new asynchronous tasks as phases in a
/// longer series of processing steps. Note that you can use the results of the
/// current `EventLoopFuture<Value>` when determining how to dispatch the next operation.
///
/// This works well when you have APIs that already know how to return `EventLoopFuture`s.
/// You can do something with the result of one and just return the next future:
///
/// ```
/// let d1 = networkRequest(args).future()
/// let d2 = d1.flatMapWithEventLoop { t, eventLoop -> EventLoopFuture<NewValue> in
/// eventLoop.makeSucceededFuture(t + 1)
/// }
/// d2.whenSuccess { u in
/// NSLog("Result of second request: \(u)")
/// }
/// ```
///
/// Note: In a sense, the `EventLoopFuture<NewValue>` is returned before it's created.
///
/// - parameters:
/// - callback: Function that will receive the value of this `EventLoopFuture` and return
/// a new `EventLoopFuture`.
/// - returns: A future that will receive the eventual value.
@inlinable
@preconcurrency
public func flatMapWithEventLoop<NewValue>(_ callback: @escaping @Sendable (Value, EventLoop) -> EventLoopFuture<NewValue>) -> EventLoopFuture<NewValue> {
let next = EventLoopPromise<NewValue>.makeUnleakablePromise(eventLoop: self.eventLoop)
self._whenComplete { [eventLoop = self.eventLoop] in
switch self._value! {
case .success(let t):
let futureU = callback(t, eventLoop)
if futureU.eventLoop.inEventLoop {
return futureU._addCallback {
next._setValue(value: futureU._value!)
}
} else {
futureU.cascade(to: next)
return CallbackList()
}
case .failure(let error):
return next._setValue(value: .failure(error))
}
}
return next.futureResult
}
/// When the current `EventLoopFuture<Value>` is in an error state, run the provided callback, which
/// may recover from the error by returning an `EventLoopFuture<NewValue>`. The callback is intended to potentially
/// recover from the error by returning a new `EventLoopFuture` that will eventually contain the recovered
/// result.
///
/// If the callback cannot recover it should return a failed `EventLoopFuture`.
///
/// - parameters:
/// - callback: Function that will receive the error value of this `EventLoopFuture` and return
/// a new value lifted into a new `EventLoopFuture`.
/// - returns: A future that will receive the recovered value.
@inlinable
@preconcurrency
public func flatMapErrorWithEventLoop(_ callback: @escaping @Sendable (Error, EventLoop) -> EventLoopFuture<Value>) -> EventLoopFuture<Value> {
let next = EventLoopPromise<Value>.makeUnleakablePromise(eventLoop: self.eventLoop)
self._whenComplete { [eventLoop = self.eventLoop] in
switch self._value! {
case .success(let t):
return next._setValue(value: .success(t))
case .failure(let e):
let t = callback(e, eventLoop)
if t.eventLoop.inEventLoop {
return t._addCallback {
next._setValue(value: t._value!)
}
} else {
t.cascade(to: next)
return CallbackList()
}
}
}
return next.futureResult
}
/// Returns a new `EventLoopFuture` that fires only when this `EventLoopFuture` and
/// all the provided `futures` complete. It then provides the result of folding the value of this
/// `EventLoopFuture` with the values of all the provided `futures`.
///
/// This function is suited when you have APIs that already know how to return `EventLoopFuture`s.
///
/// The returned `EventLoopFuture` will fail as soon as the a failure is encountered in any of the
/// `futures` (or in this one). However, the failure will not occur until all preceding
/// `EventLoopFutures` have completed. At the point the failure is encountered, all subsequent
/// `EventLoopFuture` objects will no longer be waited for. This function therefore fails fast: once
/// a failure is encountered, it will immediately fail the overall EventLoopFuture.
///
/// - parameters:
/// - futures: An array of `EventLoopFuture<NewValue>` to wait for.
/// - with: A function that will be used to fold the values of two `EventLoopFuture`s and return a new value wrapped in an `EventLoopFuture`.
/// - returns: A new `EventLoopFuture` with the folded value whose callbacks run on `self.eventLoop`.
@inlinable
@preconcurrency
public func foldWithEventLoop<OtherValue>(
_ futures: [EventLoopFuture<OtherValue>],
with combiningFunction: @escaping @Sendable (Value, OtherValue, EventLoop) -> EventLoopFuture<Value>
) -> EventLoopFuture<Value> {
func fold0(eventLoop: EventLoop) -> EventLoopFuture<Value> {
let body = futures.reduce(self) { (f1: EventLoopFuture<Value>, f2: EventLoopFuture<OtherValue>) -> EventLoopFuture<Value> in
let newFuture = f1.and(f2).flatMap { (args: (Value, OtherValue)) -> EventLoopFuture<Value> in
let (f1Value, f2Value) = args
self.eventLoop.assertInEventLoop()
return combiningFunction(f1Value, f2Value, eventLoop)
}
assert(newFuture.eventLoop === self.eventLoop)
return newFuture
}
return body
}
if self.eventLoop.inEventLoop {
return fold0(eventLoop: self.eventLoop)
} else {
let promise = self.eventLoop.makePromise(of: Value.self)
self.eventLoop.execute { [eventLoop = self.eventLoop] in
fold0(eventLoop: eventLoop).cascade(to: promise)
}
return promise.futureResult
}
}
}
|