1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
|
package filesync
import (
"bufio"
"context"
io "io"
"os"
"time"
"github.com/moby/buildkit/util/bklog"
"github.com/pkg/errors"
"github.com/tonistiigi/fsutil"
fstypes "github.com/tonistiigi/fsutil/types"
"google.golang.org/grpc"
)
// Stream is the subset of a gRPC stream needed to exchange messages: access
// to the stream's context plus raw message send and receive. Both
// grpc.ClientStream and grpc.ServerStream provide these methods.
type Stream interface {
	// Context returns the context associated with the stream.
	Context() context.Context
	// SendMsg sends msg on the stream.
	SendMsg(msg interface{}) error
	// RecvMsg receives the next message from the stream into msg.
	RecvMsg(msg interface{}) error
}
// newStreamWriter wraps stream in an io.WriteCloser. Writes go through a
// bufio.Writer before being sent on the stream as BytesMessage values;
// Close flushes that buffer and then closes the stream-backed writer.
func newStreamWriter(stream grpc.ClientStream) io.WriteCloser {
	sink := &streamWriterCloser{ClientStream: stream}
	return &bufferedWriteCloser{
		Writer: bufio.NewWriter(sink),
		Closer: sink,
	}
}
// bufferedWriteCloser pairs a buffered writer with the Closer of the
// destination it writes to. Write calls are served by the embedded
// *bufio.Writer; the Close method defined on this type flushes the buffer
// before delegating to the embedded Closer.
type bufferedWriteCloser struct {
	*bufio.Writer
	io.Closer
}
// Close flushes any buffered data and, only if the flush succeeded, closes
// the underlying Closer. A flush failure is returned with a stack attached
// and the underlying Closer is left untouched.
func (bwc *bufferedWriteCloser) Close() error {
	flushErr := bwc.Writer.Flush()
	if flushErr == nil {
		return bwc.Closer.Close()
	}
	return errors.WithStack(flushErr)
}
// streamWriterCloser adapts a grpc.ClientStream to io.WriteCloser: its Write
// method sends the bytes as BytesMessage values, and its Close method closes
// the send side of the stream and waits for the receiver to finish.
type streamWriterCloser struct {
	grpc.ClientStream
}
// Write sends dt on the stream as one or more BytesMessage values and
// returns the number of bytes that were handed to SendMsg successfully.
//
// Payloads larger than maxChunkSize are split into consecutive chunks of at
// most that size. An empty dt still results in a single (empty) message.
// On the first failing send the byte count of the chunks sent so far is
// returned together with the error.
func (wc *streamWriterCloser) Write(dt []byte) (int, error) {
	// grpc-go has a 4MB limit on messages by default. Split large messages
	// so we don't get close to that limit.
	const maxChunkSize = 3 * 1024 * 1024

	sent := 0
	for {
		chunk := dt
		if len(chunk) > maxChunkSize {
			chunk = chunk[:maxChunkSize]
		}

		if sendErr := wc.ClientStream.SendMsg(&BytesMessage{Data: chunk}); sendErr != nil {
			// SendMsg returns EOF on remote errors; RecvMsg is called to
			// obtain the error reported by the stream, if any.
			if errors.Is(sendErr, io.EOF) {
				if recvErr := errors.WithStack(wc.ClientStream.RecvMsg(struct{}{})); recvErr != nil {
					return sent, recvErr
				}
			}
			return sent, errors.WithStack(sendErr)
		}

		sent += len(chunk)
		dt = dt[len(chunk):]
		if len(dt) == 0 {
			return sent, nil
		}
	}
}
// Close closes the send side of the stream and then blocks until the
// receiver is done, which is signalled by RecvMsg returning io.EOF.
//
// It returns nil on a clean io.EOF (including an io.EOF that has been
// wrapped by an intermediate layer), and otherwise the CloseSend or RecvMsg
// error with a stack attached. If RecvMsg unexpectedly yields a message
// without error, the result is nil because errors.WithStack(nil) is nil.
func (wc *streamWriterCloser) Close() error {
	if err := wc.ClientStream.CloseSend(); err != nil {
		return errors.WithStack(err)
	}
	// block until receiver is done
	var bm BytesMessage
	// Use errors.Is, like the other EOF checks in this file, so that a
	// wrapped io.EOF is still recognised as a clean shutdown.
	if err := wc.ClientStream.RecvMsg(&bm); !errors.Is(err, io.EOF) {
		return errors.WithStack(err)
	}
	return nil
}
// recvDiffCopy receives a file tree from ds into dest via fsutil.Receive and
// logs, at debug level, how long the whole call took.
//
// When cu is non-nil it is marked as supported and its HandleChange and
// ContentHasher are handed to the receiver as NotifyHashed and
// ContentHasher; otherwise both stay nil. progress, differ and filter are
// forwarded unchanged. The error from fsutil.Receive is returned with a
// stack attached. On success the send side of ds is closed.
func recvDiffCopy(ds grpc.ClientStream, dest string, cu CacheUpdater, progress progressCb, differ fsutil.DiffType, filter func(string, *fstypes.Stat) bool) (err error) {
	started := time.Now()
	defer func() {
		logger := bklog.G(ds.Context())
		logger.Debugf("diffcopy took: %v", time.Since(started))
	}()

	opt := fsutil.ReceiveOpt{
		ProgressCb: progress,
		Filter:     fsutil.FilterFunc(filter),
		Differ:     differ,
	}
	if cu != nil {
		cu.MarkSupported(true)
		opt.NotifyHashed = cu.HandleChange
		opt.ContentHasher = cu.ContentHasher()
	}

	defer func() {
		if err != nil {
			return
		}
		// tracing wrapper requires close trigger even on clean eof
		ds.CloseSend()
	}()

	return errors.WithStack(fsutil.Receive(ds.Context(), ds, dest, opt))
}
// syncTargetDiffCopy creates dest (mode 0700, including parents) and then
// receives a file tree from ds into it via fsutil.Receive with Merge
// enabled.
//
// The filter accepts every entry and overwrites its Uid and Gid with the
// uid and gid of the current process. Errors are returned wrapped: a
// directory creation failure with a descriptive message, a receive failure
// with a stack attached.
func syncTargetDiffCopy(ds grpc.ServerStream, dest string) error {
	if err := os.MkdirAll(dest, 0700); err != nil {
		return errors.Wrapf(err, "failed to create synctarget dest dir %s", dest)
	}

	ownerUID := uint32(os.Getuid())
	ownerGID := uint32(os.Getgid())
	chown := func(_ string, st *fstypes.Stat) bool {
		st.Uid = ownerUID
		st.Gid = ownerGID
		return true
	}

	recvErr := fsutil.Receive(ds.Context(), ds, dest, fsutil.ReceiveOpt{
		Merge:  true,
		Filter: chown,
	})
	return errors.WithStack(recvErr)
}
// writeTargetFile copies the Data of every BytesMessage received from ds
// into wc until the stream reports io.EOF, at which point it returns nil.
// Any other receive error, or any write error, is returned with a stack
// attached. wc is not closed here.
func writeTargetFile(ds grpc.ServerStream, wc io.WriteCloser) error {
	for {
		// A fresh message per iteration, so no data carries over between reads.
		var msg BytesMessage
		recvErr := ds.RecvMsg(&msg)
		switch {
		case recvErr == nil:
			// fall out of the switch and write the payload
		case errors.Is(recvErr, io.EOF):
			return nil
		default:
			return errors.WithStack(recvErr)
		}

		if _, writeErr := wc.Write(msg.Data); writeErr != nil {
			return errors.WithStack(writeErr)
		}
	}
}
|