File: plugin.go

package info (click to toggle)
docker.io 26.1.5%2Bdfsg1-9
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 68,576 kB
  • sloc: sh: 5,748; makefile: 912; ansic: 664; asm: 228; python: 162
file content (343 lines) | stat: -rw-r--r-- 10,536 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
package csi

import (
	"context"
	"errors"
	"fmt"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/container-storage-interface/spec/lib/go/csi"
	"github.com/moby/swarmkit/v2/api"
	"github.com/moby/swarmkit/v2/internal/csi/capability"
	"github.com/moby/swarmkit/v2/log"
	mobyplugin "github.com/moby/swarmkit/v2/node/plugin"
)

// Plugin is the interface for a CSI controller plugin.
//
// In this package, the word "plugin" is unfortunately overused. This
// particular "Plugin" is the interface used by volume Manager to interact with
// CSI controller plugins. It should not be confused with the "plugin" returned
// from the plugingetter interface, which is the interface that gives us the
// information we need to create this Plugin.
type Plugin interface {
	CreateVolume(context.Context, *api.Volume) (*api.VolumeInfo, error)
	DeleteVolume(context.Context, *api.Volume) error
	PublishVolume(context.Context, *api.Volume, string) (map[string]string, error)
	UnpublishVolume(context.Context, *api.Volume, string) error
	AddNode(swarmID, csiID string)
	RemoveNode(swarmID string)
}

// plugin represents an individual CSI controller plugin
type plugin struct {
	// name is the name of the plugin, which is also the name used as the
	// Driver.Name field
	name string

	// socket is the unix socket to connect to this plugin at.
	socket string

	// provider is the SecretProvider, which allows retrieving secrets for CSI
	// calls.
	provider SecretProvider

	// cc is the grpc client connection
	// TODO(dperny): the client is never closed. it may be closed when it goes
	// out of scope, but this should be verified.
	cc *grpc.ClientConn
	// idClient is the identity service client
	idClient csi.IdentityClient
	// controllerClient is the controller service client
	controllerClient csi.ControllerClient

	// controller indicates that the plugin has controller capabilities.
	controller bool

	// publisher indicates that the controller plugin has
	// PUBLISH_UNPUBLISH_VOLUME capability.
	publisher bool

	// swarmToCSI maps a swarm node ID to the corresponding CSI node ID
	swarmToCSI map[string]string

	// csiToSwarm maps a CSI node ID back to the swarm node ID.
	csiToSwarm map[string]string
}

// NewPlugin creates a new Plugin object.
//
// NewPlugin takes both the CompatPlugin and the PluginAddr. These should be
// the same object. By taking both parts here, we can push off the work of
// assuring that the given plugin implements the PluginAddr interface without
// having to typecast in this constructor.
func NewPlugin(p mobyplugin.AddrPlugin, provider SecretProvider) Plugin {
	return &plugin{
		name: p.Name(),
		// TODO(dperny): verify that we do not need to include the Network()
		// portion of the Addr.
		socket:     fmt.Sprintf("%s://%s", p.Addr().Network(), p.Addr().String()),
		provider:   provider,
		swarmToCSI: map[string]string{},
		csiToSwarm: map[string]string{},
	}
}

// connect is a private method that initializes a gRPC ClientConn and creates
// the IdentityClient and ControllerClient.
func (p *plugin) connect(ctx context.Context) error {
	cc, err := grpc.DialContext(ctx, p.socket, grpc.WithInsecure())
	if err != nil {
		return err
	}

	p.cc = cc

	// first, probe the plugin, to ensure that it exists and is ready to go
	idc := csi.NewIdentityClient(cc)
	p.idClient = idc

	// controllerClient may not do anything if the plugin does not support
	// the controller service, but it should not be an error to create it now
	// anyway
	p.controllerClient = csi.NewControllerClient(cc)

	return p.init(ctx)
}

