File: SelectorTest.swift

package info (click to toggle)
swiftlang 6.0.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,519,992 kB
  • sloc: cpp: 9,107,863; ansic: 2,040,022; asm: 1,135,751; python: 296,500; objc: 82,456; f90: 60,502; lisp: 34,951; pascal: 19,946; sh: 18,133; perl: 7,482; ml: 4,937; javascript: 4,117; makefile: 3,840; awk: 3,535; xml: 914; fortran: 619; cs: 573; ruby: 573
file content (435 lines) | stat: -rw-r--r-- 23,326 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

@testable import NIO
import NIOConcurrencyHelpers
import XCTest

class SelectorTest: XCTestCase {

    /// Verifies that deregistering a selectable from within the selector's event-processing
    /// callback suppresses the already-buffered event for it (the socket stays open here).
    func testDeregisterWhileProcessingEvents() throws {
        try assertDeregisterWhileProcessingEvents(closeAfterDeregister: false)
    }

    /// Same as `testDeregisterWhileProcessingEvents`, but additionally closes the socket right
    /// after deregistering it, so its fd is already gone when its buffered event would have fired.
    func testDeregisterAndCloseWhileProcessingEvents() throws {
        try assertDeregisterWhileProcessingEvents(closeAfterDeregister: true)
    }

    /// Registers two connected sockets for `.write` (so both are ready simultaneously), then —
    /// while processing the first ready event in `whenReady` — deregisters the *other* socket.
    /// Asserts that exactly one event is delivered, i.e. the selector drops the buffered event
    /// for the socket that was deregistered mid-processing.
    ///
    /// - Parameter closeAfterDeregister: if `true`, the deregistered socket is also closed inside
    ///   the callback, exercising event suppression for an fd that no longer exists.
    private func assertDeregisterWhileProcessingEvents(closeAfterDeregister: Bool) throws {
        // Minimal `Registration` that lets us map a ready event back to the socket it belongs to.
        struct TestRegistration: Registration {

            let socket: Socket
            var interested: SelectorEventSet
            var registrationID: SelectorRegistrationID
        }

        let selector = try NIO.Selector<TestRegistration>()
        defer {
            XCTAssertNoThrow(try selector.close())
        }

        let socket1 = try Socket(protocolFamily: .inet, type: .stream)
        defer {
            // The socket may already have been closed inside the callback (closeAfterDeregister).
            if socket1.isOpen {
                XCTAssertNoThrow(try socket1.close())
            }
        }
        try socket1.setNonBlocking()

        let socket2 = try Socket(protocolFamily: .inet, type: .stream)
        defer {
            if socket2.isOpen {
                XCTAssertNoThrow(try socket2.close())
            }
        }
        try socket2.setNonBlocking()

        // Port 0: let the kernel pick a free port; we connect via `localAddress()` below.
        let serverSocket = try assertNoThrowWithValue(ServerSocket.bootstrap(protocolFamily: .inet,
                                                                             host: "127.0.0.1",
                                                                             port: 0))
        defer {
            XCTAssertNoThrow(try serverSocket.close())
        }
        _ = try socket1.connect(to: serverSocket.localAddress())
        _ = try socket2.connect(to: serverSocket.localAddress())

        // Accept both client connections so the connects complete; closed via defer.
        let accepted1 = try serverSocket.accept()!
        defer {
            XCTAssertNoThrow(try accepted1.close())
        }
        let accepted2 = try serverSocket.accept()!
        defer {
            XCTAssertNoThrow(try accepted2.close())
        }

        // Register both sockets with .write. This will ensure both are ready when calling selector.whenReady.
        try selector.register(selectable: socket1 , interested: [.reset, .write], makeRegistration: { ev, regID in
            return TestRegistration(socket: socket1, interested: ev, registrationID: regID)
        })

        try selector.register(selectable: socket2 , interested: [.reset, .write], makeRegistration: { ev, regID in
            return TestRegistration(socket: socket2, interested: ev, registrationID: regID)
        })

        var readyCount = 0
        // Whichever socket's event we happen to see first, deregister (and maybe close) the other
        // one. Its readiness event is already buffered in the selector at this point.
        try selector.whenReady(strategy: .block, onLoopBegin: { }) { ev in
            readyCount += 1
            if socket1 === ev.registration.socket {
                try selector.deregister(selectable: socket2)
                if closeAfterDeregister {
                    try socket2.close()
                }
            } else if socket2 === ev.registration.socket {
                try selector.deregister(selectable: socket1)
                if closeAfterDeregister {
                    try socket1.close()
                }
            } else {
                XCTFail("ev.registration.socket was neither \(socket1) or \(socket2) but \(ev.registration.socket)")
            }
        }
        // Both sockets were ready, but only one event may be delivered: the buffered event for
        // the deregistered socket must have been dropped.
        XCTAssertEqual(1, readyCount)
    }

