1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
|
package redistool
import (
"context"
"strconv"
"time"
"github.com/redis/rueidis"
otelmetric "go.opentelemetry.io/otel/metric"
)
// KeyToRedisKey converts a typed key (key1 or key2) into the string that represents it in Redis
// commands, e.g. the key1 and key2 arguments of HSET key1 key2 value.
type KeyToRedisKey[K any] func(key K) string
// ExpiringHash represents a two-level hash: key K1 -> hashKey K2 -> value []byte.
// key identifies the hash; hashKey identifies the key in the hash; value is the value for the hashKey.
// It is not safe for concurrent use, except where a method's documentation explicitly says otherwise.
type ExpiringHash[K1 any, K2 any] interface {
	// GetName returns the name of this hash.
	GetName() string
	// Set sets the key -> hashKey -> value. The mapping is stored in RAM and in the backing store.
	// Use Refresh to refresh the value in the backing store.
	Set(ctx context.Context, key K1, hashKey K2, value []byte) error
	// SetEX sets the key -> hashKey -> value with an explicit expiration time.
	// The value is stored in the backing store only (unlike Set), so Refresh does not refresh it.
	// Call SetEX again to re-set (i.e. refresh) the value in the backing store.
	// Safe for concurrent use.
	// NOTE(review): RedisExpiringHash.SetEX records the key in a plain map without synchronization,
	// so the concurrency guarantee above does not hold for that implementation - confirm the intended contract.
	SetEX(ctx context.Context, key K1, hashKey K2, value []byte, expiresAt time.Time) error
	// Unset removes the key -> hashKey mapping.
	// RedisExpiringHash drops it from the in-memory map and then removes it from the backing store.
	Unset(ctx context.Context, key K1, hashKey K2) error
	// Forget only removes the item from the in-memory map.
	// The backing store is not touched.
	Forget(key K1, hashKey K2)
	// Scan iterates key-value pairs for key, passing them to cb.
	// Safe for concurrent use.
	Scan(ctx context.Context, key K1, cb ScanCallback) error
	// Len returns number of key-value mappings in the hash identified by key.
	Len(ctx context.Context, key K1) (int64, error)
	// GC returns a function that iterates all relevant stored data and deletes expired entries.
	// The returned function can be called concurrently as it does not interfere with the hash's operation.
	// The function returns number of deleted Redis (hash) keys, including when an error occurred.
	// It only inspects/GCs hashes where it has entries. Other concurrent clients GC same and/or other corresponding hashes.
	// Hashes that don't have a corresponding client (e.g. because it crashed) will expire because of TTL on the hash key.
	GC() func(context.Context) (int /* keysDeleted */, error)
	// GCFor returns a function that iterates the hash for the given keys and deletes expired entries.
	// GCFor is useful when executing GC for specific keys rather than for everything this hash knows about.
	GCFor(keys []K1) func(context.Context) (int /* keysDeleted */, error)
	// Refresh refreshes data in the backing store to prevent it from expiring.
	// nextRefresh is the time of the next planned Refresh call; RedisExpiringHash skips entries
	// that expire after it, because the next call will take care of them.
	Refresh(ctx context.Context, nextRefresh time.Time) error
}
// RedisExpiringHash is a two-level hash (key K1 -> hashKey K2 -> value) that keeps the entries written
// via Set in memory so that Refresh can re-write them, and delegates backing store operations to api.
// Its maps are accessed without synchronization.
type RedisExpiringHash[K1 comparable, K2 comparable] struct {
	// name is the value returned by GetName; it is also passed to the API constructor.
	name string
	// client is used directly by Len to issue HLEN.
	client rueidis.Client
	// key1ToRedisKey converts a K1 key into its string form (used by Len to build the Redis key).
	key1ToRedisKey KeyToRedisKey[K1]
	// key2ToRedisKey converts a K2 hash key into its string form; it is also handed to the API constructor.
	key2ToRedisKey KeyToRedisKey[K2]
	// ttl is added to the current time to compute ExpiresAt in Set and Refresh, and is passed to the
	// set builder with every write.
	ttl time.Duration
	// api performs the Set/Unset/Scan/GC operations.
	api *RedisExpiringHashAPI[K1, K2]
	// data holds the entries written via Set; Refresh re-writes these.
	data map[K1]map[K2]*ExpiringValue // key -> hash key -> value
	// dataEX is the set of keys written via SetEX since the last GC call; GC includes them and then clears the set.
	dataEX map[K1]struct{}
	// transactionalGC is passed through to api.GCFor.
	transactionalGC bool
}
// NewRedisExpiringHash builds a RedisExpiringHash with empty in-memory state.
// It first constructs the underlying RedisExpiringHashAPI from name, client, the two key converters
// and the meter m; if that fails, the error is returned as is and no hash is created.
// ttl and transactionalGC are stored and used by later Set/SetEX/Refresh and GC/GCFor calls respectively.
func NewRedisExpiringHash[K1 comparable, K2 comparable](name string, client rueidis.Client, key1ToRedisKey KeyToRedisKey[K1],
	key2ToRedisKey KeyToRedisKey[K2], ttl time.Duration, m otelmetric.Meter, transactionalGC bool) (*RedisExpiringHash[K1, K2], error) {
	hashAPI, err := NewRedisExpiringHashAPI[K1, K2](name, client, key1ToRedisKey, key2ToRedisKey, m)
	if err != nil {
		return nil, err
	}
	h := new(RedisExpiringHash[K1, K2])
	h.name = name
	h.client = client
	h.key1ToRedisKey = key1ToRedisKey
	h.key2ToRedisKey = key2ToRedisKey
	h.ttl = ttl
	h.api = hashAPI
	h.transactionalGC = transactionalGC
	// Both maps must be non-nil: Set and SetEX write into them directly.
	h.data = make(map[K1]map[K2]*ExpiringValue)
	h.dataEX = make(map[K1]struct{})
	return h, nil
}
// GetName returns the name this hash was constructed with.
func (h *RedisExpiringHash[K1, K2]) GetName() string {
	return h.name
}
// Set stores value under key -> hashKey with an expiration of now + ttl.
// The entry is recorded in the in-memory map first (so that Refresh will re-write it later) and is
// then written to the backing store; the in-memory entry stays in place even if that write fails.
// The returned error is the result of the backing store write.
func (h *RedisExpiringHash[K1, K2]) Set(ctx context.Context, key K1, hashKey K2, value []byte) error {
	deadline := time.Now().Add(h.ttl)
	entry := &ExpiringValue{
		Value:     value,
		ExpiresAt: deadline.Unix(),
	}
	h.setData(key, hashKey, entry)

	// The same *ExpiringValue is shared between the in-memory map and the write request.
	kv := BuilderKV[K2]{
		HashKey: hashKey,
		Value:   entry,
	}
	builder := h.api.SetBuilder()
	builder.Set(key, h.ttl, kv)
	return builder.Do(ctx)
}
// SetEX stores value under key -> hashKey with the caller-supplied expiration time expiresAt.
// Unlike Set, the value is not kept in memory, so Refresh never re-writes it; only the key is
// remembered (in dataEX) so that the next GC call also visits this hash.
// The returned error is the result of the backing store write.
//
// NOTE(review): the write to h.dataEX is not synchronized, so concurrent SetEX calls (or SetEX racing
// with GC) are a data race on the map, even though the ExpiringHash interface documents SetEX as
// safe for concurrent use - confirm which one is intended.
func (h *RedisExpiringHash[K1, K2]) SetEX(ctx context.Context, key K1, hashKey K2, value []byte, expiresAt time.Time) error {
	h.dataEX[key] = struct{}{}

	entry := &ExpiringValue{
		Value:     value,
		ExpiresAt: expiresAt.Unix(),
	}
	builder := h.api.SetBuilder()
	builder.Set(key, h.ttl, BuilderKV[K2]{HashKey: hashKey, Value: entry})
	return builder.Do(ctx)
}
// Unset removes the key -> hashKey mapping from the in-memory map and then from the backing store.
// The in-memory removal happens first and is not rolled back if the backing store call fails.
// The returned error is the result of the backing store call.
func (h *RedisExpiringHash[K1, K2]) Unset(ctx context.Context, key K1, hashKey K2) error {
	h.unsetData(key, hashKey)
	return h.api.Unset(ctx, key, hashKey)
}
// Forget removes the key -> hashKey mapping from the in-memory map only.
// The backing store is not contacted, so the stored value is no longer re-written by Refresh.
func (h *RedisExpiringHash[K1, K2]) Forget(key K1, hashKey K2) {
	h.unsetData(key, hashKey)
}
// Len returns the number of fields in the Redis hash identified by key, as reported by HLEN.
// The in-memory map is not consulted.
func (h *RedisExpiringHash[K1, K2]) Len(ctx context.Context, key K1) (size int64, retErr error) {
	hlen := h.client.B().Hlen()
	cmd := hlen.Key(h.key1ToRedisKey(key)).Build()
	reply := h.client.Do(ctx, cmd)
	return reply.AsInt64()
}
// Scan iterates the entries stored for key, passing them to cb.
// It delegates to the underlying API and does not read or modify the in-memory maps.
func (h *RedisExpiringHash[K1, K2]) Scan(ctx context.Context, key K1, cb ScanCallback) error {
	return h.api.Scan(ctx, key, cb)
}
// GC returns a function that deletes expired entries from every hash this instance has written to:
// the keys currently held in memory (written via Set) plus the keys written via SetEX since the
// previous GC call. The set of SetEX keys is cleared as part of this call.
//
// The key list is snapshotted here, so the returned function does not touch the in-memory maps and
// can run concurrently with other operations on the hash. GC itself reads and mutates those maps
// and therefore must not be called concurrently with them.
//
// Each key is included once, even if it was written via both Set and SetEX, so that no hash is
// processed twice in a single GC run.
func (h *RedisExpiringHash[K1, K2]) GC() func(context.Context) (int /* keysDeleted */, error) {
	// Copy keys for safe concurrent access.
	keys := make([]K1, 0, len(h.data)+len(h.dataEX))
	for key := range h.data {
		keys = append(keys, key)
	}
	for key := range h.dataEX {
		if _, dup := h.data[key]; dup {
			// Already collected from h.data above.
			continue
		}
		keys = append(keys, key)
	}
	clear(h.dataEX)
	return h.GCFor(keys)
}
// GCFor returns a function that deletes expired entries from the hashes identified by keys.
// It delegates to the underlying API, passing along the transactionalGC setting chosen at construction,
// and does not read or modify the in-memory maps.
func (h *RedisExpiringHash[K1, K2]) GCFor(keys []K1) func(context.Context) (int /* keysDeleted */, error) {
	return h.api.GCFor(keys, h.transactionalGC)
}
// Refresh re-writes in-memory entries (those stored via Set) to the backing store so they do not expire.
// Only entries whose ExpiresAt is at or before nextRefresh are re-written; entries that expire later
// are left for a subsequent Refresh call. Re-written entries get a new ExpiresAt of now + ttl, which
// is updated in memory before the write is attempted.
// All writes are queued on a single builder and executed together; the returned error is the
// result of that execution.
func (h *RedisExpiringHash[K1, K2]) Refresh(ctx context.Context, nextRefresh time.Time) error {
	var (
		newExpiry = time.Now().Add(h.ttl).Unix()
		cutoff    = nextRefresh.Unix()
		builder   = h.api.SetBuilder()
		batch     []BuilderKV[K2]
	)
	for key, entries := range h.data {
		// Start each hash with an empty batch while keeping the already allocated backing array.
		batch = batch[:0]
		for hashKey, entry := range entries {
			if entry.ExpiresAt <= cutoff {
				// Would expire before (or at) the next refresh, so extend it now.
				entry.ExpiresAt = newExpiry
				batch = append(batch, BuilderKV[K2]{HashKey: hashKey, Value: entry})
			}
		}
		// Called for every hash, including when the batch ended up empty.
		builder.Set(key, h.ttl, batch...)
	}
	return builder.Do(ctx)
}
// setData records value under key -> hashKey in the in-memory map, creating the inner map for key
// on first use and overwriting any previous value for the same hashKey.
func (h *RedisExpiringHash[K1, K2]) setData(key K1, hashKey K2, value *ExpiringValue) {
	if inner := h.data[key]; inner != nil {
		inner[hashKey] = value
		return
	}
	// First entry for this key.
	h.data[key] = map[K2]*ExpiringValue{hashKey: value}
}
// unsetData removes key -> hashKey from the in-memory map and drops the inner map for key once it
// becomes empty. Removing a mapping that does not exist is a no-op.
func (h *RedisExpiringHash[K1, K2]) unsetData(key K1, hashKey K2) {
	inner := h.data[key]
	// delete on a nil map is a no-op, so a missing key needs no special handling.
	delete(inner, hashKey)
	if len(inner) > 0 {
		return
	}
	delete(h.data, key)
}
// PrefixedInt64Key returns prefix followed by key formatted in base 32
// (digits 0-9 then lowercase a-v, with a leading '-' for negative values).
func PrefixedInt64Key(prefix string, key int64) string {
	// An int64 in base 32 needs at most 13 digits plus an optional sign.
	const maxInt64Base32Len = 14
	buf := make([]byte, 0, len(prefix)+maxInt64Base32Len)
	buf = append(buf, prefix...)
	buf = strconv.AppendInt(buf, key, 32)
	return string(buf)
}
|