1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412
|
// package multibuf implements buffer optimized for streaming large chunks of data,
// multiple reads and optional partial buffering to disk.
package multibuf
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
)
// MultiReader provides Read, Close, Seek and Size methods. In addition it
// implements io.WriterTo to enable efficient writing schemes, as functions
// like io.Copy use WriterTo when it is available.
type MultiReader interface {
	io.Reader
	io.Seeker
	io.Closer
	io.WriterTo
	// Size calculates and returns the total size of the reader,
	// not the length remaining.
	Size() (int64, error)
}
// WriterOnce implements a write-once, read-many-times writer. Create a
// WriterOnce and write to it; once the Reader() function has been called,
// the internal data is transferred to a MultiReader and this instance of
// WriterOnce should no longer be used.
type WriterOnce interface {
	// Write implements io.Writer
	Write(p []byte) (int, error)
	// Reader transfers all data written to this writer to a MultiReader.
	// If there was no data written it returns an error.
	Reader() (MultiReader, error)
	// WriterOnce owns the data before Reader has been called, so Close will
	// close all the underlying files if Reader has not been called.
	Close() error
}
// MaxBytes caps the total size of the buffer; if a request exceeds the
// specified limit, the reader returns an error. By default the buffer is
// unlimited; non-positive values mean no limit.
func MaxBytes(m int64) optionSetter {
	return func(opts *options) error {
		opts.maxBytes = m
		return nil
	}
}
// MemBytes specifies the largest buffer to hold in RAM before writing to
// disk; the default is 1MB. Negative values are rejected.
func MemBytes(m int64) optionSetter {
	return func(opts *options) error {
		if m < 0 {
			return fmt.Errorf("MemBytes should be >= 0")
		}
		opts.memBytes = m
		return nil
	}
}
// NewWriterOnce returns a write-once, read-many-times writer that can limit
// the size of the buffer and persist large buffers to disk.
//
// Write to the returned WriterOnce; once its Reader() function has been
// called, the internal data is transferred to a MultiReader and the
// WriterOnce should no longer be used. By default the buffer is unbound:
// it holds up to 1MB in RAM and then starts buffering to disk.
// It supports functional optional arguments:
//
//	// Buffer up to 1MB in RAM and limit max buffer size to 20MB
//	multibuf.NewWriterOnce(multibuf.MemBytes(1024 * 1024), multibuf.MaxBytes(1024 * 1024 * 20))
func NewWriterOnce(setters ...optionSetter) (WriterOnce, error) {
	o := options{
		memBytes: DefaultMemBytes,
		maxBytes: DefaultMaxBytes,
	}
	for _, s := range setters {
		if err := s(&o); err != nil {
			return nil, err
		}
	}
	// Fall back to the default when a setter zeroed memBytes. This mirrors
	// New; the original performed this check before applying the setters,
	// where it could never fire.
	if o.memBytes == 0 {
		o.memBytes = DefaultMemBytes
	}
	return &writerOnce{o: o}, nil
}
// New returns a MultiReader that can limit the size of the buffer and
// persist large buffers to disk. By default New returns an unbound buffer
// that reads up to 1MB into RAM and then starts buffering to disk.
// It supports functional optional arguments:
//
//	// Buffer up to 1MB in RAM and limit max buffer size to 20MB
//	multibuf.New(r, multibuf.MemBytes(1024 * 1024), multibuf.MaxBytes(1024 * 1024 * 20))
func New(input io.Reader, setters ...optionSetter) (MultiReader, error) {
	o := options{
		memBytes: DefaultMemBytes,
		maxBytes: DefaultMaxBytes,
	}
	for _, s := range setters {
		if err := s(&o); err != nil {
			return nil, err
		}
	}
	if o.memBytes == 0 {
		o.memBytes = DefaultMemBytes
	}
	// Never buffer more in RAM than the overall limit allows.
	if o.maxBytes > 0 && o.maxBytes < o.memBytes {
		o.memBytes = o.maxBytes
	}
	memReader := &io.LimitedReader{
		R: input,      // Read from this reader
		N: o.memBytes, // Maximum amount of data to read into memory
	}
	readers := make([]io.ReadSeeker, 0, 2)
	buffer, err := ioutil.ReadAll(memReader)
	if err != nil {
		return nil, err
	}
	readers = append(readers, bytes.NewReader(buffer))
	var file *os.File
	totalBytes := int64(len(buffer))
	// memReader.N <= 0 means the memory capacity was exhausted and the
	// remainder of the body must be buffered to disk.
	if memReader.N <= 0 {
		file, err = ioutil.TempFile("", tempFilePrefix)
		if err != nil {
			return nil, err
		}
		// Unlink immediately so the OS reclaims the file once it is closed,
		// even if the process dies.
		os.Remove(file.Name())
		readSrc := input
		if o.maxBytes > 0 {
			readSrc = &maxReader{R: input, Max: o.maxBytes - o.memBytes}
		}
		writtenBytes, err := io.Copy(file, readSrc)
		if err != nil {
			// Close the handle on failure; the original leaked it here.
			file.Close()
			return nil, err
		}
		totalBytes += writtenBytes
		// Rewind so reads start from the beginning; surface Seek failures
		// instead of silently ignoring them.
		if _, err := file.Seek(0, 0); err != nil {
			file.Close()
			return nil, err
		}
		readers = append(readers, file)
	}
	var cleanupFn cleanupFunc
	if file != nil {
		cleanupFn = func() error {
			file.Close()
			return nil
		}
	}
	return newBuf(totalBytes, cleanupFn, readers...), nil
}
// MaxSizeReachedError is returned when the maximum allowed buffer size is
// reached when reading.
type MaxSizeReachedError struct {
	MaxSize int64
}

