1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
|
package httpsched
import (
"context"
"log"
"net/http"
"net/url"
"time"
"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/backoff"
mesosclient "github.com/mesos/mesos-go/api/v1/lib/client"
"github.com/mesos/mesos-go/api/v1/lib/encoding"
"github.com/mesos/mesos-go/api/v1/lib/httpcli"
"github.com/mesos/mesos-go/api/v1/lib/httpcli/apierrors"
"github.com/mesos/mesos-go/api/v1/lib/scheduler"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
)
// Protocol errors reported by the redirect-aware response handler in this file.
var (
	// errNotHTTPCli is returned when the response produced by the default handler
	// is not an *httpcli.Response.
	errNotHTTPCli = httpcli.ProtocolError("expected an httpcli.Response object, found something else instead")

	// errBadLocation is returned when no new endpoint URL could be derived from
	// the Location header of a redirect response.
	errBadLocation = httpcli.ProtocolError("failed to build new Mesos service endpoint URL from Location header")
)

// DefaultRedirectSettings are the redirect settings that NewCaller gives a new
// client before any Option is applied.
var DefaultRedirectSettings = RedirectSettings{
	MinBackoffPeriod: 500 * time.Millisecond,
	MaxBackoffPeriod: 13 * time.Second,
	MaxAttempts:      9,
}
// RedirectSettings configures how a client follows Mesos redirects: how many
// redirects are followed per httpDo invocation, and the bounds handed to the
// backoff notifier that paces them.
type RedirectSettings struct {
	MaxAttempts      int           // per httpDo invocation
	MaxBackoffPeriod time.Duration // should be more than MinBackoffPeriod
	MinBackoffPeriod time.Duration // should be less than MaxBackoffPeriod
}

// client pairs an embedded httpcli.Client with the redirect settings and
// feature flags used by this package.
type client struct {
	*httpcli.Client
	redirect       RedirectSettings
	allowReconnect bool // feature flag
}

// Caller is the public interface that a framework scheduler should consume.
type Caller interface {
	calls.Caller
	// httpDo is intentionally package-private; clients of this package may extend a Caller
	// generated by this package by overriding the Call func but may not customize httpDo.
	httpDo(context.Context, encoding.Marshaler, ...httpcli.RequestOpt) (mesos.Response, error)
}

// callerInternal is a Caller that can additionally run a func while a temporary
// client option is in effect.
type callerInternal interface {
	Caller
	// WithTemporary configures the Client with the temporary option and returns the results of
	// invoking f(). Changes made to the Client by the temporary option are reverted before this
	// func returns.
	WithTemporary(opt httpcli.Opt, f func() error) error
}

// Option is a functional configuration option type. Applying an Option yields
// another Option; the options defined in this file return one that restores the
// value they replaced.
type Option func(*client) Option

