File: basic_upload.go

package info (click to toggle)
git-lfs 3.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,808 kB
  • sloc: sh: 21,256; makefile: 507; ruby: 417
file content (244 lines) | stat: -rw-r--r-- 6,600 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
package tq

import (
	"io"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"strings"

	"github.com/git-lfs/git-lfs/v3/config"
	"github.com/git-lfs/git-lfs/v3/errors"
	"github.com/git-lfs/git-lfs/v3/lfsapi"
	"github.com/git-lfs/git-lfs/v3/tools"
	"github.com/git-lfs/git-lfs/v3/tr"
)

const (
	// BasicAdapterName is the adapter name passed to RegisterNewAdapterFunc
	// when the basic upload adapter is registered in this file.
	BasicAdapterName   = "basic"
	// defaultContentType is the Content-Type applied to an upload request
	// when setContentTypeFor ends up with no detected type.
	defaultContentType = "application/octet-stream"
)

// basicUploadAdapter is the adapter for basic (non-resumable) uploads.
// It embeds *adapterBase and supplies the upload-specific methods defined in
// this file (WorkerStarting, WorkerEnding, DoTransfer).
type basicUploadAdapter struct {
	*adapterBase
}

// tempDir returns the scratch directory reserved for this adapter, a
// "git-lfs-basic-temp" folder under the system temporary directory. The folder
// is created on demand; when creation fails the system temporary directory is
// returned instead.
func (a *basicUploadAdapter) tempDir() string {
	systemTmp := os.TempDir()
	// Kept separate from the directory used by basic download.
	adapterTmp := filepath.Join(systemTmp, "git-lfs-basic-temp")
	if tools.MkdirAll(adapterTmp, a.fs) != nil {
		return systemTmp
	}
	return adapterTmp
}

// WorkerStarting sets up per-worker state for worker number workerNum.
// The basic upload adapter keeps no such state, so it always returns a nil
// context and a nil error.
func (a *basicUploadAdapter) WorkerStarting(workerNum int) (interface{}, error) {
	return nil, nil
}

// WorkerEnding is the counterpart to WorkerStarting. It is intentionally a
// no-op: WorkerStarting hands out no per-worker context, so there is nothing
// to release here.
func (a *basicUploadAdapter) WorkerEnding(workerNum int, ctx interface{}) {
}

