// Package liner handles reading lines from clients that may or may not require continuation.
package liner

import (
	"bufio"
	"io"
	"regexp"
	"strconv"
	"sync"

	"github.com/google/uuid"
)

// rxLiteral matches a line that ends in a literal length indicator.
var rxLiteral = regexp.MustCompile(`\{(\d+)\}\r\n$`)
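
// Liner reads lines from an underlying reader, following literal continuations.
type Liner struct {
	br *bufio.Reader
	mu sync.RWMutex
}

// Line is a complete line together with the literals that were read as part of it.
type Line struct {
	// Line is the full line, with each literal replaced by a UUID placeholder.
	Line []byte

	// Literals maps each UUID placeholder in Line to the literal data it stands for.
	Literals map[string][]byte
}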

// New returns a Liner that reads from r.
func New(r io.Reader) *Liner {
	return &Liner{br: bufio.NewReader(r)}
}

// Reset resets the liner to read from a new reader.
func (l *Liner) Reset(r io.Reader) {
	l.mu.Lock()
	defer l.mu.Unlock()

	l.br.Reset(r)
}

// Lines returns a channel that will receive lines as they are read.
// The channel is closed as soon as a read fails (including on io.EOF);
// the error itself is discarded.
func (l *Liner) Lines(doContinuation func() error) <-chan Line {
	ch := make(chan Line)

	go func() {
		defer close(ch)

		for {
			line, lits, err := l.Read(doContinuation)
			if err != nil {
				return
			}

			ch <- Line{Line: line, Literals: lits}
		}
	}()

	return ch
}

// Read reads a full line, automatically reading again if the line was not complete.
// Each time an additional read is performed, doContinuation is called.
// If the callback returns an error, the operation is aborted.
//
// Each literal is stored in the returned map under a freshly generated UUID,
// and that UUID is spliced into the returned line in place of the literal data.
//
// Read is not safe for concurrent use by multiple goroutines: the read lock
// only guards against a concurrent Reset.
func (l *Liner) Read(doContinuation func() error) ([]byte, map[string][]byte, error) {
	l.mu.RLock()
	defer l.mu.RUnlock()

	line, err := l.br.ReadBytes('\n')
	if err != nil {
		return nil, nil, err
	}

	lits := make(map[string][]byte)

	for {
		length := shouldReadLiteral(line)
		if length == 0 {
			break
		}

		if err := doContinuation(); err != nil {
			return nil, nil, err
		}

		// Use a name other than "uuid" so the variable does not shadow the package.
		id := uuid.New().String()

		lits[id] = make([]byte, length)

		if _, err := io.ReadFull(l.br, lits[id]); err != nil {
			return nil, nil, err
		}

		line = append(line, id...)

		// The remainder of the line may itself end in another literal indicator.
		rest, err := l.br.ReadBytes('\n')
		if err != nil {
			return nil, nil, err
		}

		line = append(line, rest...)
	}

	return line, lits, nil
}
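
// shouldReadLiteral returns the length of the literal announced at the end of
// line, or 0 if the line does not end in a literal length indicator.
// A zero-length literal ({0}) is therefore indistinguishable from no literal.
func shouldReadLiteral(line []byte) int {
	match := rxLiteral.FindSubmatch(line)
	if match != nil {
		// The pattern only admits digits, so Atoi can fail only on overflow.
		length, err := strconv.Atoi(string(match[1]))
		if err != nil {
			panic("liner: literal length out of range: " + string(match[1]))
		}

		return length
	}

	return 0
}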