File: Collection+ConcurrentPerform.swift

package info (click to toggle)
swiftlang 6.0.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,519,992 kB
  • sloc: cpp: 9,107,863; ansic: 2,040,022; asm: 1,135,751; python: 296,500; objc: 82,456; f90: 60,502; lisp: 34,951; pascal: 19,946; sh: 18,133; perl: 7,482; ml: 4,937; javascript: 4,117; makefile: 3,840; awk: 3,535; xml: 914; fortran: 619; cs: 573; ruby: 573
file content (146 lines) | stat: -rw-r--r-- 7,007 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/*
 This source file is part of the Swift.org open source project

 Copyright (c) 2021 Apple Inc. and the Swift project authors
 Licensed under Apache License v2.0 with Runtime Library Exception

 See https://swift.org/LICENSE.txt for license information
 See https://swift.org/CONTRIBUTORS.txt for Swift project authors
*/

import Foundation

// Until we find a better way to manage memory on Linux we will disable
// concurrency in the Collection extensions in this file and have tests expect
// them to work serially on Linux. rdar://75794062

// The collection extensions below run concurrently only on macOS and iOS;
// on every other platform they fall back to serial `map`/`forEach` behavior.
#if !os(macOS) && !os(iOS)
private let useConcurrentCollectionExtensions = false
#else
private let useConcurrentCollectionExtensions = true
#endif

extension Collection where Index == Int {

    /// Concurrently transforms the elements of a collection.
    /// - Parameters:
    ///   - batches: The maximum number of batches to split the elements into. Must be greater than zero
    ///     when concurrency is enabled and the collection isn't empty.
    ///   - block: A `(Element) -> Result` block that will be used to transform each of the collection's elements concurrently.
    /// - Returns: The transformed elements, in the same order as the elements of the collection.
    ///
    /// Use ``concurrentMap(batches:block:)`` when you want to transform a collection concurrently and preserve the count and order of the elements.
    /// > Warning: As multiple copies of `block` are executed concurrently, mutating shared state outside the closure is not safe.
    func concurrentMap<Result>(
        batches: UInt = UInt(ProcessInfo.processInfo.processorCount * 4),
        block: (Element) -> Result) -> [Result] {

        // If concurrency is disabled fall back on `map`.
        guard useConcurrentCollectionExtensions else { return map(block) }

        guard !isEmpty else { return [] }
        precondition(batches > 0, "The number of concurrent batches should be greater than zero.")

        let layout = batchLayout(maximumBatches: batches)
        // One slot per batch. Each batch writes only its own slot, so the results can be
        // stitched back together in the original element order without sorting.
        let batchResults = Synchronized<[[Result]]>(Array(repeating: [Result](), count: layout.count))

        // Concurrently run `block` over slices of the collection; each slice is mapped serially.
        DispatchQueue.concurrentPerform(iterations: layout.count) { batch in
            let results = batchSlice(batch, size: layout.size).map(block)
            batchResults.sync({ $0[batch] = results })
        }

        // Stitch together the batch results in batch order.
        return batchResults.sync({ Array($0.joined()) })
    }

    /// Concurrently performs a block over the elements of the collection.
    /// - Parameters:
    ///   - batches: The maximum number of batches to split the elements into. Must be greater than zero
    ///     when concurrency is enabled and the collection isn't empty.
    ///   - block: A `(Element) -> Void` block that will be executed for each of the collection's elements concurrently.
    ///
    /// > Warning: As multiple copies of `block` are executed concurrently, mutating shared state outside the closure is not safe.
    ///
    /// > Note: Unlike `forEach`, this function doesn't guarantee that `block` is called in the order of the
    ///         collection's elements.
    func concurrentPerform(
        batches: UInt = UInt(ProcessInfo.processInfo.processorCount * 4),
        block: (Element) -> Void) {

        // If concurrency is disabled fall back on `forEach`.
        guard useConcurrentCollectionExtensions else { return forEach(block) }

        // Forward `batches` so that the caller's batch count is honored rather than the default.
        _ = concurrentPerform(batches: batches) { element, _ in block(element) } as [Void]
    }

