1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
|
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import XCTest
@testable import NIO
import NIOConcurrencyHelpers
public final class AcceptBackoffHandlerTest: XCTestCase {
private let acceptHandlerName = "AcceptBackoffHandler"
public func testECONNABORTED() throws {
try assertBackoffRead(error: ECONNABORTED)
}
public func testEMFILE() throws {
try assertBackoffRead(error: EMFILE)
}
public func testENFILE() throws {
try assertBackoffRead(error: ENFILE)
}
public func testENOBUFS() throws {
try assertBackoffRead(error: ENOBUFS)
}
public func testENOMEM() throws {
try assertBackoffRead(error: ENOMEM)
}
private func assertBackoffRead(error: Int32) throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}
let readCountHandler = ReadCountHandler()
let serverChannel = try setupChannel(group: group,
readCountHandler: readCountHandler,
backoffProvider: { _ in return .milliseconds(100) },
errors: [error])
XCTAssertEqual(0, try serverChannel.eventLoop.submit {
serverChannel.readable()
serverChannel.read()
return readCountHandler.readCount
}.wait())
// Inspect the read count after our scheduled backoff elapsed.
XCTAssertEqual(1, try serverChannel.eventLoop.scheduleTask(in: .milliseconds(100)) {
return readCountHandler.readCount
}.futureResult.wait())
// The read should go through as the scheduled read happened
XCTAssertEqual(2, try serverChannel.eventLoop.submit {
serverChannel.read()
return readCountHandler.readCount
}.wait())
XCTAssertNoThrow(try serverChannel.syncCloseAcceptingAlreadyClosed())
}
public func testRemovalTriggerReadWhenPreviousReadScheduled() throws {
try assertRemoval(read: true)
}
public func testRemovalTriggerNoReadWhenPreviousNoReadScheduled() throws {
try assertRemoval(read: false)
}
private func assertRemoval(read: Bool) throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}
let readCountHandler = ReadCountHandler()
let serverChannel = try setupChannel(group: group, readCountHandler: readCountHandler, backoffProvider: { err in
return .hours(1)
}, errors: [ENFILE])
XCTAssertEqual(0, try serverChannel.eventLoop.submit {
serverChannel.readable()
if read {
serverChannel.read()
}
return readCountHandler.readCount
}.wait())
XCTAssertNoThrow(try serverChannel.pipeline.removeHandler(name: acceptHandlerName).wait())
if read {
// Removal should have triggered a read.
XCTAssertEqual(1, try serverChannel.eventLoop.submit {
return readCountHandler.readCount
}.wait())
} else {
// Removal should have triggered no read.
XCTAssertEqual(0, try serverChannel.eventLoop.submit {
return readCountHandler.readCount
}.wait())
}
XCTAssertNoThrow(try serverChannel.syncCloseAcceptingAlreadyClosed())
}
public func testNotScheduleReadIfAlreadyScheduled() throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}
let readCountHandler = ReadCountHandler()
let serverChannel = try setupChannel(group: group, readCountHandler: readCountHandler, backoffProvider: { err in
return .milliseconds(10)
}, errors: [ENFILE])
XCTAssertEqual(0, try serverChannel.eventLoop.submit {
serverChannel.readable()
serverChannel.read()
serverChannel.read()
return readCountHandler.readCount
}.wait())
// Inspect the read count after our scheduled backoff elapsed multiple times. This should still only have triggered one read as we should only ever
// schedule one read.
XCTAssertEqual(1, try serverChannel.eventLoop.scheduleTask(in: .milliseconds(500)) {
return readCountHandler.readCount
}.futureResult.wait())
// The read should go through as the scheduled read happened
XCTAssertEqual(2, try serverChannel.eventLoop.submit {
serverChannel.read()
return readCountHandler.readCount
}.wait())
XCTAssertNoThrow(try serverChannel.syncCloseAcceptingAlreadyClosed())
}
public func testChannelInactiveCancelScheduled() throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}
class InactiveVerificationHandler: ChannelInboundHandler {
typealias InboundIn = Any
private let promise: EventLoopPromise<Void>
init(promise: EventLoopPromise<Void>) {
self.promise = promise
}
public func channelInactive(context: ChannelHandlerContext) {
promise.succeed(())
}
func waitForInactive() throws {
try promise.futureResult.wait()
}
}
let readCountHandler = ReadCountHandler()
let serverChannel = try setupChannel(group: group, readCountHandler: readCountHandler, backoffProvider: { err in
return .milliseconds(10)
}, errors: [ENFILE])
let inactiveVerificationHandler = InactiveVerificationHandler(promise: serverChannel.eventLoop.makePromise())
XCTAssertNoThrow(try serverChannel.pipeline.addHandler(inactiveVerificationHandler).wait())
XCTAssertEqual(0, try serverChannel.eventLoop.submit {
serverChannel.readable()
serverChannel.read()
// Close the channel, this should also take care of cancel the scheduled read.
serverChannel.close(promise: nil)
return readCountHandler.readCount
}.wait())
// Inspect the read count after our scheduled backoff elapsed multiple times. This should have triggered no read as the channel was closed.
XCTAssertEqual(0, try serverChannel.eventLoop.scheduleTask(in: .milliseconds(500)) {
return readCountHandler.readCount
}.futureResult.wait())
XCTAssertNoThrow(try inactiveVerificationHandler.waitForInactive())
}
public func testSecondErrorUpdateScheduledRead() throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}
let readCountHandler = ReadCountHandler()
let backoffProviderCalled = NIOAtomic<Int>.makeAtomic(value: 0)
let serverChannel = try setupChannel(group: group, readCountHandler: readCountHandler, backoffProvider: { err in
if backoffProviderCalled.add(1) == 0 {
return .seconds(1)
}
return .seconds(2)
}, errors: [ENFILE, EMFILE])
XCTAssertEqual(0, try serverChannel.eventLoop.submit {
serverChannel.readable()
serverChannel.read()
let readCount = readCountHandler.readCount
// Directly trigger a read again without going through the pipeline. This will allow us to use serverChannel.readable()
serverChannel._channelCore.read0()
serverChannel.readable()
return readCount
}.wait())
// This should have not fired a read yet as we updated the scheduled read because we received two errors.
XCTAssertEqual(0, try serverChannel.eventLoop.scheduleTask(in: .seconds(1)) {
return readCountHandler.readCount
}.futureResult.wait())
// This should have fired now as the updated scheduled read task should have been complete by now
XCTAssertEqual(1, try serverChannel.eventLoop.scheduleTask(in: .seconds(1)) {
return readCountHandler.readCount
}.futureResult.wait())
XCTAssertNoThrow(try serverChannel.syncCloseAcceptingAlreadyClosed())
XCTAssertEqual(2, backoffProviderCalled.load())
}
private final class ReadCountHandler: ChannelOutboundHandler {
typealias OutboundIn = NIOAny
typealias OutboundOut = NIOAny
var readCount = 0
func read(context: ChannelHandlerContext) {
readCount += 1
context.read()
}
}
private func setupChannel(group: EventLoopGroup,
readCountHandler: ReadCountHandler,
backoffProvider: @escaping (IOError) -> TimeAmount? = AcceptBackoffHandler.defaultBackoffProvider,
errors: [Int32]) throws -> ServerSocketChannel {
let eventLoop = group.next() as! SelectableEventLoop
let socket = try NonAcceptingServerSocket(errors: errors)
let serverChannel = try assertNoThrowWithValue(ServerSocketChannel(serverSocket: socket,
eventLoop: eventLoop,
group: group))
XCTAssertNoThrow(try serverChannel.setOption(ChannelOptions.autoRead, value: false).wait())
XCTAssertNoThrow(try serverChannel.pipeline.addHandler(readCountHandler).flatMap { _ in
serverChannel.pipeline.addHandler(AcceptBackoffHandler(backoffProvider: backoffProvider),
name: self.acceptHandlerName)
}.wait())
XCTAssertNoThrow(try eventLoop.flatSubmit {
// this is pretty delicate at the moment:
// `bind` must be _synchronously_ follow `register`, otherwise in our current implementation, `epoll` will
// send us `EPOLLHUP`. To have it run synchronously, we need to invoke the `flatMap` on the eventloop that the
// `register` will succeed.
serverChannel.register().flatMap { () -> EventLoopFuture<()> in
return serverChannel.bind(to: try! SocketAddress(ipAddress: "127.0.0.1", port: 0))
}
}.wait() as Void)
return serverChannel
}
}
|