File: watcher.go

package info (click to toggle)
crowdsec 1.4.6-10.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 18,500 kB
  • sloc: sh: 2,870; makefile: 386; python: 74
file content (163 lines) | stat: -rw-r--r-- 4,694 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package csplugin

import (
	"sync"
	"time"

	"github.com/crowdsecurity/crowdsec/pkg/models"
	log "github.com/sirupsen/logrus"
	"gopkg.in/tomb.v2"
)

/*
 PluginWatcher is here to allow grouping and threshold features for notification plugins :
 by frequency : it will signal the plugin to deliver notifications at this frequency (watchPluginTicker)
 by threshold : it will signal the plugin to deliver notifications when the number of alerts for this plugin reaches this threshold (watchPluginAlertCounts)
*/

// TODO: When we start using go 1.18, consider moving this struct in some utils pkg. Make the implementation more generic using generics :)
type alertCounterByPluginName struct {
	sync.Mutex
	data map[string]int
}

func newAlertCounterByPluginName() alertCounterByPluginName {
	return alertCounterByPluginName{
		data: make(map[string]int),
	}
}

func (acp *alertCounterByPluginName) Init() {
	acp.data = make(map[string]int)
}

func (acp *alertCounterByPluginName) Get(key string) (int, bool) {
	acp.Lock()
	val, ok := acp.data[key]
	acp.Unlock()
	return val, ok
}

func (acp *alertCounterByPluginName) Set(key string, val int) {
	acp.Lock()
	acp.data[key] = val
	acp.Unlock()
}

type PluginWatcher struct {
	PluginConfigByName     map[string]PluginConfig
	AlertCountByPluginName alertCounterByPluginName
	PluginEvents           chan string
	Inserts                chan string
	tomb                   *tomb.Tomb
}

var DefaultEmptyTicker = time.Second * 1

func (pw *PluginWatcher) Init(configs map[string]PluginConfig, alertsByPluginName map[string][]*models.Alert) {
	pw.PluginConfigByName = configs
	pw.PluginEvents = make(chan string)
	pw.AlertCountByPluginName = newAlertCounterByPluginName()
	pw.Inserts = make(chan string)
	for name := range alertsByPluginName {
		pw.AlertCountByPluginName.Set(name, 0)
	}
}

func (pw *PluginWatcher) Start(tomb *tomb.Tomb) {
	pw.tomb = tomb
	for name := range pw.PluginConfigByName {
		pname := name
		pw.tomb.Go(func() error {
			pw.watchPluginTicker(pname)
			return nil
		})
	}

	pw.tomb.Go(func() error {
		pw.watchPluginAlertCounts()
		return nil
	})
}

func (pw *PluginWatcher) watchPluginTicker(pluginName string) {
	var watchTime time.Duration
	var watchCount int = -1
	// Threshold can be set : by time, by count, or both
	// if only time is set, honor it
	// if only count is set, put timer to 1 second and just check size
	// if both are set, set timer to 1 second, but check size && time
	interval := pw.PluginConfigByName[pluginName].GroupWait
	threshold := pw.PluginConfigByName[pluginName].GroupThreshold

	//only size is set
	if threshold > 0 && interval == 0 {
		watchCount = threshold
		watchTime = DefaultEmptyTicker
	} else if interval != 0 && threshold == 0 {
		//only time is set
		watchTime = interval
	} else if interval != 0 && threshold != 0 {
		//both are set
		watchTime = DefaultEmptyTicker
		watchCount = threshold
	} else {
		//none are set, we sent every event we receive
		watchTime = DefaultEmptyTicker
		watchCount = 1
	}

	ticker := time.NewTicker(watchTime)
	var lastSend time.Time = time.Now()
	for {
		select {
		case <-ticker.C:
			send := false
			//if count threshold was set, honor no matter what
			if pc, _ := pw.AlertCountByPluginName.Get(pluginName); watchCount > 0 && pc >= watchCount {
				log.Tracef("[%s] %d alerts received, sending\n", pluginName, pc)
				send = true
				pw.AlertCountByPluginName.Set(pluginName, 0)
			}
			//if time threshold only was set
			if watchTime > 0 && watchTime == interval {
				log.Tracef("sending alerts to %s, duration %s elapsed", pluginName, interval)
				send = true
			}

			//if we hit timer because it was set low to honor count, check if we should trigger
			if watchTime == DefaultEmptyTicker && watchTime != interval && interval != 0 {
				if lastSend.Add(interval).Before(time.Now()) {
					log.Tracef("sending alerts to %s, duration %s elapsed", pluginName, interval)
					send = true
					lastSend = time.Now()
				}
			}
			if send {
				log.Tracef("sending alerts to %s", pluginName)
				pw.PluginEvents <- pluginName
			}
		case <-pw.tomb.Dying():
			ticker.Stop()
			// emptying
			// no lock here because we have the broker still listening even in dying state before killing us
			pw.PluginEvents <- pluginName
			return
		}
	}
}

func (pw *PluginWatcher) watchPluginAlertCounts() {
	for {
		select {
		case pluginName := <-pw.Inserts:
			//we only "count" pending alerts, and watchPluginTicker is actually going to send it
			if _, ok := pw.PluginConfigByName[pluginName]; ok {
				curr, _ := pw.AlertCountByPluginName.Get(pluginName)
				pw.AlertCountByPluginName.Set(pluginName, curr+1)
			}
		case <-pw.tomb.Dying():
			return
		}
	}
}