File: StreamChannelsTest.swift

package info (click to toggle)
swiftlang 6.0.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,519,992 kB
  • sloc: cpp: 9,107,863; ansic: 2,040,022; asm: 1,135,751; python: 296,500; objc: 82,456; f90: 60,502; lisp: 34,951; pascal: 19,946; sh: 18,133; perl: 7,482; ml: 4,937; javascript: 4,117; makefile: 3,840; awk: 3,535; xml: 914; fortran: 619; cs: 573; ruby: 573
file content (924 lines) | stat: -rw-r--r-- 44,478 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2019 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import XCTest
@testable import NIO
import NIOTestUtils
import NIOConcurrencyHelpers

class StreamChannelTest: XCTestCase {
    /// Scratch buffer used by some of the tests; assigned in `setUp()` and reset to `nil` in `tearDown()`.
    var buffer: ByteBuffer! = nil

    /// Provides a fresh scratch buffer with an initial capacity of 128 bytes.
    override func setUp() {
        let allocator = ByteBufferAllocator()
        self.buffer = allocator.buffer(capacity: 128)
    }

    /// Drops the scratch buffer so that no state leaks out of the finished test.
    override func tearDown() {
        buffer = nil
    }

    /// Sends payloads of increasing size from `chan2` to an echoing `chan1` and checks that the bytes collected
    /// on `chan2` are exactly the bytes that were sent.
    func testEchoBasic() throws {
        /// Writes every inbound message straight back out and flushes once a batch of reads is complete.
        final class EchoHandler: ChannelInboundHandler {
            typealias InboundIn = ByteBuffer

            func channelRead(context: ChannelHandlerContext, data: NIOAny) {
                context.write(data, promise: nil)
            }

            func channelReadComplete(context: ChannelHandlerContext) {
                context.flush()
            }
        }

        func runTest(chan1: Channel, chan2: Channel) throws {
            let echoedBytesPromise = chan1.eventLoop.makePromise(of: ByteBuffer.self)
            // Holds everything we send, in order; this is what we expect to get back.
            var expected = chan1.allocator.buffer(capacity: 300000)

            XCTAssertNoThrow(try chan1.pipeline.addHandler(EchoHandler()).wait())
            XCTAssertNoThrow(try chan2.pipeline.addHandler(AccumulateAllReads(allDonePromise: echoedBytesPromise)).wait())

            let payloadSizes = [1, 10, 100, 1_000, 10_000, 300_000]
            for payloadSize in payloadSizes {
                let sliceStart = expected.writerIndex
                expected.writeString("\(payloadSize)")
                expected.writeBytes(repeatElement(UInt8(ascii: "x"), count: payloadSize))

                // Only the part appended in this iteration goes over the wire.
                let chunk = expected.getSlice(at: sliceStart, length: expected.writerIndex - sliceStart)!
                XCTAssertNoThrow(chan2.writeAndFlush(chunk))
            }

            // magic end marker that will cause the channel to close
            let markerStart = expected.writerIndex
            expected.writeString("$")
            let marker = expected.getSlice(at: markerStart, length: 1)!
            XCTAssertNoThrow(chan2.writeAndFlush(marker))

            XCTAssertNoThrow(XCTAssertEqual(expected, try echoedBytesPromise.futureResult.wait()))
        }
        XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
    }

    /// Checks that `autoRead` can be read and changed through a channel's synchronous options view.
    func testSyncChannelOptions() throws {
        /// When added to a pipeline: expects `autoRead` to be on, switches it off, and expects it to be off.
        final class GetAndSetAutoReadHandler: ChannelInboundHandler {
            typealias InboundIn = Any

            func handlerAdded(context: ChannelHandlerContext) {
                if let options = context.channel.syncOptions {
                    XCTAssertTrue(try options.getOption(ChannelOptions.autoRead))
                    XCTAssertNoThrow(try options.setOption(ChannelOptions.autoRead, value: false))
                    XCTAssertFalse(try options.getOption(ChannelOptions.autoRead))
                } else {
                    XCTFail("Sync options are not supported on \(type(of: context.channel))")
                }
            }
        }

        func runTest(chan1: Channel, chan2: Channel) throws {
            // Both ends of the pair get their own handler instance, first chan1 then chan2.
            for channel in [chan1, chan2] {
                XCTAssertNoThrow(try channel.pipeline.addHandler(GetAndSetAutoReadHandler()).wait())
            }
        }

        XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
    }

    /// A `Channel` type that does not declare `syncOptions` itself is expected to report `nil` for it.
    func testChannelReturnsNilForDefaultSyncOptionsImplementation() throws {
        /// Bare-bones `Channel` conformance. Everything that is not plain stored state traps when touched;
        /// the test only ever asks it for `syncOptions`.
        final class TestChannel: Channel {
            // Plain stored state.
            var isActive: Bool = false
            var isWritable: Bool = false
            var parent: Channel?
            var localAddress: SocketAddress?
            var remoteAddress: SocketAddress?

            // Requirements that must never be reached by this test.
            var eventLoop: EventLoop { fatalError() }
            var _channelCore: ChannelCore { fatalError() }
            var pipeline: ChannelPipeline { fatalError() }
            var closeFuture: EventLoopFuture<Void> { fatalError() }
            var allocator: ByteBufferAllocator { fatalError() }

            func getOption<Option: ChannelOption>(_ option: Option) -> EventLoopFuture<Option.Value> {
                fatalError()
            }

            func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> EventLoopFuture<Void> {
                fatalError()
            }
        }

        XCTAssertNil(TestChannel().syncOptions)
    }

