File: Codec.swift

package info (click to toggle)
swiftlang 6.0.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,519,992 kB
  • sloc: cpp: 9,107,863; ansic: 2,040,022; asm: 1,135,751; python: 296,500; objc: 82,456; f90: 60,502; lisp: 34,951; pascal: 19,946; sh: 18,133; perl: 7,482; ml: 4,937; javascript: 4,117; makefile: 3,840; awk: 3,535; xml: 914; fortran: 619; cs: 573; ruby: 573
file content (812 lines) | stat: -rw-r--r-- 35,939 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//


/// State of the current decoding process.
public enum DecodingState {
    /// Continue decoding.
    case `continue`

    /// Stop decoding until more data is ready to be processed.
    case needMoreData
}

/// Common errors thrown by `ByteToMessageDecoder`s.
public enum ByteToMessageDecoderError: Error {
    /// More data has been received by a `ByteToMessageHandler` despite the fact that an error has previously been
    /// emitted. The associated `Error` is the error previously emitted and the `ByteBuffer` is the extra data that has
    /// been received. The common cause for this error to be emitted is the user not having torn down the `Channel`
    /// after previously an `Error` has been sent through the pipeline using `fireErrorCaught`.
    case dataReceivedInErrorState(Error, ByteBuffer)

    /// This error can be thrown by `ByteToMessageDecoder`s if there was unexpectedly some left-over data when the
    /// `ByteToMessageDecoder` was removed from the pipeline or the `Channel` was closed.
    case leftoverDataWhenDone(ByteBuffer)
}

extension ByteToMessageDecoderError {
    // TODO: For NIO 3, make this an enum case (or whatever best way for Errors we have come up with).
    /// This error can be thrown by `ByteToMessageDecoder`s if the incoming payload is larger than the max specified.
    public struct PayloadTooLargeError: Error {}
}


