1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
|
// This file contains the line-reader interface, and its implementations, used
// by the file-format-specific record-readers, as well as related utility
// functions.
package input
import (
	"bufio"
	"container/list"
	"fmt"
	"io"
	"os"
	"strings"

	"github.com/johnkerl/miller/v6/pkg/lib"
)
// ILineReader yields one line (record) at a time from an input stream, with
// the line terminator (IRS) removed.
type ILineReader interface {
// Read returns the next line without its final newline (or whatever terminator).
// io.EOF is used as a non-error "error" indicating that input is exhausted.
// Contract: EOF is always returned with an empty line. Implementations must
// not return a non-empty line together with an EOF indicator; instead they
// return the final line with a nil error and report EOF on the next call.
Read() (string, error)
}
// DefaultLineReader reads '\n'-terminated lines, removing a trailing "\n" or
// "\r\n" from each. NewLineReader selects it when the IRS is "\n" or "\r\n".
type DefaultLineReader struct {
underlying *bufio.Reader
// eof records that a final, unterminated line has already been returned,
// so the next Read reports io.EOF without touching the underlying reader.
eof bool
}
// SingleIRSLineReader handles reading lines with a single-byte terminator,
// which is stripped from each returned line.
type SingleIRSLineReader struct {
underlying *bufio.Reader
// end_irs is the one-byte record separator.
end_irs byte
// eof records that a final, unterminated line has already been returned,
// so the next Read reports io.EOF without touching the underlying reader.
eof bool
}
// MultiIRSLineReader handles reading lines which are delimited by multi-byte
// separators, e.g. "\xe2\x90\x9e" for USV.
type MultiIRSLineReader struct {
underlying *bufio.Reader
// irs is the full record separator; irs_len is its length in bytes.
irs string
irs_len int
// end_irs is the final byte of irs. The underlying reader is split on this
// byte, and the accumulated text is then checked for the full separator.
end_irs byte
// eof records that a final, unterminated line has already been returned,
// so the next Read reports io.EOF without touching the underlying reader.
eof bool
}
// NewLineReader wraps handle in a buffered reader and returns the ILineReader
// implementation suited to the record separator irs:
//
//   - "\n" or "\r\n": DefaultLineReader, which strips either ending;
//   - any other one-byte separator: SingleIRSLineReader;
//   - anything longer: MultiIRSLineReader.
//
// An empty irs panics rather than returning an error, to keep the API simple;
// per the original note, empty IRS is expected to be rejected by callers.
func NewLineReader(handle io.Reader, irs string) ILineReader {
	buffered := bufio.NewReader(handle)

	switch n := len(irs); {
	case n < 1:
		panic("Empty IRS")
	case irs == "\n", irs == "\r\n":
		return &DefaultLineReader{underlying: buffered}
	case n == 1:
		return &SingleIRSLineReader{underlying: buffered, end_irs: irs[0]}
	default:
		return &MultiIRSLineReader{
			underlying: buffered,
			irs:        irs,
			irs_len:    n,
			end_irs:    irs[n-1],
		}
	}
}
// Read implements ILineReader for "\n"- and "\r\n"-terminated input.
//
// Each call consumes bytes through the next '\n' and returns them with a
// trailing "\r\n" or "\n" removed. If the input ends without a final newline,
// that last fragment is returned with a nil error and io.EOF is deferred to
// the following call. Any other read error is returned together with the
// (stripped) text that bufio read before the error.
func (r *DefaultLineReader) Read() (string, error) {
	if r.eof {
		return "", io.EOF
	}

	text, readErr := r.underlying.ReadString('\n')
	if text != "" && lib.IsEOF(readErr) {
		// Hand back the unterminated fragment now; report EOF next time.
		r.eof = true
		readErr = nil
	}

	switch {
	case strings.HasSuffix(text, "\r\n"):
		text = text[:len(text)-len("\r\n")]
	case strings.HasSuffix(text, "\n"):
		text = text[:len(text)-len("\n")]
	}
	return text, readErr
}
// Read implements ILineReader for a one-byte record separator.
//
// It returns the bytes up to and including the next end_irs, minus that
// separator. A final fragment lacking the separator is returned with a nil
// error, and io.EOF is reported on the next call.
func (r *SingleIRSLineReader) Read() (string, error) {
	if r.eof {
		return "", io.EOF
	}

	text, readErr := r.underlying.ReadString(r.end_irs)
	if text != "" && lib.IsEOF(readErr) {
		// Hand back the unterminated fragment now; report EOF next time.
		r.eof = true
		readErr = nil
	}

	if last := len(text) - 1; last >= 0 && text[last] == r.end_irs {
		text = text[:last]
	}
	return text, readErr
}
// Read implements ILineReader for a multi-byte record separator (IRS).
//
// bufio.Reader.ReadString supports only a single-byte terminator. So we read
// pieces ending in the IRS's final byte, appending each one, until the
// accumulated text ends with the entire IRS or we hit EOF.
//
// Note that bufio.Scanner has a very nice bufio.Scanner.Split method which can
// be overridden to support custom line-ending logic. Sadly, though,
// bufio.Scanner _only_ supports a fixed maximum line length, and misbehaves
// badly when presented with longer lines. So we cannot use bufio.Scanner. See
// also https://github.com/johnkerl/miller/issues/1501.
//
// Returns the record with the IRS stripped. A final record not terminated by
// the IRS is returned with a nil error, and io.EOF is reported on the next
// call, so EOF always comes with an empty string. On any other read error,
// the text accumulated so far is returned along with the error.
func (r *MultiIRSLineReader) Read() (string, error) {
	if r.eof {
		return "", io.EOF
	}

	var accumulated strings.Builder
	for {
		piece, err := r.underlying.ReadString(r.end_irs)
		// Every piece is part of the record, even one that ends in end_irs
		// without completing the IRS (end_irs can also occur on its own, e.g.
		// as a UTF-8 continuation byte), so always keep it.
		accumulated.WriteString(piece)

		if err != nil {
			if lib.IsEOF(err) && accumulated.Len() > 0 {
				// Return the unterminated final record now and defer EOF to
				// the next call, even if this last read itself was empty.
				r.eof = true
				return accumulated.String(), nil
			}
			return accumulated.String(), err // includes io.EOF with empty text
		}

		// Check the accumulated text rather than just this piece: the IRS can
		// span pieces when its earlier bytes equal end_irs (e.g. ";;").
		text := accumulated.String()
		if strings.HasSuffix(text, r.irs) {
			return text[:len(text)-r.irs_len], nil
		}
	}
}
// channelizedLineReader puts the line reading/splitting into its own goroutine in order to pipeline
// the I/O with regard to further processing. Used by record-readers for multiple file formats.
//
// Lines are written to the channel with their trailing newline (or whatever
// IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n".
// channelizedLineReader puts the line reading/splitting into its own goroutine in order to pipeline
// the I/O with regard to further processing. Used by record-readers for multiple file formats.
//
// Lines are written to the channel with their trailing newline (or whatever
// IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n".
//
// Lines are sent in batches of recordsPerBatch. Between batches, the reader
// polls downstreamDoneChannel without blocking; if a signal is present (e.g.
// from mlr head), it stops reading. In every case, the final, possibly
// partial or empty, batch is sent and linesChannel is closed as the
// end-of-stream marker.
//
// A non-EOF read error is fatal: it is reported on stderr and the process
// exits with status 1. Previously it was silently treated as end of input,
// which passed truncated data downstream as if it were complete.
func channelizedLineReader(
	lineReader ILineReader,
	linesChannel chan<- *list.List,
	downstreamDoneChannel <-chan bool, // for mlr head
	recordsPerBatch int64,
) {
	// Guard against an integer-divide-by-zero panic in the batch check below;
	// a non-positive batch size degenerates to one line per batch.
	if recordsPerBatch < 1 {
		recordsPerBatch = 1
	}

	i := int64(0)
	lines := list.New()

readLoop:
	for {
		line, err := lineReader.Read()
		if err != nil {
			if lib.IsEOF(err) {
				break
			}
			fmt.Fprintf(os.Stderr, "mlr: %v\n", err)
			os.Exit(1)
		}

		i++
		lines.PushBack(line)

		if i%recordsPerBatch == 0 {
			// See if downstream processors will be ignoring further data (e.g.
			// mlr head). If so, stop reading. This makes 'mlr head hugefile'
			// exit quickly, as it should.
			select {
			case <-downstreamDoneChannel:
				break readLoop
			default:
			}
			linesChannel <- lines
			lines = list.New()
		}
	}

	linesChannel <- lines
	close(linesChannel) // end-of-stream marker
}
|