File: internalListener.go

package info (click to toggle)
incus 6.0.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 24,392 kB
  • sloc: sh: 16,313; ansic: 3,121; python: 457; makefile: 337; ruby: 51; sql: 50; lisp: 6
file content (119 lines) | stat: -rw-r--r-- 2,802 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package events

import (
	"context"
	"encoding/json"
	"sync"

	"github.com/lxc/incus/v6/internal/server/storage/memorypipe"
	"github.com/lxc/incus/v6/shared/api"
)

// InternalListener represents a internal event listener.
type InternalListener struct {
	handlers       map[string]EventHandler
	listener       *Listener
	server         *Server
	ctx            context.Context
	listenerCtx    context.Context
	listenerCancel context.CancelFunc
	lock           sync.Mutex
	wg             sync.WaitGroup
}

// NewInternalListener returns an InternalListener.
func NewInternalListener(ctx context.Context, server *Server) *InternalListener {
	return &InternalListener{
		ctx:      ctx,
		handlers: map[string]EventHandler{},
		server:   server,
	}
}

// startListener creates a new listener connection and listener. Also, it starts the gorountines
// needed to notify any registered handlers about new events.
func (l *InternalListener) startListener() {
	var err error

	l.listenerCtx, l.listenerCancel = context.WithCancel(l.ctx)
	aEnd, bEnd := memorypipe.NewPipePair(l.listenerCtx)
	listenerConnection := NewSimpleListenerConnection(aEnd)

	l.listener, err = l.server.AddListener("", true, nil, listenerConnection, []string{"lifecycle", "logging", "network-acl"}, []EventSource{EventSourcePull}, nil, nil)
	if err != nil {
		return
	}

	go func(ctx context.Context) {
		l.listener.Wait(ctx)
		l.listener.Close()
		l.listener = nil
	}(l.listenerCtx)

	l.wg.Add(1)
	go func(ctx context.Context, handlers map[string]EventHandler) {
		defer l.wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			default:
				var event api.Event

				_ = json.NewDecoder(bEnd).Decode(&event)

				for _, handler := range handlers {
					if handler == nil {
						continue
					}

					go handler(event)
				}
			}
		}
	}(l.listenerCtx, l.handlers)
}

// stopListener cancels the context thus stopping the listener.
func (l *InternalListener) stopListener() {
	if l.listenerCancel != nil {
		l.listenerCancel()
		l.wg.Wait()
	}
}

// AddHandler adds a new event handler.
func (l *InternalListener) AddHandler(name string, handler EventHandler) {
	l.lock.Lock()
	defer l.lock.Unlock()

	if handler == nil {
		return
	}

	// Add handler to the list of handlers.
	l.handlers[name] = handler

	if l.listener == nil {
		// Create a listener if necessary. This avoids having a listener around if there are no handlers.
		l.startListener()
	}
}

// RemoveHandler removes the event handler with the given name.
func (l *InternalListener) RemoveHandler(name string) {
	l.lock.Lock()
	defer l.lock.Unlock()

	for handlerName := range l.handlers {
		if handlerName == name {
			delete(l.handlers, name)
			break
		}
	}

	if len(l.handlers) == 0 {
		// Stop listener to avoid unnecessary goroutines.
		l.stopListener()
	}
}