    /// Checks the writability life cycle of a channel whose peer is initially not reading: the channel starts out
    /// writable, is reported unwritable while a large amount of data is queued, and is reported writable again
    /// after the peer starts reading.
    func testWritabilityStartsTrueGoesFalseAndBackToTrue() throws {
        /// Queues a large amount of data in `handlerAdded` and validates every `channelWritabilityChanged`
        /// notification against the expected `State` sequence.
        class WritabilityTrackerStateMachine: ChannelInboundHandler {
            typealias InboundIn = Never
            typealias OutboundOut = ByteBuffer

            /// The expected writability phases, in order. `channelWritabilityChanged` relies on the raw values:
            /// even raw values are the phases in which the channel must be writable, and the raw value must equal
            /// the number of `channelWritabilityChanged` calls seen so far.
            enum State: Int {
                case beginsTrue = 0
                case thenFalse = 1
                case thenTrueAgain = 2
            }

            var channelWritabilityChangedCalls = 0
            var state = State.beginsTrue
            /// Succeeded (after a short delay) once the first writability change has been observed.
            let writabilityNowFalsePromise: EventLoopPromise<Void>
            /// Receives the outcome of the last write issued in `handlerAdded`.
            let writeFullyDonePromise: EventLoopPromise<Void>

            init(writabilityNowFalsePromise: EventLoopPromise<Void>,
                 writeFullyDonePromise: EventLoopPromise<Void>) {
                self.writabilityNowFalsePromise = writabilityNowFalsePromise
                self.writeFullyDonePromise = writeFullyDonePromise
            }

            /// Queues `totalAmount / chunkSize` chunks plus one final chunk and then flushes them all at once.
            func handlerAdded(context: ChannelHandlerContext) {
                // 5 MB, this must be safely more than send buffer + receive buffer. The reason is that we don't want
                // the overall write to complete before we make the other end of the channel readable.
                let totalAmount = 5 * 1024 * 1024
                let chunkSize = 10 * 1024
                XCTAssertEqual(.beginsTrue, self.state)
                // From here on, the next writability notification is expected to report "not writable".
                self.state = .thenFalse
                XCTAssertEqual(true, context.channel.isWritable)

                var buffer = context.channel.allocator.buffer(capacity: chunkSize)
                buffer.writeBytes(repeatElement(UInt8(ascii: "x"), count: chunkSize))
                for _ in 0 ..< (totalAmount / chunkSize) {
                    context.write(self.wrapOutboundOut(buffer)).whenFailure { error in
                        XCTFail("unexpected error \(error)")
                    }
                }
                // The final write additionally checks that, by the time it completes, the state machine has gone
                // through all phases; its result is forwarded to `writeFullyDonePromise`.
                context.write(self.wrapOutboundOut(buffer)).map {
                    XCTAssertEqual(self.state, .thenTrueAgain)
                }.recover { error in
                    XCTFail("unexpected error \(error)")
                }.cascade(to: self.writeFullyDonePromise)
                context.flush()
            }

            /// Validates the notification against the current phase and advances to the next phase, if there is one.
            func channelWritabilityChanged(context: ChannelHandlerContext) {
                self.channelWritabilityChangedCalls += 1
                // Even raw values (`beginsTrue`, `thenTrueAgain`) mean "writable", odd ones mean "not writable".
                XCTAssertEqual(self.state.rawValue % 2 == 0, context.channel.isWritable)
                XCTAssertEqual(State(rawValue: self.channelWritabilityChangedCalls), self.state)
                // `State(rawValue:)` yields `nil` once we are past the last phase, in which case nothing changes.
                if let newState = State(rawValue: self.channelWritabilityChangedCalls + 1) {
                    if self.state == .thenFalse {
                        context.eventLoop.scheduleTask(in: .microseconds(100)) {
                            // Let's delay this a tiny little bit just so we get a higher chance to actually exhaust all
                            // the buffers. The delay is not necessary for this test to work but it makes the tests a
                            // little bit harder.
                            self.writabilityNowFalsePromise.succeed(())
                        }
                    }
                    self.state = newState
                }
            }
        }

        /// `chan1` is the writing side (it gets the state machine), `chan2` is the side whose reading is
        /// switched off at first and switched on later.
        func runTest(chan1: Channel, chan2: Channel) throws {
            let allDonePromise = chan1.eventLoop.makePromise(of: ByteBuffer.self)
            let writabilityFalsePromise = chan1.eventLoop.makePromise(of: Void.self)
            let writeFullyDonePromise = chan1.eventLoop.makePromise(of: Void.self)
            XCTAssertNoThrow(try chan2.setOption(ChannelOptions.autoRead, value: false).wait())
            XCTAssertNoThrow(try chan2.pipeline.addHandler(AccumulateAllReads(allDonePromise: allDonePromise)).wait())
            XCTAssertNoThrow(try chan1.pipeline.addHandler(WritabilityTrackerStateMachine(writabilityNowFalsePromise: writabilityFalsePromise,
                                                                                          writeFullyDonePromise: writeFullyDonePromise)).wait())

            // Writability should turn false because we're writing lots of data and we aren't reading.
            XCTAssertNoThrow(try writabilityFalsePromise.futureResult.wait())
            // Ok, let's read.
            XCTAssertNoThrow(try chan2.setOption(ChannelOptions.autoRead, value: true).wait())
            // Which should lead to the write to complete.
            XCTAssertNoThrow(try writeFullyDonePromise.futureResult.wait())
            // To finish up, let's just tear this down.
            XCTAssertNoThrow(try chan2.close().wait())
            XCTAssertNoThrow(try chan1.closeFuture.wait())
        }
        XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
    }

    /// Closes `chan2`'s output side and checks that `chan1` receives a user inbound event for it while a byte
    /// can still be sent from `chan1` to `chan2`.
    func testHalfCloseOwnOutput() throws {
        func runTest(chan1: Channel, chan2: Channel) throws {
            let readPromise = chan2.eventLoop.makePromise(of: Void.self)
            let eofPromise = chan1.eventLoop.makePromise(of: Void.self)

            XCTAssertNoThrow(try chan1.setOption(ChannelOptions.allowRemoteHalfClosure, value: true).wait())
            XCTAssertNoThrow(try chan1.pipeline.addHandler(FulfillOnFirstEventHandler(userInboundEventTriggeredPromise: eofPromise)).wait())

            // let's close chan2's output
            XCTAssertNoThrow(try chan2.close(mode: .output).wait())
            XCTAssertNoThrow(try eofPromise.futureResult.wait())

            // `self.buffer` lives for the whole test method, so it is shared by all invocations of `runTest`.
            // Reset it first: should `runTest` be invoked more than once, every invocation still sends exactly
            // one byte instead of one more byte than the invocation before.
            self.buffer.clear()
            self.buffer.writeString("X")
            XCTAssertNoThrow(try chan2.pipeline.addHandler(FulfillOnFirstEventHandler(channelReadPromise: readPromise)).wait())

            // let's write a byte from chan1 to chan2.
            XCTAssertNoThrow(try chan1.writeAndFlush(self.buffer).wait(), "write on \(chan1) failed")

            // and wait for it to arrive
            XCTAssertNoThrow(try readPromise.futureResult.wait())

            XCTAssertNoThrow(try chan1.syncCloseAcceptingAlreadyClosed())
            XCTAssertNoThrow(try chan2.syncCloseAcceptingAlreadyClosed())
        }
        XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
    }

