File: JSONRPCConnection.swift

package info (click to toggle)
swiftlang 6.0.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,519,992 kB
  • sloc: cpp: 9,107,863; ansic: 2,040,022; asm: 1,135,751; python: 296,500; objc: 82,456; f90: 60,502; lisp: 34,951; pascal: 19,946; sh: 18,133; perl: 7,482; ml: 4,937; javascript: 4,117; makefile: 3,840; awk: 3,535; xml: 914; fortran: 619; cs: 573; ruby: 573
file content (644 lines) | stat: -rw-r--r-- 26,103 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2018 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//

import Dispatch
import Foundation
import LSPLogging
import LanguageServerProtocol

#if canImport(CDispatch)
import struct CDispatch.dispatch_fd_t
#endif

/// A connection between a message handler (e.g. language server) in the same process as the connection object and a remote message handler (e.g. language client) that may run in another process using JSON RPC messages sent over a pair of in/out file descriptors.
///
/// For example, inside a language server, the `JSONRPCConnection` takes the language service implementation as its `receiveHandler` and itself provides the client connection for sending notifications and callbacks.
public final class JSONRPCConnection: Connection {

  /// A name of the endpoint for this connection, used for logging, e.g. `clangd`.
  private let name: String

  /// The message handler that handles requests and notifications sent through this connection.
  ///
  /// Access to this must be guaranteed to be sequential to avoid data races. Currently, all accesses are
  ///  - `init`: Reference to `JSONRPCConnection` trivially can't have escaped to other isolation domains yet.
  ///  - `start`: Is required to be called in the same serial code region as the initializer, so
  ///    `JSONRPCConnection` can't have escaped to other isolation domains yet.
  ///  - `deinit`: Can also only trivially be called once.
  nonisolated(unsafe) private var receiveHandler: MessageHandler?

  /// The queue on which we read the data
  private let queue: DispatchQueue = DispatchQueue(label: "jsonrpc-queue", qos: .userInitiated)

  /// The queue on which we send data.
  private let sendQueue: DispatchQueue = DispatchQueue(label: "jsonrpc-send-queue", qos: .userInitiated)

  private let receiveIO: DispatchIO
  private let sendIO: DispatchIO
  private let messageRegistry: MessageRegistry

  /// The lifecycle states of a `JSONRPCConnection`.
  enum State {
    /// The connection has been initialized but `start` has not been called yet.
    case created
    /// `start` has been called and the connection has not been closed.
    case running
    /// The connection has been closed.
    case closed
  }

  /// Current state of the connection, used to ensure correct usage.
  ///
  /// Access to this must be guaranteed to be sequential to avoid data races. Currently, all accesses are
  ///  - `init`: Reference to `JSONRPCConnection` trivially can't have escaped to other isolation domains yet.
  ///  - `start`: Is required to be called in the same serial region as the initializer, so
  ///    `JSONRPCConnection` can't have escaped to other isolation domains yet.
  ///  - `closeAssumingOnQueue`: Synchronized on `queue`.
  ///  - `readyToSend`: Synchronized on `queue`.
  ///  - `deinit`: Can also only trivially be called once.
  private nonisolated(unsafe) var state: State

  /// Buffer of received bytes that haven't been parsed.
  ///
  /// Access to this must be guaranteed to be sequential to avoid data races. Currently, all accesses are
  ///  - The `receiveIO` handler: This is synchronized on `queue`.
  ///  - `requestBufferIsEmpty`: Also synchronized on `queue`.
  private nonisolated(unsafe) var requestBuffer: [UInt8] = []

  /// Whether there are currently no received bytes that are still waiting to be parsed.
  ///
  /// The buffer is inspected synchronously on `queue`, which is the queue that guards `requestBuffer`.
  @_spi(Testing)
  public var requestBufferIsEmpty: Bool {
    return queue.sync { self.requestBuffer.isEmpty }
  }

  /// An integer that hasn't been used for a request ID yet.
  ///
  /// Access to this must be guaranteed to be sequential to avoid data races. Currently, all accesses are
  ///  - `nextRequestID()`: This is synchronized on `queue`.
  private nonisolated(unsafe) var nextRequestIDStorage: Int = 0

