File: clusterlock.go

Package: mirrorbits 0.6.1-1

// Copyright (c) 2014-2019 Ludovic Fauvet
// Licensed under the MIT license

package network

import (
	"errors"
	"os"
	"time"

	"github.com/etix/mirrorbits/database"
	"github.com/gomodule/redigo/redis"
)
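
// Note: the package-level logger "log" used below is declared in another file
// of this package; it is not imported here.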

const (
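	// lockRefresh must stay below lockTTL so the lock is renewed before it can expire.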
	lockTTL     = 10 // in seconds
	lockRefresh = 5  // in seconds
)

// ClusterLock represents a cluster-wide exclusive lock backed by Redis
type ClusterLock struct {
	redis      *database.Redis
	key        string
	identifier string
	done       chan struct{}
}

// NewClusterLock returns a new instance of a ClusterLock.
// A ClusterLock is used to maintain a lock on a mirror that is being
// scanned. The lock is renewed every lockRefresh seconds and expires
// automatically after lockTTL seconds, so it is released even if the
// application is killed.
func NewClusterLock(redis *database.Redis, key, identifier string) *ClusterLock {
	return &ClusterLock{
		redis:      redis,
		key:        key,
		identifier: identifier,
	}
}

// Get tries to obtain an exclusive lock, cluster wide, for the given mirror.
// It returns a nil channel and a nil error if the lock is already held by
// another instance.
func (n *ClusterLock) Get() (<-chan struct{}, error) {
	if n.done != nil {
		return nil, errors.New("lock already in use")
	}

	conn := n.redis.Get()
	defer conn.Close()

	if conn.Err() != nil {
		return nil, conn.Err()
	}

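	// SET with NX only succeeds if the key does not already exist, and EX sets
	// its TTL to lockTTL seconds. A nil reply (redis.ErrNil) therefore means the
	// lock is already held by another instance.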
	_, err := redis.String(conn.Do("SET", n.key, 1, "NX", "EX", lockTTL))
	if err == redis.ErrNil {
		return nil, nil
	} else if err != nil {
		return nil, err
	}

	n.done = make(chan struct{})

	// Keep the lock alive until it is released
	go func() {
		conn := n.redis.Get()
		defer conn.Close()

		for {
			select {
			case <-n.done:
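				// Release was called: delete the key so the lock is freed
				// immediately instead of waiting for the TTL to expire.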
				n.done = nil
				conn.Do("DEL", n.key)
				return
			case <-time.After(lockRefresh * time.Second):
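				// EXPIRE returns 0 when the key no longer exists, which means
				// the lock expired or was deleted by another process.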
				result, err := redis.Int(conn.Do("EXPIRE", n.key, lockTTL))
				if err != nil {
					log.Errorf("Renewing lock for %s failed: %s", n.identifier, err)
					return
				} else if result == 0 {
					log.Errorf("Renewing lock for %s failed: lock disappeared", n.identifier)
					return
				}
				if os.Getenv("DEBUG") != "" {
					log.Debugf("[%s] Lock renewed", n.identifier)
				}
			}
		}
	}()

	return n.done, nil
}

// Release releases the exclusive lock on the mirror. It must only be called
// after the lock was successfully acquired with Get.
func (n *ClusterLock) Release() {
	close(n.done)
}
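
// exampleClusterLockUsage is an illustrative sketch, not part of the original
// file: it shows how a caller might guard a mirror scan with a ClusterLock.
// The key format and the mirrorName parameter are assumptions made for the
// example.
func exampleClusterLockUsage(r *database.Redis, mirrorName string) error {
	lock := NewClusterLock(r, "SCANNING_"+mirrorName, mirrorName)

	done, err := lock.Get()
	if err != nil {
		// Redis error while acquiring the lock
		return err
	}
	if done == nil {
		// Another instance already holds the lock: skip the scan.
		return nil
	}
	defer lock.Release()

	// ... perform the scan; the lock is refreshed in the background until Release ...
	return nil
}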