1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
|
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package jsonrpc2
import (
"context"
"io"
"runtime"
"strings"
"sync"
"syscall"
"time"
errors "golang.org/x/xerrors"
)
// Listener is implemented by protocols to accept new inbound connections.
type Listener interface {
// Accept an inbound connection to a server.
// It must block until an inbound connection is made, or the listener is
// shut down.
Accept(context.Context) (io.ReadWriteCloser, error)
// Close is used to ask a listener to stop accepting new connections.
Close() error
// Dialer returns a dialer that can be used to connect to this listener
// locally.
// If a listener does not implement this it will return a nil.
Dialer() Dialer
}
// Dialer is used by clients to dial a server.
type Dialer interface {
// Dial returns a new communication byte stream to a listening server.
Dial(ctx context.Context) (io.ReadWriteCloser, error)
}
// Server is a running server that is accepting incoming connections.
type Server struct {
listener Listener
binder Binder
async async
}
// Dial uses the dialer to make a new connection, wraps the returned
// reader and writer using the framer to make a stream, and then builds
// a connection on top of that stream using the binder.
func Dial(ctx context.Context, dialer Dialer, binder Binder) (*Connection, error) {
// dial a server
rwc, err := dialer.Dial(ctx)
if err != nil {
return nil, err
}
return newConnection(ctx, rwc, binder)
}
// Serve starts a new server listening for incoming connections and returns
// it.
// This returns a fully running and connected server, it does not block on
// the listener.
// You can call Wait to block on the server, or Shutdown to get the sever to
// terminate gracefully.
// To notice incoming connections, use an intercepting Binder.
func Serve(ctx context.Context, listener Listener, binder Binder) (*Server, error) {
server := &Server{
listener: listener,
binder: binder,
}
server.async.init()
go server.run(ctx)
return server, nil
}
// Wait returns only when the server has shut down.
func (s *Server) Wait() error {
return s.async.wait()
}
// run accepts incoming connections from the listener,
// If IdleTimeout is non-zero, run exits after there are no clients for this
// duration, otherwise it exits only on error.
func (s *Server) run(ctx context.Context) {
defer s.async.done()
var activeConns []*Connection
for {
// we never close the accepted connection, we rely on the other end
// closing or the socket closing itself naturally
rwc, err := s.listener.Accept(ctx)
if err != nil {
if !isClosingError(err) {
s.async.setError(err)
}
// we are done generating new connections for good
break
}
// see if any connections were closed while we were waiting
activeConns = onlyActive(activeConns)
// a new inbound connection,
conn, err := newConnection(ctx, rwc, s.binder)
if err != nil {
if !isClosingError(err) {
s.async.setError(err)
}
continue
}
activeConns = append(activeConns, conn)
}
// wait for all active conns to finish
for _, c := range activeConns {
c.Wait()
}
}
func onlyActive(conns []*Connection) []*Connection {
i := 0
for _, c := range conns {
if !c.async.isDone() {
conns[i] = c
i++
}
}
// trim the slice down
return conns[:i]
}
// isClosingError reports if the error occurs normally during the process of
// closing a network connection. It uses imperfect heuristics that err on the
// side of false negatives, and should not be used for anything critical.
func isClosingError(err error) bool {
if err == nil {
return false
}
// Fully unwrap the error, so the following tests work.
for wrapped := err; wrapped != nil; wrapped = errors.Unwrap(err) {
err = wrapped
}
// Was it based on an EOF error?
if err == io.EOF {
return true
}
// Was it based on a closed pipe?
if err == io.ErrClosedPipe {
return true
}
// Per https://github.com/golang/go/issues/4373, this error string should not
// change. This is not ideal, but since the worst that could happen here is
// some superfluous logging, it is acceptable.
if err.Error() == "use of closed network connection" {
return true
}
if runtime.GOOS == "plan9" {
// Error reading from a closed connection.
if err == syscall.EINVAL {
return true
}
// Error trying to accept a new connection from a closed listener.
if strings.HasSuffix(err.Error(), " listen hungup") {
return true
}
}
return false
}
// NewIdleListener wraps a listener with an idle timeout.
// When there are no active connections for at least the timeout duration a
// call to accept will fail with ErrIdleTimeout.
func NewIdleListener(timeout time.Duration, wrap Listener) Listener {
l := &idleListener{
timeout: timeout,
wrapped: wrap,
newConns: make(chan *idleCloser),
closed: make(chan struct{}),
wasTimeout: make(chan struct{}),
}
go l.run()
return l
}
type idleListener struct {
wrapped Listener
timeout time.Duration
newConns chan *idleCloser
closed chan struct{}
wasTimeout chan struct{}
closeOnce sync.Once
}
type idleCloser struct {
wrapped io.ReadWriteCloser
closed chan struct{}
closeOnce sync.Once
}
func (c *idleCloser) Read(p []byte) (int, error) {
n, err := c.wrapped.Read(p)
if err != nil && isClosingError(err) {
c.closeOnce.Do(func() { close(c.closed) })
}
return n, err
}
func (c *idleCloser) Write(p []byte) (int, error) {
// we do not close on write failure, we rely on the wrapped writer to do that
// if it is appropriate, which we will detect in the next read.
return c.wrapped.Write(p)
}
func (c *idleCloser) Close() error {
// we rely on closing the wrapped stream to signal to the next read that we
// are closed, rather than triggering the closed signal directly
return c.wrapped.Close()
}
func (l *idleListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
rwc, err := l.wrapped.Accept(ctx)
if err != nil {
if isClosingError(err) {
// underlying listener was closed
l.closeOnce.Do(func() { close(l.closed) })
// was it closed because of the idle timeout?
select {
case <-l.wasTimeout:
err = ErrIdleTimeout
default:
}
}
return nil, err
}
conn := &idleCloser{
wrapped: rwc,
closed: make(chan struct{}),
}
l.newConns <- conn
return conn, err
}
func (l *idleListener) Close() error {
defer l.closeOnce.Do(func() { close(l.closed) })
return l.wrapped.Close()
}
func (l *idleListener) Dialer() Dialer {
return l.wrapped.Dialer()
}
func (l *idleListener) run() {
var conns []*idleCloser
for {
var firstClosed chan struct{} // left at nil if there are no active conns
var timeout <-chan time.Time // left at nil if there are active conns
if len(conns) > 0 {
firstClosed = conns[0].closed
} else {
timeout = time.After(l.timeout)
}
select {
case <-l.closed:
// the main listener closed, no need to keep going
return
case conn := <-l.newConns:
// a new conn arrived, add it to the list
conns = append(conns, conn)
case <-timeout:
// we timed out, only happens when there are no active conns
// close the underlying listener, and allow the normal closing process to happen
close(l.wasTimeout)
l.wrapped.Close()
case <-firstClosed:
// a conn closed, remove it from the active list
conns = conns[:copy(conns, conns[1:])]
}
}
}
|