1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
|
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package runtime
import (
"runtime/internal/atomic"
"unsafe"
)
// This is based on the former libgo/runtime/netpoll_select.c implementation
// except that it uses poll instead of select and is written in Go.
// It's also based on Solaris implementation for the arming mechanisms
//go:noescape
//extern poll
func libc_poll(pfds *pollfd, npfds uintptr, timeout uintptr) int32
// pollfd represents the poll structure for AIX operating system.
type pollfd struct {
fd int32
events int16
revents int16
}
const _POLLIN = 0x0001
const _POLLOUT = 0x0002
const _POLLHUP = 0x2000
const _POLLERR = 0x4000
var (
pfds []pollfd
pds []*pollDesc
mtxpoll mutex
mtxset mutex
rdwake int32
wrwake int32
pendingUpdates int32
netpollWakeSig uint32 // used to avoid duplicate calls of netpollBreak
)
func netpollinit() {
// Create the pipe we use to wakeup poll.
r, w, errno := nonblockingPipe()
if errno != 0 {
throw("netpollinit: failed to create pipe")
}
rdwake = r
wrwake = w
// Pre-allocate array of pollfd structures for poll.
pfds = make([]pollfd, 1, 128)
// Poll the read side of the pipe.
pfds[0].fd = rdwake
pfds[0].events = _POLLIN
pds = make([]*pollDesc, 1, 128)
pds[0] = nil
}
func netpollIsPollDescriptor(fd uintptr) bool {
return fd == uintptr(rdwake) || fd == uintptr(wrwake)
}
// netpollwakeup writes on wrwake to wakeup poll before any changes.
func netpollwakeup() {
if pendingUpdates == 0 {
pendingUpdates = 1
b := [1]byte{0}
write(uintptr(wrwake), unsafe.Pointer(&b[0]), 1)
}
}
func netpollopen(fd uintptr, pd *pollDesc) int32 {
lock(&mtxpoll)
netpollwakeup()
lock(&mtxset)
unlock(&mtxpoll)
pd.user = uint32(len(pfds))
pfds = append(pfds, pollfd{fd: int32(fd)})
pds = append(pds, pd)
unlock(&mtxset)
return 0
}
func netpollclose(fd uintptr) int32 {
lock(&mtxpoll)
netpollwakeup()
lock(&mtxset)
unlock(&mtxpoll)
for i := 0; i < len(pfds); i++ {
if pfds[i].fd == int32(fd) {
pfds[i] = pfds[len(pfds)-1]
pfds = pfds[:len(pfds)-1]
pds[i] = pds[len(pds)-1]
pds[i].user = uint32(i)
pds = pds[:len(pds)-1]
break
}
}
unlock(&mtxset)
return 0
}
func netpollarm(pd *pollDesc, mode int) {
lock(&mtxpoll)
netpollwakeup()
lock(&mtxset)
unlock(&mtxpoll)
switch mode {
case 'r':
pfds[pd.user].events |= _POLLIN
case 'w':
pfds[pd.user].events |= _POLLOUT
}
unlock(&mtxset)
}
// netpollBreak interrupts a poll.
func netpollBreak() {
if atomic.Cas(&netpollWakeSig, 0, 1) {
b := [1]byte{0}
write(uintptr(wrwake), unsafe.Pointer(&b[0]), 1)
}
}
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
//go:nowritebarrierrec
func netpoll(delay int64) gList {
var timeout uintptr
if delay < 0 {
timeout = ^uintptr(0)
} else if delay == 0 {
// TODO: call poll with timeout == 0
return gList{}
} else if delay < 1e6 {
timeout = 1
} else if delay < 1e15 {
timeout = uintptr(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
timeout = 1e9
}
retry:
lock(&mtxpoll)
lock(&mtxset)
pendingUpdates = 0
unlock(&mtxpoll)
n := libc_poll(&pfds[0], uintptr(len(pfds)), timeout)
if n < 0 {
e := errno()
if e != _EINTR {
println("errno=", e, " len(pfds)=", len(pfds))
throw("poll failed")
}
unlock(&mtxset)
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if timeout > 0 {
return gList{}
}
goto retry
}
// Check if some descriptors need to be changed
if n != 0 && pfds[0].revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 {
if delay != 0 {
// A netpollwakeup could be picked up by a
// non-blocking poll. Only clear the wakeup
// if blocking.
var b [1]byte
for read(rdwake, unsafe.Pointer(&b[0]), 1) == 1 {
}
atomic.Store(&netpollWakeSig, 0)
}
// Still look at the other fds even if the mode may have
// changed, as netpollBreak might have been called.
n--
}
var toRun gList
for i := 1; i < len(pfds) && n > 0; i++ {
pfd := &pfds[i]
var mode int32
if pfd.revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 {
mode += 'r'
pfd.events &= ^_POLLIN
}
if pfd.revents&(_POLLOUT|_POLLHUP|_POLLERR) != 0 {
mode += 'w'
pfd.events &= ^_POLLOUT
}
if mode != 0 {
pds[i].setEventErr(pfd.revents == _POLLERR)
netpollready(&toRun, pds[i], mode)
n--
}
}
unlock(&mtxset)
return toRun
}
|