1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
|
package tq
import (
"fmt"
"hash"
"io"
"net/http"
"os"
"path/filepath"
"regexp"
"strconv"
"github.com/git-lfs/git-lfs/v3/errors"
"github.com/git-lfs/git-lfs/v3/tools"
"github.com/git-lfs/git-lfs/v3/tr"
"github.com/rubyist/tracerx"
)
// Adapter for basic HTTP downloads, includes resuming via HTTP Range
type basicDownloadAdapter struct {
*adapterBase
}
func (a *basicDownloadAdapter) tempDir() string {
// Shared with the SSH adapter.
d := filepath.Join(a.fs.LFSStorageDir, "incomplete")
if err := tools.MkdirAll(d, a.fs); err != nil {
return os.TempDir()
}
return d
}
func (a *basicDownloadAdapter) WorkerStarting(workerNum int) (interface{}, error) {
return nil, nil
}
func (a *basicDownloadAdapter) WorkerEnding(workerNum int, ctx interface{}) {
}
func (a *basicDownloadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCallback, authOkFunc func()) error {
// Reserve a temporary filename. We need to make sure nobody operates on the file simultaneously with us.
f, err := tools.TempFile(a.tempDir(), t.Oid, a.fs)
if err != nil {
return err
}
tmpName := f.Name()
defer func() {
// Fail-safe: Most implementation of os.File.Close() does nil check
if f != nil {
f.Close()
}
// This will delete temp file if:
// - we failed to fully download file and move it to final location including the case when final location already
// exists because other parallel git-lfs processes downloaded file
// - we also failed to move it to a partially-downloaded location
os.Remove(tmpName)
}()
// Close file because we will attempt to move partially-downloaded one on top of it
if err := f.Close(); err != nil {
return err
}
// Attempt to resume download. No error checking here. If we fail, we'll simply download from the start
tools.RobustRename(a.downloadFilename(t), f.Name())
// Open temp file. It is either empty or partially downloaded
f, err = os.OpenFile(f.Name(), os.O_RDWR, 0644)
if err != nil {
return err
}
// Read any existing data into hash
hash := tools.NewLfsContentHash()
fromByte, err := io.Copy(hash, f)
if err != nil {
return err
}
// Ensure that partial file seems valid
if fromByte > 0 {
if fromByte < t.Size-1 {
tracerx.Printf("xfer: Attempting to resume download of %q from byte %d", t.Oid, fromByte)
} else {
// Somehow we have more data than expected. Let's retry from the beginning.
if _, err := f.Seek(0, io.SeekStart); err != nil {
return err
}
if err := f.Truncate(0); err != nil {
return err
}
fromByte = 0
hash = nil
}
}
err = a.download(t, cb, authOkFunc, f, fromByte, hash)
if err != nil {
f.Close()
// Rename file so next download can resume from where we stopped.
// No error checking here, if rename fails then file will be deleted and there just will be no download resuming
tools.RobustRename(f.Name(), a.downloadFilename(t))
}
return err
}
// Returns path where partially downloaded file should be stored for download resuming
func (a *basicDownloadAdapter) downloadFilename(t *Transfer) string {
return filepath.Join(a.tempDir(), t.Oid+".part")
}
// download starts or resumes and download. dlFile is expected to be an existing file open in RW mode
func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOkFunc func(), dlFile *os.File, fromByte int64, hash hash.Hash) error {
rel, err := t.Rel("download")
if err != nil {
return err
}
if rel == nil {
return errors.Errorf(tr.Tr.Get("Object %s not found on the server.", t.Oid))
}
req, err := a.newHTTPRequest("GET", rel)
if err != nil {
return err
}
if fromByte > 0 {
// We could just use a start byte, but since we know the length be specific
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", fromByte, t.Size-1))
}
req = a.apiClient.LogRequest(req, "lfs.data.download")
res, err := a.makeRequest(t, req)
if err != nil {
if res == nil {
// We encountered a network or similar error which caused us
// to not receive a response at all.
return errors.NewRetriableError(err)
}
// Special-case status code 416 () - fall back
if fromByte > 0 && dlFile != nil && res.StatusCode == 416 {
tracerx.Printf("xfer: server rejected resume download request for %q from byte %d; re-downloading from start", t.Oid, fromByte)
if _, err := dlFile.Seek(0, io.SeekStart); err != nil {
return err
}
if err := dlFile.Truncate(0); err != nil {
return err
}
return a.download(t, cb, authOkFunc, dlFile, 0, nil)
}
// Special-cae status code 429 - retry after certain time
if res.StatusCode == 429 {
retLaterErr := errors.NewRetriableLaterError(err, res.Header.Get("Retry-After"))
if retLaterErr != nil {
return retLaterErr
}
}
return errors.NewRetriableError(err)
}
defer res.Body.Close()
// Range request must return 206 & content range to confirm
if fromByte > 0 {
rangeRequestOk := false
var failReason string
// check 206 and Content-Range, fall back if either not as expected
if res.StatusCode == 206 {
// Probably a successful range request, check Content-Range
if rangeHdr := res.Header.Get("Content-Range"); rangeHdr != "" {
regex := regexp.MustCompile(`bytes (\d+)\-.*`)
match := regex.FindStringSubmatch(rangeHdr)
if match != nil && len(match) > 1 {
contentStart, _ := strconv.ParseInt(match[1], 10, 64)
if contentStart == fromByte {
rangeRequestOk = true
} else {
failReason = fmt.Sprintf("Content-Range start byte incorrect: %s expected %d", match[1], fromByte)
}
} else {
failReason = fmt.Sprintf("badly formatted Content-Range header: %q", rangeHdr)
}
} else {
failReason = "missing Content-Range header in response"
}
} else {
failReason = fmt.Sprintf("expected status code 206, received %d", res.StatusCode)
}
if rangeRequestOk {
tracerx.Printf("xfer: server accepted resume download request: %q from byte %d", t.Oid, fromByte)
advanceCallbackProgress(cb, t, fromByte)
} else {
// Abort resume, perform regular download
tracerx.Printf("xfer: failed to resume download for %q from byte %d: %s. Re-downloading from start", t.Oid, fromByte, failReason)
if _, err := dlFile.Seek(0, io.SeekStart); err != nil {
return err
}
if err := dlFile.Truncate(0); err != nil {
return err
}
fromByte = 0
hash = nil
if res.StatusCode == 200 {
// If status code was 200 then server just ignored Range header and
// sent everything. Don't re-request, use this one from byte 0
} else {
// re-request needed
return a.download(t, cb, authOkFunc, dlFile, fromByte, hash)
}
}
}
// Signal auth OK on success response, before starting download to free up
// other workers immediately
if authOkFunc != nil {
authOkFunc()
}
var hasher *tools.HashingReader
httpReader := tools.NewRetriableReader(res.Body)
if fromByte > 0 && hash != nil {
// pre-load hashing reader with previous content
hasher = tools.NewHashingReaderPreloadHash(httpReader, hash)
} else {
hasher = tools.NewHashingReader(httpReader)
}
dlfilename := dlFile.Name()
// Wrap callback to give name context
ccb := func(totalSize int64, readSoFar int64, readSinceLast int) error {
if cb != nil {
return cb(t.Name, totalSize, readSoFar+fromByte, readSinceLast)
}
return nil
}
written, err := tools.CopyWithCallback(dlFile, hasher, res.ContentLength, ccb)
if err != nil {
return errors.Wrapf(err, tr.Tr.Get("cannot write data to temporary file %q", dlfilename))
}
if actual := hasher.Hash(); actual != t.Oid {
return errors.New(tr.Tr.Get("expected OID %s, got %s after %d bytes written", t.Oid, actual, written))
}
if err := dlFile.Close(); err != nil {
return errors.New(tr.Tr.Get("can't close temporary file %q: %v", dlfilename, err))
}
err = tools.RenameFileCopyPermissions(dlfilename, t.Path)
if _, err2 := os.Stat(t.Path); err2 == nil {
// Target file already exists, possibly was downloaded by other git-lfs process
return nil
}
return err
}
func configureBasicDownloadAdapter(m *concreteManifest) {
m.RegisterNewAdapterFunc(BasicAdapterName, Download, func(name string, dir Direction) Adapter {
switch dir {
case Download:
bd := &basicDownloadAdapter{newAdapterBase(m.fs, name, dir, nil)}
// self implements impl
bd.transferImpl = bd
return bd
case Upload:
panic(tr.Tr.Get("Should never ask this function to upload"))
}
return nil
})
}
func (a *basicDownloadAdapter) makeRequest(t *Transfer, req *http.Request) (*http.Response, error) {
res, err := a.doHTTP(t, req)
if errors.IsAuthError(err) && len(req.Header.Get("Authorization")) == 0 {
return a.makeRequest(t, req)
}
return res, err
}
|