File: tiflash.go

package info (click to toggle)
tiup 1.16.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 6,384 kB
  • sloc: sh: 1,988; makefile: 138; sql: 16
file content (200 lines) | stat: -rw-r--r-- 6,280 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package instance

import (
	"context"
	"fmt"
	"os/exec"
	"path/filepath"
	"strings"

	"github.com/pingcap/errors"
	"github.com/pingcap/tiup/pkg/tidbver"
	"github.com/pingcap/tiup/pkg/utils"
)

// TiFlashRole is the role of TiFlash.
// It is a string-typed enumeration; NewTiFlashInstance panics when given a
// value other than the TiFlashRole* constants declared in this file.
type TiFlashRole string

// Valid TiFlashRole values. NewTiFlashInstance accepts the two disaggregated
// roles only when the shared options mode is "tidb-cse" or "tiflash-disagg".
const (
	// TiFlashRoleNormal is used when TiFlash is not in disaggregated mode.
	TiFlashRoleNormal TiFlashRole = "normal"

	// TiFlashRoleDisaggWrite is used when TiFlash is in disaggregated mode and is the write node.
	TiFlashRoleDisaggWrite TiFlashRole = "write"
	// TiFlashRoleDisaggCompute is used when TiFlash is in disaggregated mode and is the compute node.
	TiFlashRoleDisaggCompute TiFlashRole = "compute"
)

// TiFlashInstance represents a running TiFlash.
//
// It embeds the shared instance fields (binary path, ID, directory, host,
// Port, StatusPort, config path) and a Process, which Start assigns before
// launching the command.
//
// Port fields, with the base port NewTiFlashInstance starts searching from:
//   - tcpPort: 9100 (9000 is avoided because it is the default object store port)
//   - servicePort: 3930; returned by Addr and StoreAddr and passed as flash.service_addr
//   - proxyPort: 20170; passed as flash.proxy.addr and flash.proxy.advertise-addr
//   - proxyStatusPort: 20292; passed as flash.proxy.status-addr and reported by MetricAddr
//
// pds supplies the PD endpoints passed as raft.pd_addr. dbs is stored by the
// constructor and is not read in this part of the file.
type TiFlashInstance struct {
	instance
	Role TiFlashRole // Used in wait routine, so it is public

	shOpt           SharedOptions
	tcpPort         int
	servicePort     int
	proxyPort       int
	proxyStatusPort int
	pds             []*PDInstance
	dbs             []*TiDBInstance
	Process
}

// NewTiFlashInstance builds a TiFlashInstance for the given role and reserves
// the ports it will use.
//
// It panics when role is not one of the declared TiFlashRole constants, or
// when a disaggregated role (write or compute) is requested while shOpt.Mode
// is neither "tidb-cse" nor "tiflash-disagg".
//
// The HTTP port stays at 8123 when tidbver.TiFlashNotNeedHTTPPortConfig
// reports true for version; otherwise a free port is searched for starting at
// 8123. Every other port is obtained from utils.MustGetFreePort using
// shOpt.PortOffset.
func NewTiFlashInstance(role TiFlashRole, shOpt SharedOptions, binPath, dir, host, configPath string, id int, pds []*PDInstance, dbs []*TiDBInstance, version string) *TiFlashInstance {
	switch role {
	case TiFlashRoleNormal:
		// Accepted in every mode.
	case TiFlashRoleDisaggWrite, TiFlashRoleDisaggCompute:
		// Disaggregated roles are only accepted in the disaggregated modes.
		if shOpt.Mode != "tidb-cse" && shOpt.Mode != "tiflash-disagg" {
			panic(fmt.Sprintf("Unsupported disagg role in mode %s", shOpt.Mode))
		}
	default:
		panic(fmt.Sprintf("Unknown TiFlash role %s", role))
	}

	const defaultHTTPPort = 8123
	port := defaultHTTPPort
	if !tidbver.TiFlashNotNeedHTTPPortConfig(version) {
		port = utils.MustGetFreePort(host, defaultHTTPPort, shOpt.PortOffset)
	}

	// Ports are reserved one statement at a time, in a fixed order:
	// status, tcp, service, proxy, proxy status.
	base := instance{
		BinPath:    binPath,
		ID:         id,
		Dir:        dir,
		Host:       host,
		Port:       port,
		StatusPort: utils.MustGetFreePort(host, 8234, shOpt.PortOffset),
		ConfigPath: configPath,
	}

	inst := &TiFlashInstance{
		instance: base,
		Role:     role,
		shOpt:    shOpt,
		pds:      pds,
		dbs:      dbs,
	}
	// The TCP port search starts at 9100 because 9000 is the default object store port.
	inst.tcpPort = utils.MustGetFreePort(host, 9100, shOpt.PortOffset)
	inst.servicePort = utils.MustGetFreePort(host, 3930, shOpt.PortOffset)
	inst.proxyPort = utils.MustGetFreePort(host, 20170, shOpt.PortOffset)
	inst.proxyStatusPort = utils.MustGetFreePort(host, 20292, shOpt.PortOffset)
	return inst
}

// Addr returns the TiFlash service address: the advertised form of inst.Host
// joined with the service port.
func (inst *TiFlashInstance) Addr() string {
	host := AdvertiseHost(inst.Host)
	return utils.JoinHostPort(host, inst.servicePort)
}

