package tq
import (
"fmt"
"os"
"sort"
"sync"
"time"
"github.com/git-lfs/git-lfs/v3/errors"
"github.com/git-lfs/git-lfs/v3/git"
"github.com/git-lfs/git-lfs/v3/lfshttp"
"github.com/git-lfs/git-lfs/v3/tools"
"github.com/git-lfs/git-lfs/v3/tr"
"github.com/rubyist/tracerx"
)
const (
defaultBatchSize = 100
baseRetryDelayMs = 250
)
type retryCounter struct {
MaxRetries int
MaxRetryDelay int
// cmu guards count
cmu sync.Mutex
// count maps OIDs to number of retry attempts
count map[string]int
}
// newRetryCounter instantiates a new *retryCounter.
func newRetryCounter() *retryCounter {
return &retryCounter{
MaxRetries: defaultMaxRetries,
MaxRetryDelay: defaultMaxRetryDelay,
count: make(map[string]int),
}
}
// Increment increments the number of retries for a given OID and returns the
// new value. It is safe to call across multiple goroutines.
func (r *retryCounter) Increment(oid string) int {
r.cmu.Lock()
defer r.cmu.Unlock()
r.count[oid]++
return r.count[oid]
}
// CountFor returns the current number of retries for a given OID. It is safe to
// call across multiple goroutines.
func (r *retryCounter) CountFor(oid string) int {
r.cmu.Lock()
defer r.cmu.Unlock()
return r.count[oid]
}
// CanRetry returns the current number of retries for the given OID, and
// whether another retry is still permitted (i.e., whether the count is below
// retryCounter.MaxRetries).
func (r *retryCounter) CanRetry(oid string) (int, bool) {
count := r.CountFor(oid)
return count, count < r.MaxRetries
}
// ReadyTime returns the time at which the next retry for the given OID may
// occur, or the zero time if it may occur immediately.
func (r *retryCounter) ReadyTime(oid string) time.Time {
count := r.CountFor(oid)
if count < 1 {
return time.Time{}
}
maxDelayMs := 1000 * uint64(r.MaxRetryDelay)
delay := uint64(baseRetryDelayMs) * (1 << uint(count-1))
if delay == 0 || delay > maxDelayMs {
delay = maxDelayMs
}
return time.Now().Add(time.Duration(delay) * time.Millisecond)
}
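// backoffScheduleSketch is an illustrative sketch, not part of the original
// queue code, showing the delay progression that ReadyTime produces for
// successive retries: baseRetryDelayMs doubles with each attempt (250ms,
// 500ms, 1s, ...) and is capped at maxRetryDelaySecs seconds. The function
// name and the printed output are assumptions for demonstration only.
func backoffScheduleSketch(maxRetries, maxRetryDelaySecs int) {
    maxDelayMs := 1000 * uint64(maxRetryDelaySecs)
    for count := 1; count <= maxRetries; count++ {
        delay := uint64(baseRetryDelayMs) * (1 << uint(count-1))
        if delay == 0 || delay > maxDelayMs {
            delay = maxDelayMs
        }
        fmt.Printf("retry #%d waits %v\n", count, time.Duration(delay)*time.Millisecond)
    }
}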
// batch implements sort.Interface and enables sorting on a slice of
// `*Transfer`s by object size.
//
// This interface is implemented here so that the largest objects can be
// processed first. Since a new batch cannot be added until the current batch
// has finished processing, this enables us to reduce the risk of a single
// worker getting tied up on a large item at the end of a batch while all
// other workers are sitting idle.
type batch []*objectTuple
// Concat concatenates two batches together, returning a single, clamped batch as
// "left", and the remainder of elements as "right". If the union of the
// receiver and "other" has cardinality less than "size", "right" will be
// returned as nil. Any object tuple that is not yet ready to be retried (e.g.,
// because of a Retry-After response) will also go into the right batch. When
// rate-limited objects are deferred this way, the minimum duration to wait
// until one of them becomes ready is also returned.
func (b batch) Concat(other batch, size int) (left, right batch, minWait time.Duration) {
u := batch(append(b, other...))
for _, ot := range u {
if time.Now().After(ot.ReadyTime) {
// The current time is past the time the object should
// be available.
left = append(left, ot)
} else {
// The time hasn't passed for the object.
right = append(right, ot)
wait := time.Until(ot.ReadyTime)
if minWait == 0 {
minWait = wait
} else if wait < minWait {
minWait = wait
}
}
}
if len(left) <= size {
// If the size of left fits the given size limit, return with no adjustments.
return left, right, minWait
}
// If left is too large, trim left down to size and append the rest to right.
right = append(right, left[size:]...)
left = left[:size]
return left, right, minWait
}
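// concatSketch is an illustrative sketch, not part of the original queue code,
// showing how Concat clamps a combined batch to a size limit and defers
// objects that are not yet ready. The tuple values are invented for
// demonstration only.
func concatSketch() {
    ready := batch{
        {Oid: "aaa", Size: 1},
        {Oid: "bbb", Size: 2},
    }
    deferred := batch{
        // This object was rate-limited and may not be retried for another minute.
        {Oid: "ccc", Size: 3, ReadyTime: time.Now().Add(time.Minute)},
    }
    left, right, minWait := ready.Concat(deferred, 1)
    // left holds at most one ready object; right holds the overflow plus the
    // not-yet-ready object; minWait is roughly one minute.
    fmt.Println(len(left), len(right), minWait)
}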
func (b batch) ToTransfers() []*Transfer {
transfers := make([]*Transfer, 0, len(b))
for _, t := range b {
transfers = append(transfers, &Transfer{Oid: t.Oid, Size: t.Size, Missing: t.Missing})
}
return transfers
}
func (b batch) Len() int { return len(b) }
func (b batch) Less(i, j int) bool { return b[i].Size < b[j].Size }
func (b batch) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
type abortableWaitGroup struct {
wq sync.WaitGroup
counter int
mu sync.Mutex
abort bool
}
func newAbortableWaitGroup() *abortableWaitGroup {
return &abortableWaitGroup{}
}
func (q *abortableWaitGroup) Add(delta int) {
q.mu.Lock()
defer q.mu.Unlock()
if !q.abort {
q.counter += delta
q.wq.Add(delta)
}
}
func (q *abortableWaitGroup) Done() {
q.mu.Lock()
defer q.mu.Unlock()
if !q.abort {
q.counter -= 1
q.wq.Done()
}
}
func (q *abortableWaitGroup) Abort() {
q.mu.Lock()
defer q.mu.Unlock()
q.abort = true
q.wq.Add(-q.counter)
}
func (q *abortableWaitGroup) Wait() {
q.wq.Wait()
}
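// abortableWaitGroupSketch is an illustrative sketch, not part of the original
// file, of the intended usage pattern: Add and Done track pending work as with
// a plain sync.WaitGroup, while Abort releases any Wait callers even though
// some pending items will never be completed. The worker logic here is
// invented for demonstration only.
func abortableWaitGroupSketch(fatal bool) {
    wg := newAbortableWaitGroup()
    wg.Add(2)
    go func() {
        // One unit of work completes normally.
        wg.Done()
        if fatal {
            // A fatal error means the second unit will never call Done();
            // Abort unblocks Wait anyway.
            wg.Abort()
        } else {
            wg.Done()
        }
    }()
    wg.Wait()
}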
// TransferQueue organises the wider process of uploading and downloading,
// including calling the API, passing the actual transfer request to transfer
// adapters, and dealing with progress, errors and retries.
type TransferQueue struct {
direction Direction
client *tqClient
remote string
ref *git.Ref
adapter Adapter
adapterInProgress bool
adapterInitMutex sync.Mutex
dryRun bool
cb tools.CopyCallback
meter *Meter
errors []error
transfers map[string]*objects
batchSize int
bufferDepth int
incoming chan *objectTuple // Channel for processing incoming items
errorc chan error // Channel for processing errors
watchers []chan *Transfer
trMutex *sync.Mutex
collectorWait sync.WaitGroup
errorwait sync.WaitGroup
// wait is used to keep track of pending transfers. It is incremented
// once per unique OID on Add(), and is decremented when that transfer
// is marked as completed or failed, but not retried.
wait *abortableWaitGroup
manifest Manifest
rc *retryCounter
// unsupportedContentType indicates whether the transfer queue ever saw
// an HTTP 422 response indicating that the upload destination does
// not support Content-Type detection.
unsupportedContentType bool
}
// objects holds a set of objects.
type objects struct {
completed bool
objects []*objectTuple
}
// All returns all *objectTuple's contained in the *objects set.
func (s *objects) All() []*objectTuple {
return s.objects
}
// Append returns a new *objects with the given *objectTuple(s) appended to the
// end of the known objects.
func (s *objects) Append(os ...*objectTuple) *objects {
return &objects{
completed: s.completed,
objects: append(s.objects, os...),
}
}
// First returns the first *objectTuple in the chain of objects.
func (s *objects) First() *objectTuple {
if len(s.objects) == 0 {
return nil
}
return s.objects[0]
}
type objectTuple struct {
Name, Path, Oid string
Size int64
Missing bool
ReadyTime time.Time
}
func (o *objectTuple) ToTransfer() *Transfer {
return &Transfer{
Name: o.Name,
Path: o.Path,
Oid: o.Oid,
Size: o.Size,
Missing: o.Missing,
}
}
type Option func(*TransferQueue)
func DryRun(dryRun bool) Option {
return func(tq *TransferQueue) {
tq.dryRun = dryRun
}
}
func WithProgress(m *Meter) Option {
return func(tq *TransferQueue) {
tq.meter = m
}
}
func RemoteRef(ref *git.Ref) Option {
return func(tq *TransferQueue) {
tq.ref = ref
}
}
func WithProgressCallback(cb tools.CopyCallback) Option {
return func(tq *TransferQueue) {
tq.cb = cb
}
}
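// progressCallbackSketch is an illustrative sketch, not part of the original
// file, of wiring a progress callback into the queue options. It assumes
// tools.CopyCallback has the (totalSize, readSoFar, readSinceLast) signature
// matching the q.cb invocation in ensureAdapterBegun below; the output format
// is invented for demonstration only.
func progressCallbackSketch() Option {
    var cb tools.CopyCallback = func(totalSize, readSoFar int64, readSinceLast int) error {
        fmt.Fprintf(os.Stderr, "transferred %d of %d bytes (+%d)\n", readSoFar, totalSize, readSinceLast)
        return nil
    }
    return WithProgressCallback(cb)
}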
func WithBatchSize(size int) Option {
return func(tq *TransferQueue) { tq.batchSize = size }
}
func WithBufferDepth(depth int) Option {
return func(tq *TransferQueue) { tq.bufferDepth = depth }
}
// NewTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter
func NewTransferQueue(dir Direction, manifest Manifest, remote string, options ...Option) *TransferQueue {
q := &TransferQueue{
direction: dir,
remote: remote,
errorc: make(chan error),
transfers: make(map[string]*objects),
trMutex: &sync.Mutex{},
manifest: manifest,
rc: newRetryCounter(),
wait: newAbortableWaitGroup(),
}
for _, opt := range options {
opt(q)
}
if q.batchSize <= 0 {
q.batchSize = defaultBatchSize
}
if q.bufferDepth <= 0 {
q.bufferDepth = q.batchSize
}
if q.meter != nil {
q.meter.Direction = q.direction
}
q.incoming = make(chan *objectTuple, q.bufferDepth)
q.collectorWait.Add(1)
q.errorwait.Add(1)
q.run()
return q
}
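// newQueueSketch is an illustrative sketch, not part of the original file,
// showing how the functional options above combine when constructing a queue.
// The manifest and remote name are supplied by the caller; the option values
// are invented for demonstration only.
func newQueueSketch(manifest Manifest) *TransferQueue {
    return NewTransferQueue(Upload, manifest, "origin",
        WithBatchSize(50),    // override defaultBatchSize (100)
        WithBufferDepth(200), // let more items queue up before batching
        DryRun(true),         // report what would be transferred without doing it
    )
}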
// Ensure we have a concrete manifest and that certain delayed variables are set
// properly.
func (q *TransferQueue) Upgrade() {
if q.client == nil {
manifest := q.manifest.Upgrade()
q.client = &tqClient{Client: manifest.APIClient()}
q.rc.MaxRetries = manifest.maxRetries
q.rc.MaxRetryDelay = manifest.maxRetryDelay
q.client.SetMaxRetries(manifest.maxRetries)
}
}
// Add adds a *Transfer to the transfer queue. It only increments the amount
// of waiting the TransferQueue has to do if the *Transfer "t" is new.
//
// If one or more transfers with the same OID have already been added to the
// *TransferQueue, the given transfer will not be enqueued, but it will be sent
// to any channel created by Watch() once the oldest transfer has completed.
//
// Only one file will be transferred to/from the Path element of the first
// transfer.
func (q *TransferQueue) Add(name, path, oid string, size int64, missing bool, err error) {
q.Upgrade()
if err != nil {
q.errorc <- err
return
}
t := &objectTuple{
Name: name,
Path: path,
Oid: oid,
Size: size,
Missing: missing,
}
if objs := q.remember(t); len(objs.objects) > 1 {
if objs.completed {
// If there is already a completed transfer chain for
// this OID, then this object is already "done", and can
// be sent through as completed to the watchers.
for _, w := range q.watchers {
w <- t.ToTransfer()
}
}
// If the chain is not done, there is no reason to enqueue this
// transfer into 'q.incoming'.
tracerx.Printf("already transferring %q, skipping duplicate", t.Oid)
return
}
q.incoming <- t
}
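// addDuplicateSketch is an illustrative sketch, not part of the original file,
// showing the de-duplication behaviour documented on Add: two files with the
// same OID result in a single transfer, but both are reported to Watch()
// channels when that transfer completes. The names, paths, OID, and size are
// invented for demonstration only.
func addDuplicateSketch(q *TransferQueue) {
    oid := "0123456789abcdef"
    q.Add("a.bin", "/tmp/a.bin", oid, 1024, false, nil) // enqueued
    q.Add("b.bin", "/tmp/b.bin", oid, 1024, false, nil) // skipped as a duplicate
}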
// remember remembers the *Transfer "t" if the *TransferQueue doesn't already
// know about a Transfer with the same OID.
//
// It returns the resulting set of objects for the OID, from which callers can
// tell whether the value was new.
func (q *TransferQueue) remember(t *objectTuple) objects {
q.Upgrade()
q.trMutex.Lock()
defer q.trMutex.Unlock()
if _, ok := q.transfers[t.Oid]; !ok {
q.wait.Add(1)
q.transfers[t.Oid] = &objects{
objects: []*objectTuple{t},
}
return *q.transfers[t.Oid]
}
q.transfers[t.Oid] = q.transfers[t.Oid].Append(t)
return *q.transfers[t.Oid]
}
// collectBatches collects batches in a loop, prioritizing failed items from
// the previous batch before adding new items. The process works as follows:
//
// 1. Create a new, empty batch with a capacity of `q.batchSize`.
// 2. While the batch contains fewer items than `q.batchSize` AND the channel
// is open, read one item from the `q.incoming` channel.
// a. If the read was a channel close, go to step 4.
// b. If the read was a transferable item, go to step 3.
// 3. Append the item to the batch.
// 4. Sort the batch by descending object size, make a batch API call, send
// the items to the `*adapterBase`.
// 5. In a separate goroutine, process the worker results, incrementing and
// appending retries if possible. On the main goroutine, accept new items
// into "pending".
// 6. Concat() the "next" and "pending" batches such that no more items than
// the maximum allowed per batch are in next, and the rest are in pending.
// 7. If the `q.incoming` channel is open, go to step 2.
// 8. If the next batch is empty AND the `q.incoming` channel is closed,
// terminate immediately.
//
// collectBatches runs in its own goroutine.
func (q *TransferQueue) collectBatches() {
defer q.collectorWait.Done()
var closing bool
next := q.makeBatch()
pending := q.makeBatch()
for {
for !closing && (len(next) < q.batchSize) {
t, ok := <-q.incoming
if !ok {
closing = true
break
}
next = append(next, t)
}
// Before enqueuing the next batch, sort by descending object
// size.
sort.Sort(sort.Reverse(next))
done := make(chan struct{})
var retries batch
var err error
go func() {
defer close(done)
if len(next) == 0 {
return
}
retries, err = q.enqueueAndCollectRetriesFor(next)
if err != nil {
q.errorc <- err
}
}()
var collected batch
collected, closing = q.collectPendingUntil(done)
// If we've encountered a serious error here, abort immediately;
// don't process further batches. Abort the wait queue so that
// we don't deadlock waiting for objects to complete when they
// never will.
if err != nil && !errors.IsRetriableError(err) {
q.wait.Abort()
break
}
// Ensure the next batch is filled with, in order:
//
// - retries from the previous batch,
// - new additions that were enqueued behind retries, &
// - items collected while the batch was processing.
var minWaitTime time.Duration
next, pending, minWaitTime = retries.Concat(append(pending, collected...), q.batchSize)
if len(next) == 0 && len(pending) != 0 {
// There are some pending that could not be queued.
// Wait the requested time before resuming loop.
time.Sleep(minWaitTime)
} else if len(next) == 0 && len(pending) == 0 && closing {
// There are no items remaining, it is safe to break
break
}
}
}
// collectPendingUntil collects items from q.incoming into a "pending" batch
// until the given "done" channel is written to, or is closed.
//
// A "pending" batch is returned, along with whether or not "q.incoming" is
// closed.
func (q *TransferQueue) collectPendingUntil(done <-chan struct{}) (pending batch, closing bool) {
q.Upgrade()
for {
select {
case t, ok := <-q.incoming:
if !ok {
closing = true
<-done
return
}
pending = append(pending, t)
case <-done:
return
}
}
}
// enqueueAndCollectRetriesFor makes a Batch API call and returns a "next" batch
// containing all of the objects that failed from the previous batch and had
// retries available to them.
//
// If an error was encountered while making the API request, _all_ of the items
// from the previous batch (that have retries available to them) will be
// returned immediately, along with the error that was encountered.
//
// enqueueAndCollectRetriesFor blocks until the entire Batch "batch" has been
// processed.
func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) {
q.Upgrade()
next := q.makeBatch()
tracerx.Printf("tq: sending batch of size %d", len(batch))
enqueueRetry := func(t *objectTuple, err error, readyTime *time.Time) {
count := q.rc.Increment(t.Oid)
if readyTime == nil {
t.ReadyTime = q.rc.ReadyTime(t.Oid)
} else {
t.ReadyTime = *readyTime
}
delay := time.Until(t.ReadyTime).Seconds()
var errMsg string
if err != nil {
errMsg = fmt.Sprintf(": %s", err)
}
tracerx.Printf("tq: enqueue retry #%d after %.2fs for %q (size: %d)%s", count, delay, t.Oid, t.Size, errMsg)
next = append(next, t)
}
q.meter.Pause()
var bRes *BatchResponse
manifest := q.manifest.Upgrade()
if manifest.standaloneTransferAgent != "" {
// Trust that the external transfer agent can do everything by itself.
objects := make([]*Transfer, 0, len(batch))
for _, t := range batch {
objects = append(objects, &Transfer{Oid: t.Oid, Size: t.Size, Path: t.Path, Missing: t.Missing})
}
bRes = &BatchResponse{
Objects: objects,
TransferAdapterName: manifest.standaloneTransferAgent,
}
} else {
// Query the Git LFS server for what transfer method to use and
// details such as URLs, authentication, etc.
var err error
bRes, err = Batch(q.manifest, q.direction, q.remote, q.ref, batch.ToTransfers())
if err != nil {
var hasNonRetriableObjects = false
// If there was an error making the batch API call, mark all of
// the objects for retry if possible. If any should not be retried,
// they will be marked as failed.
for _, t := range batch {
if q.canRetryObject(t.Oid, err) {
enqueueRetry(t, err, nil)
} else if readyTime, canRetry := q.canRetryObjectLater(t.Oid, err); canRetry {
enqueueRetry(t, err, &readyTime)
} else {
hasNonRetriableObjects = true
q.wait.Done()
}
}
// Only return an error and mark the operation as a failure if at least one
// object was not enqueued to be retried at a later point.
// Make sure to return an error which causes all other objects to be retried.
if hasNonRetriableObjects {
return next, errors.NewRetriableError(err)
} else {
return next, nil
}
}
}
if len(bRes.Objects) == 0 {
return next, nil
}
// We check first that all of the objects we want to upload are present,
// and abort if any are missing. We'll never have any objects marked as
// missing except possibly on upload, so just skip iterating over the
// objects in that case.
if q.direction == Upload {
for _, o := range bRes.Objects {
// If the server already has the object, the list of
// actions will be empty. It's fine if the file is
// missing in that case, since we don't need to upload
// it.
if o.Missing && len(o.Actions) != 0 {
return nil, errors.New(tr.Tr.Get("Unable to find source for object %v (try running `git lfs fetch --all`)", o.Oid))
}
}
}
q.useAdapter(bRes.TransferAdapterName)
q.meter.Start()
toTransfer := make([]*Transfer, 0, len(bRes.Objects))
for _, o := range bRes.Objects {
if o.Error != nil {
q.errorc <- errors.Wrapf(o.Error, "[%v] %v", o.Oid, o.Error.Message)
q.Skip(o.Size)
q.wait.Done()
continue
}
q.trMutex.Lock()
objects, ok := q.transfers[o.Oid]
q.trMutex.Unlock()
if !ok {
// If we couldn't find any associated
// Transfer object, then we give up on the
// transfer by telling the progress meter to
// skip the number of bytes in "o".
q.errorc <- errors.Errorf(tr.Tr.Get("[%v] The server returned an unknown OID.", o.Oid))
q.Skip(o.Size)
q.wait.Done()
} else {
// Pick t[0], since it will cover all transfers with the
// same OID.
tr := newTransfer(o, objects.First().Name, objects.First().Path)
if a, err := tr.Rel(q.direction.String()); err != nil {
if q.canRetryObject(tr.Oid, err) {
enqueueRetry(objects.First(), err, nil)
} else {
q.errorc <- errors.Errorf("[%v] %v", tr.Name, err)
q.Skip(o.Size)
q.wait.Done()
}
} else if a == nil && manifest.standaloneTransferAgent == "" {
q.Skip(o.Size)
q.wait.Done()
} else {
q.meter.StartTransfer(objects.First().Name)
toTransfer = append(toTransfer, tr)
}
}
}
retries := q.addToAdapter(bRes.endpoint, toTransfer)
for t := range retries {
enqueueRetry(t, nil, nil)
}
return next, nil
}
// makeBatch returns a new, empty batch, with a capacity equal to the maximum
// batch size designated by the `*TransferQueue`.
func (q *TransferQueue) makeBatch() batch { return make(batch, 0, q.batchSize) }
// addToAdapter adds the given "pending" transfers to the transfer adapters and
// returns a channel of Transfers that are to be retried in the next batch.
// After all of the items in the batch have been processed, the channel is
// closed.
//
// addToAdapter returns immediately, and does not block.
func (q *TransferQueue) addToAdapter(e lfshttp.Endpoint, pending []*Transfer) <-chan *objectTuple {
q.Upgrade()
retries := make(chan *objectTuple, len(pending))
if err := q.ensureAdapterBegun(e); err != nil {
close(retries)
q.errorc <- err
for _, t := range pending {
q.Skip(t.Size)
q.wait.Done()
}
return retries
}
present, missingResults := q.partitionTransfers(pending)
go func() {
defer close(retries)
var results <-chan TransferResult
if q.dryRun {
results = q.makeDryRunResults(present)
} else {
results = q.adapter.Add(present...)
}
for _, res := range missingResults {
q.handleTransferResult(res, retries)
}
for res := range results {
q.handleTransferResult(res, retries)
}
}()
return retries
}
func (q *TransferQueue) partitionTransfers(transfers []*Transfer) (present []*Transfer, results []TransferResult) {
q.Upgrade()
if q.direction != Upload {
return transfers, nil
}
present = make([]*Transfer, 0, len(transfers))
results = make([]TransferResult, 0, len(transfers))
for _, t := range transfers {
var err error
if t.Size < 0 {
err = errors.Errorf(tr.Tr.Get("object %q has invalid size (got: %d)", t.Oid, t.Size))
} else {
fd, serr := os.Stat(t.Path)
if serr != nil {
if os.IsNotExist(serr) {
err = newObjectMissingError(t.Name, t.Oid)
} else {
err = serr
}
} else if t.Size != fd.Size() {
err = newCorruptObjectError(t.Name, t.Oid)
}
}
if err != nil {
results = append(results, TransferResult{
Transfer: t,
Error: err,
})
} else {
present = append(present, t)
}
}
return
}
// makeDryRunResults returns a channel populated immediately with "successful"
// results for all of the given transfers in "ts".
func (q *TransferQueue) makeDryRunResults(ts []*Transfer) <-chan TransferResult {
results := make(chan TransferResult, len(ts))
for _, t := range ts {
results <- TransferResult{t, nil}
}
close(results)
return results
}
// handleTransferResult observes the transfer result, sending it on the retries
// channel if it was able to be retried.
func (q *TransferQueue) handleTransferResult(
res TransferResult, retries chan<- *objectTuple,
) {
oid := res.Transfer.Oid
if res.Error != nil {
// If there was an error encountered when processing the
// transfer (res.Transfer), handle the error as is appropriate:
if readyTime, canRetry := q.canRetryObjectLater(oid, res.Error); canRetry {
// If the object can't be retried now, but can be
// after a certain period of time, send it to
// the retry channel with a time when it's ready.
tracerx.Printf("tq: retrying object %s after %s seconds.", oid, time.Until(readyTime).Seconds())
q.trMutex.Lock()
objects, ok := q.transfers[oid]
q.trMutex.Unlock()
if ok {
t := objects.First()
t.ReadyTime = readyTime
retries <- t
} else {
q.errorc <- res.Error
}
} else if q.canRetryObject(oid, res.Error) {
// If the object can be retried, send it on the retries
// channel, where it will be read at the call-site and
// its retry count will be incremented.
tracerx.Printf("tq: retrying object %s: %s", oid, res.Error)
q.trMutex.Lock()
objects, ok := q.transfers[oid]
q.trMutex.Unlock()
if ok {
retries <- objects.First()
} else {
q.errorc <- res.Error
}
} else {
// If the error wasn't retriable, OR the object has
// exceeded its retry budget, it will NOT be sent to
// the retry channel, and the error will be reported
// immediately (unless the error is in response to an
// HTTP 422).
if errors.IsUnprocessableEntityError(res.Error) {
q.unsupportedContentType = true
} else {
q.errorc <- res.Error
}
q.wait.Done()
}
} else {
q.trMutex.Lock()
objects := q.transfers[oid]
objects.completed = true
// Otherwise, if the transfer was successful, notify all of the
// watchers, and mark it as finished.
for _, c := range q.watchers {
// Send one update for each transfer with the
// same OID.
for _, t := range objects.All() {
c <- &Transfer{
Name: t.Name,
Path: t.Path,
Oid: t.Oid,
Size: t.Size,
}
}
}
q.trMutex.Unlock()
q.meter.FinishTransfer(res.Transfer.Name)
q.wait.Done()
}
}
func (q *TransferQueue) useAdapter(name string) {
q.adapterInitMutex.Lock()
defer q.adapterInitMutex.Unlock()
if q.adapter != nil {
if q.adapter.Name() == name {
// re-use, this is the normal path
return
}
// If the adapter we're using isn't the same as the one we've been
// told to use now, we must wait for the current one to finish, then switch.
// This will probably never happen, but is here just in case the server starts
// changing adapter support in between batches.
q.finishAdapter()
}
q.adapter = q.manifest.NewAdapterOrDefault(name, q.direction)
}
func (q *TransferQueue) finishAdapter() {
if q.adapterInProgress {
q.adapter.End()
q.adapterInProgress = false
q.adapter = nil
}
}
// BatchSize returns the batch size of the receiving *TransferQueue, that is,
// the number of transfers to accept before beginning work on them.
func (q *TransferQueue) BatchSize() int {
return q.batchSize
}
func (q *TransferQueue) Skip(size int64) {
q.meter.Skip(size)
}
func (q *TransferQueue) ensureAdapterBegun(e lfshttp.Endpoint) error {
q.Upgrade()
q.adapterInitMutex.Lock()
defer q.adapterInitMutex.Unlock()
if q.adapterInProgress {
return nil
}
// Progress callback - receives byte updates
cb := func(name string, total, read int64, current int) error {
q.meter.TransferBytes(q.direction.String(), name, read, total, current)
if q.cb != nil {
// NOTE: this is the mechanism by which the logpath
// specified by GIT_LFS_PROGRESS is written to.
//
// See: lfs.downloadFile() for more.
q.cb(total, read, current)
}
return nil
}
tracerx.Printf("tq: starting transfer adapter %q", q.adapter.Name())
err := q.adapter.Begin(q.toAdapterCfg(e), cb)
if err != nil {
return err
}
q.adapterInProgress = true
return nil
}
func (q *TransferQueue) toAdapterCfg(e lfshttp.Endpoint) AdapterConfig {
apiClient := q.manifest.APIClient()
concurrency := q.manifest.ConcurrentTransfers()
return &adapterConfig{
concurrentTransfers: concurrency,
apiClient: apiClient,
remote: q.remote,
}
}
// Wait waits for the queue to finish processing all transfers. Once Wait is
// called, Add will no longer add transfers to the queue. Any failed
// transfers will be retried automatically, up to the configured retry limit.
func (q *TransferQueue) Wait() {
close(q.incoming)
q.wait.Wait()
q.collectorWait.Wait()
q.finishAdapter()
close(q.errorc)
for _, watcher := range q.watchers {
close(watcher)
}
q.meter.Flush()
q.errorwait.Wait()
if q.manifest.Upgraded() {
manifest := q.manifest.Upgrade()
if manifest.sshTransfer != nil {
manifest.sshTransfer.Shutdown()
manifest.sshTransfer = nil
}
}
if q.unsupportedContentType {
fmt.Fprintln(os.Stderr, tr.Tr.Get(`info: Uploading failed due to unsupported Content-Type header(s).
info: Consider disabling Content-Type detection with:
info:
info: $ git config lfs.contenttype false`))
}
}
// Watch returns a channel where the queue will write the value of each transfer
// as it completes. If multiple transfers exist with the same OID, they will all
// be recorded here, even though only one actual transfer took place. The
// channel will be closed when the queue finishes processing.
func (q *TransferQueue) Watch() chan *Transfer {
c := make(chan *Transfer, q.batchSize)
q.watchers = append(q.watchers, c)
return c
}
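// drainAndWaitSketch is an illustrative sketch, not part of the original file,
// of the consumption pattern implied by Watch and Wait: register a watcher and
// drain it in a separate goroutine before calling Wait, since Wait blocks
// until all transfers finish and then closes the watcher channels. The queue
// is supplied by the caller; everything else is invented for demonstration.
func drainAndWaitSketch(q *TransferQueue) []error {
    watch := q.Watch()
    done := make(chan struct{})
    go func() {
        defer close(done)
        for t := range watch {
            fmt.Printf("completed %s (%d bytes)\n", t.Oid, t.Size)
        }
    }()
    // ... q.Add(...) calls happen here ...
    q.Wait()
    <-done
    return q.Errors()
}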
// This goroutine collects errors returned from transfers
func (q *TransferQueue) errorCollector() {
for err := range q.errorc {
q.errors = append(q.errors, err)
}
q.errorwait.Done()
}
// run begins the transfer queue. It transfers files sequentially or
// concurrently depending on the Config.ConcurrentTransfers() value.
func (q *TransferQueue) run() {
tracerx.Printf("tq: running as batched queue, batch size of %d", q.batchSize)
go q.errorCollector()
go q.collectBatches()
}
// canRetry returns whether or not the given error "err" is retriable.
func (q *TransferQueue) canRetry(err error) bool {
return errors.IsRetriableError(err)
}
// canRetryLater returns the time at which an error can be retried and whether
// the error is a delayed-retriable error.
func (q *TransferQueue) canRetryLater(err error) (time.Time, bool) {
return errors.IsRetriableLaterError(err)
}
// canRetryObject returns whether the given error is retriable for the object
// given by "oid". If the OID has met its retry limit, then it will not be
// able to be retried again, and false is returned. Otherwise, canRetryObject
// returns whether or not the given error "err" is retriable.
func (q *TransferQueue) canRetryObject(oid string, err error) bool {
if count, ok := q.rc.CanRetry(oid); !ok {
tracerx.Printf("tq: refusing to retry %q, too many retries (%d)", oid, count)
return false
}
return q.canRetry(err)
}
func (q *TransferQueue) canRetryObjectLater(oid string, err error) (time.Time, bool) {
if count, ok := q.rc.CanRetry(oid); !ok {
tracerx.Printf("tq: refusing to retry %q, too many retries (%d)", oid, count)
return time.Time{}, false
}
return q.canRetryLater(err)
}
// Errors returns any errors encountered during transfer.
func (q *TransferQueue) Errors() []error {
return q.errors
}