File: Cancellator.swift

package info (click to toggle)
swiftlang 6.0.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,519,992 kB
  • sloc: cpp: 9,107,863; ansic: 2,040,022; asm: 1,135,751; python: 296,500; objc: 82,456; f90: 60,502; lisp: 34,951; pascal: 19,946; sh: 18,133; perl: 7,482; ml: 4,937; javascript: 4,117; makefile: 3,840; awk: 3,535; xml: 914; fortran: 619; cs: 573; ruby: 573
file content (269 lines) | stat: -rw-r--r-- 9,766 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See http://swift.org/LICENSE.txt for license information
// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//

import Dispatch
import Foundation
import class TSCBasic.Thread
#if canImport(WinSDK)
import WinSDK
#endif

public typealias CancellationHandler = @Sendable (DispatchTime) throws -> Void

public final class Cancellator: Cancellable, Sendable {
    public typealias RegistrationKey = String

    private let observabilityScope: ObservabilityScope?
    private let registry = ThreadSafeKeyValueStore<String, (name: String, handler: CancellationHandler)>()
    private let cancelationQueue = DispatchQueue(
        label: "org.swift.swiftpm.cancellator",
        qos: .userInteractive,
        attributes: .concurrent
    )
    private let cancelling = ThreadSafeBox<Bool>(false)

    private static let signalHandlerLock = NSLock()
    private static var isSignalHandlerInstalled = false

    public init(observabilityScope: ObservabilityScope?) {
        self.observabilityScope = observabilityScope
    }

    #if os(Windows)
    // unfortunately this is needed for C callback handlers used by Windows shutdown handler
    static var shared: Cancellator?
    #endif

    /// Installs signal handlers to terminate sub-processes on cancellation.
    public func installSignalHandlers() {
        Self.signalHandlerLock.withLock {
            precondition(!Self.isSignalHandlerInstalled)

            #if os(Windows)
            // Closures passed to `SetConsoleCtrlHandler` can't capture context, working around that with a global.
            Self.shared = self

            // set shutdown handler to terminate sub-processes, etc
            _ = SetConsoleCtrlHandler({ _ in
                // Terminate all processes on receiving an interrupt signal.
                try? Cancellator.shared?.cancel(deadline: .now() + .seconds(30))

                // Reset the handler.
                _ = SetConsoleCtrlHandler(nil, false)

                // Exit as if by signal()
                TerminateProcess(GetCurrentProcess(), 3)

                return true
            }, true)
            #else
            // trap SIGINT to terminate sub-processes, etc
            signal(SIGINT, SIG_IGN)
            let interruptSignalSource = DispatchSource.makeSignalSource(signal: SIGINT)
            interruptSignalSource.setEventHandler { [weak self] in
                // cancel the trap?
                interruptSignalSource.cancel()

                // Terminate all processes on receiving an interrupt signal.
                try? self?.cancel(deadline: .now() + .seconds(30))

                // Install the default signal handler.
                var action = sigaction()
                #if canImport(Darwin) || os(OpenBSD)
                action.__sigaction_u.__sa_handler = SIG_DFL
                #elseif canImport(Musl)
                action.__sa_handler.sa_handler = SIG_DFL
                #elseif os(Android)
                action.sa_handler = SIG_DFL
                #else
                action.__sigaction_handler = unsafeBitCast(
                    SIG_DFL,
                    to: sigaction.__Unnamed_union___sigaction_handler.self
                )
                #endif
                sigaction(SIGINT, &action, nil)
                kill(getpid(), SIGINT)
            }
            interruptSignalSource.resume()
            #endif

            Self.isSignalHandlerInstalled = true
        }
    }

    @discardableResult
    public func register(name: String, handler: @escaping CancellationHandler) -> RegistrationKey? {
        if self.cancelling.get(default: false) {
            self.observabilityScope?.emit(debug: "not registering '\(name)' with terminator, termination in progress")
            return .none
        }
        let key = UUID().uuidString
        self.observabilityScope?.emit(debug: "registering '\(name)' with terminator")
        self.registry[key] = (name: name, handler: handler)
        return key
    }

    @discardableResult
    public func register(name: String, handler: Cancellable) -> RegistrationKey? {
        self.register(name: name, handler: handler.cancel(deadline:))
    }

    @discardableResult
    public func register(name: String, handler: @escaping @Sendable () throws -> Void) -> RegistrationKey? {
        self.register(name: name, handler: { _ in try handler() })
    }

