File: send.go

package info (click to toggle)
gitlab-shell 14.35.0%2Bds1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 23,652 kB
  • sloc: ruby: 1,129; makefile: 583; sql: 391; sh: 384
file content (158 lines) | stat: -rw-r--r-- 4,476 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package lines

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"regexp"
)

// SenderOpts contains fields that Send() uses to determine what is considered
// a line, and how to handle pagination and filtering. That is, which lines
// are skipped before a line gets fed into the Sender, and how many lines are
// sent at most.
type SenderOpts struct {
	// Delimiter is the byte that separates lines in the input read by
	// Send. The zero value is the NUL byte (0).
	Delimiter byte
	// Limit is the upper limit of how many lines will be sent. Lines that
	// are skipped while looking for the page token or rejected by Filter
	// do not count towards it. A zero (or negative) value will cause no
	// lines to be sent.
	Limit int
	// IsPageToken allows control over which results are sent as part of the
	// response. Lines are skipped until IsPageToken evaluates to true for
	// the first time; the line it matched is itself not sent, only the
	// lines following it are. This function will be called with an empty
	// slice previous to reading the first line in order to allow sending
	// everything right from the beginning. If nil, Send substitutes a
	// function that always returns true.
	IsPageToken func([]byte) bool
	// When PageTokenError is true, an error is returned if IsPageToken
	// never evaluated to true, i.e. the page token was not found.
	PageTokenError bool
	// Filter limits sent results to those that match the expression. It is
	// only applied to lines past the page token. The zero value (nil)
	// disables filtering.
	Filter *regexp.Regexp
}

// ItemsPerMessage is the number of buffered lines at which the buffer is
// flushed to the Sender when using the `Send` function. It's a variable
// instead of a constant to make it possible to override in tests.
var ItemsPerMessage = 20

// Sender handles a batch of buffered lines (for example, lines from a Git
// command). It is invoked whenever the buffer reaches ItemsPerMessage lines
// and once more for any remaining lines after reading has finished; it is
// never invoked with an empty batch. A non-nil error is returned to the
// caller of Send and the batch is kept in the buffer.
type Sender func([][]byte) error

// writer accumulates lines and hands them to a Sender in batches.
//
// sender receives each batch, lines holds the batch that has not been handed
// over yet, and options carries the delimiter, limit, pagination and filter
// settings used while consuming the input.
type writer struct {
	sender  Sender
	lines   [][]byte
	options SenderOpts
}

// CopyAndAppend appends a freshly allocated duplicate of `e` to `s` and
// returns the resulting slice. Because the stored element does not share
// memory with `e`, later writes to `e` (for instance by an I/O buffer that is
// reused between reads) cannot change what was appended.
func CopyAndAppend(s [][]byte, e []byte) [][]byte {
	// Start from an empty, non-nil slice with exactly enough room for `e`
	// so the append below copies without reallocating.
	dup := append(make([]byte, 0, len(e)), e...)
	return append(s, dup)
}

// flush hands the accumulated lines to the `sender` handler and, if the
// handler succeeds, empties the buffer. It is a no-op when nothing is
// buffered. When the handler fails its error is returned and the buffer is
// left untouched.
func (w *writer) flush() error {
	pending := w.lines
	if len(pending) == 0 {
		// Nothing buffered, so there is no message to send.
		return nil
	}

	err := w.sender(pending)
	if err == nil {
		// Start a fresh message only once the current one went out.
		w.lines = nil
	}

	return err
}

// addLine stores a copy of `p` in the writer buffer. Once the buffer holds
// at least ItemsPerMessage lines it is flushed, and the result of that flush
// is returned.
func (w *writer) addLine(p []byte) error {
	w.lines = CopyAndAppend(w.lines, p)

	if len(w.lines) < ItemsPerMessage {
		// Still room in the current message.
		return nil
	}

	return w.flush()
}

// consume splits the data read from `r` into lines at the configured
// delimiter and buffers every line that is past the page token and accepted
// by the filter, until the limit is reached or the input ends. Whatever is
// still buffered afterwards is flushed.
//
// Reading also stops at the first empty line. consume returns any read error
// other than io.EOF, any error from the sender, or an error when
// PageTokenError is set and the page token was never found.
func (w *writer) consume(r io.Reader) error {
	reader := bufio.NewReader(r)
	delim := w.delimiter()
	suffix := []byte{delim}
	filter := w.filter()
	limit := w.options.Limit

	// IsPageToken only unlocks the lines _after_ the one it matches, so it
	// is probed with an empty slice up front to allow sending right from
	// the very first line.
	tokenSeen := w.options.IsPageToken([]byte{})

	sent := 0
	for sent < limit {
		var record []byte

		// Accumulate chunks until the record ends with the delimiter or
		// the input is exhausted.
		for {
			part, err := reader.ReadBytes(delim)
			if err != nil && err != io.EOF {
				return err
			}

			record = append(record, part...)
			if bytes.HasSuffix(record, suffix) {
				break
			}

			if err == io.EOF {
				// Last record of the input: make sure the outer
				// loop ends once it has been handled.
				sent = limit
				break
			}
		}

		record = bytes.TrimSuffix(record, suffix)
		if len(record) == 0 {
			break
		}

		switch {
		case !tokenSeen:
			// Still looking for the page token. The line that matches
			// is skipped as well; only the lines after it are sent.
			tokenSeen = w.options.IsPageToken(record)
		case filter != nil && !filter.Match(record):
			// Rejected by the filter; does not count towards the limit.
		default:
			sent++
			if err := w.addLine(record); err != nil {
				return err
			}
		}
	}

	if w.options.PageTokenError && !tokenSeen {
		return fmt.Errorf("could not find page token")
	}

	return w.flush()
}

// delimiter returns the byte that terminates each line, as configured in the
// writer's options.
func (w *writer) delimiter() byte {
	return w.options.Delimiter
}

// filter returns the regular expression configured in the writer's options,
// or nil when filtering is disabled.
func (w *writer) filter() *regexp.Regexp {
	return w.options.Filter
}

// Send reads output from `r`, splits it at `opts.Delimiter`, and passes the
// resulting lines to `sender` in batches, honouring the limit, page token and
// filter given in `opts`. It returns the first error hit while reading or
// sending, or an error for a missing page token when `opts.PageTokenError`
// is set.
func Send(r io.Reader, sender Sender, opts SenderOpts) error {
	// Without a page token predicate every line counts as being past the
	// token, so nothing is skipped.
	if opts.IsPageToken == nil {
		opts.IsPageToken = func([]byte) bool { return true }
	}

	w := writer{sender: sender, options: opts}

	return w.consume(r)
}