/// `ByteToMessageDecoder`s decode bytes in a stream-like fashion from `ByteBuffer` to another message type.
///
/// ### Purpose
///
/// A `ByteToMessageDecoder` provides a simplified API for handling streams of incoming data that can be broken
/// up into messages. This API boils down to two methods: `decode`, and `decodeLast`. These two methods, when
/// implemented, will be used by a `ByteToMessageHandler` paired with a `ByteToMessageDecoder` to decode the
/// incoming byte stream into a sequence of messages.
///
/// The reason this helper exists is to smooth away some of the boilerplate and edge case handling code that
/// is often necessary when implementing parsers in a SwiftNIO `ChannelPipeline`. A `ByteToMessageDecoder`
/// never needs to worry about how inbound bytes will be buffered, as `ByteToMessageHandler` deals with that
/// automatically. A `ByteToMessageDecoder` also never needs to worry about memory exclusivity violations
/// that can occur when re-entrant `ChannelPipeline` operations occur, as `ByteToMessageHandler` will deal with
/// those as well.
///
/// ### Implementing ByteToMessageDecoder
///
/// A type that implements `ByteToMessageDecoder` may implement two methods: decode and decodeLast. Implementations
/// must implement decode: if they do not implement decodeLast, a default implementation will be used that 
/// simply calls decode.
///
/// `decode` is the main decoding method, and is the one that will be called most often. `decode` is invoked
/// whenever data is received by the wrapping `ByteToMessageHandler`. It is invoked with a `ByteBuffer` containing
/// all the received data (including any data previously buffered), as well as a `ChannelHandlerContext` that can be
/// used in the `decode` function.
///
/// `decode` is called in a loop by the `ByteToMessageHandler`. This loop continues until one of two cases occurs:
///
/// 1. The input `ByteBuffer` has no more readable bytes (i.e. `.readableBytes == 0`); OR
/// 2. The `decode` method returns `.needMoreData`.
///
/// The reason this method is invoked in a loop is to ensure that the stream-like properties of inbound data are
/// respected. It is entirely possible for `ByteToMessageDecoder` to receive either fewer bytes than a single message,
/// or multiple messages in one go. Rather than have the `ByteToMessageDecoder` handle all of the complexity of this,
/// the logic can be boiled down to a single choice: has the `ByteToMessageDecoder` been able to move the state forward
/// or not? If it has, rather than containing an internal loop it may simply return `.continue` in order to request that
/// `decode` be invoked again immediately. If it has not, it can return `.needMoreData` to ask to be left alone until more
/// data has been returned from the network.
///
/// Essentially, if the next parsing step could not be taken because there wasn't enough data available, return `.needMoreData`.
/// Otherwise, return `.continue`. This will allow a `ByteToMessageDecoder` implementation to ignore the awkward way data
/// arrives from the network, and to just treat it as a series of `decode` calls.
///
/// `decodeLast` is a cousin of `decode`. It is also called in a loop, but unlike with `decode` this loop will only ever
/// occur once: when the `ChannelHandlerContext` belonging to this `ByteToMessageDecoder` is about to become invalidated.
/// This invalidation happens in two situations: when EOF is received from the network, or when the `ByteToMessageDecoder`
/// is being removed from the `ChannelPipeline`. The distinction between these two states is captured by the value of
/// `seenEOF`.
///
/// In this condition, the `ByteToMessageDecoder` must now produce any final messages it can with the bytes it has
/// available. In protocols where EOF is used as a message delimiter, having `decodeLast` called with `seenEOF == true`
/// may produce further messages. In other cases, `decodeLast` may choose to deliver any buffered bytes as "leftovers",
/// either in error messages or via `channelRead`. This can occur if, for example, a protocol upgrade is occurring.
///
/// As with `decode`, `decodeLast` is invoked in a loop. This allows the same simplification as `decode` allows: when
/// a message is completely parsed, the `decodeLast` function can return `.continue` and be re-invoked from the top,
/// rather than containing an internal loop.
///
/// Note that the value of `seenEOF` may change between calls to `decodeLast` in some rare situations.
///
/// ### Implementers Notes
///
/// /// `ByteToMessageHandler` will turn your `ByteToMessageDecoder` into a `ChannelInboundHandler`. `ByteToMessageHandler`
/// also solves a couple of tricky issues for you. Most importantly, in a `ByteToMessageDecoder` you do _not_ need to
/// worry about re-entrancy. Your code owns the passed-in `ByteBuffer` for the duration of the `decode`/`decodeLast` call and
/// can modify it at will.
///
/// If a custom frame decoder is required, then one needs to be careful when implementing
/// one with `ByteToMessageDecoder`. Ensure there are enough bytes in the buffer for a
/// complete frame by checking `buffer.readableBytes`. If there are not enough bytes
/// for a complete frame, return without modifying the reader index to allow more bytes to arrive.
///
/// To check for complete frames without modifying the reader index, use methods like `buffer.getInteger`.
/// You  _MUST_ use the reader index when using methods like `buffer.getInteger`.
/// For example calling `buffer.getInteger(at: 0)` is assuming the frame starts at the beginning of the buffer, which
/// is not always the case. Use `buffer.getInteger(at: buffer.readerIndex)` instead.
///
/// If you move the reader index forward, either manually or by using one of `buffer.read*` methods, you must ensure
/// that you no longer need to see those bytes again as they will not be returned to you the next time `decode` is
/// called. If you still need those bytes to come back, consider taking a local copy of buffer inside the function to
/// perform your read operations on.
///
/// The `ByteBuffer` passed in as `buffer` is a slice of a larger buffer owned by the `ByteToMessageDecoder`
/// implementation. Some aspects of this buffer are preserved across calls to `decode`, meaning that any changes to
/// those properties you make in your `decode` method will be reflected in the next call to decode. In particular,
/// moving the reader index forward persists across calls. When your method returns, if the reader index has advanced,
/// those bytes are considered "consumed" and will not be available in future calls to `decode`.
/// Please note, however, that the numerical value of the `readerIndex` itself is not preserved, and may not be the same
/// from one call to the next. Please do not rely on this numerical value: if you need
/// to recall where a byte is relative to the `readerIndex`, use an offset rather than an absolute value.
///
/// ### Using ByteToMessageDecoder
///
/// To add a `ByteToMessageDecoder` to the `ChannelPipeline` use
///
///     channel.pipeline.addHandler(ByteToMessageHandler(MyByteToMessageDecoder()))
///
public protocol ByteToMessageDecoder {
    /// The type of the messages this `ByteToMessageDecoder` decodes to.
    associatedtype InboundOut

