File: Embedded.swift

package info (click to toggle)
swiftlang 6.0.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,519,992 kB
  • sloc: cpp: 9,107,863; ansic: 2,040,022; asm: 1,135,751; python: 296,500; objc: 82,456; f90: 60,502; lisp: 34,951; pascal: 19,946; sh: 18,133; perl: 7,482; ml: 4,937; javascript: 4,117; makefile: 3,840; awk: 3,535; xml: 914; fortran: 619; cs: 573; ruby: 573
file content (722 lines) | stat: -rw-r--r-- 29,724 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2020 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Dispatch

private final class EmbeddedScheduledTask {
    let task: () -> Void
    let readyTime: NIODeadline
    let insertOrder: UInt64

    init(readyTime: NIODeadline, insertOrder: UInt64, task: @escaping () -> Void) {
        self.readyTime = readyTime
        self.insertOrder = insertOrder
        self.task = task
    }
}

extension EmbeddedScheduledTask: Comparable {
    static func < (lhs: EmbeddedScheduledTask, rhs: EmbeddedScheduledTask) -> Bool {
        if lhs.readyTime == rhs.readyTime {
            return lhs.insertOrder < rhs.insertOrder
        } else {
            return lhs.readyTime < rhs.readyTime
        }
    }

    static func == (lhs: EmbeddedScheduledTask, rhs: EmbeddedScheduledTask) -> Bool {
        return lhs === rhs
    }
}

/// An `EventLoop` that is embedded in the current running context with no external
/// control.
///
/// Unlike more complex `EventLoop`s, such as `SelectableEventLoop`, the `EmbeddedEventLoop`
/// has no proper eventing mechanism. Instead, reads and writes are fully controlled by the
/// entity that instantiates the `EmbeddedEventLoop`. This property makes `EmbeddedEventLoop`
/// of limited use for many application purposes, but highly valuable for testing and other
/// kinds of mocking.
///
/// Time is controllable on an `EmbeddedEventLoop`. It begins at `NIODeadline.uptimeNanoseconds(0)`
/// and may be advanced by a fixed amount by using `advanceTime(by:)`, or advanced to a point in
/// time with `advanceTime(to:)`.
///
/// - warning: Unlike `SelectableEventLoop`, `EmbeddedEventLoop` **is not thread-safe**. This
///     is because it is intended to be run in the thread that instantiated it. Users are
///     responsible for ensuring they never call into the `EmbeddedEventLoop` in an
///     unsynchronized fashion.
public final class EmbeddedEventLoop: EventLoop {
    /// The current "time" for this event loop. This is an amount in nanoseconds.
    /* private but tests */ internal var _now: NIODeadline = .uptimeNanoseconds(0)

    private var scheduledTasks = PriorityQueue<EmbeddedScheduledTask>()

    // The number of the next task to be created. We track the order so that when we execute tasks
    // scheduled at the same time, we may do so in the order in which they were submitted for
    // execution.
    private var taskNumber: UInt64 = 0

    private func nextTaskNumber() -> UInt64 {
        defer {
            self.taskNumber += 1
        }
        return self.taskNumber
    }

    /// - see: `EventLoop.inEventLoop`
    public var inEventLoop: Bool {
        return true
    }

    /// Initialize a new `EmbeddedEventLoop`.
    public init() { }

    /// - see: `EventLoop.scheduleTask(deadline:_:)`
    @discardableResult
    public func scheduleTask<T>(deadline: NIODeadline, _ task: @escaping () throws -> T) -> Scheduled<T> {
        let promise: EventLoopPromise<T> = makePromise()
        let task = EmbeddedScheduledTask(readyTime: deadline, insertOrder: self.nextTaskNumber()) {
            do {
                promise.succeed(try task())
            } catch let err {
                promise.fail(err)
            }
        }

        let scheduled = Scheduled(promise: promise, cancellationTask: {
            self.scheduledTasks.remove(task)
        })
        scheduledTasks.push(task)
        return scheduled
    }

