File: manager.go

package info (click to toggle)
gitlab-ci-multi-runner 14.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 31,248 kB
  • sloc: sh: 1,694; makefile: 384; asm: 79; ruby: 68
file content (252 lines) | stat: -rw-r--r-- 6,723 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
package volumes

import (
	"context"
	"errors"
	"fmt"

	"github.com/docker/docker/api/types/volume"

	"gitlab.com/gitlab-org/gitlab-runner/executors/docker/internal/labels"
	"gitlab.com/gitlab-org/gitlab-runner/executors/docker/internal/volumes/parser"
	"gitlab.com/gitlab-org/gitlab-runner/executors/docker/internal/volumes/permission"
	"gitlab.com/gitlab-org/gitlab-runner/helpers/docker"
)

// ErrCacheVolumesDisabled is returned by addCacheVolume when
// ManagerConfig.DisableCache is set. Create wraps it with %w before returning
// it, so callers can still match it with errors.Is.
var ErrCacheVolumesDisabled = errors.New("cache volumes feature disabled")

// Manager describes the volume operations offered by this package. The
// implementation returned by NewManager is the unexported manager type.
type Manager interface {
	// Create registers a bind for the given volume definition. An empty
	// definition is accepted and ignored.
	Create(ctx context.Context, volume string) error
	// CreateTemporary creates a volume for destination and records its name
	// so that RemoveTemporary can remove it later.
	CreateTemporary(ctx context.Context, destination string) error
	// RemoveTemporary removes the volumes recorded by CreateTemporary.
	RemoveTemporary(ctx context.Context) error
	// Binds returns the bind definitions collected so far.
	Binds() []string
}

// ManagerConfig holds the settings that NewManager stores on the manager.
//
// CacheDir, when non-empty, makes cache volumes host-based: they are bound to
// a path joined from CacheDir, UniqueName and hashPath(destination) instead of
// being created as Docker volumes. BasePath is joined in front of destinations
// that are not absolute. UniqueName is the name component used for reusable
// cache volumes, TemporaryName the one used for volumes created through
// CreateTemporary. DisableCache makes cache volumes fail with
// ErrCacheVolumesDisabled; host volumes are not affected by it.
// PermissionSetter is optional: when it is non-nil it is invoked for each
// Docker volume the manager creates.
type ManagerConfig struct {
	CacheDir         string
	BasePath         string
	UniqueName       string
	TemporaryName    string
	DisableCache     bool
	PermissionSetter permission.Setter
}

// manager is the Manager implementation returned by NewManager.
//
// config, logger, parser, client and labeler hold the values passed to
// NewManager; permissionSetter is copied from config.PermissionSetter and may
// be nil. volumeBindings accumulates the bind definitions returned by Binds,
// temporaryVolumes the names of the volumes created through CreateTemporary,
// and managedVolumes receives every destination path the manager registers.
type manager struct {
	config           ManagerConfig
	logger           debugLogger
	parser           parser.Parser
	client           docker.Client
	permissionSetter permission.Setter
	labeler          labels.Labeler

	volumeBindings   []string
	temporaryVolumes []string
	managedVolumes   pathList
}

// NewManager builds a Manager from the given collaborators. The permission
// setter is taken from config.PermissionSetter, the list of binds starts out
// empty (but non-nil) and no temporary volumes are recorded yet.
func NewManager(
	logger debugLogger,
	volumeParser parser.Parser,
	c docker.Client,
	config ManagerConfig,
	labeler labels.Labeler,
) Manager {
	mgr := new(manager)

	// Collaborators supplied by the caller.
	mgr.logger = logger
	mgr.parser = volumeParser
	mgr.client = c
	mgr.labeler = labeler

	// Configuration and the optional setter it carries.
	mgr.config = config
	mgr.permissionSetter = config.PermissionSetter

	// Bookkeeping state.
	mgr.volumeBindings = []string{}
	mgr.managedVolumes = pathList{}

	return mgr
}

