File: SymbolGraphConcurrentDecoder.swift

package info (click to toggle)
swiftlang 6.0.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,519,992 kB
  • sloc: cpp: 9,107,863; ansic: 2,040,022; asm: 1,135,751; python: 296,500; objc: 82,456; f90: 60,502; lisp: 34,951; pascal: 19,946; sh: 18,133; perl: 7,482; ml: 4,937; javascript: 4,117; makefile: 3,840; awk: 3,535; xml: 914; fortran: 619; cs: 573; ruby: 573
file content (197 lines) | stat: -rw-r--r-- 8,401 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
/*
 This source file is part of the Swift.org open source project

 Copyright (c) 2021 Apple Inc. and the Swift project authors
 Licensed under Apache License v2.0 with Runtime Library Exception

 See https://swift.org/LICENSE.txt for license information
 See https://swift.org/CONTRIBUTORS.txt for Swift project authors
*/

import Foundation
import SymbolKit

public extension CodingUserInfoKey {
    /// A user info key to store a symbol counter in the decoder.
    static let symbolCounter = CodingUserInfoKey(rawValue: "symbolCounter")!

    /// A user info key to store the current batch index.
    static let batchIndex = CodingUserInfoKey(rawValue: "symbolScaffolding")!
    
    /// A user info key to store the total amount of concurrent batches.
    static let batchCount = CodingUserInfoKey(rawValue: "symbolBatches")!
}

/// A concurrent symbol graph JSON decoder.
enum SymbolGraphConcurrentDecoder {
    
    /// Decodes the given data into a symbol graph concurrently.
    /// - Parameters:
    ///   - data: JSON data to decode.
    ///   - concurrentBatches: The number of batches to decode concurrently.
    /// - Returns: The decoded symbol graph.
    ///
    /// This method spawns multiple concurrent decoder workers that work with the same symbol graph data.
    /// As symbols make up for the most of the data in the symbol graph each of the
    /// concurrent workers decodes just a piece of the complete symbol list.
    ///
    /// ## Implementation
    ///
    /// Each worker with an index `N` out of all the workers decodes every `N`th symbol in the symbol graph.
    /// If you have 4 workers, the 1st worker decodes symbols with indexes `[0, 4, 8, 12, ...]`, the 2nd
    /// worker decodes symbols `[1, 5, 9, 13, ...]`, and so forth.
    ///
    /// ```
    /// Thread [N=1]  S - - - S - - - S - - - S - - - >
    /// Thread [N=2]  - S - - - S - - - S - - - S - - >
    /// Thread [N=3]  - - S - - - S - - - S - - - S - >
    /// Thread [N=4]  - - - S - - - S - - - S - - - S >
    /// ```
    ///
    /// Once a worker has finished walking all symbols and decoding the ones in its batch, it adds the symbols
    /// to the decoded symbol graph's symbols eventually adding up to all the symbols in the graph.
    /// > Note: Since each worker needs to walk over all symbols that are contained in the symbol graph,
    /// it made sense to spread the work equally (in other words to decode each N-th symbol per worker)
    /// so that we can get the best performance out of the concurrent work.
    
    static func decode(_ data: Data, concurrentBatches: Int = 4, using decoder: JSONDecoder = JSONDecoder()) throws -> SymbolGraph {
        
        
        var symbolGraph: SymbolGraph!

        let decodeError = Synchronized<Error?>(nil)
        let symbols = Synchronized<[String: SymbolGraph.Symbol]>([:])

        let group = DispatchGroup()
        let queue = DispatchQueue(label: "com.swift.SymbolGraphConcurrentDecoder", qos: .unspecified, attributes: .concurrent)
        
        // Concurrently decode metadata and relationships.
        group.async(queue: queue) {
            do {
                // Decode the symbol graph bar the symbol list.
                symbolGraph = try JSONDecoder(like: decoder).decode(SymbolGraphWithoutSymbols.self, from: data).symbolGraph
            } catch {
                decodeError.sync({ $0 = error })
            }
        }
        
        // Concurrently decode each batch of symbols in the graph.
        (0..<concurrentBatches).concurrentPerform { batchIndex in
            let batchDecoder = JSONDecoder(like: decoder)
            
            // Configure the decoder to decode the current batch
            batchDecoder.userInfo[CodingUserInfoKey.symbolCounter] = Counter()
            batchDecoder.userInfo[CodingUserInfoKey.batchIndex] = batchIndex
            batchDecoder.userInfo[CodingUserInfoKey.batchCount] = concurrentBatches
            
            // Decode and add the symbols batch to `batches`.
            do {
                let batch = try batchDecoder.decode(SymbolGraphBatch.self, from: data)
                symbols.sync({
                    for symbol in batch.symbols {
                        if let existing = $0[symbol.identifier.precise] {
                            $0[symbol.identifier.precise] = SymbolGraph._symbolToKeepInCaseOfPreciseIdentifierConflict(existing, symbol)
                        } else {
                            $0[symbol.identifier.precise] = symbol
                        }
                    }
                })
            } catch {
                decodeError.sync({ $0 = error })
            }
        }
        
        // Wait until all concurrent tasks have completed.
        group.wait()
        
        // If an error happened during decoding re-throw.
        if let lastError = decodeError.sync({ $0 }) {
            throw lastError
        }

        symbolGraph.symbols = symbols.sync({ $0 })
        return symbolGraph
    }
    
