File: xsync.go

package info (click to toggle)
golang-github-bradenaw-juniper 0.15.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 872 kB
  • sloc: sh: 27; makefile: 2
file content (329 lines) | stat: -rw-r--r-- 7,850 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
// Package xsync contains extensions to the standard library package sync.
package xsync

import (
	"context"
	"math/rand"
	"sync"
	"time"
)

// ContextCond is equivalent to sync.Cond, except its Wait function accepts a context.Context.
//
// Fields:
//   - m guards ch: Broadcast takes it exclusively while swapping the channel, Signal and Wait take
//     it shared while reading the current channel.
//   - ch carries wakeups: Signal sends a single value on it, Broadcast closes it and installs a
//     fresh one.
//   - L is the Locker that Wait unlocks before blocking and re-locks after a wakeup.
//
// Only NewContextCond initializes ch, and Broadcast closes ch (closing a nil channel panics), so
// a ContextCond should be obtained from NewContextCond.
//
// ContextConds should not be copied after first use.
type ContextCond struct {
	m  sync.RWMutex
	ch chan struct{}
	L  sync.Locker
}

// NewContextCond builds a ContextCond that uses l as its Locker.
func NewContextCond(l sync.Locker) *ContextCond {
	// The wakeup channel deliberately has a buffer of one. The downside is that the cond
	// 'remembers' a single Signal, so a misused Signal/Wait pairing may still appear to work.
	//
	// The buffer is needed regardless: Wait releases c.L before it reaches its select, and with
	// an unbuffered channel a Signal arriving in that window would find no receiver and be
	// dropped, which is a missed wakeup.
	wakeups := make(chan struct{}, 1)

	cond := new(ContextCond)
	cond.L = l
	cond.ch = wakeups
	return cond
}

// Broadcast wakes every goroutine that is blocked in Wait(), if there are any.
//
// The caller may hold c.L during the call, but is not required to.
func (c *ContextCond) Broadcast() {
	c.m.Lock()
	defer c.m.Unlock()

	// Closing the current channel releases every waiter that captured it. Waiters arriving
	// later pick up the replacement channel installed below.
	close(c.ch)
	c.ch = make(chan struct{}, 1)
}

// Signal wakes a single goroutine blocked in Wait(), if there is one. Which waiter is woken is
// unspecified.
//
// The caller may hold c.L during the call, but is not required to.
func (c *ContextCond) Signal() {
	c.m.RLock()
	defer c.m.RUnlock()

	// Non-blocking send: when the one-slot buffer already holds an undelivered wakeup, this
	// signal is dropped rather than blocking the caller.
	select {
	case c.ch <- struct{}{}:
	default:
	}
}