    /// - see: `EventLoop.scheduleTask(in:_:)`
    @discardableResult
    public func scheduleTask<T>(in: TimeAmount, _ task: @escaping () throws -> T) -> Scheduled<T> {
        return scheduleTask(deadline: self._now + `in`, task)
    }

    /// On an `EmbeddedEventLoop`, `execute` will simply use `scheduleTask` with a deadline of _now_. This means that
    /// `task` will be run the next time you call `EmbeddedEventLoop.run`.
    public func execute(_ task: @escaping () -> Void) {
        self.scheduleTask(deadline: self._now, task)
    }

    /// Run all tasks that have previously been submitted to this `EmbeddedEventLoop`, either by calling `execute` or
    /// events that have been enqueued using `scheduleTask`/`scheduleRepeatedTask`/`scheduleRepeatedAsyncTask` and whose
    /// deadlines have expired.
    ///
    /// - seealso: `EmbeddedEventLoop.advanceTime`.
    public func run() {
        // Execute all tasks that are currently enqueued to be executed *now*.
        self.advanceTime(to: self._now)
    }

    /// Runs the event loop and moves "time" forward by the given amount, running any scheduled
    /// tasks that need to be run.
    public func advanceTime(by increment: TimeAmount) {
        self.advanceTime(to: self._now + increment)
    }

    /// Runs the event loop and moves "time" forward to the given point in time, running any scheduled
    /// tasks that need to be run.
    ///
    /// - Note: If `deadline` is before the current time, the current time will not be advanced.
    public func advanceTime(to deadline: NIODeadline) {
        let newTime = max(deadline, self._now)

        while let nextTask = self.scheduledTasks.peek() {
            guard nextTask.readyTime <= newTime else {
                break
            }

            // Now we want to grab all tasks that are ready to execute at the same
            // time as the first.
            var tasks = Array<EmbeddedScheduledTask>()
            while let candidateTask = self.scheduledTasks.peek(), candidateTask.readyTime == nextTask.readyTime {
                tasks.append(candidateTask)
                self.scheduledTasks.pop()
            }

            // Set the time correctly before we call into user code, then
            // call in for all tasks.
            self._now = nextTask.readyTime

            for task in tasks {
                task.task()
            }
        }

        // Finally ensure we got the time right.
        self._now = newTime
    }

    internal func drainScheduledTasksByRunningAllCurrentlyScheduledTasks() {
        var currentlyScheduledTasks = self.scheduledTasks
        while let nextTask = currentlyScheduledTasks.pop() {
            self._now = nextTask.readyTime
            nextTask.task()
        }
        // Just drop all the remaining scheduled tasks. Despite having run all the tasks that were
        // scheduled when we entered the method this may still contain tasks as running the tasks
        // may have enqueued more tasks.
        while self.scheduledTasks.pop() != nil {}
    }

    /// - see: `EventLoop.close`
    func close() throws {
        // Nothing to do here
    }

    /// - see: `EventLoop.shutdownGracefully`
    public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
        run()
        queue.sync {
            callback(nil)
        }
    }

    deinit {
        precondition(scheduledTasks.isEmpty, "Embedded event loop freed with unexecuted scheduled tasks!")
    }
}

@usableFromInline
class EmbeddedChannelCore: ChannelCore {
    var isOpen: Bool = true
    var isActive: Bool = false

    var eventLoop: EventLoop
    var closePromise: EventLoopPromise<Void>
    var error: Optional<Error>

    private let pipeline: ChannelPipeline

    init(pipeline: ChannelPipeline, eventLoop: EventLoop) {
        closePromise = eventLoop.makePromise()
        self.pipeline = pipeline
        self.eventLoop = eventLoop
        self.error = nil
    }

    deinit {
        assert(self.pipeline.destroyed, "leaked an open EmbeddedChannel, maybe forgot to call channel.finish()?")
        isOpen = false
        closePromise.succeed(())
    }

    /// Contains the flushed items that went into the `Channel` (and on a regular channel would have hit the network).
    @usableFromInline
    var outboundBuffer: CircularBuffer<NIOAny> = CircularBuffer()

