File: accept.go

Package: vagrant 2.3.7+git20230731.5fc64cde+dfsg-3 (Debian trixie, main)

package runner

import (
	"context"
	"io"
	"path/filepath"
	"sync"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/hashicorp/vagrant-plugin-sdk/terminal"
	"github.com/hashicorp/vagrant/internal/server/proto/vagrant_server"
)

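// heartbeatDuration is the interval at which the runner reports a
// heartbeat on the job stream while it is processing a job.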
var heartbeatDuration = 5 * time.Second

// Accept will accept and execute a single job. This will block until
// a job is available.
//
// An error is only returned if there was an error internal to the runner.
// Errors during job execution are expected (i.e. a project build is misconfigured)
// and will be reported on the job.
//
// This is safe to call concurrently, which can be used to execute
// multiple jobs in parallel as a runner.
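//
// A minimal caller sketch (hypothetical; assumes a configured *Runner
// named r and a desired parallelism n):
//
//	var wg sync.WaitGroup
//	for i := 0; i < n; i++ {
//		wg.Add(1)
//		go func() {
//			defer wg.Done()
//			for {
//				if err := r.Accept(ctx); err != nil {
//					return // e.g. ErrClosed once the runner is closed
//				}
//			}
//		}()
//	}
//	wg.Wait()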
func (r *Runner) Accept(ctx context.Context) error {
	return r.accept(ctx, "")
}

// AcceptExact is the same as Accept except that it accepts only
// a job with exactly the given ID. This is used by Vagrant only in
// local execution mode as an extra security measure to prevent other
// jobs from being assigned to the runner.
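//
// A minimal sketch (hypothetical; assumes the job ID was captured when
// the job was queued):
//
//	if err := r.AcceptExact(ctx, queuedJobId); err != nil {
//		// An error here is internal to the runner; job-level
//		// failures are reported on the job itself.
//	}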
func (r *Runner) AcceptExact(ctx context.Context, id string) error {
	return r.accept(ctx, id)
}

func (r *Runner) accept(ctx context.Context, id string) error {
	if r.closed() {
		return ErrClosed
	}

	log := r.logger

	// Open a new job stream. NOTE: we purposely do NOT use the passed-in
	// ctx here, since if that context is canceled we still want to be able
	// to report errors.
	log.Debug("opening job stream")
	client, err := r.client.RunnerJobStream(context.Background())
	if err != nil {
		return err
	}
	defer client.CloseSend()

	// Send our request
	log.Trace("sending job request")
	if err := client.Send(&vagrant_server.RunnerJobStreamRequest{
		Event: &vagrant_server.RunnerJobStreamRequest_Request_{
			Request: &vagrant_server.RunnerJobStreamRequest_Request{
				RunnerId: r.id,
			},
		},
	}); err != nil {
		return err
	}

	// Wait for an assignment
	log.Info("waiting for job assignment")
	resp, err := client.Recv()
	if err != nil {
		return err
	}

	// We received an assignment!
	assignment, ok := resp.Event.(*vagrant_server.RunnerJobStreamResponse_Assignment)
	if !ok {
		return status.Errorf(codes.Aborted,
			"expected job assignment, server sent %T",
			resp.Event)
	}
	log = log.With("job_id", assignment.Assignment.Job.Id)
	log.Info("job assignment received")

	// We increment the wait group at this point since prior to this, if
	// we're forcefully quit, we shouldn't have acked. This is somewhat
	// brittle; a TODO here is to build a better mechanism for noticing
	// that we've quit so we can exit here.
	r.acceptWg.Add(1)
	defer r.acceptWg.Done()

	// If this isn't the job we expected then we nack and error.
	if id != "" {
		if assignment.Assignment.Job.Id != id {
			log.Warn("unexpected job id for exact match, nacking")
			if err := client.Send(&vagrant_server.RunnerJobStreamRequest{
				Event: &vagrant_server.RunnerJobStreamRequest_Error_{
					Error: &vagrant_server.RunnerJobStreamRequest_Error{},
				},
			}); err != nil {
				return err
			}

			return status.Errorf(codes.Aborted, "server sent us an invalid job")
		}

		log.Trace("assigned job matches expected ID for local mode")
	}

	// Ack the assignment
	log.Trace("acking job assignment")
	if err := client.Send(&vagrant_server.RunnerJobStreamRequest{
		Event: &vagrant_server.RunnerJobStreamRequest_Ack_{
			Ack: &vagrant_server.RunnerJobStreamRequest_Ack{},
		},
	}); err != nil {
		return err
	}

	// Create a cancelable context so we can stop if the job is canceled
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// We need a mutex to protect against simultaneous sends to the client.
	var sendMutex sync.Mutex

	// For our UI, we always send output to the server. If we have a local UI
	// set, we mirror to that as well.
	var ui terminal.UI = &runnerUI{
		ctx:    ctx,
		cancel: cancel,
		evc:    client,
		mu:     &sendMutex,
	}
	if r.ui != nil {
		ui = &multiUI{
			UIs: []terminal.UI{r.ui, ui},
		}
	}

	// Start up a goroutine to listen for any other events on the stream,
	// such as a cancellation request, and to detect when the stream closes.
	errCh := make(chan error, 1)
	go func() {
		for {
			// Wait for the next event from the server (such as a cancellation
			// request) or for the stream to close. Receiving into local
			// variables avoids racing with the enclosing function.
			resp, err := client.Recv()
			if err != nil {
				errCh <- err
				return
			}

			// Determine the event
			switch resp.Event.(type) {
			case *vagrant_server.RunnerJobStreamResponse_Cancel:
				log.Info("job cancellation request received, canceling")
				cancel()

			default:
				log.Info("unknown job event", "event", resp.Event)
			}
		}
	}()

	// Heartbeat: periodically tell the server this job is still being
	// worked on. The goroutine exits when the job context is done.
	go func() {
		tick := time.NewTicker(heartbeatDuration)
		defer tick.Stop()

		for {
			select {
			case <-ctx.Done():
				return

			case <-tick.C:
			}

			sendMutex.Lock()
			err := client.Send(&vagrant_server.RunnerJobStreamRequest{
				Event: &vagrant_server.RunnerJobStreamRequest_Heartbeat_{
					Heartbeat: &vagrant_server.RunnerJobStreamRequest_Heartbeat{},
				},
			})
			sendMutex.Unlock()
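			// An io.EOF here just means the stream closed; the receive
			// goroutine above will surface the actual error, so we only
			// log other send failures.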
			if err != nil && err != io.EOF {
				log.Warn("error during heartbeat", "err", err)
			}
		}
	}()

	// We need to fetch our data source prior to executing.
	var result *vagrant_server.Job_Result
	wd, closer, err := r.downloadJobData(
		ctx,
		log,
		ui,
		assignment.Assignment.Job.DataSource,
		assignment.Assignment.Job.DataSourceOverrides,
	)
	if err == nil {
		log.Debug("job data downloaded (or local)", "pwd", wd)

		if closer != nil {
			defer func() {
				log.Debug("cleaning up downloaded data")
				if err := closer(); err != nil {
					log.Warn("error cleaning up data", "err", err)
				}
			}()
		}

		// We want the working directory to always be absolute.
		if !filepath.IsAbs(wd) {
			err = status.Errorf(codes.Internal,
				"data working directory should be absolute. This is a bug, please report it.")
		}

		if err == nil {
			// Execute the job. We have to close the UI right afterwards to
			// ensure that no more output is written to the client.
			log.Info("starting job execution")
			result, err = r.executeJob(ctx, log, ui, assignment.Assignment.Job, wd)
			if ui, ok := ui.(io.Closer); ok {
				ui.Close()
			}
			log.Debug("job finished", "error", err)
		}
	}

	// Check if we were force canceled. If so, then just exit now. Realistically
	// we could also be force canceled at any point below, but this is the
	// most likely spot to catch it, and the error scenario below is not bad.
	if ctx.Err() != nil {
		select {
		case err := <-errCh:
			// If we got an EOF then we were force canceled.
			if err == io.EOF {
				log.Info("job force canceled")
				return nil
			}
		default:
		}
	}

	// For the remainder of the job, we're going to hold the mutex. We are
	// just sending quick status updates so this should not block anything
	// for very long.
	sendMutex.Lock()
	defer sendMutex.Unlock()

	// Handle job execution errors
	if err != nil {
		st, _ := status.FromError(err)

		log.Warn("error during job execution", "err", err)
		if rpcerr := client.Send(&vagrant_server.RunnerJobStreamRequest{
			Event: &vagrant_server.RunnerJobStreamRequest_Error_{
				Error: &vagrant_server.RunnerJobStreamRequest_Error{
					Error: st.Proto(),
				},
			},
		}); rpcerr != nil {
			log.Warn("error sending error event, job may be dangling", "err", rpcerr)
		}

		return nil
	}

	// Complete the job
	log.Debug("sending job completion")
	if err := client.Send(&vagrant_server.RunnerJobStreamRequest{
		Event: &vagrant_server.RunnerJobStreamRequest_Complete_{
			Complete: &vagrant_server.RunnerJobStreamRequest_Complete{
				Result: result,
			},
		},
	}); err != nil {
		log.Error("error sending job complete message", "error", err)
		return err
	}

	// Wait for the connection to close. We do this because this ensures
	// that the server received our completion and updated the database.
	err = <-errCh
	if err == io.EOF {
		return nil
	}

	return err
}