File: rules.go

package info (click to toggle)
golang-github-mesos-mesos-go 0.0.6%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, forky, sid, trixie
  • size: 11,724 kB
  • sloc: makefile: 163
file content (136 lines) | stat: -rw-r--r-- 4,996 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package controller

import (
	"context"
	"fmt"
	"log"
	"time"

	. "github.com/mesos/mesos-go/api/v1/lib/extras/scheduler/eventrules"
	"github.com/mesos/mesos-go/api/v1/lib/extras/store"
	"github.com/mesos/mesos-go/api/v1/lib/scheduler"
	"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
)

// ErrEvent is the error type LiftErrors produces when Mesos delivers an ERROR
// event; its value is the message carried by that event.
type ErrEvent string

// Error implements the error interface by returning the wrapped message as-is.
func (e ErrEvent) Error() string { return string(e) }

// LiftErrors returns a Rule that turns a Mesos ERROR event into an ErrEvent
// holding the event's error message and hands it to the rest of the chain, so
// that downstream rules/handlers may continue processing. A pre-existing error
// is forwarded unchanged; any other event is forwarded with a nil error.
func LiftErrors() Rule {
	return func(ctx context.Context, e *scheduler.Event, err error, chain Chain) (context.Context, *scheduler.Event, error) {
		// err is only replaced when no upstream rule has failed yet.
		if err == nil && e.GetType() == scheduler.Event_ERROR {
			// Aborting and re-subscribing is the recommended reaction: surfacing
			// an error here terminates the event loop, which resets the connection.
			err = ErrEvent(e.GetError().GetMessage())
		}
		return chain(ctx, e, err)
	}
}

// StateError reports a state transition the system cannot resolve; a process
// receiving one should likely exit.
type StateError string

// Error implements the error interface by returning the underlying text.
func (se StateError) Error() string {
	msg := string(se)
	return msg
}

// TrackSubscription returns a Rule that keeps frameworkIDStore in sync with the
// framework ID that Mesos reports in SUBSCRIBED events.
//
// A pre-existing err is forwarded unchanged, and non-SUBSCRIBED events are
// forwarded with a nil error. For a SUBSCRIBED event the rule forwards, in order
// of precedence:
//   - any error from frameworkIDStore.Get other than store.ErrNotFound;
//   - a StateError if Mesos sent an empty framework ID;
//   - a StateError if failoverTimeout > 0 and a previously stored, non-empty
//     framework ID differs from the new one;
//   - any error from frameworkIDStore.Set when storing a changed framework ID;
//
// and otherwise forwards the event with a nil error.
func TrackSubscription(frameworkIDStore store.Singleton, failoverTimeout time.Duration) Rule {
	return func(ctx context.Context, e *scheduler.Event, err error, chain Chain) (context.Context, *scheduler.Event, error) {
		if err != nil {
			return chain(ctx, e, err)
		}
		if e.GetType() != scheduler.Event_SUBSCRIBED {
			return chain(ctx, e, nil)
		}

		// A distinct name avoids shadowing the err parameter.
		storedFrameworkID, getErr := frameworkIDStore.Get()
		if getErr != nil && getErr != store.ErrNotFound {
			return chain(ctx, e, getErr)
		}
		frameworkID := e.GetSubscribed().GetFrameworkID().GetValue()

		// order of these checks is important: tread carefully w/ respect to future changes
		if frameworkID == "" {
			// sanity check, should **never** happen
			return chain(ctx, e, StateError("mesos sent an empty frameworkID?!"))
		}
		if storedFrameworkID != "" && storedFrameworkID != frameworkID && failoverTimeout > 0 {
			return chain(ctx, e, StateError(fmt.Sprintf(
				"frameworkID changed unexpectedly; failover exceeded timeout? (%s).", failoverTimeout)))
		}
		if storedFrameworkID != frameworkID {
			// A failed write must not be ignored: the caller needs to learn that
			// the new framework ID was not persisted.
			if setErr := frameworkIDStore.Set(frameworkID); setErr != nil {
				return chain(ctx, e, setErr)
			}
		}
		return chain(ctx, e, nil)
	}
}

// AckStatusUpdates returns a Rule that acknowledges task status updates back to
// Mesos through caller. Only updates tagged with a UUID are acknowledged. When
// sending the ack fails the event is dropped; otherwise the incoming err param
// (if any) is forwarded. It is AckStatusUpdatesF bound to a caller that never
// changes.
func AckStatusUpdates(caller calls.Caller) Rule {
	fixedCaller := func() calls.Caller { return caller }
	return AckStatusUpdatesF(fixedCaller)
}

// AckStatusUpdatesF is the functional form of AckStatusUpdates. callerLookup is
// invoked for every acknowledgement, which suits callers that change over time.
//
// An acknowledgement is attempted for every UPDATE event whose status carries a
// non-empty UUID, even when err is already non-nil. If the ack call fails, the
// chain is NOT invoked (the event is dropped). The returned error is then
// Error2(err, *calls.AckError), where the AckError wraps the failure. In every
// other case the incoming err is forwarded to the chain unchanged.
func AckStatusUpdatesF(callerLookup func() calls.Caller) Rule {
	return func(ctx context.Context, e *scheduler.Event, err error, chain Chain) (context.Context, *scheduler.Event, error) {
		if e.GetType() != scheduler.Event_UPDATE {
			return chain(ctx, e, err)
		}
		status := e.GetUpdate().GetStatus()
		uuid := status.GetUUID()
		// only ACK non-empty UUIDs, as per the mesos scheduler spec
		if len(uuid) == 0 {
			return chain(ctx, e, err)
		}
		ack := calls.Acknowledge(status.GetAgentID().GetValue(), status.TaskID.Value, uuid)
		if ackErr := calls.CallNoData(ctx, callerLookup(), ack); ackErr != nil {
			// TODO(jdef): not sure how important this is; if there's an error ack'ing
			// because we became disconnected, then we'll just reconnect later and
			// Mesos will ask us to ACK anyway -- why pay special attention to these
			// call failures vs others?
			return ctx, e, Error2(err, &calls.AckError{Ack: ack, Cause: ackErr}) // drop (do not propagate to chain)
		}
		return chain(ctx, e, err)
	}
}

// DefaultEventLabel is the label LogEvents passes to DefaultEventLogger when no
// logging func is supplied. It is printed as the first value of each line.
const DefaultEventLabel = "event"

// DefaultEventLogger returns a func that prints each event via log.Println.
// A non-empty eventLabel is printed ahead of the event; with an empty label only
// the event itself is printed.
func DefaultEventLogger(eventLabel string) func(*scheduler.Event) {
	if eventLabel != "" {
		return func(e *scheduler.Event) { log.Println(eventLabel, e) }
	}
	return func(e *scheduler.Event) { log.Println(e) }
}

// LogEvents returns a Rule that passes every event to f. It then forwards the
// event and the incoming error, unchanged, to the rest of the chain. When f is
// nil, DefaultEventLogger(DefaultEventLabel) is used instead.
func LogEvents(f func(*scheduler.Event)) Rule {
	logFn := f
	if logFn == nil {
		logFn = DefaultEventLogger(DefaultEventLabel)
	}
	return func(ctx context.Context, e *scheduler.Event, err error, chain Chain) (context.Context, *scheduler.Event, error) {
		logFn(e)
		return chain(ctx, e, err)
	}
}