1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254
|
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package poll
import (
"runtime"
"sync"
"sync/atomic"
"syscall"
"unsafe"
)
const (
// spliceNonblock makes calls to splice(2) non-blocking.
spliceNonblock = 0x2
// maxSpliceSize is the maximum amount of data Splice asks
// the kernel to move in a single call to splice(2).
maxSpliceSize = 4 << 20
)
// Splice transfers at most remain bytes of data from src to dst, using the
// splice system call to minimize copies of data from and to userspace.
//
// Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer.
// src and dst must both be stream-oriented sockets.
//
// If err != nil, sc is the system call which caused the error.
func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) {
p, sc, err := getPipe()
if err != nil {
return 0, false, sc, err
}
defer putPipe(p)
var inPipe, n int
for err == nil && remain > 0 {
max := maxSpliceSize
if int64(max) > remain {
max = int(remain)
}
inPipe, err = spliceDrain(p.wfd, src, max)
// The operation is considered handled if splice returns no
// error, or an error other than EINVAL. An EINVAL means the
// kernel does not support splice for the socket type of src.
// The failed syscall does not consume any data so it is safe
// to fall back to a generic copy.
//
// spliceDrain should never return EAGAIN, so if err != nil,
// Splice cannot continue.
//
// If inPipe == 0 && err == nil, src is at EOF, and the
// transfer is complete.
handled = handled || (err != syscall.EINVAL)
if err != nil || inPipe == 0 {
break
}
p.data += inPipe
n, err = splicePump(dst, p.rfd, inPipe)
if n > 0 {
written += int64(n)
remain -= int64(n)
p.data -= n
}
}
if err != nil {
return written, handled, "splice", err
}
return written, true, "", nil
}
// spliceDrain moves data from a socket to a pipe.
//
// Invariant: when entering spliceDrain, the pipe is empty. It is either in its
// initial state, or splicePump has emptied it previously.
//
// Given this, spliceDrain can reasonably assume that the pipe is ready for
// writing, so if splice returns EAGAIN, it must be because the socket is not
// ready for reading.
//
// If spliceDrain returns (0, nil), src is at EOF.
func spliceDrain(pipefd int, sock *FD, max int) (int, error) {
if err := sock.readLock(); err != nil {
return 0, err
}
defer sock.readUnlock()
if err := sock.pd.prepareRead(sock.isFile); err != nil {
return 0, err
}
for {
n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock)
if err == syscall.EINTR {
continue
}
if err != syscall.EAGAIN {
return n, err
}
if err := sock.pd.waitRead(sock.isFile); err != nil {
return n, err
}
}
}
// splicePump moves all the buffered data from a pipe to a socket.
//
// Invariant: when entering splicePump, there are exactly inPipe
// bytes of data in the pipe, from a previous call to spliceDrain.
//
// By analogy to the condition from spliceDrain, splicePump
// only needs to poll the socket for readiness, if splice returns
// EAGAIN.
//
// If splicePump cannot move all the data in a single call to
// splice(2), it loops over the buffered data until it has written
// all of it to the socket. This behavior is similar to the Write
// step of an io.Copy in userspace.
func splicePump(sock *FD, pipefd int, inPipe int) (int, error) {
if err := sock.writeLock(); err != nil {
return 0, err
}
defer sock.writeUnlock()
if err := sock.pd.prepareWrite(sock.isFile); err != nil {
return 0, err
}
written := 0
for inPipe > 0 {
n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock)
// Here, the condition n == 0 && err == nil should never be
// observed, since Splice controls the write side of the pipe.
if n > 0 {
inPipe -= n
written += n
continue
}
if err != syscall.EAGAIN {
return written, err
}
if err := sock.pd.waitWrite(sock.isFile); err != nil {
return written, err
}
}
return written, nil
}
// splice wraps the splice system call. Since the current implementation
// only uses splice on sockets and pipes, the offset arguments are unused.
// splice returns int instead of int64, because callers never ask it to
// move more data in a single call than can fit in an int32.
func splice(out int, in int, max int, flags int) (int, error) {
n, err := syscall.Splice(in, nil, out, nil, max, flags)
return int(n), err
}
type splicePipeFields struct {
rfd int
wfd int
data int
}
type splicePipe struct {
splicePipeFields
// We want to use a finalizer, so ensure that the size is
// large enough to not use the tiny allocator.
_ [24 - unsafe.Sizeof(splicePipeFields{})%24]byte
}
// splicePipePool caches pipes to avoid high-frequency construction and destruction of pipe buffers.
// The garbage collector will free all pipes in the sync.Pool periodically, thus we need to set up
// a finalizer for each pipe to close its file descriptors before the actual GC.
var splicePipePool = sync.Pool{New: newPoolPipe}
func newPoolPipe() any {
// Discard the error which occurred during the creation of pipe buffer,
// redirecting the data transmission to the conventional way utilizing read() + write() as a fallback.
p := newPipe()
if p == nil {
return nil
}
runtime.SetFinalizer(p, destroyPipe)
return p
}
// getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from the cache.
//
// Note that it may fail to create a new pipe buffer by newPipe(), in which case getPipe() will return a generic error
// and system call name splice in a string as the indication.
func getPipe() (*splicePipe, string, error) {
v := splicePipePool.Get()
if v == nil {
return nil, "splice", syscall.EINVAL
}
return v.(*splicePipe), "", nil
}
func putPipe(p *splicePipe) {
// If there is still data left in the pipe,
// then close and discard it instead of putting it back into the pool.
if p.data != 0 {
runtime.SetFinalizer(p, nil)
destroyPipe(p)
return
}
splicePipePool.Put(p)
}
var disableSplice unsafe.Pointer
// newPipe sets up a pipe for a splice operation.
func newPipe() (sp *splicePipe) {
p := (*bool)(atomic.LoadPointer(&disableSplice))
if p != nil && *p {
return nil
}
var fds [2]int
// pipe2 was added in 2.6.27 and our minimum requirement is 2.6.23, so it
// might not be implemented. Falling back to pipe is possible, but prior to
// 2.6.29 splice returns -EAGAIN instead of 0 when the connection is
// closed.
const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK
if err := syscall.Pipe2(fds[:], flags); err != nil {
return nil
}
sp = &splicePipe{splicePipeFields: splicePipeFields{rfd: fds[0], wfd: fds[1]}}
if p == nil {
p = new(bool)
defer atomic.StorePointer(&disableSplice, unsafe.Pointer(p))
// F_GETPIPE_SZ was added in 2.6.35, which does not have the -EAGAIN bug.
if syscall.F_GETPIPE_SZ == 0 {
*p = true
destroyPipe(sp)
return nil
}
if _, err := fcntl(fds[0], syscall.F_GETPIPE_SZ, 0); err != nil {
*p = true
destroyPipe(sp)
return nil
}
}
return
}
// destroyPipe destroys a pipe.
func destroyPipe(p *splicePipe) {
CloseFunc(p.rfd)
CloseFunc(p.wfd)
}
|