1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
|
package events
import (
"context"
"encoding/json"
"fmt"
"slices"
"time"
"github.com/google/uuid"
"github.com/lxc/incus/v6/shared/api"
"github.com/lxc/incus/v6/shared/cancel"
)
// DevIncusServer represents an instance of a devIncus event server.
// It tracks the registered listeners and broadcasts events to the ones that
// are attached to a matching instance and subscribed to the event type.
type DevIncusServer struct {
// Embedded common server state; provides the debug/verbose settings and the
// lock taken around accesses to listeners.
serverCommon
// listeners holds the registered listeners, keyed by listener ID.
listeners map[string]*DevIncusListener
}
// NewDevIncusServer builds a devIncus event server configured with the given
// debug and verbose settings and an empty listener table.
func NewDevIncusServer(debug bool, verbose bool) *DevIncusServer {
	return &DevIncusServer{
		serverCommon: serverCommon{debug: debug, verbose: verbose},
		listeners:    make(map[string]*DevIncusListener),
	}
}
// AddListener registers a new listener for the given instance on the supplied
// connection, launches its start method in a separate goroutine and returns it.
//
// messageTypes lists the event types the listener is subscribed to. An error
// is returned if the freshly generated listener ID is already registered.
func (s *DevIncusServer) AddListener(instanceID int, connection EventListenerConnection, messageTypes []string) (*DevIncusListener, error) {
	l := &DevIncusListener{
		instanceID: instanceID,
		listenerCommon: listenerCommon{
			EventListenerConnection: connection,
			messageTypes:            messageTypes,
			done:                    cancel.New(context.Background()),
			id:                      uuid.New().String(),
		},
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	// Refuse to overwrite an existing registration.
	existing := s.listeners[l.id]
	if existing != nil {
		return nil, fmt.Errorf("A listener with ID %q already exists", l.id)
	}

	s.listeners[l.id] = l

	go l.start()

	return l, nil
}
// Send encodes eventMessage as JSON and broadcasts it as an event of the given
// type, stamped with the current time, to the listeners of instanceID.
//
// It returns the JSON marshalling error if encoding fails, otherwise whatever
// broadcast returns.
func (s *DevIncusServer) Send(instanceID int, eventType string, eventMessage any) error {
	payload, marshalErr := json.Marshal(eventMessage)
	if marshalErr != nil {
		return marshalErr
	}

	return s.broadcast(instanceID, api.Event{
		Type:      eventType,
		Timestamp: time.Now(),
		Metadata:  payload,
	})
}
// broadcast delivers event to every registered listener that is attached to
// instanceID and subscribed to the event's type.
//
// The listener table is walked while holding the server lock. Each delivery is
// handed to its own goroutine, so broadcast itself does not wait for the
// writes. A listener whose write fails is removed from the table and closed.
//
// The returned error is always nil; write failures are handled per listener.
func (s *DevIncusServer) broadcast(instanceID int, event api.Event) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	for _, listener := range s.listeners {
		// Skip nil entries before any field of the listener is dereferenced.
		if listener == nil {
			continue
		}

		// Only listeners attached to the target instance are interested.
		if listener.instanceID != instanceID {
			continue
		}

		// Only listeners subscribed to this event type are interested.
		if !slices.Contains(listener.messageTypes, event.Type) {
			continue
		}

		go func(listener *DevIncusListener, event api.Event) {
			// Make sure we're not done already.
			if listener.IsClosed() {
				return
			}

			err := listener.WriteJSON(event)
			if err != nil {
				// Remove the listener from the list. The lock is re-acquired
				// here because this goroutine runs outside the caller's
				// critical section.
				s.lock.Lock()
				delete(s.listeners, listener.id)
				s.lock.Unlock()

				listener.Close()
			}
		}(listener, event)
	}

	return nil
}
// DevIncusListener describes a devIncus event listener.
type DevIncusListener struct {
// Embedded common listener state; AddListener populates its connection,
// subscribed message types, done handle and ID.
listenerCommon
// instanceID is the instance this listener is attached to; broadcast only
// delivers events sent for the same instance ID.
instanceID int
}
|