File: chunker.go

package info (click to toggle)
gitlab-shell 14.35.0%2Bds1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 23,652 kB
  • sloc: ruby: 1,129; makefile: 583; sql: 391; sh: 384
file content (66 lines) | stat: -rw-r--r-- 1,426 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package chunk

import (
	"google.golang.org/protobuf/proto"
)

// Sender encapsulates a gRPC response stream and the current chunk
// that's being built.
//
// Reset, Append, [Append...], Send, Reset, Append, [Append...], Send, ...
type Sender interface {
	// Reset should create a fresh response message.
	Reset()
	// Append should append the given item to the slice in the current response message
	Append(proto.Message)
	// Send should send the current response message
	Send() error
}

// New returns a new Chunker.
func New(s Sender) *Chunker { return &Chunker{s: s} }

// Chunker lets you spread items you want to send over multiple chunks.
// This type is not thread-safe.
type Chunker struct {
	s    Sender
	size int
}

// maxMessageSize is maximum size per protobuf message
const maxMessageSize = 1 * 1024 * 1024

// Send will append an item to the current chunk and send the chunk if it is full.
func (c *Chunker) Send(it proto.Message) error {
	if c.size == 0 {
		c.s.Reset()
	}

	itSize := proto.Size(it)

	if itSize+c.size >= maxMessageSize {
		if err := c.sendResponseMsg(); err != nil {
			return err
		}
		c.s.Reset()
	}

	c.s.Append(it)
	c.size += itSize

	return nil
}

func (c *Chunker) sendResponseMsg() error {
	c.size = 0
	return c.s.Send()
}

// Flush sends remaining items in the current chunk, if any.
func (c *Chunker) Flush() error {
	if c.size == 0 {
		return nil
	}

	return c.sendResponseMsg()
}