File: expiring_hash.go

package info (click to toggle)
gitlab-agent 16.11.5-1
  • links: PTS, VCS
  • area: contrib
  • in suites: experimental
  • size: 7,072 kB
  • sloc: makefile: 193; sh: 55; ruby: 3
file content (193 lines) | stat: -rw-r--r-- 6,694 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package redistool

import (
	"context"
	"strconv"
	"time"

	"github.com/redis/rueidis"
	otelmetric "go.opentelemetry.io/otel/metric"
)

// KeyToRedisKey is used to convert typed key (key1 or key2) into a string.
// HSET key1 key2 value.
type KeyToRedisKey[K any] func(key K) string

// ExpiringHash represents a two-level hash: key K1 -> hashKey K2 -> value []byte.
// key identifies the hash; hashKey identifies the key in the hash; value is the value for the hashKey.
// It is not safe for concurrent use.
type ExpiringHash[K1 any, K2 any] interface {
	// GetName returns the name of this hash
	GetName() string
	// Set sets the key -> hashKey -> value. The mapping is stored in RAM and in the backing store.
	// Use Refresh to refresh the value in the backing store.
	Set(ctx context.Context, key K1, hashKey K2, value []byte) error
	// SetEX sets the key -> hashKey -> value. The value is stored in the backing store only (unlike Set).
	// Refresh does not refresh this value in the backing store. Use this method to re-set (i.e. refresh) the value
	// in the backing store.
	// Safe for concurrent use.
	SetEX(ctx context.Context, key K1, hashKey K2, value []byte, expiresAt time.Time) error
	Unset(ctx context.Context, key K1, hashKey K2) error
	// Forget only removes the item from the in-memory map.
	Forget(key K1, hashKey K2)
	// Scan iterates key-value pairs for key.
	// Safe for concurrent use.
	Scan(ctx context.Context, key K1, cb ScanCallback) error
	// Len returns number of key-value mappings in the hash identified by key.
	Len(ctx context.Context, key K1) (int64, error)
	// GC returns a function that iterates all relevant stored data and deletes expired entries.
	// The returned function can be called concurrently as it does not interfere with the hash's operation.
	// The function returns number of deleted Redis (hash) keys, including when an error occurred.
	// It only inspects/GCs hashes where it has entries. Other concurrent clients GC same and/or other corresponding hashes.
	// Hashes that don't have a corresponding client (e.g. because it crashed) will expire because of TTL on the hash key.
	GC() func(context.Context) (int /* keysDeleted */, error)
	// GCFor returns a function that iterates the hash for the given keys and deletes expired entries.
	// GCFor is useful when executing GC for specific keys.
	GCFor(keys []K1) func(context.Context) (int /* keysDeleted */, error)
	// Refresh refreshes data in the backing store to prevent it from expiring.
	Refresh(ctx context.Context, nextRefresh time.Time) error
}

type RedisExpiringHash[K1 comparable, K2 comparable] struct {
	name            string
	client          rueidis.Client
	key1ToRedisKey  KeyToRedisKey[K1]
	key2ToRedisKey  KeyToRedisKey[K2]
	ttl             time.Duration
	api             *RedisExpiringHashAPI[K1, K2]
	data            map[K1]map[K2]*ExpiringValue // key -> hash key -> value
	dataEX          map[K1]struct{}
	transactionalGC bool
}

func NewRedisExpiringHash[K1 comparable, K2 comparable](name string, client rueidis.Client, key1ToRedisKey KeyToRedisKey[K1],
	key2ToRedisKey KeyToRedisKey[K2], ttl time.Duration, m otelmetric.Meter, transactionalGC bool) (*RedisExpiringHash[K1, K2], error) {
	api, err := NewRedisExpiringHashAPI[K1, K2](name, client, key1ToRedisKey, key2ToRedisKey, m)
	if err != nil {
		return nil, err
	}

	return &RedisExpiringHash[K1, K2]{
		name:            name,
		client:          client,
		key1ToRedisKey:  key1ToRedisKey,
		key2ToRedisKey:  key2ToRedisKey,
		ttl:             ttl,
		api:             api,
		data:            make(map[K1]map[K2]*ExpiringValue),
		dataEX:          make(map[K1]struct{}),
		transactionalGC: transactionalGC,
	}, nil
}

