1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
|
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift open source project
//
// Copyright (c) 2023 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See http://swift.org/LICENSE.txt for license information
// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
import _Concurrency
import DequeModule
/// A concurrency limiter following the "token bucket" pattern: it behaves much like a counting
/// semaphore, but is implemented with Swift Concurrency primitives (actor isolation plus checked
/// continuations) instead of blocking threads.
///
/// The bucket is created with a fixed number of tokens. Each call to ``withToken(_:)`` claims one
/// token for the duration of its closure and gives it back afterwards. Callers that arrive while
/// no token is free are suspended and later resumed in the order in which they started waiting.
public actor TokenBucket {
    /// Number of tokens that can currently be claimed without waiting.
    private var tokens: Int

    /// Continuations of suspended callers that are waiting for a token, oldest first.
    private var waiters: Deque<CheckedContinuation<Void, Never>> = Deque()

    /// Creates a bucket that initially holds `tokens` free tokens.
    ///
    /// - Parameter tokens: Maximum number of closures that ``withToken(_:)`` will run concurrently.
    ///   The value is not validated: with a value of zero or less no token ever becomes available,
    ///   so every call to ``withToken(_:)`` suspends indefinitely.
    public init(tokens: Int) {
        self.tokens = tokens
    }

    /// Runs an `async` closure as soon as a token is available.
    ///
    /// At most as many closures run concurrently as the number of `tokens` passed to
    /// ``TokenBucket/init(tokens:)``. Any further invocation suspends until a running closure
    /// finishes and frees its token. Waiting does not react to task cancellation: a suspended
    /// caller is resumed only when a token is handed to it.
    ///
    /// - Parameter body: The closure to invoke once a token has been obtained.
    /// - Returns: The value returned by `body`.
    /// - Throws: Rethrows any error thrown by `body`; the token is still given back in that case.
    public func withToken<ReturnType: Sendable>(
        _ body: @Sendable () async throws -> ReturnType
    ) async rethrows -> ReturnType {
        await getToken()
        // `defer` hands the token back on both the normal and the throwing exit path.
        defer { returnToken() }
        return try await body()
    }

    /// Claims a token, suspending the caller when none is free.
    private func getToken() async {
        guard tokens > 0 else {
            // No free token: park the caller at the back of the queue. When `returnToken()`
            // resumes this continuation the token has been transferred to us directly, which is
            // why the counter is not decremented on this path.
            await withCheckedContinuation { continuation in
                self.waiters.append(continuation)
            }
            return
        }

        tokens -= 1
    }

    /// Gives a token back: it is passed straight to the longest-waiting caller if there is one,
    /// otherwise it is put back into the bucket.
    private func returnToken() {
        guard let nextWaiter = waiters.popFirst() else {
            tokens += 1
            return
        }

        // Ownership of the token moves to the resumed waiter, so `tokens` stays unchanged.
        nextWaiter.resume()
    }
}
|