// init checks uses the identity service to check the properties of the plugin,
// most importantly, its capabilities.
func (p *plugin) init(ctx context.Context) error {
	probe, err := p.idClient.Probe(ctx, &csi.ProbeRequest{})
	if err != nil {
		return err
	}

	if probe.Ready != nil && !probe.Ready.Value {
		return errors.New("plugin not ready")
	}

	resp, err := p.idClient.GetPluginCapabilities(ctx, &csi.GetPluginCapabilitiesRequest{})
	if err != nil {
		return err
	}

	if resp == nil {
		return nil
	}

	for _, c := range resp.Capabilities {
		if sc := c.GetService(); sc != nil {
			switch sc.Type {
			case csi.PluginCapability_Service_CONTROLLER_SERVICE:
				p.controller = true
			}
		}
	}

	if p.controller {
		cCapResp, err := p.controllerClient.ControllerGetCapabilities(
			ctx, &csi.ControllerGetCapabilitiesRequest{},
		)
		if err != nil {
			return err
		}

		for _, c := range cCapResp.Capabilities {
			rpc := c.GetRpc()
			if rpc.Type == csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME {
				p.publisher = true
			}
		}
	}

	return nil
}

// CreateVolume wraps and abstracts the CSI CreateVolume logic and returns
// the volume info, or an error.
func (p *plugin) CreateVolume(ctx context.Context, v *api.Volume) (*api.VolumeInfo, error) {
	c, err := p.Client(ctx)
	if err != nil {
		return nil, err
	}

	if !p.controller {
		// TODO(dperny): come up with a scheme to handle headless plugins
		// TODO(dperny): handle plugins without create volume capabilities
		return &api.VolumeInfo{VolumeID: v.Spec.Annotations.Name}, nil
	}

	createVolumeRequest := p.makeCreateVolume(v)
	resp, err := c.CreateVolume(ctx, createVolumeRequest)
	if err != nil {
		return nil, err
	}

	return makeVolumeInfo(resp.Volume), nil
}

func (p *plugin) DeleteVolume(ctx context.Context, v *api.Volume) error {
	if v.VolumeInfo == nil {
		return errors.New("VolumeInfo must not be nil")
	}
	// we won't use a fancy createDeleteVolumeRequest method because the
	// request is simple enough to not bother with it
	secrets := p.makeSecrets(v)
	req := &csi.DeleteVolumeRequest{
		VolumeId: v.VolumeInfo.VolumeID,
		Secrets:  secrets,
	}
	c, err := p.Client(ctx)
	if err != nil {
		return err
	}
	// response from RPC intentionally left blank
	_, err = c.DeleteVolume(ctx, req)
	return err
}

// PublishVolume calls ControllerPublishVolume to publish the given Volume to
// the Node with the given swarmkit ID. It returns a map, which is the
// PublishContext for this Volume on this Node.
func (p *plugin) PublishVolume(ctx context.Context, v *api.Volume, nodeID string) (map[string]string, error) {
	if !p.publisher {
		return nil, nil
	}
	csiNodeID := p.swarmToCSI[nodeID]
	if csiNodeID == "" {
		log.L.Errorf("CSI node ID not found for given Swarm node ID. Plugin: %s , Swarm node ID: %s", p.name, nodeID)
		return nil, status.Error(codes.FailedPrecondition, "CSI node ID not found for given Swarm node ID")
	}

	req := p.makeControllerPublishVolumeRequest(v, nodeID)
	c, err := p.Client(ctx)
	if err != nil {
		return nil, err
	}
	resp, err := c.ControllerPublishVolume(ctx, req)

	if err != nil {
		return nil, err
	}
	return resp.PublishContext, nil
}

// UnpublishVolume calls ControllerUnpublishVolume to unpublish the given
// Volume from the Node with the given swarmkit ID. It returns an error if the
// unpublish does not succeed
func (p *plugin) UnpublishVolume(ctx context.Context, v *api.Volume, nodeID string) error {
	if !p.publisher {
		return nil
	}

	req := p.makeControllerUnpublishVolumeRequest(v, nodeID)
	c, err := p.Client(ctx)
	if err != nil {
		return err
	}

	// response of the RPC intentionally left blank
	_, err = c.ControllerUnpublishVolume(ctx, req)
	return err
}