    // Number of client channels the fd-reuse test below establishes (half of them are later
    // closed and replaced by "re-connected" channels that reuse the freed fd numbers).
    private static let testWeDoNotDeliverEventsForPreviouslyClosedChannels_numberOfChannelsToUse = 10
    /// Regression test for stale-event delivery: closes half of the connected channels and — in
    /// the *same* event loop tick — connects replacements that (very likely) reuse the freed fd
    /// numbers. Any `.readEOF` events still buffered in the `Selector` for the closed fds must
    /// NOT be delivered to the new channels that now own those fd numbers.
    func testWeDoNotDeliverEventsForPreviouslyClosedChannels() {
        /// We use this class to box mutable values, generally in this test anything boxed should only be read/written
        /// on the event loop `el`.
        class Box<T> {
            init(_ value: T) {
                self._value = value
            }
            private var _value: T
            var value: T {
                get {
                    // Accessing off the event loop would be racy; assert we're on one.
                    XCTAssertNotNil(MultiThreadedEventLoopGroup.currentEventLoop)
                    return self._value
                }
                set {
                    XCTAssertNotNil(MultiThreadedEventLoopGroup.currentEventLoop)
                    self._value = newValue
                }
            }
        }
        enum DidNotReadError: Error {
            case didNotReadGotInactive
            case didNotReadGotReadComplete
        }

        /// This handler is inserted in the `ChannelPipeline` that are re-connected. So we're closing a bunch of
        /// channels and (in the same event loop tick) we then connect the same number for which I'm using the
        /// terminology 're-connect' here.
        /// These re-connected channels will re-use the fd numbers of the just closed channels. The interesting thing
        /// is that the `Selector` will still have events buffered for the _closed fds_. Note: the re-connected ones
        /// will end up using the _same_ fds and this test ensures that we're not getting the outdated events. In this
        /// case the outdated events are all `.readEOF`s which manifest as `channelReadComplete`s. If we're delivering
        /// outdated events, they will also happen in the _same event loop tick_ and therefore we do quite a few
        /// assertions that we're either in or not in that interesting event loop tick.
        class HappyWhenReadHandler: ChannelInboundHandler {
            typealias InboundIn = ByteBuffer

            private let didReadPromise: EventLoopPromise<Void>
            private let hasReConnectEventLoopTickFinished: Box<Bool>
            private var didRead: Bool = false

            init(hasReConnectEventLoopTickFinished: Box<Bool>, didReadPromise: EventLoopPromise<Void>) {
                self.didReadPromise = didReadPromise
                self.hasReConnectEventLoopTickFinished = hasReConnectEventLoopTickFinished
            }

            func channelActive(context: ChannelHandlerContext) {
                // we expect these channels to be connected within the re-connect event loop tick
                XCTAssertFalse(self.hasReConnectEventLoopTickFinished.value)
            }

            func channelInactive(context: ChannelHandlerContext) {
                // we expect these channels to be close a while after the re-connect event loop tick
                XCTAssertTrue(self.hasReConnectEventLoopTickFinished.value)
                XCTAssertTrue(self.didRead)
                if !self.didRead {
                    // fail the promise too so the test's final `wait()` surfaces the error
                    self.didReadPromise.fail(DidNotReadError.didNotReadGotInactive)
                    context.close(promise: nil)
                }
            }

            func channelRead(context: ChannelHandlerContext, data: NIOAny) {
                // we expect these channels to get data only a while after the re-connect event loop tick as it's
                // impossible to get a read notification in the very same event loop tick that you got registered
                XCTAssertTrue(self.hasReConnectEventLoopTickFinished.value)

                XCTAssertFalse(self.didRead)
                var buf = self.unwrapInboundIn(data)
                // the server writes exactly one "H" on channelActive
                XCTAssertEqual(1, buf.readableBytes)
                XCTAssertEqual("H", buf.readString(length: 1)!)
                self.didRead = true
                self.didReadPromise.succeed(())
            }

            func channelReadComplete(context: ChannelHandlerContext) {
                // we expect these channels to get data only a while after the re-connect event loop tick as it's
                // impossible to get a read notification in the very same event loop tick that you got registered
                XCTAssertTrue(self.hasReConnectEventLoopTickFinished.value)
                XCTAssertTrue(self.didRead)
                if !self.didRead {
                    self.didReadPromise.fail(DidNotReadError.didNotReadGotReadComplete)
                    context.close(promise: nil)
                }
            }
        }

        /// This handler will wait for all client channels to have come up and for one of them to have received EOF.
        /// (We will see the EOF as they're set to support half-closure). Then, it'll close half of those file
        /// descriptors and open the same number of new ones. The new ones (called re-connected) will share the same
        /// fd numbers as the recently closed ones. That brings us in an interesting situation: There will (very likely)
        /// be `.readEOF` events enqueued for the just closed ones and because the re-connected channels share the same
        /// fd numbers danger looms. The `HappyWhenReadHandler` above makes sure nothing bad happens.
        class CloseEveryOtherAndOpenNewOnesHandler: ChannelInboundHandler {
            typealias InboundIn = ByteBuffer

            private let allChannels: Box<[Channel]>
            private let serverAddress: SocketAddress
            private let everythingWasReadPromise: EventLoopPromise<Void>
            private let hasReConnectEventLoopTickFinished: Box<Bool>

            init(allChannels: Box<[Channel]>,
                 hasReConnectEventLoopTickFinished: Box<Bool>,
                 serverAddress: SocketAddress,
                 everythingWasReadPromise: EventLoopPromise<Void>) {
                self.allChannels = allChannels
                self.serverAddress = serverAddress
                self.everythingWasReadPromise = everythingWasReadPromise
                self.hasReConnectEventLoopTickFinished = hasReConnectEventLoopTickFinished
            }

            func channelActive(context: ChannelHandlerContext) {
                // collect all the channels
                context.channel.getOption(ChannelOptions.allowRemoteHalfClosure).whenSuccess { halfClosureAllowed in
                    precondition(halfClosureAllowed,
                                 "the test configuration is bogus: half-closure is dis-allowed which breaks the setup of this test")
                }
                self.allChannels.value.append(context.channel)
            }

            func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
                // this is the `.readEOF` that is triggered by the `ServerHandler`'s `close` calls because our channel
                // supports half-closure
                guard self.allChannels.value.count == SelectorTest.testWeDoNotDeliverEventsForPreviouslyClosedChannels_numberOfChannelsToUse else {
                    return
                }
                // all channels are up, so let's construct the situation we want to be in:
                // 1. let's close half the channels
                // 2. then re-connect (must be synchronous) the same number of channels and we'll get fd number re-use

                context.channel.eventLoop.execute {
                    // this will be run immediately after we processed all `Selector` events so when
                    // `self.hasReConnectEventLoopTickFinished.value` becomes true, we're out of the event loop
                    // tick that is interesting.
                    XCTAssertFalse(self.hasReConnectEventLoopTickFinished.value)
                    self.hasReConnectEventLoopTickFinished.value = true
                }
                XCTAssertFalse(self.hasReConnectEventLoopTickFinished.value)

                let everyOtherIndex = stride(from: 0, to: SelectorTest.testWeDoNotDeliverEventsForPreviouslyClosedChannels_numberOfChannelsToUse, by: 2)
                for f in everyOtherIndex {
                    XCTAssertTrue(self.allChannels.value[f].isActive)
                    // close will succeed synchronously as we're on the right event loop.
                    self.allChannels.value[f].close(promise: nil)
                    XCTAssertFalse(self.allChannels.value[f].isActive)
                }

                // now we have completed stage 1: we freed up a bunch of file descriptor numbers, so let's open
                // some new ones
                var reconnectedChannelsHaveRead: [EventLoopFuture<Void>] = []
                for _ in everyOtherIndex {
                    var hasBeenAdded: Bool = false
                    let p = context.channel.eventLoop.makePromise(of: Void.self)
                    reconnectedChannelsHaveRead.append(p.futureResult)
                    let newChannel = ClientBootstrap(group: context.eventLoop)
                        .channelInitializer { channel in
                            channel.pipeline.addHandler(HappyWhenReadHandler(hasReConnectEventLoopTickFinished: self.hasReConnectEventLoopTickFinished,
                                                                               didReadPromise: p)).map {
                                                                                hasBeenAdded = true
                            }
                        }
                        .connect(to: self.serverAddress)
                        .map { (channel: Channel) -> Void in
                            XCTAssertFalse(self.hasReConnectEventLoopTickFinished.value,
                                           """
                                               This is bad: the connect of the channels to be re-connected has not
                                               completed synchronously.
                                               We assumed that on all platform a UNIX Domain Socket connect is
                                               synchronous but we must be wrong :(.
                                               The good news is: Not everything is lost, this test should also work
                                               if you instead open a regular file (in O_RDWR) and just use this file's
                                               fd with `ClientBootstrap(group: group).withConnectedSocket(fileFD)`.
                                               Sure, a file is not a socket but it's always readable and writable and
                                               that fulfills the requirements we have here.
                                               I still hope this change will never have to be done.
                                               Note: if you changed anything about the pipeline's handler adding/removal
                                               you might also have a bug there.
                                               """)
                    }
                    // just to make sure we got `newChannel` synchronously and we could add our handler to the
                    // pipeline synchronously too.
                    XCTAssertTrue(newChannel.isFulfilled)
                    XCTAssertTrue(hasBeenAdded)
                }

                // if all the new re-connected channels have read, then we're happy here.
                EventLoopFuture.andAllSucceed(reconnectedChannelsHaveRead, on: context.eventLoop)
                    .cascade(to: self.everythingWasReadPromise)
                // let's also remove all the channels so this code will not be triggered again.
                self.allChannels.value.removeAll()
            }

        }

