File: codec.go

package info (click to toggle)
golang-github-hashicorp-net-rpc-msgpackrpc 0.0~git20151116.0.a14192a-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid, stretch
  • size: 72 kB
  • ctags: 24
  • sloc: makefile: 2
file content (122 lines) | stat: -rw-r--r-- 2,884 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package msgpackrpc

import (
	"bufio"
	"io"
	"net/rpc"
	"sync"

	"github.com/hashicorp/go-msgpack/codec"
)

var (
	// msgpackHandle is shared handle for decoding
	msgpackHandle = &codec.MsgpackHandle{}
)

// MsgpackCodec implements the rpc.ClientCodec and rpc.ServerCodec
// using the msgpack encoding
type MsgpackCodec struct {
	closed    bool
	conn      io.ReadWriteCloser
	bufR      *bufio.Reader
	bufW      *bufio.Writer
	enc       *codec.Encoder
	dec       *codec.Decoder
	writeLock sync.Mutex
}

// NewCodec returns a MsgpackCodec that can be used as either a Client or Server
// rpc Codec using a default handle. It also provides controls for enabling and
// disabling buffering for both reads and writes.
func NewCodec(bufReads, bufWrites bool, conn io.ReadWriteCloser) *MsgpackCodec {
	return NewCodecFromHandle(bufReads, bufWrites, conn, msgpackHandle)
}

// NewCodecFromHandle returns a MsgpackCodec that can be used as either a Client
// or Server rpc Codec using the passed handle. It also provides controls for
// enabling and disabling buffering for both reads and writes.
func NewCodecFromHandle(bufReads, bufWrites bool, conn io.ReadWriteCloser,
	h *codec.MsgpackHandle) *MsgpackCodec {
	cc := &MsgpackCodec{
		conn: conn,
	}
	if bufReads {
		cc.bufR = bufio.NewReader(conn)
		cc.dec = codec.NewDecoder(cc.bufR, h)
	} else {
		cc.dec = codec.NewDecoder(cc.conn, h)
	}
	if bufWrites {
		cc.bufW = bufio.NewWriter(conn)
		cc.enc = codec.NewEncoder(cc.bufW, h)
	} else {
		cc.enc = codec.NewEncoder(cc.conn, h)
	}
	return cc
}

func (cc *MsgpackCodec) ReadRequestHeader(r *rpc.Request) error {
	return cc.read(r)
}

func (cc *MsgpackCodec) ReadRequestBody(out interface{}) error {
	return cc.read(out)
}

func (cc *MsgpackCodec) WriteResponse(r *rpc.Response, body interface{}) error {
	cc.writeLock.Lock()
	defer cc.writeLock.Unlock()
	return cc.write(r, body)
}

func (cc *MsgpackCodec) ReadResponseHeader(r *rpc.Response) error {
	return cc.read(r)
}

func (cc *MsgpackCodec) ReadResponseBody(out interface{}) error {
	return cc.read(out)
}

func (cc *MsgpackCodec) WriteRequest(r *rpc.Request, body interface{}) error {
	cc.writeLock.Lock()
	defer cc.writeLock.Unlock()
	return cc.write(r, body)
}

func (cc *MsgpackCodec) Close() error {
	if cc.closed {
		return nil
	}
	cc.closed = true
	return cc.conn.Close()
}

func (cc *MsgpackCodec) write(obj1, obj2 interface{}) (err error) {
	if cc.closed {
		return io.EOF
	}
	if err = cc.enc.Encode(obj1); err != nil {
		return
	}
	if err = cc.enc.Encode(obj2); err != nil {
		return
	}
	if cc.bufW != nil {
		return cc.bufW.Flush()
	}
	return
}

func (cc *MsgpackCodec) read(obj interface{}) (err error) {
	if cc.closed {
		return io.EOF
	}

	// If nil is passed in, we should still attempt to read content to nowhere.
	if obj == nil {
		var obj2 interface{}
		return cc.dec.Decode(&obj2)
	}
	return cc.dec.Decode(obj)
}