File: command_filter_process.go

package info (click to toggle)
git-lfs 3.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,808 kB
  • sloc: sh: 21,256; makefile: 507; ruby: 417
file content (403 lines) | stat: -rw-r--r-- 11,884 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
package commands

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"strings"
	"sync"

	"github.com/git-lfs/git-lfs/v3/errors"
	"github.com/git-lfs/git-lfs/v3/filepathfilter"
	"github.com/git-lfs/git-lfs/v3/git"
	"github.com/git-lfs/git-lfs/v3/lfs"
	"github.com/git-lfs/git-lfs/v3/tq"
	"github.com/git-lfs/git-lfs/v3/tr"
	"github.com/git-lfs/pktline"
	"github.com/spf13/cobra"
)

// smudgeFilterBufferCapacity is the desired capacity of the internal buffer
// of the `*pktline.PktlineWriter` created when the filter protocol dictates
// the "smudge" command. It is sized to pktline.MaxPacketLength.
const smudgeFilterBufferCapacity = pktline.MaxPacketLength

// cleanFilterBufferCapacity is the desired capacity of the internal buffer of
// the `*pktline.PktlineWriter` created when the filter protocol dictates the
// "clean" command. 512 bytes is (in most cases) enough to hold an entire LFS
// pointer in memory.
const cleanFilterBufferCapacity = 512

// filterSmudgeSkip is bound to the `--skip`/`-s` flag of the `filter-process`
// command. When set (or when GIT_LFS_SKIP_SMUDGE is true), smudging is
// skipped and pointers are left as-is in the working tree.
var filterSmudgeSkip bool

