File: MultiHandle.swift

package info (click to toggle)
swiftlang 6.0.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,519,992 kB
  • sloc: cpp: 9,107,863; ansic: 2,040,022; asm: 1,135,751; python: 296,500; objc: 82,456; f90: 60,502; lisp: 34,951; pascal: 19,946; sh: 18,133; perl: 7,482; ml: 4,937; javascript: 4,117; makefile: 3,840; awk: 3,535; xml: 914; fortran: 619; cs: 573; ruby: 573
file content (629 lines) | stat: -rw-r--r-- 26,897 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2016 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See http://swift.org/LICENSE.txt for license information
// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
// -----------------------------------------------------------------------------
///
/// libcurl *multi handle* wrapper.
/// These are libcurl helpers for the URLSession API code.
/// - SeeAlso: https://curl.haxx.se/libcurl/c/
/// - SeeAlso: URLSession.swift
///
// -----------------------------------------------------------------------------

#if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)
import SwiftFoundation
#else
import Foundation
#endif

@_implementationOnly import _CFURLSessionInterface
import Dispatch



extension URLSession {
    /// Minimal wrapper around [curl multi interface](https://curl.haxx.se/libcurl/c/libcurl-multi.html).
    ///
    /// The *multi handle* manages the sockets for easy handles
    /// (`_EasyHandle`), and this implementation uses
    /// libdispatch to listen for sockets being read / write ready.
    ///
    /// Using `DispatchSource` allows this implementation to be
    /// non-blocking and all code to run on the same thread /
    /// `DispatchQueue` -- thus keeping it simple.
    ///
    /// - SeeAlso: _EasyHandle
    internal final class _MultiHandle {
        // The underlying libcurl multi handle; created here and released in `deinit`.
        let rawHandle = CFURLSessionMultiHandleInit()
        // Queue on which socket sources, timers and deferred work are scheduled.
        let queue: DispatchQueue
        let group = DispatchGroup()
        // Easy handles currently added to this multi handle (see `add(_:)` / `remove(_:)`).
        fileprivate var easyHandles: [_EasyHandle] = []
        // Sockets that currently have an operation in progress (see `beginOperation(for:)`).
        fileprivate var socketReferences: [CFURLSession_socket_t: _SocketReference] = [:]
        // The currently armed timeout timer, if any.
        fileprivate var timeoutSource: _TimeoutSource? = nil
        // NOTE(review): not read or written anywhere else in this file -- presumably a leftover; confirm before removing.
        private var reentrantInUpdateTimeoutTimer = false
        
        /// Creates the multi handle, installs the libcurl callbacks and applies `configuration`.
        ///
        /// - Parameter configuration: Session configuration passed on to `configure(with:)`.
        /// - Parameter workQueue: Target queue for the handle's own "MultiHandle.isolation" queue.
        init(configuration: URLSession._Configuration, workQueue: DispatchQueue) {
            queue = DispatchQueue(label: "MultiHandle.isolation", target: workQueue)
            setupCallbacks()
            configure(with: configuration)
        }
        deinit {
            // C.f.: <https://curl.haxx.se/libcurl/c/curl_multi_cleanup.html>
            // Every easy handle that is still attached is removed before the multi handle is torn down.
            easyHandles.forEach {
                try! CFURLSessionMultiHandleRemoveHandle(rawHandle, $0.rawHandle).asError()
            }
            try! CFURLSessionMultiHandleDeinit(rawHandle).asError()
        }
    }
}

extension URLSession._MultiHandle {
    /// Applies the session configuration to the underlying libcurl multi handle.
    ///
    /// The per-host connection limit is only set when `maxHostConnectionsSupported()`
    /// reports support for it. A failing libcurl call traps.
    func configure(with configuration: URLSession._Configuration) {
        if maxHostConnectionsSupported() {
            let hostLimitStatus = CFURLSession_multi_setopt_l(
                rawHandle,
                CFURLSessionMultiOptionMAX_HOST_CONNECTIONS,
                numericCast(configuration.httpMaximumConnectionsPerHost))
            try! hostLimitStatus.asError()
        }

        // 3 when pipelining is requested, 2 otherwise.
        // NOTE(review): presumably libcurl's CURLPIPE_* bitmask values -- confirm.
        let pipeliningStatus = CFURLSession_multi_setopt_l(
            rawHandle,
            CFURLSessionMultiOptionPIPELINING,
            configuration.httpShouldUsePipelining ? 3 : 2)
        try! pipeliningStatus.asError()
        //TODO: We may want to set
        //    CFURLSessionMultiOptionMAXCONNECTS
        //    CFURLSessionMultiOptionMAX_TOTAL_CONNECTIONS
    }
}