    /// Decode from a `ByteBuffer`.
    ///
    /// This method will be called in a loop until either the input `ByteBuffer` has nothing to read left or
    /// `DecodingState.needMoreData` is returned. If `DecodingState.continue` is returned and the `ByteBuffer`
    /// contains more readable bytes, this method will immediately be invoked again, unless `decodeLast` needs
    /// to be invoked instead.
    ///
    /// - parameters:
    ///     - context: The `ChannelHandlerContext` which this `ByteToMessageDecoder` belongs to.
    ///     - buffer: The `ByteBuffer` from which we decode.
    /// - returns: `DecodingState.continue` if we should continue calling this method or `DecodingState.needMoreData` if it should be called
    ///            again once more data is present in the `ByteBuffer`.
    mutating func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState

    /// Decode from a `ByteBuffer` when no more data is incoming and the `ByteToMessageDecoder` is about to leave
    /// the pipeline.
    ///
    /// This method is called in a loop only once, when the `ChannelHandlerContext` goes inactive (i.e. when `channelInactive` is fired or
    /// the `ByteToMessageDecoder` is removed from the pipeline).
    ///
    /// Like with `decode`, this method will be called in a loop until either `DecodingState.needMoreData` is returned from the method
    /// or until the input `ByteBuffer` has no more readable bytes. If `DecodingState.continue` is returned and the `ByteBuffer`
    /// contains more readable bytes, this method will immediately be invoked again.
    ///
    /// - parameters:
    ///     - context: The `ChannelHandlerContext` which this `ByteToMessageDecoder` belongs to.
    ///     - buffer: The `ByteBuffer` from which we decode.
    ///     - seenEOF: `true` if EOF has been seen. Usually if this is `false` the handler has been removed.
    /// - returns: `DecodingState.continue` if we should continue calling this method or `DecodingState.needMoreData` if it should be called
    ///            again when more data is present in the `ByteBuffer`.
    mutating func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws  -> DecodingState

    /// Called once this `ByteToMessageDecoder` is removed from the `ChannelPipeline`.
    ///
    /// - parameters:
    ///     - context: The `ChannelHandlerContext` which this `ByteToMessageDecoder` belongs to.
    mutating func decoderRemoved(context: ChannelHandlerContext)

    /// Called when this `ByteToMessageDecoder` is added to the `ChannelPipeline`.
    ///
    /// - parameters:
    ///     - context: The `ChannelHandlerContext` which this `ByteToMessageDecoder` belongs to.
    mutating func decoderAdded(context: ChannelHandlerContext)

    /// Determine if the read bytes in the given `ByteBuffer` should be reclaimed and their associated memory freed.
    /// Be aware that reclaiming memory may involve memory copies and so is not free.
    ///
    /// - parameters:
    ///     - buffer: The `ByteBuffer` to check
    /// - return: `true` if memory should be reclaimed, `false` otherwise.
    mutating func shouldReclaimBytes(buffer: ByteBuffer) -> Bool
}

/// Some `ByteToMessageDecoder`s need to observe `write`s (which are outbound events). `ByteToMessageDecoder`s which
/// implement the `WriteObservingByteToMessageDecoder` protocol will be notified about every outbound write.
///
/// `WriteObservingByteToMessageDecoder` may only observe a `write` and must not try to transform or block it in any
/// way. After the `write` method returns the `write` will be forwarded to the next outbound handler.
public protocol WriteObservingByteToMessageDecoder: ByteToMessageDecoder {
    /// The type of `write`s.
    associatedtype OutboundIn

    /// `write` is called for every incoming `write` incoming to the corresponding `ByteToMessageHandler`.
    ///
    /// - parameters:
    ///    - data: The data that was written.
    mutating func write(data: OutboundIn)
}