// Create registers a Docker volume bind for the given volume definition.
//
// A definition with two parts (`/src:/dst`) is treated as a host volume that
// mounts something from the host into the container. A definition with a
// single part (`/dst`) is treated as a cache volume mounted at that path. An
// empty definition is ignored, and any other number of parts is rejected.
func (m *manager) Create(ctx context.Context, volume string) error {
	if volume == "" {
		return nil
	}

	parsed, parseErr := m.parser.ParseVolume(volume)
	if parseErr != nil {
		return fmt.Errorf("parse volume: %w", parseErr)
	}

	switch parsed.Len() {
	case 1:
		if err := m.addCacheVolume(ctx, parsed); err != nil {
			return fmt.Errorf("adding cache volume: %w", err)
		}
	case 2:
		if err := m.addHostVolume(parsed); err != nil {
			return fmt.Errorf("adding host volume: %w", err)
		}
	default:
		return fmt.Errorf("unsupported volume definition %s", volume)
	}

	return nil
}

// addHostVolume resolves the destination of a host volume, records it in the
// managed volume list and appends the resulting bind.
//
// volume.Destination is overwritten with the resolved path. This also happens
// when resolution fails, in which case it receives whatever absolutePath
// returned alongside the error.
func (m *manager) addHostVolume(volume *parser.Volume) error {
	resolved, err := m.absolutePath(volume.Destination)
	volume.Destination = resolved
	if err != nil {
		return fmt.Errorf("defining absolute path: %w", err)
	}

	if err = m.managedVolumes.Add(resolved); err != nil {
		return fmt.Errorf("updating managed volume list: %w", err)
	}

	m.appendVolumeBind(volume)

	return nil
}

// absolutePath turns dir into the path used inside the container. The root
// path is rejected with errDirectoryIsRootPath, an absolute path is returned
// unchanged, and anything else is joined onto the configured BasePath.
func (m *manager) absolutePath(dir string) (string, error) {
	switch {
	case m.parser.Path().IsRoot(dir):
		return "", errDirectoryIsRootPath
	case m.parser.Path().IsAbs(dir):
		return dir, nil
	default:
		return m.parser.Path().Join(m.config.BasePath, dir), nil
	}
}

// appendVolumeBind writes a debug line naming the source and destination of
// the volume and then adds the volume's definition to the list of binds.
func (m *manager) appendVolumeBind(volume *parser.Volume) {
	message := fmt.Sprintf("Using host-based %q for %q...", volume.Source, volume.Destination)
	m.logger.Debugln(message)

	definition := volume.Definition()
	m.volumeBindings = append(m.volumeBindings, definition)
}

// addCacheVolume sets up a cache volume for the destination of the given
// volume. With DisableCache set it refuses with ErrCacheVolumesDisabled; with
// a CacheDir configured the cache is host-based; otherwise a reusable Docker
// volume is created.
func (m *manager) addCacheVolume(ctx context.Context, volume *parser.Volume) error {
	switch {
	case m.config.DisableCache:
		// Only the automatic cache is switched off here; host volumes are
		// shared on purpose and never go through this path.
		m.logger.Debugln("Cache containers feature is disabled")

		return ErrCacheVolumesDisabled
	case m.config.CacheDir != "":
		return m.createHostBasedCacheVolume(volume.Destination)
	}

	if _, err := m.createCacheVolume(ctx, volume.Destination, true); err != nil {
		return err
	}

	return nil
}

// createHostBasedCacheVolume binds a path below the configured CacheDir to the
// resolved destination. The host path is joined from CacheDir, UniqueName and
// hashPath of the resolved destination. The destination is added to the
// managed volume list before the bind is appended.
func (m *manager) createHostBasedCacheVolume(destination string) error {
	target, err := m.absolutePath(destination)
	if err != nil {
		return fmt.Errorf("defining absolute path: %w", err)
	}

	if err = m.managedVolumes.Add(target); err != nil {
		return fmt.Errorf("updating managed volumes list: %w", err)
	}

	bind := parser.Volume{
		Source:      m.parser.Path().Join(m.config.CacheDir, m.config.UniqueName, hashPath(target)),
		Destination: target,
	}
	m.appendVolumeBind(&bind)

	return nil
}