// filterCommand implements `git lfs filter-process`, the long-running filter
// that Git drives over stdin/stdout. After the handshake and capability
// negotiation it serves "clean", "smudge" and "list_available_blobs" requests
// until the scanner stops. It then reports, on stderr, any files that should
// have been pointers but were not. When enabled, it also reports files that
// may not have been copied correctly on Windows.
func filterCommand(cmd *cobra.Command, args []string) {
	requireStdin(tr.Tr.Get("This command should be run by the Git filter process"))
	setupRepository()
	installHooks(false)

	s := git.NewFilterProcessScanner(os.Stdin, os.Stdout)

	if err := s.Init(); err != nil {
		ExitWithError(err)
	}

	caps, err := s.NegotiateCapabilities()
	if err != nil {
		ExitWithError(err)
	}

	var supportsDelay bool
	for _, cap := range caps {
		if cap == "capability=delay" {
			supportsDelay = true
			break
		}
	}

	skip := filterSmudgeSkip || cfg.Os.Bool("GIT_LFS_SKIP_SMUDGE", false)
	filter := filepathfilter.New(cfg.FetchIncludePaths(), cfg.FetchExcludePaths(), filepathfilter.GitIgnore)

	// ptrs records, by pathname, the pointers of blobs whose smudge was
	// delayed. A later smudge request for the same path that carries no
	// content can then be served from the stored pointer.
	ptrs := make(map[string]*lfs.Pointer)

	// q, closeOnce and available are created together on the first smudge
	// request when delay is supported. They are discarded (q = nil) once
	// all delayed items have been reported back to Git.
	var q *tq.TransferQueue
	var malformed []string
	var malformedOnWindows []string
	var closeOnce *sync.Once
	var available chan *tq.Transfer
	gitfilter := lfs.NewGitFilter(cfg)
	for s.Scan() {
		var n int64
		var err error
		var delayed bool
		var w *pktline.PktlineWriter

		req := s.Request()

		switch req.Header["command"] {
		case "clean":
			s.WriteStatus(statusFromErr(nil))
			w = pktline.NewPktlineWriter(os.Stdout, cleanFilterBufferCapacity)

			var ptr *lfs.Pointer
			ptr, err = clean(gitfilter, w, req.Payload, req.Header["pathname"], -1)

			if ptr != nil {
				n = ptr.Size
			}
		case "smudge":
			if q == nil && supportsDelay {
				closeOnce = new(sync.Once)
				available = make(chan *tq.Transfer)

				if cfg.AutoDetectRemoteEnabled() {
					// update current remote with information gained by treeish
					newRemote := git.FirstRemoteForTreeish(req.Header["treeish"])
					if newRemote != "" {
						cfg.SetRemote(newRemote)
					}
				}

				q = tq.NewTransferQueue(
					tq.Download,
					getTransferManifestOperationRemote("download", cfg.Remote()),
					cfg.Remote(),
					tq.RemoteRef(currentRemoteRef()),
					tq.WithBatchSize(cfg.TransferBatchSize()),
				)
				go infiniteTransferBuffer(q, available)
			}

			w = pktline.NewPktlineWriter(os.Stdout, smudgeFilterBufferCapacity)
			if req.Header["can-delay"] == "1" {
				var ptr *lfs.Pointer

				n, delayed, ptr, err = delayedSmudge(gitfilter, s, w, req.Payload, q, req.Header["pathname"], skip, filter)

				if delayed {
					ptrs[req.Header["pathname"]] = ptr
				}
			} else {
				s.WriteStatus(statusFromErr(nil))
				from, ferr := incomingOrCached(req.Payload, ptrs[req.Header["pathname"]])
				if ferr != nil {
					// Report the read failure to Git instead of
					// claiming success for an empty result.
					err = ferr
					break
				}

				n, err = smudge(gitfilter, w, from, req.Header["pathname"], skip, filter)
				if err == nil {
					delete(ptrs, req.Header["pathname"])
				}
			}
		case "list_available_blobs":
			var paths []string

			// q (and closeOnce/available, which are created with it)
			// is nil if no delay-capable transfer queue exists.
			// Dereferencing them would panic, so only consult the
			// queue when there is one.
			if q != nil {
				closeOnce.Do(func() {
					// The first time that Git sends us the
					// 'list_available_blobs' command, it is
					// now waiting until all delayed blobs are
					// available within this smudge filter
					// call.
					//
					// This means that, by the time that we're
					// here, we have seen all entries in the
					// checkout, and should therefore instruct
					// the transfer queue to make a batch out
					// of whatever remaining items it has, and
					// then close itself.
					//
					// This function call is wrapped in a
					// `sync.(*Once).Do()` call so we only call
					// `q.Wait()` once, and is called via a
					// goroutine since `q.Wait()` is blocking.
					go q.Wait()
				})

				// On the first and every subsequent call to
				// list_available_blobs, we read items from
				// `tq.Watch()` until a read from that channel
				// would block (in other words, we read until
				// there are no more items immediately ready to
				// be sent back to Git).
				paths = pathnames(readAvailable(available, q.BatchSize()))
			}
			if len(paths) == 0 {
				// If `len(paths) == 0`, `tq.Watch()` has
				// closed (or there is no queue), indicating
				// that all items have been completely
				// processed, and therefore, sent back to Git
				// for checkout.
				for path := range ptrs {
					// If we sent a path to Git but it
					// didn't ask for the smudge contents,
					// that path is available and Git should
					// accept it later.
					paths = append(paths, fmt.Sprintf("pathname=%s", path))
				}
				// At this point all items have been completely processed,
				// so we explicitly close transfer queue. If Git issues
				// another `smudge` command the transfer queue will be
				// created from scratch. Transfer queue needs to be recreated
				// because it has been already partially closed by `q.Wait()`
				q = nil
			}
			err = s.WriteList(paths)
		default:
			ExitWithError(errors.New(tr.Tr.Get("unknown command %q", req.Header["command"])))
		}

		if errors.IsNotAPointerError(err) {
			malformed = append(malformed, req.Header["pathname"])
			err = nil
		} else if possiblyMalformedObjectSize(n) {
			malformedOnWindows = append(malformedOnWindows, req.Header["pathname"])
		}

		var status git.FilterProcessStatus
		if delayed {
			// If delayed, there is no need to call w.Flush() since
			// no data was written. Calculate the status from the
			// given error using 'delayedStatusFromErr'.
			status = delayedStatusFromErr(err)
		} else if ferr := w.Flush(); ferr != nil {
			// Otherwise, we do need to call w.Flush(), since we
			// have to assume that data was written. If the flush
			// operation was unsuccessful, calculate the status
			// using 'statusFromErr'.
			status = statusFromErr(ferr)
		} else {
			// If the above flush was successful, we calculate the
			// status from the above clean, smudge, or
			// list_available_blobs command using statusFromErr,
			// since we did not delay.
			status = statusFromErr(err)
		}

		s.WriteStatus(status)
	}

	if len(malformed) > 0 {
		fmt.Fprintln(os.Stderr, tr.Tr.GetN(
			"Encountered %d file that should have been a pointer, but wasn't:",
			"Encountered %d files that should have been pointers, but weren't:",
			len(malformed),
			len(malformed),
		))
		for _, m := range malformed {
			fmt.Fprintf(os.Stderr, "\t%s\n", m)
		}
	}

	if len(malformedOnWindows) > 0 && cfg.Git.Bool("lfs.largefilewarning", !git.IsGitVersionAtLeast("2.34.0")) {
		fmt.Fprintln(os.Stderr, tr.Tr.GetN(
			"Encountered %d file that may not have been copied correctly on Windows:",
			"Encountered %d files that may not have been copied correctly on Windows:",
			len(malformedOnWindows),
			len(malformedOnWindows),
		))

		for _, m := range malformedOnWindows {
			fmt.Fprintf(os.Stderr, "\t%s\n", m)
		}

		fmt.Fprint(os.Stderr, "\n", tr.Tr.Get("See: `git lfs help smudge` for more details."), "\n")
	}

	if err := s.Err(); err != nil && err != io.EOF {
		ExitWithError(err)
	}
}

