package exec
import (
"context"
"fmt"
"time"
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/api/equality"
"github.com/moby/swarmkit/v2/log"
"github.com/moby/swarmkit/v2/protobuf/ptypes"
"github.com/pkg/errors"
)
// Controller controls execution of a task.
//
// Within this file, Do drives a Controller by calling Prepare, Start, Wait
// and Shutdown depending on the task's current and desired state. The other
// methods are part of the contract but are not invoked from this file.
type Controller interface {
// Update the task definition seen by the controller. Will return
// ErrTaskUpdateFailed if the provided task definition changes fields that
// cannot be changed.
//
// Will be ignored if the task has exited.
Update(ctx context.Context, t *api.Task) error
// Prepare the task for execution. This should ensure that all resources
// are created such that a call to Start should execute immediately.
//
// Do treats a returned ErrTaskPrepared the same as a nil error.
Prepare(ctx context.Context) error
// Start the target and return when it has started successfully.
//
// Do treats a returned ErrTaskStarted the same as a nil error.
Start(ctx context.Context) error
// Wait blocks until the target has exited.
//
// Do moves the task to the completed state when Wait returns nil.
Wait(ctx context.Context) error
// Shutdown requests the target to exit gracefully.
Shutdown(ctx context.Context) error
// Terminate the target.
Terminate(ctx context.Context) error
// Remove all resources allocated by the controller.
Remove(ctx context.Context) error
// Close closes any ephemeral resources associated with the controller
// instance.
Close() error
}
// ControllerLogs defines a component that makes logs accessible.
//
// Can usually be accessed on a controller instance via type assertion.
type ControllerLogs interface {
// Logs will write to publisher until the context is cancelled or an error
// occurs. The options argument carries the log subscription options for
// the request.
Logs(ctx context.Context, publisher LogPublisher, options api.LogSubscriptionOptions) error
}
// LogPublisher defines the protocol for receiving a log message.
type LogPublisher interface {
// Publish hands a single log message to the publisher and returns any
// error encountered while doing so.
Publish(ctx context.Context, message api.LogMessage) error
}
// LogPublisherFunc implements LogPublisher with just a function: its Publish
// method calls the function itself, so a plain function with this signature
// can be used wherever a LogPublisher is expected.
type LogPublisherFunc func(ctx context.Context, message api.LogMessage) error
// Publish satisfies LogPublisher by invoking the receiver function with the
// supplied context and message, returning its error unchanged.
func (f LogPublisherFunc) Publish(ctx context.Context, message api.LogMessage) error {
	return f(ctx, message)
}
// LogPublisherProvider defines the protocol for obtaining a log publisher.
type LogPublisherProvider interface {
// Publisher returns a LogPublisher for the given subscription ID, along
// with a func() and an error.
//
// NOTE(review): the func() is presumably a cancel/cleanup callback to be
// invoked once the caller is done publishing — confirm against the
// implementations.
Publisher(ctx context.Context, subscriptionID string) (LogPublisher, func(), error)
}
// ContainerStatuser reports status of a container.
//
// This can be implemented by controllers or error types. Do checks for it via
// type assertion both on errors returned by the controller and on the
// controller itself.
type ContainerStatuser interface {
// ContainerStatus returns the status of the target container, if
// available. When the container is not available, the status will be nil.
ContainerStatus(ctx context.Context) (*api.ContainerStatus, error)
}
// PortStatuser reports status of ports which are allocated by the executor.
//
// Do checks whether the controller implements it via type assertion.
type PortStatuser interface {
// PortStatus returns the status on a list of PortConfigs
// which are managed at the host level by the controller.
PortStatus(ctx context.Context) (*api.PortStatus, error)
}
// Resolve asks the executor for the task's controller and derives the status
// that should be reported as a result.
//
// The returned status is a copy of task.Status, adjusted as follows:
//   - controller resolution failed: Message and Err describe the failure; the
//     state becomes REJECTED if the task had not yet reached STARTING, FAILED
//     if it was anywhere from STARTING through RUNNING, and is left untouched
//     otherwise.
//   - controller resolved and the task has not yet reached ACCEPTED: the
//     status is advanced to ACCEPTED and any previous error text is cleared.
//
// Unlike Do, the status must still be reported when a non-nil error is
// returned; the error only signals that obtaining the controller failed.
func Resolve(ctx context.Context, task *api.Task, executor Executor) (Controller, *api.TaskStatus, error) {
	status := task.Status.Copy()

	// Log any state change once the final status has been decided.
	defer func() {
		logStateChange(ctx, task.DesiredState, task.Status.State, status.State)
	}()

	ctlr, err := executor.Controller(task)

	switch {
	case err != nil:
		// The impact of a failed resolution depends on how far along the
		// task already is.
		status.Message = "resolving controller failed"
		status.Err = err.Error()

		switch current := task.Status.State; {
		case current < api.TaskStateStarting:
			// Never started: treat as a rejection.
			status.State = api.TaskStateRejected
		case current <= api.TaskStateRunning:
			// Starting or running: the task has failed.
			status.State = api.TaskStateFailed
		}
		// Any later state is kept as-is.

	case task.Status.State < api.TaskStateAccepted:
		// A resolved controller always lets us proceed to accepted.
		status.Message = "accepted"
		status.State = api.TaskStateAccepted
		status.Err = ""
	}

	return ctlr, status, err
}
// Do progresses the task state using the controller, performing a single
// operation on the controller. The returned TaskStatus should be marked as the
// new state of the task.
//
// The returned status should be reported and placed back on to task
// before the next call. The operation can be cancelled by cancelling the
// context.
//
// Errors from the task controller will be reported on the returned status. Any
// errors coming from this function should not be reported as related to the
// individual task.
//
// If ErrTaskNoop is returned, it means a second call to Do will result in no
// change. ErrTaskRetry is returned when a controller error is caused by
// context cancellation or deadline expiry, or when IsTemporary reports it as
// temporary. If ErrTaskDead is returned, calls to Do will no longer result in
// any action.
//
// NOTE(review): no path in this function body returns ErrTaskDead as written.
func Do(ctx context.Context, task *api.Task, ctlr Controller) (*api.TaskStatus, error) {
// work on a copy; every closure and deferred function below mutates it.
status := task.Status.Copy()
// stay in the current state. The variadic errs argument is accepted but not
// used.
noop := func(errs ...error) (*api.TaskStatus, error) {
return status, ErrTaskNoop
}
retry := func() (*api.TaskStatus, error) {
// while we retry on all errors, this allows us to explicitly declare
// retry cases.
return status, ErrTaskRetry
}
// transition moves the task to the given state, setting the message and
// clearing any previous error text. It panics if the requested state is
// lower than the current one; note that status has already been mutated by
// the time the check runs.
transition := func(state api.TaskState, msg string) (*api.TaskStatus, error) {
current := status.State
status.State = state
status.Message = msg
status.Err = ""
if current > state {
panic("invalid state transition")
}
return status, nil
}
// containerStatus, portStatus and exitCode carry values collected during
// this call. fatal may fill in containerStatus and exitCode; the deferred
// function below fills in whatever is still missing and attaches the result
// to status.
var (
containerStatus *api.ContainerStatus
portStatus *api.PortStatus
exitCode int
)
// fatal is called when a controller operation returned an error. Unless the
// error turns out to be retryable, we proceed to a terminal error state and
// set the appropriate fields.
//
// Common checks for the nature of an error should be included here. If the
// error is determined not to be fatal for the task (context cancellation or
// deadline, or IsTemporary), ErrTaskRetry is returned instead and the state
// is left unchanged.
fatal := func(err error) (*api.TaskStatus, error) {
if err == nil {
panic("err must not be nil when fatal")
}
// errors may carry the container status themselves.
if cs, ok := err.(ContainerStatuser); ok {
// deliberately shadows err so the original error is preserved for
// the checks below.
var err error
containerStatus, err = cs.ContainerStatus(ctx)
if err != nil && !contextDoneError(err) {
log.G(ctx).WithError(err).Error("error resolving container status on fatal")
}
}
// make sure we've set the *correct* exit code
if ec, ok := err.(ExitCoder); ok {
exitCode = ec.ExitCode()
}
// a cancelled or timed-out context is retried without recording the
// error text on the status.
if cause := errors.Cause(err); cause == context.DeadlineExceeded || cause == context.Canceled {
return retry()
}
status.Err = err.Error() // still reported on temporary
if IsTemporary(err) {
return retry()
}
// only at this point do we consider the error fatal to the task.
log.G(ctx).WithError(err).Error("fatal task error")
// NOTE(stevvooe): The following switch dictates the terminal failure
// state based on the state in which the failure was encountered.
switch {
case status.State < api.TaskStateStarting:
status.State = api.TaskStateRejected
case status.State >= api.TaskStateStarting:
status.State = api.TaskStateFailed
}
return status, nil
}
// below, we have several callbacks that are run after the state transition
// is completed. Deferred calls run in reverse order of registration, so the
// container/port status collection registered second runs before this one.
defer func() {
logStateChange(ctx, task.DesiredState, task.Status.State, status.State)
// only refresh the timestamp when the status differs from the task's
// current one according to TaskStatusesEqualStable.
if !equality.TaskStatusesEqualStable(status, &task.Status) {
status.Timestamp = ptypes.MustTimestampProto(time.Now())
}
}()
// extract the container status from the container, if supported.
defer func() {
// only do this if in an active state. This inspects the resulting
// status, since the deferred function runs at return time.
if status.State < api.TaskStateStarting {
return
}
if containerStatus == nil {
// collect this, if we haven't
cctlr, ok := ctlr.(ContainerStatuser)
if !ok {
// without a ContainerStatuser nothing is attached; this also
// skips the port status collection below.
return
}
var err error
containerStatus, err = cctlr.ContainerStatus(ctx)
if err != nil && !contextDoneError(err) {
log.G(ctx).WithError(err).Error("container status unavailable")
}
// at this point, things have gone fairly wrong. Remain positive
// and let's get something out the door.
if containerStatus == nil {
containerStatus = new(api.ContainerStatus)
containerStatusTask := task.Status.GetContainer()
if containerStatusTask != nil {
*containerStatus = *containerStatusTask // copy it over.
}
}
}
// at this point, we *must* have a containerStatus. The exit code is only
// overwritten when a non-zero value was recorded.
if exitCode != 0 {
containerStatus.ExitCode = int32(exitCode)
}
status.RuntimeStatus = &api.TaskStatus_Container{
Container: containerStatus,
}
// portStatus is not assigned anywhere else in this function, so it is
// collected here when the controller supports it.
if portStatus == nil {
pctlr, ok := ctlr.(PortStatuser)
if !ok {
return
}
var err error
portStatus, err = pctlr.PortStatus(ctx)
if err != nil && !contextDoneError(err) {
log.G(ctx).WithError(err).Error("container port status unavailable")
}
}
status.PortStatus = portStatus
}()
// this branch bounds the largest state achievable in the agent as SHUTDOWN, which
// is exactly the correct behavior for the agent.
if task.DesiredState >= api.TaskStateShutdown {
// already completed or beyond: nothing left to shut down.
if status.State >= api.TaskStateCompleted {
return noop()
}
if err := ctlr.Shutdown(ctx); err != nil {
return fatal(err)
}
return transition(api.TaskStateShutdown, "shutdown")
}
if status.State > task.DesiredState {
return noop() // way beyond desired state, pause
}
// the following states may proceed past desired state.
switch status.State {
case api.TaskStatePreparing:
// ErrTaskPrepared is treated as success.
if err := ctlr.Prepare(ctx); err != nil && err != ErrTaskPrepared {
return fatal(err)
}
return transition(api.TaskStateReady, "prepared")
case api.TaskStateStarting:
// ErrTaskStarted is treated as success.
if err := ctlr.Start(ctx); err != nil && err != ErrTaskStarted {
return fatal(err)
}
return transition(api.TaskStateRunning, "started")
case api.TaskStateRunning:
if err := ctlr.Wait(ctx); err != nil {
return fatal(err)
}
return transition(api.TaskStateCompleted, "finished")
}
// The following represent "pause" states. We can only proceed when the
// desired state is beyond our current state.
if status.State >= task.DesiredState {
return noop()
}
// these transitions involve no controller call; they only advance the state.
switch status.State {
case api.TaskStateNew, api.TaskStatePending, api.TaskStateAssigned:
return transition(api.TaskStateAccepted, "accepted")
case api.TaskStateAccepted:
return transition(api.TaskStatePreparing, "preparing")
case api.TaskStateReady:
return transition(api.TaskStateStarting, "starting")
default: // terminal states
return noop()
}
}
// logStateChange emits a debug-level "state changed" log entry describing the
// transition from previous to next, together with the desired state. Nothing
// is logged when the state did not change.
func logStateChange(ctx context.Context, desired, previous, next api.TaskState) {
	if previous == next {
		return
	}

	fields := log.Fields{
		"state.transition": fmt.Sprintf("%v->%v", previous, next),
		"state.desired":    desired,
	}
	log.G(ctx).WithFields(fields).Debug("state changed")
}
// contextDoneError reports whether the root cause of err, as determined by
// errors.Cause, is context.Canceled or context.DeadlineExceeded.
func contextDoneError(err error) bool {
	switch errors.Cause(err) {
	case context.Canceled, context.DeadlineExceeded:
		return true
	default:
		return false
	}
}