// createCacheVolume creates a Docker volume to be mounted at destination and
// appends the matching bind.
//
// The requested volume name is "<name>-cache-<hashPath(destination)>", where
// name is config.UniqueName for reusable volumes and config.TemporaryName
// otherwise. When a permission setter is configured it is run against the
// created volume before the bind is added. The requested volume name is
// returned on success; on any failure the returned name is empty.
func (m *manager) createCacheVolume(ctx context.Context, destination string, reusable bool) (string, error) {
	target, err := m.absolutePath(destination)
	if err != nil {
		return "", fmt.Errorf("defining absolute path: %w", err)
	}

	if err = m.managedVolumes.Add(target); err != nil {
		return "", fmt.Errorf("updating managed volumes list: %w", err)
	}

	prefix := m.config.UniqueName
	if !reusable {
		prefix = m.config.TemporaryName
	}
	volumeName := fmt.Sprintf("%s-cache-%s", prefix, hashPath(target))

	created, err := m.client.VolumeCreate(ctx, volume.VolumeCreateBody{
		Name:   volumeName,
		Labels: m.labeler.Labels(map[string]string{"type": "cache"}),
	})
	if err != nil {
		return "", fmt.Errorf("creating docker volume: %w", err)
	}

	if setter := m.permissionSetter; setter != nil {
		initLabels := m.labeler.Labels(map[string]string{"type": "cache-init"})
		if err = setter.Set(ctx, created.Name, initLabels); err != nil {
			return "", fmt.Errorf("set volume permissions: %w", err)
		}
	}

	// The bind uses the name reported back by the client.
	bind := parser.Volume{
		Source:      created.Name,
		Destination: target,
	}
	m.appendVolumeBind(&bind)
	m.logger.Debugln(fmt.Sprintf("Using volume %q as cache %q...", created.Name, target))

	return volumeName, nil
}

// CreateTemporary creates a non-reusable volume for destination and remembers
// its name as temporary. Temporary volumes are not removed automatically: the
// caller is expected to clean them up later by calling `RemoveTemporary`.
func (m *manager) CreateTemporary(ctx context.Context, destination string) error {
	const reusable = false

	name, createErr := m.createCacheVolume(ctx, destination, reusable)
	if createErr != nil {
		return fmt.Errorf("creating cache volume: %w", createErr)
	}

	// Only successfully created volumes are tracked for later removal.
	m.temporaryVolumes = append(m.temporaryVolumes, name)

	return nil
}

// RemoveTemporary attempts to remove every volume that is marked as temporary.
//
// A volume that is not found is logged and skipped. Any other failure does not
// stop the cleanup: the remaining volumes are still attempted, so a single
// failing removal cannot leave all later temporary volumes behind. Once the
// whole list has been processed, the first such error is returned to the
// caller unchanged; nil is returned when there was none.
func (m *manager) RemoveTemporary(ctx context.Context) error {
	var firstErr error

	for _, v := range m.temporaryVolumes {
		err := m.client.VolumeRemove(ctx, v, true)
		if docker.IsErrNotFound(err) {
			m.logger.Debugln(fmt.Sprintf("volume not found: %q", v))
			continue
		}

		// Keep the first failure for the caller, but carry on with the rest
		// of the list instead of aborting the cleanup.
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}

	return firstErr
}

// Binds exposes the bind definitions this manager has accumulated so far. The
// manager's own slice is handed out, not a copy.
func (m *manager) Binds() []string {
	binds := m.volumeBindings

	return binds
}