File: channels.go

package info (click to toggle)
golang-gopkg-eapache-channels.v1 1.1.0-1.1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 160 kB
  • sloc: makefile: 2
file content (277 lines) | stat: -rw-r--r-- 10,022 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
/*
Package channels provides a collection of helper functions, interfaces and implementations for
working with and extending the capabilities of golang's existing channels. The main interface of
interest is Channel, though sub-interfaces are also provided for cases where the full Channel interface
cannot be met (for example, InChannel for write-only channels).

For integration with native typed golang channels, functions Wrap and Unwrap are provided which do the
appropriate type conversions. The NativeChannel, NativeInChannel and NativeOutChannel type definitions
are also provided for use with native channels which already carry values of type interface{}.

The heart of the package consists of several distinct implementations of the Channel interface, including
channels backed by special buffers (resizable, infinite, ring buffers, etc) and other useful types. A
"black hole" channel for discarding unwanted values (similar in purpose to ioutil.Discard or /dev/null)
rounds out the set.

Helper functions for operating on Channels include Pipe and Tee (which behave much like their Unix
namesakes), as well as Multiplex and Distribute. "Weak" versions of these functions also exist, which
do not close their output channel(s) on completion.

Due to limitations of Go's type system, importing this library directly is often not practical for
production code. It serves equally well, however, as a reference guide and template for implementing
many common idioms; if you use it in this way I would appreciate the inclusion of some sort of credit
in the resulting code.

Warning: several types in this package provide so-called "infinite" buffers. Be *very* careful using
these, as no buffer is truly infinite - if such a buffer grows too large your program will run out of
memory and crash. Caveat emptor.
*/
package channels

import "reflect"

// BufferCap represents the capacity of the buffer backing a channel. Valid values consist of all
// positive integers, as well as the special values below.
type BufferCap int

const (
	// None is the capacity for channels that have no buffer at all.
	None BufferCap = 0
	// Infinity is the capacity for channels with no limit on their buffer size.
	Infinity BufferCap = -1
)

// Buffer is an interface for any channel that provides access to query the state of its buffer.
// Even unbuffered channels can implement this interface by simply returning 0 from Len() and None from Cap().
type Buffer interface {
	Len() int       // The number of elements currently buffered.
	Cap() BufferCap // The maximum number of elements that can be buffered.
}

// SimpleInChannel is an interface representing a writeable channel that does not necessarily
// implement the Buffer interface.
type SimpleInChannel interface {
	In() chan<- interface{} // The writeable end of the channel.
	Close()                 // Closes the channel. It is an error to write to In() after calling Close().
}

// InChannel is an interface representing a writeable channel with a buffer.
type InChannel interface {
	SimpleInChannel
	Buffer
}

// SimpleOutChannel is an interface representing a readable channel that does not necessarily
// implement the Buffer interface.
type SimpleOutChannel interface {
	Out() <-chan interface{} // The readable end of the channel.
}

// OutChannel is an interface representing a readable channel implementing the Buffer interface.
type OutChannel interface {
	SimpleOutChannel
	Buffer
}

// SimpleChannel is an interface representing a channel that is both readable and writeable,
// but does not necessarily implement the Buffer interface.
type SimpleChannel interface {
	SimpleInChannel
	SimpleOutChannel
}

// Channel is an interface representing a channel that is readable, writeable and implements
// the Buffer interface
type Channel interface {
	SimpleChannel
	Buffer
}

func pipe(input SimpleOutChannel, output SimpleInChannel, closeWhenDone bool) {
	for elem := range input.Out() {
		output.In() <- elem
	}
	if closeWhenDone {
		output.Close()
	}
}

func multiplex(output SimpleInChannel, inputs []SimpleOutChannel, closeWhenDone bool) {
	inputCount := len(inputs)
	cases := make([]reflect.SelectCase, inputCount)
	for i := range cases {
		cases[i].Dir = reflect.SelectRecv
		cases[i].Chan = reflect.ValueOf(inputs[i].Out())
	}
	for inputCount > 0 {
		chosen, recv, recvOK := reflect.Select(cases)
		if recvOK {
			output.In() <- recv.Interface()
		} else {
			cases[chosen].Chan = reflect.ValueOf(nil)
			inputCount--
		}
	}
	if closeWhenDone {
		output.Close()
	}
}

func tee(input SimpleOutChannel, outputs []SimpleInChannel, closeWhenDone bool) {
	cases := make([]reflect.SelectCase, len(outputs))
	for i := range cases {
		cases[i].Dir = reflect.SelectSend
	}
	for elem := range input.Out() {
		for i := range cases {
			cases[i].Chan = reflect.ValueOf(outputs[i].In())
			cases[i].Send = reflect.ValueOf(elem)
		}
		for _ = range cases {
			chosen, _, _ := reflect.Select(cases)
			cases[chosen].Chan = reflect.ValueOf(nil)
		}
	}
	if closeWhenDone {
		for i := range outputs {
			outputs[i].Close()
		}
	}
}