// DoTransfer uploads the object described by t with a single HTTP PUT to the
// "upload" action advertised for it and, on success, hands off to
// verifyUpload.
//
// ctx is not used by this adapter. cb, when non-nil, receives progress
// notifications (tagged with t.Name) as the request body is read. authOkFunc,
// when non-nil, is invoked once on the first read of the request body.
//
// Error handling:
//   - a missing "upload" action, a failure to build the request, or a failure
//     to open t.Path is returned directly;
//   - an unprocessable-entity error from the request is returned unchanged so
//     it is not retried;
//   - any other request error is returned as a retriable error (or a
//     retry-later error for a 429 response when one can be built);
//   - a 403 response is reported as retriable, and any other status above 299
//     as a plain error.
//
// The response body is drained and closed on every path that reaches the
// status checks, not only on success.
func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCallback, authOkFunc func()) error {
	rel, err := t.Rel("upload")
	if err != nil {
		return err
	}
	if rel == nil {
		// The translated message is already fully formatted; pass it as an
		// argument so it is never re-interpreted as a format string.
		return errors.Errorf("%s", tr.Tr.Get("No upload action for object: %s", t.Oid))
	}

	req, err := a.newHTTPRequest("PUT", rel)
	if err != nil {
		return err
	}

	if req.Header.Get("Transfer-Encoding") == "chunked" {
		req.TransferEncoding = []string{"chunked"}
	} else {
		req.Header.Set("Content-Length", strconv.FormatInt(t.Size, 10))
	}

	req.ContentLength = t.Size

	f, err := os.OpenFile(t.Path, os.O_RDONLY, 0644)
	if err != nil {
		return errors.Wrap(err, tr.Tr.Get("basic upload"))
	}
	defer f.Close()

	if err := a.setContentTypeFor(req, f); err != nil {
		return err
	}

	// Ensure progress callbacks made while uploading
	// Wrap callback to give name context
	ccb := func(totalSize int64, readSoFar int64, readSinceLast int) error {
		if cb != nil {
			return cb(t.Name, totalSize, readSoFar, readSinceLast)
		}
		return nil
	}

	cbr := tools.NewFileBodyWithCallback(f, t.Size, ccb)
	var reader lfsapi.ReadSeekCloser = cbr

	// Signal auth was ok on first read; this frees up other workers to start
	if authOkFunc != nil {
		reader = newStartCallbackReader(reader, func() error {
			authOkFunc()
			return nil
		})
	}

	req.Body = reader

	req = a.apiClient.LogRequest(req, "lfs.data.upload")
	res, err := a.makeRequest(t, req)
	if err != nil {
		if errors.IsUnprocessableEntityError(err) {
			// If we got an HTTP 422, we do _not_ want to retry the
			// request later below, because it is likely that the
			// implementing server does not support non-standard
			// Content-Type headers.
			//
			// Instead, return immediately and wait for the
			// *tq.TransferQueue to report an error message.
			return err
		}

		// We're about to return a retriable error, meaning that this
		// transfer will either be retried, or it will fail.
		//
		// Either way, let's decrement the number of bytes that we've
		// read _so far_, so that the next iteration doesn't re-transfer
		// those bytes, according to the progress meter.
		if perr := cbr.ResetProgress(); perr != nil {
			err = errors.Wrap(err, perr.Error())
		}

		if res == nil {
			// We encountered a network or similar error which caused us
			// to not receive a response at all.
			return errors.NewRetriableError(err)
		}

		if res.StatusCode == 429 {
			retLaterErr := errors.NewRetriableLaterError(err, res.Header.Get("Retry-After"))
			if retLaterErr != nil {
				return retLaterErr
			}
		}
		return errors.NewRetriableError(err)
	}

	// Record the status and release the body before branching on it, so the
	// 403 and non-2xx returns below do not leak the response body (and with
	// it the underlying connection). The body is still closed before
	// verifyUpload runs, as before.
	status := res.StatusCode
	if res.Body != nil {
		io.Copy(io.Discard, res.Body)
		res.Body.Close()
	}

	// A status code of 403 likely means that an authentication token for the
	// upload has expired. This can be safely retried.
	if status == 403 {
		err = errors.New(tr.Tr.Get("Received status %d", status))
		return errors.NewRetriableError(err)
	}

	if status > 299 {
		// The message embeds the request URL, which may contain '%' escapes;
		// pass it through "%s" so those are not parsed as format verbs.
		return errors.Wrapf(nil, "%s", tr.Tr.Get("Invalid status for %s %s: %d",
			req.Method,
			strings.SplitN(req.URL.String(), "?", 2)[0],
			status,
		))
	}

	return verifyUpload(a.apiClient, a.remote, t)
}

// setContentTypeFor makes sure req carries a Content-Type header.
//
// A header that is already present is left alone. Otherwise, unless the
// "lfs.<url>.contenttype" setting is switched off, up to the first 512 bytes of
// r are read and sniffed with http.DetectContentType, after which r is rewound
// to its start. When detection is disabled (or yields nothing) the header is
// set to defaultContentType.
//
// An error is returned if reading from r fails for a reason other than io.EOF,
// or if r cannot be rewound; in both cases the header is left unset.
func (a *adapterBase) setContentTypeFor(req *http.Request, r io.ReadSeeker) error {
	urlConfig := config.NewURLConfig(a.apiClient.GitEnv())
	detectionEnabled := urlConfig.Bool("lfs", req.URL.String(), "contenttype", true)

	// Never override a Content-Type that is already set on the request.
	if req.Header.Get("Content-Type") != "" {
		return nil
	}

	if !detectionEnabled {
		req.Header.Set("Content-Type", defaultContentType)
		return nil
	}

	head := make([]byte, 512)
	count, readErr := r.Read(head)
	if readErr != nil && readErr != io.EOF {
		return errors.Wrap(readErr, tr.Tr.Get("content type detection error"))
	}

	sniffed := http.DetectContentType(head[:count])

	// Put the reader back at the start so the upload sends the whole file.
	if _, seekErr := r.Seek(0, io.SeekStart); seekErr != nil {
		return errors.Wrap(seekErr, tr.Tr.Get("content type rewind failure"))
	}

	if sniffed == "" {
		sniffed = defaultContentType
	}
	req.Header.Set("Content-Type", sniffed)
	return nil
}

