File: BaseStreamSocketChannel.swift

package info (click to toggle)
swiftlang 6.0.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,519,992 kB
  • sloc: cpp: 9,107,863; ansic: 2,040,022; asm: 1,135,751; python: 296,500; objc: 82,456; f90: 60,502; lisp: 34,951; pascal: 19,946; sh: 18,133; perl: 7,482; ml: 4,937; javascript: 4,117; makefile: 3,840; awk: 3,535; xml: 914; fortran: 619; cs: 573; ruby: 573
file content (256 lines) | stat: -rw-r--r-- 10,471 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2019 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

/// A `BaseSocketChannel` subclass for stream-style sockets.
///
/// Outbound data is buffered in a `PendingStreamWritesManager` and written with scalar, vectored or
/// file-based socket writes. Inbound data is read into `ByteBuffer`s and forwarded to the pipeline.
/// The input and output sides can be closed independently through `close0(error:mode:promise:)`.
class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket> {
    /// A scheduled connect-timeout task, if any. This class only resets it to `nil` (in `init`) and
    /// cancels/clears it when the channel is closed with mode `.all`; it is assigned elsewhere.
    internal var connectTimeoutScheduled: Optional<Scheduled<Void>>
    /// Backing storage for `ChannelOptions.Types.AllowRemoteHalfClosureOption`. Within this class it is
    /// only written by `setOption0` and read by `getOption0`.
    private var allowRemoteHalfClosure: Bool = false
    /// `true` once the input side is considered shut down: set by `shutdownSocket(mode: .input)` or by
    /// `close0` when the input is closed because of `ChannelError.eof`.
    private var inputShutdown: Bool = false
    /// `true` once the output side was shut down via `shutdownSocket(mode: .output)`.
    private var outputShutdown: Bool = false
    /// Holds the writes that were buffered but not yet written to the socket.
    private let pendingWrites: PendingStreamWritesManager

    /// Creates the channel.
    ///
    /// The pending-writes manager is built from the event loop's `iovecs` and `storageRefs`; all other
    /// parameters are forwarded unchanged to `BaseSocketChannel.init`.
    ///
    /// - Throws: Any error thrown by the superclass initializer.
    override init(socket: Socket,
                  parent: Channel?,
                  eventLoop: SelectableEventLoop,
                  recvAllocator: RecvByteBufferAllocator) throws {
        // Stored properties of this class have to be initialised before calling `super.init`.
        self.pendingWrites = PendingStreamWritesManager(iovecs: eventLoop.iovecs, storageRefs: eventLoop.storageRefs)
        self.connectTimeoutScheduled = nil
        try super.init(socket: socket, parent: parent, eventLoop: eventLoop, recvAllocator: recvAllocator)
    }

    deinit {
        // We should never have any pending writes left as otherwise we may leak callbacks.
        // (`assert` is only evaluated in debug builds.)
        assert(self.pendingWrites.isEmpty)
    }

    // MARK: BaseSocketChannel's must override API that might be further refined by subclasses

    /// Sets a channel option. Must be called on the event loop (asserted).
    ///
    /// `AllowRemoteHalfClosureOption`, `WriteSpinOption` and `WriteBufferWaterMarkOption` are handled
    /// here; every other option is forwarded to the superclass.
    ///
    /// - Throws: `ChannelError.ioOnClosedChannel` if the channel is not open, or whatever the
    ///   superclass throws for options it handles.
    override func setOption0<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
        self.eventLoop.assertInEventLoop()

        guard self.isOpen else {
            throw ChannelError.ioOnClosedChannel
        }

