File: priority_queue.go

package info (click to toggle)
golang-github-pion-interceptor 0.1.42-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,052 kB
  • sloc: makefile: 8
file content (203 lines) | stat: -rw-r--r-- 4,070 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package jitterbuffer

import (
	"errors"

	"github.com/pion/rtp"
)

// PriorityQueue provides a linked list sorting of RTP packets by SequenceNumber.
type PriorityQueue struct {
	next   *node
	length uint16
}

type node struct {
	val      *rtp.Packet
	next     *node
	prev     *node
	priority uint16
}

var (
	// ErrInvalidOperation may be returned if a Pop or Find operation is performed on an empty queue.
	ErrInvalidOperation = errors.New("attempt to find or pop on an empty list")
	// ErrNotFound will be returned if the packet cannot be found in the queue.
	ErrNotFound = errors.New("priority not found")
)

// NewQueue will create a new PriorityQueue whose order relies on monotonically
// increasing Sequence Number, wrapping at MaxUint16, so
// a packet with sequence number MaxUint16 - 1 will be after 0.
func NewQueue() *PriorityQueue {
	return &PriorityQueue{
		next:   nil,
		length: 0,
	}
}

func newNode(val *rtp.Packet, priority uint16) *node {
	return &node{
		val:      val,
		prev:     nil,
		next:     nil,
		priority: priority,
	}
}

// Find a packet in the queue with the provided sequence number,
// regardless of position (the packet is retained in the queue).
func (q *PriorityQueue) Find(sqNum uint16) (*rtp.Packet, error) {
	next := q.next
	for next != nil {
		if next.priority == sqNum {
			return next.val, nil
		}
		next = next.next
	}

	return nil, ErrNotFound
}

// Push will insert a packet in to the queue in order of sequence number.
func (q *PriorityQueue) Push(val *rtp.Packet, priority uint16) {
	newPq := newNode(val, priority)
	if q.next == nil {
		q.next = newPq
		q.length++

		return
	}
	if priority < q.next.priority {
		newPq.next = q.next
		q.next.prev = newPq
		q.next = newPq
		q.length++

		return
	}
	head := q.next
	prev := q.next
	for head != nil {
		if priority <= head.priority {
			break
		}
		prev = head
		head = head.next
	}
	if head == nil {
		if prev != nil {
			prev.next = newPq
		}
		newPq.prev = prev
	} else {
		newPq.next = head
		newPq.prev = prev
		if prev != nil {
			prev.next = newPq
		}
		head.prev = newPq
	}
	q.length++
}

// Length will get the total length of the queue.
func (q *PriorityQueue) Length() uint16 {
	return q.length
}

// Pop removes the first element from the queue, regardless
// sequence number.
func (q *PriorityQueue) Pop() (*rtp.Packet, error) {
	if q.next == nil {
		return nil, ErrInvalidOperation
	}
	val := q.next.val
	q.next.val = nil
	q.length--
	q.next = q.next.next

	return val, nil
}

// PopAt removes an element at the specified sequence number (priority).
func (q *PriorityQueue) PopAt(sqNum uint16) (*rtp.Packet, error) {
	if q.next == nil {
		return nil, ErrInvalidOperation
	}
	if q.next.priority == sqNum {
		val := q.next.val
		q.next.val = nil
		q.next = q.next.next
		q.length--

		return val, nil
	}
	pos := q.next
	prev := q.next.prev
	for pos != nil {
		if pos.priority == sqNum {
			val := pos.val
			pos.val = nil
			prev.next = pos.next
			if prev.next != nil {
				prev.next.prev = prev
			}
			q.length--

			return val, nil
		}
		prev = pos
		pos = pos.next
	}

	return nil, ErrNotFound
}

// PopAtTimestamp removes and returns a packet at the given RTP Timestamp, regardless
// sequence number order.
func (q *PriorityQueue) PopAtTimestamp(timestamp uint32) (*rtp.Packet, error) {
	if q.next == nil {
		return nil, ErrInvalidOperation
	}
	if q.next.val.Timestamp == timestamp {
		val := q.next.val
		q.next.val = nil
		q.next = q.next.next
		q.length--

		return val, nil
	}
	pos := q.next
	prev := q.next.prev
	for pos != nil {
		if pos.val.Timestamp == timestamp {
			val := pos.val
			pos.val = nil
			prev.next = pos.next
			if prev.next != nil {
				prev.next.prev = prev
			}
			q.length--

			return val, nil
		}
		prev = pos
		pos = pos.next
	}

	return nil, ErrNotFound
}

// Clear will empty a PriorityQueue.
func (q *PriorityQueue) Clear() {
	next := q.next
	q.length = 0
	for next != nil {
		next.prev = nil
		next = next.next
	}
}