  /// Bookkeeping for a request that has been sent to the remote side and is still waiting for its response.
  struct OutstandingRequest: Sendable {
    /// The type that the response to this request should be decoded as.
    var responseType: ResponseType.Type
    /// Called with the outcome of the request: either the decoded response or the error that was received instead.
    var replyHandler: @Sendable (LSPResult<Any>) -> Void
  }

  /// The set of currently outstanding outgoing requests along with information about how to decode and handle their
  /// responses.
  ///
  /// All accesses to `outstandingRequests` must be on `queue` to avoid race conditions.
  private nonisolated(unsafe) var outstandingRequests: [RequestID: OutstandingRequest] = [:]

  /// A handler that will be called asynchronously when the connection is being
  /// closed.
  ///
  /// There are no race conditions on `closeHandler` because it is only set from `start`, which is required to be
  /// called in the same serial code region as the initializer, so the `JSONRPCConnection` can't have escaped to
  /// other isolation domains yet.
  private nonisolated(unsafe) var closeHandler: (@Sendable () async -> Void)? = nil

  /// Create a connection that reads incoming messages from `inFD` and writes outgoing messages to `outFD`.
  ///
  /// - Parameters:
  ///   - name: A name of the endpoint for this connection, used for logging.
  ///   - messageRegistry: The registry that is handed to the JSON decoder to decode incoming messages.
  ///   - inFD: The file handle from which incoming data is read.
  ///   - outFD: The file handle to which outgoing data is written.
  ///
  /// - Important: `start` must be called before sending any data over the `JSONRPCConnection`.
  public init(
    name: String,
    protocol messageRegistry: MessageRegistry,
    inFD: FileHandle,
    outFD: FileHandle
  ) {
    self.name = name
    self.receiveHandler = nil
    #if os(Linux) || os(Android)
    // We receive a `SIGPIPE` if we write to a pipe that points to a crashed process. This in particular happens if the
    // target of a `JSONRPCConnection` has crashed and we try to send it a message.
    // On Darwin, `DispatchIO` ignores `SIGPIPE` for the pipes handled by it, but that feature is not available on Linux.
    // Instead, globally ignore `SIGPIPE` on Linux to prevent us from crashing if the `JSONRPCConnection`'s target crashes.
    globallyDisableSigpipe()
    #endif
    state = .created
    self.messageRegistry = messageRegistry

    // The group is entered once per IO channel and left from the channel's cleanup handler. It thus becomes empty once
    // both channels have been cleaned up.
    let ioGroup = DispatchGroup()

    #if os(Windows)
    let rawInFD = dispatch_fd_t(bitPattern: inFD._handle)
    #else
    let rawInFD = inFD.fileDescriptor
    #endif

    ioGroup.enter()
    receiveIO = DispatchIO(
      type: .stream,
      fileDescriptor: rawInFD,
      queue: queue,
      cleanupHandler: { (error: Int32) in
        if error != 0 {
          logger.fault("IO error \(error)")
        }
        ioGroup.leave()
      }
    )

    #if os(Windows)
    let rawOutFD = dispatch_fd_t(bitPattern: outFD._handle)
    #else
    let rawOutFD = outFD.fileDescriptor
    #endif

    ioGroup.enter()
    sendIO = DispatchIO(
      type: .stream,
      fileDescriptor: rawOutFD,
      queue: sendQueue,
      cleanupHandler: { (error: Int32) in
        if error != 0 {
          logger.fault("IO error \(error)")
        }
        ioGroup.leave()
      }
    )

    ioGroup.notify(queue: queue) { [weak self] in
      guard let self else { return }

      // Both IO channels have been cleaned up, so no response can arrive anymore. Fail every request that is still
      // waiting for a response so that its reply handler is not left hanging forever. This block runs on `queue`,
      // which is the queue that guards `outstandingRequests`.
      let abandonedRequests = self.outstandingRequests
      self.outstandingRequests = [:]
      for abandonedRequest in abandonedRequests.values {
        abandonedRequest.replyHandler(.failure(.serverCancelled))
      }

      Task {
        await self.closeHandler?()
        self.receiveHandler = nil  // break retain cycle
      }
    }

    // We cannot assume the client will send us bytes in packets of any particular size, so set the lower limit to 1.
    receiveIO.setLimit(lowWater: 1)
    receiveIO.setLimit(highWater: Int.max)

    sendIO.setLimit(lowWater: 1)
    sendIO.setLimit(highWater: Int.max)
  }

