1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
package controller
import (
"context"
"fmt"
"log"
"time"
. "github.com/mesos/mesos-go/api/v1/lib/extras/scheduler/eventrules"
"github.com/mesos/mesos-go/api/v1/lib/extras/store"
"github.com/mesos/mesos-go/api/v1/lib/scheduler"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
)
// ErrEvent is the error type produced by LiftErrors when an ERROR event arrives from Mesos.
// Its underlying string is the message carried by that event.
type ErrEvent string

// Error implements the error interface by returning the message verbatim.
func (msg ErrEvent) Error() string { return string(msg) }
// LiftErrors returns a Rule that turns a scheduler ERROR event into an ErrEvent error and hands
// it to the rest of the chain, so that downstream rules/handlers may continue processing.
// An error that is already present when the rule runs is forwarded unchanged.
func LiftErrors() Rule {
	return func(ctx context.Context, ev *scheduler.Event, err error, chain Chain) (context.Context, *scheduler.Event, error) {
		switch {
		case err != nil:
			// a pre-existing error wins; the event is not inspected
		case ev.GetType() == scheduler.Event_ERROR:
			// it's recommended that we abort and re-try subscribing; returning an
			// error here will cause the event loop to terminate and the connection
			// will be reset.
			err = ErrEvent(ev.GetError().GetMessage())
		}
		return chain(ctx, ev, err)
	}
}
// StateError signals an unresolvable state transition error; a program that receives one
// should likely exit.
type StateError string

// Error implements the error interface by returning the underlying string.
func (s StateError) Error() string {
	return string(s)
}
// TrackSubscription returns a Rule that, for every SUBSCRIBED event, reconciles the framework ID
// reported by Mesos with the one held in frameworkIDStore:
//
//   - a failure to read the store (other than store.ErrNotFound) is forwarded down the chain;
//   - an empty framework ID from Mesos yields a StateError;
//   - a stored, non-empty ID that differs from the reported one while failoverTimeout > 0
//     yields a StateError;
//   - otherwise, if the IDs differ, the reported ID is written to the store, and a failure to
//     write is forwarded down the chain.
//
// Events of any other type, and invocations that already carry an error, pass through untouched.
func TrackSubscription(frameworkIDStore store.Singleton, failoverTimeout time.Duration) Rule {
	return func(ctx context.Context, e *scheduler.Event, err error, chain Chain) (context.Context, *scheduler.Event, error) {
		if err != nil {
			return chain(ctx, e, err)
		}
		if e.GetType() == scheduler.Event_SUBSCRIBED {
			var (
				storedFrameworkID, err = frameworkIDStore.Get()
				frameworkID            = e.GetSubscribed().GetFrameworkID().GetValue()
			)
			// a missing entry is expected on first subscription; anything else is a real failure
			if err != nil && err != store.ErrNotFound {
				return chain(ctx, e, err)
			}
			// order of `if` statements are important: tread carefully w/ respect to future changes
			if frameworkID == "" {
				// sanity check, should **never** happen
				return chain(ctx, e, StateError("mesos sent an empty frameworkID?!"))
			}
			if storedFrameworkID != "" && storedFrameworkID != frameworkID && failoverTimeout > 0 {
				return chain(ctx, e, StateError(fmt.Sprintf(
					"frameworkID changed unexpectedly; failover exceeded timeout? (%s).", failoverTimeout)))
			}
			if storedFrameworkID != frameworkID {
				// do not swallow persistence failures: a framework ID that was never
				// stored cannot be checked on the next subscription.
				if err := frameworkIDStore.Set(frameworkID); err != nil {
					return chain(ctx, e, err)
				}
			}
		}
		return chain(ctx, e, nil)
	}
}
// AckStatusUpdates builds a Rule that acknowledges task status updates back to mesos using the
// given caller. The event is dropped if sending the ack fails; if it succeeds, the incoming err
// param (if any) is forwarded. Only status updates tagged with a UUID are acknowledged.
func AckStatusUpdates(caller calls.Caller) Rule {
	fixed := func() calls.Caller { return caller }
	return AckStatusUpdatesF(fixed)
}
// AckStatusUpdatesF is the functional form of AckStatusUpdates: the caller is looked up through
// callerLookup each time an ack is sent, which is useful when the caller may change over time.
// A failure to ack is wrapped in a *calls.AckError, merged with any incoming error via Error2,
// and returned without invoking the rest of the chain.
func AckStatusUpdatesF(callerLookup func() calls.Caller) Rule {
	return func(ctx context.Context, e *scheduler.Event, err error, chain Chain) (context.Context, *scheduler.Event, error) {
		// aggressively attempt to ack updates: a pre-existing error does not prevent
		// the acknowledgement from being attempted.
		if e.GetType() != scheduler.Event_UPDATE {
			return chain(ctx, e, err)
		}

		status := e.GetUpdate().GetStatus()
		uuid := status.GetUUID()
		// only ACK non-empty UUID's, as per mesos scheduler spec
		if len(uuid) == 0 {
			return chain(ctx, e, err)
		}

		ack := calls.Acknowledge(
			status.GetAgentID().GetValue(),
			status.TaskID.Value,
			uuid,
		)
		if ackErr := calls.CallNoData(ctx, callerLookup(), ack); ackErr != nil {
			// TODO(jdef): not sure how important this is; if there's an error ack'ing
			// because we became disconnected, then we'll just reconnect later and
			// Mesos will ask us to ACK anyway -- why pay special attention to these
			// call failures vs others?
			wrapped := &calls.AckError{Ack: ack, Cause: ackErr}
			return ctx, e, Error2(err, wrapped) // drop (do not propagate to chain)
		}
		return chain(ctx, e, err)
	}
}
// DefaultEventLabel is the label LogEvents hands to DefaultEventLogger when it is given a nil
// logging func; the resulting logger prints it as the first argument of each log line.
const DefaultEventLabel = "event"
// DefaultEventLogger returns a func that writes each event via the `log` package.
// A non-empty eventLabel is printed ahead of the event; an empty one is omitted entirely.
func DefaultEventLogger(eventLabel string) func(*scheduler.Event) {
	if eventLabel != "" {
		return func(ev *scheduler.Event) {
			log.Println(eventLabel, ev)
		}
	}
	return func(ev *scheduler.Event) {
		log.Println(ev)
	}
}
// LogEvents returns a rule that passes every scheduler event to f before forwarding the event
// and error, unmodified, to the rest of the chain. If f is nil, the logger built by
// DefaultEventLogger(DefaultEventLabel) is used instead.
func LogEvents(f func(*scheduler.Event)) Rule {
	logEvent := f
	if logEvent == nil {
		logEvent = DefaultEventLogger(DefaultEventLabel)
	}
	return func(ctx context.Context, e *scheduler.Event, err error, chain Chain) (context.Context, *scheduler.Event, error) {
		logEvent(e)
		return chain(ctx, e, err)
	}
}
|