1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
|
package agent
import (
"context"
"reflect"
"sync"
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/log"
)
// StatusReporter receives updates to task status. Method may be called
// concurrently, so implementations should be goroutine-safe.
type StatusReporter interface {
UpdateTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error
}
// Reporter receives update to both task and volume status.
type Reporter interface {
StatusReporter
ReportVolumeUnpublished(ctx context.Context, volumeID string) error
}
type statusReporterFunc func(ctx context.Context, taskID string, status *api.TaskStatus) error
func (fn statusReporterFunc) UpdateTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
return fn(ctx, taskID, status)
}
//nolint:unused // currently only used in tests.
type volumeReporterFunc func(ctx context.Context, volumeID string) error
//nolint:unused // currently only used in tests.
func (fn volumeReporterFunc) ReportVolumeUnpublished(ctx context.Context, volumeID string) error {
return fn(ctx, volumeID)
}
//nolint:unused // currently only used in tests.
type statusReporterCombined struct {
statusReporterFunc
volumeReporterFunc
}
// statusReporter creates a reliable StatusReporter that will always succeed.
// It handles several tasks at once, ensuring all statuses are reported.
//
// The reporter will continue reporting the current status until it succeeds.
type statusReporter struct {
reporter Reporter
statuses map[string]*api.TaskStatus
// volumes is a set of volumes which are to be reported unpublished.
volumes map[string]struct{}
mu sync.Mutex
cond sync.Cond
closed bool
}
func newStatusReporter(ctx context.Context, upstream Reporter) *statusReporter {
r := &statusReporter{
reporter: upstream,
statuses: make(map[string]*api.TaskStatus),
volumes: make(map[string]struct{}),
}
r.cond.L = &r.mu
go r.run(ctx)
return r
}
func (sr *statusReporter) UpdateTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
sr.mu.Lock()
defer sr.mu.Unlock()
current, ok := sr.statuses[taskID]
if ok {
if reflect.DeepEqual(current, status) {
return nil
}
if current.State > status.State {
return nil // ignore old updates
}
}
sr.statuses[taskID] = status
sr.cond.Signal()
return nil
}
func (sr *statusReporter) ReportVolumeUnpublished(ctx context.Context, volumeID string) error {
sr.mu.Lock()
defer sr.mu.Unlock()
sr.volumes[volumeID] = struct{}{}
sr.cond.Signal()
return nil
}
func (sr *statusReporter) Close() error {
sr.mu.Lock()
defer sr.mu.Unlock()
sr.closed = true
sr.cond.Signal()
return nil
}
func (sr *statusReporter) run(ctx context.Context) {
done := make(chan struct{})
defer close(done)
sr.mu.Lock() // released during wait, below.
defer sr.mu.Unlock()
go func() {
select {
case <-ctx.Done():
sr.Close()
case <-done:
return
}
}()
for {
if len(sr.statuses) == 0 && len(sr.volumes) == 0 {
sr.cond.Wait()
}
if sr.closed {
// TODO(stevvooe): Add support here for waiting until all
// statuses are flushed before shutting down.
return
}
for taskID, status := range sr.statuses {
delete(sr.statuses, taskID) // delete the entry, while trying to send.
sr.mu.Unlock()
err := sr.reporter.UpdateTaskStatus(ctx, taskID, status)
sr.mu.Lock()
// reporter might be closed during UpdateTaskStatus call
if sr.closed {
return
}
if err != nil {
log.G(ctx).WithError(err).Error("status reporter failed to report status to agent")
// place it back in the map, if not there, allowing us to pick
// the value if a new one came in when we were sending the last
// update.
if _, ok := sr.statuses[taskID]; !ok {
sr.statuses[taskID] = status
}
}
}
for volumeID := range sr.volumes {
delete(sr.volumes, volumeID)
sr.mu.Unlock()
err := sr.reporter.ReportVolumeUnpublished(ctx, volumeID)
sr.mu.Lock()
// reporter might be closed during ReportVolumeUnpublished call
if sr.closed {
return
}
if err != nil {
log.G(ctx).WithError(err).Error("status reporter failed to report volume status to agent")
sr.volumes[volumeID] = struct{}{}
}
}
}
}
|