File: cursor.go

package info (click to toggle)
gitlab-shell 14.35.0+ds1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 23,652 kB
  • sloc: ruby: 1,129; makefile: 583; sql: 391; sh: 384
file content (101 lines) | stat: -rw-r--r-- 2,032 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package streamcache

import (
	"sync"
)

// cursor is a datatype that combines concurrent updates of an int64 with
// change notifications. The number is only allowed to go up; it is meant
// to represent the read or write offset in a file that is being accessed
// linearly.
//
// Field roles:
//   - pos is the current position. Position reads it under the read lock;
//     SetPosition writes it under the write lock and only ever increases it.
//   - subscribers are the notifiers that SetPosition signals when pos grows.
//     Subscribe and Unsubscribe modify the slice under the write lock.
//   - m guards pos and subscribers.
//   - doneChan is created by newCursor and closed by Unsubscribe once the
//     subscriber list is empty. Nothing in this file reopens or replaces it.
type cursor struct {
	pos         int64
	subscribers []*notifier
	m           sync.RWMutex
	doneChan    chan struct{}
}

// newCursor returns a cursor at position 0 that has no subscribers and
// whose done channel is still open.
func newCursor() *cursor {
	c := new(cursor)
	c.doneChan = make(chan struct{})
	return c
}

// Subscribe registers a fresh notifier with the cursor and returns it. The
// notifier is signalled whenever SetPosition advances the cursor. Callers
// hand the notifier back to Unsubscribe when they are no longer interested.
func (c *cursor) Subscribe() *notifier {
	sub := newNotifier()

	c.m.Lock()
	c.subscribers = append(c.subscribers, sub)
	c.m.Unlock()

	return sub
}

// Unsubscribe removes n from the cursor's subscribers. If n is not
// subscribed the list is left untouched. Whenever the list is empty
// afterwards, the done channel is closed (at most once), which is what
// Done and IsDone report.
func (c *cursor) Unsubscribe(n *notifier) {
	c.m.Lock()
	defer c.m.Unlock()

	for i := range c.subscribers {
		if c.subscribers[i] != n {
			continue
		}

		// Shift the tail down and nil out the vacated last slot. A plain
		// append(s[:i], s[i+1:]...) leaves the old last pointer in the
		// backing array past len, which can keep an unsubscribed notifier
		// (and its channel) reachable for as long as the cursor lives.
		last := len(c.subscribers) - 1
		copy(c.subscribers[i:], c.subscribers[i+1:])
		c.subscribers[last] = nil
		c.subscribers = c.subscribers[:last]
		break
	}

	if len(c.subscribers) == 0 {
		select {
		case <-c.doneChan:
			// Already closed; closing a channel twice would panic.
		default:
			close(c.doneChan)
		}
	}
}

// Done returns a channel that is closed once the number of subscribers
// drops to 0. The channel stays closed even if new subscribers are added
// afterwards.
func (c *cursor) Done() <-chan struct{} {
	return c.doneChan
}

// IsDone reports, without blocking, whether the done channel has been
// closed.
func (c *cursor) IsDone() bool {
	closed := false

	select {
	case <-c.doneChan:
		closed = true
	default:
		// Receiving would block, so the channel is still open.
	}

	return closed
}

// SetPosition advances the cursor to pos, but only if pos is greater than
// the current position; otherwise it does nothing. When the position
// actually grows, every subscriber is notified.
func (c *cursor) SetPosition(pos int64) {
	// Cheap pre-check under the read lock so that calls which would not
	// advance the cursor never take the write lock.
	c.m.RLock()
	current := c.pos
	c.m.RUnlock()
	if current >= pos {
		return
	}

	c.m.Lock()
	defer c.m.Unlock()

	// Re-check under the write lock: the position may have moved between
	// releasing the read lock and acquiring the write lock.
	if c.pos < pos {
		c.pos = pos
		for i := range c.subscribers {
			c.subscribers[i].Notify()
		}
	}
}

// Position returns the cursor's current position, read under the read lock.
func (c *cursor) Position() int64 {
	c.m.RLock()
	current := c.pos
	c.m.RUnlock()

	return current
}

// notifier delivers change notifications to one subscriber over a channel.
// newNotifier creates the channel with room for a single pending
// notification and Notify never blocks, so notifications sent while one is
// already pending are dropped rather than queued.
type notifier struct {
	C chan struct{} // The channel on which notifications are delivered
}

// newNotifier returns a notifier whose channel can buffer exactly one
// pending notification.
func newNotifier() *notifier {
	return &notifier{
		C: make(chan struct{}, 1),
	}
}

// Notify signals the notifier's channel without ever blocking the caller.
func (n *notifier) Notify() {
	var signal struct{}

	select {
	case n.C <- signal:
		// Delivered to a waiting receiver or stored in the channel buffer.
	default:
		// The send would block (for example because a notification is
		// already pending), so this one is dropped.
	}
}