// AddNode adds a mapping for a node's swarm ID to the ID provided by this CSI
// plugin. This allows future calls to the plugin to be done entirely in terms
// of the swarm node ID.
//
// The CSI node ID is provided by the node as part of the NodeDescription.
func (p *plugin) AddNode(swarmID, csiID string) {
	p.swarmToCSI[swarmID] = csiID
	p.csiToSwarm[csiID] = swarmID
}

// RemoveNode removes a node from this plugin's node mappings.
func (p *plugin) RemoveNode(swarmID string) {
	csiID := p.swarmToCSI[swarmID]
	delete(p.swarmToCSI, swarmID)
	delete(p.csiToSwarm, csiID)
}

// Client retrieves a csi.ControllerClient for this plugin
//
// If this is the first time client has been called and no client yet exists,
// it will initialize the gRPC connection to the remote plugin and create a new
// ControllerClient.
func (p *plugin) Client(ctx context.Context) (csi.ControllerClient, error) {
	if p.controllerClient == nil {
		if err := p.connect(ctx); err != nil {
			return nil, err
		}
	}
	return p.controllerClient, nil
}

// makeCreateVolume makes a csi.CreateVolumeRequest from the volume object and
// spec. it uses the Plugin's SecretProvider to retrieve relevant secrets.
func (p *plugin) makeCreateVolume(v *api.Volume) *csi.CreateVolumeRequest {
	secrets := p.makeSecrets(v)
	return &csi.CreateVolumeRequest{
		Name:       v.Spec.Annotations.Name,
		Parameters: v.Spec.Driver.Options,
		VolumeCapabilities: []*csi.VolumeCapability{
			capability.MakeCapability(v.Spec.AccessMode),
		},
		Secrets:                   secrets,
		AccessibilityRequirements: makeTopologyRequirement(v.Spec.AccessibilityRequirements),
		CapacityRange:             makeCapacityRange(v.Spec.CapacityRange),
	}
}

// makeSecrets uses the plugin's SecretProvider to make the secrets map to pass
// to CSI RPCs.
func (p *plugin) makeSecrets(v *api.Volume) map[string]string {
	secrets := map[string]string{}
	for _, vs := range v.Spec.Secrets {
		// a secret should never be nil, but check just to be sure
		if vs != nil {
			secret := p.provider.GetSecret(vs.Secret)
			if secret != nil {
				// TODO(dperny): return an error, but this should never happen,
				// as secrets should be validated at volume creation time
				secrets[vs.Key] = string(secret.Spec.Data)
			}
		}
	}
	return secrets
}

func (p *plugin) makeControllerPublishVolumeRequest(v *api.Volume, nodeID string) *csi.ControllerPublishVolumeRequest {
	if v.VolumeInfo == nil {
		return nil
	}

	secrets := p.makeSecrets(v)
	capability := capability.MakeCapability(v.Spec.AccessMode)
	capability.AccessType = &csi.VolumeCapability_Mount{
		Mount: &csi.VolumeCapability_MountVolume{},
	}
	return &csi.ControllerPublishVolumeRequest{
		VolumeId:         v.VolumeInfo.VolumeID,
		NodeId:           p.swarmToCSI[nodeID],
		Secrets:          secrets,
		VolumeCapability: capability,
		VolumeContext:    v.VolumeInfo.VolumeContext,
	}
}

func (p *plugin) makeControllerUnpublishVolumeRequest(v *api.Volume, nodeID string) *csi.ControllerUnpublishVolumeRequest {
	if v.VolumeInfo == nil {
		return nil
	}

	secrets := p.makeSecrets(v)
	return &csi.ControllerUnpublishVolumeRequest{
		VolumeId: v.VolumeInfo.VolumeID,
		NodeId:   p.swarmToCSI[nodeID],
		Secrets:  secrets,
	}
}