// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build cgo
// +build cgo
package cdata
import (
"context"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/arrio"
"github.com/apache/arrow-go/v18/arrow/memory"
"golang.org/x/xerrors"
)
// SchemaFromPtr is a simple helper function to cast a uintptr to a *CArrowSchema
func SchemaFromPtr(ptr uintptr) *CArrowSchema {
	p := unsafe.Pointer(ptr)
	return (*CArrowSchema)(p)
}
// ArrayFromPtr is a simple helper function to cast a uintptr to a *CArrowArray
func ArrayFromPtr(ptr uintptr) *CArrowArray {
	p := unsafe.Pointer(ptr)
	return (*CArrowArray)(p)
}
// ImportCArrowField takes in an ArrowSchema from the C Data interface, it
// will copy the metadata and type definitions rather than keep direct references
// to them. It is safe to call C.ArrowSchemaRelease after receiving the field
// from this function.
func ImportCArrowField(out *CArrowSchema) (arrow.Field, error) {
	field, err := importSchema(out)
	return field, err
}
// ImportCArrowSchema takes in the ArrowSchema from the C Data Interface, it
// will copy the metadata and schema definitions over from the C object rather
// than keep direct references to them. This function will call ArrowSchemaRelease
// on the passed in schema regardless of whether or not there is an error returned.
//
// This version is intended to take in a schema for a record batch, which means
// that the top level of the schema should be a struct of the schema fields. If
// importing a single array's schema, then use ImportCArrowField instead.
func ImportCArrowSchema(out *CArrowSchema) (*arrow.Schema, error) {
	ret, err := importSchema(out)
	if err != nil {
		return nil, err
	}

	// A record batch schema must be represented as a struct of its fields.
	// Return an error (matching ImportCRecordBatch) instead of panicking on
	// the type assertion when the top level is not a struct.
	st, ok := ret.Type.(*arrow.StructType)
	if !ok {
		return nil, xerrors.New("recordbatch schema import must be of struct type")
	}
	return arrow.NewSchema(st.Fields(), &ret.Metadata), nil
}
// ImportCArrayWithType takes a pointer to a C Data ArrowArray and interprets the values
// as an array with the given datatype. If err is not nil, then ArrowArrayRelease must still
// be called on arr to release the memory.
//
// The underlying buffers will not be copied, but will instead be referenced directly
// by the resulting array interface object. The passed in ArrowArray will have it's ownership
// transferred to the resulting arrow.Array via ArrowArrayMove. The underlying array.Data
// object that is owned by the Array will now be the owner of the memory pointer and
// will call ArrowArrayRelease when it is released and garbage collected via runtime.SetFinalizer.
//
// NOTE: The array takes ownership of the underlying memory buffers via ArrowArrayMove,
// it does not take ownership of the actual arr object itself.
func ImportCArrayWithType(arr *CArrowArray, dt arrow.DataType) (arrow.Array, error) {
	imported, err := importCArrayAsType(arr, dt)
	if err != nil {
		return nil, err
	}
	// MakeFromData retains the data, so drop our reference afterwards.
	defer imported.data.Release()
	return array.MakeFromData(imported.data), nil
}
// ImportCArray takes a pointer to both a C Data ArrowArray and C Data ArrowSchema in order
// to import them into usable Go Objects. If err is not nil, then ArrowArrayRelease must still
// be called on arr to release the memory. The ArrowSchemaRelease will be called on the passed in
// schema regardless of whether there is an error or not.
//
// The Schema will be copied with the information used to populate the returned Field, complete
// with metadata. The array will reference the same memory that is referred to by the ArrowArray
// object and take ownership of it as per ImportCArrayWithType. The returned arrow.Array will
// own the C memory and call ArrowArrayRelease when the array.Data object is cleaned up.
//
// NOTE: The array takes ownership of the underlying memory buffers via ArrowArrayMove,
// it does not take ownership of the actual arr object itself.
func ImportCArray(arr *CArrowArray, schema *CArrowSchema) (arrow.Field, arrow.Array, error) {
	field, err := importSchema(schema)
	if err != nil {
		return field, nil, err
	}

	imported, err := ImportCArrayWithType(arr, field.Type)
	return field, imported, err
}
// ImportCRecordBatchWithSchema is used for importing a Record Batch array when the schema
// is already known such as when receiving record batches through a stream.
//
// All of the semantics regarding memory ownership are the same as when calling
// ImportCRecordBatch directly with a schema.
//
// NOTE: The array takes ownership of the underlying memory buffers via ArrowArrayMove,
// it does not take ownership of the actual arr object itself.
func ImportCRecordBatchWithSchema(arr *CArrowArray, sc *arrow.Schema) (arrow.Record, error) {
	imported, err := importCArrayAsType(arr, arrow.StructOf(sc.Fields()...))
	if err != nil {
		return nil, err
	}
	defer imported.data.Release()

	st := array.NewStructData(imported.data)
	defer st.Release()

	// The struct's children are the record batch's columns; split them out
	// and assemble the record from them.
	cols := make([]arrow.Array, st.NumField())
	for i := range cols {
		cols[i] = st.Field(i)
	}
	return array.NewRecord(sc, cols, int64(st.Len())), nil
}
// ImportCRecordBatch imports an ArrowArray from C as a record batch. If err is not nil,
// then ArrowArrayRelease must still be called to release the memory.
//
// A record batch is represented in the C Data Interface as a Struct Array whose fields
// are the columns of the record batch. Thus after importing the schema passed in here,
// if it is not a Struct type, this will return an error. As with ImportCArray, the
// columns in the record batch will take ownership of the CArrowArray memory if successful.
// Since ArrowArrayMove is used, it's still safe to call ArrowArrayRelease on the source
// regardless. But if there is an error, it *MUST* be called to ensure there is no memory leak.
//
// NOTE: The array takes ownership of the underlying memory buffers via ArrowArrayMove,
// it does not take ownership of the actual arr object itself.
func ImportCRecordBatch(arr *CArrowArray, sc *CArrowSchema) (arrow.Record, error) {
	field, err := importSchema(sc)
	if err != nil {
		return nil, err
	}

	// Record batches come across the C interface as struct arrays.
	st, ok := field.Type.(*arrow.StructType)
	if !ok {
		return nil, xerrors.New("recordbatch array import must be of struct type")
	}
	return ImportCRecordBatchWithSchema(arr, arrow.NewSchema(st.Fields(), &field.Metadata))
}
// ImportCArrayStream creates an arrio.Reader from an ArrowArrayStream taking ownership
// of the underlying stream object via ArrowArrayStreamMove.
//
// The records returned by this reader must be released manually after they are returned.
// The reader itself will release the stream via SetFinalizer when it is garbage collected.
// It will return (nil, io.EOF) from the Read function when there are no more records to return.
//
// NOTE: The reader takes ownership of the underlying memory buffers via ArrowArrayStreamMove,
// it does not take ownership of the actual stream object itself.
//
// Deprecated: This will panic if importing the schema fails (which is possible).
// Prefer ImportCRecordReader instead.
func ImportCArrayStream(stream *CArrowArrayStream, schema *arrow.Schema) arrio.Reader {
	rdr, err := ImportCRecordReader(stream, schema)
	if err != nil {
		// Kept for backwards compatibility only; see the deprecation note.
		panic(err)
	}
	return rdr
}
// ImportCRecordReader creates an arrio.Reader from an ArrowArrayStream taking ownership
// of the underlying stream object via ArrowArrayStreamMove.
//
// The records returned by this reader must be released manually after they are returned.
// The reader itself will release the stream via SetFinalizer when it is garbage collected.
// It will return (nil, io.EOF) from the Read function when there are no more records to return.
//
// NOTE: The reader takes ownership of the underlying memory buffers via ArrowArrayStreamMove,
// it does not take ownership of the actual stream object itself.
func ImportCRecordReader(stream *CArrowArrayStream, schema *arrow.Schema) (arrio.Reader, error) {
	out := &nativeCRecordBatchReader{schema: schema}
	if err := initReader(out, stream); err != nil {
		return nil, err
	}
	return out, nil
}
// ExportArrowSchema populates the passed in CArrowSchema with the schema passed in so
// that it can be passed to some consumer of the C Data Interface. The `release` function
// is tied to a callback in order to properly release any memory that was allocated during
// the populating of the struct. Any memory allocated will be allocated using malloc
// which means that it is invisible to the Go Garbage Collector and must be freed manually
// using the callback on the CArrowSchema object.
//
// WARNING: the output ArrowSchema MUST BE ZERO INITIALIZED, or the Go garbage collector
// may error at runtime, due to CGO rules ("the current implementation may sometimes
// cause a runtime error if the contents of the C memory appear to be a Go pointer").
// You have been warned!
func ExportArrowSchema(schema *arrow.Schema, out *CArrowSchema) {
	// Wrap the schema's fields in a synthetic struct field so the whole
	// schema can be exported through the single-field C interface.
	top := arrow.Field{
		Type:     arrow.StructOf(schema.Fields()...),
		Metadata: schema.Metadata(),
	}
	exportField(top, out)
}
// ExportArrowRecordBatch populates the passed in CArrowArray (and optionally the schema too)
// by sharing the memory used for the buffers of each column's arrays. It does not
// copy the data, and will internally increment the reference counters so that releasing
// the record will not free the memory prematurely.
//
// When using CGO, memory passed to C is pinned so that the Go garbage collector won't
// move where it is allocated out from under the C pointer locations, ensuring the C pointers
// stay valid. This is only true until the CGO call returns, at which point the garbage collector
// is free to move things around again. As a result, if the function you're calling is going to
// hold onto the pointers or otherwise continue to reference the memory *after* the call returns,
// you should use the CgoArrowAllocator rather than the GoAllocator (or DefaultAllocator) so that
// the memory which is allocated for the record batch in the first place is allocated in C,
// not by the Go runtime and is therefore not subject to the Garbage collection.
//
// The release function on the populated CArrowArray will properly decrease the reference counts,
// and release the memory if the record has already been released. But since this must be explicitly
// done, make sure it is released so that you do not create a memory leak.
//
// WARNING: the output ArrowArray MUST BE ZERO INITIALIZED, or the Go garbage collector
// may error at runtime, due to CGO rules ("the current implementation may sometimes
// cause a runtime error if the contents of the C memory appear to be a Go pointer").
// You have been warned!
func ExportArrowRecordBatch(rb arrow.Record, out *CArrowArray, outSchema *CArrowSchema) {
	// Gather the column data so the batch can be represented as a single
	// struct array for the C Data Interface.
	childData := make([]arrow.ArrayData, rb.NumCols())
	for i, col := range rb.Columns() {
		childData[i] = col.Data()
	}

	structData := array.NewData(arrow.StructOf(rb.Schema().Fields()...), int(rb.NumRows()),
		[]*memory.Buffer{nil}, childData, 0, 0)
	defer structData.Release()

	structArr := array.NewStructData(structData)
	defer structArr.Release()

	if outSchema != nil {
		ExportArrowSchema(rb.Schema(), outSchema)
	}

	exportArray(structArr, out, nil)
}
// ExportArrowArray populates the CArrowArray that is passed in with the pointers to the memory
// being used by the arrow.Array passed in, in order to share with zero-copy across the C
// Data Interface. See the documentation for ExportArrowRecordBatch for details on how to ensure
// you do not leak memory and prevent unwanted, undefined or strange behaviors.
//
// If outSchema is non-nil, it is also populated with the array's type information.
//
// WARNING: the output ArrowArray MUST BE ZERO INITIALIZED, or the Go garbage collector
// may error at runtime, due to CGO rules ("the current implementation may sometimes
// cause a runtime error if the contents of the C memory appear to be a Go pointer").
// You have been warned!
func ExportArrowArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) {
	exportArray(arr, out, outSchema)
}
// ExportRecordReader populates the CArrowArrayStream that is passed in with the appropriate
// callbacks to be a working ArrowArrayStream utilizing the passed in RecordReader. The
// CArrowArrayStream takes ownership of the RecordReader until the consumer calls the release
// callback, as such it is unnecessary to call Release on the passed in reader unless it has
// previously been retained.
//
// WARNING: the output ArrowArrayStream MUST BE ZERO INITIALIZED, or the Go garbage
// collector may error at runtime, due to CGO rules ("the current implementation may
// sometimes cause a runtime error if the contents of the C memory appear to be a Go
// pointer"). You have been warned!
func ExportRecordReader(reader array.RecordReader, out *CArrowArrayStream) {
	exportStream(reader, out)
}
// ReleaseCArrowArray calls ArrowArrayRelease on the passed in cdata array
func ReleaseCArrowArray(arr *CArrowArray) {
	releaseArr(arr)
}
// ReleaseCArrowSchema calls ArrowSchemaRelease on the passed in cdata schema
func ReleaseCArrowSchema(schema *CArrowSchema) {
	releaseSchema(schema)
}
// RecordMessage is a simple container for a record batch channel to stream for
// using the Async C Data Interface via ExportAsyncRecordBatchStream.
type RecordMessage struct {
	// Record is the batch being passed along the stream.
	Record arrow.Record
	// AdditionalMetadata is any extra metadata accompanying this batch.
	AdditionalMetadata arrow.Metadata
	// Err, if non-nil, indicates an error encountered while producing
	// the stream; when set the other fields should not be relied upon.
	Err error
}
// AsyncRecordBatchStream represents a stream of record batches being read in
// from an ArrowAsyncDeviceStreamHandler's callbacks. If an error was encountered
// before the call to on_schema, then this will contain the error as Err. Otherwise
// the Schema will be valid and the Stream is a channel of RecordMessages being
// propagated via on_next_task and extract_data.
type AsyncRecordBatchStream struct {
	// Schema describes the record batches delivered on Stream; valid
	// only when Err is nil.
	Schema *arrow.Schema
	// AdditionalMetadata holds any extra metadata provided with the schema.
	AdditionalMetadata arrow.Metadata
	// Err is the error received before on_schema was called, if any.
	Err error
	// Stream receives the record batches (or per-batch errors) as they arrive.
	Stream <-chan RecordMessage
}
// AsyncStreamError represents an error encountered via a call to the on_error
// callback of an ArrowAsyncDeviceStreamHandler. The Code is the error code that
// should be errno compatible.
type AsyncStreamError struct {
	// Code is the errno-compatible error code reported by the producer.
	Code int
	// Msg is the error message text.
	Msg string
	// Metadata is any additional metadata string passed with the error.
	Metadata string
}

