package memguard
import (
"container/list"
"io"
"os"
"sync"
"github.com/awnumar/memguard/core"
)
var (
	// StreamChunkSize is the maximum amount of data that is locked into memory at a time.
	// If you get an error when allocating memory, increase your system's mlock limits.
	// Use 'ulimit -l' to check the current mlock limit on Unix systems.
	StreamChunkSize = c

	// c is the internal chunk size that written data is split into.
	c = os.Getpagesize() * 4
)
// queue is a list of sealed data chunks, consumed from the front.
type queue struct {
	*list.List
}
// join adds a chunk to the back of the queue.
func (q *queue) join(e *Enclave) {
	q.PushBack(e)
}

// push adds a chunk to the front of the queue.
func (q *queue) push(e *Enclave) {
	q.PushFront(e)
}

// pop removes and returns the chunk at the front of the queue.
// It returns nil if the queue is empty.
func (q *queue) pop() *Enclave {
	e := q.Front() // get element at front of queue
	if e == nil {
		return nil // no data
	}
	q.Remove(e)               // success => remove value
	return e.Value.(*Enclave) // unwrap and return (panics if the element is not an *Enclave)
}
/*
Stream is an in-memory encrypted container implementing the io.Reader and io.Writer interfaces.
It is most useful when you need to store a large amount of data in memory and are able to work on it in chunks.
*/
type Stream struct {
sync.Mutex
*queue
}
// NewStream initialises a new empty Stream object.
func NewStream() *Stream {
return &Stream{queue: &queue{List: list.New()}}
}
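
// The sketch below is illustrative only (the function name is not part of the
// package API): since Stream satisfies io.Reader and io.Writer, it can be
// plugged into anything that works with those interfaces, such as io.Copy.
func exampleStreamAsReadWriter() error {
	s := NewStream()

	// Encrypt some data into the stream.
	if _, err := s.Write([]byte("some sensitive data")); err != nil {
		return err
	}

	// Drain the stream into any io.Writer; io.Discard stands in for a real sink.
	if _, err := io.Copy(io.Discard, s); err != nil {
		return err
	}

	return nil
}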
/*
Write encrypts and writes some given data to a Stream object.
The data is broken down into chunks of at most StreamChunkSize bytes and added to the stream in order:
data is read back in the same order in which it was written.
*/
func (s *Stream) Write(data []byte) (int, error) {
	s.Lock()
	defer s.Unlock()

	// Split the data into chunks of at most c bytes and seal each one.
	for i := 0; i < len(data); i += c {
		if i+c > len(data) {
			s.join(NewEnclave(data[i:])) // final, partial chunk
		} else {
			s.join(NewEnclave(data[i : i+c]))
		}
	}

	return len(data), nil
}
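
// A rough sketch of the chunking behaviour described above (illustrative only,
// not part of the package API): writing more than StreamChunkSize bytes stores
// the data as several sealed chunks, while Size still reports the total.
func exampleStreamWriteChunking() int {
	s := NewStream()

	// Three full chunks plus a partial trailing chunk.
	data := make([]byte, 3*StreamChunkSize+42)
	s.Write(data) // Write never returns a non-nil error

	return s.Size() // 3*StreamChunkSize + 42
}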
/*
Read decrypts and places some data from a Stream object into a provided buffer.
If there is no data, the call will return an io.EOF error. If the caller provides a buffer
that is too small to hold the next chunk of data, the remaining bytes are re-encrypted and
added to the front of the queue to be returned in the next call.
To avoid this overhead, provide a buffer of at least StreamChunkSize bytes.
*/
func (s *Stream) Read(buf []byte) (int, error) {
s.Lock()
defer s.Unlock()
// Grab the next chunk of data from the stream.
b, err := s.next()
if err != nil {
return 0, err
}
defer b.Destroy()
// Copy the contents into the given buffer.
core.Copy(buf, b.Bytes())
	// Check if there is data left over.
	if len(buf) < b.Size() {
		// Re-encrypt the remainder and push it onto the front of the queue.
		rest := NewBuffer(b.Size() - len(buf))
		rest.Copy(b.Bytes()[len(buf):])
		s.push(rest.Seal())
		return len(buf), nil
	}
	// The chunk fit inside the provided buffer, so the entire chunk was copied.
	return b.Size(), nil
}
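
// A sketch of the partial-read behaviour (illustrative only): when the
// destination buffer is smaller than the next chunk, the unread remainder is
// re-encrypted and handed back by the following call to Read.
func exampleStreamPartialRead() error {
	s := NewStream()
	if _, err := s.Write([]byte("hello world")); err != nil {
		return err
	}

	small := make([]byte, 5)
	if _, err := s.Read(small); err != nil {
		return err
	}
	// small now holds "hello"; " world" has been re-sealed at the front of the queue.
	if _, err := s.Read(small); err != nil {
		return err
	}
	// small now holds " worl"; "d" remains queued for the next call.
	return nil
}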
// Size returns the number of bytes of data currently stored within a Stream object.
func (s *Stream) Size() int {
s.Lock()
defer s.Unlock()
var n int
for e := s.Front(); e != nil; e = e.Next() {
n += e.Value.(*Enclave).Size()
}
return n
}
// Next grabs the next chunk of data from the Stream and returns it decrypted inside a LockedBuffer.
// If the Stream is empty, an io.EOF error is returned; any other error from the stream is forwarded.
func (s *Stream) Next() (*LockedBuffer, error) {
s.Lock()
defer s.Unlock()
return s.next()
}
// next pops and decrypts the next chunk without acquiring the mutex; the caller must hold the lock.
func (s *Stream) next() (*LockedBuffer, error) {
// Pop data from the front of the list.
e := s.pop()
if e == nil {
return newNullBuffer(), io.EOF
}
// Decrypt the data into a guarded allocation.
b, err := e.Open()
if err != nil {
return newNullBuffer(), err
}
return b, nil
}
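
// A sketch of chunk-at-a-time consumption via Next (illustrative only): each
// call hands back one decrypted chunk in a LockedBuffer which the caller is
// responsible for destroying once it is no longer needed.
func exampleStreamNext(s *Stream, process func([]byte)) error {
	for {
		b, err := s.Next()
		if err == io.EOF {
			return nil // stream exhausted
		}
		if err != nil {
			return err
		}
		process(b.Bytes()) // work on the decrypted chunk
		b.Destroy()        // wipe it as soon as we are done
	}
}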
// Flush reads all of the remaining data from a Stream and returns it inside a single LockedBuffer.
// If an error is encountered before all of the data could be read, it is returned along with any data read up until that point.
func (s *Stream) Flush() (*LockedBuffer, error) {
return NewBufferFromEntireReader(s)
}
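
// A sketch of draining a stream in one call (illustrative only): Flush gathers
// every remaining chunk into a single LockedBuffer that the caller must destroy.
func exampleStreamFlush(s *Stream) error {
	b, err := s.Flush()
	if err != nil {
		return err
	}
	defer b.Destroy()

	// ... work with b.Bytes() ...

	return nil
}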