1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325
|
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2020 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import XCTest
@testable import NIO
import NIOConcurrencyHelpers
final class SALChannelTest: XCTestCase, SALTest {
var group: MultiThreadedEventLoopGroup!
var kernelToUserBox: LockedBox<KernelToUser>!
var userToKernelBox: LockedBox<UserToKernel>!
var wakeups: LockedBox<()>!
override func setUp() {
self.setUpSAL()
}
override func tearDown() {
self.tearDownSAL()
}
func testBasicConnectedChannel() throws {
let localAddress = try! SocketAddress(ipAddress: "0.1.2.3", port: 4)
let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5)
let buffer = ByteBuffer(string: "xxx")
let channel = try self.makeConnectedSocketChannel(localAddress: localAddress,
remoteAddress: serverAddress)
try channel.eventLoop.runSAL(syscallAssertions: {
try self.assertWrite(expectedFD: .max, expectedBytes: buffer, return: .processed(buffer.readableBytes))
try self.assertWritev(expectedFD: .max, expectedBytes: [buffer, buffer], return: .processed(2 * buffer.readableBytes))
try self.assertDeregister { selectable in
try selectable.withUnsafeHandle {
XCTAssertEqual(.max, $0)
}
return true
}
try self.assertClose(expectedFD: .max)
}) {
channel.writeAndFlush(buffer).flatMap {
channel.write(buffer, promise: nil)
return channel.writeAndFlush(buffer)
}.flatMap {
channel.close()
}
}.salWait()
}
func testWritesFromWritabilityNotificationsDoNotGetLostIfWePreviouslyWroteEverything() {
// This is a unit test, doing what
// testWriteAndFlushFromReentrantFlushNowTriggeredOutOfWritabilityWhereOuterSaysAllWrittenAndInnerDoesNot
// does but in a deterministic way, without having to send actual bytes.
let localAddress = try! SocketAddress(ipAddress: "0.1.2.3", port: 4)
let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5)
var buffer = ByteBuffer(string: "12")
let writableNotificationStepExpectation = NIOAtomic<Int>.makeAtomic(value: 0)
final class DoWriteFromWritabilityChangedNotification: ChannelInboundHandler {
typealias InboundIn = ByteBuffer
typealias OutboundOut = ByteBuffer
var numberOfCalls = 0
var writableNotificationStepExpectation: NIOAtomic<Int>
init(writableNotificationStepExpectation: NIOAtomic<Int>) {
self.writableNotificationStepExpectation = writableNotificationStepExpectation
}
func channelWritabilityChanged(context: ChannelHandlerContext) {
self.numberOfCalls += 1
XCTAssertEqual(self.writableNotificationStepExpectation.load(),
numberOfCalls)
switch self.numberOfCalls {
case 1:
// First, we should see a `false` here because 2 bytes is above the high watermark.
XCTAssertFalse(context.channel.isWritable)
case 2:
// Then, we should go back to `true` from a `writable` notification. Now, let's write 3 bytes which
// will exhaust the high watermark. We'll also set up (further down) that we only partially write
// those 3 bytes.
XCTAssertTrue(context.channel.isWritable)
var buffer = context.channel.allocator.buffer(capacity: 3)
buffer.writeString("ABC")
// We expect another channelWritabilityChanged notification
XCTAssertTrue(self.writableNotificationStepExpectation.compareAndExchange(expected: 2, desired: 3))
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
case 3:
// Next, we should go to false because we never send all the bytes.
XCTAssertFalse(context.channel.isWritable)
case 4:
// And finally, back to `true` because eventually, we'll write enough.
XCTAssertTrue(context.channel.isWritable)
default:
XCTFail("call \(self.numberOfCalls) unexpected (\(context.channel.isWritable))")
}
}
}
var maybeChannel: SocketChannel? = nil
XCTAssertNoThrow(maybeChannel = try self.makeConnectedSocketChannel(localAddress: localAddress,
remoteAddress: serverAddress))
guard let channel = maybeChannel else {
XCTFail("couldn't construct channel")
return
}
XCTAssertNoThrow(try channel.eventLoop.runSAL(syscallAssertions: {
// We get in a write of 2 bytes, and we claim we wrote 1 bytes of that.
try self.assertWrite(expectedFD: .max, expectedBytes: buffer, return: .processed(1))
// Next, we expect a reregistration which adds the `.write` notification
try self.assertReregister { selectable, eventSet in
XCTAssert(selectable as? Socket === channel.socket)
XCTAssertEqual([.read, .reset, .readEOF, .write], eventSet)
return true
}
// Before sending back the writable notification, we know that that'll trigger a Channel writability change
XCTAssertTrue(writableNotificationStepExpectation.compareAndExchange(expected: 1, desired: 2))
let writableEvent = SelectorEvent(io: [.write],
registration: NIORegistration(channel: .socketChannel(channel),
interested: [.write],
registrationID: .initialRegistrationID))
try self.assertWaitingForNotification(result: writableEvent)
try self.assertWrite(expectedFD: .max,
expectedBytes: buffer.getSlice(at: 1, length: 1)!,
return: .processed(1))
buffer.clear()
buffer.writeString("ABC") // expected
// This time, the write again, just writes one byte, so we should remain registered for writable.
try self.assertWrite(expectedFD: .max,
expectedBytes: buffer,
return: .processed(1))
buffer.moveReaderIndex(forwardBy: 1)
// Let's send them another 'writable' notification:
try self.assertWaitingForNotification(result: writableEvent)
// This time, we'll make the write write everything which should also lead to a final channelWritability
// change.
XCTAssertTrue(writableNotificationStepExpectation.compareAndExchange(expected: 3, desired: 4))
try self.assertWrite(expectedFD: .max,
expectedBytes: buffer,
return: .processed(2))
// And lastly, after having written everything, we'd expect to unregister for write
try self.assertReregister { selectable, eventSet in
XCTAssert(selectable as? Socket === channel.socket)
XCTAssertEqual([.read, .reset, .readEOF], eventSet)
return true
}
try self.assertParkedRightNow()
}) { () -> EventLoopFuture<Void> in
channel.setOption(ChannelOptions.writeSpin, value: 0).flatMap {
channel.setOption(ChannelOptions.writeBufferWaterMark, value: .init(low: 1, high: 1))
}.flatMap {
channel.pipeline.addHandler(DoWriteFromWritabilityChangedNotification(writableNotificationStepExpectation: writableNotificationStepExpectation))
}.flatMap {
// This write should cause a Channel writability change.
XCTAssertTrue(writableNotificationStepExpectation.compareAndExchange(expected: 0, desired: 1))
return channel.writeAndFlush(buffer)
}
}.salWait())
}
func testWeSurviveIfIgnoringSIGPIPEFails() {
// We know this sometimes happens on Darwin, so let's test it.
let expectedError = IOError(errnoCode: EINVAL, reason: "bad")
XCTAssertThrowsError(try self.makeSocketChannelInjectingFailures(disableSIGPIPEFailure: expectedError)) { error in
XCTAssertEqual(expectedError.errnoCode, (error as? IOError)?.errnoCode)
}
}
func testBasicRead() {
let localAddress = try! SocketAddress(ipAddress: "0.1.2.3", port: 4)
let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5)
final class SignalGroupOnRead: ChannelInboundHandler {
typealias InboundIn = ByteBuffer
private let group: DispatchGroup
private var numberOfCalls = 0
init(group: DispatchGroup) {
self.group = group
}
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
self.numberOfCalls += 1
XCTAssertEqual("hello",
String(decoding: self.unwrapInboundIn(data).readableBytesView, as: Unicode.UTF8.self))
if self.numberOfCalls == 1 {
self.group.leave()
}
}
}
var maybeChannel: SocketChannel? = nil
XCTAssertNoThrow(maybeChannel = try self.makeConnectedSocketChannel(localAddress: localAddress,
remoteAddress: serverAddress))
guard let channel = maybeChannel else {
XCTFail("couldn't construct channel")
return
}
let buffer = ByteBuffer(string: "hello")
let g = DispatchGroup()
g.enter()
XCTAssertNoThrow(try channel.eventLoop.runSAL(syscallAssertions: {
let readEvent = SelectorEvent(io: [.read],
registration: NIORegistration(channel: .socketChannel(channel),
interested: [.read],
registrationID: .initialRegistrationID))
try self.assertWaitingForNotification(result: readEvent)
try self.assertRead(expectedFD: .max, expectedBufferSpace: 2048, return: buffer)
}) {
channel.pipeline.addHandler(SignalGroupOnRead(group: g))
})
g.wait()
}
func testBasicConnectWithClientBootstrap() {
guard let channel = try? self.makeSocketChannel() else {
XCTFail("couldn't make a channel")
return
}
let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5)
let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5)
XCTAssertNoThrow(try channel.eventLoop.runSAL(syscallAssertions: {
try self.assertSetOption(expectedLevel: .tcp, expectedOption: .tcp_nodelay) { value in
return (value as? SocketOptionValue) == 1
}
try self.assertConnect(expectedAddress: serverAddress, result: true)
try self.assertLocalAddress(address: localAddress)
try self.assertRemoteAddress(address: localAddress)
try self.assertRegister { selectable, event, Registration in
XCTAssertEqual([.reset], event)
return true
}
try self.assertReregister { selectable, event in
XCTAssertEqual([.reset, .readEOF], event)
return true
}
try self.assertDeregister { selectable in
return true
}
try self.assertClose(expectedFD: .max)
}) {
ClientBootstrap(group: channel.eventLoop)
.channelOption(ChannelOptions.autoRead, value: false)
.testOnly_connect(injectedChannel: channel, to: serverAddress)
.flatMap { channel in
channel.close()
}
}.salWait())
}
func testClientBootstrapBindIsDoneAfterSocketOptions() {
guard let channel = try? self.makeSocketChannel() else {
XCTFail("couldn't make a channel")
return
}
let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5)
let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5)
XCTAssertNoThrow(try channel.eventLoop.runSAL(syscallAssertions: {
try self.assertSetOption(expectedLevel: .tcp, expectedOption: .tcp_nodelay) { value in
return (value as? SocketOptionValue) == 1
}
// This is the important bit: We need to apply the socket options _before_ ...
try self.assertSetOption(expectedLevel: .socket, expectedOption: .so_reuseaddr) { value in
return (value as? SocketOptionValue) == 1
}
// ... we call bind.
try self.assertBind(expectedAddress: localAddress)
try self.assertLocalAddress(address: nil) // this is an inefficiency in `bind0`.
try self.assertConnect(expectedAddress: serverAddress, result: true)
try self.assertLocalAddress(address: localAddress)
try self.assertRemoteAddress(address: localAddress)
try self.assertRegister { selectable, event, Registration in
XCTAssertEqual([.reset], event)
return true
}
try self.assertReregister { selectable, event in
XCTAssertEqual([.reset, .readEOF], event)
return true
}
try self.assertDeregister { selectable in
return true
}
try self.assertClose(expectedFD: .max)
}) {
ClientBootstrap(group: channel.eventLoop)
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.channelOption(ChannelOptions.autoRead, value: false)
.bind(to: localAddress)
.testOnly_connect(injectedChannel: channel, to: serverAddress)
.flatMap { channel in
channel.close()
}
}.salWait())
}
}
|