File: memory_pipe.go

package info (click to toggle)
incus 6.0.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 24,392 kB
  • sloc: sh: 16,313; ansic: 3,121; python: 457; makefile: 337; ruby: 51; sql: 50; lisp: 6
file content (120 lines) | stat: -rw-r--r-- 3,319 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package memorypipe

import (
	"context"
	"io"
)

const bufferSize = 10

// msg represents an internal structure sent between the pipes.
type msg struct {
	data []byte
	err  error
}

// pipe provides a bidirectional pipe compatible with io.ReadWriteCloser interface.
// Note, however, that it does not behave exactly how one would expect an io.ReadWriteCloser to
// behave. Specifically the Close() function does not close the pipe, but instead delivers an io.EOF
// error to the next reader. After which it can be read again to receive new data. This means the
// pipe can be closed multiple times. Each time it indicates that one particular session has ended.
// The reason for this is to emulate the WebsocketIO's behaviour by allowing a single persistent
// connection to be used for multiple sessions.
type pipe struct {
	ch           chan msg
	ctx          context.Context
	otherEnd     *pipe
	msgRemainder []byte
}

// Read reads from the pipe into p. Returns number of bytes read and any errors.
func (p *pipe) Read(b []byte) (int, error) {
	if p.msgRemainder != nil {
		n := copy(b, p.msgRemainder)
		if len(p.msgRemainder) > n {
			tmpBuf := make([]byte, len(p.msgRemainder)-n)
			copy(tmpBuf, p.msgRemainder[n:])
			p.msgRemainder = tmpBuf
		} else {
			p.msgRemainder = nil
		}

		return n, nil
	}

	select {
	case msg := <-p.ch:
		if msg.err == io.EOF {
			return 0, msg.err
		}

		n := copy(b, msg.data)

		// Store the remainder of the message for next Read.
		if len(msg.data) > n {
			p.msgRemainder = make([]byte, len(msg.data)-n)
			copy(p.msgRemainder, msg.data[n:])
		} else {
			p.msgRemainder = nil
		}

		return n, msg.err
	case <-p.ctx.Done():
		return 0, p.ctx.Err()
	}
}

// Write writes to the pipe from p. Returns number of bytes written and any errors.
func (p *pipe) Write(b []byte) (int, error) {
	msg := msg{
		data: make([]byte, len(b)),
		err:  nil,
	}

	// Create copy of b in case it is modified externally.
	copy(msg.data, b)

	select {
	case p.otherEnd.ch <- msg: // Sent msg to the other side's Read function.
		return len(msg.data), msg.err
	case <-p.ctx.Done():
		return 0, p.ctx.Err()
	}
}

// Close is unusual in that it doesn't actually close the pipe. Instead it sends an io.EOF error
// to the other side's Read function. This is so the other side can detect that a session has ended.
// Each call to Close will indicate to the other side that a session has ended, whilst allowing the
// reuse of a single persistent pipe for multiple sessions.
func (p *pipe) Close() error {
	msg := msg{
		data: nil,
		err:  io.EOF, // Indicates to the other side's Read function that session has ended.
	}

	select {
	case p.otherEnd.ch <- msg: // Sent msg to the other side's Read function.
		return nil
	case <-p.ctx.Done():
		return p.ctx.Err()
	}
}

// NewPipePair returns a pair of io.ReadWriterCloser pipes that are connected together such that
// writes to one will appear as reads on the other and vice versa. Calling Close() on one end will
// indicate to the other end that the session has ended.
func NewPipePair(ctx context.Context) (io.ReadWriteCloser, io.ReadWriteCloser) {
	aEnd := &pipe{
		ch:  make(chan msg, bufferSize),
		ctx: ctx,
	}

	bEnd := &pipe{
		ch:  make(chan msg, bufferSize),
		ctx: ctx,
	}

	aEnd.otherEnd = bEnd
	bEnd.otherEnd = aEnd
	return aEnd, bEnd
}