    /// Contains the unflushed items that went into the `Channel`
    @usableFromInline
    var pendingOutboundBuffer: MarkedCircularBuffer<(NIOAny, EventLoopPromise<Void>?)> = MarkedCircularBuffer(initialCapacity: 16)

    /// Contains the items that travelled the `ChannelPipeline` all the way and hit the tail channel handler. On a
    /// regular `Channel` these items would be lost.
    @usableFromInline
    var inboundBuffer: CircularBuffer<NIOAny> = CircularBuffer()

    @usableFromInline
    func localAddress0() throws -> SocketAddress {
        throw ChannelError.operationUnsupported
    }

    @usableFromInline
    func remoteAddress0() throws -> SocketAddress {
        throw ChannelError.operationUnsupported
    }

    @usableFromInline
    func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
        guard self.isOpen else {
            promise?.fail(ChannelError.alreadyClosed)
            return
        }
        isOpen = false
        isActive = false
        promise?.succeed(())

        // As we called register() in the constructor of EmbeddedChannel we also need to ensure we call unregistered here.
        pipeline.fireChannelInactive0()
        pipeline.fireChannelUnregistered0()

        eventLoop.execute {
            // ensure this is executed in a delayed fashion as the users code may still traverse the pipeline
            self.pipeline.removeHandlers()
            self.closePromise.succeed(())
        }
    }

    @usableFromInline
    func bind0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        promise?.succeed(())
    }

    @usableFromInline
    func connect0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        isActive = true
        promise?.succeed(())
        pipeline.fireChannelActive0()
    }

    @usableFromInline
    func register0(promise: EventLoopPromise<Void>?) {
        promise?.succeed(())
        pipeline.fireChannelRegistered0()
    }

    @usableFromInline
    func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) {
        isActive = true
        register0(promise: promise)
        pipeline.fireChannelActive0()
    }

    @usableFromInline
    func write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
        self.pendingOutboundBuffer.append((data, promise))
    }

    @usableFromInline
    func flush0() {
        self.pendingOutboundBuffer.mark()

        while self.pendingOutboundBuffer.hasMark, let dataAndPromise = self.pendingOutboundBuffer.popFirst() {
            self.addToBuffer(buffer: &self.outboundBuffer, data: dataAndPromise.0)
            dataAndPromise.1?.succeed(())
        }
    }

    @usableFromInline
    func read0() {
        // NOOP
    }

    public final func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?) {
        promise?.fail(ChannelError.operationUnsupported)
    }

    @usableFromInline
    func channelRead0(_ data: NIOAny) {
        addToBuffer(buffer: &inboundBuffer, data: data)
    }

    public func errorCaught0(error: Error) {
        if self.error == nil {
            self.error = error
        }
    }

    private func addToBuffer<T>(buffer: inout CircularBuffer<T>, data: T) {
        buffer.append(data)
    }
}

/// `EmbeddedChannel` is a `Channel` implementation that does neither any
/// actual IO nor has a proper eventing mechanism. The prime use-case for
/// `EmbeddedChannel` is in unit tests when you want to feed the inbound events
/// and check the outbound events manually.
///
/// Please remember to call `finish()` when you are no longer using this
/// `EmbeddedChannel`.
///
/// To feed events through an `EmbeddedChannel`'s `ChannelPipeline` use
/// `EmbeddedChannel.writeInbound` which accepts data of any type. It will then
/// forward that data through the `ChannelPipeline` and the subsequent
/// `ChannelInboundHandler` will receive it through the usual `channelRead`
/// event. The user is responsible for making sure the first
/// `ChannelInboundHandler` expects data of that type.
///
/// `EmbeddedChannel` automatically collects arriving outbound data and makes it
/// available one-by-one through `readOutbound`.
///
/// - note: `EmbeddedChannel` is currently only compatible with
///   `EmbeddedEventLoop`s and cannot be used with `SelectableEventLoop`s from
///   for example `MultiThreadedEventLoopGroup`.
/// - warning: Unlike other `Channel`s, `EmbeddedChannel` **is not thread-safe**. This
///     is because it is intended to be run in the thread that instantiated it. Users are
///     responsible for ensuring they never call into an `EmbeddedChannel` in an
///     unsynchronized fashion. `EmbeddedEventLoop`s notes also apply as
///     `EmbeddedChannel` uses an `EmbeddedEventLoop` as its `EventLoop`.
public final class EmbeddedChannel: Channel {
    /// `LeftOverState` represents any left-over inbound, outbound, and pending outbound events that hit the
    /// `EmbeddedChannel` and were not consumed when `finish` was called on the `EmbeddedChannel`.
    ///
    /// `EmbeddedChannel` is most useful in testing and usually in unit tests, you want to consume all inbound and
    /// outbound data to verify they are what you expect. Therefore, when you `finish` an `EmbeddedChannel` it will
    /// return if it's either `.clean` (no left overs) or that it has `.leftOvers`.
    public enum LeftOverState {
        /// The `EmbeddedChannel` is clean, ie. no inbound, outbound, or pending outbound data left on `finish`.
        case clean

