1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
|
package swarm
import (
"context"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
swarmtypes "github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/client"
"gotest.tools/v3/poll"
)
// NoTasksForService returns a poll check that succeeds once no tasks remain
// for the service identified by serviceID.
//
// Every invocation lists the tasks filtered by that service ID using ctx. An
// empty list ends the poll successfully; a non-empty list asks the poller to
// retry and reports the current task count.
func NoTasksForService(ctx context.Context, client client.ServiceAPIClient, serviceID string) func(log poll.LogT) poll.Result {
	return func(log poll.LogT) poll.Result {
		tasks, err := client.TaskList(ctx, types.TaskListOptions{
			Filters: filters.NewArgs(
				filters.Arg("service", serviceID),
			),
		})
		if err != nil {
			// A listing error is deliberately treated as "the tasks are gone".
			// TODO we should not use an error as indication that the tasks are gone. There may be other reasons for an error to occur.
			return poll.Success()
		}
		if len(tasks) > 0 {
			return poll.Continue("task count for service %s at %d waiting for 0", serviceID, len(tasks))
		}
		return poll.Success()
	}
}
// NoTasks returns a poll check that succeeds once the unfiltered task list
// is empty. A failure to list tasks is reported as a poll error; while tasks
// remain, the check asks to be retried and reports how many are left.
func NoTasks(ctx context.Context, client client.ServiceAPIClient) func(log poll.LogT) poll.Result {
	return func(log poll.LogT) poll.Result {
		remaining, err := client.TaskList(ctx, types.TaskListOptions{})
		if err != nil {
			return poll.Error(err)
		}
		if count := len(remaining); count > 0 {
			return poll.Continue("waiting for all tasks to be removed: task count at %d", count)
		}
		return poll.Success()
	}
}
// RunningTasksCount returns a poll check that succeeds when exactly
// `instances` tasks of service `serviceID` are in the running state.
//
// A failure to list tasks is reported as a poll error. In every other case
// where the running count differs from `instances`, the check asks to be
// retried; if too few tasks are running and a failed task carries an error
// message, that message is included in the retry reason.
func RunningTasksCount(client client.ServiceAPIClient, serviceID string, instances uint64) func(log poll.LogT) poll.Result {
	return func(log poll.LogT) poll.Result {
		tasks, err := client.TaskList(context.Background(), types.TaskListOptions{
			Filters: filters.NewArgs(filters.Arg("service", serviceID)),
		})
		if err != nil {
			return poll.Error(err)
		}

		want := int(instances)
		active := 0
		// Error text of the last failed task that reported one, if any.
		lastFailure := ""
		for _, t := range tasks {
			state := t.Status.State
			if state == swarmtypes.TaskStateRunning {
				active++
			} else if state == swarmtypes.TaskStateFailed && t.Status.Err != "" {
				lastFailure = t.Status.Err
			}
		}

		if active == want {
			return poll.Success()
		}
		if active > want {
			return poll.Continue("waiting for tasks to terminate")
		}
		// From here on fewer tasks than requested are running.
		if lastFailure != "" {
			return poll.Continue("waiting for tasks to enter run state. task failed with error: %s", lastFailure)
		}
		return poll.Continue("running task count at %d waiting for %d (total tasks: %d)", active, instances, len(tasks))
	}
}
// JobComplete is a poll function for determining that a ReplicatedJob is
// completed. Additionally, while polling, it verifies that the job never
// exceeds MaxConcurrent running tasks and that running plus completed tasks
// never exceed TotalCompletions.
//
// Only tasks whose JobIteration index matches the iteration recorded in
// service.JobStatus (the zero Version when JobStatus is nil) are counted.
//
// If service.Spec.Mode.ReplicatedJob is nil the returned check reports an
// error instead of panicking. A nil MaxConcurrent is treated as 1 and a nil
// TotalCompletions as MaxConcurrent.
// NOTE(review): these defaults follow the documented ReplicatedJob field
// semantics in the swarm API types — confirm against the API version in use.
func JobComplete(client client.CommonAPIClient, service swarmtypes.Service) func(log poll.LogT) poll.Result {
	job := service.Spec.Mode.ReplicatedJob
	if job == nil {
		return func(log poll.LogT) poll.Result {
			return poll.Error(fmt.Errorf("service %s is not a replicated job", service.ID))
		}
	}

	filter := filters.NewArgs()
	filter.Add("service", service.ID)

	var jobIteration swarmtypes.Version
	if service.JobStatus != nil {
		jobIteration = service.JobStatus.JobIteration
	}

	maxConcurrent := 1
	if job.MaxConcurrent != nil {
		maxConcurrent = int(*job.MaxConcurrent)
	}
	total := maxConcurrent
	if job.TotalCompletions != nil {
		total = int(*job.TotalCompletions)
	}

	return func(log poll.LogT) poll.Result {
		tasks, err := client.TaskList(context.Background(), types.TaskListOptions{
			Filters: filter,
		})
		if err != nil {
			// Without this return a failed listing would be evaluated as an
			// empty task list below.
			return poll.Error(err)
		}

		var running, completed int
		for _, task := range tasks {
			// make sure the task has the same job iteration
			if task.JobIteration == nil || task.JobIteration.Index != jobIteration.Index {
				continue
			}
			switch task.Status.State {
			case swarmtypes.TaskStateRunning:
				running++
			case swarmtypes.TaskStateComplete:
				completed++
			}
		}

		switch {
		case running > maxConcurrent:
			return poll.Error(fmt.Errorf(
				"number of running tasks (%v) exceeds max (%v)", running, maxConcurrent,
			))
		case (completed + running) > total:
			return poll.Error(fmt.Errorf(
				"number of tasks exceeds total (%v), %v running and %v completed",
				total, running, completed,
			))
		case completed == total && running == 0:
			return poll.Success()
		default:
			return poll.Continue(
				"Job not yet finished, %v completed and %v running out of %v total",
				completed, running, total,
			)
		}
	}
}
|