func (h *RedisExpiringHash[K1, K2]) GetName() string {
	return h.name
}

func (h *RedisExpiringHash[K1, K2]) Set(ctx context.Context, key K1, hashKey K2, value []byte) error {
	ev := &ExpiringValue{
		ExpiresAt: time.Now().Add(h.ttl).Unix(),
		Value:     value,
	}
	h.setData(key, hashKey, ev)

	b := h.api.SetBuilder()
	b.Set(key, h.ttl, BuilderKV[K2]{
		HashKey: hashKey,
		Value:   ev,
	})
	return b.Do(ctx)
}

func (h *RedisExpiringHash[K1, K2]) SetEX(ctx context.Context, key K1, hashKey K2, value []byte, expiresAt time.Time) error {
	h.dataEX[key] = struct{}{}

	b := h.api.SetBuilder()
	b.Set(key, h.ttl, BuilderKV[K2]{
		HashKey: hashKey,
		Value: &ExpiringValue{
			ExpiresAt: expiresAt.Unix(),
			Value:     value,
		},
	})
	return b.Do(ctx)
}

func (h *RedisExpiringHash[K1, K2]) Unset(ctx context.Context, key K1, hashKey K2) error {
	h.unsetData(key, hashKey)
	return h.api.Unset(ctx, key, hashKey)
}

func (h *RedisExpiringHash[K1, K2]) Forget(key K1, hashKey K2) {
	h.unsetData(key, hashKey)
}

func (h *RedisExpiringHash[K1, K2]) Len(ctx context.Context, key K1) (size int64, retErr error) {
	hlenCmd := h.client.B().Hlen().Key(h.key1ToRedisKey(key)).Build()
	return h.client.Do(ctx, hlenCmd).AsInt64()
}

func (h *RedisExpiringHash[K1, K2]) Scan(ctx context.Context, key K1, cb ScanCallback) error {
	return h.api.Scan(ctx, key, cb)
}

func (h *RedisExpiringHash[K1, K2]) GC() func(context.Context) (int /* keysDeleted */, error) {
	// Copy keys for safe concurrent access.
	keys := make([]K1, 0, len(h.data)+len(h.dataEX))
	for key := range h.data {
		keys = append(keys, key)
	}
	for key := range h.dataEX {
		keys = append(keys, key)
	}
	clear(h.dataEX)
	return h.GCFor(keys)
}

func (h *RedisExpiringHash[K1, K2]) GCFor(keys []K1) func(context.Context) (int /* keysDeleted */, error) {
	return h.api.GCFor(keys, h.transactionalGC)
}

func (h *RedisExpiringHash[K1, K2]) Refresh(ctx context.Context, nextRefresh time.Time) error {
	expiresAt := time.Now().Add(h.ttl).Unix()
	nextRefreshUnix := nextRefresh.Unix()
	b := h.api.SetBuilder()
	var kvs []BuilderKV[K2]
	for key, hashData := range h.data {
		kvs = kvs[:0] // reuse backing array, but reset length
		for hashKey, value := range hashData {
			if value.ExpiresAt > nextRefreshUnix {
				// Expires after next refresh. Will be refreshed later, no need to refresh now.
				continue
			}
			value.ExpiresAt = expiresAt
			kvs = append(kvs, BuilderKV[K2]{
				HashKey: hashKey,
				Value:   value,
			})
		}
		b.Set(key, h.ttl, kvs...)
	}
	return b.Do(ctx)
}

func (h *RedisExpiringHash[K1, K2]) setData(key K1, hashKey K2, value *ExpiringValue) {
	nm := h.data[key]
	if nm == nil {
		nm = make(map[K2]*ExpiringValue, 1)
		h.data[key] = nm
	}
	nm[hashKey] = value
}

func (h *RedisExpiringHash[K1, K2]) unsetData(key K1, hashKey K2) {
	nm := h.data[key]
	delete(nm, hashKey)
	if len(nm) == 0 {
		delete(h.data, key)
	}
}

func PrefixedInt64Key(prefix string, key int64) string {
	return prefix + strconv.FormatInt(key, 32)
}