1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
|
// Package event_tracker records events (user ID / project ID pairs) under
// registered event names, keeps a shared count of the events accumulated so
// far, and lets a collector snapshot the recorded events and subtract them
// again afterwards.
package event_tracker //nolint:stylecheck
import (
"fmt"
"sync"
"sync/atomic"
)
const (
// ModuleName is the name of this module.
ModuleName = "event_tracker"
// maxEventSize is the accumulated-event threshold: once the shared counter
// reads above this value, EmitEvent attempts to signal the eventSizeLimit
// channel.
maxEventSize = 1000
)
// Event is a single recorded occurrence, carrying the user and project IDs
// that were passed to EmitEvent.
type Event struct {
// UserID is the userID argument given to EmitEvent.
UserID int64
// ProjectID is the projectID argument given to EmitEvent.
ProjectID int64
}
// Events is the list of events recorded under one registered event name.
type Events struct {
// mu guards events.
mu sync.Mutex
// events holds the recorded events; EmitEvent appends to the end and
// Subtract removes from the front.
events []Event
// accumulatedEvents points at the owning EventTracker's counter (wired up
// in RegisterEvent), so every list updates the same total.
accumulatedEvents *atomic.Int64
// eventSizeLimit is the owning EventTracker's signal channel; EmitEvent
// performs a non-blocking send on it once the counter exceeds maxEventSize.
eventSizeLimit chan struct{}
}
// EventsInterface is the emitting side of an event list; it is the type
// returned by RegisterEvent.
type EventsInterface interface {
// EmitEvent records one event for the given user and project IDs.
EmitEvent(userID, projectID int64)
}
// EventTracker owns all registered event lists together with the counter and
// signal channel they share.
type EventTracker struct {
// eventLists maps a registered event name to its list.
// NOTE(review): the map itself is not protected by a lock; presumably
// RegisterEvent is only called during setup, before events are emitted or
// collected concurrently — confirm against callers.
eventLists map[string]*Events
// accumulatedEvents is the total number of events currently held across
// all lists; it is incremented by EmitEvent and decremented by Subtract.
accumulatedEvents atomic.Int64
// eventSizeLimit receives a signal from EmitEvent when accumulatedEvents
// exceeds maxEventSize. It is created unbuffered in NewEventTracker.
eventSizeLimit chan struct{}
}
// NewEventTracker returns an EventTracker with no registered events, a zero
// accumulated-events counter, and an unbuffered signal channel.
func NewEventTracker() *EventTracker {
	tracker := new(EventTracker)
	tracker.eventLists = make(map[string]*Events)
	tracker.eventSizeLimit = make(chan struct{})
	return tracker
}
// EventTrackerRegisterer is the registration side of the tracker.
type EventTrackerRegisterer interface {
// RegisterEvent creates the event list for name and returns its emitter.
// The EventTracker implementation panics if name is already registered.
RegisterEvent(name string) EventsInterface
}
// RegisterEvent creates a new, empty event list under name and returns it as
// an emitter. The list shares the tracker's accumulated-events counter and
// signal channel.
//
// It panics if an event with the same name has already been registered.
func (et *EventTracker) RegisterEvent(name string) EventsInterface {
	_, duplicate := et.eventLists[name]
	if duplicate {
		panic(fmt.Errorf("event with name %s already exists", name))
	}

	list := new(Events)
	list.accumulatedEvents = &et.accumulatedEvents
	list.eventSizeLimit = et.eventSizeLimit

	et.eventLists[name] = list
	return list
}
// EmitEvent records one event for the given user and project.
//
// The event is appended to this list under the list's mutex and the shared
// accumulated-events counter is incremented. If the counter then reads above
// maxEventSize, a signal is offered on the eventSizeLimit channel so that the
// events can be flushed.
func (e *Events) EmitEvent(userID, projectID int64) {
	e.mu.Lock()
	e.events = append(e.events, Event{UserID: userID, ProjectID: projectID})
	e.mu.Unlock()

	e.accumulatedEvents.Add(1)
	if e.accumulatedEvents.Load() <= maxEventSize {
		return
	}

	// Non-blocking send: if nobody is currently receiving, the signal is
	// dropped instead of stalling the emitter.
	select {
	case e.eventSizeLimit <- struct{}{}:
	default:
	}
}
// Subtract removes the first len(v) events from the list and lowers the shared
// accumulated-events counter by the number of events actually removed.
//
// Only the length of v is used; its contents are not compared with the stored
// events. If v is longer than the list, the count is clamped to the list
// length. Without the clamp the slice expression would panic while the mutex
// is held, and the counter would drift below the number of stored events.
func (e *Events) Subtract(v []Event) {
	n := len(v)

	e.mu.Lock()
	if n > len(e.events) {
		n = len(e.events)
	}
	// Shift the remaining events to the front of the same backing array so its
	// capacity is reused by later appends.
	e.events = append(e.events[:0], e.events[n:]...)
	e.mu.Unlock()

	e.accumulatedEvents.Add(-int64(n))
}
// EventTrackerCollector is the collecting side of the tracker. The method
// descriptions below reflect the EventTracker implementation.
type EventTrackerCollector interface {
// CloneEventData returns a copy of the events currently recorded, keyed by
// event name; names with no events are omitted.
CloneEventData() *EventData
// Subtract removes the events contained in the given data from the
// tracker and lowers the accumulated-events count accordingly.
Subtract(*EventData)
// AccumulatedEvents returns the current accumulated-events count.
AccumulatedEvents() int64
// GetEventSizeLimit returns the channel that is signalled when the
// accumulated-events count exceeds maxEventSize.
GetEventSizeLimit() <-chan struct{}
}
// CloneEventData returns a snapshot of every non-empty event list, keyed by
// event name. Each list is copied while holding that list's mutex, so the
// returned slices do not alias the tracker's internal storage. Lists with no
// events are left out of the result.
func (et *EventTracker) CloneEventData() *EventData {
	snapshot := make(map[string][]Event)
	for name, list := range et.eventLists {
		list.mu.Lock()
		if len(list.events) > 0 {
			snapshot[name] = append([]Event(nil), list.events...)
		}
		list.mu.Unlock()
	}
	return &EventData{Events: snapshot}
}
// Subtract removes the events in ed from the matching registered lists by
// calling Events.Subtract for each entry of ed.Events.
//
// A nil ed is a no-op, and entries whose name has no registered list are
// skipped. Looking up an unknown name yields a nil *Events, and calling
// Subtract on it would panic with a nil-pointer dereference.
func (et *EventTracker) Subtract(ed *EventData) {
	if ed == nil {
		return
	}
	for name, sent := range ed.Events {
		list, ok := et.eventLists[name]
		if !ok || list == nil {
			continue
		}
		list.Subtract(sent)
	}
}
// AccumulatedEvents returns the current value of the tracker's shared
// accumulated-events counter, read atomically.
func (et *EventTracker) AccumulatedEvents() int64 {
	total := et.accumulatedEvents.Load()
	return total
}
// GetEventSizeLimit returns a receive-only view of the channel on which
// EmitEvent signals that the accumulated-events count exceeded maxEventSize.
func (et *EventTracker) GetEventSizeLimit() <-chan struct{} {
	var signal <-chan struct{} = et.eventSizeLimit
	return signal
}
// EventData is a struct that contains the events of all event lists and is
// used to send them to GitLab.
type EventData struct {
// Events maps an event name to the events recorded under that name.
Events map[string][]Event
}
// IsEmpty reports whether ed has no entries in its Events map.
func (ed *EventData) IsEmpty() bool {
	entries := len(ed.Events)
	return entries == 0
}
// EventTrackerInterface combines the registration and collection sides of the
// tracker into a single interface.
type EventTrackerInterface interface {
EventTrackerRegisterer
EventTrackerCollector
}
|