1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
package gocql
import (
"context"
"time"
)
type ExecutableQuery interface {
execute(ctx context.Context, conn *Conn) *Iter
attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo)
retryPolicy() RetryPolicy
speculativeExecutionPolicy() SpeculativeExecutionPolicy
GetRoutingKey() ([]byte, error)
Keyspace() string
IsIdempotent() bool
withContext(context.Context) ExecutableQuery
RetryableQuery
}
type queryExecutor struct {
pool *policyConnPool
policy HostSelectionPolicy
}
func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, conn *Conn) *Iter {
start := time.Now()
iter := qry.execute(ctx, conn)
end := time.Now()
qry.attempt(q.pool.keyspace, end, start, iter, conn.host)
return iter
}
func (q *queryExecutor) speculate(ctx context.Context, qry ExecutableQuery, sp SpeculativeExecutionPolicy, results chan *Iter) *Iter {
ticker := time.NewTicker(sp.Delay())
defer ticker.Stop()
for i := 0; i < sp.Attempts(); i++ {
select {
case <-ticker.C:
go q.run(ctx, qry, results)
case <-ctx.Done():
return &Iter{err: ctx.Err()}
case iter := <-results:
return iter
}
}
return nil
}
func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
// check if the query is not marked as idempotent, if
// it is, we force the policy to NonSpeculative
sp := qry.speculativeExecutionPolicy()
if !qry.IsIdempotent() || sp.Attempts() == 0 {
return q.do(qry.Context(), qry), nil
}
ctx, cancel := context.WithCancel(qry.Context())
defer cancel()
results := make(chan *Iter, 1)
// Launch the main execution
go q.run(ctx, qry, results)
// The speculative executions are launched _in addition_ to the main
// execution, on a timer. So Speculation{2} would make 3 executions running
// in total.
if iter := q.speculate(ctx, qry, sp, results); iter != nil {
return iter, nil
}
select {
case iter := <-results:
return iter, nil
case <-ctx.Done():
return &Iter{err: ctx.Err()}, nil
}
}
func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery) *Iter {
hostIter := q.policy.Pick(qry)
selectedHost := hostIter()
rt := qry.retryPolicy()
var lastErr error
var iter *Iter
for selectedHost != nil {
host := selectedHost.Info()
if host == nil || !host.IsUp() {
selectedHost = hostIter()
continue
}
pool, ok := q.pool.getPool(host)
if !ok {
selectedHost = hostIter()
continue
}
conn := pool.Pick()
if conn == nil {
selectedHost = hostIter()
continue
}
iter = q.attemptQuery(ctx, qry, conn)
iter.host = selectedHost.Info()
// Update host
switch iter.err {
case context.Canceled, context.DeadlineExceeded, ErrNotFound:
// those errors represents logical errors, they should not count
// toward removing a node from the pool
selectedHost.Mark(nil)
return iter
default:
selectedHost.Mark(iter.err)
}
// Exit if the query was successful
// or no retry policy defined or retry attempts were reached
if iter.err == nil || rt == nil || !rt.Attempt(qry) {
return iter
}
lastErr = iter.err
// If query is unsuccessful, check the error with RetryPolicy to retry
switch rt.GetRetryType(iter.err) {
case Retry:
// retry on the same host
continue
case Rethrow, Ignore:
return iter
case RetryNextHost:
// retry on the next host
selectedHost = hostIter()
continue
default:
// Undefined? Return nil and error, this will panic in the requester
return &Iter{err: ErrUnknownRetryType}
}
}
if lastErr != nil {
return &Iter{err: lastErr}
}
return &Iter{err: ErrNoConnections}
}
func (q *queryExecutor) run(ctx context.Context, qry ExecutableQuery, results chan<- *Iter) {
select {
case results <- q.do(ctx, qry):
case <-ctx.Done():
}
}
|