File: multiplexer.go

package info (click to toggle)
golang-github-optiopay-kafka 0.0~git20150921.0.bc8e095-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 400 kB
  • ctags: 461
  • sloc: sh: 45; makefile: 2
file content (122 lines) | stat: -rw-r--r-- 3,041 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package kafka

import (
	"errors"
	"sync"

	"github.com/optiopay/kafka/proto"
)

// ErrMxClosed is returned by Mx.Consume when the multiplexer has been closed.
var ErrMxClosed = errors.New("closed")

// Mx is a multiplexer that combines a number of consumers into a single
// stream.
//
// It is the responsibility of the user of the multiplexer and of the consumer
// implementation to handle errors.
// ErrNoData returned by a consumer is not passed through by the multiplexer;
// instead, the consumer that returned ErrNoData is removed from the merged
// set. When all consumers are removed (the set is empty), Mx is automatically
// closed and any further Consume call will result in ErrMxClosed error.
//
// It is important to remember that because fetching from every consumer is
// done by a separate worker, most of the time there is one message consumed by
// each worker that is held in memory while waiting for an opportunity to
// return it once Consume on the multiplexer is called. Closing the multiplexer
// may result in dropping some of those already read messages that are kept
// internally by the workers while waiting for delivery.
type Mx struct {
	// Channels shared between the workers and Consume: errc carries errors
	// and msgc carries messages from the workers to Consume, while stop is
	// closed to signal that the multiplexer is closed.
	errc chan error
	msgc chan *proto.Message
	stop chan struct{}

	// mu guards the two fields below. closed records that stop has already
	// been closed, so that it is closed only once. workers is the number of
	// worker goroutines that have not terminated yet.
	mu      sync.Mutex
	closed  bool
	workers int
}

// Merge combines the consume results of any number of consumers into a single
// stream and exposes them through the returned multiplexer.
//
// One worker goroutine is started per consumer. Each worker repeatedly calls
// Consume on its consumer and forwards the result to the multiplexer. A worker
// terminates when its consumer returns ErrNoData or when the multiplexer is
// closed; any other error is forwarded to the caller of Mx.Consume and the
// worker keeps running. When the last worker terminates, the multiplexer is
// closed automatically.
//
// Calling Merge without any consumer returns an already closed multiplexer,
// because there is no worker that could ever deliver a result or close it.
func Merge(consumers ...Consumer) *Mx {
	p := &Mx{
		errc:    make(chan error),
		msgc:    make(chan *proto.Message),
		stop:    make(chan struct{}),
		workers: len(consumers),
	}

	// With an empty consumer set no worker will ever run the shutdown logic
	// below, so close right away instead of letting Consume block forever.
	if len(consumers) == 0 {
		p.closed = true
		close(p.stop)
		return p
	}

	for _, consumer := range consumers {
		go func(c Consumer) {
			// On exit, account for this worker and close the multiplexer if
			// it was the last one and nobody closed it before.
			defer func() {
				p.mu.Lock()
				defer p.mu.Unlock()

				p.workers--
				if p.workers == 0 && !p.closed {
					p.closed = true
					close(p.stop)
				}
			}()

			for {
				msg, err := c.Consume()
				if err != nil {
					// ErrNoData means the consumer is exhausted; it is not
					// reported to the user, the worker just goes away.
					if err == ErrNoData {
						return
					}
					select {
					case p.errc <- err:
					case <-p.stop:
						return
					}
					continue
				}

				select {
				case p.msgc <- msg:
				case <-p.stop:
					return
				}
			}
		}(consumer)
	}

	return p
}

// Workers returns the number of active consumer workers that are pushing
// messages to the multiplexer consumer queue.
func (p *Mx) Workers() int {
	p.mu.Lock()
	active := p.workers
	p.mu.Unlock()

	return active
}

// Close closes the multiplexer and signals all underlying workers to stop.
//
// Workers stop as soon as possible, but a consume call that a worker has
// already started has to finish first. Any consumption result received after
// the multiplexer was closed is ignored.
//
// Close returns without waiting for the workers to finish.
//
// Closing an already closed multiplexer has no effect.
func (p *Mx) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Already closed: the stop channel must not be closed twice.
	if p.closed {
		return
	}

	p.closed = true
	close(p.stop)
}

// Consume returns the Consume result of any of the merged consumers.
//
// It blocks until one of the workers delivers a message or an error, or until
// the multiplexer is closed. Once the multiplexer is closed, Consume always
// returns ErrMxClosed.
func (p *Mx) Consume() (*proto.Message, error) {
	// select chooses randomly among ready cases, so a worker that is still
	// blocked on delivering its result could win over the stop signal below.
	// Check for the closed state first to guarantee that nothing is returned
	// from a closed multiplexer.
	select {
	case <-p.stop:
		return nil, ErrMxClosed
	default:
	}

	select {
	case <-p.stop:
		return nil, ErrMxClosed
	case msg := <-p.msgc:
		return msg, nil
	case err := <-p.errc:
		return nil, err
	}
}