1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
|
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package array
import (
"errors"
"fmt"
"io"
"sync/atomic"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/internal/json"
)
// Option configures a JSONReader; options are applied by NewJSONReader.
type Option func(config)

// config is the (empty) marker interface for types that accept Options;
// each option type-switches on the concrete config it understands.
type config interface{}
// WithChunk sets the chunk size for reading in json records. The default is to
// read in one row per record batch as a single object. If chunk size is set to
// a negative value, then the entire file is read as a single record batch.
// Otherwise a record batch is read in with chunk size rows per record batch until
// it reaches EOF.
func WithChunk(n int) Option {
	return func(cfg config) {
		switch cfg := cfg.(type) {
		case *JSONReader:
			cfg.chunk = n
		default:
			// fixed: message previously read "arrow/json): ..." with an
			// unbalanced closing parenthesis.
			panic(fmt.Errorf("arrow/json: unknown config type %T", cfg))
		}
	}
}
// WithAllocator specifies the allocator to use for creating the record batches,
// if it is not called, then memory.DefaultAllocator will be used.
func WithAllocator(mem memory.Allocator) Option {
	return func(cfg config) {
		switch cfg := cfg.(type) {
		case *JSONReader:
			cfg.mem = mem
		default:
			// fixed: message previously read "arrow/json): ..." with an
			// unbalanced closing parenthesis.
			panic(fmt.Errorf("arrow/json: unknown config type %T", cfg))
		}
	}
}
// JSONReader is a json reader that meets the RecordReader interface definition.
//
// To read in an array of objects as a record, you can use RecordFromJSON
// which is equivalent to reading the json as a struct array whose fields are
// the columns of the record. This primarily exists to fit the RecordReader
// interface as a matching reader for the csv reader.
type JSONReader struct {
	r      *json.Decoder // streaming decoder over the underlying io.Reader
	schema *arrow.Schema // schema every produced record conforms to
	bldr   *RecordBuilder // accumulates decoded rows until a record is cut

	refs int64        // reference count, manipulated atomically by Retain/Release
	cur  arrow.Record // last record produced; released on the next call to Next
	err  error        // first non-EOF error hit while decoding (see readNext)

	chunk int  // rows per record: <0 whole stream, 0 or 1 one row, >1 fixed chunk
	done  bool // set once decoding stops (EOF or error)

	mem  memory.Allocator // allocator handed to the record builder
	next func() bool      // chunking strategy chosen in NewJSONReader (next1/nextn/nextall)
}
// NewJSONReader returns a json RecordReader which expects to find one json object
// per row of dataset. Using WithChunk can control how many rows are processed
// per record, which is how many objects become a single record from the file.
//
// If it is desired to write out an array of rows, then simply use RecordToStructArray
// and json.Marshal the struct array for the same effect.
func NewJSONReader(r io.Reader, schema *arrow.Schema, opts ...Option) *JSONReader {
	reader := &JSONReader{
		r:      json.NewDecoder(r),
		schema: schema,
		refs:   1,
		chunk:  1,
	}

	// apply caller-supplied options before any defaults are filled in
	for _, opt := range opts {
		opt(reader)
	}
	if reader.mem == nil {
		reader.mem = memory.DefaultAllocator
	}

	reader.bldr = NewRecordBuilder(reader.mem, schema)

	// pick the row-batching strategy implied by the chunk setting:
	// negative => one record for the whole stream, >1 => fixed-size
	// chunks, otherwise (0 or 1) => one row per record.
	if reader.chunk < 0 {
		reader.next = reader.nextall
	} else if reader.chunk > 1 {
		reader.next = reader.nextn
	} else {
		reader.next = reader.next1
	}
	return reader
}
// Err returns the last encountered error
func (r *JSONReader) Err() error {
	return r.err
}
func (r *JSONReader) Schema() *arrow.Schema { return r.schema }
// Record returns the last read in record. The returned record is only valid
// until the next call to Next unless Retain is called on the record itself.
func (r *JSONReader) Record() arrow.Record {
	return r.cur
}
// Retain increases the reference count by 1; it may be called
// concurrently from multiple goroutines.
func (r *JSONReader) Retain() { atomic.AddInt64(&r.refs, 1) }
// Release decrements the reference count by 1. When the count reaches zero,
// the current record (if any), the record builder, and the decoder reference
// are all released so their memory can be reclaimed.
func (r *JSONReader) Release() {
	debug.Assert(atomic.LoadInt64(&r.refs) > 0, "too many releases")
	if atomic.AddInt64(&r.refs, -1) == 0 {
		if r.cur != nil {
			r.cur.Release()
			r.cur = nil
		}
		// Release the builder unconditionally. Previously this was nested
		// inside the r.cur != nil check, which leaked the RecordBuilder's
		// buffers whenever the reader was released after exhaustion (Next
		// sets cur to nil) or before any record was read. The nil guard
		// keeps a double Release of the reader from panicking on bldr.
		if r.bldr != nil {
			r.bldr.Release()
			r.bldr = nil
		}
		r.r = nil
	}
}
// Next returns true if it read in a record, which will be available via Record
// and false if there is either an error or the end of the reader.
func (r *JSONReader) Next() bool {
	// drop the previously handed-out record before producing a new one
	if r.cur != nil {
		r.cur.Release()
		r.cur = nil
	}

	if r.done || r.err != nil {
		return false
	}

	// delegate to the chunking strategy selected at construction time
	return r.next()
}
// readNext decodes a single json object into the record builder, reporting
// whether a row was consumed. On failure the reader is marked done; a plain
// io.EOF is not treated as an error and leaves r.err nil.
func (r *JSONReader) readNext() bool {
	r.err = r.r.Decode(r.bldr)
	if r.err == nil {
		return true
	}

	r.done = true
	if errors.Is(r.err, io.EOF) {
		// end of stream is the normal termination condition, not an error
		r.err = nil
	}
	return false
}
// nextall consumes the entire remaining stream into one record (chunk < 0).
func (r *JSONReader) nextall() bool {
	for {
		if !r.readNext() {
			break
		}
	}

	r.cur = r.bldr.NewRecord()
	return r.cur.NumRows() > 0
}
// next1 produces a record containing exactly one row (chunk of 0 or 1).
func (r *JSONReader) next1() bool {
	if r.readNext() {
		r.cur = r.bldr.NewRecord()
		return true
	}
	return false
}
// nextn reads up to r.chunk rows into a single record (chunk > 1). It returns
// false only when no rows at all could be read.
func (r *JSONReader) nextn() bool {
	read := 0
	for i := 0; i < r.chunk && !r.done; i++ {
		if !r.readNext() {
			break
		}
		read++
	}

	if read == 0 {
		return false
	}
	r.cur = r.bldr.NewRecord()
	return true
}
var (
	// compile-time check that JSONReader satisfies the RecordReader interface
	_ RecordReader = (*JSONReader)(nil)
)
|