  deinit {
    // A connection is expected to have reached the `.closed` state by the time it is deallocated.
    assert(state == .closed)
  }

  /// Start processing `inFD` and send messages to `receiveHandler`.
  ///
  /// The connection must be in the `created` state; it transitions to `running`.
  ///
  /// - parameter receiveHandler: The message handler to invoke for requests received on the `inFD`.
  /// - parameter closeHandler: Stored on the connection and invoked asynchronously once both IO channels have been
  ///   cleaned up after the connection was closed. Defaults to doing nothing.
  ///
  /// - Important: `start` must be called before sending any data over the `JSONRPCConnection`.
  public func start(receiveHandler: MessageHandler, closeHandler: @escaping @Sendable () async -> Void = {}) {
    queue.sync {
      precondition(state == .created)
      state = .running
      self.receiveHandler = receiveHandler
      self.closeHandler = closeHandler

      // The handler below is invoked on `queue` whenever data has been read, an error occurred or the read finished.
      receiveIO.read(offset: 0, length: Int.max, queue: queue) { done, data, errorCode in
        guard errorCode == 0 else {
          #if !os(Windows)
          // `ECANCELED` is not logged as a fault; every other error code is.
          if errorCode != POSIXError.ECANCELED.rawValue {
            logger.fault("IO error reading \(errorCode)")
          }
          #endif
          if done { self.closeAssumingOnQueue() }
          return
        }

        // The read operation has finished without an error, close the connection.
        if done {
          self.closeAssumingOnQueue()
          return
        }

        guard let data = data, !data.isEmpty else {
          return
        }

        // Parse and handle any messages in `buffer + data`, leaving any remaining unparsed bytes in `buffer`.
        if self.requestBuffer.isEmpty {
          // Nothing is buffered: parse directly from the received bytes and only buffer what could not be parsed.
          data.withUnsafeBytes { (pointer: UnsafePointer<UInt8>) in
            let rest = self.parseAndHandleMessages(from: UnsafeBufferPointer(start: pointer, count: data.count))
            self.requestBuffer.append(contentsOf: rest)
          }
        } else {
          // There are leftover bytes from an earlier read: append the new bytes, parse the combined buffer and then
          // drop everything that was consumed, keeping only the unparsed tail.
          self.requestBuffer.append(contentsOf: data)
          var unused = 0
          self.requestBuffer.withUnsafeBufferPointer { buffer in
            let rest = self.parseAndHandleMessages(from: buffer)
            unused = rest.count
          }
          self.requestBuffer.removeFirst(self.requestBuffer.count - unused)
        }
      }
    }
  }

  /// Tell the user, via a `ShowMessageNotification` of type `.error`, that a message could not be encoded or decoded
  /// and ask them to file an issue.
  ///
  /// - parameter message: Describes what has gone wrong. It is followed by a hint on how to file an issue.
  ///
  /// - Important: Must be called on `queue`
  private func sendMessageCodingErrorNotificationToClient(message: String) {
    dispatchPrecondition(condition: .onQueue(queue))
    let userFacingText = "\(message). Please run 'sourcekit-lsp diagnose' to file an issue."
    let errorNotification = ShowMessageNotification(type: .error, message: userFacingText)
    send(.notification(errorNotification))
  }