        /// The `EmbeddedChannel` has inbound, outbound, or pending outbound data left on `finish`.
        case leftOvers(inbound: [NIOAny], outbound: [NIOAny], pendingOutbound: [NIOAny])

        /// `true` if the `EmbeddedChannel` was `clean` on `finish`, ie. there is no unconsumed inbound, outbound, or
        /// pending outbound data left on the `Channel`.
        public var isClean: Bool {
            if case .clean = self {
                return true
            } else {
                return false
            }
        }

        /// `true` if the `EmbeddedChannel` if there was unconsumed inbound, outbound, or pending outbound data left
        /// on the `Channel` when it was `finish`ed.
        public var hasLeftOvers: Bool {
            return !self.isClean
        }
    }

    /// `BufferState` represents the state of either the inbound, or the outbound `EmbeddedChannel` buffer. These
    /// buffers contain data that travelled the `ChannelPipeline` all the way.
    ///
    /// If the last `ChannelHandler` explicitly (by calling `fireChannelRead`) or implicitly (by not implementing
    /// `channelRead`) sends inbound data into the end of the `EmbeddedChannel`, it will be held in the
    /// `EmbeddedChannel`'s inbound buffer. Similarly for `write` on the outbound side. The state of the respective
    /// buffer will be returned from `writeInbound`/`writeOutbound` as a `BufferState`.
    public enum BufferState {
        /// The buffer is empty.
        case empty

        /// The buffer is non-empty.
        case full([NIOAny])

        /// Returns `true` is the buffer was empty.
        public var isEmpty: Bool {
            if case .empty = self {
                return true
            } else {
                return false
            }
        }

        /// Returns `true` if the buffer was non-empty.
        public var isFull: Bool {
            return !self.isEmpty
        }
    }

    /// `WrongTypeError` is throws if you use `readInbound` or `readOutbound` and request a certain type but the first
    /// item in the respective buffer is of a different type.
    public struct WrongTypeError: Error, Equatable {
        /// The type you expected.
        public let expected: Any.Type

        /// The type of the actual first element.
        public let actual: Any.Type

        public init(expected: Any.Type, actual: Any.Type) {
            self.expected = expected
            self.actual = actual
        }

        public static func == (lhs: WrongTypeError, rhs: WrongTypeError) -> Bool {
            return lhs.expected == rhs.expected && lhs.actual == rhs.actual
        }
    }

    /// Returns `true` if the `EmbeddedChannel` is 'active'.
    ///
    /// An active `EmbeddedChannel` can be closed by calling `close` or `finish` on the `EmbeddedChannel`.
    ///
    /// - note: An `EmbeddedChannel` starts _inactive_ and can be activated, for example by calling `connect`.
    public var isActive: Bool { return channelcore.isActive }

    /// - see: `Channel.closeFuture`
    public var closeFuture: EventLoopFuture<Void> { return channelcore.closePromise.futureResult }

    @usableFromInline
    /*private but usableFromInline */ lazy var channelcore: EmbeddedChannelCore = EmbeddedChannelCore(pipeline: self._pipeline, eventLoop: self.eventLoop)

