File: lock_racer_command.go

package info (click to toggle)
etcd 3.5.16-8
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 15,868 kB
  • sloc: sh: 3,136; makefile: 477
file content (94 lines) | stat: -rw-r--r-- 2,335 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package runner

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"go.etcd.io/etcd/client/v3/concurrency"

	"github.com/spf13/cobra"
)

// NewLockRacerCommand returns the cobra command for "lock-racer runner".
func NewLockRacerCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "lock-racer [name of lock (defaults to 'racers')]",
		Short: "Performs lock race operation",
		Run:   runRacerFunc,
	}
	cmd.Flags().IntVar(&totalClientConnections, "total-client-connections", 10, "total number of client connections")
	return cmd
}

func runRacerFunc(cmd *cobra.Command, args []string) {
	racers := "racers"
	if len(args) == 1 {
		racers = args[0]
	}

	if len(args) > 1 {
		ExitWithError(ExitBadArgs, errors.New("lock-racer takes at most one argument"))
	}

	rcs := make([]roundClient, totalClientConnections)
	ctx := context.Background()
	// mu ensures validate and release funcs are atomic.
	var mu sync.Mutex
	cnt := 0

	eps := endpointsFromFlag(cmd)

	for i := range rcs {
		var (
			s   *concurrency.Session
			err error
		)

		rcs[i].c = newClient(eps, dialTimeout)

		for {
			s, err = concurrency.NewSession(rcs[i].c)
			if err == nil {
				break
			}
		}
		m := concurrency.NewMutex(s, racers)
		rcs[i].acquire = func() error { return m.Lock(ctx) }
		rcs[i].validate = func() error {
			mu.Lock()
			defer mu.Unlock()
			if cnt++; cnt != 1 {
				return fmt.Errorf("bad lock; count: %d", cnt)
			}
			return nil
		}
		rcs[i].release = func() error {
			mu.Lock()
			defer mu.Unlock()
			if err := m.Unlock(ctx); err != nil {
				return err
			}
			cnt = 0
			return nil
		}
	}
	// each client creates 1 key from NewMutex() and delete it from Unlock()
	// a round involves in 2*len(rcs) requests.
	doRounds(rcs, rounds, 2*len(rcs))
}