File: checkpoint.md

package info (click to toggle)
tiup 1.16.4-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 6,032 kB
  • sloc: sh: 1,988; makefile: 232; sql: 16
file content (229 lines) | stat: -rw-r--r-- 9,727 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# The checkpoint implementation for tiup-cluster and tiup-dm

When there is an occasional error on `tiup cluster` or `tiup dm` command, some users may want to retry previews action from the fail point instead of from scratch.

For example, the following tasks:

```
1. download package
          |
2. scp package to remote
          |
3. unzip remote package     <- failed here
          |
4. start service
```

If something wrong with the third task, retry from the first task is OK because TiUP provides a guarantee that all commands are idempotent. However, for some large clusters, it may waste a lot of time on successful tasks (task 1 and stpe 2) and the user may want to restart the process from task 3.

## The audit log

tiup-cluster and tiup-dm will generate an audit log file in `${TIUP_HOME}/storage/{cluster,dm}/audit/`, you can view the audit list with the command `tiup cluster audit` or `tiup dm audit`. The list looks like this:

```
ID           Time                       Command
--           ----                       -------
fxgcScKJ2Kd  2021-01-21T18:36:10+08:00  /home/tidb/.tiup/components/cluster/v1.3.1/tiup-cluster display test
fxgcRrMnBz8  2021-01-21T18:35:56+08:00  /home/tidb/.tiup/components/cluster/v1.3.1/tiup-cluster start test
```

The first column is the id of the audit, to view a specified audit log, use the command `tiup cluster audit <id>` or `tiup dm audit <id>`, the content of the audit log is something like this:

```
/home/tidb/.tiup/components/cluster/v1.3.1/tiup-cluster display test
2021-01-21T18:36:08.380+0800    INFO    Execute command {"command": "tiup cluster display test"}
2021-01-21T18:36:09.805+0800    INFO    SSHCommand      {"host": "172.16.5.140", "port": "22", "cmd": "xxx command", "stdout": "xxxx", "stderr": ""}
```

The first line of the file is the command the user executed, the following lines are structure logs.

## The checkpoint

In the implementation of the checkpoint, we mix checkpoint in the audit log, like this:

```
/home/tidb/.tiup/components/cluster/v1.3.1/tiup-cluster display test
2021-01-21T18:36:08.380+0800    INFO    Execute command {"command": "tiup cluster display test"}
2021-01-21T18:36:09.805+0800    INFO    SSHCommand      {"host": "172.16.5.140", "port": "22", "cmd": "echo task 1", "stdout": "task 1", "stderr": ""}
2021-01-21T18:36:09.806+0800    INFO    CheckPoint      {"host": "172.16.5.140", "n": 1, "result": true}
2021-01-21T18:36:09.805+0800    INFO    SSHCommand      {"host": "172.16.5.140", "port": "22", "cmd": "echo task 2", "stdout": "task 2", "stderr": ""}
2021-01-21T18:36:09.806+0800    INFO    CheckPoint      {"host": "172.16.5.140", "n": 2, "result": true}
2021-01-21T18:36:09.805+0800    INFO    SSHCommand      {"host": "172.16.5.140", "port": "22", "cmd": "echo task 3", "stdout": "task 2", "stderr": ""}
2021-01-21T18:36:09.806+0800    INFO    CheckPoint      {"host": "172.16.5.140", "n": 3, "result": true}
```

If the user runs tiup-cluster or tiup-dm in replay mode by giving an audit id, we will parse that audit log file and pick up all `CheckPoint` by order into a queue, then in corresponding functions, we check if the checkpoint is in the queue, if hit, we dequeue the checkpoint and return the result directly instead of doing the real work. Example:

```golang
func init() {
    // Register checkpoint fields so that we know how to compare checkpoints
    countHost := checkpoint.Register(
        checkpoint.Field("host", reflect.DeepEqual),
        checkpoint.Field("n", func(a, b interface{}) bool {
            // the n is a int, however, it will be float after it write to json because json only has float number.
            // so we just compare the string format.
            return fmt.Sprintf("%v", a) == fmt.Sprintf("%v", b)
        }),
    )
}

func processCommand() {
    // we will explain the context in the next section
    ctx := checkpoint.NewContext(context.Background())
    r, err := task(ctx, host1, 1)
    handleResult(r, err)
    r, err = task(ctx, host2, 2)
    handleResult(r, err)
    r, err = task(ctx, host3, 3)
    handleResult(r, err)
}

func task(ctx context.Context, host string, n int) (result bool, err error) {
    // first, we get the checkpoint from audit log
    point := checkpoint.Acquire(ctx, countHost, map[string]interface{}{
        "host": host,
        "n":  n,
    })
    defer func() {
        // we must call point.Release, otherwise there will be a resource leak.
        // the release function will write the checkpoint into current audit log (not the one user specified)
        // for latter replay.
        point.Release(err,
            zap.String("host", host),
            zap.Int("n", n),
            zap.Bool("result", result),
        )
    }()
    // Then, if the checkpoint exists in the specified audit file, point.Hit() will return map[string]interface{}
    if point.Hit() != nil {
        return point.Hit()["result"].(bool), nil
    }

    // Last, if the checkpoint does not exist in the specified audit file, we should do real work and return the result
    return do_real_work(host, n)
}
```