  /// Decode a single JSONRPC message from the given `messageBytes`.
  ///
  /// `messageBytes` should be valid JSON, ie. this is the message sent from the client without the `Content-Length`
  /// header.
  ///
  /// If an error occurs during message parsing, this tries to recover as gracefully as possible and returns `nil`.
  /// Callers should consider the message handled and ignore it when this function returns `nil`.
  ///
  /// - parameter messageBytes: The bytes of the message body. They are only read during this call.
  /// - returns: The decoded message, or `nil` if decoding failed and the failure has already been dealt with.
  ///
  /// - Important: Must be called on `queue`
  private func decodeJSONRPCMessage(messageBytes: Slice<UnsafeBufferPointer<UInt8>>) -> JSONRPCMessage? {
    dispatchPrecondition(condition: .onQueue(queue))
    let decoder = JSONDecoder()

    // Set message registry to use for model decoding.
    decoder.userInfo[.messageRegistryKey] = messageRegistry

    // Setup callback for response type.
    // The callback looks up the response type that was recorded in `outstandingRequests` when the request was sent.
    decoder.userInfo[.responseTypeCallbackKey] = { (id: RequestID) -> ResponseType.Type? in
      guard let outstanding = self.outstandingRequests[id] else {
        logger.error("Unknown request for \(id, privacy: .public)")
        return nil
      }
      return outstanding.responseType
    }

    do {
      // Wrap the bytes in a `Data` without copying them. The `.none` deallocator leaves ownership of the memory with
      // the caller.
      let pointer = UnsafeMutableRawPointer(mutating: UnsafeBufferPointer(rebasing: messageBytes).baseAddress!)
      return try decoder.decode(
        JSONRPCMessage.self,
        from: Data(bytesNoCopy: pointer, count: messageBytes.count, deallocator: .none)
      )
    } catch let error as MessageDecodingError {
      logger.fault("Failed to decode message: \(error.forLogging)")
      logger.fault("Malformed message: \(String(bytes: messageBytes, encoding: .utf8) ?? "<invalid UTF-8>")")

      // We failed to decode the message. Under those circumstances try to behave as LSP-conforming as possible.
      // Always log at the fault level so that we know something is going wrong from the logs.
      //
      // The pattern below is to handle the message in the best possible way and then `return nil` to acknowledge the
      // handling. That way the compiler enforces that we handle all code paths.
      switch error.messageKind {
      case .request:
        if let id = error.id {
          // If we know it was a request and we have the request ID, simply reply to the request and tell the client
          // that we couldn't parse it. That complies with LSP that all requests should eventually get a response.
          logger.fault(
            "Replying to request \(id, privacy: .public) with error response because we failed to decode the request"
          )
          self.send(.errorResponse(ResponseError(error), id: id))
          return nil
        }
        // If we don't know the ID of the request, ignore it and show a notification to the user.
        // That way the user at least knows that something is going wrong even if the client never gets a response
        // for the request.
        logger.fault("Ignoring request because we failed to decode the request and don't have a request ID")
        sendMessageCodingErrorNotificationToClient(message: "sourcekit-lsp failed to decode a request")
        return nil
      case .response:
        if let id = error.id {
          if let outstanding = self.outstandingRequests.removeValue(forKey: id) {
            // If we received a response to a request we sent to the client, assume that the client responded with an
            // error. That complies with LSP that all requests should eventually get a response.
            logger.fault(
              "Assuming an error response to request \(id, privacy: .public) because response from client could not be decoded"
            )
            outstanding.replyHandler(.failure(ResponseError(error)))
            return nil
          }
          // If there's an error in the response but we don't even know about the request, we can ignore it.
          logger.fault(
            "Ignoring response to request \(id, privacy: .public) because it could not be decoded and given request ID is unknown"
          )
          return nil
        }
        // And if we can't even recover the ID the response is for, we drop it. This means that whichever code in
        // sourcekit-lsp sent the request will probably never get a reply but there's nothing we can do about that.
        // Ideally requests sent from sourcekit-lsp to the client would have some kind of timeout anyway.
        logger.fault("Ignoring response because its request ID could not be recovered")
        return nil
      case .notification:
        if error.code == .methodNotFound {
          // If we receive a notification we don't know about, this might be a client sending a new LSP notification
          // that we don't know about. It can't be very critical so we ignore it without bothering the user with an
          // error notification.
          logger.fault("Ignoring notification because we don't know about it's method")
          return nil
        }
        // Ignoring any other notification might result in corrupted behavior. For example, ignoring a
        // `textDocument/didChange` will result in an out-of-sync state between the editor and sourcekit-lsp.
        // Warn the user about the error.
        logger.fault("Ignoring notification that may cause corrupted behavior")
        sendMessageCodingErrorNotificationToClient(message: "sourcekit-lsp failed to decode a notification")
        return nil
      case .unknown:
        // We don't know what has gone wrong. This could be any level of badness. Inform the user about it.
        logger.fault("Ignoring unknown message")
        sendMessageCodingErrorNotificationToClient(message: "sourcekit-lsp failed to decode a message")
        return nil
      }
    } catch {
      // We don't know what has gone wrong. This could be any level of badness. Inform the user about it and ignore the
      // message.
      logger.fault("Ignoring unknown message")
      sendMessageCodingErrorNotificationToClient(message: "sourcekit-lsp failed to decode an unknown message")
      return nil
    }
  }