fileprivate extension URLSession._MultiHandle {
    /// Recovers the multi handle from the opaque user-data pointer passed to a libcurl callback.
    ///
    /// The pointer is read unretained, so no ownership is transferred.
    /// - Returns: The multi handle, or `nil` when `userdata` is `nil`.
    static func from(callbackUserData userdata: UnsafeMutableRawPointer?) -> URLSession._MultiHandle? {
        if let opaque = userdata {
            return Unmanaged<URLSession._MultiHandle>.fromOpaque(opaque).takeUnretainedValue()
        }
        return nil
    }
}

fileprivate extension URLSession._MultiHandle {
    /// Forward the libcurl callbacks into Swift methods.
    ///
    /// `self` is handed to libcurl as an unretained opaque pointer for both the
    /// socket and the timer callback; each callback turns that pointer back into
    /// the multi handle via `from(callbackUserData:)` and traps if it is missing.
    func setupCallbacks() {
        // Socket
        try! CFURLSession_multi_setopt_ptr(rawHandle, CFURLSessionMultiOptionSOCKETDATA, UnsafeMutableRawPointer(Unmanaged.passUnretained(self).toOpaque())).asError()
        try! CFURLSession_multi_setopt_sf(rawHandle, CFURLSessionMultiOptionSOCKETFUNCTION) { (easyHandle: CFURLSessionEasyHandle, socket: CFURLSession_socket_t, what: Int32, userdata: UnsafeMutableRawPointer?, socketptr: UnsafeMutableRawPointer?) -> Int32 in
            guard let handle = URLSession._MultiHandle.from(callbackUserData: userdata) else { fatalError() }
            return handle.register(socket: socket, for: easyHandle, what: what, socketSourcePtr: socketptr)
            }.asError()
        // Timeout:
        try! CFURLSession_multi_setopt_ptr(rawHandle, CFURLSessionMultiOptionTIMERDATA, UnsafeMutableRawPointer(Unmanaged.passUnretained(self).toOpaque())).asError()
        // The timer callback's timeout argument is `Int32` on 64-bit Windows and `Int` elsewhere.
        // NOTE(review): presumably because C `long` is 32 bits wide on Windows -- confirm.
#if os(Windows) && (arch(arm64) || arch(x86_64))
        typealias CFURLSessionMultiOption = Int32
#else
        typealias CFURLSessionMultiOption = Int
#endif
        try! CFURLSession_multi_setopt_tf(rawHandle, CFURLSessionMultiOptionTIMERFUNCTION) { (_, timeout: CFURLSessionMultiOption, userdata: UnsafeMutableRawPointer?) -> Int32 in
            guard let handle = URLSession._MultiHandle.from(callbackUserData: userdata) else { fatalError() }
            handle.updateTimeoutTimer(to: numericCast(timeout))
            return 0
            }.asError()
    }
    /// <https://curl.haxx.se/libcurl/c/CURLMOPT_SOCKETFUNCTION.html> and
    /// <https://curl.haxx.se/libcurl/c/curl_multi_socket_action.html>
    ///
    /// Registers or unregisters libdispatch sources for `socket` as requested by libcurl.
    ///
    /// - Parameter socket: The socket libcurl is asking about.
    /// - Parameter easyHandle: The easy handle the request belongs to (not used by this method).
    /// - Parameter what: Raw poll action; converted to a `_SocketRegisterAction`.
    /// - Parameter socketSourcePtr: Opaque pointer to the `_SocketSources` previously
    ///   assigned to this socket, or `nil` if none has been assigned yet.
    /// - Returns: Always `0`.
    func register(socket: CFURLSession_socket_t, for easyHandle: CFURLSessionEasyHandle, what: Int32, socketSourcePtr: UnsafeMutableRawPointer?) -> Int32 {
        // We get this callback whenever we need to register or unregister a
        // given socket with libdispatch.
        // The `action` / `what` defines if we should register or unregister
        // that we're interested in read and/or write readiness. We will do so
        // through libdispatch (DispatchSource) and store the source(s) inside
        // a `SocketSources` which we in turn store inside libcurl's multi handle
        // by means of curl_multi_assign() -- we retain the object first.
        let action = _SocketRegisterAction(rawValue: CFURLSessionPoll(value: what))
        var socketSources = _SocketSources.from(socketSourcePtr: socketSourcePtr)
        if socketSources == nil && action.needsSource {
            // First registration for this socket: create the sources object and
            // hand a +1 retained pointer to the multi handle.
            let s = _SocketSources()
            let p = Unmanaged.passRetained(s).toOpaque()
            CFURLSessionMultiHandleAssign(rawHandle, socket, UnsafeMutableRawPointer(p))
            socketSources = s
        } else if socketSources != nil && action == .unregister {
            // We need to release the stored pointer:
            // (the local `socketSources` still holds the object for the tear-down below)
            if let opaque = socketSourcePtr {
                Unmanaged<_SocketSources>.fromOpaque(opaque).release()
            }
            socketSources?.tearDown(handle: self, socket: socket, queue: queue)
            socketSources = nil
        }
        if let ss = socketSources {
            // The readiness handler holds `self` weakly and asks libcurl to act on the socket.
            let handler = DispatchWorkItem { [weak self] in
                self?.performAction(for: socket)
            }
            ss.createSources(with: action, handle: self, socket: socket, queue: queue, handler: handler)
        }
        return 0
    }

    /// What read / write ready event to register / unregister.
    ///
    /// This re-maps `CFURLSessionPoll` / `CURL_POLL`.
    enum _SocketRegisterAction {
        // Mapped from `CFURLSessionPollNone`; needs neither a read nor a write source.
        case none
        // Mapped from `CFURLSessionPollIn`; needs a read source.
        case registerRead
        // Mapped from `CFURLSessionPollOut`; needs a write source.
        case registerWrite
        // Mapped from `CFURLSessionPollInOut`; needs both a read and a write source.
        case registerReadAndWrite
        // Mapped from `CFURLSessionPollRemove`; existing sources are torn down.
        case unregister
    }
}