    /// - see: `Channel._channelCore`
    public var _channelCore: ChannelCore {
        return channelcore
    }

    /// - see: `Channel.pipeline`
    public var pipeline: ChannelPipeline {
        return _pipeline
    }

    /// - see: `Channel.isWritable`
    public var isWritable: Bool = true

    /// Synchronously closes the `EmbeddedChannel`.
    ///
    /// Errors in the `EmbeddedChannel` can be consumed using `throwIfErrorCaught`.
    ///
    /// - parameters:
    ///     - acceptAlreadyClosed: Whether `finish` should throw if the `EmbeddedChannel` has been previously `close`d.
    /// - returns: The `LeftOverState` of the `EmbeddedChannel`. If all the inbound and outbound events have been
    ///            consumed (using `readInbound` / `readOutbound`) and there are no pending outbound events (unflushed
    ///            writes) this will be `.clean`. If there are any unconsumed inbound, outbound, or pending outbound
    ///            events, the `EmbeddedChannel` will returns those as `.leftOvers(inbound:outbound:pendingOutbound:)`.
    public func finish(acceptAlreadyClosed: Bool) throws -> LeftOverState {
        do {
            try close().wait()
        } catch let error as ChannelError {
            guard error == .alreadyClosed && acceptAlreadyClosed else {
                throw error
            }
        }
        self.embeddedEventLoop.drainScheduledTasksByRunningAllCurrentlyScheduledTasks()
        self.embeddedEventLoop.run()
        try throwIfErrorCaught()
        let c = self.channelcore
        if c.outboundBuffer.isEmpty && c.inboundBuffer.isEmpty && c.pendingOutboundBuffer.isEmpty {
            return .clean
        } else {
            return .leftOvers(inbound: Array(c.inboundBuffer),
                              outbound: Array(c.outboundBuffer),
                              pendingOutbound: c.pendingOutboundBuffer.map { $0.0 })
        }
    }

    /// Synchronously closes the `EmbeddedChannel`.
    ///
    /// This method will throw if the `Channel` hit any unconsumed errors or if the `close` fails. Errors in the
    /// `EmbeddedChannel` can be consumed using `throwIfErrorCaught`.
    ///
    /// - returns: The `LeftOverState` of the `EmbeddedChannel`. If all the inbound and outbound events have been
    ///            consumed (using `readInbound` / `readOutbound`) and there are no pending outbound events (unflushed
    ///            writes) this will be `.clean`. If there are any unconsumed inbound, outbound, or pending outbound
    ///            events, the `EmbeddedChannel` will returns those as `.leftOvers(inbound:outbound:pendingOutbound:)`.
    public func finish() throws -> LeftOverState {
        return try self.finish(acceptAlreadyClosed: false)
    }

    private var _pipeline: ChannelPipeline!

    /// - see: `Channel.allocator`
    public var allocator: ByteBufferAllocator = ByteBufferAllocator()

    /// - see: `Channel.eventLoop`
    public var eventLoop: EventLoop {
        return self.embeddedEventLoop
    }

    /// Returns the `EmbeddedEventLoop` that this `EmbeddedChannel` uses. This will return the same instance as
    /// `EmbeddedChannel.eventLoop` but as the concrete `EmbeddedEventLoop` rather than as `EventLoop` existential.
    public var embeddedEventLoop: EmbeddedEventLoop = EmbeddedEventLoop()

    /// - see: `Channel.localAddress`
    public var localAddress: SocketAddress? = nil

    /// - see: `Channel.remoteAddress`
    public var remoteAddress: SocketAddress? = nil

    /// `nil` because `EmbeddedChannel`s don't have parents.
    public let parent: Channel? = nil