// infiniteTransferBuffer forwards every transfer produced by q.Watch() onto
// "available", in order, as though "available" had an unbounded buffer.
// Items that cannot be handed off immediately are queued in a local backlog
// so that reading from q.Watch() never waits on the consumer. Once q.Watch()
// is closed, any backlog is delivered (blocking as needed) and "available"
// is closed.
func infiniteTransferBuffer(q *tq.TransferQueue, available chan<- *tq.Transfer) {
	incoming := q.Watch()

	// backlog holds, in arrival order, transfers that could not yet be
	// sent on "available" without blocking.
	var backlog []*tq.Transfer

	for {
		if len(backlog) == 0 {
			next, open := <-incoming
			if !open {
				// The queue is finished and nothing is
				// buffered: signal completion right away.
				close(available)
				return
			}

			// Hand the item straight to the consumer if it is
			// ready, otherwise start a backlog.
			select {
			case available <- next:
			default:
				backlog = append(backlog, next)
			}
			continue
		}

		select {
		case next, open := <-incoming:
			if !open {
				// Flush whatever is still buffered, even if
				// each send blocks, then signal completion.
				for _, queued := range backlog {
					available <- queued
				}
				close(available)
				return
			}
			backlog = append(backlog, next)
		case available <- backlog[0]:
			// The consumer accepted the oldest buffered item.
			backlog = backlog[1:]
		}
	}
}

// incomingOrCached returns an io.Reader that yields either the contents of
// the given io.Reader "r", or, when "r" yields no data, the encoded contents
// of "ptr" (or nothing at all if "ptr" is nil).
//
// This is done because when a `command=smudge` with `can-delay=0` is issued,
// the entry's contents are not sent, and must be re-encoded from the stored
// pointer corresponding to the request's filepath.
//
// Any read error other than io.EOF is returned. If some bytes were read
// before the error, the returned io.Reader still yields those bytes followed
// by the remainder of "r". If no bytes were read, the returned io.Reader is
// nil.
func incomingOrCached(r io.Reader, ptr *lfs.Pointer) (io.Reader, error) {
	// Probe "r" with a single Read to learn whether any content was sent.
	buf := make([]byte, 1024)
	n, err := r.Read(buf)
	buf = buf[:n]

	if n == 0 {
		if err != nil && err != io.EOF {
			// A genuine read failure must not be masked by
			// falling back to the cached pointer or to an empty
			// reader.
			return nil, err
		}
		if ptr == nil {
			// If we read no data from the given io.Reader "r" _and_
			// there was no data to fall back on, return an empty
			// io.Reader yielding no data.
			return bytes.NewReader(buf), nil
		}
		// If we read no data from the given io.Reader "r", _and_ there
		// is a pointer that we can fall back on, return an io.Reader
		// that yields the encoded version of the given pointer.
		return strings.NewReader(ptr.Encoded()), nil
	}

	if err == io.EOF {
		// Everything was consumed by the probe read.
		return bytes.NewReader(buf), nil
	}
	// Re-attach the probed bytes in front of the rest of "r".
	return io.MultiReader(bytes.NewReader(buf), r), err
}

// readAvailable collects the next batch of finished transfers for a
// 'list_available_blobs' response. It takes every item that can be received
// from "ch" without blocking. If none were ready, it blocks until exactly one
// item arrives. It returns early with whatever it has once "ch" is closed,
// which may be an empty slice. "cap" is only the initial capacity of the
// returned slice; it does not limit how many items are returned.
func readAvailable(ch <-chan *tq.Transfer, cap int) []*tq.Transfer {
	batch := make([]*tq.Transfer, 0, cap)

	for {
		select {
		case next, open := <-ch:
			if !open {
				return batch
			}
			batch = append(batch, next)
			continue
		default:
		}

		// Receiving would block right now.
		if len(batch) > 0 {
			return batch
		}

		// Nothing gathered yet: wait for a single item (or closure).
		if next, open := <-ch; open {
			batch = append(batch, next)
		}
		return batch
	}
}

// pathnames renders each transfer as a "pathname=<name>" line, in order,
// forming the body of a 'list_available_blobs' response. The result is
// always non-nil, even when "ts" is empty.
func pathnames(ts []*tq.Transfer) []string {
	lines := make([]string, len(ts))
	for i := range ts {
		lines[i] = fmt.Sprintf("pathname=%s", ts[i].Name)
	}
	return lines
}

// statusFromErr maps the outcome of a filter operation to the status that is
// reported over the filter protocol. A nil error or io.EOF counts as
// success; any other error is reported as an error status.
func statusFromErr(err error) git.FilterProcessStatus {
	if err == nil || err == io.EOF {
		return git.StatusSuccess
	}
	return git.StatusError
}

// delayedStatusFromErr is the counterpart of statusFromErr for smudge
// requests whose processing was delayed. An outcome that statusFromErr
// would report as success is reported as git.StatusDelay instead. Every
// other status is passed through unchanged.
func delayedStatusFromErr(err error) git.FilterProcessStatus {
	if status := statusFromErr(err); status != git.StatusSuccess {
		return status
	}
	return git.StatusDelay
}

func init() {
	RegisterCommand("filter-process", filterCommand, func(cmd *cobra.Command) {
		cmd.Flags().BoolVarP(&filterSmudgeSkip, "skip", "s", false, "")
	})
}