File: conn.go

package info (click to toggle)
golang-github-siddontang-goredis 0.0~git20150324.0.760763f-5
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 108 kB
  • sloc: makefile: 2
file content (187 lines) | stat: -rw-r--r-- 3,671 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package goredis

import (
	"bufio"
	"fmt"
	"io"
	"net"
	"sync/atomic"
	"time"
)

// sizeWriter is an io.Writer that discards the data it is given and only
// keeps a running total of how many bytes it has seen.
type sizeWriter int64

// Write adds len(p) to the running total. It never fails and always reports
// that all of p was consumed.
func (s *sizeWriter) Write(p []byte) (int, error) {
	n := len(p)
	*s = *s + sizeWriter(n)
	return n, nil
}

// Conn is a RESP connection layered on top of a net.Conn. It owns a buffered
// RESP reader and writer for the connection and tracks byte totals for both
// directions.
type Conn struct {
	// c is the underlying network connection.
	c net.Conn

	// respReader and respWriter are built over buffered wrappers around c;
	// they are set up in NewConnWithSize.
	respReader *RespReader
	respWriter *RespWriter

	// totalReadSize is fed by a TeeReader on the read path and
	// totalWriteSize by a MultiWriter on the write path.
	totalReadSize  sizeWriter
	totalWriteSize sizeWriter

	// closed is 1 once Close has run and 0 otherwise; it is accessed through
	// sync/atomic.
	closed int32
}

// Connect dials addr and wraps the resulting connection in a Conn, using
// 1024-byte read and write buffers.
func Connect(addr string) (*Conn, error) {
	const bufSize = 1024

	return ConnectWithSize(addr, bufSize, bufSize)
}

// ConnectWithSize dials addr, using the network name that getProto returns
// for it, and wraps the resulting connection in a Conn whose buffered reader
// and writer are sized readSize and writeSize bytes respectively.
//
// A dial failure is returned unchanged and no Conn is created.
func ConnectWithSize(addr string, readSize int, writeSize int) (*Conn, error) {
	network := getProto(addr)

	nc, dialErr := net.Dial(network, addr)
	if dialErr != nil {
		return nil, dialErr
	}

	return NewConnWithSize(nc, readSize, writeSize)
}

// NewConn wraps an already established net.Conn in a Conn, using 1024-byte
// read and write buffers.
func NewConn(conn net.Conn) (*Conn, error) {
	const bufSize = 1024

	return NewConnWithSize(conn, bufSize, bufSize)
}

// NewConnWithSize wraps an already established net.Conn in a Conn.
//
// Reads go through a bufio.Reader of readSize bytes and writes through a
// bufio.Writer of writeSize bytes. Both paths are additionally routed through
// the Conn's byte counters so the totals can be queried later. The returned
// error is always nil.
func NewConnWithSize(conn net.Conn, readSize int, writeSize int) (*Conn, error) {
	c := &Conn{c: conn}
	atomic.StoreInt32(&c.closed, 0)

	// Everything read from conn is mirrored into the read counter.
	countedIn := io.TeeReader(conn, &c.totalReadSize)
	c.respReader = NewRespReader(bufio.NewReaderSize(countedIn, readSize))

	// Everything handed to conn is also handed to the write counter.
	countedOut := io.MultiWriter(conn, &c.totalWriteSize)
	c.respWriter = NewRespWriter(bufio.NewWriterSize(countedOut, writeSize))

	return c, nil
}

// Close closes the underlying network connection and marks the Conn as
// closed. Calling it again after the first call is a no-op.
//
// The closed flag is flipped with a single compare-and-swap rather than a
// separate load and store, so when several goroutines call Close at the same
// time exactly one of them reaches the underlying connection's Close.
func (c *Conn) Close() {
	if !atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
		// Another call already claimed the close.
		return
	}

	// Close has no error return in this API, so the result of closing the
	// socket is deliberately dropped.
	_ = c.c.Close()
}

// isClosed reports whether the closed flag has been set to 1.
func (c *Conn) isClosed() bool {
	state := atomic.LoadInt32(&c.closed)
	return state == 1
}

// GetTotalReadSize returns the current value of the read-side byte counter.
func (c *Conn) GetTotalReadSize() int64 {
	total := c.totalReadSize
	return int64(total)
}