extension ByteToMessageDecoder {
    public mutating func decoderRemoved(context: ChannelHandlerContext) {
    }

    public mutating func decoderAdded(context: ChannelHandlerContext) {
    }

    /// Default implementation to detect once bytes should be reclaimed.
    public func shouldReclaimBytes(buffer: ByteBuffer) -> Bool {
        // We want to reclaim in the following cases:
        //
        // 1. If there is at least 2kB of memory to reclaim
        // 2. If the buffer is more than 50% reclaimable memory and is at least
        //    1kB in size.
        if buffer.readerIndex >= 2048 {
            return true
        }
        return buffer.storageCapacity > 1024 && (buffer.storageCapacity - buffer.readerIndex) < buffer.readerIndex
    }

    public func wrapInboundOut(_ value: InboundOut) -> NIOAny {
        return NIOAny(value)
    }
    
    public mutating func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws  -> DecodingState {
        while try self.decode(context: context, buffer: &buffer) == .continue {}
        return .needMoreData
    }
}

private struct B2MDBuffer {
    /// `B2MDBuffer`'s internal state, either we're already processing a buffer or we're ready to.
    private enum State {
        case processingInProgress
        case ready
    }

    /// Can we produce a buffer to be processed right now or not?
    enum BufferAvailability {
        /// No, because no bytes available
        case nothingAvailable
        /// No, because we're already processing one
        case bufferAlreadyBeingProcessed
        /// Yes please, here we go.
        case available(ByteBuffer)
    }

    /// Result of a try to process a buffer.
    enum BufferProcessingResult {
        /// Could not process a buffer because we are already processing one on the same call stack.
        case cannotProcessReentrantly
        /// Yes, we did process some.
        case didProcess(DecodingState)
    }

    private var state: State = .ready
    private var buffers: CircularBuffer<ByteBuffer> = CircularBuffer(initialCapacity: 4)
    private let emptyByteBuffer: ByteBuffer

    init(emptyByteBuffer: ByteBuffer) {
        assert(emptyByteBuffer.readableBytes == 0)
        self.emptyByteBuffer = emptyByteBuffer
    }
}

// MARK: B2MDBuffer Main API
extension B2MDBuffer {
    /// Start processing some bytes if possible, if we receive a returned buffer (through `.available(ByteBuffer)`)
    /// we _must_ indicate the processing has finished by calling `finishProcessing`.
    mutating func startProcessing(allowEmptyBuffer: Bool) -> BufferAvailability {
        switch self.state {
        case .processingInProgress:
            return .bufferAlreadyBeingProcessed
        case .ready where self.buffers.count > 0:
            var buffer = self.buffers.removeFirst()
            buffer.writeBuffers(self.buffers)
            self.buffers.removeAll(keepingCapacity: self.buffers.capacity < 16) // don't grow too much
            if buffer.readableBytes > 0 || allowEmptyBuffer {
                self.state = .processingInProgress
                return .available(buffer)
            } else {
                return .nothingAvailable
            }
        case .ready:
            assert(self.buffers.isEmpty)
            if allowEmptyBuffer {
                self.state = .processingInProgress
                return .available(self.emptyByteBuffer)
            }
            return .nothingAvailable
        }
    }

    mutating func finishProcessing(remainder buffer: inout ByteBuffer) -> Void {
        assert(self.state == .processingInProgress)
        self.state = .ready
        if buffer.readableBytes == 0 && self.buffers.isEmpty {
            // fast path, no bytes left and no other buffers, just return
            return
        }
        if buffer.readableBytes > 0 {
            self.buffers.prepend(buffer)
        } else {
            buffer.discardReadBytes()
            buffer.writeBuffers(self.buffers)
            self.buffers.removeAll(keepingCapacity: self.buffers.capacity < 16) // don't grow too much
            self.buffers.append(buffer)
        }
    }

    mutating func append(buffer: ByteBuffer) {
        if buffer.readableBytes > 0 {
            self.buffers.append(buffer)
        }
    }
}