// callerTemporary wraps a callerInternal so that every httpDo/Call it forwards
// runs with a temporary client option and extra per-request options.
type callerTemporary struct {
	callerInternal                      // delegate actually does the work
	requestOpts    []httpcli.RequestOpt // requestOpts are temporary per-request options
	opt            httpcli.Opt          // opt is a temporary client option
}
// httpDo forwards the request to the wrapped caller's httpDo while the temporary
// client option ct.opt is in effect. The per-request options passed to the delegate
// are the caller-supplied opt followed by ct.requestOpts.
//
// It returns whatever the delegate's httpDo returns.
func (ct *callerTemporary) httpDo(ctx context.Context, m encoding.Marshaler, opt ...httpcli.RequestOpt) (resp mesos.Response, err error) {
	// Assemble the option list in a freshly allocated slice. Appending to opt (or
	// passing ct.requestOpts straight through) would let this call, or an append
	// performed by the delegate, write into a backing array owned by someone else.
	merged := make([]httpcli.RequestOpt, 0, len(opt)+len(ct.requestOpts))
	merged = append(merged, opt...)
	merged = append(merged, ct.requestOpts...)

	// The callback always reports nil: the delegate's results travel through the
	// captured resp/err, so WithTemporary's own return value carries no information.
	_ = ct.callerInternal.WithTemporary(ct.opt, func() error {
		resp, err = ct.callerInternal.httpDo(ctx, m, merged...)
		return nil
	})
	return resp, err
}
// Call forwards the scheduler call to the wrapped caller's Call while the
// temporary client option ct.opt is in effect, and returns the delegate's results.
func (ct *callerTemporary) Call(ctx context.Context, call *scheduler.Call) (resp mesos.Response, err error) {
	// The delegate's results are captured by the closure; the closure itself
	// always reports nil to WithTemporary.
	forward := func() error {
		resp, err = ct.callerInternal.Call(ctx, call)
		return nil
	}
	ct.callerInternal.WithTemporary(ct.opt, forward)
	return resp, err
}
// MaxRedirects is a functional option that sets the maximum number of per-call HTTP
// redirects for a scheduler client. Applying the option returns an Option that
// restores the maximum that was in place beforehand.
func MaxRedirects(mr int) Option {
	return func(c *client) Option {
		// Capture the current value in the undo option before overwriting it.
		undo := MaxRedirects(c.redirect.MaxAttempts)
		c.redirect.MaxAttempts = mr
		return undo
	}
}
// AllowReconnection is a functional option that sets the client's allowReconnect
// feature flag to v. Applying the option returns an Option that restores the
// flag's previous value.
func AllowReconnection(v bool) Option {
	return func(c *client) Option {
		// Capture the current value in the undo option before overwriting it.
		undo := AllowReconnection(c.allowReconnect)
		c.allowReconnect = v
		return undo
	}
}
// NewCaller returns a scheduler API Client in the form of a Caller. Concurrent invocations
// of Call upon the returned caller are safely executed in a serial fashion. It is expected that
// there are no other users of the given Client since its state may be modified by this impl.
//
// The client starts out with DefaultRedirectSettings; the redirect-aware response
// handler is installed on cl first, then each non-nil option is applied in order.
func NewCaller(cl *httpcli.Client, opts ...Option) calls.Caller {
	c := &client{
		Client:   cl,
		redirect: DefaultRedirectSettings,
	}
	cl.With(c.redirectHandler())

	for i := range opts {
		if apply := opts[i]; apply != nil {
			apply(c)
		}
	}

	// The returned caller begins in the disconnected state.
	return &state{client: c, fn: disconnectedFn}
}
// httpDo decorates the inherited behavior w/ support for HTTP redirection to follow Mesos leadership changes.
// NOTE: this implementation will change the state of the client upon Mesos leadership changes.
//
// The request is issued with the given options plus a context option for ctx. Whenever
// the attempt fails with a *mesosRedirectionError, the client's endpoint is switched to
// the advertised URL and, after a backoff tick, the request is replayed. At most
// cli.redirect.MaxAttempts redirects are followed per invocation; once that budget is
// spent the redirection error is returned. If ctx is done while waiting for the backoff
// tick, httpDo returns (nil, ctx.Err()). Any other outcome of the underlying Do call is
// returned unchanged.
func (cli *client) httpDo(ctx context.Context, m encoding.Marshaler, opt ...httpcli.RequestOpt) (resp mesos.Response, err error) {
	var (
		done            chan struct{} // avoid allocating these chans unless we actually need to redirect
		redirectBackoff <-chan struct{}
		getBackoff      = func() <-chan struct{} {
			if redirectBackoff == nil {
				done = make(chan struct{})
				redirectBackoff = backoff.Notifier(cli.redirect.MinBackoffPeriod, cli.redirect.MaxBackoffPeriod, done)
			}
			return redirectBackoff
		}
	)
	// Signal the backoff notifier (if one was ever created) that we are finished.
	defer func() {
		if done != nil {
			close(done)
		}
	}()

	// Cap the capacity at the current length so that append always allocates instead
	// of writing into spare capacity of a backing array owned by the caller.
	opt = append(opt[:len(opt):len(opt)], httpcli.Context(ctx))

	for attempt := 0; ; attempt++ {
		resp, err = cli.Client.Do(m, opt...)
		redirectErr, ok := err.(*mesosRedirectionError)
		if !ok {
			// success, or a failure unrelated to redirection
			return resp, err
		}
		if attempt >= cli.redirect.MaxAttempts {
			// redirect budget exhausted; surface the redirection error
			return resp, err
		}
		if debug {
			log.Println("redirecting to " + redirectErr.newURL)
		}
		// Point the client at the new endpoint, then pace the retry.
		cli.With(httpcli.Endpoint(redirectErr.newURL))
		select {
		case <-getBackoff():
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}
// Call implements Client by sending the scheduler call through httpDo without
// any additional per-request options.
func (cli *client) Call(ctx context.Context, call *scheduler.Call) (mesos.Response, error) {
	resp, err := cli.httpDo(ctx, call)
	return resp, err
}
// mesosRedirectionError is the error used to signal that the request should be
// replayed against a different endpoint; newURL is that endpoint.
type mesosRedirectionError struct {
	newURL string
}

// Error implements the error interface, reporting the redirect target.
func (mre *mesosRedirectionError) Error() string {
	const prefix = "mesos server sent redirect to: "
	return prefix + mre.newURL
}
// redirectHandler returns a config option that decorates the default response handling routine;
// it transforms normal Mesos redirect "errors" into mesosRedirectionErrors by parsing the Location
// header and computing the address of the next endpoint that should be used to replay the failed
// HTTP request.
//
// Results of the default handler are passed through untouched unless its error matches
// apierrors.CodeNotLeader. In that case the response is always closed and one of the
// following errors is returned with a nil response:
//   - errNotHTTPCli when the response is not an *httpcli.Response,
//   - errBadLocation when no endpoint could be built from the Location header,
//   - *mesosRedirectionError carrying the new endpoint otherwise.
func (cli *client) redirectHandler() httpcli.Opt {
	return httpcli.HandleResponse(func(hres *http.Response, rc mesosclient.ResponseClass, err error) (mesos.Response, error) {
		resp, err := cli.HandleResponse(hres, rc, err) // default response handler
		if err == nil || !apierrors.CodeNotLeader.Matches(err) {
			return resp, err
		}
		// TODO(jdef) for now, we're tightly coupled to the httpcli package's Response type
		res, ok := resp.(*httpcli.Response)
		if !ok {
			if resp != nil {
				resp.Close()
			}
			return nil, errNotHTTPCli
		}
		if debug {
			log.Println("master changed?")
		}
		location, ok := buildNewEndpoint(res.Header.Get("Location"), cli.Endpoint())
		// The redirect response is never handed to the caller, so release it on
		// every path, including the bad-Location one.
		res.Close()
		if !ok {
			return nil, errBadLocation
		}
		return nil, &mesosRedirectionError{location}
	})
}
// buildNewEndpoint derives the URL of the next endpoint from a Location header
// value and the endpoint currently in use: the result is currentEndpoint with its
// host replaced by the host found in location. Everything else in currentEndpoint
// is kept as parsed.
//
// The boolean result is false (and the string empty) when location is empty,
// cannot be parsed, or carries no host, or when currentEndpoint cannot be parsed.
func buildNewEndpoint(location, currentEndpoint string) (string, bool) {
	// TODO(jdef) refactor this
	// mesos v0.29 will actually send back fully-formed URLs in the Location header
	if len(location) == 0 {
		return "", false
	}

	// current format appears to be //x.y.z.w:port
	target, err := url.Parse(location)
	if err != nil {
		return "", false
	}
	if target.Host == "" {
		return "", false
	}

	endpoint, err := url.Parse(currentEndpoint)
	if err != nil {
		return "", false
	}

	endpoint.Host = target.Host
	return endpoint.String(), true
}
|