1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
|
// Tideland Go Library - Redis Client - Pipeline
//
// Copyright (C) 2009-2017 Frank Mueller / Oldenburg / Germany
//
// All rights reserved. Use of this source code is governed
// by the new BSD license.
package redis
//--------------------
// IMPORTS
//--------------------
import (
"strings"
"github.com/tideland/golib/errors"
"github.com/tideland/golib/identifier"
"github.com/tideland/golib/monitoring"
)
//--------------------
// CONNECTION
//--------------------
// Pipeline manages a Redis connection executing
// pipelined commands.
type Pipeline struct {
database *Database
resp *resp
counter int
}
// newPipeline creates a new pipeline instance.
func newPipeline(db *Database) (*Pipeline, error) {
ppl := &Pipeline{
database: db,
}
err := ppl.ensureProtocol()
if err != nil {
return nil, err
}
// Perform authentication and database selection.
if err != nil {
return nil, err
}
err = ppl.resp.authenticate()
if err != nil {
ppl.database.pool.kill(ppl.resp)
return nil, err
}
err = ppl.resp.selectDatabase()
if err != nil {
ppl.database.pool.kill(ppl.resp)
return nil, err
}
return ppl, nil
}
// Do executes one Redis command and returns
// the result as result set.
func (ppl *Pipeline) Do(cmd string, args ...interface{}) error {
cmd = strings.ToLower(cmd)
if strings.Contains(cmd, "subscribe") {
return errors.New(ErrUseSubscription, errorMessages)
}
err := ppl.ensureProtocol()
if err != nil {
return err
}
if ppl.database.monitoring {
m := monitoring.BeginMeasuring(identifier.Identifier("redis", "command", cmd))
defer m.EndMeasuring()
}
err = ppl.resp.sendCommand(cmd, args...)
logCommand(cmd, args, err, ppl.database.logging)
if err != nil {
return err
}
ppl.counter++
return err
}
// Collect collects all the result sets of the commands and returns
// the connection back into the pool.
func (ppl *Pipeline) Collect() ([]*ResultSet, error) {
defer func() {
ppl.resp = nil
}()
err := ppl.ensureProtocol()
if err != nil {
return nil, err
}
results := []*ResultSet{}
for i := ppl.counter; i > 0; i-- {
result, err := ppl.resp.receiveResultSet()
if err != nil {
ppl.database.pool.kill(ppl.resp)
return nil, err
}
results = append(results, result)
}
ppl.database.pool.push(ppl.resp)
return results, nil
}
// ensureProtocol retrieves a protocol from the pool if needed.
func (ppl *Pipeline) ensureProtocol() error {
if ppl.resp == nil {
p, err := ppl.database.pool.pull(unforcedPull)
if err != nil {
return err
}
ppl.resp = p
ppl.counter = 0
}
return nil
}
// EOF
|