extension Collection where Element == _EasyHandle {
  /// Returns the index of the first element equal to `element`,
  /// or `nil` when the collection contains no such element.
  internal func firstIndex(of element: Element) -> Index? {
    return indices.first(where: { self[$0] == element })
  }
}

private extension URLSession._MultiHandle {
    /// Reference-counted token for a socket.
    ///
    /// If `shouldClose` has been set, the socket is closed when the last
    /// reference to this object goes away (in `deinit`).
    class _SocketReference {
        let socket: CFURLSession_socket_t
        // When `true`, `deinit` closes `socket`. Starts out `false`.
        var shouldClose: Bool
        // Work postponed until the operation on this socket ends (see `endOperation(for:)`).
        var workItem: DispatchWorkItem?
        
        init(socket: CFURLSession_socket_t) {
            self.socket = socket
            shouldClose = false
        }
        
        deinit {
            if shouldClose {
                #if os(Windows)
                closesocket(socket)
                #else
                close(socket)
                #endif
            }
        }
    }

    /// Creates and stores a socket reference. Reentrancy is not supported.
    /// Trying to begin an operation for the same socket twice would mean something
    /// went horribly wrong, or our assumptions about the CURL register/unregister
    /// action flow are not correct.
    ///
    /// - Parameter socket: The socket the operation is performed on.
    /// - Returns: The newly stored reference; pass it to `endOperation(for:)` when done.
    func beginOperation(for socket: CFURLSession_socket_t) -> _SocketReference {
        let reference = _SocketReference(socket: socket)
        // Mutate the store *outside* of `precondition`: its condition is an
        // autoclosure that is not evaluated in `-Ounchecked` builds, which would
        // silently skip storing the reference.
        let previous = socketReferences.updateValue(reference, forKey: socket)
        precondition(previous == nil, "Reentrancy is not supported for socket operations")
        return reference
    }