    /// Concurrently performs a block over the elements of the collection and collects any results.
    /// - Parameters:
    ///   - batches: The maximum number of batches to split the elements into. Must be greater than zero
    ///     when concurrency is enabled and the collection isn't empty.
    ///   - block: A `(Element, inout [Result]) -> Void` block that will be executed for each of the collection's elements concurrently.
    /// - Returns: The results appended by every invocation of `block`.
    ///
    /// The difference in behavior compared to ``concurrentMap(batches:block:)`` is that with this API you
    /// can freely mutate the returned results from each block. For example use `concurrentPerform(batches:block:)`
    /// to process a collection of inputs and add any encountered problems (if any) to the results to handle
    /// synchronously after the concurrent work is completed.
    ///
    /// > Warning: As multiple copies of `block` are executed concurrently, mutating shared state outside the closure is not safe.
    ///
    /// > Note: Mutating the results parameter of `block` from inside the block is safe as that parameter is
    ///         shared only between the blocks in a single batch which are executed serially.
    ///
    /// > Note: Unlike `map` or similar functions, this function does not preserve the element order from the collection
    ///         to the order of elements in the results array.
    func concurrentPerform<Result>(
        batches: UInt = UInt(ProcessInfo.processInfo.processorCount * 4),
        block: (Element, inout [Result]) -> Void) -> [Result] {

        // If concurrency is disabled fall back on `forEach`.
        guard useConcurrentCollectionExtensions else {
            var results = [Result]()
            forEach { block($0, &results) }
            return results
        }

        guard !isEmpty else { return [] }
        precondition(batches > 0, "The number of concurrent batches should be greater than zero.")

        let layout = batchLayout(maximumBatches: batches)
        let allResults = Synchronized<[Result]>([])

        // Concurrently run `block` over slices of the collection.
        DispatchQueue.concurrentPerform(iterations: layout.count) { batch in
            // Collect this batch's results locally; the elements of one batch are processed serially.
            var batchResults = [Result]()
            batchResults.reserveCapacity(layout.size)

            for element in batchSlice(batch, size: layout.size) {
                block(element, &batchResults)
            }

            allResults.sync({ $0.append(contentsOf: batchResults) })
        }

        // Return the collected results from all batches.
        return allResults.sync({ $0 })
    }

    /// Computes how the collection is split into contiguous batches.
    /// - Parameter maximumBatches: The upper bound on the number of batches.
    /// - Returns: `size`, the number of elements per batch (the last batch may hold fewer), and `count`,
    ///   the number of non-empty batches needed to cover every element (never more than `maximumBatches`
    ///   or the number of elements). Returns `(0, 0)` for an empty collection or a zero batch limit.
    private func batchLayout(maximumBatches: UInt) -> (size: Int, count: Int) {
        // `count` may be O(n) for a non-random-access collection, so read it once.
        let elementCount = count
        guard elementCount > 0, maximumBatches > 0 else { return (0, 0) }

        // Never use more batches than there are elements. Clamping also avoids trapping
        // for batch limits above `Int.max`.
        let batchLimit = Swift.min(Int(clamping: maximumBatches), elementCount)
        // Ceiling division so that `batchLimit` batches of `size` elements cover every element.
        let size = (elementCount + batchLimit - 1) / batchLimit
        return (size, (elementCount + size - 1) / size)
    }

    /// Returns the elements of the zero-based batch number `batch` when the collection is split
    /// into consecutive batches of `size` elements; the result is empty past the end of the collection.
    private func batchSlice(_ batch: Int, size: Int) -> SubSequence {
        // Offset from `startIndex` rather than subscripting with raw offsets: an `Int`-indexed
        // collection such as `ArraySlice` doesn't necessarily start at index 0.
        let lower = index(startIndex, offsetBy: batch * size, limitedBy: endIndex) ?? endIndex
        let upper = index(lower, offsetBy: size, limitedBy: endIndex) ?? endIndex
        return self[lower ..< upper]
    }
}