    /// If available, this method reads one element of type `T` out of the `EmbeddedChannel`'s outbound buffer. If the
    /// first element was of a different type than requested, `EmbeddedChannel.WrongTypeError` will be thrown, if there
    /// are no elements in the outbound buffer, `nil` will be returned.
    ///
    /// Data hits the `EmbeddedChannel`'s outbound buffer when data was written using `write`, then `flush`ed, and
    /// then travelled the `ChannelPipeline` all the way too the front. For data to hit the outbound buffer, the very
    /// first `ChannelHandler` must have written and flushed it either explicitly (by calling
    /// `ChannelHandlerContext.write` and `flush`) or implicitly by not implementing `write`/`flush`.
    ///
    /// - note: Outbound events travel the `ChannelPipeline` _back to front_.
    /// - note: `EmbeddedChannel.writeOutbound` will `write` data through the `ChannelPipeline`, starting with last
    ///         `ChannelHandler`.
    @inlinable
    public func readOutbound<T>(as type: T.Type = T.self) throws -> T? {
        return try _readFromBuffer(buffer: &channelcore.outboundBuffer)
    }

    /// If available, this method reads one element of type `T` out of the `EmbeddedChannel`'s inbound buffer. If the
    /// first element was of a different type than requested, `EmbeddedChannel.WrongTypeError` will be thrown, if there
    /// are no elements in the outbound buffer, `nil` will be returned.
    ///
    /// Data hits the `EmbeddedChannel`'s inbound buffer when data was send through the pipeline using `fireChannelRead`
    /// and then travelled the `ChannelPipeline` all the way too the back. For data to hit the inbound buffer, the
    /// last `ChannelHandler` must have send the event either explicitly (by calling
    /// `ChannelHandlerContext.fireChannelRead`) or implicitly by not implementing `channelRead`.
    ///
    /// - note: `EmbeddedChannel.writeInbound` will fire data through the `ChannelPipeline` using `fireChannelRead`.
    @inlinable
    public func readInbound<T>(as type: T.Type = T.self) throws -> T? {
        return try _readFromBuffer(buffer: &channelcore.inboundBuffer)
    }

    /// Sends an inbound `channelRead` event followed by a `channelReadComplete` event through the `ChannelPipeline`.
    ///
    /// The immediate effect being that the first `ChannelInboundHandler` will get its `channelRead` method called
    /// with the data you provide.
    ///
    /// - parameters:
    ///    - data: The data to fire through the pipeline.
    /// - returns: The state of the inbound buffer which contains all the events that travelled the `ChannelPipeline`
    //             all the way.
    @inlinable
    @discardableResult public func writeInbound<T>(_ data: T) throws -> BufferState {
        pipeline.fireChannelRead(NIOAny(data))
        pipeline.fireChannelReadComplete()
        try throwIfErrorCaught()
        return self.channelcore.inboundBuffer.isEmpty ? .empty : .full(Array(self.channelcore.inboundBuffer))
    }

    /// Sends an outbound `writeAndFlush` event through the `ChannelPipeline`.
    ///
    /// The immediate effect being that the first `ChannelOutboundHandler` will get its `write` method called
    /// with the data you provide. Note that the first `ChannelOutboundHandler` in the pipeline is the _last_ handler
    /// because outbound events travel the pipeline from back to front.
    ///
    /// - parameters:
    ///    - data: The data to fire through the pipeline.
    /// - returns: The state of the outbound buffer which contains all the events that travelled the `ChannelPipeline`
    //             all the way.
    @inlinable
    @discardableResult public func writeOutbound<T>(_ data: T) throws -> BufferState {
        try writeAndFlush(NIOAny(data)).wait()
        return self.channelcore.outboundBuffer.isEmpty ? .empty : .full(Array(self.channelcore.outboundBuffer))
    }

    /// This method will throw the error that is stored in the `EmbeddedChannel` if any.
    ///
    /// The `EmbeddedChannel` will store an error some error travels the `ChannelPipeline` all the way past its end.
    public func throwIfErrorCaught() throws {
        if let error = channelcore.error {
            channelcore.error = nil
            throw error
        }
    }

    @inlinable
    func _readFromBuffer<T>(buffer: inout CircularBuffer<NIOAny>) throws -> T? {
        if buffer.isEmpty {
            return nil
        }
        let elem = buffer.removeFirst()
        guard let t = elem.tryAs(type: T.self) else {
            throw WrongTypeError(expected: T.self, actual: type(of: elem.forceAs(type: Any.self)))
        }
        return t
    }

