File: context.go

package info (click to toggle)
tiup 1.16.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 6,384 kB
  • sloc: sh: 1,988; makefile: 138; sql: 16
file content (196 lines) | stat: -rw-r--r-- 5,564 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ctxt

import (
	"context"
	"runtime"
	"sync"
	"time"

	"github.com/pingcap/tiup/pkg/checkpoint"
	logprinter "github.com/pingcap/tiup/pkg/logger/printer"
	"github.com/pingcap/tiup/pkg/utils/mock"
)

type contextKey string

const (
	ctxKey = contextKey("TASK_CONTEXT")
)

const (
	// CtxBaseTopo is key of store the base topology in context.Context
	CtxBaseTopo = contextKey("BASE_TOPO")
)

type (
	// Executor is the executor interface for TiUP, all tasks will in the end
	// be passed to an executor and then be actually performed.
	Executor interface {
		// Execute run the command, then return its stdout and stderr
		// NOTE: stdin is not supported as it seems we don't need it (for now). If
		// at some point in the future we need to pass stdin to a command, we'll
		// need to refactor this function and its implementations.
		// If the cmd can't quit in timeout, it will return error, the default timeout is 60 seconds.
		Execute(ctx context.Context, cmd string, sudo bool, timeout ...time.Duration) (stdout []byte, stderr []byte, err error)

		// Transfer copies files from or to a target
		Transfer(ctx context.Context, src, dst string, download bool, limit int, compress bool) error
	}

	// ExecutorGetter get the executor by host.
	ExecutorGetter interface {
		Get(host string) (e Executor)
		// GetSSHKeySet gets the SSH private and public key path
		GetSSHKeySet() (privateKeyPath, publicKeyPath string)
	}

	// Context is used to share state while multiple tasks execution.
	// We should use mutex to prevent concurrent R/W for some fields
	// because of the same context can be shared in parallel tasks.
	Context struct {
		mutex sync.RWMutex

		Ev EventBus

		exec struct {
			executors    map[string]Executor
			stdouts      map[string][]byte
			stderrs      map[string][]byte
			checkResults map[string][]any
		}

		// The private/public key is used to access remote server via the user `tidb`
		PrivateKeyPath string
		PublicKeyPath  string

		Concurrency int // max number of parallel tasks running at the same time
	}
)

// New create a context instance.
func New(ctx context.Context, limit int, logger *logprinter.Logger) context.Context {
	concurrency := runtime.NumCPU()
	if limit > 0 {
		concurrency = limit
	}

	return context.WithValue(
		context.WithValue(
			checkpoint.NewContext(ctx),
			logprinter.ContextKeyLogger,
			logger,
		),
		ctxKey,
		&Context{
			mutex: sync.RWMutex{},
			Ev:    NewEventBus(),
			exec: struct {
				executors    map[string]Executor
				stdouts      map[string][]byte
				stderrs      map[string][]byte
				checkResults map[string][]any
			}{
				executors:    make(map[string]Executor),
				stdouts:      make(map[string][]byte),
				stderrs:      make(map[string][]byte),
				checkResults: make(map[string][]any),
			},
			Concurrency: concurrency, // default to CPU count
		},
	)
}

// GetInner return *Context from context.Context's value
func GetInner(ctx context.Context) *Context {
	return ctx.Value(ctxKey).(*Context)
}

// Get implements the operation.ExecutorGetter interface.
func (ctx *Context) Get(host string) (e Executor) {
	ctx.mutex.Lock()
	e, ok := ctx.exec.executors[host]
	ctx.mutex.Unlock()

	if !ok {
		panic("no init executor for " + host)
	}
	return
}

// GetSSHKeySet implements the operation.ExecutorGetter interface.
func (ctx *Context) GetSSHKeySet() (privateKeyPath, publicKeyPath string) {
	return ctx.PrivateKeyPath, ctx.PublicKeyPath
}

// GetExecutor get the executor.
func (ctx *Context) GetExecutor(host string) (e Executor, ok bool) {
	// Mock point for unit test
	if e := mock.On("FakeExecutor"); e != nil {
		return e.(Executor), true
	}

	ctx.mutex.RLock()
	e, ok = ctx.exec.executors[host]
	ctx.mutex.RUnlock()
	return
}

// SetExecutor set the executor.
func (ctx *Context) SetExecutor(host string, e Executor) {
	ctx.mutex.Lock()
	if e != nil {
		ctx.exec.executors[host] = e
	} else {
		delete(ctx.exec.executors, host)
	}
	ctx.mutex.Unlock()
}

// GetOutputs get the outputs of a host (if has any)
func (ctx *Context) GetOutputs(hostID string) ([]byte, []byte, bool) {
	ctx.mutex.RLock()
	stdout, ok1 := ctx.exec.stdouts[hostID]
	stderr, ok2 := ctx.exec.stderrs[hostID]
	ctx.mutex.RUnlock()
	return stdout, stderr, ok1 && ok2
}

// SetOutputs set the outputs of a host
func (ctx *Context) SetOutputs(hostID string, stdout []byte, stderr []byte) {
	ctx.mutex.Lock()
	ctx.exec.stdouts[hostID] = stdout
	ctx.exec.stderrs[hostID] = stderr
	ctx.mutex.Unlock()
}

// GetCheckResults get the the check result of a host (if has any)
func (ctx *Context) GetCheckResults(host string) (results []any, ok bool) {
	ctx.mutex.RLock()
	results, ok = ctx.exec.checkResults[host]
	ctx.mutex.RUnlock()
	return
}

// SetCheckResults append the check result of a host to the list
func (ctx *Context) SetCheckResults(host string, results []any) {
	ctx.mutex.Lock()
	if currResult, ok := ctx.exec.checkResults[host]; ok {
		ctx.exec.checkResults[host] = append(currResult, results...)
	} else {
		ctx.exec.checkResults[host] = results
	}
	ctx.mutex.Unlock()
}