    /// A wrapper type that decodes everything in the symbol graph but the symbols list.
    struct SymbolGraphWithoutSymbols: Decodable {
        /// The decoded symbol graph.
        let symbolGraph: SymbolGraph
        
        typealias CodingKeys = SymbolGraph.CodingKeys

        public init(from decoder: Decoder) throws {
            let container = try decoder.container(keyedBy: CodingKeys.self)
            
            let metadata = try container.decode(SymbolGraph.Metadata.self, forKey: .metadata)
            let module = try container.decode(SymbolGraph.Module.self, forKey: .module)
            let relationships = try container.decode([SymbolGraph.Relationship].self, forKey: .relationships)
            self.symbolGraph = SymbolGraph(metadata: metadata, module: module, symbols: [], relationships: relationships)
        }
    }
    
    /// A wrapper type that decodes only a batch of symbols out of a symbol graph.
    struct SymbolGraphBatch: Decodable {
        /// A list of decoded symbols.
        let symbols: [SymbolGraph.Symbol]

        typealias CodingKeys = SymbolGraph.CodingKeys

        init(from decoder: Decoder) throws {
            let container = try decoder.container(keyedBy: CodingKeys.self)
            
            symbols = try container.decode([OptionalBatchSymbol].self, forKey: .symbols)
                // Remove the skipped optional symbols that don't belong to the batch
                .compactMap(\.symbol)
        }
    }
    
    /// A wrapper type that decodes a symbol only if it belongs to the batch configured
    /// in the given decoder.
    struct OptionalBatchSymbol: Decodable {
        /// The decoded symbol. `nil` if the symbol didn't belong the correct batch.
        private(set) var symbol: SymbolGraph.Symbol?

        /// Creates a new instance by decoding from the given decoder.
        ///
        /// > Warning: Crashes when decoding using a decoder that is not correctly
        /// configured for concurrent decoding.
        public init(from decoder: Decoder) throws {
            let check = decoder.userInfo[CodingUserInfoKey.batchIndex] as! Int
            let count = (decoder.userInfo[CodingUserInfoKey.symbolCounter] as! Counter).increment()
            let batches = decoder.userInfo[CodingUserInfoKey.batchCount] as! Int
            
            /// Don't decode if this symbol doesn't belong to the current batch.
            guard count % batches == check else { return }
            
            /// Decode the symbol as usual.
            symbol = try SymbolGraph.Symbol(from: decoder)
        }
    }
    
    /// An auto-increment counter.
    class Counter {
        /// The current value of the counter.
        private var count = 0
        
        /// Get the current value and increment.
        func increment() -> Int {
            defer { count += 1}
            return count
        }
    }
}

private extension JSONDecoder {
    /// Creates a new decoder with the same configuration as the
    /// old one.
    convenience init(like old: JSONDecoder) {
        self.init()
        
        self.userInfo = old.userInfo
        self.dataDecodingStrategy = old.dataDecodingStrategy
        self.dateDecodingStrategy = old.dateDecodingStrategy
        self.keyDecodingStrategy = old.keyDecodingStrategy
        self.nonConformingFloatDecodingStrategy = old.nonConformingFloatDecodingStrategy
    }
}