File: loop.go

package info (click to toggle)
golang-github-tideland-golib 4.24.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,144 kB
  • sloc: makefile: 4
file content (416 lines) | stat: -rw-r--r-- 9,981 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
// Tideland Go Application Support - Loop
//
// Copyright (C) 2013-2017 Frank Mueller / Tideland / Oldenburg / Germany
//
// All rights reserved. Use of this source code is governed
// by the new BSD license.

package loop

//--------------------
// IMPORTS
//--------------------

import (
	"fmt"
	"sync"
	"time"

	"github.com/tideland/golib/errors"
	"github.com/tideland/golib/identifier"
	"github.com/tideland/golib/logger"
)

//--------------------
// API
//--------------------

// Go starts the loop function in the background. The loop can be
// stopped or killed. This leads to a signal out of the channel
// Loop.ShallStop. The loop then has to end working returning
// a possible error. Wait then waits until the loop ended and
// returns the error.
func Go(lf LoopFunc, dps ...interface{}) Loop {
	descr := identifier.SepIdentifier("::", dps...)
	return goLoop(lf, nil, nil, nil, descr)
}

// GoRecoverable starts the loop function in the background. The
// loop can be stopped or killed. This leads to a signal out of the
// channel Loop.ShallStop. The loop then has to end working returning
// a possible error. Wait then waits until the loop ended and returns
// the error.
//
// If the loop panics a Recovering is created and passed with all
// Recoverings before to the RecoverFunc. If it returns nil the
// loop will be started again. Otherwise the loop will be killed
// with that error.
func GoRecoverable(lf LoopFunc, rf RecoverFunc, dps ...interface{}) Loop {
	descr := identifier.SepIdentifier("::", dps...)
	return goLoop(lf, rf, nil, nil, descr)
}

// GoSentinel starts a new sentinel. It can manage loops and other sentinels
// and will stop them in case of errors.
func GoSentinel(dps ...interface{}) Sentinel {
	descr := identifier.SepIdentifier("::", dps...)
	return goSentinel(nil, nil, descr)
}

// GoNotifiedSentinel starts a new sentinel with a notification handler
// function. It can manage loops and other sentinels and restart them in
// case of errors, based on the notification handler function.
func GoNotifiedSentinel(nhf NotificationHandlerFunc, dps ...interface{}) Sentinel {
	descr := identifier.SepIdentifier("::", dps...)
	return goSentinel(nhf, nil, descr)
}

//--------------------
// RECOVERING
//--------------------

// Recovering stores time and reason of one of the recoverings.
type Recovering struct {
	Time   time.Time
	Reason interface{}
}

// Recoverings is a list of recoverings a loop already had.
type Recoverings []*Recovering

// Frequency checks if a given number of restarts happened during
// a given duration.
func (rs Recoverings) Frequency(num int, dur time.Duration) bool {
	if len(rs) >= num {
		first := rs[len(rs)-num].Time
		last := rs[len(rs)-1].Time
		return last.Sub(first) <= dur
	}
	return false
}

// Len returns the length of the recoverings.
func (rs Recoverings) Len() int {
	return len(rs)
}

// Trim returns the last recoverings defined by l. This
// way the recover func can con control the length and take
// care that the list not grows too much.
func (rs Recoverings) Trim(l int) Recoverings {
	if l >= len(rs) {
		return rs
	}
	return rs[len(rs)-l:]
}

// First returns the first recovering.
func (rs Recoverings) First() *Recovering {
	if len(rs) > 0 {
		return rs[0]
	}
	return nil
}

// Last returns the last recovering.
func (rs Recoverings) Last() *Recovering {
	if len(rs) > 0 {
		return rs[len(rs)-1]
	}
	return nil
}

// RecoverFunc decides if a loop shall be started again or
// end with an error. It is also responsible to trim the
// list of revocerings if needed.
type RecoverFunc func(rs Recoverings) (Recoverings, error)

//--------------------
// OBSERVABLE
//--------------------

// Observable is a common base interface for those objects
// that a sentinel can monitor.
type Observable interface {
	fmt.Stringer

	// Stop tells the observable to stop working and waits until it is done.
	Stop() error

	// Kill kills the observable with the passed error.
	Kill(err error)

	// Wait blocks the caller until the observable ended and returns
	// a possible error.
	Wait() error

	// Restart stops the observable and restarts it afterwards.
	Restart() error

	// Error returns information about the current status and error.
	Error() (status int, err error)

	// attachSentinel attaches the observable to a sentinel.
	attachSentinel(s *sentinel)
}

//--------------------
// LOOP
//--------------------

// Status of the loop.
const (
	Running = iota
	Stopping
	Stopped
)

// LoopFunc is managed loop function.
type LoopFunc func(l Loop) error

// Loop manages running loops in the background as goroutines.
type Loop interface {
	Observable

	// ShallStop returns a channel signalling the loop to
	// stop working.
	ShallStop() <-chan struct{}

	// IsStopping returns a channel that can be used to wait until
	// the loop is stopping or to avoid deadlocks when communicating
	// with the loop.
	IsStopping() <-chan struct{}
}

// Loop manages a loop function.
type loop struct {
	mux         sync.Mutex
	descr       string
	status      int
	err         error
	loopF       LoopFunc
	recoverF    RecoverFunc
	recoverings Recoverings
	startedC    chan struct{}
	stopC       chan struct{}
	doneC       chan struct{}
	owner       Observable
	sentinel    *sentinel
}