// Error implements the error interface.
func (e *MaxSizeReachedError) Error() string {
	// The original passed the struct pointer `e` to %d, producing output
	// like "Maximum size &{20} was reached" instead of the limit value.
	return fmt.Sprintf("Maximum size %d was reached", e.MaxSize)
}
const (
	// DefaultMemBytes is the default in-memory buffer size (1MB) held
	// before spilling to disk.
	DefaultMemBytes = 1048576
	// DefaultMaxBytes (negative) means the total buffer size is unlimited.
	DefaultMaxBytes = -1
	// DefaultBufferBytes is the chunk size used by WriteTo.
	// Equivalent of bytes.MinRead used in ioutil.ReadAll
	DefaultBufferBytes = 512
)
// multiReaderSeek concatenates several io.ReadSeekers into a single reader.
//
// Constraints:
// - Implements io.Reader
// - Implements Seek(0, 0)
// - Designed for Write once, Read many times.
type multiReaderSeek struct {
	length  int64           // total size across all readers
	readers []io.ReadSeeker // underlying sources, consumed in order
	mr      io.Reader       // io.MultiReader over readers; rebuilt on Seek
	cleanup cleanupFunc     // optional resource release, invoked by Close
}

// cleanupFunc releases resources (e.g. a temp file) backing a buffer.
type cleanupFunc func() error
// newBuf assembles a multiReaderSeek of the given total length over the
// supplied readers; cleanup (may be nil) is invoked by Close.
func newBuf(length int64, cleanup cleanupFunc, readers ...io.ReadSeeker) *multiReaderSeek {
	converted := make([]io.Reader, len(readers))
	for i, r := range readers {
		// io.ReadSeeker embeds io.Reader, so plain assignment suffices;
		// the original used a needless type assertion.
		converted[i] = r
	}
	return &multiReaderSeek{
		length:  length,
		readers: readers,
		mr:      io.MultiReader(converted...),
		cleanup: cleanup,
	}
}
// Close runs the cleanup callback, if one was provided, releasing resources
// such as the backing temp file.
func (mr *multiReaderSeek) Close() error {
	if mr.cleanup == nil {
		return nil
	}
	return mr.cleanup()
}
// WriteTo implements io.WriterTo, streaming the remaining contents to w in
// DefaultBufferBytes-sized chunks. It returns the number of bytes written.
func (mr *multiReaderSeek) WriteTo(w io.Writer) (int64, error) {
	b := make([]byte, DefaultBufferBytes)
	var total int64
	for {
		n, err := mr.mr.Read(b)
		// The recommended way is to always handle non-zero reads before
		// inspecting the error.
		if n > 0 {
			nw, errw := w.Write(b[:n])
			total += int64(nw)
			if errw != nil {
				return total, errw
			}
			if nw != n {
				// A short write with a nil error violates the io.Writer
				// contract; the original returned a nil error here.
				return total, io.ErrShortWrite
			}
		}
		if err == io.EOF {
			return total, nil
		}
		if err != nil {
			return total, err
		}
	}
}
// Read implements io.Reader by delegating to the underlying multi-reader.
func (mr *multiReaderSeek) Read(buf []byte) (int, error) {
	return mr.mr.Read(buf)
}
// Size returns the total size of the buffered data, not the length remaining.
func (mr *multiReaderSeek) Size() (int64, error) {
	return mr.length, nil
}
// Seek implements io.Seeker, but only supports rewinding to the start
// (offset 0, whence 0). It rewinds every underlying reader and rebuilds the
// multi-reader over them.
func (mr *multiReaderSeek) Seek(offset int64, whence int) (int64, error) {
	// TODO: implement other whence values
	// TODO: implement real offsets
	if whence != 0 {
		return 0, fmt.Errorf("multiReaderSeek: unsupported whence")
	}
	if offset != 0 {
		return 0, fmt.Errorf("multiReaderSeek: unsupported offset")
	}
	ior := make([]io.Reader, len(mr.readers))
	for i, seeker := range mr.readers {
		// Surface rewind failures; the original discarded these errors.
		if _, err := seeker.Seek(0, 0); err != nil {
			return 0, err
		}
		ior[i] = seeker
	}
	mr.mr = io.MultiReader(ior...)
	return 0, nil
}
// options holds the buffering limits applied by New and NewWriterOnce.
type options struct {
	// memBytes sets up the size of the memory buffer for this request.
	// If the data size exceeds the limit, the remaining request part will
	// be saved on the file system.
	memBytes int64
	// maxBytes caps the total buffer size; values <= 0 mean unlimited.
	maxBytes int64
}