// Wait behaves like sync.Cond.Wait but additionally takes a context.Context. It unlocks c.L and
// blocks until woken by Signal or Broadcast, then re-locks c.L and returns nil.
//
// If ctx is done before a wakeup is received, Wait returns ctx.Err() instead. In that case c.L is
// NOT reacquired before returning.
func (c *ContextCond) Wait(ctx context.Context) error {
	// Capture the current channel before releasing c.L, so that a Broadcast which swaps the
	// channel later still closes the one this goroutine waits on.
	c.m.RLock()
	wakeup := c.ch
	c.m.RUnlock()

	c.L.Unlock()
	select {
	case <-wakeup:
		c.L.Lock()
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Group manages a group of goroutines. Functions started through the Group receive the Group's
// context, which Stop and StopAndWait cancel.
type Group struct {
	// ctx is passed to spawned functions; cancel cancels it.
	ctx    context.Context
	cancel context.CancelFunc
	// m is held for reading while spawning, to check whether ctx is already cancelled, and for
	// writing while cancelling ctx. Together this makes sure wg never goes 0->1 while inside
	// Wait().
	m  sync.RWMutex
	wg sync.WaitGroup
}

// NewGroup creates a Group that is ready for use. Every f passed to the Group's methods is called
// with a context derived from ctx.
func NewGroup(ctx context.Context) *Group {
	g := new(Group)
	g.ctx, g.cancel = context.WithCancel(ctx)
	return g
}

// spawn runs f on a new goroutine tracked by g.wg, unless the Group's context is already
// cancelled, in which case it does nothing.
//
// It is a separate helper, even though it does exactly what g.Do needs, so that the goroutine
// stack of a spawned function does not confusingly show every goroutine as created by Do.
func (g *Group) spawn(f func()) {
	// The read lock pairs with the write lock taken while cancelling: once the context has been
	// cancelled under that lock, no spawn can add to the WaitGroup any more.
	g.m.RLock()
	if g.ctx.Err() != nil {
		g.m.RUnlock()
		return
	}
	g.wg.Add(1)
	g.m.RUnlock()

	go func() {
		// Deferred so the WaitGroup is released however f ends its goroutine, including via
		// runtime.Goexit (for example testing.T.FailNow). Otherwise wg.Wait would block forever.
		defer g.wg.Done()
		f()
	}()
}

// Do runs f once on a separate goroutine, handing it the Group's context. If the Group's context
// is already cancelled, f is not run.
func (g *Group) Do(f func(ctx context.Context)) {
	call := func() { f(g.ctx) }
	g.spawn(call)
}

// jitterDuration returns d shifted by a random offset. For a non-negative jitter the result lies
// in [d - jitter, d + jitter): rand.Float64 never returns 1, so the upper bound is exclusive.
func jitterDuration(d time.Duration, jitter time.Duration) time.Duration {
	// scale is uniformly distributed over [-1, 1).
	scale := (rand.Float64() * 2) - 1
	offset := time.Duration(float64(jitter) * scale)
	return d + offset
}

// Periodic starts a goroutine that calls f once per interval +/- jitter until the Group's context
// is cancelled. A fresh jittered delay is drawn for every tick.
func (g *Group) Periodic(
	interval time.Duration,
	jitter time.Duration,
	f func(ctx context.Context),
) {
	nextDelay := func() time.Duration { return jitterDuration(interval, jitter) }

	g.spawn(func() {
		timer := time.NewTimer(nextDelay())
		defer timer.Stop()

		// The context is re-checked at the top of every iteration so that f is not called again
		// after cancellation, even when the timer has fired as well.
		for g.ctx.Err() == nil {
			select {
			case <-g.ctx.Done():
				return
			case <-timer.C:
			}
			// Re-arm before calling f: the next delay is counted from this tick, not from the
			// moment f returns.
			timer.Reset(nextDelay())
			f(g.ctx)
		}
	})
}

// Trigger starts a goroutine that calls f each time the returned function is invoked. When a
// trigger arrives while f is running, f runs once more right after it finishes. Triggers that
// arrive while one is already pending are coalesced into that one.
func (g *Group) Trigger(f func(ctx context.Context)) func() {
	// One buffered slot remembers at most one pending trigger.
	pending := make(chan struct{}, 1)

	g.spawn(func() {
		for g.ctx.Err() == nil {
			select {
			case <-g.ctx.Done():
				return
			case <-pending:
			}
			f(g.ctx)
		}
	})

	trigger := func() {
		// Non-blocking send: never stalls the caller.
		select {
		case pending <- struct{}{}:
		default:
		}
	}
	return trigger
}

// PeriodicOrTrigger starts a goroutine that calls f each time the returned function is invoked.
// When a trigger arrives while f is running, f runs once more right after it finishes. f is also
// called whenever interval +/- jitter passes without a call; every call, whether triggered or
// timed, restarts that countdown with a fresh jittered delay.
func (g *Group) PeriodicOrTrigger(
	interval time.Duration,
	jitter time.Duration,
	f func(ctx context.Context),
) func() {
	// One buffered slot remembers at most one pending trigger.
	pending := make(chan struct{}, 1)

	g.spawn(func() {
		timer := time.NewTimer(jitterDuration(interval, jitter))
		defer timer.Stop()

		for g.ctx.Err() == nil {
			select {
			case <-g.ctx.Done():
				return
			case <-timer.C:
				// Timer fired; it is re-armed below.
			case <-pending:
				// If Stop reports that the timer had already fired, drain the unread tick so
				// it cannot cause a spurious wakeup after the Reset below.
				if !timer.Stop() {
					<-timer.C
				}
			}
			timer.Reset(jitterDuration(interval, jitter))
			f(g.ctx)
		}
	})

	trigger := func() {
		// Non-blocking send: never stalls the caller.
		select {
		case pending <- struct{}{}:
		default:
		}
	}
	return trigger
}

// Stop cancels the context handed to spawned goroutines. Once the group has been stopped, no
// further goroutines are spawned.
func (g *Group) Stop() {
	// Cancel under the write lock so that no spawn can be between its context check and its
	// WaitGroup increment while the context is being cancelled.
	g.m.Lock()
	defer g.m.Unlock()
	g.cancel()
}

// StopAndWait cancels the context passed to any of the spawned goroutines and then blocks until
// all spawned goroutines have exited. After the group is stopped, no more goroutines will be
// spawned.
func (g *Group) StopAndWait() {
	g.Stop()
	g.wg.Wait()
}

// Map is a typesafe wrapper over sync.Map: keys are of type K and values of type V, so callers do
// not need type assertions. Its only field is the wrapped sync.Map.
type Map[K comparable, V any] struct {
	m sync.Map
}

// Delete removes the entry for key by forwarding to sync.Map.Delete.
func (m *Map[K, V]) Delete(key K) {
	m.m.Delete(key)
}
// Load returns the value stored for key. ok reports whether an entry was present; when it is
// false, value is the zero value of V.
func (m *Map[K, V]) Load(key K) (value V, ok bool) {
	raw, ok := m.m.Load(key)
	if !ok {
		return value, false
	}
	// Comma-ok assertion: when V is an interface type and a nil V was stored, sync.Map hands
	// back a nil interface, on which a plain raw.(V) would panic. The comma-ok form yields the
	// zero V instead, which is exactly the nil value that was stored.
	value, _ = raw.(V)
	return value, true
}
// LoadAndDelete removes the entry for key and returns the value it held. loaded reports whether
// an entry was present; when it is false, value is the zero value of V.
func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool) {
	raw, loaded := m.m.LoadAndDelete(key)
	if !loaded {
		return value, false
	}
	// Comma-ok assertion: a nil V stored for an interface-typed V comes back as a nil
	// interface, on which a plain raw.(V) would panic. The comma-ok form yields the zero V.
	value, _ = raw.(V)
	return value, true
}
// LoadOrStore returns the existing value for key if there is one; otherwise it stores value and
// returns it. loaded is true when the value was already present and false when it was stored.
func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
	raw, loaded := m.m.LoadOrStore(key, value)
	// Comma-ok assertion: a nil V for an interface-typed V travels through sync.Map as a nil
	// interface, on which a plain raw.(V) would panic. The comma-ok form yields the zero V.
	actual, _ = raw.(V)
	return actual, loaded
}
// Range calls f for each entry in the map by forwarding to sync.Map.Range. Iteration stops as
// soon as f returns false.
func (m *Map[K, V]) Range(f func(key K, value V) bool) {
	m.m.Range(func(rawKey, rawValue any) bool {
		// Comma-ok assertions: when K or V is an interface type, a nil key or value comes back
		// as a nil interface, on which a plain assertion would panic. The comma-ok form yields
		// the zero K or V instead.
		key, _ := rawKey.(K)
		value, _ := rawValue.(V)
		return f(key, value)
	})
}
// Store sets the value for key by forwarding to sync.Map.Store.
func (m *Map[K, V]) Store(key K, value V) {
	m.m.Store(key, value)
}

