File: resizable_channel.go

package info (click to toggle)
golang-gopkg-eapache-channels.v1 1.1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bookworm-backports, experimental, forky, sid, trixie
  • size: 164 kB
  • sloc: makefile: 2
file content (109 lines) | stat: -rw-r--r-- 2,483 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package channels

import "github.com/eapache/queue"

// ResizableChannel implements the Channel interface with a resizable buffer between the input and the output.
// The channel initially has a buffer size of 1, but can be resized by calling Resize().
//
// Resizing to a buffer capacity of None is, unfortunately, not supported and will panic
// (see https://github.com/eapache/channels/issues/1).
// Resizing back and forth between a finite and infinite buffer is fully supported.
type ResizableChannel struct {
	input, output    chan interface{}
	length           chan int
	capacity, resize chan BufferCap
	size             BufferCap
	buffer           *queue.Queue
}

func NewResizableChannel() *ResizableChannel {
	ch := &ResizableChannel{
		input:    make(chan interface{}),
		output:   make(chan interface{}),
		length:   make(chan int),
		capacity: make(chan BufferCap),
		resize:   make(chan BufferCap),
		size:     1,
		buffer:   queue.New(),
	}
	go ch.magicBuffer()
	return ch
}

func (ch *ResizableChannel) In() chan<- interface{} {
	return ch.input
}

func (ch *ResizableChannel) Out() <-chan interface{} {
	return ch.output
}

func (ch *ResizableChannel) Len() int {
	return <-ch.length
}

func (ch *ResizableChannel) Cap() BufferCap {
	val, open := <-ch.capacity
	if open {
		return val
	} else {
		return ch.size
	}
}

func (ch *ResizableChannel) Close() {
	close(ch.input)
}

func (ch *ResizableChannel) Resize(newSize BufferCap) {
	if newSize == None {
		panic("channels: ResizableChannel does not support unbuffered behaviour")
	}
	if newSize < 0 && newSize != Infinity {
		panic("channels: invalid negative size trying to resize channel")
	}
	ch.resize <- newSize
}

func (ch *ResizableChannel) magicBuffer() {
	var input, output, nextInput chan interface{}
	var next interface{}
	nextInput = ch.input
	input = nextInput

	for input != nil || output != nil {
		select {
		case elem, open := <-input:
			if open {
				ch.buffer.Add(elem)
			} else {
				input = nil
				nextInput = nil
			}
		case output <- next:
			ch.buffer.Remove()
		case ch.size = <-ch.resize:
		case ch.length <- ch.buffer.Length():
		case ch.capacity <- ch.size:
		}

		if ch.buffer.Length() == 0 {
			output = nil
			next = nil
		} else {
			output = ch.output
			next = ch.buffer.Peek()
		}

		if ch.size != Infinity && ch.buffer.Length() >= int(ch.size) {
			input = nil
		} else {
			input = nextInput
		}
	}

	close(ch.output)
	close(ch.resize)
	close(ch.length)
	close(ch.capacity)
}