    /// Removes the socket reference from the shared store. If there is a work item
    /// scheduled (and not cancelled), executes it on the current thread.
    ///
    /// - Parameter socketReference: The reference returned by `beginOperation(for:)`.
    func endOperation(for socketReference: _SocketReference) {
        // Mutate the store *outside* of `precondition`: its condition is an
        // autoclosure that is not evaluated in `-Ounchecked` builds, which would
        // leave the reference in the store forever.
        let removed = socketReferences.removeValue(forKey: socketReference.socket)
        precondition(removed != nil, "No operation associated with the socket")
        if let workItem = socketReference.workItem, !workItem.isCancelled {
            // CURL never asks for socket close without unregistering first, and
            // we should cancel pending work when unregister action is requested.
            precondition(!socketReference.shouldClose, "Socket close was scheduled, but there is some pending work left")
            workItem.perform()
        }
    }

    /// Marks the socket's reference to close the socket on deinit. This allows us
    /// to extend socket lifecycle by keeping the reference alive.
    ///
    /// When no reference is stored for `socket`, a fresh one is created, marked
    /// and not stored anywhere, so its `deinit` closes the socket as soon as the
    /// temporary is released.
    func scheduleClose(for socket: CFURLSession_socket_t) {
        let reference = socketReferences[socket] ?? _SocketReference(socket: socket)
        reference.shouldClose = true
    }

    /// Runs `workItem` right away when no operation is in progress for `socket`;
    /// otherwise parks it on the socket's reference so that it is performed
    /// once that operation ends.
    ///
    /// This is how Dispatch Source creation gets postponed while the previous
    /// Dispatch Source for the same socket has not finished cancelling.
    func schedule(_ workItem: DispatchWorkItem, for socket: CFURLSession_socket_t) {
        if let pendingOperation = socketReferences[socket] {
            // CURL always pairs a register with a later unregister, and the
            // unregister cancels the parked work item. Replacing an already
            // parked item with the new one is safe regardless.
            pendingOperation.workItem = workItem
        } else {
            workItem.perform()
        }
    }

    /// Cancels and forgets the work item parked for `socket`, if any.
    /// A no-op when no operation is in progress for the socket.
    ///
    /// CURL may lose interest in Dispatch Sources whose creation we have
    /// postponed; in that case the postponed work simply has to be dropped.
    func cancelWorkItem(for socket: CFURLSession_socket_t) {
        if let pendingOperation = socketReferences[socket] {
            pendingOperation.workItem?.cancel()
            pendingOperation.workItem = nil
        }
    }

}

internal extension URLSession._MultiHandle {
    /// Add an easy handle -- start its transfer.
    ///
    /// Installs the close-socket callback on the easy handle, records the handle
    /// in `easyHandles`, attaches it to the multi handle and, if it is the first
    /// handle, kicks the multi handle via `timeoutTimerFired()`.
    func add(_ handle: _EasyHandle) {
        // Set CLOSESOCKETFUNCTION. Note that while the option belongs to easy_handle,
        // the connection cache is managed by CURL multi_handle, and sockets can actually
        // outlive easy_handle (even after curl_easy_cleanup call). That's why
        // socket management lives in _MultiHandle.
        try! CFURLSession_easy_setopt_ptr(handle.rawHandle, CFURLSessionOptionCLOSESOCKETDATA, UnsafeMutableRawPointer(Unmanaged.passUnretained(self).toOpaque())).asError()
        // Inside the callback `handle` is the multi handle recovered from `clientp`,
        // not the easy handle parameter of `add(_:)`.
        try! CFURLSession_easy_setopt_scl(handle.rawHandle, CFURLSessionOptionCLOSESOCKETFUNCTION) {  (clientp: UnsafeMutableRawPointer?, item: CFURLSession_socket_t) in
            guard let handle = URLSession._MultiHandle.from(callbackUserData: clientp) else { fatalError() }
            handle.scheduleClose(for: item)
            return 0
        }.asError()
        
        // If this is the first handle being added, we need to `kick` the
        // underlying multi handle by calling `timeoutTimerFired` as
        // described in
        // <https://curl.haxx.se/libcurl/c/curl_multi_socket_action.html>.
        // That will initiate the registration for timeout timer and socket
        // readiness.
        let needsTimeout = self.easyHandles.isEmpty
        self.easyHandles.append(handle)
        try! CFURLSessionMultiHandleAddHandle(self.rawHandle, handle.rawHandle).asError()
        if needsTimeout {
            self.timeoutTimerFired()
        }
    }
    /// Remove an easy handle -- stop its transfer.
    ///
    /// Traps when `handle` was never added (or has already been removed).
    func remove(_ handle: _EasyHandle) {
        guard let position = easyHandles.firstIndex(of: handle) else {
            fatalError("Handle not in list.")
        }
        easyHandles.remove(at: position)
        let status = CFURLSessionMultiHandleRemoveHandle(rawHandle, handle.rawHandle)
        try! status.asError()
    }
}