// Pool is a typesafe wrapper over sync.Pool that accepts and returns values of type T. Its only
// field is the wrapped sync.Pool; NewPool is what installs a constructor for new values.
type Pool[T any] struct {
	p sync.Pool
}

// NewPool returns a Pool whose underlying sync.Pool uses new_ to construct a value when it has
// none to hand out.
func NewPool[T any](new_ func() T) Pool[T] {
	// sync.Pool expects an untyped constructor, so wrap new_ and box its result.
	construct := func() any { return new_() }
	return Pool[T]{p: sync.Pool{New: construct}}
}

// Get takes a value out of the pool, or obtains a new one from the pool's constructor when one is
// installed. If the underlying sync.Pool yields nil, Get returns the zero value of T.
func (p *Pool[T]) Get() T {
	// Comma-ok assertion: sync.Pool.Get returns a nil interface when the pool is empty and has
	// no New function (a zero-value Pool), or when New produced a nil interface value. A plain
	// .(T) would panic on that; the comma-ok form yields the zero T instead.
	x, _ := p.p.Get().(T)
	return x
}

// Put hands x to the underlying sync.Pool by forwarding to sync.Pool.Put.
func (p *Pool[T]) Put(x T) {
	p.p.Put(x)
}

// Future can be filled with a value exactly once. Many goroutines can concurrently wait for it to
// be filled. After filling, Wait() immediately returns the value it was filled with.
//
// Futures must be created by NewFuture and should not be copied after first use.
type Future[T any] struct {
	// c is closed by Fill; waiters block on receiving from it.
	c chan struct{}
	// x holds the filled value. Fill writes it before closing c, and waiters read it only after
	// their receive from c returns.
	x T
}

// NewFuture creates an unfilled Future that is ready for use.
func NewFuture[T any]() *Future[T] {
	fut := new(Future[T])
	// Unbuffered and never sent on: the only event on this channel is its close.
	fut.c = make(chan struct{})
	return fut
}

// Fill fills f with value x. All active calls to Wait return x, and all future calls to Wait return
// x immediately.
//
// Panics if f has already been filled. The value stored by the first Fill is left untouched in
// that case, so waiters keep seeing the original value even if the panic is recovered.
//
// Concurrent calls to Fill on the same Future are not synchronized with each other.
func (f *Future[T]) Fill(x T) {
	// Detect an earlier Fill before touching f.x: overwriting first and panicking afterwards
	// would silently change the value handed out to waiters.
	select {
	case <-f.c:
		panic("xsync: Fill called on an already filled Future")
	default:
	}
	// Store before close: waiters read f.x only after their receive from f.c returns.
	f.x = x
	close(f.c)
}

// Wait blocks until f has been filled and returns the value it was filled with. If f is already
// filled it returns immediately, since a receive from the closed channel does not block.
func (f *Future[T]) Wait() T {
	<-f.c
	return f.x
}

// WaitContext waits for f to be filled with a value and returns it, or returns the zero value of
// T and ctx.Err() if ctx is done before that happens.
//
// It returns the value immediately if f is already filled, even when ctx is also already done.
func (f *Future[T]) WaitContext(ctx context.Context) (T, error) {
	// Check for an already-filled future first. A single select over both channels picks at
	// random when both are ready, which could report ctx.Err() for a filled future.
	select {
	case <-f.c:
		return f.x, nil
	default:
	}

	select {
	case <-f.c:
		return f.x, nil
	case <-ctx.Done():
		var zero T
		return zero, ctx.Err()
	}
}