File: RegistryDownloadsManager.swift

package info (click to toggle)
swiftlang 6.0.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,519,992 kB
  • sloc: cpp: 9,107,863; ansic: 2,040,022; asm: 1,135,751; python: 296,500; objc: 82,456; f90: 60,502; lisp: 34,951; pascal: 19,946; sh: 18,133; perl: 7,482; ml: 4,937; javascript: 4,117; makefile: 3,840; awk: 3,535; xml: 914; fortran: 619; cs: 573; ruby: 573
file content (365 lines) | stat: -rw-r--r-- 15,243 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift open source project
//
// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See http://swift.org/LICENSE.txt for license information
// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//

import Basics
import Dispatch
import Foundation
import PackageLoading
import PackageModel

import struct TSCUtility.Version

public class RegistryDownloadsManager: Cancellable {
    public typealias Delegate = RegistryDownloadsManagerDelegate

    private let fileSystem: FileSystem
    private let path: AbsolutePath
    private let cachePath: AbsolutePath?
    private let registryClient: RegistryClient
    private let delegate: Delegate?

    private var pendingLookups = [PackageIdentity: DispatchGroup]()
    private var pendingLookupsLock = NSLock()

    public init(
        fileSystem: FileSystem,
        path: AbsolutePath,
        cachePath: AbsolutePath?,
        registryClient: RegistryClient,
        delegate: Delegate?
    ) {
        self.fileSystem = fileSystem
        self.path = path
        self.cachePath = cachePath
        self.registryClient = registryClient
        self.delegate = delegate
    }
    
    public func lookup(
        package: PackageIdentity,
        version: Version,
        observabilityScope: ObservabilityScope,
        delegateQueue: DispatchQueue,
        callbackQueue: DispatchQueue
    ) async throws -> AbsolutePath {
        try await safe_async {
            self.lookup(
                package: package,
                version: version,
                observabilityScope: observabilityScope,
                delegateQueue: delegateQueue,
                callbackQueue: callbackQueue,
                completion: $0
            )
        }
    }

    @available(*, noasync, message: "Use the async alternative")
    public func lookup(
        package: PackageIdentity,
        version: Version,
        observabilityScope: ObservabilityScope,
        delegateQueue: DispatchQueue,
        callbackQueue: DispatchQueue,
        completion: @escaping (Result<AbsolutePath, Error>) -> Void
    ) {
        // wrap the callback in the requested queue
        let completion = { result in callbackQueue.async { completion(result) } }

        let packageRelativePath: RelativePath
        let packagePath: AbsolutePath

        do {
            packageRelativePath = try package.downloadPath(version: version)
            packagePath = self.path.appending(packageRelativePath)

            // TODO: we can do some finger-print checking to improve the validation
            // already exists and valid, we can exit early
            if try self.fileSystem.validPackageDirectory(packagePath) {
                return completion(.success(packagePath))
            }
        } catch {
            return completion(.failure(error))
        }

        // next we check if there is a pending lookup
        self.pendingLookupsLock.lock()
        if let pendingLookup = self.pendingLookups[package] {
            self.pendingLookupsLock.unlock()
            // chain onto the pending lookup
            pendingLookup.notify(queue: callbackQueue) {
                // at this point the previous lookup should be complete and we can re-lookup
                self.lookup(
                    package: package,
                    version: version,
                    observabilityScope: observabilityScope,
                    delegateQueue: delegateQueue,
                    callbackQueue: callbackQueue,
                    completion: completion
                )
            }
        } else {
            // record the pending lookup
            assert(self.pendingLookups[package] == nil)
            let group = DispatchGroup()
            group.enter()
            self.pendingLookups[package] = group
            self.pendingLookupsLock.unlock()

            // inform delegate that we are starting to fetch
            // calculate if cached (for delegate call) outside queue as it may change while queue is processing
            let isCached = self.cachePath.map { self.fileSystem.exists($0.appending(packageRelativePath)) } ?? false
            delegateQueue.async {
                let details = FetchDetails(fromCache: isCached, updatedCache: false)
                self.delegate?.willFetch(package: package, version: version, fetchDetails: details)
            }

            // make sure destination is free.
            try? self.fileSystem.removeFileTree(packagePath)

            let start = DispatchTime.now()
            self.downloadAndPopulateCache(
                package: package,
                version: version,
                packagePath: packagePath,
                observabilityScope: observabilityScope,
                delegateQueue: delegateQueue,
                callbackQueue: callbackQueue
            ) { result in
                // inform delegate that we finished to fetch
                let duration = start.distance(to: .now())
                delegateQueue.async {
                    self.delegate?.didFetch(package: package, version: version, result: result, duration: duration)
                }
                // remove the pending lookup
                self.pendingLookupsLock.lock()
                self.pendingLookups[package]?.leave()
                self.pendingLookups[package] = nil
                self.pendingLookupsLock.unlock()
                // and done
                completion(result.map { _ in packagePath })
            }
        }
    }