fileprivate extension URLSession._MultiHandle {
    /// This gets called when we should ask curl to perform action on a socket.
    ///
    /// Traps if `readAndWriteAvailableData(on:)` throws.
    func performAction(for socket: CFURLSession_socket_t) {
        try! readAndWriteAvailableData(on: socket)
    }
    /// This gets called when our timeout timer fires.
    ///
    /// libcurl relies on us calling curl_multi_socket_action() every now and then.
    /// The call is made with the special `CFURLSessionSocketTimeout` socket value
    /// and traps if `readAndWriteAvailableData(on:)` throws.
    func timeoutTimerFired() {
        try! readAndWriteAvailableData(on: CFURLSessionSocketTimeout)
    }
    /// Lets libcurl act on `socket` and then drains the completion messages.
    ///
    /// - Throws: The `NSError` produced by `asError()` when the multi action fails;
    ///   in that case no messages are read.
    func readAndWriteAvailableData(on socket: CFURLSession_socket_t) throws {
        var stillRunning: Int32 = 0
        let status = CFURLSessionMultiHandleAction(rawHandle, socket, 0, &stillRunning)
        try status.asError()
        //TODO: Do we remove the timeout timer here if / when runningHandles == 0 ?
        readMessages()
    }
    
    /// Drains libcurl's message queue and reports every finished transfer.
    ///
    /// libcurl refers to this as “read multi stack informationals”.
    /// Stops as soon as a read yields no easy handle.
    func readMessages() {
        while true {
            // Receives the number of messages still left in the queue.
            var remaining: Int32 = 0
            let message = CFURLSessionMultiHandleInfoRead(rawHandle, &remaining)
            guard let finished = message.easyHandle else { return }
            completedTransfer(forEasyHandle: finished, easyCode: message.resultCode)
        }
    }
    
    /// Transfer completed.
    ///
    /// Looks up the `_EasyHandle` wrapper for the raw handle, translates the
    /// libcurl result into an optional `NSError` in `NSURLErrorDomain` and
    /// forwards both to `completedTransfer(forEasyHandle:error:)`.
    ///
    /// - Parameter handle: The raw easy handle whose transfer finished. Traps when
    ///   it does not belong to a handle in `easyHandles`.
    /// - Parameter easyCode: The libcurl result code of the transfer.
    func completedTransfer(forEasyHandle handle: CFURLSessionEasyHandle, easyCode: CFURLSessionEasyCode) {
        // Look up the matching wrapper:
        guard let easyHandle = easyHandles.first(where: { $0.rawHandle == handle }) else {
            fatalError("Transfer completed for easy handle, but it is not in the list of added handles.")
        }
        // Find the NSURLError code
        var error: NSError?
        if let errorCode = easyHandle.urlErrorCode(for: easyCode) {
            var errorDescription: String = ""
            if easyHandle.errorBuffer[0] == 0 {
                // The error buffer is empty: fall back to the generic description
                // of the code. `String(cString:)` never fails, so neither a missing
                // description nor malformed bytes can crash the error path.
                if let description = CFURLSessionEasyCodeDescription(easyCode) {
                    errorDescription = String(cString: description)
                }
            } else {
                // Use the buffer's contents up to (not including) the first NUL.
                if let firstNull = easyHandle.errorBuffer.firstIndex(of: 0), firstNull != easyHandle.errorBuffer.startIndex {
                    errorDescription = String(validating: easyHandle.errorBuffer[easyHandle.errorBuffer.startIndex..<firstNull], as: UTF8.self) ?? ""
                }
            }

            error = NSError(domain: NSURLErrorDomain, code: errorCode, userInfo: [
                NSLocalizedDescriptionKey: errorDescription
            ])
        }
        completedTransfer(forEasyHandle: easyHandle, error: error)
    }
    /// Transfer completed.
    ///
    /// Forwards the outcome to the easy handle itself.
    /// - Parameter handle: The easy handle whose transfer finished.
    /// - Parameter error: The failure, or `nil` when no error was derived for the transfer.
    func completedTransfer(forEasyHandle handle: _EasyHandle, error: NSError?) {
        handle.completedTransfer(withError: error)
    }
}