    /// Create a new instance.
    ///
    /// During creation it will automatically also register itself on the `EmbeddedEventLoop`.
    ///
    /// - parameters:
    ///     - handler: The `ChannelHandler` to add to the `ChannelPipeline` before register or `nil` if none should be added.
    ///     - loop: The `EmbeddedEventLoop` to use.
    public convenience init(handler: ChannelHandler? = nil, loop: EmbeddedEventLoop = EmbeddedEventLoop()) {
        let handlers = handler.map { [$0] } ?? []
        self.init(handlers: handlers, loop: loop)
    }
    
    /// Create a new instance.
    ///
    /// During creation it will automatically also register itself on the `EmbeddedEventLoop`.
    ///
    /// - parameters:
    ///     - handlers: The `ChannelHandler`s to add to the `ChannelPipeline` before register.
    ///     - loop: The `EmbeddedEventLoop` to use.
    public init(handlers: [ChannelHandler], loop: EmbeddedEventLoop = EmbeddedEventLoop()) {
        self.embeddedEventLoop = loop
        self._pipeline = ChannelPipeline(channel: self)

        try! self._pipeline.syncOperations.addHandlers(handlers)

        // This will never throw...
        try! register().wait()
    }

    /// - see: `Channel.setOption`
    @inlinable
    public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> EventLoopFuture<Void> {
        self.setOptionSync(option, value: value)
        return self.eventLoop.makeSucceededVoidFuture()
    }

    @inlinable
    internal func setOptionSync<Option: ChannelOption>(_ option: Option, value: Option.Value) {
        // No options supported
        fatalError("no options supported")
    }

    /// - see: `Channel.getOption`
    @inlinable
    public func getOption<Option: ChannelOption>(_ option: Option) -> EventLoopFuture<Option.Value>  {
        return self.eventLoop.makeSucceededFuture(self.getOptionSync(option))
    }

    @inlinable
    internal func getOptionSync<Option: ChannelOption>(_ option: Option) -> Option.Value {
        if option is ChannelOptions.Types.AutoReadOption {
            return true as! Option.Value
        }
        fatalError("option \(option) not supported")
    }

    /// Fires the (outbound) `bind` event through the `ChannelPipeline`. If the event hits the `EmbeddedChannel` which
    /// happens when it travels the `ChannelPipeline` all the way to the front, this will also set the
    /// `EmbeddedChannel`'s `localAddress`.
    ///
    /// - parameters:
    ///     - address: The address to fake-bind to.
    ///     - promise: The `EventLoopPromise` which will be fulfilled when the fake-bind operation has been done.
    public func bind(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        promise?.futureResult.whenSuccess {
            self.localAddress = address
        }
        pipeline.bind(to: address, promise: promise)
    }

    /// Fires the (outbound) `connect` event through the `ChannelPipeline`. If the event hits the `EmbeddedChannel`
    /// which happens when it travels the `ChannelPipeline` all the way to the front, this will also set the
    /// `EmbeddedChannel`'s `remoteAddress`.
    ///
    /// - parameters:
    ///     - address: The address to fake-bind to.
    ///     - promise: The `EventLoopPromise` which will be fulfilled when the fake-bind operation has been done.
    public func connect(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        promise?.futureResult.whenSuccess {
            self.remoteAddress = address
        }
        pipeline.connect(to: address, promise: promise)
    }
}

extension EmbeddedChannel {
    public struct SynchronousOptions: NIOSynchronousChannelOptions {
        @usableFromInline
        internal let channel: EmbeddedChannel

        fileprivate init(channel: EmbeddedChannel) {
            self.channel = channel
        }

        @inlinable
        public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
            self.channel.setOptionSync(option, value: value)
        }

        @inlinable
        public func getOption<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
            return self.channel.getOptionSync(option)
        }
    }

    public final var syncOptions: NIOSynchronousChannelOptions? {
        return SynchronousOptions(channel: self)
    }
}