        // all of the following are boxed as we need mutable references to them, they can only be read/written on the
        // event loop `el`.
        let allServerChannels: Box<[Channel]> = Box([])
        let allChannels: Box<[Channel]> = Box([])
        let hasReConnectEventLoopTickFinished: Box<Bool> = Box(false)
        let numberOfConnectedChannels: Box<Int> = Box(0)

        /// This spawns a server, always send a character immediately and after the first
        /// `SelectorTest.numberOfChannelsToUse` have been established, we'll close them all. That will trigger
        /// an `.readEOF` in the connected client channels which will then trigger other interesting things (see above).
        class ServerHandler: ChannelInboundHandler {
            typealias InboundIn = ByteBuffer

            private var number: Int = 0
            private let allServerChannels: Box<[Channel]>
            private let numberOfConnectedChannels: Box<Int>

            init(allServerChannels: Box<[Channel]>, numberOfConnectedChannels: Box<Int>) {
                self.allServerChannels = allServerChannels
                self.numberOfConnectedChannels = numberOfConnectedChannels
            }

            func channelActive(context: ChannelHandlerContext) {
                // greet every connection with one byte; `HappyWhenReadHandler` asserts it reads exactly this.
                var buf = context.channel.allocator.buffer(capacity: 1)
                buf.writeString("H")
                context.channel.writeAndFlush(buf, promise: nil)
                self.number += 1
                self.allServerChannels.value.append(context.channel)
                if self.allServerChannels.value.count == SelectorTest.testWeDoNotDeliverEventsForPreviouslyClosedChannels_numberOfChannelsToUse {
                    // just to be sure all of the client channels have connected
                    XCTAssertEqual(SelectorTest.testWeDoNotDeliverEventsForPreviouslyClosedChannels_numberOfChannelsToUse, numberOfConnectedChannels.value)
                    self.allServerChannels.value.forEach { c in
                        c.close(promise: nil)
                    }
                }
            }
        }
        // One thread only: everything (server + clients) shares `el`, which is what makes the
        // close-then-reconnect sequence happen within a single event loop tick.
        let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1)
        let el = elg.next()
        defer {
            XCTAssertNoThrow(try elg.syncShutdownGracefully())
        }
        XCTAssertNoThrow(try withTemporaryUnixDomainSocketPathName { udsPath in
            let secondServerChannel = try! ServerBootstrap(group: el)
                .childChannelInitializer { channel in
                    channel.pipeline.addHandler(ServerHandler(allServerChannels: allServerChannels,
                                                              numberOfConnectedChannels: numberOfConnectedChannels))
                }
                .bind(to: SocketAddress(unixDomainSocketPath: udsPath))
                .wait()

            let everythingWasReadPromise = el.makePromise(of: Void.self)
            // Kick off all the client connects from on the event loop itself so they happen in one tick.
            XCTAssertNoThrow(try el.submit { () -> [EventLoopFuture<Channel>] in
                (0..<SelectorTest.testWeDoNotDeliverEventsForPreviouslyClosedChannels_numberOfChannelsToUse).map { (_: Int) in
                    ClientBootstrap(group: el)
                        .channelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
                        .channelInitializer { channel in
                            channel.pipeline.addHandler(CloseEveryOtherAndOpenNewOnesHandler(allChannels: allChannels,
                                                                                             hasReConnectEventLoopTickFinished: hasReConnectEventLoopTickFinished,
                                                                                             serverAddress: secondServerChannel.localAddress!,
                                                                                             everythingWasReadPromise: everythingWasReadPromise))
                        }
                        .connect(to: secondServerChannel.localAddress!)
                        .map { channel in
                            numberOfConnectedChannels.value += 1
                            return channel
                    }
                }
                }.wait().forEach { XCTAssertNoThrow(try $0.wait()) } as Void)
            XCTAssertNoThrow(try everythingWasReadPromise.futureResult.wait())
        })
    }

    /// Regression test for https://github.com/apple/swift-nio/issues/872: a timer event that is
    /// lost during one `whenReady` pass (because of deregistrations in the same tick) must be
    /// re-delivered on the next `epoll_wait`/`kevent` — which only works if the timer event
    /// source behaves level-triggered. We provoke the race with a repeated task plus a hang-up
    /// on a socketpair, then assert the timer keeps firing.
    func testTimerFDIsLevelTriggered() throws {
        // this is a regression test for https://github.com/apple/swift-nio/issues/872
        let delayToUseInMicroSeconds: Int64 = 100_000 // needs to be much greater than time it takes to EL.execute

        let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
        defer {
            XCTAssertNoThrow(try group.syncShutdownGracefully())
        }
        // Socket subclass that signals a promise when the channel's socket is actually closed,
        // so the test can wait for the close triggered by the EPOLLHUP/reset.
        class FakeSocket: Socket {
            private let hasBeenClosedPromise: EventLoopPromise<Void>
            init(hasBeenClosedPromise: EventLoopPromise<Void>, socket: NIOBSDSocket.Handle) throws {
                self.hasBeenClosedPromise = hasBeenClosedPromise
                try super.init(socket: socket)
            }
            override func close() throws {
                self.hasBeenClosedPromise.succeed(())
                try super.close()
            }
        }
        var socketFDs: [CInt] = [-1, -1]
        XCTAssertNoThrow(try Posix.socketpair(domain: .local,
                                              type: .stream,
                                              protocol: 0,
                                              socketVector: &socketFDs))

        let numberFires = NIOAtomic<Int>.makeAtomic(value: 0)
        let el = group.next() as! SelectableEventLoop
        let channelHasBeenClosedPromise = el.makePromise(of: Void.self)
        // Wrap one end of the socketpair in a channel on `el`; the other end is closed later to
        // provoke the hang-up event.
        let channel = try SocketChannel(socket: FakeSocket(hasBeenClosedPromise: channelHasBeenClosedPromise,
                                                           socket: socketFDs[0]), eventLoop: el)
        let sched = el.scheduleRepeatedTask(initialDelay: .microseconds(delayToUseInMicroSeconds),
                                            delay: .microseconds(delayToUseInMicroSeconds)) { (_: RepeatedTask) in
            _ = numberFires.add(1)
        }
        XCTAssertNoThrow(try el.submit {
            // EL tick 1: this is used to
            //   - actually arm the timer (timerfd_settime)
            //   - set the channel registration up
            if numberFires.load() > 0 {
                print("WARNING: This test hit a race and this result doesn't mean it actually worked." +
                      " This should really only ever happen in very bizarre conditions.")
            }
            channel.interestedEvent = [.readEOF, .reset]
            func workaroundSR9815() {
                channel.registerAlreadyConfigured0(promise: nil)
            }
            workaroundSR9815()
        }.wait())
        usleep(10_000) // this makes this repro very stable
        el.execute {
            // EL tick 2: this is used to
            //   - close one end of the socketpair so that in EL tick 3, we'll see a EPOLLHUP
            //   - sleep `delayToUseInMicroSeconds + 10` so in EL tick 3, we'll also see timerfd fire
            close(socketFDs[1])
            usleep(.init(delayToUseInMicroSeconds))
        }

        // EL tick 3: happens in the background here. We will likely lose the timer signal because of the
        // `deregistrationsHappened` workaround in `Selector.swift` and we expect to pick it up again when we enter
        // `epoll_wait`/`kevent` next. This however only works if the timer event is level triggered.
        // NOTE(review): `assert(_:within:)` appears to be a polling test-support helper (retries
        // until the condition holds or the timeout expires) — confirm in the shared test utilities.
        assert(numberFires.load() > 5, within: .seconds(1), "timer only fired \(numberFires.load()) times")
        sched.cancel()
        XCTAssertNoThrow(try channelHasBeenClosedPromise.futureResult.wait())
    }
}