package tq

import (
	"net/http"
	"regexp"
	"strings"
	"sync"

	"github.com/git-lfs/git-lfs/v3/errors"
	"github.com/git-lfs/git-lfs/v3/fs"
	"github.com/git-lfs/git-lfs/v3/lfsapi"
	"github.com/git-lfs/git-lfs/v3/tr"
	"github.com/rubyist/tracerx"
)

// adapterBase implements the common functionality for core adapters which
// process transfers with N workers handling an oid each, and which wait for
// authentication to succeed on one worker before proceeding.
type adapterBase struct {
	fs           *fs.Filesystem
	name         string
	direction    Direction
	transferImpl transferImplementation
	apiClient    *lfsapi.Client
	remote       string
	jobChan      chan *job
	debugging    bool
	cb           ProgressCallback
	// WaitGroup to sync the completion of all workers
	workerWait sync.WaitGroup
	// WaitGroup to sync the completion of all in-flight jobs
	jobWait *sync.WaitGroup
	// WaitGroup to serialise the first transfer response to perform login if needed
	authWait sync.WaitGroup
}

// transferImplementation must be implemented to provide the actual upload/download
// implementation for all core transfer approaches that use adapterBase for
// convenience. DoTransfer will be called on multiple goroutines, so it must be
// either stateless or thread safe; however, it will never be called for the
// same oid in parallel.
// If authOkFunc is not nil, implementations must call it as early as possible
// once authentication has succeeded, before the whole file content is transferred.
type transferImplementation interface {
	// WorkerStarting is called when a worker goroutine starts to process jobs.
	// Implementations can run some startup logic here & return some context if needed.
	WorkerStarting(workerNum int) (interface{}, error)
	// WorkerEnding is called when a worker goroutine is shutting down.
	// Implementations can clean up per-worker resources here; ctx is as returned from WorkerStarting.
	WorkerEnding(workerNum int, ctx interface{})
	// DoTransfer performs a single transfer within a worker. ctx is any context returned from WorkerStarting.
	DoTransfer(ctx interface{}, t *Transfer, cb ProgressCallback, authOkFunc func()) error
}
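
// A minimal sketch of a transferImplementation (hypothetical, for illustration
// only; the real adapters in this package, such as the basic HTTP upload and
// download adapters, follow the same shape):
//
//	type nopAdapter struct {
//		*adapterBase
//	}
//
//	func (a *nopAdapter) WorkerStarting(workerNum int) (interface{}, error) {
//		return nil, nil // no per-worker state needed
//	}
//
//	func (a *nopAdapter) WorkerEnding(workerNum int, ctx interface{}) {
//		// nothing to clean up
//	}
//
//	func (a *nopAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCallback, authOkFunc func()) error {
//		// Release the auth gate as soon as possible so the other workers can start.
//		if authOkFunc != nil {
//			authOkFunc()
//		}
//		// Report the whole object as transferred.
//		advanceCallbackProgress(cb, t, t.Size)
//		return nil
//	}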

const (
	enableHrefRewriteKey     = "lfs.transfer.enablehrefrewrite"
	defaultEnableHrefRewrite = false
)

func newAdapterBase(f *fs.Filesystem, name string, dir Direction, ti transferImplementation) *adapterBase {
	return &adapterBase{
		fs:           f,
		name:         name,
		direction:    dir,
		transferImpl: ti,
		jobWait:      new(sync.WaitGroup),
	}
}

func (a *adapterBase) Name() string {
	return a.name
}

func (a *adapterBase) Direction() Direction {
	return a.direction
}

func (a *adapterBase) Begin(cfg AdapterConfig, cb ProgressCallback) error {
	a.apiClient = cfg.APIClient()
	a.remote = cfg.Remote()
	a.cb = cb
	a.jobChan = make(chan *job, 100)
	a.debugging = a.apiClient.OSEnv().Bool("GIT_TRANSFER_TRACE", false) ||
		a.apiClient.OSEnv().Bool("GIT_CURL_VERBOSE", false)
	maxConcurrency := cfg.ConcurrentTransfers()

	a.Trace("xfer: adapter %q Begin() with %d workers", a.Name(), maxConcurrency)

	a.workerWait.Add(maxConcurrency)
	a.authWait.Add(1)
	for i := 0; i < maxConcurrency; i++ {
		ctx, err := a.transferImpl.WorkerStarting(i)
		if err != nil {
			return err
		}
		go a.worker(i, ctx)
	}
	a.Trace("xfer: adapter %q started", a.Name())
	return nil
}

type job struct {
	T       *Transfer
	results chan<- TransferResult
	wg      *sync.WaitGroup
}

func (j *job) Done(err error) {
	j.results <- TransferResult{j.T, err}
	j.wg.Done()
}

func (a *adapterBase) Add(transfers ...*Transfer) <-chan TransferResult {
	results := make(chan TransferResult, len(transfers))
	a.jobWait.Add(len(transfers))

	go func() {
		for _, t := range transfers {
			a.jobChan <- &job{t, results, a.jobWait}
		}
		a.jobWait.Wait()
		close(results)
	}()

	return results
}

func (a *adapterBase) End() {
	a.Trace("xfer: adapter %q End()", a.Name())
	a.jobWait.Wait()
	close(a.jobChan)
	// wait for all workers to finish, including any WorkerEnding cleanup
	a.workerWait.Wait()
	a.Trace("xfer: adapter %q stopped", a.Name())
}
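
// Typical adapter lifecycle, as driven by a caller such as the transfer queue
// (hypothetical snippet for illustration; error handling elided):
//
//	adapter := newAdapterBase(fsys, "basic", Download, impl)
//	if err := adapter.Begin(cfg, progressCb); err != nil {
//		return err
//	}
//	results := adapter.Add(transfers...)
//	for res := range results {
//		// inspect each TransferResult for a per-object error
//		_ = res
//	}
//	adapter.End()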

func (a *adapterBase) Trace(format string, args ...interface{}) {
	if !a.debugging {
		return
	}
	tracerx.Printf(format, args...)
}

// worker function, many of these run per adapter
func (a *adapterBase) worker(workerNum int, ctx interface{}) {
	a.Trace("xfer: adapter %q worker %d starting", a.Name(), workerNum)
	waitForAuth := workerNum > 0
	signalAuthOnResponse := workerNum == 0

	// The first worker is the only one allowed to start immediately.
	// The rest wait for a successful response from the first worker so that
	// at most one login prompt is presented if credentials are needed.
	// Deliberately outside jobChan processing so we know worker 0 will process the first item.
	if waitForAuth {
		a.Trace("xfer: adapter %q worker %d waiting for Auth", a.Name(), workerNum)
		a.authWait.Wait()
		a.Trace("xfer: adapter %q worker %d auth signal received", a.Name(), workerNum)
	}

	for job := range a.jobChan {
		t := job.T

		var authCallback func()
		if signalAuthOnResponse {
			authCallback = func() {
				a.authWait.Done()
				signalAuthOnResponse = false
			}
		}
		a.Trace("xfer: adapter %q worker %d processing job for %q", a.Name(), workerNum, t.Oid)

		// Actual transfer happens here
		var err error
		if t.Size < 0 {
			err = errors.New(tr.Tr.Get("object %q has invalid size (got: %d)", t.Oid, t.Size))
		} else {
			err = a.transferImpl.DoTransfer(ctx, t, a.cb, authCallback)
		}

		// Mark the job as completed and alert all listeners
		job.Done(err)

		a.Trace("xfer: adapter %q worker %d finished job for %q", a.Name(), workerNum, t.Oid)
	}
	// If the auth gate was never released (e.g. because no jobs were submitted),
	// release it now so the waiting workers can finish too.
	if signalAuthOnResponse {
		a.authWait.Done()
	}
	a.Trace("xfer: adapter %q worker %d stopping", a.Name(), workerNum)
	a.transferImpl.WorkerEnding(workerNum, ctx)
	a.workerWait.Done()
}

var httpRE = regexp.MustCompile(`\Ahttps?://`)

func (a *adapterBase) newHTTPRequest(method string, rel *Action) (*http.Request, error) {
	enableRewrite := a.apiClient.GitEnv().Bool(enableHrefRewriteKey, defaultEnableHrefRewrite)

	href := rel.Href
	if enableRewrite {
		href = a.apiClient.Endpoints.NewEndpoint(a.direction.String(), rel.Href).Url
	}

	if !httpRE.MatchString(href) {
		urlfragment := strings.SplitN(href, "?", 2)[0]
		return nil, errors.New(tr.Tr.Get("missing protocol: %q", urlfragment))
	}

	req, err := http.NewRequest(method, href, nil)
	if err != nil {
		return nil, err
	}

	for key, value := range rel.Header {
		req.Header.Set(key, value)
	}

	return req, nil
}

func (a *adapterBase) doHTTP(t *Transfer, req *http.Request) (*http.Response, error) {
	if t.Authenticated {
		return a.apiClient.Do(req)
	}
	endpoint := endpointURL(req.URL.String(), t.Oid)
	return a.apiClient.DoWithAuthNoRetry(a.remote, a.apiClient.Endpoints.AccessFor(endpoint), req)
}
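
// A transfer implementation would typically use the two helpers above like this
// (hypothetical download sketch; rel is the relevant Action from the API
// response, and error handling is abbreviated):
//
//	req, err := a.newHTTPRequest(http.MethodGet, rel)
//	if err != nil {
//		return err
//	}
//	res, err := a.doHTTP(t, req)
//	if err != nil {
//		return err
//	}
//	defer res.Body.Close()
//	// ... copy res.Body to its destination, reporting progress via cb ...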

func advanceCallbackProgress(cb ProgressCallback, t *Transfer, numBytes int64) {
	if cb != nil {
		// Must split into max int sizes since read count is int
		const maxInt = int(^uint(0) >> 1)
		for read := int64(0); read < numBytes; {
			remainder := numBytes - read
			if remainder > int64(maxInt) {
				read += int64(maxInt)
				cb(t.Name, t.Size, read, maxInt)
			} else {
				read += remainder
				cb(t.Name, t.Size, read, int(remainder))
			}
		}
	}
}
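
// For example (hypothetical numbers), a DoTransfer implementation that copies a
// whole 16 MiB object in one go could report its progress afterwards with:
//
//	n, err := io.Copy(dst, res.Body) // n == 16777216 on success
//	if err != nil {
//		return err
//	}
//	advanceCallbackProgress(cb, t, n)
//
// Only counts larger than the maximum int value are split into multiple
// callback invocations.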

func endpointURL(rawurl, oid string) string {
	return strings.Split(rawurl, oid)[0]
}
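
// For example (hypothetical values):
//
//	endpointURL("https://lfs.example.com/objects/decafbad", "decafbad")
//	// returns "https://lfs.example.com/objects/"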