File: pubsub.go

package info (click to toggle)
golang-github-anacrolix-missinggo 2.1.0-7
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 872 kB
  • sloc: makefile: 4
file content (117 lines) | stat: -rw-r--r-- 1,850 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package pubsub

import (
	"sync"
)

// PubSub distributes published values to any number of Subscriptions.
// Values are threaded through a chain of single-slot channels: each
// published item carries the channel on which the following item will
// arrive.
type PubSub struct {
	// mu guards next and closed.
	mu sync.Mutex
	// next is the channel into which the next published item is sent. It is
	// nil until lazyInit creates it.
	next chan item
	// closed is set once Close has been called; Publish is a no-op afterwards.
	closed bool
}

// item is one link in the chain of published values.
type item struct {
	// value is the payload passed to Publish.
	value interface{}
	// next is the channel on which the item published after this one is sent.
	next chan item
}

// Subscription receives the values published after it was created by
// PubSub.Subscribe.
type Subscription struct {
	// next is the channel the runner goroutine reads its next item from.
	next chan item
	// Values delivers published values to the consumer. It is closed when the
	// runner goroutine exits.
	Values chan interface{}
	// mu serializes calls to Close so that closed is closed only once.
	mu sync.Mutex
	// closed is closed by Close to tell the runner goroutine to stop.
	closed chan struct{}
}

// NewPubSub returns a zero-valued PubSub. Its item channel is not allocated
// here; lazyInit creates it on the first Publish or Subscribe.
func NewPubSub() (ret *PubSub) {
	ret = &PubSub{}
	return ret
}

// init allocates the first item channel. The channel has a single slot so
// that one item can sit in it without a receiver being ready. lazyInit calls
// this with me.mu held.
func (me *PubSub) init() {
	head := make(chan item, 1)
	me.next = head
}

// lazyInit creates the item channel on first use. It does nothing if the
// PubSub has already been closed or the channel already exists.
func (me *PubSub) lazyInit() {
	me.mu.Lock()
	defer me.mu.Unlock()
	if !me.closed && me.next == nil {
		me.init()
	}
}

// Publish sends v, wrapped in an item, into the current item channel and then
// advances the PubSub to a fresh channel that will carry the following item.
// It does nothing once the PubSub has been closed.
func (me *PubSub) Publish(v interface{}) {
	me.lazyInit()
	// tail is where the item published after v will be sent.
	tail := make(chan item, 1)

	me.mu.Lock()
	defer me.mu.Unlock()
	if me.closed {
		return
	}
	me.next <- item{value: v, next: tail}
	me.next = tail
}

// Close signals the runner goroutine to stop by closing me.closed. It is safe
// to call more than once: the mutex and the non-blocking receive ensure the
// channel is closed only on the first call.
func (me *Subscription) Close() {
	me.mu.Lock()
	defer me.mu.Unlock()
	select {
	case <-me.closed:
		// Already closed; nothing left to do.
		return
	default:
	}
	close(me.closed)
}

// runner is the goroutine body started by Subscribe. It walks the chain of
// item channels, forwarding each value to me.Values, until either the
// subscription is closed or the item channel it is waiting on is closed.
// me.Values is closed when runner returns.
func (me *Subscription) runner() {
	defer close(me.Values)
	for {
		var (
			cur  item
			open bool
		)
		select {
		case <-me.closed:
			return
		case cur, open = <-me.next:
		}
		if !open {
			// The item channel itself was closed, so no further values will
			// arrive; mark this subscription closed as well.
			me.Close()
			return
		}
		// Put the item straight back so other readers of the same channel
		// can see it too. The channel has a single slot and we just emptied
		// it, so this send does not block.
		me.next <- cur
		// The following item will arrive on the channel carried by this one.
		me.next = cur.next
		select {
		case <-me.closed:
			return
		case me.Values <- cur.value:
		}
	}
}

// Subscribe returns a new Subscription positioned at the PubSub's current
// item channel and starts its runner goroutine. Values is unbuffered, so the
// runner waits for the consumer to receive each value (or for the
// subscription to be closed) before moving on.
func (me *PubSub) Subscribe() (ret *Subscription) {
	me.lazyInit()

	// Snapshot the current channel under the lock.
	me.mu.Lock()
	head := me.next
	me.mu.Unlock()

	ret = &Subscription{
		next:   head,
		Values: make(chan interface{}),
		closed: make(chan struct{}),
	}
	go ret.runner()
	return ret
}

// Close marks the PubSub as closed and closes the current item channel, which
// causes every Subscription that reaches that channel to shut down and close
// its Values channel. Calling Close more than once is harmless.
//
// If the PubSub was never used before Close, the item channel is created here
// and closed immediately. Without that, me.next would remain nil (lazyInit
// refuses to initialise a closed PubSub), and a later Subscribe would start a
// runner that blocks forever receiving from a nil channel instead of seeing a
// closed one.
func (me *PubSub) Close() {
	me.mu.Lock()
	defer me.mu.Unlock()
	if me.closed {
		return
	}
	if me.next == nil {
		me.next = make(chan item, 1)
	}
	close(me.next)
	me.closed = true
}