File: value.go

package info (click to toggle)
golang-github-juju-utils 0.0~git20171220.f38c0b0-6
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,748 kB
  • sloc: makefile: 20
file content (147 lines) | stat: -rw-r--r-- 3,360 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright 2012, 2013 Canonical Ltd.
// Licensed under the LGPLv3, see LICENCE file for details.

// Package voyeur implements a concurrency-safe value that can be watched for
// changes.
package voyeur

import (
	"sync"
)

// Value represents a shared value that can be watched for changes. Methods on
// a Value may be called concurrently. The zero Value is
// ok to use, and is equivalent to a NewValue result
// with a nil initial value.
type Value struct {
	val     interface{}
	version int
	mu      sync.RWMutex
	wait    sync.Cond
	closed  bool
}

// NewValue creates a new Value holding the given initial value. If initial is
// nil, any watchers will wait until a value is set.
func NewValue(initial interface{}) *Value {
	v := new(Value)
	v.init()
	if initial != nil {
		v.val = initial
		v.version++
	}
	return v
}

func (v *Value) needsInit() bool {
	return v.wait.L == nil
}

func (v *Value) init() {
	if v.needsInit() {
		v.wait.L = v.mu.RLocker()
	}
}

// Set sets the shared value to val.
func (v *Value) Set(val interface{}) {
	v.mu.Lock()
	v.init()
	v.val = val
	v.version++
	v.mu.Unlock()
	v.wait.Broadcast()
}

// Close closes the Value, unblocking any outstanding watchers.  Close always
// returns nil.
func (v *Value) Close() error {
	v.mu.Lock()
	v.init()
	v.closed = true
	v.mu.Unlock()
	v.wait.Broadcast()
	return nil
}

// Closed reports whether the value has been closed.
func (v *Value) Closed() bool {
	v.mu.RLock()
	defer v.mu.RUnlock()
	return v.closed
}

// Get returns the current value.
func (v *Value) Get() interface{} {
	v.mu.RLock()
	defer v.mu.RUnlock()
	return v.val
}

// Watch returns a Watcher that can be used to watch for changes to the value.
func (v *Value) Watch() *Watcher {
	return &Watcher{value: v}
}

// Watcher represents a single watcher of a shared value.
type Watcher struct {
	value   *Value
	version int
	current interface{}
	closed  bool
}

// Next blocks until there is a new value to be retrieved from the value that is
// being watched. It also unblocks when the value or the Watcher itself is
// closed. Next returns false if the value or the Watcher itself have been
// closed.
func (w *Watcher) Next() bool {
	val := w.value
	val.mu.RLock()
	defer val.mu.RUnlock()
	if val.needsInit() {
		val.mu.RUnlock()
		val.mu.Lock()
		val.init()
		val.mu.Unlock()
		val.mu.RLock()
	}

	// We can go around this loop a maximum of two times,
	// because the only thing that can cause a Wait to
	// return is for the condition to be triggered,
	// which can only happen if the value is set (causing
	// the version to increment) or it is closed
	// causing the closed flag to be set.
	// Both these cases will cause Next to return.
	for {
		if w.version != val.version {
			w.version = val.version
			w.current = val.val
			return true
		}
		if val.closed || w.closed {
			return false
		}

		// Wait releases the lock until triggered and then reacquires the lock,
		// thus avoiding a deadlock.
		val.wait.Wait()
	}
}

// Close closes the Watcher without closing the underlying
// value. It may be called concurrently with Next.
func (w *Watcher) Close() {
	w.value.mu.Lock()
	w.value.init()
	w.closed = true
	w.value.mu.Unlock()
	w.value.wait.Broadcast()
}

// Value returns the last value that was retrieved from the watched Value by
// Next.
func (w *Watcher) Value() interface{} {
	return w.current
}