    /// Closes `chan2`'s input side and checks that a byte can still be sent from `chan2` to `chan1`.
    func testHalfCloseOwnInput() {
        func runTest(chan1: Channel, chan2: Channel) throws {
            let readPromise = chan1.eventLoop.makePromise(of: Void.self)

            XCTAssertNoThrow(try chan2.setOption(ChannelOptions.allowRemoteHalfClosure, value: true).wait())
            // let's close chan2's input
            XCTAssertNoThrow(try chan2.close(mode: .input).wait())

            // `self.buffer` lives for the whole test method, so it is shared by all invocations of `runTest`.
            // Reset it first: should `runTest` be invoked more than once, every invocation still sends exactly
            // one byte instead of one more byte than the invocation before.
            self.buffer.clear()
            self.buffer.writeString("X")
            XCTAssertNoThrow(try chan1.pipeline.addHandler(FulfillOnFirstEventHandler(channelReadPromise: readPromise)).wait())

            // let's write a byte from chan2 to chan1.
            XCTAssertNoThrow(try chan2.writeAndFlush(self.buffer).wait())

            // and wait for it to arrive
            XCTAssertNoThrow(try readPromise.futureResult.wait())

            XCTAssertNoThrow(try chan1.syncCloseAcceptingAlreadyClosed())
            XCTAssertNoThrow(try chan2.syncCloseAcceptingAlreadyClosed())
        }
        XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
    }

    /// Closing the input side of a channel a second time is expected to fail with `ChannelError.inputClosed`.
    func testDoubleShutdownInput() {
        func runTest(chan1: Channel, chan2: Channel) throws {
            XCTAssertNoThrow(try chan1.setOption(ChannelOptions.allowRemoteHalfClosure, value: true).wait())

            // The first close of the input side has to go through ...
            XCTAssertNoThrow(try chan1.close(mode: .input).wait())

            // ... whereas the second one has to be rejected.
            XCTAssertThrowsError(try chan1.close(mode: .input).wait()) { secondCloseError in
                XCTAssertEqual(ChannelError.inputClosed, secondCloseError as? ChannelError, "\(chan1)")
            }
        }
        XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
    }

    /// Closing the output side of a channel a second time is expected to fail with `ChannelError.outputClosed`.
    func testDoubleShutdownOutput() {
        func runTest(chan1: Channel, chan2: Channel) throws {
            // The peer must tolerate the half-closure that chan1 is about to perform.
            XCTAssertNoThrow(try chan2.setOption(ChannelOptions.allowRemoteHalfClosure, value: true).wait())

            // The first close of the output side has to go through ...
            XCTAssertNoThrow(try chan1.close(mode: .output).wait())

            // ... whereas the second one has to be rejected.
            XCTAssertThrowsError(try chan1.close(mode: .output).wait()) { secondCloseError in
                XCTAssertEqual(ChannelError.outputClosed, secondCloseError as? ChannelError, "\(chan1)")
            }
        }
        XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
    }

    /// Once a channel's output side has been closed, a further write is expected to fail with
    /// `ChannelError.outputClosed`.
    func testWriteFailsAfterOutputClosed() {
        func runTest(chan1: Channel, chan2: Channel) throws {
            XCTAssertNoThrow(try chan2.setOption(ChannelOptions.allowRemoteHalfClosure, value: true).wait())
            XCTAssertNoThrow(try chan1.close(mode: .output).wait())

            var payload = chan1.allocator.buffer(capacity: 10)
            payload.writeString("helloworld")

            let lateWrite = chan1.writeAndFlush(payload)
            XCTAssertThrowsError(try lateWrite.wait()) { writeError in
                XCTAssertEqual(ChannelError.outputClosed, writeError as? ChannelError, "\(chan1)")
            }
        }
        XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
    }

    /// Queues 100 one-byte writes on `chan1`, flushes them in one go, and waits until `chan2` sees a read.
    func testVectorWrites() {
        func runTest(chan1: Channel, chan2: Channel) throws {
            let readPromise = chan2.eventLoop.makePromise(of: Void.self)
            // Wait until the handler is really in the pipeline. Without `try ... .wait()` the assertion would
            // check nothing and the handler installation would race with the writes below.
            XCTAssertNoThrow(try chan2.pipeline.addHandler(FulfillOnFirstEventHandler(channelReadPromise: readPromise)).wait())
            var buffer = chan1.allocator.buffer(capacity: 1)
            buffer.writeString("X")
            // Many small pending writes followed by a single flush.
            for _ in 0..<100 {
                chan1.write(buffer, promise: nil)
            }
            chan1.flush()
            XCTAssertNoThrow(try readPromise.futureResult.wait())
        }
        XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
    }

