1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
|
package ctxio
import (
"bufio"
"context"
"net"
"time"
)
// Conn wraps a net.Conn so that its reads and writes can be bounded and
// canceled by a context.Context.
type Conn struct {
// conn is the wrapped connection; writes, deadline changes and Close are
// issued directly against it.
conn net.Conn
// reader is a buffered reader layered over conn (see NewConn); ReadBytes
// reads through it.
reader *bufio.Reader
}
// NewConn returns a Conn that wraps c. A bufio.Reader is layered over c at
// construction time so that delimited reads can be served from a buffer.
func NewConn(c net.Conn) *Conn {
	wrapped := new(Conn)
	wrapped.conn = c
	wrapped.reader = bufio.NewReader(c)
	return wrapped
}
// Result envelopes used to hand the outcome of a blocking I/O call from the
// goroutine that performs it back to the caller over a channel.
type (
	// ioret is the outcome of a Read or Write call: the byte count and the
	// error, in that order.
	ioret struct {
		n   int
		err error
	}

	// rret is the outcome of a delimited read: the bytes that were read and
	// the error, in that order.
	rret struct {
		val []byte
		err error
	}
)
// aLongTimeAgo is a fixed instant in the distant past (one second after the
// Unix epoch). It is passed to SetReadDeadline/SetWriteDeadline when a context
// is canceled so that an in-flight Read or Write on the connection times out
// immediately instead of continuing to block.
var aLongTimeAgo = time.Unix(1, 0)
// Close closes the wrapped connection and reports whatever error that close
// returns.
func (c *Conn) Close() error {
	closeErr := c.conn.Close()
	return closeErr
}
// Write writes buf to the underlying connection, honoring ctx.
//
// The deadline of ctx, if any, is installed as the connection's write
// deadline. If ctx is done while the write is still in flight, the write
// deadline is moved into the past to unblock it, the writing goroutine is
// awaited, and the write deadline is then cleared so the connection stays
// usable.
//
// It returns the number of bytes written and an error. If ctx is already done
// on entry nothing is written and ctx.Err() is returned. If ctx becomes done
// during the write, the error is ctx.Err() and the count is the number of
// bytes the interrupted write reported, so callers can tell how much of buf
// reached the connection.
//
// It is not safe for concurrent use with itself.
func (c *Conn) Write(ctx context.Context, buf []byte) (int, error) {
	// Never start a write on behalf of a context that is already done.
	if err := ctx.Err(); err != nil {
		return 0, err
	}

	// Enable immediate connection cancelation via context by using the context's
	// deadline and also setting a deadline in the past if/when the context is
	// canceled. This pattern courtesy of @acln from #networking on Gophers Slack.
	//
	// A context without a deadline yields the zero time, which clears any
	// deadline left over from a previous call.
	dl, _ := ctx.Deadline()
	if err := c.conn.SetWriteDeadline(dl); err != nil {
		return 0, err
	}

	// Buffered so the goroutine can always deliver its result and exit, even
	// if this function has already returned on an error path.
	ch := make(chan ioret, 1)
	go func() {
		n, err := c.conn.Write(buf)
		ch <- ioret{n, err}
	}()

	select {
	case <-ctx.Done():
		// Set deadline to unblock pending Write.
		if err := c.conn.SetWriteDeadline(aLongTimeAgo); err != nil {
			return 0, err
		}
		// Wait for the goroutine to exit. Its error is superseded by the
		// context error, but its byte count is real and must be reported.
		ret := <-ch
		// Reset deadline again.
		if err := c.conn.SetWriteDeadline(time.Time{}); err != nil {
			return ret.n, err
		}
		return ret.n, ctx.Err()
	case ret := <-ch:
		return ret.n, ret.err
	}
}
// Read reads up to len(buf) bytes into buf, honoring ctx.
//
// Data is read through the same buffered reader that ReadBytes uses, so bytes
// that an earlier ReadBytes call pulled off the connection but did not return
// are delivered here rather than being skipped.
//
// The deadline of ctx, if any, is installed as the connection's read deadline.
// If ctx is done while the read is still in flight, the read deadline is moved
// into the past to unblock it, the reading goroutine is awaited, and the read
// deadline is then cleared so the connection stays usable.
//
// It returns the number of bytes read and an error. If ctx is already done on
// entry nothing is read and ctx.Err() is returned. If ctx becomes done during
// the read, the error is ctx.Err() and the count is the number of bytes that
// were placed in buf before the read was interrupted; callers should consume
// those bytes before acting on the error.
//
// It is not safe for concurrent use with itself or ReadBytes.
func (c *Conn) Read(ctx context.Context, buf []byte) (int, error) {
	// Never start a read on behalf of a context that is already done.
	if err := ctx.Err(); err != nil {
		return 0, err
	}

	// Enable immediate connection cancelation via context by using the context's
	// deadline and also setting a deadline in the past if/when the context is
	// canceled. This pattern courtesy of @acln from #networking on Gophers Slack.
	//
	// A context without a deadline yields the zero time, which clears any
	// deadline left over from a previous call.
	dl, _ := ctx.Deadline()
	if err := c.conn.SetReadDeadline(dl); err != nil {
		return 0, err
	}

	// Buffered so the goroutine can always deliver its result and exit, even
	// if this function has already returned on an error path.
	ch := make(chan ioret, 1)
	go func() {
		// Go through the buffered reader: reading c.conn directly would
		// bypass any bytes already buffered by ReadBytes.
		n, err := c.reader.Read(buf)
		ch <- ioret{n, err}
	}()

	select {
	case <-ctx.Done():
		// Set deadline to unblock pending Read.
		if err := c.conn.SetReadDeadline(aLongTimeAgo); err != nil {
			return 0, err
		}
		// Wait for the goroutine to exit. Its error is superseded by the
		// context error, but any bytes it stored in buf must be reported.
		ret := <-ch
		// Reset deadline again.
		if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
			return ret.n, err
		}
		return ret.n, ctx.Err()
	case ret := <-ch:
		return ret.n, ret.err
	}
}
// ReadBytes reads from the connection, through the buffered reader, until the
// first occurrence of delim, honoring ctx. On success the returned slice
// includes the delimiter.
//
// The deadline of ctx, if any, is installed as the connection's read deadline.
// If ctx is done while the read is still in flight, the read deadline is moved
// into the past to unblock it, the reading goroutine is awaited, and the read
// deadline is then cleared so the connection stays usable.
//
// If ctx is already done on entry nothing is read and ctx.Err() is returned.
// If ctx becomes done during the read, the error is ctx.Err() and the returned
// slice holds whatever bytes had been consumed before the interruption. This
// mirrors bufio.Reader.ReadBytes, which returns the data read before an error.
// Those bytes are no longer in the buffer, so a caller that wants to keep the
// stream in sync must retain them.
//
// It is not safe for concurrent use with itself or Read.
func (c *Conn) ReadBytes(ctx context.Context, delim byte) ([]byte, error) {
	// Never start a read on behalf of a context that is already done.
	if err := ctx.Err(); err != nil {
		return nil, err
	}

	// Enable immediate connection cancelation via context by using the context's
	// deadline and also setting a deadline in the past if/when the context is
	// canceled. This pattern courtesy of @acln from #networking on Gophers Slack.
	//
	// A context without a deadline yields the zero time, which clears any
	// deadline left over from a previous call.
	dl, _ := ctx.Deadline()
	if err := c.conn.SetReadDeadline(dl); err != nil {
		return nil, err
	}

	// Buffered so the goroutine can always deliver its result and exit, even
	// if this function has already returned on an error path.
	ch := make(chan rret, 1)
	go func() {
		out, err := c.reader.ReadBytes(delim)
		ch <- rret{out, err}
	}()

	select {
	case <-ctx.Done():
		// Set deadline to unblock pending Read.
		if err := c.conn.SetReadDeadline(aLongTimeAgo); err != nil {
			return nil, err
		}
		// Wait for the goroutine to exit. Its error is superseded by the
		// context error, but the bytes it consumed must not be dropped.
		ret := <-ch
		// Reset deadline again.
		if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
			return ret.val, err
		}
		return ret.val, ctx.Err()
	case ret := <-ch:
		return ret.val, ret.err
	}
}
|