// MARK: B2MDBuffer Helpers
private extension ByteBuffer {
    mutating func writeBuffers(_ buffers: CircularBuffer<ByteBuffer>) {
        guard buffers.count > 0 else {
            return
        }
        var requiredCapacity: Int = self.writerIndex
        for buffer in buffers {
            requiredCapacity += buffer.readableBytes
        }
        self.reserveCapacity(requiredCapacity)
        for var buffer in buffers {
            self.writeBuffer(&buffer)
        }
    }
}

private extension B2MDBuffer {
    func _testOnlyOneBuffer() -> ByteBuffer? {
        switch self.buffers.count {
        case 0:
            return nil
        case 1:
            return self.buffers.first
        default:
            let firstIndex = self.buffers.startIndex
            var firstBuffer = self.buffers[firstIndex]
            for var buffer in self.buffers[self.buffers.index(after: firstIndex)...] {
                firstBuffer.writeBuffer(&buffer)
            }
            return firstBuffer
        }
    }
}

/// A handler which turns a given `ByteToMessageDecoder` into a `ChannelInboundHandler` that can then be added to a
/// `ChannelPipeline`.
///
/// Most importantly, `ByteToMessageHandler` handles the tricky buffer management for you and flattens out all
/// re-entrancy on `channelRead` that may happen in the `ChannelPipeline`.
public final class ByteToMessageHandler<Decoder: ByteToMessageDecoder> {
    public typealias InboundIn = ByteBuffer
    public typealias InboundOut = Decoder.InboundOut

    private enum DecodeMode {
        /// This is a usual decode, ie. not the last chunk
        case normal

        /// Last chunk
        case last
    }

    private enum RemovalState {
        /// Not added to any `ChannelPipeline` yet.
        case notAddedToPipeline

        /// No one tried to remove this handler.
        case notBeingRemoved

        /// The user-triggered removal has been started but isn't complete yet. This state will not be entered if the
        /// removal is triggered by Channel teardown.
        case removalStarted

        /// The user-triggered removal is complete. This state will not be entered if the removal is triggered by
        /// Channel teardown.
        case removalCompleted

        /// This handler has been removed from the pipeline.
        case handlerRemovedCalled
    }

    private enum State {
        case active
        case leftoversNeedProcessing
        case done
        case error(Error)

        var isError: Bool {
            switch self {
            case .active, .leftoversNeedProcessing, .done:
                return false
            case .error:
                return true
            }
        }

        var isFinalState: Bool {
            switch self {
            case .active, .leftoversNeedProcessing:
                return false
            case .done, .error:
                return true
            }
        }

        var isActive: Bool {
            switch self {
            case .done, .error, .leftoversNeedProcessing:
                return false
            case .active:
                return true
            }
        }

        var isLeftoversNeedProcessing: Bool {
            switch self {
            case .done, .error, .active:
                return false
            case .leftoversNeedProcessing:
                return true
            }
        }
    }

    internal private(set) var decoder: Decoder? // only `nil` if we're already decoding (ie. we're re-entered)
    private let maximumBufferSize: Int?
    private var queuedWrites = CircularBuffer<NIOAny>(initialCapacity: 1) // queues writes received whilst we're already decoding (re-entrant write)
    private var state: State = .active {
        willSet {
            assert(!self.state.isFinalState, "illegal state on state set: \(self.state)") // we can never leave final states
        }
    }
    private var removalState: RemovalState = .notAddedToPipeline
    // sadly to construct a B2MDBuffer we need an empty ByteBuffer which we can only get from the allocator, so IUO.
    private var buffer: B2MDBuffer!
    private var seenEOF: Bool = false
    private var selfAsCanDequeueWrites: CanDequeueWrites? = nil

    /// @see: ByteToMessageHandler.init(_:maximumBufferSize)
    public convenience init(_ decoder: Decoder) {
        self.init(decoder, maximumBufferSize: nil)
    }

