File: codec.go

package info (click to toggle)
golang-github-influxdata-yarpc 0.0.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 276 kB
  • sloc: makefile: 2
file content (118 lines) | stat: -rw-r--r-- 2,235 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package yarpc

import (
	"encoding/binary"
	"io"
	"sync"

	"github.com/gogo/protobuf/codec"
	"github.com/influxdata/yamux"
	"github.com/influxdata/yarpc/codes"
	"github.com/influxdata/yarpc/status"
)

var (
	codecPool = &sync.Pool{
		New: func() interface{} {
			return codec.New(1024)
		},
	}
)

type pooledCodec struct{}

var (
	cd = &pooledCodec{}
)

func NewCodec() Codec {
	return cd
}

func (*pooledCodec) Marshal(v interface{}) ([]byte, error) {
	ci := codecPool.Get()
	c := ci.(codec.Codec)
	data, err := c.Marshal(v)
	// To avoid a data race, create a copy of data before we return the codec to the pool.
	dataCopy := append([]byte(nil), data...)
	codecPool.Put(ci)
	return dataCopy, err
}

func (*pooledCodec) Unmarshal(data []byte, v interface{}) error {
	ci := codecPool.Get()
	c := ci.(codec.Codec)
	err := c.Unmarshal(data, v)
	codecPool.Put(ci)
	return err
}

type Codec interface {
	Marshal(v interface{}) ([]byte, error)
	Unmarshal(data []byte, v interface{}) error
}

type parser struct {
	r      io.Reader
	header [4]byte
}

func (p *parser) recvMsg() (msg []byte, err error) {
	if _, err := io.ReadFull(p.r, p.header[:]); err != nil {
		return nil, err
	}

	length := binary.BigEndian.Uint32(p.header[:])
	if length == 0 {
		return nil, nil
	}

	msg = make([]byte, int(length))
	if _, err := io.ReadFull(p.r, msg); err != nil {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return nil, err
	}
	return msg, nil
}

func encode(c Codec, msg interface{}) ([]byte, error) {
	var (
		b      []byte
		length uint
	)

	if msg != nil {
		var err error
		b, err = c.Marshal(msg)
		if err != nil {
			// TODO(sgc): should return error with status code "internal"
			return nil, status.Errorf(codes.Internal, "rpc: error while marshaling %v", err)
		}
		length = uint(len(b))
	}

	const (
		sizeLen = 4
	)

	var buf = make([]byte, sizeLen+length)
	binary.BigEndian.PutUint32(buf, uint32(length))
	copy(buf[4:], b)

	return buf, nil
}

func decode(p *parser, c Codec, s *yamux.Stream, m interface{}) error {
	d, err := p.recvMsg()
	if err != nil {
		return err
	}

	if err := c.Unmarshal(d, m); err != nil {
		return status.Errorf(codes.Internal, "rpc: failed to unmarshal received message %v", err)
	}

	return nil
}