// optionSetter is a functional option accepted by New and NewWriterOnce.
type optionSetter func(o *options) error
// maxReader does not allow reading more than Max bytes and returns
// MaxSizeReachedError once the limit has been exceeded.
type maxReader struct {
	R   io.Reader // underlying reader
	N   int64     // bytes read
	Max int64     // max bytes to read
}

// Read delegates to the wrapped reader and enforces the Max limit.
func (r *maxReader) Read(p []byte) (int, error) {
	n, err := r.R.Read(p)
	if err == nil || err == io.EOF {
		r.N += int64(n)
		if r.N > r.Max {
			err = &MaxSizeReachedError{MaxSize: r.Max}
		}
	}
	return n, err
}
// States of the writerOnce state machine.
const (
	writerInit = iota // nothing written yet; buffers not allocated
	writerMem         // buffering in memory, under the memBytes budget
	writerFile        // memory budget exhausted; spilling to a temp file
	writerCalledRead  // Reader() called; the writer no longer owns the data
	writerErr         // NOTE(review): declared but unused in the visible code
)
// writerOnce implements WriterOnce: a write-once, read-many buffer that
// keeps up to o.memBytes in RAM and spills the remainder to a temp file.
type writerOnce struct {
	o         options       // buffering limits
	err       error         // NOTE(review): appears unused in the visible code
	state     int           // one of the writer* state constants
	mem       *bytes.Buffer // in-memory portion of the data
	file      *os.File      // disk spill-over; nil until initFile runs
	total     int64         // total bytes accepted so far
	cleanupFn cleanupFunc   // closes and removes the temp file
}
// writeToMem reports how many bytes of p still fit within the in-memory
// budget (w.o.memBytes minus what has already been written).
func (w *writerOnce) writeToMem(p []byte) int {
	remaining := w.o.memBytes - w.total
	switch {
	case remaining <= 0:
		return 0
	case int64(len(p)) < remaining:
		return len(p)
	default:
		return int(remaining)
	}
}
// Write implements io.Writer; the state-machine logic lives in write.
func (w *writerOnce) Write(p []byte) (int, error) {
	return w.write(p)
}
// Close releases resources still owned by the writer. If Reader has been
// called the MultiReader owns the file, so Close is a no-op; otherwise the
// temp file (if any) is closed AND removed via cleanupFn — the original
// only closed it, leaking the file on disk.
func (w *writerOnce) Close() error {
	if w.state == writerCalledRead {
		// Ownership transferred to the MultiReader; nothing to clean here.
		return nil
	}
	if w.cleanupFn != nil {
		fn := w.cleanupFn
		w.cleanupFn = nil
		w.file = nil
		return fn()
	}
	// Defensive fallback: no cleanup closure recorded.
	if w.file != nil {
		return w.file.Close()
	}
	return nil
}
// write appends p, buffering in memory while the memBytes budget lasts and
// spilling the remainder to a temp file. It returns the number of bytes
// actually accepted and any error encountered.
func (w *writerOnce) write(p []byte) (int, error) {
	if w.o.maxBytes > 0 && int64(len(p))+w.total > w.o.maxBytes {
		return 0, fmt.Errorf("total size of %d exceeded allowed %d", int64(len(p))+w.total, w.o.maxBytes)
	}
	switch w.state {
	case writerCalledRead:
		return 0, fmt.Errorf("can not write after reader has been called")
	case writerInit:
		w.mem = &bytes.Buffer{}
		w.state = writerMem
		fallthrough
	case writerMem:
		writeToMem := w.writeToMem(p)
		if writeToMem > 0 {
			wrote, err := w.mem.Write(p[:writeToMem])
			w.total += int64(wrote)
			if err != nil {
				return wrote, err
			}
		}
		left := len(p) - writeToMem
		if left <= 0 {
			return len(p), nil
		}
		// We can't write to memory any more; switch to file buffering.
		if err := w.initFile(); err != nil {
			return writeToMem, err
		}
		w.state = writerFile
		wrote, err := w.file.Write(p[writeToMem:])
		w.total += int64(wrote)
		// Report the bytes actually accepted (mem + file); the original
		// returned len(p) even when the file write fell short with an error.
		return writeToMem + wrote, err
	case writerFile:
		wrote, err := w.file.Write(p)
		w.total += int64(wrote)
		return wrote, err
	}
	return 0, fmt.Errorf("unsupported state: %d", w.state)
}
// initFile creates the temp file used once the in-memory budget is exhausted
// and records a cleanup closure that closes and removes it.
func (w *writerOnce) initFile() error {
	f, err := ioutil.TempFile("", tempFilePrefix)
	if err != nil {
		return err
	}
	w.file = f
	w.cleanupFn = func() error {
		f.Close()
		os.Remove(f.Name())
		return nil
	}
	return nil
}
// Reader transfers all data written so far to a MultiReader. After a
// successful call the returned reader owns the memory buffer and any temp
// file, and this writerOnce must no longer be written to.
func (w *writerOnce) Reader() (MultiReader, error) {
	switch w.state {
	case writerInit:
		return nil, fmt.Errorf("no data ready")
	case writerCalledRead:
		return nil, fmt.Errorf("reader has been called")
	case writerMem:
		w.state = writerCalledRead
		return newBuf(w.total, nil, bytes.NewReader(w.mem.Bytes())), nil
	case writerFile:
		// Rewind so the reader starts at the beginning of the spill file.
		if _, err := w.file.Seek(0, 0); err != nil {
			return nil, err
		}
		// We are not responsible for the file and buffer any more.
		w.state = writerCalledRead
		br, fr := bytes.NewReader(w.mem.Bytes()), w.file
		w.file = nil
		w.mem = nil
		return newBuf(w.total, w.cleanupFn, br, fr), nil
	}
	// Drop the stray trailing "\n" the original had in this error string
	// (staticcheck ST1005: error strings should not end in punctuation).
	return nil, fmt.Errorf("unsupported state: %d", w.state)
}
// tempFilePrefix names the temporary files used for disk spill-over.
const tempFilePrefix = "temp-multibuf-"
|