  /// Check whether the connection is in a state in which messages can be sent, i.e. whether it is `running`.
  ///
  /// Calling this before `start` has been called is a programmer error and traps.
  ///
  /// - parameter shouldLog: Whether to log a message if the connection is not ready.
  /// - returns: `true` if the connection is running, `false` otherwise.
  ///
  /// - Important: Must be called on `queue`. Note that the state might change as soon as execution leaves `queue`.
  func readyToSend(shouldLog: Bool = true) -> Bool {
    dispatchPrecondition(condition: .onQueue(queue))
    precondition(state != .created, "tried to send message before calling start(messageHandler:)")
    guard state == .running else {
      if shouldLog {
        logger.error("Ignoring message; state = \(String(reflecting: self.state), privacy: .public)")
      }
      return false
    }
    return true
  }

  /// Split `bytes` into complete JSON RPC messages, decode each of them and dispatch it via `handle`.
  ///
  /// Messages that fail to decode are skipped because `decodeJSONRPCMessage` has already dealt with them.
  ///
  /// - parameter bytes: The received bytes, starting at a message boundary.
  /// - returns: The trailing bytes that do not form a complete message yet.
  ///
  /// - Important: Must be called on `queue`
  func parseAndHandleMessages(from bytes: UnsafeBufferPointer<UInt8>) -> UnsafeBufferPointer<UInt8>.SubSequence {
    dispatchPrecondition(condition: .onQueue(queue))

    var unparsed = bytes[...]

    while true {
      // Use the Content-Length header to find the end of the next message.
      let payload: Slice<UnsafeBufferPointer<UInt8>>
      do {
        guard let (header: _, message: body, rest: tail) = try unparsed.jsonrpcSplitMessage() else {
          // No complete message is left; hand the remaining bytes back to the caller.
          return unparsed
        }
        payload = body
        unparsed = tail
      } catch {
        // The message header could not be parsed, which means we no longer know where the next message starts in
        // the stream. There is no sensible way to recover from that, so crash and rely on the client to restart us.
        sendMessageCodingErrorNotificationToClient(message: "Failed to find next message in connection to editor")
        fatalError("fatal error encountered while splitting JSON RPC messages \(error)")
      }

      if let message = decodeJSONRPCMessage(messageBytes: payload) {
        handle(message)
      }
    }
  }

  /// Dispatch a decoded message to its destination.
  ///
  /// Notifications and requests are forwarded to `receiveHandler`. Responses and error responses are delivered to the
  /// reply handler of the matching outstanding request, which is removed from `outstandingRequests` in the process.
  ///
  /// - Important: Must be called on `queue`
  func handle(_ message: JSONRPCMessage) {
    dispatchPrecondition(condition: .onQueue(queue))
    switch message {
    case .notification(let notification):
      notification._handle(self.receiveHandler!)
    case .request(let request, id: let id):
      request._handle(self.receiveHandler!, id: id) { (result, requestID) in
        self.sendReply(result, id: requestID)
      }
    case .response(let response, id: let id):
      if let pending = outstandingRequests.removeValue(forKey: id) {
        pending.replyHandler(.success(response))
      } else {
        logger.error("No outstanding requests for response ID \(id, privacy: .public)")
      }
    case .errorResponse(let error, id: let id):
      guard let id else {
        logger.error("Received error response for unknown request: \(error.forLogging)")
        return
      }
      if let pending = outstandingRequests.removeValue(forKey: id) {
        pending.replyHandler(.failure(error))
      } else {
        logger.error("No outstanding requests for error response ID \(id, privacy: .public)")
      }
    }
  }