    /// Cancel any outstanding requests
    public func cancel(deadline: DispatchTime) throws {
        try self.registryClient.cancel(deadline: deadline)
    }

    private func downloadAndPopulateCache(
        package: PackageIdentity,
        version: Version,
        packagePath: AbsolutePath,
        observabilityScope: ObservabilityScope,
        delegateQueue: DispatchQueue,
        callbackQueue: DispatchQueue,
        completion: @escaping @Sendable (Result<FetchDetails, Error>) -> Void
    ) {
        if let cachePath {
            do {
                let relativePath = try package.downloadPath(version: version)
                let cachedPackagePath = cachePath.appending(relativePath)

                try self.initializeCacheIfNeeded(cachePath: cachePath)
                try self.fileSystem.withLock(on: cachedPackagePath, type: .exclusive) {
                    // download the package into the cache unless already exists
                    if try self.fileSystem.validPackageDirectory(cachedPackagePath) {
                        // extra validation to defend from racy edge cases
                        if self.fileSystem.exists(packagePath) {
                            throw StringError("\(packagePath) already exists unexpectedly")
                        }
                        // copy the package from the cache into the package path.
                        try self.fileSystem.createDirectory(packagePath.parentDirectory, recursive: true)
                        try self.fileSystem.copy(from: cachedPackagePath, to: packagePath)
                        completion(.success(.init(fromCache: true, updatedCache: false)))
                    } else {
                        // it is possible that we already created the directory before from failed attempts, so clear leftover data if present.
                        try? self.fileSystem.removeFileTree(cachedPackagePath)
                        // download the package from the registry
                        self.registryClient.downloadSourceArchive(
                            package: package,
                            version: version,
                            destinationPath: cachedPackagePath,
                            progressHandler: updateDownloadProgress,
                            fileSystem: self.fileSystem,
                            observabilityScope: observabilityScope,
                            callbackQueue: callbackQueue
                        ) { result in
                            completion(result.tryMap {
                                // extra validation to defend from racy edge cases
                                if self.fileSystem.exists(packagePath) {
                                    throw StringError("\(packagePath) already exists unexpectedly")
                                }
                                // copy the package from the cache into the package path.
                                try self.fileSystem.createDirectory(packagePath.parentDirectory, recursive: true)
                                try self.fileSystem.copy(from: cachedPackagePath, to: packagePath)
                                return FetchDetails(fromCache: true, updatedCache: true)
                            })
                        }
                    }
                }
            } catch {
                // download without populating the cache in the case of an error.
                observabilityScope.emit(
                    warning: "skipping cache due to an error",
                    underlyingError: error
                )
                // it is possible that we already created the directory from failed attempts, so clear leftover data if present.
                try? self.fileSystem.removeFileTree(packagePath)
                self.registryClient.downloadSourceArchive(
                    package: package,
                    version: version,
                    destinationPath: packagePath,
                    progressHandler: updateDownloadProgress,
                    fileSystem: self.fileSystem,
                    observabilityScope: observabilityScope,
                    callbackQueue: callbackQueue
                ) { result in
                    completion(result.map { FetchDetails(fromCache: false, updatedCache: false) })
                }
            }
        } else {
            // it is possible that we already created the directory from failed attempts, so clear leftover data if present.
            try? self.fileSystem.removeFileTree(packagePath)
            // download without populating the cache when no `cachePath` is set.
            self.registryClient.downloadSourceArchive(
                package: package,
                version: version,
                destinationPath: packagePath,
                progressHandler: updateDownloadProgress,
                fileSystem: self.fileSystem,
                observabilityScope: observabilityScope,
                callbackQueue: callbackQueue
            ) { result in
                completion(result.map { FetchDetails(fromCache: false, updatedCache: false) })
            }
        }

        // utility to update progress

        func updateDownloadProgress(downloaded: Int64, total: Int64?) {
            delegateQueue.async {
                self.delegate?.fetching(
                    package: package,
                    version: version,
                    bytesDownloaded: downloaded,
                    totalBytesToDownload: total
                )
            }
        }
    }