fileprivate extension _EasyHandle {
    /// An error code within the `NSURLErrorDomain` based on the error of the
    /// easy handle.
    /// - Note: The error value is set only on failure. You can't use it to
    ///   determine *if* something failed or not, only *why* it failed.
    /// - Parameter easyCode: The libcurl result code of the finished transfer.
    /// - Returns: `nil` for `CFURLSessionEasyCodeOK`, otherwise an `NSURLError...` code;
    ///   combinations without a dedicated case map to `NSURLErrorUnknown`.
    func urlErrorCode(for easyCode: CFURLSessionEasyCode) -> Int? {
        // The cases are matched top to bottom on (easy code, connect errno), so
        // their order is significant: OK wins over everything, and ECONNREFUSED
        // wins over every code-specific case below it.
        switch (easyCode, CInt(connectFailureErrno)) {
        case (CFURLSessionEasyCodeOK, _):
            return nil
        case (_, ECONNREFUSED):
            return NSURLErrorCannotConnectToHost
        case (CFURLSessionEasyCodeUNSUPPORTED_PROTOCOL, _):
            return NSURLErrorUnsupportedURL
        case (CFURLSessionEasyCodeURL_MALFORMAT, _):
            return NSURLErrorBadURL
        case (CFURLSessionEasyCodeCOULDNT_RESOLVE_HOST, _):
            // Oddly, this appears to happen for malformed URLs, too.
            return NSURLErrorCannotFindHost
        case (CFURLSessionEasyCodeRECV_ERROR, ECONNRESET):
            return NSURLErrorNetworkConnectionLost
        case (CFURLSessionEasyCodeSEND_ERROR, ECONNRESET):
            return NSURLErrorNetworkConnectionLost
        case (CFURLSessionEasyCodeGOT_NOTHING, _):
            return NSURLErrorBadServerResponse
        case (CFURLSessionEasyCodeABORTED_BY_CALLBACK, _):
            return NSURLErrorUnknown // Or NSURLErrorCancelled if we're in such a state
        case (CFURLSessionEasyCodeCOULDNT_CONNECT, ETIMEDOUT):
            return NSURLErrorTimedOut
        case (CFURLSessionEasyCodeOPERATION_TIMEDOUT, _):
            return NSURLErrorTimedOut
        default:
            //TODO: Need to map to one of the NSURLError... constants
            return NSURLErrorUnknown
        }
    }
}

/// Two `CFURLSessionPoll` values are equal when their wrapped `value`s are equal.
internal func ==(lhs: CFURLSessionPoll, rhs: CFURLSessionPoll) -> Bool {
    return lhs.value == rhs.value
}
/// Pattern-match operator, so that `CFURLSessionPoll` constants can be used as
/// `switch` case patterns. Matching is plain equality.
internal func ~=(lhs: CFURLSessionPoll, rhs: CFURLSessionPoll) -> Bool {
    return lhs == rhs
}

fileprivate extension URLSession._MultiHandle._SocketRegisterAction {
    /// Maps a raw `CFURLSessionPoll` value onto the matching action.
    /// Traps for any value that is not one of the five known poll constants.
    init(rawValue: CFURLSessionPoll) {
        if rawValue == CFURLSessionPollNone {
            self = .none
        } else if rawValue == CFURLSessionPollIn {
            self = .registerRead
        } else if rawValue == CFURLSessionPollOut {
            self = .registerWrite
        } else if rawValue == CFURLSessionPollInOut {
            self = .registerReadAndWrite
        } else if rawValue == CFURLSessionPollRemove {
            self = .unregister
        } else {
            fatalError("Invalid CFURLSessionPoll value.")
        }
    }
}

fileprivate extension URLSession._MultiHandle._SocketRegisterAction {
    /// `true` when the action asks for a libdispatch source for **read** readiness.
    var needsReadSource: Bool {
        switch self {
        case .registerRead, .registerReadAndWrite:
            return true
        case .none, .registerWrite, .unregister:
            return false
        }
    }
    /// `true` when the action asks for a libdispatch source for **write** readiness.
    var needsWriteSource: Bool {
        switch self {
        case .registerWrite, .registerReadAndWrite:
            return true
        case .none, .registerRead, .unregister:
            return false
        }
    }
    /// `true` when the action asks for a **read** source, a **write** source,
    /// or both.
    var needsSource: Bool {
        switch self {
        case .registerRead, .registerWrite, .registerReadAndWrite:
            return true
        case .none, .unregister:
            return false
        }
    }
}

