package commands
import (
"bytes"
"fmt"
"io"
"os"
"strings"
"sync"
"github.com/git-lfs/git-lfs/v3/errors"
"github.com/git-lfs/git-lfs/v3/filepathfilter"
"github.com/git-lfs/git-lfs/v3/git"
"github.com/git-lfs/git-lfs/v3/lfs"
"github.com/git-lfs/git-lfs/v3/tq"
"github.com/git-lfs/git-lfs/v3/tr"
"github.com/git-lfs/pktline"
"github.com/spf13/cobra"
)
const (
// cleanFilterBufferCapacity is the desired capacity of the
// `*git.PacketWriter`'s internal buffer when the filter protocol
// dictates the "clean" command. 512 bytes is (in most cases) enough to
// hold an entire LFS pointer in memory.
cleanFilterBufferCapacity = 512
// smudgeFilterBufferCapacity is the desired capacity of the
// `*git.PacketWriter`'s internal buffer when the filter protocol
// dictates the "smudge" command, i.e. the largest payload a single
// pkt-line packet can carry.
smudgeFilterBufferCapacity = pktline.MaxPacketLength
)
// filterSmudgeSkip is a command-line flag owned by the `filter-process` command
// dictating whether or not to skip the smudging process, leaving pointers as-is
// in the working tree.
var filterSmudgeSkip bool
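// filterCommand implements the long-running Git filter process protocol over
// stdin/stdout: it performs the pkt-line handshake, negotiates capabilities
// (including the optional "delay" capability), and then serves "clean",
// "smudge", and "list_available_blobs" requests until Git closes the stream,
// writing a status for each request. Git is normally configured to launch it
// via the `filter.lfs.process` setting written by `git lfs install`.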
func filterCommand(cmd *cobra.Command, args []string) {
requireStdin(tr.Tr.Get("This command should be run by the Git filter process"))
setupRepository()
installHooks(false)
s := git.NewFilterProcessScanner(os.Stdin, os.Stdout)
if err := s.Init(); err != nil {
ExitWithError(err)
}
caps, err := s.NegotiateCapabilities()
if err != nil {
ExitWithError(err)
}
var supportsDelay bool
for _, cap := range caps {
if cap == "capability=delay" {
supportsDelay = true
break
}
}
skip := filterSmudgeSkip || cfg.Os.Bool("GIT_LFS_SKIP_SMUDGE", false)
filter := filepathfilter.New(cfg.FetchIncludePaths(), cfg.FetchExcludePaths(), filepathfilter.GitIgnore)
ptrs := make(map[string]*lfs.Pointer)
var q *tq.TransferQueue
var malformed []string
var malformedOnWindows []string
var closeOnce *sync.Once
var available chan *tq.Transfer
gitfilter := lfs.NewGitFilter(cfg)
for s.Scan() {
var n int64
var err error
var delayed bool
var w *pktline.PktlineWriter
req := s.Request()
switch req.Header["command"] {
case "clean":
s.WriteStatus(statusFromErr(nil))
w = pktline.NewPktlineWriter(os.Stdout, cleanFilterBufferCapacity)
var ptr *lfs.Pointer
ptr, err = clean(gitfilter, w, req.Payload, req.Header["pathname"], -1)
if ptr != nil {
n = ptr.Size
}
case "smudge":
if q == nil && supportsDelay {
closeOnce = new(sync.Once)
available = make(chan *tq.Transfer)
if cfg.AutoDetectRemoteEnabled() {
// update the current remote using the treeish supplied with the request
newRemote := git.FirstRemoteForTreeish(req.Header["treeish"])
if newRemote != "" {
cfg.SetRemote(newRemote)
}
}
q = tq.NewTransferQueue(
tq.Download,
getTransferManifestOperationRemote("download", cfg.Remote()),
cfg.Remote(),
tq.RemoteRef(currentRemoteRef()),
tq.WithBatchSize(cfg.TransferBatchSize()),
)
go infiniteTransferBuffer(q, available)
}
w = pktline.NewPktlineWriter(os.Stdout, smudgeFilterBufferCapacity)
if req.Header["can-delay"] == "1" {
var ptr *lfs.Pointer
n, delayed, ptr, err = delayedSmudge(gitfilter, s, w, req.Payload, q, req.Header["pathname"], skip, filter)
if delayed {
ptrs[req.Header["pathname"]] = ptr
}
} else {
s.WriteStatus(statusFromErr(nil))
from, ferr := incomingOrCached(req.Payload, ptrs[req.Header["pathname"]])
if ferr != nil {
break
}
n, err = smudge(gitfilter, w, from, req.Header["pathname"], skip, filter)
if err == nil {
delete(ptrs, req.Header["pathname"])
}
}
case "list_available_blobs":
closeOnce.Do(func() {
// The first time that Git sends us the
// 'list_available_blobs' command, it is understood
// that Git is now waiting until all delayed blobs
// are available within this smudge filter call.
//
// This means that, by the time we're here, we have
// seen all entries in the checkout, and should
// therefore instruct the transfer queue to make a
// batch out of whatever remaining items it has, and
// then close itself.
//
// This call is wrapped in a `sync.(*Once).Do()` so
// that `q.Wait()` is only invoked once, and it runs
// in a goroutine since `q.Wait()` blocks.
go q.Wait()
})
// On the first, and all subsequent, calls to
// list_available_blobs, we read items from `tq.Watch()`
// until a read from that channel would block (in
// other words, we read until there are no more items
// immediately ready to be sent back to Git).
paths := pathnames(readAvailable(available, q.BatchSize()))
if len(paths) == 0 {
// If `len(paths) == 0`, `tq.Watch()` has
// closed, indicating that all items have been
// completely processed and therefore sent back
// to Git for checkout.
for path := range ptrs {
// If we sent a path to Git but it
// didn't ask for the smudge contents,
// that path is available and Git should
// accept it later.
paths = append(paths, fmt.Sprintf("pathname=%s", path))
}
// At this point all items have been completely processed,
// so we discard the transfer queue. If Git issues another
// `smudge` command, a new transfer queue will be created
// from scratch, since this one has already been partially
// closed by `q.Wait()`.
q = nil
}
err = s.WriteList(paths)
default:
ExitWithError(errors.New(tr.Tr.Get("unknown command %q", req.Header["command"])))
}
if errors.IsNotAPointerError(err) {
malformed = append(malformed, req.Header["pathname"])
err = nil
} else if possiblyMalformedObjectSize(n) {
malformedOnWindows = append(malformedOnWindows, req.Header["pathname"])
}
var status git.FilterProcessStatus
if delayed {
// If delayed, there is no need to call w.Flush() since
// no data was written. Calculate the status from the
// given error using 'delayedStatusFromErr'.
status = delayedStatusFromErr(err)
} else if ferr := w.Flush(); ferr != nil {
// Otherwise, we do need to call w.Flush(), since we
// have to assume that data was written. If the flush
// operation was unsuccessful, calculate the status
// using 'statusFromErr'.
status = statusFromErr(ferr)
} else {
// If the above flush was successful, we calculate the
// status from the above clean, smudge, or
// list_available_blobs command using statusFromErr,
// since we did not delay.
status = statusFromErr(err)
}
s.WriteStatus(status)
}
if len(malformed) > 0 {
fmt.Fprintln(os.Stderr, tr.Tr.GetN(
"Encountered %d file that should have been a pointer, but wasn't:",
"Encountered %d files that should have been pointers, but weren't:",
len(malformed),
len(malformed),
))
for _, m := range malformed {
fmt.Fprintf(os.Stderr, "\t%s\n", m)
}
}
if len(malformedOnWindows) > 0 && cfg.Git.Bool("lfs.largefilewarning", !git.IsGitVersionAtLeast("2.34.0")) {
fmt.Fprintln(os.Stderr, tr.Tr.GetN(
"Encountered %d file that may not have been copied correctly on Windows:",
"Encountered %d files that may not have been copied correctly on Windows:",
len(malformedOnWindows),
len(malformedOnWindows),
))
for _, m := range malformedOnWindows {
fmt.Fprintf(os.Stderr, "\t%s\n", m)
}
fmt.Fprint(os.Stderr, "\n", tr.Tr.Get("See: `git lfs help smudge` for more details."), "\n")
}
if err := s.Err(); err != nil && err != io.EOF {
ExitWithError(err)
}
}
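// A rough sketch of the delayed-checkout exchange served by the loop above,
// per Git's long-running filter-process protocol (pkt-line framing and flush
// packets omitted; the pathname is illustrative):
//
//	Git:    command=smudge, pathname=big.bin, can-delay=1
//	Filter: status=delayed
//	...     (further smudge requests, possibly also delayed)
//	Git:    command=list_available_blobs
//	Filter: pathname=big.bin, status=success
//	Git:    command=smudge, pathname=big.bin
//	Filter: <blob contents>, status=success
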
// infiniteTransferBuffer streams the results of q.Watch() into "available" as
// if available had an infinite channel buffer.
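//
// Buffering here matters because the main filter loop only drains "available"
// while it is serving a 'list_available_blobs' request; keeping q.Watch()
// drained in the meantime means finished transfers are never blocked waiting
// for Git to ask for them.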
func infiniteTransferBuffer(q *tq.TransferQueue, available chan<- *tq.Transfer) {
// Stream results from q.Watch() into chan "available" via an infinite
// buffer.
watch := q.Watch()
// pending is used to keep track of an ordered list of available
// `*tq.Transfer`'s that cannot be written to "available" without
// blocking.
var pending []*tq.Transfer
for {
if len(pending) > 0 {
select {
case t, ok := <-watch:
if !ok {
// If the list of pending elements is
// non-empty, stream them out (even if
// they block), and then close().
for _, t = range pending {
available <- t
}
close(available)
return
}
pending = append(pending, t)
case available <- pending[0]:
// Otherwise, dequeue and shift the first
// element from pending onto available.
pending = pending[1:]
}
} else {
t, ok := <-watch
if !ok {
// If watch is closed, the "tq" is done, and
// there are no items on the buffer. Return
// immediately.
close(available)
return
}
select {
case available <- t:
// Copy an item directly from <-watch onto available<-.
default:
// Otherwise, if that would have blocked, make
// the new read pending.
pending = append(pending, t)
}
}
}
}
// incomingOrCached returns an io.Reader that is either the contents of the
// given io.Reader "r", or the encoded contents of "ptr". It returns an error if
// there was an error reading from "r".
//
// This is done because when a `command=smudge` with `can-delay=0` is issued,
// the entry's contents are not sent, and must be re-encoded from the stored
// pointer corresponding to the request's filepath.
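//
// For example, when Git re-requests the smudge of a previously delayed path,
// no content accompanies the request, so the pointer cached in "ptrs" by the
// earlier delayedSmudge call is re-encoded and smudged instead.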
func incomingOrCached(r io.Reader, ptr *lfs.Pointer) (io.Reader, error) {
buf := make([]byte, 1024)
n, err := r.Read(buf)
buf = buf[:n]
if n == 0 {
if ptr == nil {
// If we read no data from the given io.Reader "r" _and_
// there was no data to fall back on, return an empty
// io.Reader yielding no data.
return bytes.NewReader(buf), nil
}
// If we read no data from the given io.Reader "r", _and_ there
// is a pointer that we can fall back on, return an io.Reader
// that yields the encoded version of the given pointer.
return strings.NewReader(ptr.Encoded()), nil
}
if err == io.EOF {
return bytes.NewReader(buf), nil
}
return io.MultiReader(bytes.NewReader(buf), r), err
}
// readAvailable satisfies the accumulation semantics for the
// 'list_available_blobs' command. It accumulates items until:
//
// 1. Reading from the channel of available items blocks, or ...
// 2. There is one item available, or ...
// 3. The 'tq.TransferQueue' is completed.
func readAvailable(ch <-chan *tq.Transfer, cap int) []*tq.Transfer {
ts := make([]*tq.Transfer, 0, cap)
for {
select {
case t, ok := <-ch:
if !ok {
return ts
}
ts = append(ts, t)
default:
if len(ts) > 0 {
return ts
}
t, ok := <-ch
if !ok {
return ts
}
return append(ts, t)
}
}
}
// pathnames formats a list of *tq.Transfers as a valid response to the
// 'list_available_blobs' command.
func pathnames(ts []*tq.Transfer) []string {
pathnames := make([]string, 0, len(ts))
for _, t := range ts {
pathnames = append(pathnames, fmt.Sprintf("pathname=%s", t.Name))
}
return pathnames
}
// statusFromErr returns the status code that should be sent over the filter
// protocol based on a given error, "err".
func statusFromErr(err error) git.FilterProcessStatus {
if err != nil && err != io.EOF {
return git.StatusError
}
return git.StatusSuccess
}
// delayedStatusFromErr returns the status code that should be sent over the
// filter protocol based on a given error, "err" when the blob smudge operation
// was delayed.
func delayedStatusFromErr(err error) git.FilterProcessStatus {
status := statusFromErr(err)
switch status {
case git.StatusSuccess:
return git.StatusDelay
default:
return status
}
}
func init() {
RegisterCommand("filter-process", filterCommand, func(cmd *cobra.Command) {
cmd.Flags().BoolVarP(&filterSmudgeSkip, "skip", "s", false, "")
})
}