1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
package recordio
import (
"bufio"
"bytes"
"io"
logger "github.com/mesos/mesos-go/api/v1/lib/debug"
"github.com/mesos/mesos-go/api/v1/lib/encoding/framing"
)
// debug is the package-level debug logger, declared as the constant logger.Logger(false).
// NOTE(review): the false value presumably turns debug output off; confirm against the debug package.
const debug = logger.Logger(false)
// Opt is a functional option; NewReader invokes each non-nil Opt with the
// reader under construction.
type Opt func(*reader)

// reader extracts frames from a stream by driving an embedded bufio.Scanner
// whose split function alternates between parsing a size header and slicing
// out the frame payload.
type reader struct {
	// Scanner performs the buffered reads and invokes the active split function.
	*bufio.Scanner

	// pend is the number of payload bytes still owed for the frame whose size
	// header was parsed most recently; it is reset to zero once the payload
	// has been emitted.
	pend int

	// splitf is the split function currently in effect (splitSize or splitFrame).
	splitf bufio.SplitFunc

	// maxf is the largest accepted frame size in bytes; zero means no explicit limit.
	maxf int
}
// NewReader wraps read in a framing.Reader that decodes a recordio stream,
// i.e. a sequence of records each made of a decimal size, a newline, and that
// many payload bytes. The scanner starts with a 16 KiB buffer and a 4 MiB
// maximum token size; any non-nil options are applied afterwards, in order,
// and may override those defaults.
func NewReader(read io.Reader, opt ...Opt) framing.Reader {
	debug.Log("new frame reader")

	const (
		initialBufSize = 16 * 1024
		maxTokenSize   = 1 << 22 // 1<<22 == max protobuf size
	)

	scanner := bufio.NewScanner(read)
	scanner.Buffer(make([]byte, initialBufSize), maxTokenSize)

	r := &reader{Scanner: scanner}
	r.splitf = r.splitSize

	// The split function has to change between header and payload parsing,
	// but Scanner panics if Split is invoked once scanning has started.
	// Install a single proxy up front that defers to whatever r.splitf
	// currently holds.
	scanner.Split(func(data []byte, atEOF bool) (int, []byte, error) {
		return r.splitf(data, atEOF)
	})

	// apply options
	for _, apply := range opt {
		if apply == nil {
			continue
		}
		apply(r)
	}
	return r
}
// MaxMessageSize returns a functional option that configures the internal Scanner's buffer and max token (message)
// length, in bytes. The Scanner is given an initial buffer of max/2 bytes and frames whose declared size exceeds max
// are rejected while parsing the size header.
//
// A non-positive max is ignored and leaves the reader's current limits in place: a negative value cannot be used to
// size a buffer, and zero would leave the Scanner unable to hold any token at all while the frame-size check treats
// zero as "no limit".
//
// Like any option, it must be applied before scanning begins (NewReader does this); bufio.Scanner.Buffer panics if
// called after the first Scan.
func MaxMessageSize(max int) Opt {
	return func(r *reader) {
		if max <= 0 {
			return
		}
		r.Buffer(make([]byte, max>>1), max)
		r.maxf = max
	}
}
// splitSize is the split function in effect while the reader is positioned at
// a frame-size header. A header is up to 20 bytes of decimal digits (optionally
// surrounded by whitespace) terminated by '\n'.
//
// Behaviour:
//   - Zero-length frames are consumed in place and the next header is parsed.
//   - For a non-zero size the header is consumed, the size is stored in r.pend,
//     the active split function is switched to splitFrame, and an empty (non-nil)
//     token is returned so that the Scanner yields control back to ReadFrame.
//   - If the header is incomplete, any bytes already consumed by zero-length
//     frames are still reported as advanced so they are not rescanned (and do
//     not accumulate in the Scanner's buffer).
//
// Errors:
//   - io.EOF when called at EOF with no data left.
//   - framing.ErrorUnderrun when the stream ends inside a size header.
//   - framing.ErrorBadSize when the header is too long or is not a valid number.
//   - framing.ErrorOversizedFrame when the size exceeds r.maxf (if non-zero) or
//     cannot be represented as an int.
func (r *reader) splitSize(data []byte, atEOF bool) (int, []byte, error) {
	const (
		maxTokenLength = 20 // textual length of largest uint64 number
		maxInt         = int(^uint(0) >> 1)
	)
	if atEOF {
		x := len(data)
		switch {
		case x == 0:
			debug.Log("EOF and empty frame, returning io.EOF")
			return 0, nil, io.EOF
		case x < 2: // min frame size
			debug.Log("remaining data less than min total frame length")
			return 0, nil, framing.ErrorUnderrun
		}
		// otherwise, we may have a valid frame...
	}
	debug.Log("len(data)=", len(data))
	adv := 0 // bytes consumed so far by zero-length frames
	for {
		i := 0
		for ; i < maxTokenLength && i < len(data) && data[i] != '\n'; i++ {
		}
		debug.Log("i=", i)
		if i == len(data) {
			if atEOF && i > 0 {
				// No more input will arrive, so a header without its
				// terminating newline is a truncated stream, not a clean EOF.
				debug.Log("EOF reached inside frame size header")
				return 0, nil, framing.ErrorUnderrun
			}
			debug.Log("need more input")
			// Keep the progress made past any zero-length frames.
			return adv, nil, nil // need more input
		}
		if i == maxTokenLength && data[i] != '\n' {
			debug.Log("frame size: max token length exceeded")
			return 0, nil, framing.ErrorBadSize
		}
		n, err := ParseUintBytes(bytes.TrimSpace(data[:i]), 10, 64)
		if err != nil {
			debug.Log("failed to parse frame size field:", err)
			return 0, nil, framing.ErrorBadSize
		}
		// Reject sizes that do not fit in an int before converting: a wrapped
		// (negative) value would slip past the max-size check and later be
		// used as a slice bound.
		if n > uint64(maxInt) {
			debug.Log("frame size max length exceeded:", n)
			return 0, nil, framing.ErrorOversizedFrame
		}
		size := int(n)
		if r.maxf != 0 && size > r.maxf {
			debug.Log("frame size max length exceeded:", n)
			return 0, nil, framing.ErrorOversizedFrame
		}
		if size == 0 {
			// special case... don't invoke splitData, just parse the next size header
			adv += i + 1
			data = data[i+1:]
			continue
		}
		r.pend = size
		r.splitf = r.splitFrame
		debug.Logf("split next frame: %d, %d", n, adv+i+1)
		return adv + i + 1, data[:0], nil // returning a nil token screws up the Scanner, so return empty
	}
}
// splitFrame is the split function in effect while a frame payload of r.pend
// bytes is outstanding. It waits until the whole payload is buffered, then
// emits it as a single token, clears r.pend and hands control back to
// splitSize for the next header.
//
// It returns framing.ErrorUnderrun if the input ends before the payload is
// complete, and panics if invoked while no payload is pending.
func (r *reader) splitFrame(data []byte, atEOF bool) (advance int, token []byte, err error) {
	have := len(data)
	debug.Log("splitFrame:x=", have, ",eof=", atEOF)

	want := r.pend
	switch {
	case atEOF && have < want:
		// the stream ended in the middle of a frame
		return 0, nil, framing.ErrorUnderrun
	case want == 0:
		panic("asked to read frame data, but no data left in frame")
	case have < want:
		// need more data
		return 0, nil, nil
	}

	// The full payload is available: emit it and go back to header parsing.
	r.pend = 0
	r.splitf = r.splitSize
	return want, data[:want], nil
}
// ReadFrame implements framing.Reader. It scans until a non-empty token (a
// frame payload) is produced and returns it; the empty tokens emitted after
// each size header are skipped. If scanning stops without a payload, the
// Scanner's error is returned, or io.EOF when the Scanner reports none.
//
// The returned slice is whatever the Scanner's Bytes method yields, so it
// should be treated as valid only until the next ReadFrame call.
func (r *reader) ReadFrame() (tok []byte, err error) {
	for len(tok) == 0 && r.Scan() {
		if frame := r.Bytes(); len(frame) > 0 {
			tok = frame
			debug.Log("len(tok)", len(tok))
		}
	}

	// Either the scan failed, or it succeeded and a token is in hand.
	if err = r.Err(); err == nil && len(tok) == 0 {
		err = io.EOF
	}
	return tok, err
}
|