// Error implements the error interface by returning the message text.
func (e AsyncStreamError) Error() string {
	return e.Msg
}
// CreateAsyncDeviceStreamHandler populates a given ArrowAsyncDeviceStreamHandler's callbacks
// and waits for the on_schema callback to be called before passing the AsyncRecordBatchStream
// object across the returned channel.
//
// The provided queueSize is the number of records that will be requested at a time to be passed
// along the Stream in the returned AsyncRecordBatchStream. See the documentation on
// https://arrow.apache.org/docs/format/CDeviceDataInterface.html for more information as to the
// expected semantics of that size.
//
// The populated ArrowAsyncDeviceStreamHandler can then be given to any compatible provider for
// async record batch streams via the C Device interface.
func CreateAsyncDeviceStreamHandler(ctx context.Context, queueSize uint64, out *CArrowAsyncDeviceStreamHandler) <-chan AsyncRecordBatchStream {
	results := make(chan AsyncRecordBatchStream)
	state := cAsyncState{ctx: ctx, ch: results, queueSize: queueSize}
	exportAsyncHandler(state, out)
	return results
}
// ExportAsyncRecordBatchStream takes in a schema and a channel of RecordMessages along with a
// ArrowAsyncDeviceStreamHandler to export the records as they come across the channel and call
// the appropriate callbacks on the handler. This function will block until the stream is closed
// or a message containing an error comes across the channel.
//
// The returned error will be nil if everything is successful, otherwise it will be the error which
// is encountered on the stream or an AsyncError if one of the handler callbacks returns an error.
func ExportAsyncRecordBatchStream(schema *arrow.Schema, stream <-chan RecordMessage, handler *CArrowAsyncDeviceStreamHandler) error {
	return exportAsyncProducer(schema, stream, handler)
}