// startCallbackReader is a reader wrapper which calls a function as soon as the
// first Read() call is made. Once the callback has returned without error it is
// not called again; Seek and Close come from the embedded ReadSeekCloser.
type startCallbackReader struct {
	// cb is the function to run before the first read; a nil cb is skipped.
	cb     func() error
	// cbDone records that cb has already completed successfully.
	cbDone bool
	// ReadSeekCloser is the wrapped reader that actually supplies the data.
	lfsapi.ReadSeekCloser
}

// Read runs the start callback, if one is still pending, and then reads from
// the wrapped reader into p.
//
// If the callback returns an error, Read returns (0, that error) without
// touching the wrapped reader, and the callback stays pending so that it is
// attempted again on the next Read.
func (s *startCallbackReader) Read(p []byte) (n int, err error) {
	callbackPending := s.cb != nil && !s.cbDone
	if !callbackPending {
		return s.ReadSeekCloser.Read(p)
	}

	if cbErr := s.cb(); cbErr != nil {
		return 0, cbErr
	}
	s.cbDone = true

	return s.ReadSeekCloser.Read(p)
}

// newStartCallbackReader wraps r so that cb is invoked before the first Read
// on the returned reader. The callback starts out in the "not yet run" state.
func newStartCallbackReader(r lfsapi.ReadSeekCloser, cb func() error) *startCallbackReader {
	wrapped := new(startCallbackReader)
	wrapped.ReadSeekCloser = r
	wrapped.cb = cb
	return wrapped
}

// configureBasicUploadAdapter registers, on manifest m, the factory that
// builds the basic upload adapter under BasicAdapterName for the Upload
// direction.
//
// The factory panics if it is asked for a Download adapter and returns nil for
// any other direction that is not Upload.
func configureBasicUploadAdapter(m *concreteManifest) {
	factory := func(name string, dir Direction) Adapter {
		if dir == Download {
			panic(tr.Tr.Get("Should never ask this function to download"))
		}
		if dir != Upload {
			return nil
		}

		adapter := &basicUploadAdapter{newAdapterBase(m.fs, name, dir, nil)}
		// The adapter provides its own transfer implementation.
		adapter.transferImpl = adapter
		return adapter
	}

	m.RegisterNewAdapterFunc(BasicAdapterName, Upload, factory)
}

// makeRequest sends req for transfer t via doHTTP and returns its response and
// error.
//
// If the attempt fails with an auth error and req carried no Authorization
// header, the request body is replaced with a fresh, callback-free reader over
// t.Path and the request is sent again (recursively, under the same rule).
//
// If t.Path cannot be reopened for that retry, a nil response and the wrapped
// open error are returned rather than retrying with an unusable body.
func (a *basicUploadAdapter) makeRequest(t *Transfer, req *http.Request) (*http.Response, error) {
	res, err := a.doHTTP(t, req)
	if !errors.IsAuthError(err) || len(req.Header.Get("Authorization")) != 0 {
		return res, err
	}

	// Construct a new body with just the raw file and no callbacks. Since
	// all progress tracking happens when the net.http code copies our
	// request body into a new request, we can safely make this request
	// outside of the flow of the transfer adapter, and if it fails, the
	// transfer progress will be rewound at the top level
	f, openErr := os.OpenFile(t.Path, os.O_RDONLY, 0644)
	if openErr != nil {
		// Without the file there is nothing to send; surface the failure
		// instead of retrying with a nil *os.File as the body.
		return nil, errors.Wrap(openErr, tr.Tr.Get("basic upload"))
	}
	defer f.Close()

	req.Body = tools.NewBodyWithCallback(f, t.Size, nil)
	return a.makeRequest(t, req)
}