package files

import (
	"context"
	"fmt"
	"io"
	"log"
	"math"
	"os"
	"sync"

	"github.com/Azure/go-autorest/autorest"
)
// PutFile is a helper method which takes a file and automatically chunks it up
// and uploads the chunks in parallel, rather than requiring the caller to do so.
func (client Client) PutFile(ctx context.Context, accountName, shareName, path, fileName string, file *os.File, parallelism int) error {
	fileInfo, err := file.Stat()
	if err != nil {
		return fmt.Errorf("error loading file info: %s", err)
	}

	fileSize := fileInfo.Size()
	if fileSize == 0 {
		// nothing to upload - also avoids a division by zero below
		return nil
	}

	chunkSize := 4 * 1024 * 1024 // 4MB
	if chunkSize > int(fileSize) {
		chunkSize = int(fileSize)
	}
	chunks := int(math.Ceil(float64(fileSize) / float64(chunkSize)))

	workerCount := parallelism
	if workerCount > chunks {
		workerCount = chunks
	}

	var waitGroup sync.WaitGroup
	waitGroup.Add(workerCount)

	jobs := make(chan int, workerCount)
	// buffer one slot per chunk so workers never block when reporting errors
	errors := make(chan error, chunks)

	for i := 0; i < workerCount; i++ {
		go func() {
			defer waitGroup.Done()

			for chunk := range jobs {
				log.Printf("[DEBUG] Chunk %d of %d", chunk+1, chunks)
				uci := uploadChunkInput{
					thisChunk: chunk,
					chunkSize: chunkSize,
					fileSize:  fileSize,
				}
				if _, err := client.uploadChunk(ctx, accountName, shareName, path, fileName, uci, file); err != nil {
					errors <- err
				}
			}
		}()
	}

	for i := 0; i < chunks; i++ {
		jobs <- i
	}
	close(jobs)
	waitGroup.Wait()

	// TODO: we should switch to hashicorp/multi-error here
	if len(errors) > 0 {
		return fmt.Errorf("error uploading file: %s", <-errors)
	}

	return nil
}
type uploadChunkInput struct {
	thisChunk int
	chunkSize int
	fileSize  int64
}
func (client Client) uploadChunk(ctx context.Context, accountName, shareName, path, fileName string, input uploadChunkInput, file *os.File) (result autorest.Response, err error) {
	startBytes := int64(input.chunkSize * input.thisChunk)
	endBytes := startBytes + int64(input.chunkSize)

	// the last chunk may run past the end of the file, so clamp it
	remaining := input.fileSize - startBytes
	if int64(input.chunkSize) > remaining {
		endBytes = startBytes + remaining
	}

	bytesToRead := int(endBytes - startBytes)
	buffer := make([]byte, bytesToRead)

	// ReadAt is safe for concurrent use, so each worker can read its own range
	if _, err = file.ReadAt(buffer, startBytes); err != nil && err != io.EOF {
		return result, fmt.Errorf("error reading bytes: %s", err)
	}

	putBytesInput := PutByteRangeInput{
		StartBytes: startBytes,
		EndBytes:   endBytes,
		Content:    buffer,
	}
	result, err = client.PutByteRange(ctx, accountName, shareName, path, fileName, putBytesInput)
	if err != nil {
		return result, fmt.Errorf("error putting bytes: %s", err)
	}

	return
}