    /// Keeps writing to a channel whose peer has `autoRead` switched off and checks, a number of times, that the
    /// receiver's selector does not report any changes in the meantime.
    func testLotsOfWritesWhilstOtherSideNotReading() {
        // This is a regression test for a problem where we would spin on EVFILT_EXCEPT despite the fact that there
        // was no EOF or any other exceptional event present. So this is a regression test for rdar://53656794 and https://github.com/apple/swift-nio/pull/526.

        /// Fails the test on `channelRead`/`channelReadComplete` for as long as `areReadsOkayNow` holds `false`.
        class FailOnReadHandler: ChannelInboundHandler {
            typealias InboundIn = ByteBuffer

            /// Shared flag, flipped to `true` by the test body at the end of `runTest`.
            let areReadsOkayNow: NIOAtomic<Bool>

            init(areReadOkayNow: NIOAtomic<Bool>) {
                self.areReadsOkayNow = areReadOkayNow
            }

            func channelRead(context: ChannelHandlerContext, data: NIOAny) {
                guard self.areReadsOkayNow.load() else {
                    XCTFail("unexpected read of \(self.unwrapInboundIn(data))")
                    return
                }
            }

            func channelReadComplete(context: ChannelHandlerContext) {
                guard self.areReadsOkayNow.load() else {
                    XCTFail("unexpected readComplete")
                    return
                }
            }
        }

        /// `receiver` never reads (auto-read off); `sender` writes continuously from its own event loop.
        func runTest(receiver: Channel, sender: Channel) throws {
            // Number of writes that completed successfully so far; only used to pick the write size and for
            // the failure message below.
            let sends = NIOAtomic<Int>.makeAtomic(value: 0)
            precondition(receiver.eventLoop !== sender.eventLoop,
                         "this test cannot run if sender and receiver live on the same EventLoop. \(receiver)")
            XCTAssertNoThrow(try receiver.setOption(ChannelOptions.autoRead, value: false).wait())
            let areReadsOkayNow: NIOAtomic<Bool> = .makeAtomic(value: false)
            XCTAssertNoThrow(try receiver.pipeline.addHandler(FailOnReadHandler(areReadOkayNow: areReadsOkayNow)).wait())

            // We will immediately send exactly the amount of data that fits in the receiver's receive buffer.
            // If the option cannot be queried, 8192 is used as the size instead.
            let receiveBufferSize = Int((try? receiver.getOption(ChannelOptions.socketOption(.so_rcvbuf)).wait()) ?? 8192)
            var buffer = sender.allocator.buffer(capacity: receiveBufferSize)
            buffer.writeBytes(Array(repeating: UInt8(ascii: "X"), count: receiveBufferSize))

            XCTAssertNoThrow(try sender.eventLoop.submit {
                /// Issues one write and, once it succeeded, schedules the next one a microsecond later.
                func send() {
                    var allBuffer = buffer
                    // When we run through this for the first time, we send exactly the receive buffer size, after that
                    // we send one byte at a time. Sending the receive buffer will trigger the EVFILT_EXCEPT loop
                    // (rdar://53656794) for UNIX Domain Sockets and the additional 1 byte send loop will also pretty
                    // reliably trigger it for TCP sockets.
                    let myBuffer = allBuffer.readSlice(length: sends.load() == 0 ? receiveBufferSize : 1)!
                    sender.writeAndFlush(myBuffer).map {
                        _ = sends.add(1)
                        sender.eventLoop.scheduleTask(in: .microseconds(1)) {
                            send()
                        }
                    }.whenFailure { error in
                        // A failing write is only acceptable once reads have been declared okay, and then only
                        // with `ChannelError.ioOnClosedChannel`.
                        XCTAssert(areReadsOkayNow.load(), "error before the channel should go down")
                        guard case .some(.ioOnClosedChannel) = error as? ChannelError else {
                            XCTFail("unexpected error: \(error)")
                            return
                        }
                    }
                }
                send()
            }.wait())

            for _ in 0..<10 {
                // We just spin here for a little while to check that there are no bogus events available on the
                // selector.
                let eventLoop = (receiver.eventLoop as! SelectableEventLoop)
                XCTAssertNoThrow(try eventLoop._selector.testsOnly_withUnsafeSelectorFD { fd in
                        try assertNoSelectorChanges(fd: fd, selector:eventLoop._selector)
                    }, "after \(sends.load()) sends, we got an unexpected selector event for \(receiver)")
                // 10 ms pause between two checks.
                usleep(10000)
            }
            // We'll soon close the channels, so reads are now acceptable (from the EOF that we may read).
            XCTAssertTrue(areReadsOkayNow.compareAndExchange(expected: false, desired: true))
        }
        XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(forceSeparateEventLoops: true, runTest))
    }

    /// Issues a second `writeAndFlush` from the completion of a first one and checks that both bytes arrive.
    func testFlushInWritePromise() {
        /// Succeeds its promise as soon as the running total of received bytes is exactly two.
        final class WaitForTwoBytesHandler: ChannelInboundHandler {
            typealias InboundIn = ByteBuffer

            private var bytesSeen = 0
            private let allDonePromise: EventLoopPromise<Void>

            init(allDonePromise: EventLoopPromise<Void>) {
                self.allDonePromise = allDonePromise
            }

            func channelRead(context: ChannelHandlerContext, data: NIOAny) {
                // The two writes could be coalesced, so we add up the bytes and not always the number of read calls.
                let received = self.unwrapInboundIn(data)
                self.bytesSeen += received.readableBytes
                guard self.bytesSeen == 2 else {
                    return
                }
                self.allDonePromise.succeed(())
            }
        }

        func runTest(receiver: Channel, sender: Channel) throws {
            let bothBytesArrived = receiver.eventLoop.makePromise(of: Void.self)
            XCTAssertNoThrow(try sender.setOption(ChannelOptions.writeSpin, value: 0).wait())
            XCTAssertNoThrow(try receiver.pipeline.addHandler(WaitForTwoBytesHandler(allDonePromise: bothBytesArrived)).wait())

            var oneByte = sender.allocator.buffer(capacity: 1)
            oneByte.writeString("X")

            XCTAssertNoThrow(try sender.eventLoop.flatSubmit { () -> EventLoopFuture<Void> in
                let firstWritePromise = sender.eventLoop.makePromise(of: Void.self)
                // Chain the second write onto the first one *before* the first one is issued.
                let secondWriteDone = firstWritePromise.futureResult.flatMap {
                    sender.writeAndFlush(oneByte)
                }
                sender.writeAndFlush(oneByte, promise: firstWritePromise)
                return secondWriteDone
            }.wait())
            XCTAssertNoThrow(try bothBytesArrived.futureResult.wait())
        }
        XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
    }

    /// Issues a `writeAndFlush` from within the `channelWritabilityChanged` notification that reports the channel
    /// as writable again, and checks that all bytes (2 from the test body + 5 from the handler) are received.
    func testWriteAndFlushInChannelWritabilityChangedToTrue() {
        // regression test for rdar://58571521

        /// Expects exactly two writability notifications (first unwritable, then writable) and writes "hello"
        /// from within the second one.
        final class WriteWhenWritabilityGoesToTrue: ChannelInboundHandler {
            typealias InboundIn = ByteBuffer
            typealias OutboundOut = ByteBuffer

            private var numberOfCalls = 0

            func channelWritabilityChanged(context: ChannelHandlerContext) {
                self.numberOfCalls += 1

                switch self.numberOfCalls {
                case 1:
                    // This is us exceeding the high water mark
                    XCTAssertFalse(context.channel.isWritable)
                case 2:
                    // This is after the two bytes have been written.
                    XCTAssertTrue(context.channel.isWritable)

                    // Now, let's trigger another write which should cause flushNow to be re-entered. But first, let's
                    // raise the high water mark so we don't get another call straight away.
                    var buffer = context.channel.allocator.buffer(capacity: 5)
                    buffer.writeString("hello")
                    context.channel.setOption(ChannelOptions.writeBufferWaterMark, value: .init(low: 1024, high: 1024)).flatMap {
                        context.writeAndFlush(self.wrapOutboundOut(buffer))
                    }.whenFailure { error in
                        XCTFail("unexpected error: \(error)")
                    }
                default:
                    XCTFail("call \(self.numberOfCalls) to \(#function) unexpected")
                }
            }
        }

        /// Succeeds its promise once at least `numberOfBytes` bytes have been read; fails it with
        /// `ChannelError.ioOnClosedChannel` when the handler is removed.
        final class WaitForNumberOfBytes: ChannelInboundHandler {
            typealias InboundIn = ByteBuffer

            private let allDonePromise: EventLoopPromise<Void>
            private var numberOfBytesRead = 0
            private let expectedNumberOfBytes: Int

            init(numberOfBytes: Int, allDonePromise: EventLoopPromise<Void>) {
                self.expectedNumberOfBytes = numberOfBytes
                self.allDonePromise = allDonePromise
            }

            func channelRead(context: ChannelHandlerContext, data: NIOAny) {
                // The two writes could be coalesced, so we add up the bytes and not always the number of read calls.
                self.numberOfBytesRead += self.unwrapInboundIn(data).readableBytes
                if self.numberOfBytesRead >= self.expectedNumberOfBytes {
                    self.allDonePromise.succeed(())
                }
            }

            func handlerRemoved(context: ChannelHandlerContext) {
                self.allDonePromise.fail(ChannelError.ioOnClosedChannel)
            }
        }

        func runTest(receiver: Channel, sender: Channel) throws {
            // Write spin might just disturb this test so let's switch it off
            XCTAssertNoThrow(try sender.setOption(ChannelOptions.writeSpin, value: 0).wait())
            // Writing more than the high water mark will cause the channel to become unwritable very easily
            XCTAssertNoThrow(try sender.setOption(ChannelOptions.writeBufferWaterMark, value: .init(low: 1, high: 1)).wait())

            let sevenBytesReceived = receiver.eventLoop.makePromise(of: Void.self)
            XCTAssertNoThrow(try receiver.pipeline.addHandler(WaitForNumberOfBytes(numberOfBytes: 7,
                                                                                   allDonePromise: sevenBytesReceived)).wait())

            // The handler we assert on below must be the very instance that sits in the pipeline; adding a
            // different, fresh instance would turn the assertion on its counter into a no-op.
            let eventCounterHandler = EventCounterHandler()
            XCTAssertNoThrow(try sender.pipeline.addHandler(eventCounterHandler).wait())
            XCTAssertNoThrow(try sender.pipeline.addHandler(WriteWhenWritabilityGoesToTrue()).wait())

            var buffer = sender.allocator.buffer(capacity: 5)
            buffer.writeString("XX") // 2 bytes, exceeds the high water mark

            XCTAssertTrue(sender.isWritable)
            XCTAssertEqual(0, eventCounterHandler.channelWritabilityChangedCalls)
            XCTAssertNoThrow(try sender.writeAndFlush(buffer).wait())
            XCTAssertNoThrow(try sevenBytesReceived.futureResult.wait())
        }

        XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
    }

    /// A single one-byte `writeAndFlush` is not expected to produce any `channelWritabilityChanged` call.
    func testWritabilityChangedDoesNotGetCalledOnSimpleWrite() {
        func runTest(receiver: Channel, sender: Channel) throws {
            let writabilityObserver = EventCounterHandler()
            XCTAssertNoThrow(try sender.pipeline.addHandler(writabilityObserver).wait())

            var singleByte = sender.allocator.buffer(capacity: 1)
            singleByte.writeString("X")
            XCTAssertNoThrow(try sender.writeAndFlush(singleByte).wait())

            // The write has completed by now; the counter must still be untouched.
            XCTAssertEqual(0, writabilityObserver.channelWritabilityChangedCalls)
        }
        XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
    }

    func testWriteAndFlushFromReentrantFlushNowTriggeredOutOfWritabilityWhereOuterSaysAllWrittenAndInnerDoesNot() {
        // regression test for rdar://58571521, harder version

        /*
         What we're doing here is to enter exactly the following scenario which used to be an issue.

         1: writable()
         2: --> flushNow (result: .writtenCompletely)
         3:     --> writabilityChanged callout
         4:         --> flushNow because user calls writeAndFlush (result: .couldNotWriteEverything)
         5:         --> registerForWritable (because line 4 could not write everything and flushNow returned .register)
         6: --> unregisterForWritable (because line 2 wrote everything and flushNow returned .unregister)

         line 6 undoes the registration in line 5. The fix makes sure that flushNow never re-enters and therefore the
         problem described above cannot happen anymore.

         Our match plan is the following:
         - receiver: switch off autoRead
         - sender: send 1k chunks of "0"s until we get a writabilityChange to false, then write a "1" sentinel
         - sender: should now be registered for writes
         - receiver: allocate a buffer big enough for the "0....1" and read it out as soon as possible
         - sender: the kernel should now call us with the `writable()` notification
         - sender: the remaining "0...1" should now go out of the door together, which means that `flushNow` decides
                   to `.unregister`
         - sender: because we now `.unregister` and also fall below the low watermark, we will send a writabilityChange
                   notification from which we will send a large 100MB chunk which certainly requires a new `writable()`
                   registration (which was previously lost)
         - receiver: just read off all the bytes
         - test: wait until the 100MB write completes which means that we didn't lose that `writable()` registration and
                 everybody should be happy :)
         */

        /// Sender-side handler that keeps writing `chunkSize`-byte chunks of ASCII "0" until it observes a
        /// writability change, then writes a single "1" sentinel and succeeds `wroteEnoughToBeStuckPromise` with
        /// the number of bytes it has handed to the channel so far.
        ///
        /// Every `channelWritabilityChanged` call also registers a callback that removes this handler from the
        /// pipeline once `wroteEnoughToBeStuckPromise` has succeeded.
        final class WriteUntilWriteDoesNotCompletelyInstantlyHandler: ChannelInboundHandler, RemovableChannelHandler {
            typealias InboundIn = ByteBuffer
            typealias OutboundOut = ByteBuffer

            /// Progress of the write loop started by `kickOff(context:)`.
            enum State {
                /// No writability change has been seen yet; the loop keeps enqueueing chunks.
                case writingUntilFull
                /// A writability change has been seen; the next timer tick writes the "1" sentinel.
                case writeSentinel
                /// The sentinel has been written; later timer ticks and notifications are ignored.
                case done
            }

            /// Size in bytes of each "0" chunk; also used to derive the write buffer watermarks.
            let chunkSize: Int
            /// Succeeded with `bytesWritten` right after the sentinel write has been issued.
            let wroteEnoughToBeStuckPromise: EventLoopPromise<Int>
            var state = State.writingUntilFull
            /// Running total of bytes passed to `writeAndFlush` (chunks plus the one sentinel byte).
            var bytesWritten = 0

            init(chunkSize: Int, wroteEnoughToBeStuckPromise: EventLoopPromise<Int>) {
                self.chunkSize = chunkSize
                self.wroteEnoughToBeStuckPromise = wroteEnoughToBeStuckPromise
            }

            /// Configures the channel's write options and schedules `kickOff(context:)` on the event loop.
            func handlerAdded(context: ChannelHandlerContext) {
                // We set the high watermark such that if we can't write something immediately, we'll get a
                // writabilityChanged notification.
                context.channel.setOption(ChannelOptions.writeBufferWaterMark,
                                          value: .init(low: self.chunkSize,
                                                       high: self.chunkSize + 1)).whenFailure { error in
                    XCTFail("unexpected error \(error)")
                }

                // Write spin count would make the test less deterministic, so let's switch it off.
                context.channel.setOption(ChannelOptions.writeSpin, value: 0).whenFailure { error in
                    XCTFail("unexpected error \(error)")
                }

                context.eventLoop.execute {
                    self.kickOff(context: context)
                }
            }

            /// Asserts that the handler only leaves the pipeline after the sentinel has been written.
            func handlerRemoved(context: ChannelHandlerContext) {
                XCTAssertEqual(.done, self.state)
            }

            /// Starts the write loop: the local `writeOneMore` function writes the current buffer and then
            /// schedules a task 100 microseconds later that decides, based on `state`, whether to write another
            /// chunk, write the sentinel, or do nothing.
            func kickOff(context: ChannelHandlerContext) {
                // One chunk of "0" bytes, re-used for every chunk write and later re-filled with the sentinel.
                var buffer = context.channel.allocator.buffer(capacity: self.chunkSize)
                buffer.writeBytes(Array(repeating: UInt8(ascii: "0"), count: chunkSize))

                func writeOneMore() {
                    // Account for the chunk before handing it to the channel.
                    self.bytesWritten += buffer.readableBytes
                    context.writeAndFlush(self.wrapOutboundOut(buffer)).whenFailure { error in
                        XCTFail("unexpected error \(error)")
                    }
                    // NOTE(review): presumably scheduled rather than looped synchronously so that writability
                    // notifications can be delivered between two writes -- confirm.
                    context.eventLoop.scheduleTask(in: .microseconds(100)) {
                        switch self.state {
                        case .writingUntilFull:
                            // We're just enqueueing another chunk.
                            writeOneMore()
                        case .writeSentinel:
                            // We've seen the notification that the channel is unwritable, let's write one more byte.
                            buffer.clear()
                            buffer.writeString("1")
                            // The state is advanced before the write below is issued.
                            self.state = .done
                            self.bytesWritten += 1
                            context.writeAndFlush(self.wrapOutboundOut(buffer)).whenFailure { error in
                                XCTFail("unexpected error \(error)")
                            }
                            self.wroteEnoughToBeStuckPromise.succeed(self.bytesWritten)
                        case .done:
                            () // let's ignore this.
                        }
                    }
                }
                context.eventLoop.execute {
                    writeOneMore() // this kicks everything off
                }
            }

            /// Moves from `.writingUntilFull` to `.writeSentinel` on the first notification (which must report an
            /// unwritable channel), fails the test on a notification while in `.writeSentinel`, and always
            /// forwards the event to the next handler.
            func channelWritabilityChanged(context: ChannelHandlerContext) {
                switch self.state {
                case .writingUntilFull:
                    XCTAssert(!context.channel.isWritable)
                    self.state = .writeSentinel
                case .writeSentinel:
                    XCTFail("we shouldn't see another notification here writable=\(context.channel.isWritable)")
                case .done:
                    () // ignored, we're done
                }
                context.fireChannelWritabilityChanged()
                // Registered on every call: once the sentinel has gone out, take this handler out of the pipeline.
                self.wroteEnoughToBeStuckPromise.futureResult.whenSuccess { _ in
                    context.pipeline.removeHandler(self).whenFailure { error in
                        XCTFail("unexpected error \(error)")
                    }
                }
            }
        }

        /// Sender-side handler that waits for the channel to become unwritable and then writable again, and at
        /// that point issues one large write from inside the writability notification.
        final class WriteWhenChannelBecomesWritableAgain: ChannelInboundHandler {
            typealias InboundIn = ByteBuffer
            typealias OutboundOut = ByteBuffer

            /// Which writability notification the handler expects next.
            enum State {
                case waitingForNotWritable
                case waitingForWritableAgain
                case done
            }

            var state = State.waitingForNotWritable
            /// Succeeded as soon as the big write has been handed to the channel.
            let beganBigWritePromise: EventLoopPromise<Void>
            /// Passed to the big `writeAndFlush`, so it completes together with that write.
            let finishedBigWritePromise: EventLoopPromise<Void>

            init(beganBigWritePromise: EventLoopPromise<Void>, finishedBigWritePromise: EventLoopPromise<Void>) {
                self.beganBigWritePromise = beganBigWritePromise
                self.finishedBigWritePromise = finishedBigWritePromise
            }

            /// The handler must have issued its big write before it leaves the pipeline.
            func handlerRemoved(context: ChannelHandlerContext) {
                XCTAssertEqual(.done, self.state)
            }

            /// First notification: expect "not writable". Second notification: expect "writable" and start the
            /// big write. Anything after that is ignored.
            func channelWritabilityChanged(context: ChannelHandlerContext) {
                switch self.state {
                case .done:
                    return // nothing left to do
                case .waitingForNotWritable:
                    XCTAssert(!context.channel.isWritable)
                    self.state = .waitingForWritableAgain
                case .waitingForWritableAgain:
                    XCTAssert(context.channel.isWritable)
                    // Advance the state first, then write from within the notification.
                    self.state = .done
                    self.startBigWrite(context: context)
                }
            }

            /// Fills a buffer requested with 10 * 1024 * 1024 bytes of capacity with "X" up to one byte short
            /// of its actual capacity, writes and flushes it, and then signals that the write has begun.
            private func startBigWrite(context: ChannelHandlerContext) {
                var payload = context.channel.allocator.buffer(capacity: 10 * 1024 * 1024)
                let filler = Array(repeating: UInt8(ascii: "X"), count: payload.capacity - 1)
                payload.writeBytes(filler)
                context.writeAndFlush(self.wrapOutboundOut(payload), promise: self.finishedBigWritePromise)
                self.beganBigWritePromise.succeed(())
            }
        }

        /// Receiver-side handler that switches off `autoRead`, waits for the first explicit outbound `read`,
        /// and from then on keeps requesting reads until a chunk containing an ASCII "1" has arrived.
        final class ReadChunksUntilWeSee1Handler: ChannelDuplexHandler {
            typealias InboundIn = ByteBuffer
            typealias OutboundIn = ByteBuffer

            /// Where the handler is in its read sequence.
            enum State {
                case waitingForInitialOutsideReadCall
                case waitingForZeroesTerminatedByOne
                case done
            }

            var state: State = .waitingForInitialOutsideReadCall

            /// Disables automatic reads on the channel this handler was added to.
            func handlerAdded(context: ChannelHandlerContext) {
                let autoReadOff = context.channel.setOption(ChannelOptions.autoRead, value: false)
                autoReadOff.whenFailure { error in
                    XCTFail("unexpected error \(error)")
                }
            }

            /// The sentinel must have been seen by the time the handler leaves the pipeline.
            func handlerRemoved(context: ChannelHandlerContext) {
                XCTAssertEqual(.done, self.state)
            }

            /// Scans each incoming chunk for the "1" sentinel; data arriving before the first explicit read
            /// is a test failure, data arriving after the sentinel is dropped.
            func channelRead(context: ChannelHandlerContext, data: NIOAny) {
                let chunk = self.unwrapInboundIn(data)
                switch self.state {
                case .done:
                    return // whatever follows the sentinel is of no interest here
                case .waitingForInitialOutsideReadCall:
                    XCTFail("unexpected \(#function)")
                case .waitingForZeroesTerminatedByOne:
                    let sawSentinel = chunk.withUnsafeReadableBytes { bytes in
                        bytes.contains(UInt8(ascii: "1"))
                    }
                    if sawSentinel {
                        self.state = .done
                    }
                }
            }

            /// Requests another read for as long as the sentinel has not been seen.
            func channelReadComplete(context: ChannelHandlerContext) {
                switch self.state {
                case .waitingForZeroesTerminatedByOne:
                    context.read() // keep going until the sentinel shows up
                case .waitingForInitialOutsideReadCall:
                    XCTFail("unexpected \(#function)")
                case .done:
                    return // no more reads are requested from here
                }
            }

            /// Notes the first outbound read request and forwards every read request unchanged.
            func read(context: ChannelHandlerContext) {
                if case .waitingForInitialOutsideReadCall = self.state {
                    self.state = .waitingForZeroesTerminatedByOne
                }
                context.read()
            }
        }

        /// Handler that turns any error reaching it through the pipeline into a test failure.
        final class FailOnError: ChannelInboundHandler {
            typealias InboundIn = Never

            /// Reports the channel and the error via `XCTFail`; the error is not forwarded.
            func errorCaught(context: ChannelHandlerContext, error: Error) {
                let message = "unexpected error in \(context.channel): \(error)"
                XCTFail(message)
            }
        }

        /// Runs the scenario for one connected pair: `sender` writes while `receiver` is not reading, then
        /// `receiver` reads, and finally the test waits for the sender's big follow-up write to complete.
        func runTest(receiver: Channel, sender: Channel) throws {
            let chunkSize = 1024

            // Succeeds with the number of bytes written once the sender has seen a writability change and has
            // written its sentinel. Only then do we start reading.
            let wroteEnoughToBeStuckPromise = sender.eventLoop.makePromise(of: Int.self)

            // Succeeds when the sender has enqueued its big write.
            let beganBigWritePromise = sender.eventLoop.makePromise(of: Void.self)

            // Succeeds when the big write itself has completed, i.e. when everything is done.
            let finishedBigWritePromise = sender.eventLoop.makePromise(of: Void.self)

            // The receiver must not read on its own, so that the sender can be forced into writability
            // notifications.
            XCTAssertNoThrow(try receiver.setOption(ChannelOptions.autoRead, value: false).wait())

            let sentinelReader = ReadChunksUntilWeSee1Handler()
            XCTAssertNoThrow(try receiver.pipeline.addHandler(sentinelReader).wait())

            let bigWriter = WriteWhenChannelBecomesWritableAgain(beganBigWritePromise: beganBigWritePromise,
                                                                 finishedBigWritePromise: finishedBigWritePromise)
            XCTAssertNoThrow(try sender.pipeline.addHandler(bigWriter).wait())
            XCTAssertNoThrow(try sender.pipeline.addHandler(FailOnError()).wait())
            XCTAssertNoThrow(try receiver.pipeline.addHandler(FailOnError()).wait())

            // Added last and at the front of the sender's pipeline.
            let stuckWriter = WriteUntilWriteDoesNotCompletelyInstantlyHandler(chunkSize: chunkSize,
                                                                               wroteEnoughToBeStuckPromise: wroteEnoughToBeStuckPromise)
            XCTAssertNoThrow(try sender.pipeline.addHandler(stuckWriter, position: .first).wait())

            var reportedByteCount: Int? = nil
            XCTAssertNoThrow(reportedByteCount = try wroteEnoughToBeStuckPromise.futureResult.wait())
            guard let pendingByteCount = reportedByteCount else {
                XCTFail("couldn't determine how much was written.")
                return
            }

            // Size the receiver's read buffer to exactly what the sender reported, ...
            let exactSizeAllocator = FixedSizeRecvByteBufferAllocator(capacity: pendingByteCount)
            XCTAssertNoThrow(try receiver.setOption(ChannelOptions.recvAllocator, value: exactSizeAllocator).wait())

            // ... make sure the sender has stopped sending, and ...
            XCTAssertNoThrow(try wroteEnoughToBeStuckPromise.futureResult.wait())

            // ... trigger the receiver's first read.
            receiver.read()

            // The main stage of the test begins once the big write has been enqueued.
            XCTAssertNoThrow(try beganBigWritePromise.futureResult.wait())

            // Switch autoRead back on so the receiver reads everything and the big write can finish.
            XCTAssertNoThrow(try receiver.setOption(ChannelOptions.autoRead, value: true).wait())

            XCTAssertNoThrow(try finishedBigWritePromise.futureResult.wait())
        }
        XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
    }

    /// Writes more than the high watermark on the sender and, from within the second writability notification,
    /// writes again and immediately closes the channel. The test requires the initial `writeAndFlush` to
    /// complete without throwing and at most three writability notifications to be delivered.
    func testCloseInReEntrantFlushNowCall() {
        /// Counts writability notifications; on the second one it writes `amount` more bytes and closes.
        final class CloseInWritabilityChanged: ChannelInboundHandler {
            typealias InboundIn = Never
            typealias OutboundOut = ByteBuffer

            private let amount: Int
            private var numberOfCalls = 0

            init(amount: Int) {
                self.amount = amount
            }

            func channelWritabilityChanged(context: ChannelHandlerContext) {
                self.numberOfCalls += 1
                switch self.numberOfCalls {
                case 1:
                    // The initial write exceeded the high watermark.
                    XCTAssertFalse(context.channel.isWritable)
                case 2:
                    // Writable again, the initial write was only `amount` bytes after all.
                    XCTAssertTrue(context.channel.isWritable)

                    // Write the same number of bytes once more, ...
                    let filler = Array(repeating: UInt8(ascii: "X"), count: self.amount)
                    var payload = context.channel.allocator.buffer(capacity: self.amount)
                    payload.writeBytes(filler)
                    context.writeAndFlush(self.wrapOutboundOut(payload), promise: nil)

                    // ... and close straight away.
                    context.close(promise: nil)
                case 3:
                    // The write issued in call 2 is above the high watermark again.
                    XCTAssertFalse(context.channel.isWritable)
                default:
                    XCTFail("\(self.numberOfCalls) calls to \(#function) are unexpected")
                }
            }
        }

        func runTest(receiver: Channel, sender: Channel) throws {
            let amount = 2

            let closingHandler = CloseInWritabilityChanged(amount: amount)
            XCTAssertNoThrow(try sender.pipeline.addHandler(closingHandler).wait())

            // Both watermarks sit one byte below the write size, so a single write crosses the high one.
            XCTAssertNoThrow(try sender.setOption(ChannelOptions.writeBufferWaterMark,
                                                  value: .init(low: amount - 1,
                                                               high: amount - 1)).wait())

            let filler = Array(repeating: UInt8(ascii: "X"), count: amount)
            var payload = sender.allocator.buffer(capacity: amount)
            payload.writeBytes(filler)
            XCTAssertNoThrow(try sender.writeAndFlush(payload).wait())
        }
        XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
    }
}

