1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
|
package redistool
import (
"context"
"errors"
"github.com/redis/rueidis"
"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
)
// attemptsExceeded is the sentinel error returned by transaction when every
// allowed attempt ended in an EXEC conflict.
var attemptsExceeded = errors.New("failed to execute Redis transaction too many times")
// transaction runs cb under the Redis optimistic locking pattern
// (WATCH, then MULTI ... EXEC).
// See https://redis.io/docs/interact/transactions/
// See https://github.com/redis/rueidis#cas-pattern
//
// Each attempt:
//  1. WATCHes keys on c.
//  2. Calls cb, which reads whatever state it needs and returns the commands
//     that should be applied atomically. If cb returns no commands there is
//     nothing to change and transaction returns nil.
//  3. Sends the commands wrapped in MULTI/EXEC.
//
// An EXEC result of rueidis.Nil is treated as a conflict: the conflict counter
// is incremented with the given attributes and the attempt is repeated, up to
// maxAttempts times in total.
//
// Returns nil on success, attemptsExceeded if maxAttempts attempts were made
// but all failed, or the first error encountered otherwise (WATCH failure,
// cb error, errors of MULTI or the queued commands joined with errors.Join,
// or a non-conflict EXEC error). If the function would otherwise return nil
// but the cleanup UNWATCH fails, the UNWATCH error is returned.
func transaction(ctx context.Context, maxAttempts int, c rueidis.DedicatedClient,
	conflict otelmetric.Int64Counter, attributes attribute.Set,
	cb func(context.Context) ([]rueidis.Completed, error), keys ...string) (retErr error) {
	execCalled := false
	defer func() {
		if execCalled {
			return
		}
		// x. UNWATCH if there was an error or nothing to delete.
		// A fresh context is used instead of ctx on purpose: one common reason
		// to get here is that ctx has been cancelled or has expired, and the
		// cleanup must still be sent in that case, otherwise keys may stay
		// WATCHed on this connection.
		// NOTE(review): this call is no longer bounded by ctx; confirm that the
		// client's own connection timeouts are sufficient here.
		err := c.Do(context.Background(), c.B().Unwatch().Build()).Error() //nolint: contextcheck
		if retErr == nil {
			retErr = err
		}
	}()
	for i := 0; i < maxAttempts; i++ {
		// 1. WATCH
		execCalled = false // Enable deferred cleanup (for retries)
		err := c.Do(ctx, c.B().Watch().Key(keys...).Build()).Error()
		if err != nil {
			return err
		}
		// 2. READ
		cmds, err := cb(ctx)
		if err != nil {
			return err
		}
		if len(cmds) == 0 {
			// Nothing to mutate; the deferred UNWATCH releases the keys.
			return nil
		}
		// 3. Mutation via MULTI+EXEC
		multiExec := make([]rueidis.Completed, 0, len(cmds)+2)
		multiExec = append(multiExec, c.B().Multi().Build())
		multiExec = append(multiExec, cmds...)
		multiExec = append(multiExec, c.B().Exec().Build())
		resp := c.DoMulti(ctx, multiExec...)
		execCalled = true                       // Disable deferred UNWATCH as Redis UNWATCHes all keys on EXEC.
		errs := MultiErrors(resp[:len(resp)-1]) // all but the last one, which is EXEC
		if len(errs) > 0 {                      // Something is wrong with commands or I/O, abort
			return errors.Join(errs...)
		}
		// EXEC error
		execErr := resp[len(resp)-1].Error()
		switch execErr { //nolint: errorlint
		case nil: // Success!
			return nil
		case rueidis.Nil: // EXEC detected a conflict, retry.
			conflict.Add(context.Background(), 1, otelmetric.WithAttributeSet(attributes)) //nolint: contextcheck
		default: // EXEC failed in a bad way, abort
			return execErr
		}
	}
	return attemptsExceeded
}
|