1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
|
package events
import (
"context"
"encoding/json"
"sync"
"github.com/lxc/incus/v6/internal/server/storage/memorypipe"
"github.com/lxc/incus/v6/shared/api"
)
// InternalListener represents a internal event listener.
type InternalListener struct {
handlers map[string]EventHandler
listener *Listener
server *Server
ctx context.Context
listenerCtx context.Context
listenerCancel context.CancelFunc
lock sync.Mutex
wg sync.WaitGroup
}
// NewInternalListener returns an InternalListener.
func NewInternalListener(ctx context.Context, server *Server) *InternalListener {
return &InternalListener{
ctx: ctx,
handlers: map[string]EventHandler{},
server: server,
}
}
// startListener creates a new listener connection and listener. Also, it starts the gorountines
// needed to notify any registered handlers about new events.
func (l *InternalListener) startListener() {
var err error
l.listenerCtx, l.listenerCancel = context.WithCancel(l.ctx)
aEnd, bEnd := memorypipe.NewPipePair(l.listenerCtx)
listenerConnection := NewSimpleListenerConnection(aEnd)
l.listener, err = l.server.AddListener("", true, nil, listenerConnection, []string{"lifecycle", "logging", "network-acl"}, []EventSource{EventSourcePull}, nil, nil)
if err != nil {
return
}
go func(ctx context.Context) {
l.listener.Wait(ctx)
l.listener.Close()
l.listener = nil
}(l.listenerCtx)
l.wg.Add(1)
go func(ctx context.Context, handlers map[string]EventHandler) {
defer l.wg.Done()
for {
select {
case <-ctx.Done():
return
default:
var event api.Event
_ = json.NewDecoder(bEnd).Decode(&event)
for _, handler := range handlers {
if handler == nil {
continue
}
go handler(event)
}
}
}
}(l.listenerCtx, l.handlers)
}
// stopListener cancels the context thus stopping the listener.
func (l *InternalListener) stopListener() {
if l.listenerCancel != nil {
l.listenerCancel()
l.wg.Wait()
}
}
// AddHandler adds a new event handler.
func (l *InternalListener) AddHandler(name string, handler EventHandler) {
l.lock.Lock()
defer l.lock.Unlock()
if handler == nil {
return
}
// Add handler to the list of handlers.
l.handlers[name] = handler
if l.listener == nil {
// Create a listener if necessary. This avoids having a listener around if there are no handlers.
l.startListener()
}
}
// RemoveHandler removes the event handler with the given name.
func (l *InternalListener) RemoveHandler(name string) {
l.lock.Lock()
defer l.lock.Unlock()
for handlerName := range l.handlers {
if handlerName == name {
delete(l.handlers, name)
break
}
}
if len(l.handlers) == 0 {
// Stop listener to avoid unnecessary goroutines.
l.stopListener()
}
}
|