/// Owns a one-shot libdispatch timer.
///
/// Used to implement the timeout of `URLSession.MultiHandle` and `URLSession.EasyHandle`.
/// The timer is armed and resumed by `init` and cancelled by `deinit`.
class _TimeoutSource {
    let rawSource: DispatchSource
    let milliseconds: Int
    let queue: DispatchQueue        // kept so that the timer can be restarted for EasyHandles
    let handler: DispatchWorkItem   // kept so that the timer can be restarted for EasyHandles

    /// - Parameter queue: Queue the timer's event handler runs on.
    /// - Parameter milliseconds: Requested timeout; the timer is armed one
    ///   millisecond short of it, but never for less than one millisecond.
    /// - Parameter handler: Work item installed as the timer's event handler.
    init(queue: DispatchQueue, milliseconds: Int, handler: DispatchWorkItem) {
        self.milliseconds = milliseconds
        self.handler = handler
        self.queue = queue
        self.rawSource = DispatchSource.makeTimerSource(queue: queue) as! DispatchSource

        let armedMilliseconds = max(1, milliseconds - 1)
        let deadline = DispatchTime.now() + DispatchTimeInterval.milliseconds(armedMilliseconds)
        // A one-millisecond timeout gets microsecond leeway, everything else a millisecond.
        let leeway: DispatchTimeInterval = (milliseconds == 1) ? .microseconds(1) : .milliseconds(1)

        rawSource.schedule(deadline: deadline, repeating: .never, leeway: leeway)
        rawSource.setEventHandler(handler: handler)
        rawSource.resume()
    }

    deinit {
        rawSource.cancel()
    }
}

fileprivate extension URLSession._MultiHandle {

    /// <https://curl.haxx.se/libcurl/c/CURLMOPT_TIMERFUNCTION.html>
    ///
    /// Converts the raw timeout value into a `_Timeout` and applies it.
    func updateTimeoutTimer(to value: Int) {
        let timeout = _Timeout(timeout: value)
        updateTimeoutTimer(to: timeout)
    }

    /// Replaces the current timeout timer according to `timeout`.
    func updateTimeoutTimer(to timeout: _Timeout) {
        nonisolated(unsafe) let unsafeSelf = self
        switch timeout {
        case .none:
            // No timeout wanted: drop the timer.
            timeoutSource = nil
        case .immediate:
            // Drop the timer and fire asynchronously on our queue.
            timeoutSource = nil
            queue.async { unsafeSelf.timeoutTimerFired() }
        case .milliseconds(let interval):
            //TODO: Could simply change the existing timer by using DispatchSourceTimer again.
            let fire = DispatchWorkItem { [weak self] in
                guard let self = self else { return }
                self.timeoutTimerFired()
            }
            // Note: the timer being replaced cancels its internal Dispatch timer in deinit.
            timeoutSource = _TimeoutSource(queue: queue, milliseconds: interval, handler: fire)
        }
    }

    /// The timeout requested for the multi handle.
    enum _Timeout {
        case milliseconds(Int)
        case none
        case immediate
    }
}

fileprivate extension URLSession._MultiHandle._Timeout {
    /// Interprets a raw timeout value: `-1` means no timeout, `0` means fire
    /// immediately, anything else is a number of milliseconds.
    init(timeout: Int) {
        if timeout == -1 {
            self = .none
        } else if timeout == 0 {
            self = .immediate
        } else {
            self = .milliseconds(timeout)
        }
    }
}


