1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
|
package csplugin
import (
"sync"
"time"
"github.com/crowdsecurity/crowdsec/pkg/models"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
)
/*
PluginWatcher is here to allow grouping and threshold features for notification plugins :
by frequency : it will signal the plugin to deliver notifications at this frequency (watchPluginTicker)
by threshold : it will signal the plugin to deliver notifications when the number of alerts for this plugin reaches this threshold (watchPluginAlertCounts)
*/
// TODO: When we start using go 1.18, consider moving this struct in some utils pkg. Make the implementation more generic using generics :)
type alertCounterByPluginName struct {
sync.Mutex
data map[string]int
}
func newAlertCounterByPluginName() alertCounterByPluginName {
return alertCounterByPluginName{
data: make(map[string]int),
}
}
func (acp *alertCounterByPluginName) Init() {
acp.data = make(map[string]int)
}
func (acp *alertCounterByPluginName) Get(key string) (int, bool) {
acp.Lock()
val, ok := acp.data[key]
acp.Unlock()
return val, ok
}
func (acp *alertCounterByPluginName) Set(key string, val int) {
acp.Lock()
acp.data[key] = val
acp.Unlock()
}
type PluginWatcher struct {
PluginConfigByName map[string]PluginConfig
AlertCountByPluginName alertCounterByPluginName
PluginEvents chan string
Inserts chan string
tomb *tomb.Tomb
}
var DefaultEmptyTicker = time.Second * 1
func (pw *PluginWatcher) Init(configs map[string]PluginConfig, alertsByPluginName map[string][]*models.Alert) {
pw.PluginConfigByName = configs
pw.PluginEvents = make(chan string)
pw.AlertCountByPluginName = newAlertCounterByPluginName()
pw.Inserts = make(chan string)
for name := range alertsByPluginName {
pw.AlertCountByPluginName.Set(name, 0)
}
}
func (pw *PluginWatcher) Start(tomb *tomb.Tomb) {
pw.tomb = tomb
for name := range pw.PluginConfigByName {
pname := name
pw.tomb.Go(func() error {
pw.watchPluginTicker(pname)
return nil
})
}
pw.tomb.Go(func() error {
pw.watchPluginAlertCounts()
return nil
})
}
func (pw *PluginWatcher) watchPluginTicker(pluginName string) {
var watchTime time.Duration
var watchCount int = -1
// Threshold can be set : by time, by count, or both
// if only time is set, honor it
// if only count is set, put timer to 1 second and just check size
// if both are set, set timer to 1 second, but check size && time
interval := pw.PluginConfigByName[pluginName].GroupWait
threshold := pw.PluginConfigByName[pluginName].GroupThreshold
//only size is set
if threshold > 0 && interval == 0 {
watchCount = threshold
watchTime = DefaultEmptyTicker
} else if interval != 0 && threshold == 0 {
//only time is set
watchTime = interval
} else if interval != 0 && threshold != 0 {
//both are set
watchTime = DefaultEmptyTicker
watchCount = threshold
} else {
//none are set, we sent every event we receive
watchTime = DefaultEmptyTicker
watchCount = 1
}
ticker := time.NewTicker(watchTime)
var lastSend time.Time = time.Now()
for {
select {
case <-ticker.C:
send := false
//if count threshold was set, honor no matter what
if pc, _ := pw.AlertCountByPluginName.Get(pluginName); watchCount > 0 && pc >= watchCount {
log.Tracef("[%s] %d alerts received, sending\n", pluginName, pc)
send = true
pw.AlertCountByPluginName.Set(pluginName, 0)
}
//if time threshold only was set
if watchTime > 0 && watchTime == interval {
log.Tracef("sending alerts to %s, duration %s elapsed", pluginName, interval)
send = true
}
//if we hit timer because it was set low to honor count, check if we should trigger
if watchTime == DefaultEmptyTicker && watchTime != interval && interval != 0 {
if lastSend.Add(interval).Before(time.Now()) {
log.Tracef("sending alerts to %s, duration %s elapsed", pluginName, interval)
send = true
lastSend = time.Now()
}
}
if send {
log.Tracef("sending alerts to %s", pluginName)
pw.PluginEvents <- pluginName
}
case <-pw.tomb.Dying():
ticker.Stop()
// emptying
// no lock here because we have the broker still listening even in dying state before killing us
pw.PluginEvents <- pluginName
return
}
}
}
func (pw *PluginWatcher) watchPluginAlertCounts() {
for {
select {
case pluginName := <-pw.Inserts:
//we only "count" pending alerts, and watchPluginTicker is actually going to send it
if _, ok := pw.PluginConfigByName[pluginName]; ok {
curr, _ := pw.AlertCountByPluginName.Get(pluginName)
pw.AlertCountByPluginName.Set(pluginName, curr+1)
}
case <-pw.tomb.Dying():
return
}
}
}
|