// MetricAddr implements Instance interface.
// It reports two targets on inst.Host, in this order: the TiFlash status port
// and the proxy status port.
func (inst *TiFlashInstance) MetricAddr() (r MetricAddr) {
	statusTarget := utils.JoinHostPort(inst.Host, inst.StatusPort)
	proxyStatusTarget := utils.JoinHostPort(inst.Host, inst.proxyStatusPort)
	r.Targets = append(r.Targets, statusTarget, proxyStatusTarget)
	return r
}

// Start writes the proxy and TiFlash configuration files under inst.Dir,
// builds the command line, assigns inst.Process and starts it.
//
// When tidbver.TiFlashPlaygroundNewStartMode reports false for the instance
// version, the work is delegated to startOld instead.
//
// Runtime settings (data path, ports, log files, PD endpoints, ...) are passed
// as "--key=value" arguments after the "--" separator. A setting is omitted
// when its key is already present in the prepared tiflash.toml, so values
// configured there are not overridden.
func (inst *TiFlashInstance) Start(ctx context.Context) error {
	if !tidbver.TiFlashPlaygroundNewStartMode(inst.Version.String()) {
		return inst.startOld(ctx, inst.Version)
	}

	// The proxy config is prepared with an empty second argument, whereas the
	// main config passes inst.ConfigPath (presumably the user-supplied file
	// to merge — verify against prepareConfig).
	proxyToml := filepath.Join(inst.Dir, "tiflash_proxy.toml")
	if err := prepareConfig(proxyToml, "", inst.getProxyConfig()); err != nil {
		return err
	}

	tiflashToml := filepath.Join(inst.Dir, "tiflash.toml")
	if err := prepareConfig(tiflashToml, inst.ConfigPath, inst.getConfig()); err != nil {
		return err
	}

	pdAddrs := strings.Join(pdEndpoints(inst.pds, false), ",")

	type override struct {
		key   string
		value string
	}
	overrides := []override{
		{"path", filepath.Join(inst.Dir, "data")},
		{"listen_host", inst.Host},
		{"logger.log", inst.LogFile()},
		{"logger.errorlog", filepath.Join(inst.Dir, "tiflash_error.log")},
		{"status.metrics_port", fmt.Sprintf("%d", inst.StatusPort)},
		{"flash.service_addr", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.servicePort)},
		{"raft.pd_addr", pdAddrs},
		{"flash.proxy.addr", utils.JoinHostPort(inst.Host, inst.proxyPort)},
		{"flash.proxy.advertise-addr", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.proxyPort)},
		{"flash.proxy.status-addr", utils.JoinHostPort(inst.Host, inst.proxyStatusPort)},
		{"flash.proxy.data-dir", filepath.Join(inst.Dir, "proxy_data")},
		{"flash.proxy.log-file", filepath.Join(inst.Dir, "tiflash_tikv.log")},
	}

	// Read back the prepared TiFlash config to learn which keys are already set.
	presentConfig, err := unmarshalConfig(tiflashToml)
	if err != nil {
		return errors.Trace(err)
	}

	args := []string{"server", "--config-file=" + tiflashToml, "--"}
	for _, o := range overrides {
		if isKeyPresentInMap(presentConfig, o.key) {
			// Already configured in the file; do not override it.
			continue
		}
		args = append(args, "--"+o.key+"="+o.value)
	}

	cmd := PrepareCommand(ctx, inst.BinPath, args, nil, inst.Dir)
	inst.Process = &process{cmd: cmd}

	// Failing to redirect output is logged but does not abort the start.
	logIfErr(inst.Process.SetOutputFile(inst.LogFile()))
	return inst.Process.Start()
}

// isKeyPresentInMap reports whether the dot-separated key path (for example
// "flash.proxy.addr") resolves to an entry in the nested map m.
//
// Every segment except the last must map to a nested map[string]any. If an
// intermediate segment exists but holds any other kind of value, the path
// cannot be resolved and false is returned; the lookup never falls back to
// searching the parent map for the remaining segments. The last segment only
// needs to exist, whatever its value is.
//
// A nil or empty m yields false for every key.
func isKeyPresentInMap(m map[string]any, key string) bool {
	segments := strings.Split(key, ".")
	current := m

	for i, segment := range segments {
		value, ok := current[segment]
		if !ok {
			return false
		}
		if i == len(segments)-1 {
			// Final segment: existence is all that matters.
			return true
		}

		nested, isMap := value.(map[string]any)
		if !isMap {
			// A non-map value in the middle of the path has no children.
			return false
		}
		current = nested
	}

	// Not reached in practice: strings.Split with a non-empty separator
	// always returns at least one segment.
	return true
}

// Component returns the component name, which is always "tiflash".
func (inst *TiFlashInstance) Component() string {
	const name = "tiflash"
	return name
}

// LogFile returns the path of the TiFlash log file, "tiflash.log" inside inst.Dir.
func (inst *TiFlashInstance) LogFile() string {
	const logName = "tiflash.log"
	return filepath.Join(inst.Dir, logName)
}

// Cmd returns the exec.Cmd held by the instance's Process.
// Start assigns inst.Process on the new-start path; Cmd does not guard
// against Process being unset.
func (inst *TiFlashInstance) Cmd() *exec.Cmd {
	proc := inst.Process
	return proc.Cmd()
}

// StoreAddr returns the store address of TiFlash: the advertised form of
// inst.Host joined with the service port (the same value Addr returns).
func (inst *TiFlashInstance) StoreAddr() string {
	advertised := AdvertiseHost(inst.Host)
	port := inst.servicePort
	return utils.JoinHostPort(advertised, port)
}