package rpc
import (
"context"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/grpctool"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/logz"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/retry"
"go.uber.org/zap"
"google.golang.org/protobuf/reflect/protoreflect"
)
// Protobuf field numbers of the oneof members in ObjectsToSynchronizeResponse.
// They are used to register per-message-type callbacks with the stream visitor.
const (
	headerFieldNumber  protoreflect.FieldNumber = 1
	objectFieldNumber  protoreflect.FieldNumber = 2
	trailerFieldNumber protoreflect.FieldNumber = 3
)
var (
	// respVisitor lazily constructs (and then reuses) a stream visitor for
	// ObjectsToSynchronizeResponse messages received from the server.
	respVisitor = grpctool.NewLazyStreamVisitor(&ObjectsToSynchronizeResponse{})
)
// ObjectSource is a named blob of object manifest data, reassembled from
// one or more Object messages of the response stream.
type ObjectSource struct {
	// Name identifies the source the data came from.
	Name string
	// Data is the raw content; chunks with the same source name are concatenated.
	Data []byte
}
// ObjectsToSynchronizeData is the fully assembled payload of one
// GetObjectsToSynchronize response stream.
type ObjectsToSynchronizeData struct {
	// CommitId is the commit the objects were read from (from the stream header).
	CommitId string
	// ProjectId is the project the objects belong to (from the stream header).
	ProjectId int64
	// Sources holds the object data, one entry per distinct source name.
	Sources []ObjectSource
}
// ObjectsToSynchronizeCallback is invoked by the watcher with each complete
// set of objects received from the server.
type ObjectsToSynchronizeCallback func(context.Context, ObjectsToSynchronizeData)
// ObjectsToSynchronizeWatcherInterface abstracts ObjectsToSynchronizeWatcher.
type ObjectsToSynchronizeWatcherInterface interface {
	// Watch blocks, polling the server and invoking the callback with each
	// received set of objects, until the context is canceled.
	Watch(context.Context, *ObjectsToSynchronizeRequest, ObjectsToSynchronizeCallback)
}
// ObjectsToSynchronizeWatcher polls GetObjectsToSynchronize and delivers
// assembled results to a callback. See Watch.
type ObjectsToSynchronizeWatcher struct {
	Log *zap.Logger
	// GitopsClient is the gRPC client used to open the streaming call.
	GitopsClient GitopsClient
	// PollConfig supplies the retry/backoff configuration for the poll loop.
	PollConfig retry.PollConfigFactory
}
// Watch repeatedly calls GetObjectsToSynchronize in a retry loop, assembles
// each response stream into an ObjectsToSynchronizeData, and passes it to
// callback. It blocks until ctx is canceled. The commit id of the last
// successfully processed response is sent in subsequent requests so the
// server can avoid resending data for an unchanged commit.
func (o *ObjectsToSynchronizeWatcher) Watch(ctx context.Context, req *ObjectsToSynchronizeRequest, callback ObjectsToSynchronizeCallback) {
	sv := respVisitor.Get()
	lastProcessedCommitId := req.CommitId
	// Poll loop errors are intentionally ignored: failures are logged below
	// and retried; the loop only exits via ctx cancellation or retry.Done.
	_ = retry.PollWithBackoff(ctx, o.PollConfig(), func(ctx context.Context) (error, retry.AttemptResult) {
		ctx, cancel := context.WithCancel(ctx) // nolint:govet
		defer cancel() // ensure streaming call is canceled
		// Send a new message each time rather than mutate req.
		res, err := o.GitopsClient.GetObjectsToSynchronize(ctx, &ObjectsToSynchronizeRequest{
			ProjectId: req.ProjectId,
			Ref:       req.Ref,
			CommitId:  lastProcessedCommitId,
			Paths:     req.Paths,
		})
		if err != nil {
			// Cancellation/timeout is expected on shutdown — don't log it as an error.
			if !grpctool.RequestCanceledOrTimedOut(err) {
				o.Log.Error("GetObjectsToSynchronize failed", logz.Error(err))
			}
			return nil, retry.Backoff
		}
		// Assemble the streamed header/object/trailer messages into a single
		// ObjectsToSynchronizeData via the visitor callbacks.
		v := objectsToSynchronizeVisitor{}
		err = sv.Visit(res,
			grpctool.WithCallback(headerFieldNumber, v.OnHeader),
			grpctool.WithCallback(objectFieldNumber, v.OnObject),
			grpctool.WithCallback(trailerFieldNumber, v.OnTrailer),
		)
		if err != nil {
			if !grpctool.RequestCanceledOrTimedOut(err) {
				o.Log.Error("GetObjectsToSynchronize.Recv failed", logz.Error(err))
			}
			return nil, retry.Backoff
		}
		if !v.nonEmptyStream {
			// Server closed the stream without sending us anything.
			// It's fine, will just reopen the connection.
			return nil, retry.ContinueImmediately
		}
		callback(ctx, v.objs)
		// Only advance after the callback ran, so a retried attempt re-fetches
		// data that was never delivered.
		lastProcessedCommitId = v.objs.CommitId
		if req.GetRef().GetCommit() != "" {
			o.Log.Debug("GetObjectsToSynchronize configuration is for specific commit, no need to update, thus block watcher until cancelled")
			<-ctx.Done()
			return nil, retry.Done
		}
		return nil, retry.ContinueImmediately
	})
}
// objectsToSynchronizeVisitor accumulates one GetObjectsToSynchronize
// response stream into an ObjectsToSynchronizeData.
type objectsToSynchronizeVisitor struct {
	// objs is the payload being assembled from the stream.
	objs ObjectsToSynchronizeData
	// nonEmptyStream is set once the trailer arrives, i.e. the server
	// actually sent data rather than closing the stream immediately.
	nonEmptyStream bool
}
// OnHeader records the project and commit identity announced at the start
// of the response stream.
func (v *objectsToSynchronizeVisitor) OnHeader(header *ObjectsToSynchronizeResponse_Header) error {
	v.objs.ProjectId = header.ProjectId
	v.objs.CommitId = header.CommitId
	return nil
}
// OnObject appends one chunk of object data. Consecutive chunks that share
// a source name are concatenated into a single ObjectSource; a chunk with a
// new source name starts a new entry.
func (v *objectsToSynchronizeVisitor) OnObject(object *ObjectsToSynchronizeResponse_Object) error {
	n := len(v.objs.Sources)
	if n == 0 || v.objs.Sources[n-1].Name != object.Source {
		// First chunk of a new source.
		v.objs.Sources = append(v.objs.Sources, ObjectSource{
			Name: object.Source,
			Data: object.Data,
		})
		return nil
	}
	// Continuation of the previous source: extend its data in place.
	last := &v.objs.Sources[n-1]
	last.Data = append(last.Data, object.Data...)
	return nil
}
// OnTrailer marks the stream as non-empty — the server only sends a trailer
// after it has sent data. The trailer carries no payload the visitor needs.
func (v *objectsToSynchronizeVisitor) OnTrailer(_ *ObjectsToSynchronizeResponse_Trailer) error {
	v.nonEmptyStream = true
	return nil
}