File: deallocator.go

package deallocator

import (
	"context"

	"github.com/docker/go-events"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/state/store"
)

// Deallocator waits for services to fully shut down (i.e. no containers
// left), then deallocates service-level resources (e.g. networks), and
// finally the services themselves.
// In particular, the Deallocator should be the only place where services,
// or service-level resources, are ever deleted!
//
// It’s worth noting that this new component’s role is quite different from
// the task reaper’s: tasks are purely internal to Swarmkit, and their status
// is entirely managed by the system itself. In contrast, the deallocator is
// responsible for safely deleting entities that are directly controlled by the
// user.
//
// NOTE: since networks are the only service-level resources as of now,
// it has been deemed over-engineering to build a generic way to
// handle other types of service-level resources; if we ever start
// having more of those and thus want to reconsider this choice, it
// might be worth having a look at this archived branch, which does
// implement a way of separating the code for the deallocator itself
// from each resource-specific way of handling it:
// https://github.com/docker/swarmkit/compare/a84c01f49091167dd086c26b45dc18b38d52e4d9...wk8:wk8/generic_deallocator#diff-75f4f75eee6a6a7a7268c672203ea0ac
type Deallocator struct {
	store *store.MemoryStore

	// for services that are shutting down, we keep track of how many
	// tasks still exist for them
	services map[string]*serviceWithTaskCounts

	// mainly used for tests, so that we can peek
	// into the DB state in between events
	// the bool notifies whether any DB update was actually performed
	eventChan chan bool

	stopChan chan struct{}
	doneChan chan struct{}
}

// used in our internal state's `services` right above
type serviceWithTaskCounts struct {
	service   *api.Service
	taskCount int
}

// New creates a new Deallocator.
func New(store *store.MemoryStore) *Deallocator {
	return &Deallocator{
		store:    store,
		services: make(map[string]*serviceWithTaskCounts),

		stopChan: make(chan struct{}),
		doneChan: make(chan struct{}),
	}
}
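// Typical lifecycle (a minimal sketch, not part of the original file;
// assumes s is an initialized *store.MemoryStore and ctx is a cancellable
// context):
//
//	d := New(s)
//	go func() {
//		_ = d.Run(ctx) // blocks until Stop() is called or ctx is done
//	}()
//	// ... later, during shutdown:
//	d.Stop()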

// Run starts the deallocator, which then starts cleaning up services
// and their resources when relevant (i.e. when no tasks still exist
// for a given service).
// This is a blocking function.
func (deallocator *Deallocator) Run(ctx context.Context) error {
	var (
		allServices []*api.Service
		allNetworks []*api.Network
	)

	eventsChan, eventsChanCancel, err := store.ViewAndWatch(deallocator.store,
		func(readTx store.ReadTx) (err error) {
			// look for services that are marked for deletion
			// there's no index on the `PendingDelete` field in the store,
			// so we just iterate over all of them and filter manually
			// this is okay since we only do this at leadership change
			allServices, err = store.FindServices(readTx, store.All)

			if err != nil {
				log.G(ctx).WithError(err).Error("failed to list services in deallocator init")
				return err
			}

			// now we also need to look at all existing service-level networks
			// that may be marked for deletion
			if allNetworks, err = store.FindNetworks(readTx, store.All); err != nil {
				log.G(ctx).WithError(err).Error("failed to list networks in deallocator init")
				return err
			}

			return
		},
		api.EventDeleteTask{},
		api.EventUpdateService{},
		api.EventUpdateNetwork{})

	if err != nil {
		// if we have an error here, we can't proceed any further
		log.G(ctx).WithError(err).Error("failed to initialize the deallocator")
		return err
	}

	defer func() {
		eventsChanCancel()
		close(deallocator.doneChan)
	}()

	anyUpdated := false
	// now let's populate our internal taskCounts
	for _, service := range allServices {
		if updated, _ := deallocator.processService(ctx, service); updated {
			anyUpdated = true
		}
	}

	// and deallocate networks that may be marked for deletion and aren't used any more
	for _, network := range allNetworks {
		if updated, _ := deallocator.processNetwork(ctx, nil, network, nil); updated {
			anyUpdated = true
		}
	}

	// now we just need to wait for events
	deallocator.notifyEventChan(anyUpdated)
	for {
		select {
		case event := <-eventsChan:
			if updated, err := deallocator.processNewEvent(ctx, event); err == nil {
				deallocator.notifyEventChan(updated)
			} else {
				log.G(ctx).WithError(err).Errorf("error processing deallocator event %#v", event)
			}
		case <-deallocator.stopChan:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// Stop stops the deallocator's routine
// FIXME (jrouge): see the comment on TaskReaper.Stop() about when to
// properly stop this - plus we need a unit test for it!
func (deallocator *Deallocator) Stop() {
	close(deallocator.stopChan)
	<-deallocator.doneChan
}

// always a no-op, except when running tests;
// see the comment on the `Deallocator`'s `eventChan` field
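// For instance, an in-package test might wire up eventChan before calling
// Run to synchronize with the deallocator (a hypothetical sketch, not part
// of the original code):
//
//	d := New(s) // s is a *store.MemoryStore
//	d.eventChan = make(chan bool)
//	go d.Run(ctx)
//	// blocks until the initial pass over pre-existing services/networks
//	// is done; the bool says whether anything was actually deleted
//	updated := <-d.eventChan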
func (deallocator *Deallocator) notifyEventChan(updated bool) {
	if deallocator.eventChan != nil {
		deallocator.eventChan <- updated
	}
}

// if a service is marked for deletion, this checks whether it's ready to be
// deleted yet, and deletes it if so
func (deallocator *Deallocator) processService(ctx context.Context, service *api.Service) (bool, error) {
	if !service.PendingDelete {
		return false, nil
	}

	var (
		tasks []*api.Task
		err   error
	)

	deallocator.store.View(func(tx store.ReadTx) {
		tasks, err = store.FindTasks(tx, store.ByServiceID(service.ID))
	})

	if err != nil {
		log.G(ctx).WithError(err).Errorf("failed to retrieve the list of tasks for service %v", service.ID)
		// if in doubt, let's proceed to clean up the service anyway:
		// better to clean up resources that shouldn't be cleaned up yet
		// than to end up with a service and some resources lost in limbo forever
		return true, deallocator.deallocateService(ctx, service)
	} else if len(tasks) == 0 {
		// no tasks remaining for this service, we can clean it up
		return true, deallocator.deallocateService(ctx, service)
	}
	deallocator.services[service.ID] = &serviceWithTaskCounts{service: service, taskCount: len(tasks)}
	return false, nil
}

func (deallocator *Deallocator) deallocateService(ctx context.Context, service *api.Service) (err error) {
	err = deallocator.store.Update(func(tx store.Tx) error {
		// first, let's delete the service
		var ignoreServiceID *string
		if err := store.DeleteService(tx, service.ID); err != nil {
			// all errors here are just logged; we make a best effort to clean up everything we can
			log.G(ctx).WithError(err).Errorf("failed to delete service record ID %v", service.ID)
			ignoreServiceID = &service.ID
		}

		// then all of its networks, provided no other service uses them
		spec := service.Spec
		// see https://github.com/docker/swarmkit/blob/e2aafdd3453d2ab103dd97364f79ea6b857f9446/api/specs.proto#L80-L84
		// we really should have a helper function on services to do this;
		// a hypothetical sketch of one follows this function
		networkConfigs := spec.Task.Networks
		if len(networkConfigs) == 0 {
			networkConfigs = spec.Networks
		}
		for _, networkConfig := range networkConfigs {
			if network := store.GetNetwork(tx, networkConfig.Target); network != nil {
				deallocator.processNetwork(ctx, tx, network, ignoreServiceID)
			}
		}

		return nil
	})

	if err != nil {
		log.G(ctx).WithError(err).Errorf("DB error when deallocating service %v", service.ID)
	}
	return
}
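// serviceNetworkConfigs is the helper alluded to in deallocateService above:
// it returns the networks a service is attached to, preferring the task
// spec's networks over the deprecated service-level field. This is a
// hypothetical sketch of the suggested refactoring, not part of the
// original file, and nothing here calls it.
func serviceNetworkConfigs(service *api.Service) []*api.NetworkAttachmentConfig {
	// the task spec's networks take precedence when set
	if networks := service.Spec.Task.Networks; len(networks) != 0 {
		return networks
	}
	return service.Spec.Networks
}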

// processNetwork deletes a network if it's marked for deletion and no
// services are using it any more (or the only one using it has ID
// `ignoreServiceID`, if not nil - this comes in handy when there's been an
// error deleting a service).
// This function can be called either when deallocating a whole service, or
// because there was an `EventUpdateNetwork` event - in the former case, the
// transaction will be that of the service deallocation, in the latter it
// will be nil.
func (deallocator *Deallocator) processNetwork(ctx context.Context, tx store.Tx, network *api.Network, ignoreServiceID *string) (updated bool, err error) {
	if !network.PendingDelete {
		return
	}

	updateFunc := func(t store.Tx) error {
		services, err := store.FindServices(t, store.ByReferencedNetworkID(network.ID))

		if err != nil {
			log.G(ctx).WithError(err).Errorf("could not fetch services using network ID %v", network.ID)
			return err
		}

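		// the network is unused if no services reference it, or if the only
		// remaining reference is the service we've been told to ignore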
		noMoreServices := len(services) == 0 ||
			len(services) == 1 && ignoreServiceID != nil && services[0].ID == *ignoreServiceID

		if noMoreServices {
			return store.DeleteNetwork(t, network.ID)
		}
		return nil
	}

	if tx == nil {
		err = deallocator.store.Update(updateFunc)
	} else {
		err = updateFunc(tx)
	}

	if err != nil {
		log.G(ctx).WithError(err).Errorf("DB error when deallocating network ID %v", network.ID)
	}
	return
}

// processNewEvent dispatches to the right method depending on the type of
// event received.
// The boolean part of the return value indicates whether anything was
// actually removed from the store.
func (deallocator *Deallocator) processNewEvent(ctx context.Context, event events.Event) (bool, error) {
	switch typedEvent := event.(type) {
	case api.EventDeleteTask:
		serviceID := typedEvent.Task.ServiceID

		if serviceWithCount, present := deallocator.services[serviceID]; present {
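			// if this was the last task left for the service, it may now be
			// ready for deallocation; otherwise, just decrement the count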
			if serviceWithCount.taskCount <= 1 {
				delete(deallocator.services, serviceID)
				return deallocator.processService(ctx, serviceWithCount.service)
			}
			serviceWithCount.taskCount--
		}

		return false, nil
	case api.EventUpdateService:
		return deallocator.processService(ctx, typedEvent.Service)
	case api.EventUpdateNetwork:
		return deallocator.processNetwork(ctx, nil, typedEvent.Network, nil)
	default:
		return false, nil
	}
}