1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
|
package server
import (
"context"
"sync/atomic"
"time"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/gitlab"
gapi "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/gitlab/api"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/event_tracker"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modserver"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modshared"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/errz"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)
// Names of the metric instruments registered by this module.
const (
	// sentEventsCounterName is the instrument name passed to Int64ObservableCounter
	// in constructSentEventsCounter.
	sentEventsCounterName = "sent_events_counter"
	// accumulatedEventsCounterName is the instrument name passed to
	// Int64ObservableUpDownCounter in constructAccumulatedEventsCounter.
	accumulatedEventsCounterName = "accumulated_events_counter"
)
// module periodically takes the events held by the event tracker and sends
// them via gapi.SendEvent, and exposes two metrics about that process.
type module struct {
	// log is the logger handed to api.HandleProcessingError when sending fails.
	log *zap.Logger
	// api is used to report processing errors (HandleProcessingError).
	api modserver.API
	// eventTracker is the source of event data: it is cloned before each send,
	// has the sent data subtracted afterwards, and provides the channel that
	// Run selects on (GetEventSizeLimit) as well as the AccumulatedEvents gauge value.
	eventTracker event_tracker.EventTrackerCollector
	// gitLabClient is the client passed to gapi.SendEvent.
	gitLabClient gitlab.ClientInterface
	// eventReportingPeriod is the ticker interval used by Run. A zero value
	// makes Run return immediately without sending anything.
	eventReportingPeriod time.Duration
	// meter is used to register the observable instruments of this module.
	meter metric.Meter
	// sentEvents counts events for which gapi.SendEvent returned no error; it is
	// read by the callback registered in constructSentEventsCounter.
	sentEvents atomic.Int64
}
// newModule builds a module from the given server configuration and event
// tracker, then registers both observable metric instruments on the
// configured meter.
//
// It returns a nil module and the registration error if either instrument
// cannot be constructed.
func newModule(config *modserver.Config, et event_tracker.EventTrackerCollector) (*module, error) {
	period := config.Config.Observability.EventReportingPeriod.AsDuration()
	mod := &module{
		log:                  config.Log,
		api:                  config.API,
		eventTracker:         et,
		gitLabClient:         config.GitLabClient,
		eventReportingPeriod: period,
		meter:                config.Meter,
	}
	// Instruments are registered in the same order as before: sent first,
	// accumulated second.
	if err := mod.constructSentEventsCounter(); err != nil {
		return nil, err
	}
	if err := mod.constructAccumulatedEventsCounter(); err != nil {
		return nil, err
	}
	return mod, nil
}
// Run sends accumulated events until ctx is done.
//
// If eventReportingPeriod is zero, Run returns nil immediately and never
// sends anything. Otherwise events are sent on every tick of a ticker with
// that period, and also whenever a value is received from the channel
// returned by eventTracker.GetEventSizeLimit (presumably signalling that the
// tracker reached its size limit - confirm against the tracker
// implementation); in the latter case the ticker is reset so the next
// periodic send happens a full period later.
//
// When ctx is done, one final send is attempted with a fresh context that
// times out after 5 seconds. Run always returns nil.
func (m *module) Run(ctx context.Context) error {
	if m.eventReportingPeriod == 0 {
		return nil
	}
	reportTicker := time.NewTicker(m.eventReportingPeriod)
	defer reportTicker.Stop()
	stopped := ctx.Done()
	sizeLimitReached := m.eventTracker.GetEventSizeLimit()

running:
	for {
		select {
		case <-stopped:
			break running
		case <-reportTicker.C:
			m.sendEvents(ctx)
		case <-sizeLimitReached:
			m.sendEvents(ctx)
			reportTicker.Reset(m.eventReportingPeriod)
		}
	}

	// ctx is already done here, so the final flush needs its own context.
	flushCtx, cancelFlush := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancelFlush()
	// Flush events before exiting
	m.sendEvents(flushCtx) //nolint: contextcheck
	return nil
}
// sendEvents runs sendEventsInternal and reports any resulting error through
// api.HandleProcessingError, except errors that errz.ContextDone recognises,
// which are dropped silently.
func (m *module) sendEvents(ctx context.Context) {
	err := m.sendEventsInternal(ctx)
	if err == nil || errz.ContextDone(err) {
		return
	}
	m.api.HandleProcessingError(ctx, m.log, modshared.NoAgentID, "Failed to send event data", err)
}
// sendEventsInternal clones the tracker's current event data and, unless it
// is empty, sends it with gapi.SendEvent.
//
// The cloned data is subtracted from the tracker whether or not the send
// succeeded. On success the number of sent events is added to sentEvents; on
// failure the send error is returned and sentEvents is left unchanged.
func (m *module) sendEventsInternal(ctx context.Context) error {
	snapshot := m.eventTracker.CloneEventData()
	if snapshot.IsEmpty() {
		return nil
	}
	sendErr := gapi.SendEvent(ctx, m.gitLabClient, convertEvents(snapshot))
	// Discard events when it fails to send. This is to avoid trying to send them indefinitely
	m.eventTracker.Subtract(snapshot)
	if sendErr != nil {
		return sendErr
	}
	for _, batch := range snapshot.Events {
		m.sentEvents.Add(int64(len(batch)))
	}
	return nil
}
// Name returns the module name, event_tracker.ModuleName.
func (m *module) Name() string {
	return event_tracker.ModuleName
}
// convertEvents translates tracker event data into the gapi.EventData payload
// type. Each event_tracker.Event in ed.Events becomes a gapi.Event carrying
// the same UserID and ProjectID, grouped under the same map key.
func convertEvents(ed *event_tracker.EventData) gapi.EventData {
	// countEvents reports how many events a source slice holds.
	countEvents := func(src []event_tracker.Event) int {
		return len(src)
	}
	// appendConverted appends the gapi form of every source event to dst.
	appendConverted := func(src []event_tracker.Event, dst []gapi.Event) []gapi.Event {
		for i := range src {
			dst = append(dst, gapi.Event{
				UserID:    src[i].UserID,
				ProjectID: src[i].ProjectID,
			})
		}
		return dst
	}
	// passThrough returns the converted slice unchanged.
	passThrough := func(converted []gapi.Event) []gapi.Event {
		return converted
	}
	return gapi.EventData{
		Events: tool.TransformMap[string, []event_tracker.Event, []gapi.Event, map[string][]event_tracker.Event, map[string][]gapi.Event](
			ed.Events,
			countEvents,
			appendConverted,
			passThrough,
		),
	}
}
// constructSentEventsCounter registers an observable Int64 counter named
// sentEventsCounterName on the module's meter. Its callback observes the
// current value of sentEvents. Any registration error is returned.
func (m *module) constructSentEventsCounter() error {
	observeSent := func(_ context.Context, obs metric.Int64Observer) error {
		obs.Observe(m.sentEvents.Load())
		return nil
	}
	_, err := m.meter.Int64ObservableCounter(
		sentEventsCounterName,
		metric.WithDescription("The total number of sent events from KAS to GitLab monolith"),
		metric.WithInt64Callback(observeSent),
	)
	return err
}
// constructAccumulatedEventsCounter registers an observable Int64 up/down
// counter named accumulatedEventsCounterName on the module's meter. Its
// callback observes eventTracker.AccumulatedEvents(). Any registration error
// is returned.
func (m *module) constructAccumulatedEventsCounter() error {
	observeAccumulated := func(_ context.Context, obs metric.Int64Observer) error {
		obs.Observe(m.eventTracker.AccumulatedEvents())
		return nil
	}
	_, err := m.meter.Int64ObservableUpDownCounter(
		accumulatedEventsCounterName,
		metric.WithDescription("The total number of accumulated events in KAS event tracker"),
		metric.WithInt64Callback(observeAccumulated),
	)
	return err
}
|