1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812
|
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
/// State of the current decoding process.
public enum DecodingState {
/// Continue decoding.
case `continue`
/// Stop decoding until more data is ready to be processed.
case needMoreData
}
/// Common errors thrown by `ByteToMessageDecoder`s.
public enum ByteToMessageDecoderError: Error {
/// More data has been received by a `ByteToMessageHandler` despite the fact that an error has previously been
/// emitted. The associated `Error` is the error previously emitted and the `ByteBuffer` is the extra data that has
/// been received. The common cause for this error to be emitted is the user not having torn down the `Channel`
/// after previously an `Error` has been sent through the pipeline using `fireErrorCaught`.
case dataReceivedInErrorState(Error, ByteBuffer)
/// This error can be thrown by `ByteToMessageDecoder`s if there was unexpectedly some left-over data when the
/// `ByteToMessageDecoder` was removed from the pipeline or the `Channel` was closed.
case leftoverDataWhenDone(ByteBuffer)
}
extension ByteToMessageDecoderError {
// TODO: For NIO 3, make this an enum case (or whatever best way for Errors we have come up with).
/// This error can be thrown by `ByteToMessageDecoder`s if the incoming payload is larger than the max specified.
public struct PayloadTooLargeError: Error {}
}
/// `ByteToMessageDecoder`s decode bytes in a stream-like fashion from `ByteBuffer` to another message type.
///
/// ### Purpose
///
/// A `ByteToMessageDecoder` provides a simplified API for handling streams of incoming data that can be broken
/// up into messages. This API boils down to two methods: `decode`, and `decodeLast`. These two methods, when
/// implemented, will be used by a `ByteToMessageHandler` paired with a `ByteToMessageDecoder` to decode the
/// incoming byte stream into a sequence of messages.
///
/// The reason this helper exists is to smooth away some of the boilerplate and edge case handling code that
/// is often necessary when implementing parsers in a SwiftNIO `ChannelPipeline`. A `ByteToMessageDecoder`
/// never needs to worry about how inbound bytes will be buffered, as `ByteToMessageHandler` deals with that
/// automatically. A `ByteToMessageDecoder` also never needs to worry about memory exclusivity violations
/// that can occur when re-entrant `ChannelPipeline` operations occur, as `ByteToMessageHandler` will deal with
/// those as well.
///
/// ### Implementing ByteToMessageDecoder
///
/// A type that implements `ByteToMessageDecoder` may implement two methods: decode and decodeLast. Implementations
/// must implement decode: if they do not implement decodeLast, a default implementation will be used that
/// simply calls decode.
///
/// `decode` is the main decoding method, and is the one that will be called most often. `decode` is invoked
/// whenever data is received by the wrapping `ByteToMessageHandler`. It is invoked with a `ByteBuffer` containing
/// all the received data (including any data previously buffered), as well as a `ChannelHandlerContext` that can be
/// used in the `decode` function.
///
/// `decode` is called in a loop by the `ByteToMessageHandler`. This loop continues until one of two cases occurs:
///
/// 1. The input `ByteBuffer` has no more readable bytes (i.e. `.readableBytes == 0`); OR
/// 2. The `decode` method returns `.needMoreData`.
///
/// The reason this method is invoked in a loop is to ensure that the stream-like properties of inbound data are
/// respected. It is entirely possible for `ByteToMessageDecoder` to receive either fewer bytes than a single message,
/// or multiple messages in one go. Rather than have the `ByteToMessageDecoder` handle all of the complexity of this,
/// the logic can be boiled down to a single choice: has the `ByteToMessageDecoder` been able to move the state forward
/// or not? If it has, rather than containing an internal loop it may simply return `.continue` in order to request that
/// `decode` be invoked again immediately. If it has not, it can return `.needMoreData` to ask to be left alone until more
/// data has been returned from the network.
///
/// Essentially, if the next parsing step could not be taken because there wasn't enough data available, return `.needMoreData`.
/// Otherwise, return `.continue`. This will allow a `ByteToMessageDecoder` implementation to ignore the awkward way data
/// arrives from the network, and to just treat it as a series of `decode` calls.
///
/// `decodeLast` is a cousin of `decode`. It is also called in a loop, but unlike with `decode` this loop will only ever
/// occur once: when the `ChannelHandlerContext` belonging to this `ByteToMessageDecoder` is about to become invalidated.
/// This invalidation happens in two situations: when EOF is received from the network, or when the `ByteToMessageDecoder`
/// is being removed from the `ChannelPipeline`. The distinction between these two states is captured by the value of
/// `seenEOF`.
///
/// In this condition, the `ByteToMessageDecoder` must now produce any final messages it can with the bytes it has
/// available. In protocols where EOF is used as a message delimiter, having `decodeLast` called with `seenEOF == true`
/// may produce further messages. In other cases, `decodeLast` may choose to deliver any buffered bytes as "leftovers",
/// either in error messages or via `channelRead`. This can occur if, for example, a protocol upgrade is occurring.
///
/// As with `decode`, `decodeLast` is invoked in a loop. This allows the same simplification as `decode` allows: when
/// a message is completely parsed, the `decodeLast` function can return `.continue` and be re-invoked from the top,
/// rather than containing an internal loop.
///
/// Note that the value of `seenEOF` may change between calls to `decodeLast` in some rare situations.
///
/// ### Implementers Notes
///
/// /// `ByteToMessageHandler` will turn your `ByteToMessageDecoder` into a `ChannelInboundHandler`. `ByteToMessageHandler`
/// also solves a couple of tricky issues for you. Most importantly, in a `ByteToMessageDecoder` you do _not_ need to
/// worry about re-entrancy. Your code owns the passed-in `ByteBuffer` for the duration of the `decode`/`decodeLast` call and
/// can modify it at will.
///
/// If a custom frame decoder is required, then one needs to be careful when implementing
/// one with `ByteToMessageDecoder`. Ensure there are enough bytes in the buffer for a
/// complete frame by checking `buffer.readableBytes`. If there are not enough bytes
/// for a complete frame, return without modifying the reader index to allow more bytes to arrive.
///
/// To check for complete frames without modifying the reader index, use methods like `buffer.getInteger`.
/// You _MUST_ use the reader index when using methods like `buffer.getInteger`.
/// For example calling `buffer.getInteger(at: 0)` is assuming the frame starts at the beginning of the buffer, which
/// is not always the case. Use `buffer.getInteger(at: buffer.readerIndex)` instead.
///
/// If you move the reader index forward, either manually or by using one of `buffer.read*` methods, you must ensure
/// that you no longer need to see those bytes again as they will not be returned to you the next time `decode` is
/// called. If you still need those bytes to come back, consider taking a local copy of buffer inside the function to
/// perform your read operations on.
///
/// The `ByteBuffer` passed in as `buffer` is a slice of a larger buffer owned by the `ByteToMessageDecoder`
/// implementation. Some aspects of this buffer are preserved across calls to `decode`, meaning that any changes to
/// those properties you make in your `decode` method will be reflected in the next call to decode. In particular,
/// moving the reader index forward persists across calls. When your method returns, if the reader index has advanced,
/// those bytes are considered "consumed" and will not be available in future calls to `decode`.
/// Please note, however, that the numerical value of the `readerIndex` itself is not preserved, and may not be the same
/// from one call to the next. Please do not rely on this numerical value: if you need
/// to recall where a byte is relative to the `readerIndex`, use an offset rather than an absolute value.
///
/// ### Using ByteToMessageDecoder
///
/// To add a `ByteToMessageDecoder` to the `ChannelPipeline` use
///
/// channel.pipeline.addHandler(ByteToMessageHandler(MyByteToMessageDecoder()))
///
public protocol ByteToMessageDecoder {
/// The type of the messages this `ByteToMessageDecoder` decodes to.
associatedtype InboundOut
/// Decode from a `ByteBuffer`.
///
/// This method will be called in a loop until either the input `ByteBuffer` has nothing to read left or
/// `DecodingState.needMoreData` is returned. If `DecodingState.continue` is returned and the `ByteBuffer`
/// contains more readable bytes, this method will immediately be invoked again, unless `decodeLast` needs
/// to be invoked instead.
///
/// - parameters:
/// - context: The `ChannelHandlerContext` which this `ByteToMessageDecoder` belongs to.
/// - buffer: The `ByteBuffer` from which we decode.
/// - returns: `DecodingState.continue` if we should continue calling this method or `DecodingState.needMoreData` if it should be called
/// again once more data is present in the `ByteBuffer`.
mutating func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState
/// Decode from a `ByteBuffer` when no more data is incoming and the `ByteToMessageDecoder` is about to leave
/// the pipeline.
///
/// This method is called in a loop only once, when the `ChannelHandlerContext` goes inactive (i.e. when `channelInactive` is fired or
/// the `ByteToMessageDecoder` is removed from the pipeline).
///
/// Like with `decode`, this method will be called in a loop until either `DecodingState.needMoreData` is returned from the method
/// or until the input `ByteBuffer` has no more readable bytes. If `DecodingState.continue` is returned and the `ByteBuffer`
/// contains more readable bytes, this method will immediately be invoked again.
///
/// - parameters:
/// - context: The `ChannelHandlerContext` which this `ByteToMessageDecoder` belongs to.
/// - buffer: The `ByteBuffer` from which we decode.
/// - seenEOF: `true` if EOF has been seen. Usually if this is `false` the handler has been removed.
/// - returns: `DecodingState.continue` if we should continue calling this method or `DecodingState.needMoreData` if it should be called
/// again when more data is present in the `ByteBuffer`.
mutating func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState
/// Called once this `ByteToMessageDecoder` is removed from the `ChannelPipeline`.
///
/// - parameters:
/// - context: The `ChannelHandlerContext` which this `ByteToMessageDecoder` belongs to.
mutating func decoderRemoved(context: ChannelHandlerContext)
/// Called when this `ByteToMessageDecoder` is added to the `ChannelPipeline`.
///
/// - parameters:
/// - context: The `ChannelHandlerContext` which this `ByteToMessageDecoder` belongs to.
mutating func decoderAdded(context: ChannelHandlerContext)
/// Determine if the read bytes in the given `ByteBuffer` should be reclaimed and their associated memory freed.
/// Be aware that reclaiming memory may involve memory copies and so is not free.
///
/// - parameters:
/// - buffer: The `ByteBuffer` to check
/// - return: `true` if memory should be reclaimed, `false` otherwise.
mutating func shouldReclaimBytes(buffer: ByteBuffer) -> Bool
}
/// Some `ByteToMessageDecoder`s need to observe `write`s (which are outbound events). `ByteToMessageDecoder`s which
/// implement the `WriteObservingByteToMessageDecoder` protocol will be notified about every outbound write.
///
/// `WriteObservingByteToMessageDecoder` may only observe a `write` and must not try to transform or block it in any
/// way. After the `write` method returns the `write` will be forwarded to the next outbound handler.
public protocol WriteObservingByteToMessageDecoder: ByteToMessageDecoder {
/// The type of `write`s.
associatedtype OutboundIn
/// `write` is called for every incoming `write` incoming to the corresponding `ByteToMessageHandler`.
///
/// - parameters:
/// - data: The data that was written.
mutating func write(data: OutboundIn)
}
extension ByteToMessageDecoder {
public mutating func decoderRemoved(context: ChannelHandlerContext) {
}
public mutating func decoderAdded(context: ChannelHandlerContext) {
}
/// Default implementation to detect once bytes should be reclaimed.
public func shouldReclaimBytes(buffer: ByteBuffer) -> Bool {
// We want to reclaim in the following cases:
//
// 1. If there is at least 2kB of memory to reclaim
// 2. If the buffer is more than 50% reclaimable memory and is at least
// 1kB in size.
if buffer.readerIndex >= 2048 {
return true
}
return buffer.storageCapacity > 1024 && (buffer.storageCapacity - buffer.readerIndex) < buffer.readerIndex
}
public func wrapInboundOut(_ value: InboundOut) -> NIOAny {
return NIOAny(value)
}
public mutating func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState {
while try self.decode(context: context, buffer: &buffer) == .continue {}
return .needMoreData
}
}
private struct B2MDBuffer {
/// `B2MDBuffer`'s internal state, either we're already processing a buffer or we're ready to.
private enum State {
case processingInProgress
case ready
}
/// Can we produce a buffer to be processed right now or not?
enum BufferAvailability {
/// No, because no bytes available
case nothingAvailable
/// No, because we're already processing one
case bufferAlreadyBeingProcessed
/// Yes please, here we go.
case available(ByteBuffer)
}
/// Result of a try to process a buffer.
enum BufferProcessingResult {
/// Could not process a buffer because we are already processing one on the same call stack.
case cannotProcessReentrantly
/// Yes, we did process some.
case didProcess(DecodingState)
}
private var state: State = .ready
private var buffers: CircularBuffer<ByteBuffer> = CircularBuffer(initialCapacity: 4)
private let emptyByteBuffer: ByteBuffer
init(emptyByteBuffer: ByteBuffer) {
assert(emptyByteBuffer.readableBytes == 0)
self.emptyByteBuffer = emptyByteBuffer
}
}
// MARK: B2MDBuffer Main API
extension B2MDBuffer {
/// Start processing some bytes if possible, if we receive a returned buffer (through `.available(ByteBuffer)`)
/// we _must_ indicate the processing has finished by calling `finishProcessing`.
mutating func startProcessing(allowEmptyBuffer: Bool) -> BufferAvailability {
switch self.state {
case .processingInProgress:
return .bufferAlreadyBeingProcessed
case .ready where self.buffers.count > 0:
var buffer = self.buffers.removeFirst()
buffer.writeBuffers(self.buffers)
self.buffers.removeAll(keepingCapacity: self.buffers.capacity < 16) // don't grow too much
if buffer.readableBytes > 0 || allowEmptyBuffer {
self.state = .processingInProgress
return .available(buffer)
} else {
return .nothingAvailable
}
case .ready:
assert(self.buffers.isEmpty)
if allowEmptyBuffer {
self.state = .processingInProgress
return .available(self.emptyByteBuffer)
}
return .nothingAvailable
}
}
mutating func finishProcessing(remainder buffer: inout ByteBuffer) -> Void {
assert(self.state == .processingInProgress)
self.state = .ready
if buffer.readableBytes == 0 && self.buffers.isEmpty {
// fast path, no bytes left and no other buffers, just return
return
}
if buffer.readableBytes > 0 {
self.buffers.prepend(buffer)
} else {
buffer.discardReadBytes()
buffer.writeBuffers(self.buffers)
self.buffers.removeAll(keepingCapacity: self.buffers.capacity < 16) // don't grow too much
self.buffers.append(buffer)
}
}
mutating func append(buffer: ByteBuffer) {
if buffer.readableBytes > 0 {
self.buffers.append(buffer)
}
}
}
// MARK: B2MDBuffer Helpers
private extension ByteBuffer {
mutating func writeBuffers(_ buffers: CircularBuffer<ByteBuffer>) {
guard buffers.count > 0 else {
return
}
var requiredCapacity: Int = self.writerIndex
for buffer in buffers {
requiredCapacity += buffer.readableBytes
}
self.reserveCapacity(requiredCapacity)
for var buffer in buffers {
self.writeBuffer(&buffer)
}
}
}
private extension B2MDBuffer {
func _testOnlyOneBuffer() -> ByteBuffer? {
switch self.buffers.count {
case 0:
return nil
case 1:
return self.buffers.first
default:
let firstIndex = self.buffers.startIndex
var firstBuffer = self.buffers[firstIndex]
for var buffer in self.buffers[self.buffers.index(after: firstIndex)...] {
firstBuffer.writeBuffer(&buffer)
}
return firstBuffer
}
}
}
/// A handler which turns a given `ByteToMessageDecoder` into a `ChannelInboundHandler` that can then be added to a
/// `ChannelPipeline`.
///
/// Most importantly, `ByteToMessageHandler` handles the tricky buffer management for you and flattens out all
/// re-entrancy on `channelRead` that may happen in the `ChannelPipeline`.
public final class ByteToMessageHandler<Decoder: ByteToMessageDecoder> {
public typealias InboundIn = ByteBuffer
public typealias InboundOut = Decoder.InboundOut
private enum DecodeMode {
/// This is a usual decode, ie. not the last chunk
case normal
/// Last chunk
case last
}
private enum RemovalState {
/// Not added to any `ChannelPipeline` yet.
case notAddedToPipeline
/// No one tried to remove this handler.
case notBeingRemoved
/// The user-triggered removal has been started but isn't complete yet. This state will not be entered if the
/// removal is triggered by Channel teardown.
case removalStarted
/// The user-triggered removal is complete. This state will not be entered if the removal is triggered by
/// Channel teardown.
case removalCompleted
/// This handler has been removed from the pipeline.
case handlerRemovedCalled
}
private enum State {
case active
case leftoversNeedProcessing
case done
case error(Error)
var isError: Bool {
switch self {
case .active, .leftoversNeedProcessing, .done:
return false
case .error:
return true
}
}
var isFinalState: Bool {
switch self {
case .active, .leftoversNeedProcessing:
return false
case .done, .error:
return true
}
}
var isActive: Bool {
switch self {
case .done, .error, .leftoversNeedProcessing:
return false
case .active:
return true
}
}
var isLeftoversNeedProcessing: Bool {
switch self {
case .done, .error, .active:
return false
case .leftoversNeedProcessing:
return true
}
}
}
internal private(set) var decoder: Decoder? // only `nil` if we're already decoding (ie. we're re-entered)
private let maximumBufferSize: Int?
private var queuedWrites = CircularBuffer<NIOAny>(initialCapacity: 1) // queues writes received whilst we're already decoding (re-entrant write)
private var state: State = .active {
willSet {
assert(!self.state.isFinalState, "illegal state on state set: \(self.state)") // we can never leave final states
}
}
private var removalState: RemovalState = .notAddedToPipeline
// sadly to construct a B2MDBuffer we need an empty ByteBuffer which we can only get from the allocator, so IUO.
private var buffer: B2MDBuffer!
private var seenEOF: Bool = false
private var selfAsCanDequeueWrites: CanDequeueWrites? = nil
/// @see: ByteToMessageHandler.init(_:maximumBufferSize)
public convenience init(_ decoder: Decoder) {
self.init(decoder, maximumBufferSize: nil)
}
/// Initialize a `ByteToMessageHandler`.
///
/// - parameters:
/// - decoder: The `ByteToMessageDecoder` to decode the bytes into message.
/// - maximumBufferSize: The maximum number of bytes to aggregate in-memory.
public init(_ decoder: Decoder, maximumBufferSize: Int? = nil) {
self.decoder = decoder
self.maximumBufferSize = maximumBufferSize
}
deinit {
if self.removalState != .notAddedToPipeline {
// we have been added to the pipeline, if not, we don't need to check our state.
assert(self.removalState == .handlerRemovedCalled,
"illegal state in deinit: removalState = \(self.removalState)")
assert(self.state.isFinalState, "illegal state in deinit: state = \(self.state)")
}
}
}
// MARK: ByteToMessageHandler: Test Helpers
extension ByteToMessageHandler {
internal var cumulationBuffer: ByteBuffer? {
return self.buffer._testOnlyOneBuffer()
}
}
private protocol CanDequeueWrites {
func dequeueWrites()
}
extension ByteToMessageHandler: CanDequeueWrites where Decoder: WriteObservingByteToMessageDecoder {
fileprivate func dequeueWrites() {
while self.queuedWrites.count > 0 {
// self.decoder can't be `nil`, this is only allowed to be called when we're not already on the stack
self.decoder!.write(data: self.unwrapOutboundIn(self.queuedWrites.removeFirst()))
}
}
}
// MARK: ByteToMessageHandler's Main API
extension ByteToMessageHandler {
@inline(__always) // allocations otherwise (reconsider with Swift 5.1)
private func withNextBuffer(allowEmptyBuffer: Bool, _ body: (inout Decoder, inout ByteBuffer) throws -> DecodingState) rethrows -> B2MDBuffer.BufferProcessingResult {
switch self.buffer.startProcessing(allowEmptyBuffer: allowEmptyBuffer) {
case .bufferAlreadyBeingProcessed:
return .cannotProcessReentrantly
case .nothingAvailable:
return .didProcess(.needMoreData)
case .available(var buffer):
var possiblyReclaimBytes = false
var decoder: Decoder? = nil
swap(&decoder, &self.decoder)
assert(decoder != nil) // self.decoder only `nil` if we're being re-entered, but .available means we're not
defer {
swap(&decoder, &self.decoder)
if buffer.readableBytes > 0 && possiblyReclaimBytes {
// we asserted above that the decoder we just swapped back in was non-nil so now `self.decoder` must
// be non-nil.
if self.decoder!.shouldReclaimBytes(buffer: buffer) {
buffer.discardReadBytes()
}
}
self.buffer.finishProcessing(remainder: &buffer)
}
let decodeResult = try body(&decoder!, &buffer)
// If we .continue, there's no point in trying to reclaim bytes because we'll loop again. If we need more
// data on the other hand, we should try to reclaim some of those bytes.
possiblyReclaimBytes = decodeResult == .needMoreData
return .didProcess(decodeResult)
}
}
private func processLeftovers(context: ChannelHandlerContext) {
guard self.state.isActive else {
// we are processing or have already processed the leftovers
return
}
do {
switch try self.decodeLoop(context: context, decodeMode: .last) {
case .didProcess:
self.state = .done
case .cannotProcessReentrantly:
self.state = .leftoversNeedProcessing
}
} catch {
self.state = .error(error)
context.fireErrorCaught(error)
}
}
private func tryDecodeWrites() {
if self.queuedWrites.count > 0 {
// this must succeed because unless we implement `CanDequeueWrites`, `queuedWrites` must always be empty.
self.selfAsCanDequeueWrites!.dequeueWrites()
}
}
private func decodeLoop(context: ChannelHandlerContext, decodeMode: DecodeMode) throws -> B2MDBuffer.BufferProcessingResult {
assert(!self.state.isError)
var allowEmptyBuffer = decodeMode == .last
while (self.state.isActive && self.removalState == .notBeingRemoved) || decodeMode == .last {
let result = try self.withNextBuffer(allowEmptyBuffer: allowEmptyBuffer) { decoder, buffer in
let decoderResult: DecodingState
if decodeMode == .normal {
assert(self.state.isActive, "illegal state for normal decode: \(self.state)")
decoderResult = try decoder.decode(context: context, buffer: &buffer)
} else {
allowEmptyBuffer = false
decoderResult = try decoder.decodeLast(context: context, buffer: &buffer, seenEOF: self.seenEOF)
}
if decoderResult == .needMoreData, let maximumBufferSize = self.maximumBufferSize, buffer.readableBytes > maximumBufferSize {
throw ByteToMessageDecoderError.PayloadTooLargeError()
}
return decoderResult
}
switch result {
case .didProcess(.continue):
self.tryDecodeWrites()
continue
case .didProcess(.needMoreData):
if self.queuedWrites.count > 0 {
self.tryDecodeWrites()
continue // we might have received more, so let's spin once more
} else {
return .didProcess(.needMoreData)
}
case .cannotProcessReentrantly:
return .cannotProcessReentrantly
}
}
return .didProcess(.continue)
}
}
// MARK: ByteToMessageHandler: ChannelInboundHandler
extension ByteToMessageHandler: ChannelInboundHandler {
public func handlerAdded(context: ChannelHandlerContext) {
guard self.removalState == .notAddedToPipeline else {
preconditionFailure("\(self) got readded to a ChannelPipeline but ByteToMessageHandler is single-use")
}
self.removalState = .notBeingRemoved
self.buffer = B2MDBuffer(emptyByteBuffer: context.channel.allocator.buffer(capacity: 0))
// here we can force it because we know that the decoder isn't in use if we're just adding this handler
self.selfAsCanDequeueWrites = self as? CanDequeueWrites // we need to cache this as it allocates.
self.decoder!.decoderAdded(context: context)
}
public func handlerRemoved(context: ChannelHandlerContext) {
// very likely, the removal state is `.notBeingRemoved` or `.removalCompleted` here but we can't assert it
// because the pipeline might be torn down during the formal removal process.
self.removalState = .handlerRemovedCalled
if !self.state.isFinalState {
self.state = .done
}
self.selfAsCanDequeueWrites = nil
// here we can force it because we know that the decoder isn't in use because the removal is always
// eventLoop.execute'd
self.decoder!.decoderRemoved(context: context)
}
/// Calls `decode` until there is nothing left to decode.
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let buffer = self.unwrapInboundIn(data)
if case .error(let error) = self.state {
context.fireErrorCaught(ByteToMessageDecoderError.dataReceivedInErrorState(error, buffer))
return
}
self.buffer.append(buffer: buffer)
do {
switch try self.decodeLoop(context: context, decodeMode: .normal) {
case .didProcess:
switch self.state {
case .active:
() // cool, all normal
case .done, .error:
() // fair, all done already
case .leftoversNeedProcessing:
// seems like we received a `channelInactive` or `handlerRemoved` whilst we were processing a read
switch try self.decodeLoop(context: context, decodeMode: .last) {
case .didProcess:
() // expected and cool
case .cannotProcessReentrantly:
preconditionFailure("bug in NIO: non-reentrant decode loop couldn't run \(self), \(self.state)")
}
self.state = .done
}
case .cannotProcessReentrantly:
// fine, will be done later
()
}
} catch {
self.state = .error(error)
context.fireErrorCaught(error)
}
}
/// Call `decodeLast` before forward the event through the pipeline.
public func channelInactive(context: ChannelHandlerContext) {
self.seenEOF = true
self.processLeftovers(context: context)
context.fireChannelInactive()
}
public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
if event as? ChannelEvent == .some(.inputClosed) {
self.seenEOF = true
self.processLeftovers(context: context)
}
context.fireUserInboundEventTriggered(event)
}
}
extension ByteToMessageHandler: ChannelOutboundHandler, _ChannelOutboundHandler where Decoder: WriteObservingByteToMessageDecoder {
public typealias OutboundIn = Decoder.OutboundIn
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
if self.decoder != nil {
let data = self.unwrapOutboundIn(data)
assert(self.queuedWrites.isEmpty)
self.decoder!.write(data: data)
} else {
self.queuedWrites.append(data)
}
context.write(data, promise: promise)
}
}
/// A protocol for straightforward encoders which encode custom messages to `ByteBuffer`s.
/// To add a `MessageToByteEncoder` to a `ChannelPipeline`, use
/// `channel.pipeline.addHandler(MessageToByteHandler(myEncoder)`.
public protocol MessageToByteEncoder {
associatedtype OutboundIn
/// Called once there is data to encode.
///
/// - parameters:
/// - data: The data to encode into a `ByteBuffer`.
/// - out: The `ByteBuffer` into which we want to encode.
func encode(data: OutboundIn, out: inout ByteBuffer) throws
}
extension ByteToMessageHandler: RemovableChannelHandler {
public func removeHandler(context: ChannelHandlerContext, removalToken: ChannelHandlerContext.RemovalToken) {
precondition(self.removalState == .notBeingRemoved)
self.removalState = .removalStarted
context.eventLoop.execute {
self.processLeftovers(context: context)
assert(!self.state.isLeftoversNeedProcessing, "illegal state: \(self.state)")
switch self.removalState {
case .removalStarted:
self.removalState = .removalCompleted
case .handlerRemovedCalled:
// if we're here, then the channel has also been torn down between the start and the completion of
// the user-triggered removal. That's okay.
()
default:
assertionFailure("illegal removal state: \(self.removalState)")
}
// this is necessary as it'll complete the promise.
context.leavePipeline(removalToken: removalToken)
}
}
}
/// A handler which turns a given `MessageToByteEncoder` into a `ChannelOutboundHandler` that can then be added to a
/// `ChannelPipeline`.
public final class MessageToByteHandler<Encoder: MessageToByteEncoder>: ChannelOutboundHandler {
public typealias OutboundOut = ByteBuffer
public typealias OutboundIn = Encoder.OutboundIn
private enum State {
case notInChannelYet
case operational
case error(Error)
case done
var readyToBeAddedToChannel: Bool {
switch self {
case .notInChannelYet:
return true
case .operational, .error, .done:
return false
}
}
}
private var state: State = .notInChannelYet
private let encoder: Encoder
private var buffer: ByteBuffer? = nil
public init(_ encoder: Encoder) {
self.encoder = encoder
}
}
extension MessageToByteHandler {
public func handlerAdded(context: ChannelHandlerContext) {
precondition(self.state.readyToBeAddedToChannel,
"illegal state when adding to Channel: \(self.state)")
self.state = .operational
self.buffer = context.channel.allocator.buffer(capacity: 256)
}
public func handlerRemoved(context: ChannelHandlerContext) {
self.state = .done
self.buffer = nil
}
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
switch self.state {
case .notInChannelYet:
preconditionFailure("MessageToByteHandler.write called before it was added to a Channel")
case .error(let error):
promise?.fail(error)
context.fireErrorCaught(error)
return
case .done:
// let's just ignore this
return
case .operational:
// there's actually some work to do here
break
}
let data = self.unwrapOutboundIn(data)
do {
self.buffer!.clear()
try self.encoder.encode(data: data, out: &self.buffer!)
context.write(self.wrapOutboundOut(self.buffer!), promise: promise)
} catch {
self.state = .error(error)
promise?.fail(error)
context.fireErrorCaught(error)
}
}
}
|