  /// Write already-framed bytes to the remote side of this connection.
  ///
  /// Nothing is written if the connection is not ready to send. If the write reports an error and is done, the
  /// connection gets closed.
  ///
  /// - parameter dispatchData: The bytes to write.
  ///
  /// - Important: Must be called on `queue`
  private func send(data dispatchData: DispatchData) {
    dispatchPrecondition(condition: .onQueue(queue))
    guard readyToSend() else { return }

    sendIO.write(offset: 0, data: dispatchData, queue: sendQueue) { [weak self] done, _, errorCode in
      guard errorCode != 0 else { return }
      logger.fault("IO error sending message \(errorCode)")

      // Only a finished write with an error is treated as unrecoverable. Closing has to happen on `queue`.
      guard done, let self else { return }
      self.queue.async {
        self.closeAssumingOnQueue()
      }
    }
  }

  /// Send raw bytes over the connection, hopping onto `queue` synchronously to do so.
  ///
  /// This only exists for tests that check that the client decodes messages correctly when data is delivered to it
  /// byte-by-byte instead of in larger chunks that contain entire messages.
  ///
  /// - parameter dispatchData: The bytes to write, forwarded unchanged to `send(data:)`.
  @_spi(Testing)
  public func send(_rawData dispatchData: DispatchData) {
    queue.sync { self.send(data: dispatchData) }
  }

  /// Encode `message` as JSON, prefix it with a `Content-Length` header and write it to the remote side.
  ///
  /// If the message cannot be encoded, it is dropped and the user is informed through an error notification.
  /// If an unrecoverable error occurred on the channel's file descriptor, the connection gets closed.
  ///
  /// - parameter message: The message to send.
  ///
  /// - Important: Must be called on `queue`
  func send(_ message: JSONRPCMessage) {
    dispatchPrecondition(condition: .onQueue(queue))

    let payload: Data
    do {
      payload = try JSONEncoder().encode(message)
    } catch {
      logger.fault("Failed to encode message: \(error.forLogging)")
      logger.fault("Malformed message: \(String(describing: message))")

      let userFacingDescription: String
      switch message {
      case .notification:
        // Dropping a notification might leave us out-of-sync state-wise with the editor (eg. for work progress
        // notifications), so the user should know about it.
        userFacingDescription = "sourcekit-lsp failed to encode a notification to the editor"
      case .request:
        // We wanted to send a request to the editor but failed to encode it. No error response is synthesized for
        // the request here, which means that the request will never receive a reply. Inform the user about it.
        userFacingDescription = "sourcekit-lsp failed to encode a request to the editor"
      case .response:
        // The editor sent us a request but we can't serialize the result back to it. The request will never receive
        // a reply; inform the user about it and accept that.
        userFacingDescription = "sourcekit-lsp failed to encode a response to the editor"
      case .errorResponse:
        // Same situation as `.response`, only for an error response.
        userFacingDescription = "sourcekit-lsp failed to encode an error response to the editor"
      }
      sendMessageCodingErrorNotificationToClient(message: userFacingDescription)
      return
    }

    // Frame the message: header first, then the JSON body.
    let headerBytes = Array("Content-Length: \(payload.count)\r\n\r\n".utf8)
    var framedMessage = DispatchData.empty
    headerBytes.withUnsafeBytes { headerBuffer in
      framedMessage.append(headerBuffer)
    }
    payload.withUnsafeBytes { payloadBuffer in
      framedMessage.append(payloadBuffer)
    }

    send(data: framedMessage)
  }

  /// Close the connection by synchronously running `closeAssumingOnQueue` on `queue`.
  ///
  /// The user-provided close handler will be called *asynchronously* when all outstanding I/O
  /// operations have completed. No new I/O will be accepted after `close` returns.
  public func close() {
    queue.sync {
      self.closeAssumingOnQueue()
    }
  }

