// Package events exposes server-sent event (SSE) streams over HTTP.
package events
import (
"net/http"
"sync"
"github.com/crc-org/crc/v2/pkg/crc/logging"
"github.com/crc-org/crc/v2/pkg/crc/machine"
"github.com/r3labs/sse/v2"
)
// EventServer bundles an SSE server with the event streams it serves.
type EventServer struct {
// sseServer is the underlying SSE server that ServeHTTP delegates to.
sseServer *sse.Server
// muStreams is locked by the OnSubscribe callback while it updates streams.
muStreams sync.RWMutex
// streams maps a stream ID to its EventStream; OnSubscribe adds an entry
// when none exists yet for the requested ID.
streams map[string]EventStream
// machine is the client handed to NewEventServer.
machine machine.Client
}
// NewEventServer builds an EventServer around a fresh SSE server with
// AutoReplay disabled, wires the subscribe/unsubscribe callbacks, and
// registers the LOGS and STATUS streams on the SSE server.
//
// The EventStream for a stream ID is created lazily on the first
// subscription to that ID and reused for later subscribers.
func NewEventServer(machine machine.Client) *EventServer {
	sseServer := sse.New()
	sseServer.AutoReplay = false

	eventServer := &EventServer{
		sseServer: sseServer,
		machine:   machine,
		streams:   map[string]EventStream{},
	}

	sseServer.OnSubscribe = func(streamID string, sub *sse.Subscriber) {
		logging.Debugf("OnSubscribe on channel: %s", streamID)

		// The lock must be held before the map is read: callbacks can run
		// concurrently, and looking up the stream outside the lock races
		// with another subscriber inserting into the map.
		eventServer.muStreams.Lock()
		defer eventServer.muStreams.Unlock()

		stream, ok := eventServer.streams[streamID]
		if !ok {
			stream = createEventStream(eventServer, streamID)
			if stream == nil {
				logging.Errorf("Could not create EventStream for %s", streamID)
				return
			}
			eventServer.streams[streamID] = stream
		}
		stream.AddSubscriber(sub)
	}

	sseServer.OnUnsubscribe = func(streamID string, sub *sse.Subscriber) {
		logging.Debugf("OnUnsubscribe on channel: %s", streamID)

		// A read lock is enough here: the map is only inspected, but the
		// read still has to be ordered against writes made in OnSubscribe.
		eventServer.muStreams.RLock()
		stream, ok := eventServer.streams[streamID]
		eventServer.muStreams.RUnlock()
		if !ok {
			logging.Debugf("Could not find stream:%s", streamID)
			return
		}
		stream.RemoveSubscriber(sub)
	}

	sseServer.CreateStream(LOGS)
	sseServer.CreateStream(STATUS)
	return eventServer
}
// ServeHTTP satisfies http.Handler by forwarding the request and response
// writer to the wrapped SSE server.
func (es *EventServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	handler := es.sseServer
	handler.ServeHTTP(w, r)
}
// createEventStream returns the EventStream implementation matching
// streamID: a logs stream for LOGS, a status stream for STATUS, and nil
// for any other ID.
func createEventStream(server *EventServer, streamID string) EventStream {
	if streamID == LOGS {
		return newLogsStream(server)
	}
	if streamID == STATUS {
		return newStatusStream(server)
	}
	// Unknown stream IDs have no backing implementation.
	return nil
}
|