File: allocator.go

Package: docker.io 18.09.1+dfsg1-7.1+deb10u3 (Debian buster)

package allocator

import (
	"context"
	"sync"

	"github.com/docker/docker/pkg/plugingetter"
	"github.com/docker/go-events"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/allocator/cnmallocator"
	"github.com/docker/swarmkit/manager/state"
	"github.com/docker/swarmkit/manager/state/store"
)

// Allocator controls how the allocation stage in the manager is handled.
type Allocator struct {
	// The manager store.
	store *store.MemoryStore

	// taskBallot is the ballot used to synchronize across all allocators,
	// ensuring that all of them have completed their respective allocations
	// before a task can be moved to the ALLOCATED state.
	taskBallot *taskBallot

	// netCtx holds the context and state needed by the network allocator.
	netCtx *networkContext

	// stopChan signals to the allocator to stop running.
	stopChan chan struct{}
	// doneChan is closed when the allocator is finished running.
	doneChan chan struct{}

	// pluginGetter provides access to docker's plugin inventory.
	pluginGetter plugingetter.PluginGetter

	// networkConfig stores the network-related configuration for the cluster.
	networkConfig *cnmallocator.NetworkConfig
}

// taskBallot controls how the voting for task allocation is
// coordinated between different allocators. This is the only structure
// that is written to by all allocator goroutines concurrently; hence the
// mutex.
type taskBallot struct {
	sync.Mutex

	// List of registered voters, each of which has to cast its vote to
	// indicate that its allocation is complete.
	voters []string

	// List of votes collected for every task so far from different voters.
	votes map[string][]string
}

// allocActor controls the various phases in the lifecycle of one kind of allocator.
type allocActor struct {
	// Task voter identity of the allocator.
	taskVoter string

	// Action routine which is called for every event that the
	// allocator receives.
	action func(context.Context, events.Event)

	// Init routine which is called during the initialization of
	// the allocator.
	init func(ctx context.Context) error
}
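
// As an illustration (not part of the original source), another allocator
// kind could be registered by appending a second allocActor to the slice
// iterated over in Run below. The names here are hypothetical:
//
//	{
//		taskVoter: "ipam",        // hypothetical voter identity
//		init:      a.doIPAMInit,  // hypothetical init routine
//		action:    a.doIPAMAlloc, // hypothetical action routine
//	}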

// New returns a new instance of Allocator for use during the allocation
// stage of the manager.
func New(store *store.MemoryStore, pg plugingetter.PluginGetter, netConfig *cnmallocator.NetworkConfig) (*Allocator, error) {
	a := &Allocator{
		store: store,
		taskBallot: &taskBallot{
			votes: make(map[string][]string),
		},
		stopChan:      make(chan struct{}),
		doneChan:      make(chan struct{}),
		pluginGetter:  pg,
		networkConfig: netConfig,
	}

	return a, nil
}
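
// A minimal usage sketch, with assumed wiring (in the real manager the
// store, plugin getter, and network config are constructed elsewhere):
//
//	a, err := New(s, pg, nil)
//	if err != nil {
//		return err
//	}
//	go a.Run(ctx) // blocks until Stop is called or ctx is canceled
//	// ... later, during shutdown:
//	a.Stop() // returns only after all allocator goroutines have exited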

// Run starts all allocator goroutines and waits for Stop to be called.
func (a *Allocator) Run(ctx context.Context) error {
	// Set up a cancelable context for all goroutines to use.
	ctx, cancel := context.WithCancel(ctx)
	var (
		wg     sync.WaitGroup
		actors []func() error
	)

	defer func() {
		cancel()
		wg.Wait()
		close(a.doneChan)
	}()

	for _, aa := range []allocActor{
		{
			taskVoter: networkVoter,
			init:      a.doNetworkInit,
			action:    a.doNetworkAlloc,
		},
	} {
		if aa.taskVoter != "" {
			a.registerToVote(aa.taskVoter)
		}

		// Copy the loop variable so each closure captures its own value.
		// Taking a pointer to the iteration variable would alias the same
		// variable across iterations.
		aaCopy := aa
		actor := func() error {
			wg.Add(1)
			defer wg.Done()

			// init might return an allocator-specific context
			// which is a child of the passed-in context, to hold
			// allocator-specific state.
			watch, watchCancel, err := a.init(ctx, &aaCopy)
			if err != nil {
				return err
			}

			wg.Add(1)
			go func(watch <-chan events.Event, watchCancel func()) {
				defer func() {
					wg.Done()
					watchCancel()
				}()
				a.run(ctx, aaCopy, watch)
			}(watch, watchCancel)
			return nil
		}

		actors = append(actors, actor)
	}

	for _, actor := range actors {
		if err := actor(); err != nil {
			return err
		}
	}

	<-a.stopChan
	return nil
}

// Stop stops the allocator.
func (a *Allocator) Stop() {
	close(a.stopChan)
	// Wait for all allocator goroutines to truly exit
	<-a.doneChan
}

func (a *Allocator) init(ctx context.Context, aa *allocActor) (<-chan events.Event, func(), error) {
	watch, watchCancel := state.Watch(a.store.WatchQueue(),
		api.EventCreateNetwork{},
		api.EventDeleteNetwork{},
		api.EventCreateService{},
		api.EventUpdateService{},
		api.EventDeleteService{},
		api.EventCreateTask{},
		api.EventUpdateTask{},
		api.EventDeleteTask{},
		api.EventCreateNode{},
		api.EventUpdateNode{},
		api.EventDeleteNode{},
		state.EventCommit{},
	)

	if err := aa.init(ctx); err != nil {
		watchCancel()
		return nil, nil, err
	}

	return watch, watchCancel, nil
}

func (a *Allocator) run(ctx context.Context, aa allocActor, watch <-chan events.Event) {
	for {
		select {
		case ev, ok := <-watch:
			if !ok {
				return
			}

			aa.action(ctx, ev)
		case <-ctx.Done():
			return
		}
	}
}

func (a *Allocator) registerToVote(name string) {
	a.taskBallot.Lock()
	defer a.taskBallot.Unlock()

	a.taskBallot.voters = append(a.taskBallot.voters, name)
}

func (a *Allocator) taskAllocateVote(voter string, id string) bool {
	a.taskBallot.Lock()
	defer a.taskBallot.Unlock()

	// If the voter has already voted for this task, return false;
	// repeat votes are not counted.
	for _, v := range a.taskBallot.votes[id] {
		if v == voter {
			return false
		}
	}

	a.taskBallot.votes[id] = append(a.taskBallot.votes[id], voter)

	// We haven't gotten enough votes yet
	if len(a.taskBallot.voters) > len(a.taskBallot.votes[id]) {
		return false
	}

nextVoter:
	for _, voter := range a.taskBallot.voters {
		for _, vote := range a.taskBallot.votes[id] {
			if voter == vote {
				continue nextVoter
			}
		}

		// Not every registered voter has registered a vote.
		return false
	}

	return true
}
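
// A sketch of the voting semantics with hypothetical values: with a single
// registered voter, the first vote for a task completes the ballot, and a
// repeat vote from the same voter is ignored.
//
//	a.registerToVote("network")
//	a.taskAllocateVote("network", "task1") // true: every voter has voted
//	a.taskAllocateVote("network", "task1") // false: duplicate vote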