File: pipeline.go

package info (click to toggle)
golang-github-tideland-golib 4.24.2-2
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 1,144 kB
  • sloc: makefile: 4
file content (120 lines) | stat: -rw-r--r-- 2,597 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// Tideland Go Library - Redis Client - Pipeline
//
// Copyright (C) 2009-2017 Frank Mueller / Oldenburg / Germany
//
// All rights reserved. Use of this source code is governed
// by the new BSD license.

package redis

//--------------------
// IMPORTS
//--------------------

import (
	"strings"

	"github.com/tideland/golib/errors"
	"github.com/tideland/golib/identifier"
	"github.com/tideland/golib/monitoring"
)

//--------------------
// PIPELINE
//--------------------

// Pipeline manages a Redis connection executing
// pipelined commands.
type Pipeline struct {
	// database provides the pool the protocol connection is pulled
	// from and the monitoring and logging settings used by Do.
	database *Database
	// resp is the protocol connection currently held by the pipeline.
	// It is nil while none is held; ensureProtocol then pulls one.
	resp *resp
	// counter is the number of commands sent since the connection was
	// pulled, i.e. the number of result sets Collect has to receive.
	counter int
}

// newPipeline creates a new pipeline instance for the passed database.
//
// It pulls a protocol connection from the pool of the database and
// performs the authentication and the database selection on it. If
// one of these two steps fails the connection is killed via the pool
// and the error is returned together with a nil pipeline.
func newPipeline(db *Database) (*Pipeline, error) {
	ppl := &Pipeline{
		database: db,
	}
	err := ppl.ensureProtocol()
	if err != nil {
		return nil, err
	}
	// Perform authentication and database selection. A connection
	// failing here is not usable anymore, so it is killed instead
	// of being pushed back into the pool.
	if err = ppl.resp.authenticate(); err != nil {
		ppl.database.pool.kill(ppl.resp)
		return nil, err
	}
	if err = ppl.resp.selectDatabase(); err != nil {
		ppl.database.pool.kill(ppl.resp)
		return nil, err
	}
	return ppl, nil
}

// Do passes one Redis command to the pipeline. The command is only
// handed to the protocol for sending; no result is read here. The
// result sets of all commands are retrieved later by Collect.
//
// The command name is lower-cased before it is used. Commands
// containing "subscribe" are rejected with ErrUseSubscription. If
// the pipeline currently holds no protocol connection one is pulled
// from the pool first. Only commands sent without an error are
// counted as pending for Collect.
func (ppl *Pipeline) Do(cmd string, args ...interface{}) error {
	cmd = strings.ToLower(cmd)
	if strings.Contains(cmd, "subscribe") {
		return errors.New(ErrUseSubscription, errorMessages)
	}
	err := ppl.ensureProtocol()
	if err != nil {
		return err
	}
	if ppl.database.monitoring {
		m := monitoring.BeginMeasuring(identifier.Identifier("redis", "command", cmd))
		defer m.EndMeasuring()
	}
	err = ppl.resp.sendCommand(cmd, args...)
	// Log the command regardless of the outcome of sending it.
	logCommand(cmd, args, err, ppl.database.logging)
	if err != nil {
		return err
	}
	ppl.counter++
	return nil
}

// Collect receives one result set for each command counted by Do
// and returns them in the order they are received.
//
// On success the protocol connection is pushed back into the pool.
// If receiving a result set fails the connection is killed via the
// pool instead and the error is returned without any results. In
// every case the pipeline drops its reference to the connection, so
// the next Do or Collect pulls one from the pool again.
func (ppl *Pipeline) Collect() ([]*ResultSet, error) {
	defer func() {
		ppl.resp = nil
	}()
	err := ppl.ensureProtocol()
	if err != nil {
		return nil, err
	}
	// The number of result sets is known upfront, so allocate once.
	// The slice stays non-nil even if no command has been sent.
	results := make([]*ResultSet, 0, ppl.counter)
	for i := ppl.counter; i > 0; i-- {
		result, err := ppl.resp.receiveResultSet()
		if err != nil {
			ppl.database.pool.kill(ppl.resp)
			return nil, err
		}
		results = append(results, result)
	}
	ppl.database.pool.push(ppl.resp)
	return results, nil
}

// ensureProtocol makes sure the pipeline holds a protocol connection.
// If it already has one nothing happens. Otherwise a connection is
// pulled from the pool of the database with unforcedPull and the
// command counter is reset to zero. An error of the pull is returned
// unchanged and leaves the pipeline untouched.
func (ppl *Pipeline) ensureProtocol() error {
	if ppl.resp != nil {
		// Already equipped with a connection.
		return nil
	}
	pulled, err := ppl.database.pool.pull(unforcedPull)
	if err != nil {
		return err
	}
	ppl.resp, ppl.counter = pulled, 0
	return nil
}

// EOF