    package func register(_ process: AsyncProcess) -> RegistrationKey? {
        self.register(name: "\(process.arguments.joined(separator: " "))", handler: process.terminate)
    }

    #if !os(iOS) && !os(watchOS) && !os(tvOS)
    public func register(_ process: Foundation.Process) -> RegistrationKey? {
        self.register(name: "\(process.description)", handler: process.terminate(timeout:))
    }
    #endif

    public func deregister(_ key: RegistrationKey) {
        self.registry[key] = nil
    }

    public func cancel(deadline: DispatchTime) throws {
        self._cancel(deadline: deadline)
    }

    // marked internal for testing
    @discardableResult
    internal func _cancel(deadline: DispatchTime? = .none) -> Int {
        self.cancelling.put(true)

        self.observabilityScope?
            .emit(info: "starting cancellation cycle with \(self.registry.count) cancellation handlers registered")

        let deadline = deadline ?? .now() + .seconds(30)
        // deadline for individual handlers set slightly before overall deadline
        let delta: DispatchTimeInterval = .nanoseconds(abs(deadline.distance(to: .now()).nanoseconds() ?? 0) / 5)
        let handlersDeadline = deadline - delta

        let cancellationHandlers = self.registry.get()
        let cancelled = ThreadSafeArrayStore<String>()
        let group = DispatchGroup()
        for (_, (name, handler)) in cancellationHandlers {
            self.cancelationQueue.async(group: group) {
                do {
                    self.observabilityScope?.emit(debug: "cancelling '\(name)'")
                    try handler(handlersDeadline)
                    cancelled.append(name)
                } catch {
                    self.observabilityScope?.emit(
                        warning: "failed cancelling '\(name)'",
                        underlyingError: error
                    )
                }
            }
        }

        if case .timedOut = group.wait(timeout: deadline) {
            self.observabilityScope?
                .emit(
                    warning: "timeout waiting for cancellation with \(cancellationHandlers.count - cancelled.count) cancellation handlers remaining"
                )
        } else {
            self.observabilityScope?.emit(info: "cancellation cycle completed successfully")
        }

        self.cancelling.put(false)

        return cancelled.count
    }
}

public protocol Cancellable {
    func cancel(deadline: DispatchTime) throws -> Void
}

public struct CancellationError: Error, CustomStringConvertible {
    public let description: String

    public init() {
        self.init(description: "Operation cancelled")
    }

    private init(description: String) {
        self.description = description
    }

    static func failedToRegisterProcess(_ process: AsyncProcess) -> Self {
        Self(
            description: """
            failed to register a cancellation handler for this process invocation `\(
                process.arguments.joined(separator: " ")
            )`
            """
        )
    }
}

extension AsyncProcess {
    fileprivate func terminate(timeout: DispatchTime) {
        // send graceful shutdown signal
        self.signal(SIGINT)

        // start a thread to see if we need to terminate more forcibly
        let forceKillSemaphore = DispatchSemaphore(value: 0)
        let forceKillThread = TSCBasic.Thread {
            if case .timedOut = forceKillSemaphore.wait(timeout: timeout) {
                // send a force-kill signal
                #if os(Windows)
                self.signal(SIGTERM)
                #else
                self.signal(SIGKILL)
                #endif
            }
        }
        forceKillThread.start()
        _ = try? self.waitUntilExit()
        forceKillSemaphore.signal() // let the force-kill thread know we do not need it any more
        // join the force-kill thread thread so we don't exit before everything terminates
        forceKillThread.join()
    }
}

#if !os(iOS) && !os(watchOS) && !os(tvOS)
extension Foundation.Process {
    fileprivate func terminate(timeout: DispatchTime) {
        guard self.isRunning else {
            return
        }

        // send graceful shutdown signal (SIGINT)
        self.interrupt()

        // start a thread to see if we need to terminate more forcibly
        let forceKillSemaphore = DispatchSemaphore(value: 0)
        let forceKillThread = TSCBasic.Thread {
            if case .timedOut = forceKillSemaphore.wait(timeout: timeout) {
                guard self.isRunning else {
                    return
                }

                // force kill (SIGTERM)
                self.terminate()
            }
        }
        forceKillThread.start()
        self.waitUntilExit()
        forceKillSemaphore.signal() // let the force-kill thread know we do not need it any more
        // join the force-kill thread thread so we don't exit before everything terminates
        forceKillThread.join()
    }
}
#endif