1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
|
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2018 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
import Foundation
/// An operation queue that stops scheduling new operations as soon as the
/// completion handler returns `false`.
///
/// All operations are supplied up front and are executed when
/// `waitUntilFinished()` is called. After each operation's completion block
/// fires, the completion handler is invoked; returning `false` from it cancels
/// every operation that comes later in the original list.
public final class StressTesterOperationQueue<Item: Operation> {
  /// Serializes calls to `completionHandler` and updates to the running count
  /// of completed operations.
  private let serialQueue = DispatchQueue(label: "\(StressTesterOperationQueue.self)")

  /// The underlying queue that executes the operations.
  private let queue = OperationQueue()

  /// The operations to run, in the order they are added to `queue`.
  private let operations: [Item]

  /// Called once per completed operation; returning `false` cancels all
  /// operations positioned after `index` in `operations`.
  private let completionHandler: (_ index: Int, _ operation: Item, _ operationsCompleted: Int, _ totalOperationCount: Int) -> Bool

  /// Creates a queue for the given operations.
  ///
  /// - Parameters:
  ///   - operations: The operations to execute.
  ///   - maxWorkers: Upper bound on the number of operations that run
  ///     concurrently. The effective value never exceeds the active processor
  ///     count, and `nil` means "use the active processor count". Values below
  ///     1 are treated as 1, because a limit of 0 would prevent any operation
  ///     from starting and make `waitUntilFinished()` block forever.
  ///   - completionHandler: Invoked (serially) with the operation's index, the
  ///     operation itself, the number of operations completed so far, and the
  ///     total operation count. Return `false` to cancel all later operations.
  public init(operations: [Item], maxWorkers: Int? = nil,
              completionHandler: @escaping (Int, Item, Int, Int) -> Bool) {
    self.operations = operations
    self.completionHandler = completionHandler

    let processorCount = ProcessInfo.processInfo.activeProcessorCount
    let requestedWorkers = maxWorkers ?? processorCount
    // Cap at the processor count, but always keep at least one worker so the
    // queue can make progress.
    queue.maxConcurrentOperationCount = max(1, min(requestedWorkers, processorCount))
  }

  /// Enqueues every operation and blocks the calling thread until all of them
  /// have finished and all of their completion blocks have returned.
  ///
  /// NOTE(review): this re-adds the same operation instances on every call;
  /// Foundation does not allow adding an operation that is already finished or
  /// enqueued, so this is presumably meant to be called once — confirm.
  public func waitUntilFinished() {
    // Tracks outstanding completion blocks. They run asynchronously with
    // respect to the operation finishing, so waiting on the operation queue
    // alone is not sufficient.
    let group = DispatchGroup()
    var completed = 0

    for (index, operation) in operations.enumerated() {
      group.enter()
      // Capture the operation weakly so its own completion block does not
      // keep it alive.
      operation.completionBlock = { [weak self, weak operation] in
        defer { group.leave() }
        guard let `self` = self, let operation = operation else { return }
        // `completed` is only mutated on `serialQueue`, which also guarantees
        // the handler is never invoked concurrently.
        self.serialQueue.sync {
          completed += 1
          if !self.completionHandler(index, operation, completed, self.operations.count) {
            self.cancelAfter(index)
          }
        }
      }
      queue.addOperation(operation)
    }

    queue.waitUntilAllOperationsAreFinished()
    group.wait()
  }

  /// Cancels every operation positioned after `index` in `operations`.
  ///
  /// Operations at or before `index` are left untouched.
  private func cancelAfter(_ index: Int) {
    // `dropFirst` yields an empty sequence when `index` is the last position,
    // so no explicit bounds check is needed.
    operations.dropFirst(index + 1).forEach { $0.cancel() }
  }
}
|