  /// Transition a running connection to `closed` and close both IO channels.
  ///
  /// Does nothing unless the connection is currently `running`. The work is performed synchronously on `sendQueue`.
  ///
  /// - Important: Must be called on `queue`.
  private func closeAssumingOnQueue() {
    dispatchPrecondition(condition: .onQueue(queue))
    sendQueue.sync {
      if state != .running {
        return
      }
      state = .closed

      logger.log("Closing JSONRPCConnection...")
      // Stop reading right away, there is no need to accept any remaining input.
      receiveIO.close(flags: .stop)
      // The writer is closed without `.stop` so that it can finish outstanding work first.
      sendIO.close()
    }
  }

  /// Produce a request ID that has not been handed out by this connection yet.
  ///
  /// Increments `nextRequestIDStorage` and wraps the new value in a `.number` request ID.
  ///
  /// - Important: Must be called on `queue`
  private func nextRequestID() -> RequestID {
    dispatchPrecondition(condition: .onQueue(queue))
    let freshID = nextRequestIDStorage + 1
    nextRequestIDStorage = freshID
    return .number(freshID)
  }

  // MARK: Connection interface

  /// Send the notification to the remote side of the connection.
  ///
  /// The notification is logged and sent asynchronously on `queue`.
  public func send(_ notification: some NotificationType) {
    queue.async {
      let outgoing = JSONRPCMessage.notification(notification)
      logger.info(
        """
        Sending notification to \(self.name, privacy: .public)
        \(notification.forLogging)
        """
      )
      self.send(outgoing)
    }
  }

  /// Send the given request to the remote side of the connection.
  ///
  /// The request is registered in `outstandingRequests` and sent synchronously on `queue`. When the receiving end
  /// replies to the request, `reply` is executed with the response. If the connection is not ready to send, `reply` is
  /// executed immediately with a `serverCancelled` failure and nothing is sent.
  ///
  /// - Parameters:
  ///   - request: The request to send.
  ///   - reply: Called with the response to the request or the error that was received instead.
  /// - Returns: The ID that was assigned to the request.
  public func send<Request: RequestType>(
    _ request: Request,
    reply: @escaping @Sendable (LSPResult<Request.Response>) -> Void
  ) -> RequestID {
    return queue.sync { () -> RequestID in
      let id = nextRequestID()

      if !readyToSend() {
        reply(.failure(.serverCancelled))
        return id
      }

      // Wraps `reply` so that the outcome is cast to the request's response type and logged before it is delivered.
      let loggingReplyHandler: @Sendable (LSPResult<Any>) -> Void = { anyResult in
        let typedResult = anyResult.map { $0 as! Request.Response }
        switch typedResult {
        case .success(let response):
          logger.info(
            """
            Received reply for request \(id, privacy: .public) from \(self.name, privacy: .public)
            \(response.forLogging)
            """
          )
        case .failure(let error):
          logger.error(
            """
            Received error for request \(id, privacy: .public) from \(self.name, privacy: .public)
            \(error.forLogging)
            """
          )
        }
        reply(typedResult)
      }
      outstandingRequests[id] = OutstandingRequest(
        responseType: Request.Response.self,
        replyHandler: loggingReplyHandler
      )

      logger.info(
        """
        Sending request to \(self.name, privacy: .public) (id: \(id, privacy: .public)):
        \(request.forLogging)
        """
      )

      send(.request(request, id: id))
      return id
    }
  }

  /// Reply to a request that the remote side of the connection sent to us.
  ///
  /// A successful result is sent as a response message, a failure as an error response message. Sending happens
  /// asynchronously on `queue`.
  ///
  /// - Parameters:
  ///   - response: The outcome of handling the request.
  ///   - id: The ID of the request that is being replied to.
  public func sendReply(_ response: LSPResult<ResponseType>, id: RequestID) {
    queue.async {
      let replyMessage: JSONRPCMessage
      switch response {
      case .success(let result):
        replyMessage = .response(result, id: id)
      case .failure(let error):
        replyMessage = .errorResponse(error, id: id)
      }
      self.send(replyMessage)
    }
  }
}