    /// Initialize a `ByteToMessageHandler`.
    ///
    /// - parameters:
    ///     - decoder: The `ByteToMessageDecoder` to decode the bytes into message.
    ///     - maximumBufferSize: The maximum number of bytes to aggregate in-memory.
    public init(_ decoder: Decoder, maximumBufferSize: Int? = nil) {
        self.decoder = decoder
        self.maximumBufferSize = maximumBufferSize
    }

    deinit {
        if self.removalState != .notAddedToPipeline {
            // we have been added to the pipeline, if not, we don't need to check our state.
            assert(self.removalState == .handlerRemovedCalled,
                   "illegal state in deinit: removalState = \(self.removalState)")
            assert(self.state.isFinalState, "illegal state in deinit: state = \(self.state)")
        }
    }
}

// MARK: ByteToMessageHandler: Test Helpers
extension ByteToMessageHandler {
    internal var cumulationBuffer: ByteBuffer? {
        return self.buffer._testOnlyOneBuffer()
    }
}

private protocol CanDequeueWrites {
    func dequeueWrites()
}

extension ByteToMessageHandler: CanDequeueWrites where Decoder: WriteObservingByteToMessageDecoder {
    fileprivate func dequeueWrites() {
        while self.queuedWrites.count > 0 {
            // self.decoder can't be `nil`, this is only allowed to be called when we're not already on the stack
            self.decoder!.write(data: self.unwrapOutboundIn(self.queuedWrites.removeFirst()))
        }
    }
}


// MARK: ByteToMessageHandler's Main API
extension ByteToMessageHandler {
    @inline(__always) // allocations otherwise (reconsider with Swift 5.1)
    private func withNextBuffer(allowEmptyBuffer: Bool, _ body: (inout Decoder, inout ByteBuffer) throws -> DecodingState) rethrows -> B2MDBuffer.BufferProcessingResult {
        switch self.buffer.startProcessing(allowEmptyBuffer: allowEmptyBuffer) {
        case .bufferAlreadyBeingProcessed:
            return .cannotProcessReentrantly
        case .nothingAvailable:
            return .didProcess(.needMoreData)
        case .available(var buffer):
            var possiblyReclaimBytes = false
            var decoder: Decoder? = nil
            swap(&decoder, &self.decoder)
            assert(decoder != nil) // self.decoder only `nil` if we're being re-entered, but .available means we're not
            defer {
                swap(&decoder, &self.decoder)
                if buffer.readableBytes > 0 && possiblyReclaimBytes {
                    // we asserted above that the decoder we just swapped back in was non-nil so now `self.decoder` must
                    // be non-nil.
                    if self.decoder!.shouldReclaimBytes(buffer: buffer) {
                        buffer.discardReadBytes()
                    }
                }
                self.buffer.finishProcessing(remainder: &buffer)
            }
            let decodeResult = try body(&decoder!, &buffer)

            // If we .continue, there's no point in trying to reclaim bytes because we'll loop again. If we need more
            // data on the other hand, we should try to reclaim some of those bytes.
            possiblyReclaimBytes = decodeResult == .needMoreData
            return .didProcess(decodeResult)
        }
    }

    private func processLeftovers(context: ChannelHandlerContext) {
        guard self.state.isActive else {
            // we are processing or have already processed the leftovers
            return
        }

        do {
            switch try self.decodeLoop(context: context, decodeMode: .last) {
            case .didProcess:
                self.state = .done
            case .cannotProcessReentrantly:
                self.state = .leftoversNeedProcessing
            }
        } catch {
            self.state = .error(error)
            context.fireErrorCaught(error)
        }
    }

    private func tryDecodeWrites() {
        if self.queuedWrites.count > 0 {
            // this must succeed because unless we implement `CanDequeueWrites`, `queuedWrites` must always be empty.
            self.selfAsCanDequeueWrites!.dequeueWrites()
        }
    }