func distribute(input SimpleOutChannel, outputs []SimpleInChannel, closeWhenDone bool) {
	cases := make([]reflect.SelectCase, len(outputs))
	for i := range cases {
		cases[i].Dir = reflect.SelectSend
		cases[i].Chan = reflect.ValueOf(outputs[i].In())
	}
	for elem := range input.Out() {
		for i := range cases {
			cases[i].Send = reflect.ValueOf(elem)
		}
		reflect.Select(cases)
	}
	if closeWhenDone {
		for i := range outputs {
			outputs[i].Close()
		}
	}
}

// Pipe connects the input channel to the output channel so that
// they behave as if a single channel.
func Pipe(input SimpleOutChannel, output SimpleInChannel) {
	go pipe(input, output, true)
}

// Multiplex takes an arbitrary number of input channels and multiplexes their output into a single output
// channel. When all input channels have been closed, the output channel is closed. Multiplex with a single
// input channel is equivalent to Pipe (though slightly less efficient).
func Multiplex(output SimpleInChannel, inputs ...SimpleOutChannel) {
	if len(inputs) == 0 {
		panic("channels: Multiplex requires at least one input")
	}
	go multiplex(output, inputs, true)
}

// Tee (like its Unix namesake) takes a single input channel and an arbitrary number of output channels
// and duplicates each input into every output. When the input channel is closed, all outputs channels are closed.
// Tee with a single output channel is equivalent to Pipe (though slightly less efficient).
func Tee(input SimpleOutChannel, outputs ...SimpleInChannel) {
	if len(outputs) == 0 {
		panic("channels: Tee requires at least one output")
	}
	go tee(input, outputs, true)
}

// Distribute takes a single input channel and an arbitrary number of output channels and duplicates each input
// into *one* available output. If multiple outputs are waiting for a value, one is chosen at random. When the
// input channel is closed, all outputs channels are closed. Distribute with a single output channel is
// equivalent to Pipe (though slightly less efficient).
func Distribute(input SimpleOutChannel, outputs ...SimpleInChannel) {
	if len(outputs) == 0 {
		panic("channels: Distribute requires at least one output")
	}
	go distribute(input, outputs, true)
}

// WeakPipe behaves like Pipe (connecting the two channels) except that it does not close
// the output channel when the input channel is closed.
func WeakPipe(input SimpleOutChannel, output SimpleInChannel) {
	go pipe(input, output, false)
}

// WeakMultiplex behaves like Multiplex (multiplexing multiple inputs into a single output) except that it does not close
// the output channel when the input channels are closed.
func WeakMultiplex(output SimpleInChannel, inputs ...SimpleOutChannel) {
	if len(inputs) == 0 {
		panic("channels: WeakMultiplex requires at least one input")
	}
	go multiplex(output, inputs, false)
}

// WeakTee behaves like Tee (duplicating a single input into multiple outputs) except that it does not close
// the output channels when the input channel is closed.
func WeakTee(input SimpleOutChannel, outputs ...SimpleInChannel) {
	if len(outputs) == 0 {
		panic("channels: WeakTee requires at least one output")
	}
	go tee(input, outputs, false)
}

// WeakDistribute behaves like Distribute (distributing a single input amongst multiple outputs) except that
// it does not close the output channels when the input channel is closed.
func WeakDistribute(input SimpleOutChannel, outputs ...SimpleInChannel) {
	if len(outputs) == 0 {
		panic("channels: WeakDistribute requires at least one output")
	}
	go distribute(input, outputs, false)
}

// Wrap takes any readable channel type (chan or <-chan but not chan<-) and
// exposes it as a SimpleOutChannel for easy integration with existing channel sources.
// It panics if the input is not a readable channel.
func Wrap(ch interface{}) SimpleOutChannel {
	t := reflect.TypeOf(ch)
	if t.Kind() != reflect.Chan || t.ChanDir()&reflect.RecvDir == 0 {
		panic("channels: input to Wrap must be readable channel")
	}
	realChan := make(chan interface{})

	go func() {
		v := reflect.ValueOf(ch)
		for {
			x, ok := v.Recv()
			if !ok {
				close(realChan)
				return
			}
			realChan <- x.Interface()
		}
	}()

	return NativeOutChannel(realChan)
}

// Unwrap takes a SimpleOutChannel and uses reflection to pipe it to a typed native channel for
// easy integration with existing channel sources. Output can be any writable channel type (chan or chan<-).
// It panics if the output is not a writable channel, or if a value is received that cannot be sent on the
// output channel.
func Unwrap(input SimpleOutChannel, output interface{}) {
	t := reflect.TypeOf(output)
	if t.Kind() != reflect.Chan || t.ChanDir()&reflect.SendDir == 0 {
		panic("channels: input to Unwrap must be readable channel")
	}

	go func() {
		v := reflect.ValueOf(output)
		for {
			x, ok := <-input.Out()
			if !ok {
				v.Close()
				return
			}
			v.Send(reflect.ValueOf(x))
		}
	}()
}