File: crontab.go

package info (click to toggle)
golang-github-tideland-golib 4.24.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,144 kB
  • sloc: makefile: 4
file content (126 lines) | stat: -rw-r--r-- 2,837 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Tideland Go Library - Time Extensions
//
// Copyright (C) 2009-2017 Frank Mueller / Tideland / Oldenburg / Germany
//
// All rights reserved. Use of this source code is governed
// by the new BSD license.

package timex

//--------------------
// IMPORTS
//--------------------

import (
	"time"

	"github.com/tideland/golib/errors"
	"github.com/tideland/golib/logger"
	"github.com/tideland/golib/loop"
)

//--------------------
// CRONTAB
//--------------------

// Job is executed by the crontab.
type Job interface {
	// ShallExecute decides when called if the job
	// shal be executed.
	ShallExecute(t time.Time) bool

	// Execute executes the job. If the method returns
	// false or an error it will be removed.
	Execute() (bool, error)
}

// cronCommand operates on a crontab.
type command struct {
	add bool
	id  string
	job Job
}

// Crontab is one cron server. A system can run multiple in
// parallel.
type Crontab struct {
	frequency   time.Duration
	jobs        map[string]Job
	commandChan chan *command
	loop        loop.Loop
}

// NewCrontab creates a cron server.
func NewCrontab(freq time.Duration) *Crontab {
	c := &Crontab{
		frequency:   freq,
		jobs:        make(map[string]Job),
		commandChan: make(chan *command),
	}
	c.loop = loop.GoRecoverable(c.backendLoop, c.checkRecovering, "crontab", freq.String())
	return c
}

// Stop terminates the cron server.
func (c *Crontab) Stop() error {
	return c.loop.Stop()
}

// Add adds a new job to the server.
func (c *Crontab) Add(id string, job Job) {
	c.commandChan <- &command{true, id, job}
}

// Remove removes a job from the server.
func (c *Crontab) Remove(id string) {
	c.commandChan <- &command{false, id, nil}
}

// backendLoop runs the server backend.
func (c *Crontab) backendLoop(l loop.Loop) error {
	ticker := time.NewTicker(c.frequency)
	for {
		select {
		case <-l.ShallStop():
			return nil
		case cmd := <-c.commandChan:
			if cmd.add {
				c.jobs[cmd.id] = cmd.job
			} else {
				delete(c.jobs, cmd.id)
			}
		case now := <-ticker.C:
			for id, job := range c.jobs {
				c.do(id, job, now)
			}
		}
	}
}

// checkRecovering checks if the backend can be recovered.
func (c *Crontab) checkRecovering(rs loop.Recoverings) (loop.Recoverings, error) {
	if rs.Frequency(12, time.Minute) {
		logger.Errorf("crontab cannot be recovered: %v", rs.Last().Reason)
		return nil, errors.New(ErrCrontabCannotBeRecovered, errorMessages, rs.Last().Reason)
	}
	logger.Warningf("crontab recovered: %v", rs.Last().Reason)
	return rs.Trim(12), nil
}

// do checks and performs a job.
func (c *Crontab) do(id string, job Job, now time.Time) {
	if job.ShallExecute(now) {
		go func() {
			cont, err := job.Execute()
			if err != nil {
				logger.Errorf("job %q removed after error: %v", id, err)
				cont = false
			}
			if !cont {
				c.Remove(id)
			}
		}()
	}
}

// EOF