File: api.go

package info (click to toggle)
gitlab-agent 16.11.5-1
  • links: PTS, VCS
  • area: contrib
  • in suites: experimental
  • size: 7,072 kB
  • sloc: makefile: 193; sh: 55; ruby: 3
file content (140 lines) | stat: -rw-r--r-- 2,932 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package event_tracker //nolint:stylecheck

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	ModuleName   = "event_tracker"
	maxEventSize = 1000
)

type Event struct {
	UserID    int64
	ProjectID int64
}

type Events struct {
	mu                sync.Mutex
	events            []Event
	accumulatedEvents *atomic.Int64
	eventSizeLimit    chan struct{}
}

type EventsInterface interface {
	EmitEvent(userID, projectID int64)
}

type EventTracker struct {
	eventLists        map[string]*Events
	accumulatedEvents atomic.Int64
	eventSizeLimit    chan struct{}
}

func NewEventTracker() *EventTracker {
	return &EventTracker{
		eventLists:     map[string]*Events{},
		eventSizeLimit: make(chan struct{}),
	}
}

type EventTrackerRegisterer interface {
	RegisterEvent(name string) EventsInterface
}

func (et *EventTracker) RegisterEvent(name string) EventsInterface {
	if _, exists := et.eventLists[name]; exists {
		panic(fmt.Errorf("event with name %s already exists", name))
	}
	e := &Events{
		accumulatedEvents: &et.accumulatedEvents,
		eventSizeLimit:    et.eventSizeLimit,
	}
	et.eventLists[name] = e
	return e
}

func (e *Events) EmitEvent(userID, projectID int64) {
	event := Event{
		UserID:    userID,
		ProjectID: projectID,
	}

	e.mu.Lock()
	e.events = append(e.events, event)
	e.mu.Unlock()
	e.accumulatedEvents.Add(int64(1))

	// If the number of accumulated events is greater than the maxEventSize,
	// send a signal to the eventSizeLimit channel to flush the events.
	if e.accumulatedEvents.Load() > maxEventSize {
		select {
		case e.eventSizeLimit <- struct{}{}:
		default:
			// Discard the message if no one is listening
		}
	}
}

func (e *Events) Subtract(v []Event) {
	e.mu.Lock()
	e.events = append(e.events[:0], e.events[len(v):]...)
	e.mu.Unlock()
	e.accumulatedEvents.Add(-int64(len(v)))
}

type EventTrackerCollector interface {
	CloneEventData() *EventData
	Subtract(*EventData)
	AccumulatedEvents() int64
	GetEventSizeLimit() <-chan struct{}
}

func (et *EventTracker) CloneEventData() *EventData {
	events := make(map[string][]Event)
	for k, v := range et.eventLists {
		v.mu.Lock()
		if len(v.events) == 0 {
			v.mu.Unlock()
			continue
		}
		var newEvents []Event
		newEvents = append(newEvents, v.events...)
		events[k] = newEvents
		v.mu.Unlock()
	}
	return &EventData{
		Events: events,
	}
}

func (et *EventTracker) Subtract(ed *EventData) {
	for k, v := range ed.Events {
		e := et.eventLists[k]
		e.Subtract(v)
	}
}

func (et *EventTracker) AccumulatedEvents() int64 {
	return et.accumulatedEvents.Load()
}

func (et *EventTracker) GetEventSizeLimit() <-chan struct{} {
	return et.eventSizeLimit
}

// EventData is a struct that contains all the eventLists and used to send them to GitLab
type EventData struct {
	Events map[string][]Event
}

func (ed *EventData) IsEmpty() bool {
	return len(ed.Events) == 0
}

type EventTrackerInterface interface {
	EventTrackerRegisterer
	EventTrackerCollector
}