1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
package controller
import (
"context"
"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/encoding"
"github.com/mesos/mesos-go/api/v1/lib/scheduler"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/events"
)
type (
// Option modifies a Config, returns an Option that acts as an "undo"
Option func(*Config) Option
// Config is an opaque controller configuration. Properties are configured by applying Option funcs.
Config struct {
frameworkIDFunc func() string
handler events.Handler
registrationTokens <-chan struct{}
subscriptionTerminated func(error)
}
)
// WithEventHandler sets the consumer of scheduler events. The controller's internal event processing
// loop is aborted if a Handler returns a non-nil error, after which the controller may attempt
// to re-register (subscribe) with Mesos.
func WithEventHandler(handler events.Handler) Option {
return func(c *Config) Option {
old := c.handler
c.handler = handler
return WithEventHandler(old)
}
}
// WithFrameworkID sets a fetcher for the current Mesos-assigned framework ID. Frameworks are expected to
// track this ID (that comes from Mesos, in a SUBSCRIBED event).
// frameworkIDFunc is optional; nil tells the controller to always register as a new framework
// for each subscription attempt.
func WithFrameworkID(frameworkIDFunc func() string) Option {
return func(c *Config) Option {
old := c.frameworkIDFunc
c.frameworkIDFunc = frameworkIDFunc
return WithFrameworkID(old)
}
}
// WithSubscriptionTerminated sets a handler that is invoked at the end of every subscription cycle; the
// given error may be nil if no error occurred. subscriptionTerminated is optional; if nil then errors are
// swallowed.
func WithSubscriptionTerminated(handler func(error)) Option {
return func(c *Config) Option {
old := c.subscriptionTerminated
c.subscriptionTerminated = handler
return WithSubscriptionTerminated(old)
}
}
// WithRegistrationTokens limits the rate at which a framework (re)registers with Mesos.
// A non-nil chan should yield a struct{} in order to allow the framework registration process to continue.
// When nil, there is no backoff delay between re-subscription attempts.
// A closed chan disables re-registration and terminates the Run control loop.
func WithRegistrationTokens(registrationTokens <-chan struct{}) Option {
return func(c *Config) Option {
old := c.registrationTokens
c.registrationTokens = registrationTokens
return WithRegistrationTokens(old)
}
}
func (c *Config) tryFrameworkID() (result string) {
if c.frameworkIDFunc != nil {
result = c.frameworkIDFunc()
}
return
}
func isDone(ctx context.Context) (result bool) {
select {
case <-ctx.Done():
return true
default:
return false
}
}
// Run executes a control loop that registers a framework with Mesos and processes the scheduler events
// that flow through the subscription. Upon disconnection, if the current configuration reports "not done"
// then the controller will attempt to re-register the framework and continue processing events.
func Run(ctx context.Context, framework *mesos.FrameworkInfo, caller calls.Caller, options ...Option) (lastErr error) {
var config Config
for _, opt := range options {
if opt != nil {
opt(&config)
}
}
if config.handler == nil {
config.handler = DefaultHandler
}
subscribe := calls.Subscribe(framework)
for !isDone(ctx) {
frameworkID := config.tryFrameworkID()
if framework.GetFailoverTimeout() > 0 && frameworkID != "" {
subscribe.With(calls.SubscribeTo(frameworkID))
}
if config.registrationTokens != nil {
select {
case _, ok := <-config.registrationTokens:
if !ok {
// re-registration canceled, exit Run loop
return
}
case <-ctx.Done():
return ctx.Err()
}
}
resp, err := caller.Call(ctx, subscribe)
lastErr = processSubscription(ctx, config, resp, err)
if config.subscriptionTerminated != nil {
config.subscriptionTerminated(lastErr)
}
}
return
}
func processSubscription(ctx context.Context, config Config, resp mesos.Response, err error) error {
if resp != nil {
defer resp.Close()
}
if err == nil {
err = eventLoop(ctx, config, resp)
}
return err
}
// eventLoop returns the framework ID received by mesos (if any); callers should check for a
// framework ID regardless of whether error != nil.
func eventLoop(ctx context.Context, config Config, eventDecoder encoding.Decoder) (err error) {
for err == nil && !isDone(ctx) {
var e scheduler.Event
if err = eventDecoder.Decode(&e); err == nil {
err = config.handler.HandleEvent(ctx, &e)
}
}
return err
}
// DefaultHandler is invoked when no other handlers have been defined for the controller.
// The current implementation does nothing.
// TODO(jdef) a smarter default impl would decline all offers so as to avoid resource hoarding.
const DefaultHandler = events.NoopHandler
|