/// Collects every inbound `ByteBuffer` into one accumulator and hands the result to `allDonePromise` when the
/// channel becomes inactive. A chunk whose last readable byte is an ASCII "$" makes the handler close the channel.
final class AccumulateAllReads: ChannelInboundHandler {
    typealias InboundIn =  ByteBuffer

    /// Everything read so far; set in `handlerAdded` and reset to `nil` in `channelInactive`.
    var accumulator: ByteBuffer!
    /// Succeeded with the accumulated bytes in `channelInactive`.
    let allDonePromise: EventLoopPromise<ByteBuffer>

    init(allDonePromise: EventLoopPromise<ByteBuffer>) {
        self.allDonePromise = allDonePromise
    }

    /// Allocates the accumulator with an initial capacity of 1024 bytes.
    func handlerAdded(context: ChannelHandlerContext) {
        let allocator = context.channel.allocator
        self.accumulator = allocator.buffer(capacity: 1024)
    }

    /// Appends the chunk to the accumulator and closes the channel if the chunk ended in "$".
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        var chunk = self.unwrapInboundIn(data)
        // Checked before the append below, which takes the chunk as `inout`.
        let endsWithTerminator = chunk.readableBytesView.last == UInt8(ascii: "$")
        self.accumulator.writeBuffer(&chunk)
        guard endsWithTerminator else {
            return
        }
        context.close(promise: nil)
    }

    /// Delivers the accumulated bytes and then drops the accumulator.
    func channelInactive(context: ChannelHandlerContext) {
        defer {
            self.accumulator = nil
        }
        self.allDonePromise.succeed(self.accumulator)
    }
}