        // The force casts below rely on each matched option type declaring the corresponding `Value`
        // type; a mismatch would trap at runtime.
        switch option {
        case _ as ChannelOptions.Types.AllowRemoteHalfClosureOption:
            self.allowRemoteHalfClosure = value as! Bool
        case _ as ChannelOptions.Types.WriteSpinOption:
            self.pendingWrites.writeSpinCount = value as! UInt
        case _ as ChannelOptions.Types.WriteBufferWaterMarkOption:
            self.pendingWrites.waterMark = value as! ChannelOptions.Types.WriteBufferWaterMark
        default:
            try super.setOption0(option, value: value)
        }
    }

    /// Reads a channel option. Must be called on the event loop (asserted).
    ///
    /// Handles the same three options as `setOption0`; every other option is forwarded to the
    /// superclass.
    ///
    /// - Throws: `ChannelError.ioOnClosedChannel` if the channel is not open, or whatever the
    ///   superclass throws for options it handles.
    override func getOption0<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
        self.eventLoop.assertInEventLoop()

        guard self.isOpen else {
            throw ChannelError.ioOnClosedChannel
        }

        // As in `setOption0`, the force casts rely on the matched option's `Value` type.
        switch option {
        case _ as ChannelOptions.Types.AllowRemoteHalfClosureOption:
            return self.allowRemoteHalfClosure as! Option.Value
        case _ as ChannelOptions.Types.WriteSpinOption:
            return self.pendingWrites.writeSpinCount as! Option.Value
        case _ as ChannelOptions.Types.WriteBufferWaterMarkOption:
            return self.pendingWrites.waterMark as! Option.Value
        default:
            return try super.getOption0(option)
        }
    }

    // Hook for customizable socket shutdown processing for subclasses, e.g. PipeChannel
    /// Shuts down one direction of the socket and records that in `outputShutdown` / `inputShutdown`.
    ///
    /// The flag is only set after the socket call returned without throwing. Mode `.all` is a no-op
    /// in this implementation.
    ///
    /// - Throws: Any error thrown by `socket.shutdown(how:)`.
    func shutdownSocket(mode: CloseMode) throws {
        switch mode {
        case .output:
            try self.socket.shutdown(how: .WR)
            self.outputShutdown = true
        case .input:
            try socket.shutdown(how: .RD)
            self.inputShutdown = true
        case .all:
            // Nothing to do here for a full close.
            break
        }
    }

    // MARK: BaseSocketChannel's must override API that cannot be further refined by subclasses
    // This is `Channel` API so must be thread-safe.
    /// Whether the channel is currently writable, as reported by the pending-writes manager.
    final override public var isWritable: Bool {
        return self.pendingWrites.isWritable
    }

    /// Whether the channel is open. Must be accessed on the event loop (asserted).
    ///
    /// Also asserts that the superclass and the pending-writes manager agree on the open state.
    final override var isOpen: Bool {
        self.eventLoop.assertInEventLoop()
        assert(super.isOpen == self.pendingWrites.isOpen)
        return super.isOpen
    }

    /// Reads from the socket up to `maxMessagesPerRead` times and fires one `channelRead` per
    /// non-empty read. Must be called on the event loop (asserted).
    ///
    /// - Returns: `.some` if at least one read delivered data to the pipeline, `.none` otherwise.
    /// - Throws: `ChannelError.eof` if the channel is not open or the input is shut down when an
    ///   iteration starts, or if a read returns no bytes; also any error thrown by `socket.read`.
    final override func readFromSocket() throws -> ReadResult {
        self.eventLoop.assertInEventLoop()
        // Just allocate one time for the while read loop. This is fine as ByteBuffer is a struct and uses COW.
        var buffer = self.recvAllocator.buffer(allocator: allocator)
        var result = ReadResult.none
        // NOTE(review): a closed range traps when its upper bound is below its lower bound, so this
        // relies on `maxMessagesPerRead` being at least 1 - confirm that is guaranteed where it is set.
        for i in 1...self.maxMessagesPerRead {
            // Re-checked on every iteration, i.e. also after the previous iteration fired `channelRead`.
            guard self.isOpen && !self.inputShutdown else {
                throw ChannelError.eof
            }
            // Reset reader and writerIndex and so allow to have the buffer filled again. This is better here than at
            // the end of the loop to not do an allocation when the loop exits.
            buffer.clear()
            switch try buffer.withMutableWritePointer(body: { try self.socket.read(pointer: $0) }) {
            case .processed(let bytesRead):
                if bytesRead > 0 {
                    // Report the size of this read to the allocator; the result is used below to decide
                    // whether a fresh buffer should be requested.
                    let mayGrow = recvAllocator.record(actualReadBytes: bytesRead)

                    self.readPending = false

                    assert(self.isActive)
                    self.pipeline.fireChannelRead0(NIOAny(buffer))
                    result = .some

                    if buffer.writableBytes > 0 {
                        // If we did not fill the whole buffer with read(...) we should stop reading and wait until we get notified again.
                        // Otherwise chances are good that the next read(...) call will either read nothing or only a very small amount of data.
                        // Also this will allow us to call fireChannelReadComplete() which may give the user the chance to flush out all pending
                        // writes.
                        return result
                    } else if mayGrow && i < self.maxMessagesPerRead {
                        // If the ByteBuffer may grow on the next allocation because we used all the writable bytes, we should allocate a new
                        // `ByteBuffer` to allow ramping up how much data we are able to read on the next read operation.
                        buffer = self.recvAllocator.buffer(allocator: allocator)
                    }
                } else {
                    // NOTE(review): the guard at the top of this iteration already rejects
                    // `inputShutdown == true` and nothing visible here changes the flag during the read,
                    // so this branch may be unreachable - confirm.
                    if self.inputShutdown {
                        // We received an EOF because we called shutdown on the fd ourselves, unregister from the Selector and return
                        self.readPending = false
                        self.unregisterForReadable()
                        return result
                    }
                    // end-of-file
                    throw ChannelError.eof
                }
            case .wouldBlock(let bytesRead):
                // Nothing (more) to read right now; report what has been delivered so far.
                assert(bytesRead == 0)
                return result
            }
        }
        return result
    }

    /// Asks the pending-writes manager to write buffered data, supplying the three socket operations
    /// it can choose from: a single-buffer write, a gathering write and a file send.
    ///
    /// - Returns: The result reported by `triggerAppropriateWriteOperations`.
    /// - Throws: Any error thrown by the manager or by the socket operations.
    final override func writeToSocket() throws -> OverallWriteResult {
        let result = try self.pendingWrites.triggerAppropriateWriteOperations(scalarBufferWriteOperation: { ptr in
            guard ptr.count > 0 else {
                // No need to call write if the buffer is empty.
                return .processed(0)
            }
            // normal write
            return try self.socket.write(pointer: ptr)
        }, vectorBufferWriteOperation: { ptrs in
            // Gathering write
            try self.socket.writev(iovecs: ptrs)
        }, scalarFileWriteOperation: { descriptor, index, endIndex in
            // Send the range [index, endIndex) of the file.
            try self.socket.sendFile(fd: descriptor, offset: index, count: endIndex - index)
        })
        return result
    }

    /// Closes the output side, the input side, or the whole channel.
    ///
    /// - `.output`: fails `promise` with `ChannelError.outputClosed` if the output is already shut
    ///   down. Otherwise shuts the socket's write side down, fails all pending writes with `error`,
    ///   unregisters for writability, succeeds `promise` and fires `ChannelEvent.outputClosed`.
    /// - `.input`: fails `promise` with `ChannelError.inputClosed` if the input is already shut down.
    ///   Otherwise marks the input as shut down (calling `shutdownSocket` unless `error` is
    ///   `ChannelError.eof`), unregisters for readability, succeeds `promise` and fires
    ///   `ChannelEvent.inputClosed`.
    /// - `.all`: cancels a scheduled connect timeout, if any, and forwards to the superclass.
    ///
    /// Errors thrown by `shutdownSocket` are not propagated; they fail `promise` instead.
    final override func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
        do {
            switch mode {
            case .output:
                if self.outputShutdown {
                    promise?.fail(ChannelError.outputClosed)
                    return
                }
                try self.shutdownSocket(mode: mode)
                // Fail all pending writes and so ensure all pending promises are notified
                self.pendingWrites.failAll(error: error, close: false)
                self.unregisterForWritable()
                promise?.succeed(())

                // The promise is completed before the user event is fired.
                self.pipeline.fireUserInboundEventTriggered(ChannelEvent.outputClosed)
            case .input:
                if self.inputShutdown {
                    promise?.fail(ChannelError.inputClosed)
                    return
                }
                switch error {
                case ChannelError.eof:
                    // No need to explicitly call socket.shutdown(...) as we received an EOF and the call would only cause
                    // ENOTCONN
                    self.inputShutdown = true
                    break
                default:
                    try self.shutdownSocket(mode: mode)
                }
                self.unregisterForReadable()
                promise?.succeed(())

                // The promise is completed before the user event is fired.
                self.pipeline.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
            case .all:
                // Clear the stored reference first, then cancel the scheduled task.
                if let timeout = self.connectTimeoutScheduled {
                    self.connectTimeoutScheduled = nil
                    timeout.cancel()
                }
                super.close0(error: error, mode: mode, promise: promise)
            }
        } catch let err {
            promise?.fail(err)
        }
    }

    /// Returns the pending-writes manager's `isFlushPending` value.
    final override func hasFlushedPendingWrites() -> Bool {
        return self.pendingWrites.isFlushPending
    }

    /// Marks a flush checkpoint in the pending-writes manager.
    final override func markFlushPoint() {
        // Even if writable() will be called later by the EventLoop we still need to mark the flush checkpoint so we are sure all the flushed messages
        // are actually written once writable() is called.
        self.pendingWrites.markFlushCheckpoint()
    }

    /// Fails every pending write with `error`, passing `close: true` to the pending-writes manager.
    final override func cancelWritesOnClose(error: Error) {
        self.pendingWrites.failAll(error: error, close: true)
    }

    /// Forwards to the superclass unless the input side is shut down, in which case nothing is read
    /// and `false` is returned.
    @discardableResult
    final override func readIfNeeded0() -> Bool {
        if self.inputShutdown {
            return false
        }
        return super.readIfNeeded0()
    }

    /// Forwards to the superclass unless the input side is shut down, in which case this is a no-op.
    final override public func read0() {
        if self.inputShutdown {
            return
        }
        super.read0()
    }

    /// Buffers `data` for a later write.
    ///
    /// If the output side is already shut down, `promise` is failed with `ChannelError.outputClosed`
    /// and nothing is buffered. Otherwise the data is converted with `forceAsIOData()` and handed to
    /// the pending-writes manager.
    final override func bufferPendingWrite(data: NIOAny, promise: EventLoopPromise<Void>?) {
        if self.outputShutdown {
            promise?.fail(ChannelError.outputClosed)
            return
        }

        let data = data.forceAsIOData()

        // A `false` result from `add` triggers a writability-changed notification; presumably it
        // signals that the channel just stopped being writable - confirm against
        // `PendingStreamWritesManager.add`.
        if !self.pendingWrites.add(data: data, promise: promise) {
            self.pipeline.fireChannelWritabilityChanged0()
        }
    }
}