File: ready.go

package info (click to toggle)
golang-k8s-apiserver 0.33.4-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 11,660 kB
  • sloc: sh: 236; makefile: 5
file content (195 lines) | stat: -rw-r--r-- 5,177 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
/*
Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cacher

import (
	"context"
	"fmt"
	"sync"
	"time"

	"k8s.io/utils/clock"
)

type status int

const (
	Pending status = iota
	Ready
	Stopped
)

// ready is a three state condition variable that blocks until is Ready if is not Stopped.
// Its initial state is Pending and its state machine diagram is as follow.
//
// Pending <------> Ready -----> Stopped
//
//	|                           ^
//	└---------------------------┘
type ready struct {
	state       status // represent the state of the variable
	lastErr     error
	generation  int           // represent the number of times we have transtioned to ready
	lock        sync.RWMutex  // protect the state and generation variables
	restartLock sync.Mutex    // protect the transition from ready to pending where the channel is recreated
	waitCh      chan struct{} // blocks until is ready or stopped

	clock               clock.Clock
	lastStateChangeTime time.Time
}

func newReady(c clock.Clock) *ready {
	r := &ready{
		waitCh: make(chan struct{}),
		state:  Pending,
		clock:  c,
	}
	r.updateLastStateChangeTimeLocked()

	return r
}

// done close the channel once the state is Ready or Stopped
func (r *ready) done() chan struct{} {
	r.restartLock.Lock()
	defer r.restartLock.Unlock()
	return r.waitCh
}

// wait blocks until it is Ready or Stopped, it returns an error if is Stopped.
func (r *ready) wait(ctx context.Context) error {
	_, err := r.waitAndReadGeneration(ctx)
	return err
}

// waitAndReadGenration blocks until it is Ready or Stopped and returns number
// of times we entered ready state if Ready and error otherwise.
func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) {
	for {
		// r.done() only blocks if state is Pending
		select {
		case <-ctx.Done():
			return 0, ctx.Err()
		case <-r.done():
		}

		r.lock.RLock()
		if r.state == Pending {
			// since we allow to switch between the states Pending and Ready
			// if there is a quick transition from Pending -> Ready -> Pending
			// a process that was waiting can get unblocked and see a Pending
			// state again. If the state is Pending we have to wait again to
			// avoid an inconsistent state on the system, with some processes not
			// waiting despite the state moved back to Pending.
			r.lock.RUnlock()
			continue
		}
		generation, err := r.readGenerationLocked()
		r.lock.RUnlock()
		return generation, err
	}
}

// check returns the time elapsed since the state was last changed and the current value.
func (r *ready) check() (time.Duration, error) {
	_, elapsed, err := r.checkAndReadGeneration()
	return elapsed, err
}

// checkAndReadGeneration returns the current generation, the time elapsed since the state was last changed and the current value.
func (r *ready) checkAndReadGeneration() (int, time.Duration, error) {
	r.lock.RLock()
	defer r.lock.RUnlock()
	generation, err := r.readGenerationLocked()
	return generation, r.clock.Since(r.lastStateChangeTime), err
}

func (r *ready) readGenerationLocked() (int, error) {
	switch r.state {
	case Pending:
		if r.lastErr == nil {
			return 0, fmt.Errorf("storage is (re)initializing")
		} else {
			return 0, fmt.Errorf("storage is (re)initializing: %w", r.lastErr)
		}
	case Ready:
		return r.generation, nil
	case Stopped:
		return 0, fmt.Errorf("apiserver cacher is stopped")
	default:
		return 0, fmt.Errorf("unexpected apiserver cache state: %v", r.state)
	}
}

func (r *ready) setReady() {
	r.set(true, nil)
}

func (r *ready) setError(err error) {
	r.set(false, err)
}

// set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped.
func (r *ready) set(ok bool, err error) {
	r.lock.Lock()
	defer r.lock.Unlock()
	if r.state == Stopped {
		return
	}
	r.lastErr = err
	if ok && r.state == Pending {
		r.state = Ready
		r.generation++
		r.updateLastStateChangeTimeLocked()
		select {
		case <-r.waitCh:
		default:
			close(r.waitCh)
		}
	} else if !ok && r.state == Ready {
		// creating the waitCh can be racy if
		// something enter the wait() method
		select {
		case <-r.waitCh:
			r.restartLock.Lock()
			r.waitCh = make(chan struct{})
			r.restartLock.Unlock()
		default:
		}
		r.state = Pending
		r.updateLastStateChangeTimeLocked()
	}
}

// stop the condition variable and set it as Stopped. This state is irreversible.
func (r *ready) stop() {
	r.lock.Lock()
	defer r.lock.Unlock()
	if r.state != Stopped {
		r.state = Stopped
		r.updateLastStateChangeTimeLocked()
	}
	select {
	case <-r.waitCh:
	default:
		close(r.waitCh)
	}
}

func (r *ready) updateLastStateChangeTimeLocked() {
	r.lastStateChangeTime = r.clock.Now()
}