package input
import (
"container/list"
"fmt"
"io"
"strings"
"encoding/json"
"github.com/johnkerl/miller/v6/pkg/cli"
"github.com/johnkerl/miller/v6/pkg/lib"
"github.com/johnkerl/miller/v6/pkg/mlrval"
"github.com/johnkerl/miller/v6/pkg/types"
)
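
// RecordReaderJSON is Miller's JSON record-reader: it decodes a stream of
// top-level JSON objects, or arrays of objects, into records.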
type RecordReaderJSON struct {
readerOptions *cli.TReaderOptions
recordsPerBatch int64 // distinct from readerOptions.RecordsPerBatch for join/repl
// XXX 1513
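	// sawBrackets tracks whether any top-level [...] arrays were seen in the
	// input; it is copied into context.JSONHadBrackets at end of stream.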
sawBrackets bool
}
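
// NewRecordReaderJSON creates a JSON record-reader with the given options and
// records-per-batch setting.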
func NewRecordReaderJSON(
readerOptions *cli.TReaderOptions,
recordsPerBatch int64,
) (*RecordReaderJSON, error) {
return &RecordReaderJSON{
readerOptions: readerOptions,
recordsPerBatch: recordsPerBatch,
}, nil
}
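
// Read delivers batches of records over readerChannel, reading from stdin if
// the filenames list is empty, and delivering only the end-of-stream marker if
// it is nil (mlr -n).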
func (reader *RecordReaderJSON) Read(
filenames []string,
context types.Context,
readerChannel chan<- *list.List, // list of *types.RecordAndContext
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
) {
if filenames != nil { // nil for mlr -n
if len(filenames) == 0 { // read from stdin
handle, err := lib.OpenStdin(
reader.readerOptions.Prepipe,
reader.readerOptions.PrepipeIsRaw,
reader.readerOptions.FileInputEncoding,
)
if err != nil {
errorChannel <- err
} else {
reader.processHandle(handle, "(stdin)", &context, readerChannel, errorChannel, downstreamDoneChannel)
}
} else {
for _, filename := range filenames {
handle, err := lib.OpenFileForRead(
filename,
reader.readerOptions.Prepipe,
reader.readerOptions.PrepipeIsRaw,
reader.readerOptions.FileInputEncoding,
)
if err != nil {
errorChannel <- err
} else {
reader.processHandle(handle, filename, &context, readerChannel, errorChannel, downstreamDoneChannel)
handle.Close()
}
}
}
}
context.JSONHadBrackets = reader.sawBrackets
readerChannel <- types.NewEndOfStreamMarkerList(&context)
}
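
// processHandle decodes JSON values from one open stream, delivering records
// over readerChannel in batches and stopping early if downstream processors
// are done with their input (e.g. mlr head).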
func (reader *RecordReaderJSON) processHandle(
handle io.Reader,
filename string,
context *types.Context,
readerChannel chan<- *list.List, // list of *types.RecordAndContext
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
) {
context.UpdateForStartOfFile(filename)
	// The records-per-batch count sets the delivery-batch size, and is also
	// used below as the cadence for the downstream-done check.
	recordsPerBatch := reader.recordsPerBatch
if reader.readerOptions.CommentHandling != cli.CommentsAreData {
handle = NewJSONCommentEnabledReader(handle, reader.readerOptions, readerChannel)
}
decoder := json.NewDecoder(handle)
recordsAndContexts := list.New()
eof := false
i := int64(0)
for {
// See if downstream processors will be ignoring further data (e.g. mlr
// head). If so, stop reading. This makes 'mlr head hugefile' exit
// quickly, as it should. Do this channel-check every so often to avoid
// scheduler overhead.
i++
		if i%recordsPerBatch == 0 {
			select {
			case <-downstreamDoneChannel:
				eof = true
			default:
			}
			if eof {
				break
			}
		}
		value, eof, err := mlrval.MlrvalDecodeFromJSON(decoder)
if eof {
break
}
if err != nil {
errorChannel <- err
return
}
		// Find out what we got.
		// * A map is an input record: deliver it.
		// * An array is OK if it's an array of input records: deliver them.
		// * Non-collection types are valid but unmillerable JSON.
		if value.IsMap() {
			// TODO: make a helper method
			record := value.GetMap()
if record == nil {
errorChannel <- fmt.Errorf("internal coding error detected in JSON record-reader")
return
}
context.UpdateForInputRecord()
recordsAndContexts.PushBack(types.NewRecordAndContext(record, context))
if int64(recordsAndContexts.Len()) >= recordsPerBatch {
readerChannel <- recordsAndContexts
recordsAndContexts = list.New()
}
		} else if value.IsArray() {
			reader.sawBrackets = true
			records := value.GetArray()
if records == nil {
errorChannel <- fmt.Errorf("internal coding error detected in JSON record-reader")
return
}
			for _, element := range records {
				if !element.IsMap() {
					// TODO: more context
					errorChannel <- fmt.Errorf(
						"valid but unmillerable JSON. Expected map (JSON object); got %s",
						element.GetTypeName(),
					)
					return
				}
				record := element.GetMap()
if record == nil {
errorChannel <- fmt.Errorf("internal coding error detected in JSON record-reader")
return
}
context.UpdateForInputRecord()
recordsAndContexts.PushBack(types.NewRecordAndContext(record, context))
if int64(recordsAndContexts.Len()) >= recordsPerBatch {
readerChannel <- recordsAndContexts
recordsAndContexts = list.New()
}
}
} else {
errorChannel <- fmt.Errorf(
"valid but unmillerable JSON. Expected map (JSON object); got %s",
				value.GetTypeName(),
)
return
}
}
if recordsAndContexts.Len() > 0 {
readerChannel <- recordsAndContexts
}
}
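
// For reference, the record-reading loop above accepts either of these input
// shapes, producing the same two records from each:
//
//	{"a": 1}
//	{"b": 2}
//
// and
//
//	[
//	  {"a": 1},
//	  {"b": 2}
//	]
//
// sawBrackets is set only in the second case, and any other top-level JSON
// type (string, number, boolean, null) is rejected as valid but unmillerable.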
// ================================================================
// JSON comment-stripping
//
// Miller lets users (on an opt-in basis) have comments in their data files,
// for all formats including JSON. Comments are only honored at start of line.
// Users can have them be printed to stdout straight away, or simply discarded.
//
// For most file formats Miller does line-based I/O and can deal with comment
// lines easily and simply. But for JSON, the Go JSON library consumes an
// io.Reader, so we implement a comment-stripping io.Reader wrapper here.
//
// This could be done by peeking into the return value from the underlying
// io.Reader, detecting comment-line starts and line-endings within the byte
// array that io.Reader deals with. That's an appealing plan of action, but it
// gets messy if the comment-string is multi-character since then a comment
// string could be split between successive calls to Read() on the underlying
// handle.
//
// Instead we use a line-oriented scanner to do line-splitting for us.
// JSONCommentEnabledReader implements io.Reader to strip comment lines
// off of JSON data.
type JSONCommentEnabledReader struct {
lineReader ILineReader
readerOptions *cli.TReaderOptions
context *types.Context // Needed for channelized stdout-printing logic
readerChannel chan<- *list.List // list of *types.RecordAndContext
	// In case a line was ingested which was longer than the read buffer passed
	// to us, we need to split up that line and return it over the course of
	// two or more calls.
lineBytes []byte
}
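
// NewJSONCommentEnabledReader wraps an underlying reader with
// newline-delimited, comment-stripping line logic.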
func NewJSONCommentEnabledReader(
underlying io.Reader,
readerOptions *cli.TReaderOptions,
readerChannel chan<- *list.List, // list of *types.RecordAndContext
) *JSONCommentEnabledReader {
return &JSONCommentEnabledReader{
lineReader: NewLineReader(underlying, "\n"),
readerOptions: readerOptions,
context: types.NewNilContext(),
readerChannel: readerChannel,
lineBytes: nil,
}
}
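
// Read implements io.Reader: it hands back any remainder of a previously
// buffered line first, then reads further lines, skipping or passing along
// comment lines per the reader options.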
func (bsr *JSONCommentEnabledReader) Read(p []byte) (n int, err error) {
if bsr.lineBytes != nil {
return bsr.populateFromLine(p), nil
}
	// Loop until we can get a non-comment line to pass on, or end of file.
	for {
		// End of file, or a read error.
		line, err := bsr.lineReader.Read()
		if err != nil {
			return 0, err
		}
		// Non-comment line: buffer it and start handing it to the caller.
		if !strings.HasPrefix(line, bsr.readerOptions.CommentString) {
			bsr.lineBytes = []byte(line)
			return bsr.populateFromLine(p), nil
		}
		// Comment line: pass it along or discard it, per the reader options.
		if bsr.readerOptions.CommentHandling == cli.PassComments {
			// Insert the string into the record-output stream, so that goroutine can
			// print it, resulting in deterministic output-ordering.
			ell := list.New()
			ell.PushBack(types.NewOutputString(line+"\n", bsr.context))
			bsr.readerChannel <- ell
		}
	}
}
// populateFromLine is a helper for Read. It takes the full line buffered from
// the line reader and writes as much of it as fits into the caller's p-buffer.
// If the entire line is written, the buffer is cleared so that a subsequent
// call to Read will retrieve the next line from the input file. Otherwise, as
// much as possible is transferred, and the remainder is kept for a subsequent
// call.
func (bsr *JSONCommentEnabledReader) populateFromLine(p []byte) int {
	if len(bsr.lineBytes) <= len(p) {
		// The whole remaining line fits: hand it all over and clear the buffer.
		// Using <= rather than < avoids an extra zero-byte Read result when the
		// line exactly fills p.
		numBytesWritten := copy(p, bsr.lineBytes)
		bsr.lineBytes = nil
		return numBytesWritten
	}
	// Only part of the line fits: copy a p-sized chunk and keep the rest for
	// the next call.
	copy(p, bsr.lineBytes[:len(p)])
	bsr.lineBytes = bsr.lineBytes[len(p):]
	return len(p)
}
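
// exampleCommentStrippedDecode is an illustrative sketch, not called anywhere
// by Miller itself: it shows how the comment-stripping wrapper above composes
// with a standard json.Decoder. The cli.TReaderOptions field names are the
// ones used earlier in this file; the input string and the rest of the
// scaffolding here are hypothetical demo material.
func exampleCommentStrippedDecode() {
	opts := &cli.TReaderOptions{
		CommentString:   "#",
		CommentHandling: cli.PassComments,
	}

	// With PassComments, comment lines are forwarded over this channel for the
	// output goroutine to print; drain it here so the reader never blocks.
	ch := make(chan *list.List, 8)
	go func() {
		for range ch {
		}
	}()

	input := "# a comment\n{\"a\": 1}\n# another comment\n{\"b\": 2}\n"
	decoder := json.NewDecoder(NewJSONCommentEnabledReader(strings.NewReader(input), opts, ch))

	for {
		var v interface{}
		err := decoder.Decode(&v)
		if err == io.EOF {
			break
		}
		if err != nil {
			fmt.Println("decode error:", err)
			break
		}
		fmt.Println("decoded:", v) // decoded: map[a:1], then decoded: map[b:2]
	}
}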