/// The libdispatch read and write sources of one specific socket.
///
/// A small helper bundling the two sources, either of which may be absent.
///
/// This info is stored into the socket using `curl_multi_assign()`.
///
/// - SeeAlso: URLSession.MultiHandle.SocketRegisterAction
fileprivate class _SocketSources {
    var readSource: DispatchSource?
    var writeSource: DispatchSource?

    /// Creates and resumes the read source unless one already exists.
    func createReadSource(socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) {
        if readSource != nil { return }
#if os(Windows)
        let source = DispatchSource.makeReadSource(handle: HANDLE(bitPattern: Int(socket))!, queue: queue)
#else
        let source = DispatchSource.makeReadSource(fileDescriptor: socket, queue: queue)
#endif
        source.setEventHandler(handler: handler)
        readSource = source as? DispatchSource
        source.resume()
    }

    /// Creates and resumes the write source unless one already exists.
    func createWriteSource(socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) {
        if writeSource != nil { return }
#if os(Windows)
        let source = DispatchSource.makeWriteSource(handle: HANDLE(bitPattern: Int(socket))!, queue: queue)
#else
        let source = DispatchSource.makeWriteSource(fileDescriptor: socket, queue: queue)
#endif
        source.setEventHandler(handler: handler)
        writeSource = source as? DispatchSource
        source.resume()
    }

    /// Cancels the existing sources and ends the socket operation once every
    /// cancel handler has run.
    func tearDown(handle: URLSession._MultiHandle, socket: CFURLSession_socket_t, queue: DispatchQueue) {
        // A register action may still be pending for this socket; it must not run anymore.
        handle.cancelWorkItem(for: socket)

        let activeSources = [readSource, writeSource].compactMap { $0 }
        if activeSources.isEmpty {
            // Source creation had been postponed and is abandoned by now.
            return
        }

        // The socket is guaranteed not to be closed for as long as this
        // reference is kept alive.
        let socketReference = handle.beginOperation(for: socket)
        let pendingCancellations = DispatchGroup()
        for source in activeSources {
            pendingCancellations.enter()
            source.setCancelHandler {
                pendingCancellations.leave()
            }
            source.cancel()
        }
        pendingCancellations.notify(queue: queue) {
            handle.endOperation(for: socketReference)
        }

        readSource = nil
        writeSource = nil
    }
}
extension _SocketSources {
    /// Create a read and/or write source as specified by the action.
    ///
    /// Nothing happens for actions that need neither source.
    func createSources(with action: URLSession._MultiHandle._SocketRegisterAction, handle: URLSession._MultiHandle, socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) {
        guard action.needsReadSource || action.needsWriteSource else { return }

        // CURL casually asks to unregister and then register handlers for the
        // same socket back to back, so a tear-down may (rarely) still be in
        // flight when the "register" request arrives. Creating a new Dispatch
        // Source while the old one for the same socket is still being cancelled
        // could go badly wrong. `_MultiHandle.schedule(_:for:)` therefore
        // postpones the creation until the pending operation has finished, or
        // performs it immediately when there is none.
        // CURL may also ask to unregister before the postponed work has run;
        // that work is cancelled in that case.
        let deferredCreation = DispatchWorkItem {
            if action.needsReadSource {
                self.createReadSource(socket: socket, queue: queue, handler: handler)
            }
            if action.needsWriteSource {
                self.createWriteSource(socket: socket, queue: queue, handler: handler)
            }
        }
        handle.schedule(deferredCreation, for: socket)
    }
}
extension _SocketSources {
    /// Unwraps the `_SocketSources` behind an opaque pointer.
    ///
    /// A `_SocketSources` is stored into the multi handle's socket using
    /// `curl_multi_assign()`. This helper reads it back from the raw pointer
    /// without taking ownership.
    /// - Returns: The stored object, or `nil` when `ptr` is `nil`.
    static func from(socketSourcePtr ptr: UnsafeMutableRawPointer?) -> _SocketSources? {
        return ptr.map { Unmanaged<_SocketSources>.fromOpaque($0).takeUnretainedValue() }
    }
}


/// Two `CFURLSessionMultiCode` values are equal when their wrapped `value`s are equal.
internal func ==(lhs: CFURLSessionMultiCode, rhs: CFURLSessionMultiCode) -> Bool {
    return lhs.value == rhs.value
}
/// Pattern-match operator, so that `CFURLSessionMultiCode` constants can be used
/// as `switch` case patterns. Matching is plain equality.
internal func ~=(lhs: CFURLSessionMultiCode, rhs: CFURLSessionMultiCode) -> Bool {
    return lhs == rhs
}

extension CFURLSessionMultiCode {
    /// Turns a libcurl multi result code into a Swift error.
    ///
    /// - Throws: An `NSError` in the "libcurl.multi" domain carrying the raw
    ///   code, unless the code is `CFURLSessionMultiCodeOK`.
    internal func asError() throws {
        guard self == CFURLSessionMultiCodeOK else {
            throw NSError(domain: "libcurl.multi", code: Int(self.value))
        }
    }
}