/// Polls the selector's kernel event source once with a zero timeout and throws if an event is pending.
///
/// - Parameters:
///   - fd: Descriptor passed to `kevent` (kqueue platforms) or `epoll_wait` (Linux/Android without io_uring).
///   - selector: Only consulted when `SWIFTNIO_USE_IO_URING` is set, where its ring is polled instead of `fd`.
///   - file: Call-site file; currently not used by the body.
///   - line: Call-site line; currently not used by the body.
/// - Throws: `UnexpectedSelectorChanges` if the poll reported at least one event, or whatever error the
///   underlying poll call throws.
private func assertNoSelectorChanges(fd: CInt, selector: NIO.Selector<NIORegistration>, file: StaticString = #file, line: UInt = #line) throws {
    struct UnexpectedSelectorChanges: Error, CustomStringConvertible {
        let description: String
    }

    #if os(macOS) || os(iOS) || os(watchOS) || os(tvOS) || os(FreeBSD)
    var ev: kevent = .init()
    var nothing: timespec = .init()
    let numberOfEvents = try KQueue.kevent(kq: fd, changelist: nil, nchanges: 0, eventlist: &ev, nevents: 1, timeout: &nothing)
    guard numberOfEvents == 0 else {
        throw UnexpectedSelectorChanges(description: "\(ev)")
    }
    #elseif os(Linux) || os(Android)
    #if !SWIFTNIO_USE_IO_URING
    var ev = Epoll.epoll_event()
    let numberOfEvents = try Epoll.epoll_wait(epfd: fd, events: &ev, maxevents: 1, timeout: 0)
    guard numberOfEvents == 0 else {
        throw UnexpectedSelectorChanges(description: "\(ev) [userdata: \(EPollUserData(rawValue: ev.data.u64))]")
    }
    #else
    let events: UnsafeMutablePointer<URingEvent> = UnsafeMutablePointer.allocate(capacity: 1)
    events.initialize(to: URingEvent())
    // Release the scratch event on every exit path, including when the wait call below throws.
    defer {
        events.deinitialize(count: 1)
        events.deallocate()
    }
    let numberOfEvents = try selector.ring.io_uring_wait_cqe_timeout(events: events, maxevents: 1, timeout: TimeAmount.seconds(0))
    guard numberOfEvents == 0 else {
        throw UnexpectedSelectorChanges(description: "\(selector)")
    }
    #endif
    #else
    #warning("assertNoSelectorChanges unsupported on this OS.")
    #endif
}