// goLoop starts a loop in the background.
func goLoop(lf LoopFunc, rf RecoverFunc, o Observable, s *sentinel, d string) *loop {
	l := &loop{
		descr:    d,
		loopF:    lf,
		recoverF: rf,
		startedC: make(chan struct{}),
		stopC:    make(chan struct{}),
		doneC:    make(chan struct{}),
		owner:    o,
		sentinel: s,
	}
	// Check owner, at least we should own ourself.
	if l.owner == nil {
		l.owner = l
	}
	// Start the loop.
	l.logf(false, "loop %q starts", l)
	go l.run()
	<-l.startedC
	return l
}

// String implements the Stringer interface. It returns
// the description of the loop.
func (l *loop) String() string {
	return l.descr
}

// Stop implements the Observable interface.
func (l *loop) Stop() error {
	l.terminate(nil)
	return l.Wait()
}

// Kill implements the Observable interface.
func (l *loop) Kill(err error) {
	l.terminate(err)
}

// Wait implements the Observable interface.
func (l *loop) Wait() error {
	<-l.doneC
	l.mux.Lock()
	defer l.mux.Unlock()
	err := l.err
	return err
}

// Restart implements the Observable interface.
func (l *loop) Restart() error {
	l.mux.Lock()
	defer l.mux.Unlock()
	if l.status != Stopped {
		return errors.New(ErrRestartNonStopped, errorMessages, l)
	}
	l.err = nil
	l.recoverings = nil
	l.status = Running
	l.stopC = make(chan struct{})
	l.doneC = make(chan struct{})
	// Restart the goroutine.
	l.logf(false, "loop %q restarts", l)
	go l.run()
	<-l.startedC
	return nil
}

// Error implements the Observable interface.
func (l *loop) Error() (status int, err error) {
	l.mux.Lock()
	defer l.mux.Unlock()
	status = l.status
	err = l.err
	return
}

// attachSentinel implements the Observable interface.
func (l *loop) attachSentinel(s *sentinel) {
	l.mux.Lock()
	defer l.mux.Unlock()
	if l.sentinel != nil {
		l.sentinel.Forget(l)
	}
	l.sentinel = s
}

// ShallStop implements the Loop interface.
func (l *loop) ShallStop() <-chan struct{} {
	return l.stopC
}

// IsStopping implements the Loop interface.
func (l *loop) IsStopping() <-chan struct{} {
	return l.stopC
}

// run operates the loop as goroutine.
func (l *loop) run() {
	l.status = Running
	// Finalize the loop.
	defer l.finalizeTermination()
	// Create a loop wrapper containing the recovering control.
	loopWrapper := func() {
		defer func() {
			// Check for recovering.
			if reason := recover(); reason != nil {
				l.checkTermination(reason)
			}
		}()
		l.checkTermination(l.loopF(l))
	}
	// Now start running the loop wrappr.
	l.startedC <- struct{}{}
	for l.status == Running {
		loopWrapper()
	}
}

// terminate tells the loop to stop working and stores
// the passed error if none has been stored already.
func (l *loop) terminate(err error) {
	l.mux.Lock()
	defer l.mux.Unlock()
	if l.err == nil {
		l.err = err
	}
	if l.status != Running {
		return
	}
	l.status = Stopping
	select {
	case <-l.stopC:
	default:
		close(l.stopC)
	}
}

// checkTermination checks if an error has been the reason and if
// it possibly can be recovered by a recover function.
func (l *loop) checkTermination(reason interface{}) {
	switch {
	case reason == nil:
		// Regular end.
		l.status = Stopping
	case l.recoverF == nil:
		// Error but no recover function.
		l.status = Stopping
		if l.err != nil {
			break
		}
		if err, ok := reason.(error); ok {
			l.err = err
		} else {
			l.err = errors.New(ErrLoopPanicked, errorMessages, reason)
		}
	default:
		// Try to recover.
		logger.Errorf("loop %q tries to recover", l)
		l.recoverings = append(l.recoverings, &Recovering{time.Now(), reason})
		l.recoverings, l.err = l.recoverF(l.recoverings)
		if l.err != nil {
			l.status = Stopping
		} else {
			l.logf(false, "loop %q recovered", l)
		}
	}
}

// finalizeTermination notifies listeners that the loop stopped
// working and a potential sentinal about its status.
func (l *loop) finalizeTermination() {
	l.status = Stopped
	// Close stopC in case  the termination is due to an
	// error or internal.
	select {
	case <-l.stopC:
	default:
		close(l.stopC)
	}
	// Communicate that it's done.
	select {
	case <-l.doneC:
	default:
		close(l.doneC)
	}
	// If a sentinel monitors us then till him.
	if l.sentinel != nil {
		if l.err != nil {
			// Notify sentinel about error termination.
			l.sentinel.notifyC <- l.owner
		} else {
			// Tell sentinel to remove loop.
			l.sentinel.Forget(l)
		}
	}
	if l.err != nil {
		l.logf(true, "loop %q stopped with error: %v", l, l.err)
	} else {
		l.logf(false, "loop %q stopped", l)
	}
}

// log writes information or error only if the loop has a description.
func (l *loop) logf(isError bool, format string, a ...interface{}) {
	if l.descr == "" {
		return
	}
	if isError {
		logger.Errorf(format, a...)
	} else {
		logger.Infof(format, a...)
	}
}

// EOF