// GetTotalWriteSize returns the current value of the write-side byte counter.
func (c *Conn) GetTotalWriteSize() int64 {
	total := c.totalWriteSize
	return int64(total)
}

// SetReadDeadline forwards t to the underlying connection's SetReadDeadline
// and returns its result.
func (c *Conn) SetReadDeadline(t time.Time) error {
	underlying := c.c
	return underlying.SetReadDeadline(t)
}

// SetWriteDeadline forwards t to the underlying connection's SetWriteDeadline
// and returns its result.
func (c *Conn) SetWriteDeadline(t time.Time) error {
	underlying := c.c
	return underlying.SetWriteDeadline(t)
}

// RemoteAddr returns the remote address reported by the underlying
// connection.
func (c *Conn) RemoteAddr() net.Addr {
	underlying := c.c
	return underlying.RemoteAddr()
}

// LocalAddr returns the local address reported by the underlying connection.
func (c *Conn) LocalAddr() net.Addr {
	underlying := c.c
	return underlying.LocalAddr()
}

// Do sends a RESP command with Send and then reads its reply with Receive.
// If sending fails, that error is returned and no reply is read.
func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
	sendErr := c.Send(cmd, args...)
	if sendErr != nil {
		return nil, sendErr
	}

	reply, recvErr := c.Receive()
	return reply, recvErr
}

// Send writes a RESP command through the connection's RESP writer. If the
// write fails the connection is closed and the error is returned.
func (c *Conn) Send(cmd string, args ...interface{}) error {
	err := c.respWriter.WriteCommand(cmd, args...)
	if err == nil {
		return nil
	}

	c.Close()
	return err
}

// Receive parses one RESP reply from the connection.
//
// If parsing fails, the connection is closed and the parse error is returned
// with a nil reply. If the parsed reply is itself an Error value, it is
// returned both as the reply and as the error; the connection stays open in
// that case.
func (c *Conn) Receive() (interface{}, error) {
	reply, err := c.respReader.Parse()
	if err != nil {
		c.Close()
		return nil, err
	}

	if respErr, isErr := reply.(Error); isErr {
		return reply, respErr
	}

	return reply, nil
}

// ReceiveBulkTo parses a RESP bulk string reply and hands it to w via the
// RESP reader's ParseBulkTo.
//
// Any error is returned to the caller. The connection is closed on error
// unless the error is an Error value.
func (c *Conn) ReceiveBulkTo(w io.Writer) error {
	err := c.respReader.ParseBulkTo(w)
	if err == nil {
		return nil
	}

	if _, isRespErr := err.(Error); !isRespErr {
		c.Close()
	}

	return err
}

// ReceiveRequest reads a RESP command request, which must be an array of
// bulk strings, and returns the result of the RESP reader's ParseRequest.
func (c *Conn) ReceiveRequest() ([][]byte, error) {
	request, err := c.respReader.ParseRequest()
	return request, err
}

// SendValue writes v as a RESP value using the matching Flush method of the
// RESP writer. v must be a string, int64, []byte, error, nil or
// []interface{}; any other type yields an error and nothing is written.
// A nil v is sent through FlushBulk(nil).
func (c *Conn) SendValue(v interface{}) error {
	w := c.respWriter

	if v == nil {
		return w.FlushBulk(nil)
	}

	switch val := v.(type) {
	case []byte:
		return w.FlushBulk(val)
	case string:
		return w.FlushString(val)
	case int64:
		return w.FlushInteger(val)
	case []interface{}:
		return w.FlushArray(val)
	case error:
		return w.FlushError(val)
	}

	return fmt.Errorf("invalid type %T for send RESP value", v)
}

// newConn opens a connection to addr using the client's configured read and
// write buffer sizes. When pass is non-empty an AUTH command is issued with
// it; if that fails the new connection is closed and the error is returned.
func (c *Client) newConn(addr string, pass string) (*Conn, error) {
	co, err := ConnectWithSize(addr, c.readBufferSize, c.writeBufferSize)
	if err != nil {
		return nil, err
	}

	// No password configured: the connection is ready as-is.
	if pass == "" {
		return co, nil
	}

	if _, authErr := co.Do("AUTH", pass); authErr != nil {
		co.Close()
		return nil, authErr
	}

	return co, nil
}