1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
|
package scheduler
import (
"sort"
"github.com/moby/swarmkit/v2/api"
)
var (
// defaultFilters is the ordered list of filters that NewPipeline installs
// on every new Pipeline. Process walks the checklist built from it in this
// same order and stops at the first filter that rejects a node.
defaultFilters = []Filter{
// Always check for readiness first.
&ReadyFilter{},
&ResourceFilter{},
&PluginFilter{},
&ConstraintFilter{},
&PlatformFilter{},
&HostPortFilter{},
&MaxReplicasFilter{},
}
)
// checklistEntry pairs a filter with the per-task bookkeeping that a Pipeline
// keeps for it.
type checklistEntry struct {
// f is the filter wrapped by this entry.
f Filter
// enabled holds the value returned by f.SetTask for the current task.
// Process skips entries that are not enabled.
enabled bool
// failureCount counts the number of nodes that this filter failed
// against. It is reset to zero by SetTask and whenever Process finds a
// node that passes every enabled filter.
failureCount int
}
// checklistByFailures implements sort.Interface over a slice of checklist
// entries, ordering them by ascending failureCount.
type checklistByFailures []checklistEntry

// Len reports how many entries the slice holds.
func (c checklistByFailures) Len() int {
	return len(c)
}

// Swap exchanges the entries at positions i and j.
func (c checklistByFailures) Swap(i, j int) {
	tmp := c[i]
	c[i] = c[j]
	c[j] = tmp
}

// Less reports whether the entry at i has a strictly smaller failureCount
// than the entry at j.
func (c checklistByFailures) Less(i, j int) bool {
	left, right := c[i].failureCount, c[j].failureCount
	return left < right
}
// Pipeline runs a set of filters against nodes.
//
// SetTask prepares every filter for a task, Process evaluates one node against
// the enabled filters, and Explain summarizes which filters rejected nodes.
type Pipeline struct {
// checklist is the ordered slice of filters to run, each paired with its
// enabled flag and failure count for the current task.
checklist []checklistEntry
}
// NewPipeline builds a Pipeline whose checklist holds one entry per filter in
// defaultFilters, in the same order. Every entry starts disabled with a zero
// failure count.
func NewPipeline() *Pipeline {
	var checklist []checklistEntry
	for i := range defaultFilters {
		checklist = append(checklist, checklistEntry{f: defaultFilters[i]})
	}
	return &Pipeline{checklist: checklist}
}
// Process runs node n through the enabled filters in checklist order.
//
// It returns false as soon as one enabled filter rejects the node, after
// incrementing that filter's failure count; later filters are not consulted.
// If every enabled filter accepts the node, all failure counts are reset to
// zero and Process returns true.
func (p *Pipeline) Process(n *NodeInfo) bool {
	for i := range p.checklist {
		entry := &p.checklist[i]
		if !entry.enabled || entry.f.Check(n) {
			continue
		}
		// Immediately stop on first failure.
		entry.failureCount++
		return false
	}

	// The node passed, so failures recorded so far no longer matter.
	for i := range p.checklist {
		p.checklist[i].failureCount = 0
	}
	return true
}
// AddFilter appends f to the end of the pipeline's checklist.
//
// The new entry starts disabled with a zero failure count, so Process skips
// it until SetTask has been called and f.SetTask returned true.
func (p *Pipeline) AddFilter(f Filter) {
	entry := checklistEntry{f: f}
	p.checklist = append(p.checklist, entry)
}
// SetTask prepares the pipeline for scheduling task t. Each filter's SetTask
// result decides whether that filter is enabled for the task, and every
// failure count is cleared. Afterwards Process can be called repeatedly to
// try the task against different nodes.
func (p *Pipeline) SetTask(t *api.Task) {
	for i := range p.checklist {
		entry := &p.checklist[i]
		entry.enabled = entry.f.SetTask(t)
		entry.failureCount = 0
	}
}
// Explain builds a human-readable summary of why a task could not be
// scheduled. Filters that recorded at least one failure contribute their own
// explanation, listed from most failures to least and joined with "; ".
// The result is empty when no filter has recorded a failure.
func (p *Pipeline) Explain() string {
	// Sort a copy so the pipeline's own filter order is left untouched.
	ranked := append([]checklistEntry(nil), p.checklist...)
	sort.Sort(sort.Reverse(checklistByFailures(ranked)))

	var explanation string
	for i := range ranked {
		count := ranked[i].failureCount
		if count <= 0 {
			continue
		}
		if explanation != "" {
			explanation += "; "
		}
		explanation += ranked[i].f.Explain(count)
	}
	return explanation
}
|