    private func decodeLoop(context: ChannelHandlerContext, decodeMode: DecodeMode) throws -> B2MDBuffer.BufferProcessingResult {
        assert(!self.state.isError)
        var allowEmptyBuffer = decodeMode == .last
        while (self.state.isActive && self.removalState == .notBeingRemoved) || decodeMode == .last {
            let result = try self.withNextBuffer(allowEmptyBuffer: allowEmptyBuffer) { decoder, buffer in
                let decoderResult: DecodingState
                if decodeMode == .normal {
                    assert(self.state.isActive, "illegal state for normal decode: \(self.state)")
                    decoderResult = try decoder.decode(context: context, buffer: &buffer)
                } else {
                    allowEmptyBuffer = false
                    decoderResult = try decoder.decodeLast(context: context, buffer: &buffer, seenEOF: self.seenEOF)
                }
                if decoderResult == .needMoreData, let maximumBufferSize = self.maximumBufferSize, buffer.readableBytes > maximumBufferSize {
                    throw ByteToMessageDecoderError.PayloadTooLargeError()
                }
                return decoderResult
            }
            switch result {
            case .didProcess(.continue):
                self.tryDecodeWrites()
                continue
            case .didProcess(.needMoreData):
                if self.queuedWrites.count > 0 {
                    self.tryDecodeWrites()
                    continue // we might have received more, so let's spin once more
                } else {
                    return .didProcess(.needMoreData)
                }
            case .cannotProcessReentrantly:
                return .cannotProcessReentrantly
            }
        }
        return .didProcess(.continue)
    }
}


// MARK: ByteToMessageHandler: ChannelInboundHandler
extension ByteToMessageHandler: ChannelInboundHandler {

    public func handlerAdded(context: ChannelHandlerContext) {
        guard self.removalState == .notAddedToPipeline else {
            preconditionFailure("\(self) got readded to a ChannelPipeline but ByteToMessageHandler is single-use")
        }
        self.removalState = .notBeingRemoved
        self.buffer = B2MDBuffer(emptyByteBuffer: context.channel.allocator.buffer(capacity: 0))
        // here we can force it because we know that the decoder isn't in use if we're just adding this handler
        self.selfAsCanDequeueWrites = self as? CanDequeueWrites // we need to cache this as it allocates.
        self.decoder!.decoderAdded(context: context)
    }


    public func handlerRemoved(context: ChannelHandlerContext) {
        // very likely, the removal state is `.notBeingRemoved` or `.removalCompleted` here but we can't assert it
        // because the pipeline might be torn down during the formal removal process.
        self.removalState = .handlerRemovedCalled
        if !self.state.isFinalState {
            self.state = .done
        }

        self.selfAsCanDequeueWrites = nil

        // here we can force it because we know that the decoder isn't in use because the removal is always
        // eventLoop.execute'd
        self.decoder!.decoderRemoved(context: context)
    }

    /// Calls `decode` until there is nothing left to decode.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let buffer = self.unwrapInboundIn(data)
        if case .error(let error) = self.state {
            context.fireErrorCaught(ByteToMessageDecoderError.dataReceivedInErrorState(error, buffer))
            return
        }
        self.buffer.append(buffer: buffer)
        do {
            switch try self.decodeLoop(context: context, decodeMode: .normal) {
            case .didProcess:
                switch self.state {
                case .active:
                    () // cool, all normal
                case .done, .error:
                    () // fair, all done already
                case .leftoversNeedProcessing:
                    // seems like we received a `channelInactive` or `handlerRemoved` whilst we were processing a read
                    switch try self.decodeLoop(context: context, decodeMode: .last) {
                    case .didProcess:
                        () // expected and cool
                    case .cannotProcessReentrantly:
                        preconditionFailure("bug in NIO: non-reentrant decode loop couldn't run \(self), \(self.state)")
                    }
                    self.state = .done
                }
            case .cannotProcessReentrantly:
                // fine, will be done later
                ()
            }
        } catch {
            self.state = .error(error)
            context.fireErrorCaught(error)
        }
    }

    /// Call `decodeLast` before forward the event through the pipeline.
    public func channelInactive(context: ChannelHandlerContext) {
        self.seenEOF = true

        self.processLeftovers(context: context)

        context.fireChannelInactive()
    }

    public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
        if event as? ChannelEvent == .some(.inputClosed) {
            self.seenEOF = true

            self.processLeftovers(context: context)
        }
        context.fireUserInboundEventTriggered(event)
    }
}

