File: codec.go

package info (click to toggle)
golang-github-cenk-rpc2 0.0~git20160427.0.7ab76d2e88c7-1
  • links: PTS, VCS
  • area: main
  • in suites: buster, stretch
  • size: 104 kB
  • ctags: 84
  • sloc: makefile: 2
file content (124 lines) | stat: -rw-r--r-- 2,967 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package rpc2

import (
	"bufio"
	"encoding/gob"
	"io"
	"sync"
)

// A Codec implements reading and writing of RPC requests and responses.
//
// ReadHeader is called first to read a message header, and the
// implementation must populate either the Request or the Response argument.
// Depending on which argument is populated, ReadRequestBody or
// ReadResponseBody is called right after ReadHeader.
// ReadRequestBody and ReadResponseBody may be called with a nil
// argument to force the body to be read and then discarded.
type Codec interface {
	// ReadHeader must read a message and populate either the request
	// or the response by inspecting the incoming message.
	ReadHeader(*Request, *Response) error

	// ReadRequestBody reads the body that follows a request header into the
	// args argument of the handler function. A nil argument means the body
	// must be read and discarded.
	ReadRequestBody(interface{}) error

	// ReadResponseBody reads the body that follows a response header into
	// the reply argument of the handler function. A nil argument means the
	// body must be read and discarded.
	ReadResponseBody(interface{}) error

	// WriteRequest writes a request header followed by its body.
	// It must be safe for concurrent use by multiple goroutines.
	WriteRequest(*Request, interface{}) error

	// WriteResponse writes a response header followed by its body.
	// It must be safe for concurrent use by multiple goroutines.
	WriteResponse(*Response, interface{}) error

	// Close is called when client/server finished with the connection.
	Close() error
}

// Request is a header written before every RPC call.
//
// gobCodec.ReadHeader treats an incoming header as a request only when its
// Method is non-empty, so a Request written with an empty Method is read
// back as a response by that codec.
type Request struct {
	Seq    uint64 // sequence number chosen by client
	Method string // method name; must be non-empty to be recognised as a request by gobCodec
}

// Response is a header written before every RPC return.
//
// gobCodec.ReadHeader fills a Response whenever the incoming header carries
// an empty Method.
type Response struct {
	Seq   uint64 // echoes that of the request
	Error string // error, if any.
}

// gobCodec is a Codec that encodes headers and bodies with encoding/gob
// over a single io.ReadWriteCloser. Instances are built by newGobCodec.
// It holds a sync.Mutex and must therefore not be copied after first use.
type gobCodec struct {
	// rwc is the underlying connection; Close closes it.
	rwc    io.ReadWriteCloser
	// dec decodes incoming headers and bodies; newGobCodec attaches it
	// directly to rwc.
	dec    *gob.Decoder
	// enc encodes outgoing headers and bodies; newGobCodec attaches it to
	// encBuf, so nothing reaches rwc until encBuf is flushed.
	enc    *gob.Encoder
	// encBuf buffers the encoder's output in front of rwc. WriteRequest and
	// WriteResponse flush it after a header/body pair has been encoded.
	encBuf *bufio.Writer
	// mutex is held by WriteRequest and WriteResponse for the whole
	// encode-and-flush sequence, keeping header/body pairs from interleaving.
	mutex  sync.Mutex
}

// message is the decode target ReadHeader uses for every incoming header.
// It carries the fields of both Request (Seq, Method) and Response
// (Seq, Error), so one Decode call can receive either kind of header;
// ReadHeader then tells them apart by whether Method is empty.
type message struct {
	Seq    uint64
	Method string
	Error  string
}

// newGobCodec builds a gobCodec on top of conn.
//
// The decoder reads from conn directly. The encoder writes into a
// bufio.Writer that wraps conn, so encoded bytes only reach conn once that
// writer is flushed. The codec's mutex is left at its zero value.
func newGobCodec(conn io.ReadWriteCloser) *gobCodec {
	// One buffered writer is shared between the encoder (which fills it) and
	// the codec (which flushes it after each header/body pair).
	w := bufio.NewWriter(conn)

	codec := new(gobCodec)
	codec.rwc = conn
	codec.dec = gob.NewDecoder(conn)
	codec.enc = gob.NewEncoder(w)
	codec.encBuf = w
	return codec
}

// ReadHeader decodes the next header from the stream and stores it in
// exactly one of its arguments: req when the decoded Method is non-empty,
// resp otherwise. The other argument is left untouched. A decode error is
// returned as-is, in which case neither argument is modified.
func (c *gobCodec) ReadHeader(req *Request, resp *Response) error {
	var hdr message
	err := c.dec.Decode(&hdr)
	if err != nil {
		return err
	}

	// Only requests carry a method name, so an empty Method marks a response.
	if hdr.Method == "" {
		resp.Seq, resp.Error = hdr.Seq, hdr.Error
		return nil
	}

	req.Seq, req.Method = hdr.Seq, hdr.Method
	return nil
}

// ReadRequestBody decodes the next value from the stream into body and
// returns the decoder's error unchanged. body is handed to the decoder
// as-is, including when it is nil.
func (c *gobCodec) ReadRequestBody(body interface{}) error {
	err := c.dec.Decode(body)
	return err
}

// ReadResponseBody decodes the next value from the stream into body and
// returns the decoder's error unchanged. body is handed to the decoder
// as-is, including when it is nil.
func (c *gobCodec) ReadResponseBody(body interface{}) error {
	err := c.dec.Decode(body)
	return err
}

// WriteRequest encodes the request header r followed by body and then
// flushes the write buffer. The codec mutex is held for the whole sequence,
// so concurrent writers cannot interleave their header/body pairs.
//
// The first error encountered (header encode, body encode, or flush) is
// returned unchanged. The body is not encoded if the header fails, and the
// buffer is not flushed if either encode fails.
//
// NOTE(review): when the body fails to encode, whatever the encoder already
// wrote for the header stays in encBuf and would go out with the next
// successful flush. Confirm that callers treat a write error as fatal for
// the connection.
func (c *gobCodec) WriteRequest(r *Request, body interface{}) (err error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// Header first; the body is only attempted once the header encoded cleanly.
	if err = c.enc.Encode(r); err == nil {
		err = c.enc.Encode(body)
	}
	if err != nil {
		return err
	}
	return c.encBuf.Flush()
}

// WriteResponse encodes the response header r followed by body and then
// flushes the write buffer. The codec mutex is held for the whole sequence,
// so concurrent writers cannot interleave their header/body pairs.
//
// The first error encountered (header encode, body encode, or flush) is
// returned unchanged. The body is not encoded if the header fails, and the
// buffer is not flushed if either encode fails.
//
// NOTE(review): when the body fails to encode, whatever the encoder already
// wrote for the header stays in encBuf and would go out with the next
// successful flush. Confirm that callers treat a write error as fatal for
// the connection.
func (c *gobCodec) WriteResponse(r *Response, body interface{}) (err error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// Header first; the body is only attempted once the header encoded cleanly.
	if err = c.enc.Encode(r); err == nil {
		err = c.enc.Encode(body)
	}
	if err != nil {
		return err
	}
	return c.encBuf.Flush()
}

// Close closes the underlying connection and returns its error unchanged.
// It does not flush encBuf and does not take the write mutex.
func (c *gobCodec) Close() error {
	conn := c.rwc
	return conn.Close()
}