File: handler_dispatcher.go

package info (click to toggle)
fever 1.4.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 920 kB
  • sloc: sh: 41; makefile: 18
file content (170 lines) | stat: -rw-r--r-- 4,983 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package processing

// DCSO FEVER
// Copyright (c) 2017, 2018, DCSO GmbH

import (
	"sync"
	"time"

	"github.com/DCSO/fever/types"
	"github.com/DCSO/fever/util"

	log "github.com/sirupsen/logrus"
)

// HandlerDispatcherPerfStats contains performance stats written to InfluxDB
// for monitoring.
type HandlerDispatcherPerfStats struct {
	DispatchedPerSec uint64 `influx:"dispatch_calls_per_sec"`
}

// HandlerDispatcher is a component to collect and properly apply a set of
// Handlers to a stream of Entry objects. Handlers can register the event types
// they are meant to act on and are called with relevant Entries to perform
// their job.
type HandlerDispatcher struct {
	Lock               sync.Mutex
	DispatchMap        map[string]([]Handler)
	DBHandler          Handler
	PerfStats          HandlerDispatcherPerfStats
	Logger             *log.Entry
	StatsEncoder       *util.PerformanceStatsEncoder
	StopCounterChan    chan bool
	StoppedCounterChan chan bool
}

// DBHandler writes consumed events to a database.
type DBHandler struct {
	OutChan chan types.Entry
}

// GetName just returns the name of the default handler
func (h *DBHandler) GetName() string {
	return "Default handler"
}

// GetEventTypes here is a dummy method -- since this handler is never
// registered we don't need to set this to an actual event type
func (h *DBHandler) GetEventTypes() []string {
	return []string{"not applicable"}
}

// Consume simply emits the consumed entry on the default output channel
func (h *DBHandler) Consume(e *types.Entry) error {
	h.OutChan <- *e
	return nil
}

func (ad *HandlerDispatcher) runCounter() {
	sTime := time.Now()
	for {
		time.Sleep(500 * time.Millisecond)
		select {
		case <-ad.StopCounterChan:
			close(ad.StoppedCounterChan)
			return
		default:
			if ad.StatsEncoder == nil || time.Since(sTime) < ad.StatsEncoder.SubmitPeriod {
				continue
			}
			// Lock the current measurements for submission. Since this is a blocking
			// operation, we don't want this to depend on how long submitter.Submit()
			// takes but keep it independent of that. Hence we take the time to create
			// a local copy of the counter to be able to reset and release the live
			// one as quickly as possible.
			ad.Lock.Lock()
			// Make our own copy of the current counter
			myStats := HandlerDispatcherPerfStats{
				DispatchedPerSec: ad.PerfStats.DispatchedPerSec,
			}
			myStats.DispatchedPerSec /= uint64(ad.StatsEncoder.SubmitPeriod.Seconds())
			// Reset live counter
			ad.PerfStats.DispatchedPerSec = 0
			// Release live counter to not block further events
			ad.Lock.Unlock()

			ad.StatsEncoder.Submit(myStats)
			sTime = time.Now()
		}
	}
}

// MakeHandlerDispatcher returns a new HandlerDispatcher. The channel passed
// as an argument is used as an output channel for the default handler, which
// simply forwards events to a given channel (for example to be written to a
// database)
func MakeHandlerDispatcher(databaseOut chan types.Entry) *HandlerDispatcher {
	ad := &HandlerDispatcher{
		DispatchMap: make(map[string]([]Handler)),
		Logger: log.WithFields(log.Fields{
			"domain": "dispatch",
		}),
	}
	if databaseOut != nil {
		ad.DBHandler = &DBHandler{
			OutChan: databaseOut,
		}
	}
	ad.Logger.WithFields(log.Fields{
		"type": "*",
		"name": "default handler",
	}).Debugf("event handler added")
	return ad
}

// RegisterHandler adds the given Handler to the set of callbacks to be
// called on the relevant Entries received by the dispatcher.
func (ad *HandlerDispatcher) RegisterHandler(agg Handler) {
	eventTypes := agg.GetEventTypes()
	for _, eventType := range eventTypes {
		if _, ok := ad.DispatchMap[eventType]; !ok {
			ad.DispatchMap[eventType] = make([]Handler, 0)
		}
		ad.DispatchMap[eventType] = append(ad.DispatchMap[eventType], agg)
		ad.Logger.WithFields(log.Fields{
			"type": eventType,
			"name": agg.GetName(),
		}).Info("event handler added")
	}
}

// Dispatch applies the set of handlers currently registered in the dispatcher
// to the Entry object passed to it.
func (ad *HandlerDispatcher) Dispatch(e *types.Entry) {
	if _, ok := ad.DispatchMap[e.EventType]; ok {
		for _, agg := range ad.DispatchMap[e.EventType] {
			agg.Consume(e)
		}
	}
	if a, ok := ad.DispatchMap["*"]; ok {
		for _, agg := range a {
			agg.Consume(e)
		}
	}
	if ad.DBHandler != nil {
		ad.DBHandler.Consume(e)
	}
	ad.Lock.Lock()
	ad.PerfStats.DispatchedPerSec++
	ad.Lock.Unlock()
}

// SubmitStats registers a PerformanceStatsEncoder for runtime stats submission.
func (ad *HandlerDispatcher) SubmitStats(sc *util.PerformanceStatsEncoder) {
	ad.StatsEncoder = sc
}

// Run starts the background service for this handler
func (ad *HandlerDispatcher) Run() {
	ad.StopCounterChan = make(chan bool)
	ad.StoppedCounterChan = make(chan bool)
	go ad.runCounter()
}

// Stop causes the handler to cease counting and submitting data
func (ad *HandlerDispatcher) Stop(stopChan chan bool) {
	close(ad.StopCounterChan)
	<-ad.StoppedCounterChan
	close(stopChan)
}