extension ByteToMessageHandler: ChannelOutboundHandler, _ChannelOutboundHandler where Decoder: WriteObservingByteToMessageDecoder {
    public typealias OutboundIn = Decoder.OutboundIn
    public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
        if self.decoder != nil {
            let data = self.unwrapOutboundIn(data)
            assert(self.queuedWrites.isEmpty)
            self.decoder!.write(data: data)
        } else {
            self.queuedWrites.append(data)
        }
        context.write(data, promise: promise)
    }
}

/// A protocol for straightforward encoders which encode custom messages to `ByteBuffer`s.
/// To add a `MessageToByteEncoder` to a `ChannelPipeline`, use
/// `channel.pipeline.addHandler(MessageToByteHandler(myEncoder)`.
public protocol MessageToByteEncoder {
    associatedtype OutboundIn

    /// Called once there is data to encode.
    ///
    /// - parameters:
    ///     - data: The data to encode into a `ByteBuffer`.
    ///     - out: The `ByteBuffer` into which we want to encode.
    func encode(data: OutboundIn, out: inout ByteBuffer) throws
}

extension ByteToMessageHandler: RemovableChannelHandler {
    public func removeHandler(context: ChannelHandlerContext, removalToken: ChannelHandlerContext.RemovalToken) {
        precondition(self.removalState == .notBeingRemoved)
        self.removalState = .removalStarted
        context.eventLoop.execute {
            self.processLeftovers(context: context)
            assert(!self.state.isLeftoversNeedProcessing, "illegal state: \(self.state)")
            switch self.removalState {
            case .removalStarted:
                self.removalState = .removalCompleted
            case .handlerRemovedCalled:
                // if we're here, then the channel has also been torn down between the start and the completion of
                // the user-triggered removal. That's okay.
                ()
            default:
                assertionFailure("illegal removal state: \(self.removalState)")
            }
            // this is necessary as it'll complete the promise.
            context.leavePipeline(removalToken: removalToken)
        }
    }
}

/// A handler which turns a given `MessageToByteEncoder` into a `ChannelOutboundHandler` that can then be added to a
/// `ChannelPipeline`.
public final class MessageToByteHandler<Encoder: MessageToByteEncoder>: ChannelOutboundHandler {
    public typealias OutboundOut = ByteBuffer
    public typealias OutboundIn = Encoder.OutboundIn

    private enum State {
        case notInChannelYet
        case operational
        case error(Error)
        case done

        var readyToBeAddedToChannel: Bool {
            switch self {
            case .notInChannelYet:
                return true
            case .operational, .error, .done:
                return false
            }
        }
    }

    private var state: State = .notInChannelYet
    private let encoder: Encoder
    private var buffer: ByteBuffer? = nil

    public init(_ encoder: Encoder) {
        self.encoder = encoder
    }
}

extension MessageToByteHandler {
    public func handlerAdded(context: ChannelHandlerContext) {
        precondition(self.state.readyToBeAddedToChannel,
                     "illegal state when adding to Channel: \(self.state)")
        self.state = .operational
        self.buffer = context.channel.allocator.buffer(capacity: 256)
    }

    public func handlerRemoved(context: ChannelHandlerContext) {
        self.state = .done
        self.buffer = nil
    }

    public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
        switch self.state {
        case .notInChannelYet:
            preconditionFailure("MessageToByteHandler.write called before it was added to a Channel")
        case .error(let error):
            promise?.fail(error)
            context.fireErrorCaught(error)
            return
        case .done:
            // let's just ignore this
            return
        case .operational:
            // there's actually some work to do here
            break
        }
        let data = self.unwrapOutboundIn(data)

        do {
            self.buffer!.clear()
            try self.encoder.encode(data: data, out: &self.buffer!)
            context.write(self.wrapOutboundOut(self.buffer!), promise: promise)
        } catch {
            self.state = .error(error)
            promise?.fail(error)
            context.fireErrorCaught(error)
        }
    }
}