    public func remove(package: PackageIdentity) throws {
        let relativePath = try package.downloadPath()
        let packagesPath = self.path.appending(relativePath)
        try self.fileSystem.removeFileTree(packagesPath)
    }

    public func reset(observabilityScope: ObservabilityScope) {
        do {
            try self.fileSystem.removeFileTree(self.path)
        } catch {
            observabilityScope.emit(
                error: "Error resetting registry downloads at '\(self.path)'",
                underlyingError: error
            )
        }
    }

    public func purgeCache(observabilityScope: ObservabilityScope) {
        guard let cachePath else {
            return
        }

        guard self.fileSystem.exists(cachePath) else {
            return
        }

        do {
            try self.fileSystem.withLock(on: cachePath, type: .exclusive) {
                let cachedPackages = try self.fileSystem.getDirectoryContents(cachePath)
                for packagePath in cachedPackages {
                    let pathToDelete = cachePath.appending(component: packagePath)
                    do {
                        try self.fileSystem.removeFileTree(pathToDelete)
                    } catch {
                        observabilityScope.emit(
                            error: "Error removing cached package at '\(pathToDelete)'",
                            underlyingError: error
                        )
                    }
                }
            }
        } catch {
            observabilityScope.emit(
                error: "Error purging registry downloads cache at '\(cachePath)'",
                underlyingError: error
            )
        }
    }

    private func initializeCacheIfNeeded(cachePath: AbsolutePath) throws {
        if !self.fileSystem.exists(cachePath) {
            try self.fileSystem.createDirectory(cachePath, recursive: true)
        }
    }
}

/// Delegate to notify clients about actions being performed by RegistryManager.
public protocol RegistryDownloadsManagerDelegate {
    /// Called when a package is about to be fetched.
    func willFetch(package: PackageIdentity, version: Version, fetchDetails: RegistryDownloadsManager.FetchDetails)

    /// Called when a package has finished fetching.
    func didFetch(
        package: PackageIdentity,
        version: Version,
        result: Result<RegistryDownloadsManager.FetchDetails, Error>,
        duration: DispatchTimeInterval
    )

    /// Called every time the progress of a repository fetch operation updates.
    func fetching(package: PackageIdentity, version: Version, bytesDownloaded: Int64, totalBytesToDownload: Int64?)
}

extension RegistryDownloadsManager {
    /// Additional information about a fetch
    public struct FetchDetails: Equatable {
        /// Indicates if the repository was fetched from the cache or from the remote.
        public let fromCache: Bool
        /// Indicates whether the repository was already present in the cache and updated or if a clean fetch was performed.
        public let updatedCache: Bool
    }
}

extension FileSystem {
    func validPackageDirectory(_ path: AbsolutePath) throws -> Bool {
        if !self.exists(path) {
            return false
        }
        return try self.getDirectoryContents(path).contains(Manifest.filename)
    }
}

extension PackageIdentity {
    internal func downloadPath() throws -> RelativePath {
        guard let registryIdentity = self.registry else {
            throw StringError("invalid package identifier \(self), expected registry scope and name")
        }
        return try RelativePath(validating: registryIdentity.scope.description).appending(component: registryIdentity.name.description)
    }

    internal func downloadPath(version: Version) throws -> RelativePath {
        try self.downloadPath().appending(component: version.description)
    }
}