File: stream.go

package info (click to toggle)
golang-github-digitalocean-go-libvirt 0.0~git20250317.13bf9b4-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,188 kB
  • sloc: yacc: 188; sh: 76; xml: 50; makefile: 3
file content (157 lines) | stat: -rw-r--r-- 4,877 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Copyright 2020 The go-libvirt Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package event

import (
	"context"
)

// emptyEvent is a payload-free placeholder. The send clause in process()
// always needs some event value to name, even when the queue is empty; in that
// case the send targets a nil channel, so clients never receive one of these.
type emptyEvent struct{}

// GetCallbackID reports callback ID 0 for the placeholder event.
func (emptyEvent) GetCallbackID() int32 {
	return int32(0)
}

// Stream is an unbounded buffered event channel. The implementation
// consists of a pair of unbuffered channels and a goroutine to manage them.
// Client behavior will not cause incoming events to block.
type Stream struct {
	// Program specifies the source of the events - libvirt or QEMU.
	Program uint32

	// CallbackID is returned by the event registration call.
	CallbackID int32

	// manage unbounded channel behavior.
	queue   []Event
	qlen    chan (chan int)
	in, out chan Event

	// terminates processing
	shutdown context.CancelFunc

	// done is closed once Shutdown() has been called. Push() and Len() watch
	// it so that they return instead of waiting on a processing loop that is
	// no longer running.
	done <-chan struct{}
}

// NewStream configures a new Event Stream. Incoming events are appended to a
// queue, which is then relayed to the listening client. Client behavior will
// not cause incoming events to block. It is the responsibility of the caller
// to terminate the Stream via Shutdown() when no longer in use.
func NewStream(program uint32, cbID int32) *Stream {
	s := &Stream{
		Program:    program,
		CallbackID: cbID,
		in:         make(chan Event),
		out:        make(chan Event),
		qlen:       make(chan (chan int)),
	}

	// Start the processing loop, which will return a routine we can use to
	// shut the queue down later.
	s.shutdown = s.start()

	return s
}

// Len will return the current count of events in the internal queue for a
// stream. It does this by sending a message to the stream's process() loop,
// which will then write the current length to the channel contained in that
// message. Once the stream has been shut down there is no loop left to
// answer, and Len returns 0 rather than blocking.
func (s *Stream) Len() int {
	reply := make(chan int)

	select {
	case s.qlen <- reply:
		// The process() loop answers every request it accepts before doing
		// anything else, so this receive always completes.
		return <-reply
	case <-s.done:
		return 0
	}
}

// Recv returns the channel on which queued events are delivered to the
// client, in the order they were pushed. The channel is closed when the
// stream's processing loop exits.
func (s *Stream) Recv() chan Event {
	return s.out
}

// Push appends a new event to the queue. If the stream has been shut down the
// event is dropped and Push returns immediately instead of blocking.
func (s *Stream) Push(e Event) {
	select {
	case s.in <- e:
	case <-s.done:
		// Nothing is reading s.in any more; drop the event.
	}
}

// Shutdown gracefully terminates Stream processing, releasing all internal
// resources. Events which have not yet been received by the client will be
// dropped, as will events pushed afterwards. Subsequent calls to Shutdown()
// are idempotent.
func (s *Stream) Shutdown() {
	if s.shutdown != nil {
		s.shutdown()
	}
}

// start starts the event processing loop, which will continue to run until
// terminated by the returned context.CancelFunc. It also records the
// context's Done channel in s.done so Push() and Len() can observe shutdown.
func (s *Stream) start() context.CancelFunc {
	ctx, cancel := context.WithCancel(context.Background())

	// Must be set before the Stream is handed to any caller of Push/Len.
	s.done = ctx.Done()

	go s.process(ctx)

	return cancel
}

// process manages a Stream's lifecycle until canceled by the provided context.
// Incoming events are appended to a queue which is then relayed to the
// listening client. New events pushed onto the queue will not block if the
// client is not actively polling for them; the stream will buffer them
// internally.
func (s *Stream) process(ctx context.Context) {
	// Close the output channel so that clients know this stream is finished.
	// We don't close s.in to avoid creating a race with the stream's Push()
	// function.
	defer close(s.out)

	// This function is used to retrieve the next event from the queue, to be
	// sent to the client. If there are no more events to send, it returns a nil
	// channel and a zero-value event.
	nextEvent := func() (chan Event, Event) {
		sendCh := chan Event(nil)
		next := Event(emptyEvent{})
		if len(s.queue) > 0 {
			sendCh = s.out
			next = s.queue[0]
		}
		return sendCh, next
	}

	// The select statement in this loop relies on the fact that a send to a nil
	// channel will block forever. If we have no entries in the queue, the
	// sendCh variable will be nil, so the clause that attempts to send an event
	// to the client will never complete. Clients will never receive an
	// emptyEvent.
	for {
		sendCh, nextEvt := nextEvent()

		select {
		// new event received, append to queue
		case e := <-s.in:
			s.queue = append(s.queue, e)

		// length requested, reply on the channel carried by the request
		case lenCh := <-s.qlen:
			lenCh <- len(s.queue)

		// client received an event, pop from queue
		case sendCh <- nextEvt:
			// Re-slicing alone would leave the delivered event referenced from
			// the backing array, keeping it alive until the array is
			// reallocated. Zero the slot first so it can be collected.
			var zero Event
			s.queue[0] = zero
			s.queue = s.queue[1:]

		// shutdown requested
		case <-ctx.Done():
			return
		}
	}
}