## The context

If the function with checkpoint calls another function with checkpoint, we will get into trouble:

```golang
func processCommand() {
    ctx := checkpoint.NewContext(context.Background())
    r, err := task(ctx, host1, 1)
    handleResult(r, err)
    r, err := task(ctx, host1, 0)
    handleResult(r, err)
}

// we use a flag mock that the task only return true once
var flag = true
func task(ctx context.Context, host string, n int) (result bool, err error) {
    point := checkpoint.Acquire(ctx, countHost, map[string]interface{}{ ... }
    defer func() { point.Release( ... ) }
    if point.Hit() != nil { ... }

    if n > 1 {
        return task(ctx, host, n - 1)
    }

    defer func() { flag = false }()
    return flag, nil
}
```

The execution flow and return value will be:

```
task(1)[called by processCommand]:  return true
|
task(0)[called by task(1)]:         return true
|
task(0)[called by processCommand]:  return false
```

the checkpoint in audit log will be:

```
... {"host": "...", "n": 1, "result": true}
... {"host": "...", "n": 0, "result": true}
... {"host": "...", "n": 0, "result": false}
```

There are three checkpoints, but when we try to replay the process, the `task(0)[called by task(1)]` will not be called at all since `task(1)` will return early with the cached result, so the execution flow will be:

```
task(1)[called by processCommand]:  return true (cached by {"host": "...", "n": 1, "result": true})
|
task(0)[called by processCommand]:  return true (cached by {"host": "...", "n": 0, "result": true})
```

The trouble is coming: in the real case the `task(0)[called by processCommand]` returns false but in replay case it return true because it takes the result of `task(0)[called by task(1)]` by mistake. The problem is that the `CheckPoint` of `task(0)[called by task(1)]` should not be record because it's parent, `task(0)[called by processCommand]`, has record a `CheckPoint`.

So we implement a semaphore and insert it into the context passed to `checkpoint.Acquire`, the context or it's ancestor must be generated by `checkpoint.NewContext` where the semaphore is generated. When `checkpoint.Acquire` called, it will try to acquire the semaphore and record if it success in the returned value, when we call `Release` on the returned value, it will check if previews semaphore acquire success, if not, the `Release` will not writing checkpoint.

## Parallel task

Because we use a semaphore in the context to trace if it's the first stack layer that wants to write checkpoint, the context can't be shared between goroutines:

```golang
func processCommand() {
    ctx := checkpoint.NewContext(context.Background())

    for _, n := range []int{1, 2, 3} {
        go func() {
            r, err := task(ctx, host, n)
            handleResult(r, err)
        }()
    }
}
```

There are three tasks, `task(1)`, `task(2)` and `task(3)`, they run parallelly. What if the `task(0)` run first but return last?

```
task(1): start -------------------------------------------> return
task(2):         start ------------------------> return
task(3):         start ------------------------> return
```

The checkpoint of `task(2)` and `task(3)` will not be recorded because they think they are called by `task(1)`. The solution is to add a semaphore for every goroutine:

```golang
func processCommand() {
    ctx := checkpoint.NewContext(context.Background())

    for _, n := range []int{1, 2, 3} {
        go func(ctx context.Context) {
            r, err := task(ctx, host, n)
            handleResult(r, err)
        }(checkpoint.NewContext(ctx))
    }
}
```

What if the `processCommand` or its' ancestor has its' own checkpoint?

```golang
func processTask(ctx context.Context) {
    p := checkpoint.Acquire(...)
    defer func() { p.Release(...) }()
    if p.Hit() != nil { ... }

    return processCommand(ctx)
}

func processCommand(ctx context.Context) {
    for _, n := range []int{1, 2, 3} {
        go func(ctx context.Context) {
            r, err := task(ctx, host, n)
            handleResult(r, err)
        }(checkpoint.NewContext(ctx))
    }
}
```

If `checkpoint.NewContext` just append a unacquired semaphore, the checkpoint of `processTask` and it's children(`task(1..3)`) will be all recorded, that's not correct (we have talked this before).

So the `checkpoint.NewContext` should check if there is already a semaphore in current context, if there is, just copy it's value. By this way, if `processTask` has acquired the semaphore, the `task(1..3)` will get their own acquired semaphore, otherwise, they will get their own unacquired semaphore.