File: batch_obj_iter.go

package info (click to toggle)
git-sizer 1.5.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 616 kB
  • sloc: sh: 100; makefile: 61
file content (156 lines) | stat: -rw-r--r-- 4,104 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package git

import (
	"bufio"
	"context"
	"fmt"
	"io"

	"github.com/github/git-sizer/internal/pipe"
)

// ObjectRecord is a single object as produced by a `BatchObjectIter`:
// the parsed `git cat-file --batch` header together with the raw
// object contents.
type ObjectRecord struct {
	// BatchHeader is the parsed header line that preceded the object
	// contents in the `git cat-file` output.
	BatchHeader
	// Data holds the object contents. The LF that `git cat-file`
	// emits after the contents is not included.
	Data []byte
}

// BatchObjectIter iterates over objects whose names are requested via
// `RequestObject()`. The names are fed to a `git cat-file --batch
// --buffer` process and the resulting objects are read back using
// `Next()`. The output is buffered, so the iterator has to be closed
// before you can be sure that you have gotten all of the objects.
type BatchObjectIter struct {
	// ctx is the context that was passed to `NewBatchObjectIter()`.
	// It is consulted so that blocked channel operations can be
	// abandoned when the context is done.
	ctx   context.Context
	// p is the pipeline that feeds OIDs to `git cat-file` and parses
	// its output.
	p     *pipe.Pipeline
	// oidCh carries requested OIDs from `RequestObject()` to the
	// pipeline stage that writes them to `git cat-file`. It is
	// closed by `Close()`.
	oidCh chan OID
	// objCh carries parsed objects from the pipeline to `Next()`. It
	// is closed by the reading stage when that stage returns.
	objCh chan ObjectRecord
	// errCh is allocated by `NewBatchObjectIter()` but is not used by
	// any of the code in this file.
	// NOTE(review): possibly vestigial; confirm before relying on it.
	errCh chan error
}

// NewBatchObjectIter starts a `git cat-file --batch --buffer`
// pipeline for `repo` and returns a `*BatchObjectIter` connected to
// it. Objects are requested by passing their OIDs to
// `RequestObject()` and are read back using `Next()`. `Close()`
// should normally be called once all of the requests have been made,
// and the remaining output then drained using `Next()`.
//
// `ctx` is used to start the pipeline and is also stored in the
// iterator; when it is done, the pipeline stages and any blocked
// `RequestObject()` call give up and report `ctx.Err()`.
//
// An error is returned if the pipeline cannot be started.
func (repo *Repository) NewBatchObjectIter(ctx context.Context) (*BatchObjectIter, error) {
	iter := BatchObjectIter{
		ctx:   ctx,
		p:     pipe.New(),
		oidCh: make(chan OID),
		objCh: make(chan ObjectRecord),
		errCh: make(chan error),
	}

	iter.p.Add(
		// Read OIDs from `iter.oidCh` and write them to `git
		// cat-file`:
		pipe.Function(
			"request-objects",
			func(ctx context.Context, _ pipe.Env, _ io.Reader, stdout io.Writer) error {
				out := bufio.NewWriter(stdout)

				for {
					select {
					case oid, ok := <-iter.oidCh:
						if !ok {
							// `Close()` was called: no more requests
							// are coming, so push out whatever is
							// still sitting in the buffer.
							return out.Flush()
						}
						if _, err := fmt.Fprintln(out, oid.String()); err != nil {
							return fmt.Errorf("writing to 'git cat-file': %w", err)
						}
					case <-ctx.Done():
						return ctx.Err()
					}
				}
			},
		),

		// Read OIDs from `stdin` and output a header line followed by
		// the contents of the corresponding Git objects:
		pipe.CommandStage(
			"git-cat-file",
			repo.GitCommand("cat-file", "--batch", "--buffer"),
		),

		// Parse the object headers and read the object contents, and
		// shove both into `objCh`:
		pipe.Function(
			"object-reader",
			func(ctx context.Context, _ pipe.Env, stdin io.Reader, _ io.Writer) error {
				// Closing `objCh` is what tells `Next()` that there
				// are no more objects, whether this stage ends
				// normally or with an error.
				defer close(iter.objCh)

				f := bufio.NewReader(stdin)

				for {
					header, err := f.ReadString('\n')
					switch {
					case err == io.EOF && header == "":
						// Clean end of output, on an object boundary.
						return nil
					case err == io.EOF:
						// The output stopped in the middle of a header
						// line. Report the truncation rather than
						// silently dropping the partial header.
						return fmt.Errorf(
							"reading from 'git cat-file': truncated header %q: %w",
							header, io.ErrUnexpectedEOF,
						)
					case err != nil:
						return fmt.Errorf("reading from 'git cat-file': %w", err)
					}

					batchHeader, err := ParseBatchHeader("", header)
					if err != nil {
						return fmt.Errorf("parsing output of 'git cat-file': %w", err)
					}

					// Read the object contents plus the trailing LF
					// (which is discarded below while creating the
					// `ObjectRecord`):
					data := make([]byte, batchHeader.ObjectSize+1)
					if _, err := io.ReadFull(f, data); err != nil {
						return fmt.Errorf(
							"reading object data from 'git cat-file' for %s '%s': %w",
							batchHeader.ObjectType, batchHeader.OID, err,
						)
					}

					// Use the context handed to this stage, just like
					// the "request-objects" stage does, so that both
					// stages react to the same cancellation signal.
					select {
					case iter.objCh <- ObjectRecord{
						BatchHeader: batchHeader,
						Data:        data[:batchHeader.ObjectSize],
					}:
					case <-ctx.Done():
						return ctx.Err()
					}
				}
			},
		),
	)

	if err := iter.p.Start(ctx); err != nil {
		return nil, err
	}

	return &iter, nil
}

// RequestObject queues `oid` for processing. Requested objects can be
// read using `Next()` in the order that they were requested. The call
// blocks until the request has been handed over or until the
// iterator's context is done; in the latter case the context's error
// is returned.
func (iter *BatchObjectIter) RequestObject(oid OID) error {
	cancelled := iter.ctx.Done()

	select {
	case <-cancelled:
		return iter.ctx.Err()
	case iter.oidCh <- oid:
		// The request has been handed over to the pipeline.
	}

	return nil
}

// Close signals that no more objects will be requested by closing the
// channel that carries OIDs to the pipeline. The stage that feeds
// `git cat-file` reacts to this by flushing its buffered output and
// finishing. Close does not itself wait for the pipeline; continue
// calling `Next()` until it reports that no objects are left.
//
// Close must be called exactly once: closing an already-closed
// channel panics, as does calling `RequestObject()` after Close.
func (iter *BatchObjectIter) Close() {
	close(iter.oidCh)
}

// Next blocks until the next object (its header and contents) is
// available and returns it together with `true`. Once the object
// channel has been closed, it instead returns a placeholder record
// carrying `missingHeader`, `false`, and the result of waiting for
// the pipeline to finish. Objects need to be read asynchronously, but
// the last objects won't necessarily show up here until `Close()` has
// been called.
func (iter *BatchObjectIter) Next() (ObjectRecord, bool, error) {
	if rec, more := <-iter.objCh; more {
		return rec, true, nil
	}

	// The reading stage has closed `objCh`, so there is nothing left
	// to deliver; surface whatever error the pipeline ended with.
	return ObjectRecord{BatchHeader: missingHeader}, false, iter.p.Wait()
}