1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430
|
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build go1.18
package compute
import (
"context"
"fmt"
"strings"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/compute/exec"
)
// Function is the interface shared by every compute function defined in
// this file (ScalarFunction, VectorFunction and MetaFunction).
type Function interface {
	// Name returns the name the function was constructed with.
	Name() string
	// Kind reports which category of function this is.
	Kind() FuncKind
	// Arity reports how many arguments the function accepts.
	Arity() Arity
	// Doc returns the documentation attached to the function.
	Doc() FunctionDoc
	// NumKernels returns how many kernels the function holds; a
	// MetaFunction has none and always reports 0.
	NumKernels() int
	// Execute runs the function with the given options and arguments.
	Execute(context.Context, FunctionOptions, ...Datum) (Datum, error)
	// DispatchExact returns a kernel whose signature matches the given
	// argument types, or an error when no kernel matches.
	DispatchExact(...arrow.DataType) (exec.Kernel, error)
	// DispatchBest selects a kernel for the given argument types. The
	// scalar and vector implementations in this file simply defer to
	// DispatchExact.
	DispatchBest(...arrow.DataType) (exec.Kernel, error)
	// DefaultOptions returns the options registered as the function's
	// default, which may be nil.
	DefaultOptions() FunctionOptions
	// Validate checks the function's documentation for consistency with
	// its arity and formatting rules.
	Validate() error
}
// Arity describes how many arguments a function requires.
//
// Naming conventions are taken from https://en.wikipedia.org/wiki/Arity
type Arity struct {
	// NArgs is the exact argument count for a fixed-arity function, or the
	// minimum argument count when IsVarArgs is set.
	NArgs int
	// IsVarArgs marks a function that accepts NArgs or more arguments.
	IsVarArgs bool
}

// Convenience constructors for the commonly used arities.

// Nullary is the arity of a function taking no arguments.
func Nullary() Arity {
	return Arity{NArgs: 0, IsVarArgs: false}
}

// Unary is the arity of a function taking exactly one argument.
func Unary() Arity {
	return Arity{NArgs: 1, IsVarArgs: false}
}

// Binary is the arity of a function taking exactly two arguments.
func Binary() Arity {
	return Arity{NArgs: 2, IsVarArgs: false}
}

// Ternary is the arity of a function taking exactly three arguments.
func Ternary() Arity {
	return Arity{NArgs: 3, IsVarArgs: false}
}

// VarArgs is the arity of a function taking at least minArgs arguments.
func VarArgs(minArgs int) Arity {
	return Arity{NArgs: minArgs, IsVarArgs: true}
}
// FunctionDoc holds the human-readable documentation attached to a
// compute function.
type FunctionDoc struct {
	// A one-line summary of the function, using a verb.
	//
	// For example, "Add two numeric arrays or scalars"
	//
	// Validation rejects summaries that contain a newline or end with a
	// period. An empty summary makes Validate skip all documentation checks.
	Summary string
	// A detailed description of the function, meant to follow the summary.
	//
	// Validation rejects descriptions that end with a newline or contain a
	// line longer than 78 bytes.
	Description string
	// Symbolic names (identifiers) for the function arguments.
	//
	// Can be used to generate nicer function signatures.
	ArgNames []string
	// Name of the options struct type, if any
	OptionsType string
	// Whether or not options are required for function execution.
	//
	// If false, then either there are no options for this function,
	// or there is a usable default options value.
	OptionsRequired bool
}

// EmptyFuncDoc is a reusable empty function doc definition for convenience.
// Being the zero value, its Summary is empty.
var EmptyFuncDoc FunctionDoc
// FuncKind is an enum representing the type of a function.
//
// The zero value is FuncScalar.
type FuncKind int8

const (
	// A function that performs scalar data operations on whole arrays
	// of data. Can generally process Array or Scalar values. The size
	// of the output will be the same as the size (or broadcasted size,
	// in the case of mixing Array and Scalar inputs) of the input.
	FuncScalar FuncKind = iota // Scalar
	// A function with array input and output whose behavior depends on
	// the values of the entire arrays passed, rather than the value of
	// each scalar value.
	FuncVector // Vector
	// A function that computes a scalar summary statistic from array input.
	FuncScalarAgg // ScalarAggregate
	// A function that computes grouped summary statistics from array
	// input and an array of group identifiers.
	FuncHashAgg // HashAggregate
	// A function that dispatches to other functions and does not contain
	// its own kernels.
	FuncMeta // Meta
)
// validateFunctionSummary checks the formatting of a function doc summary.
//
// It returns an error wrapping arrow.ErrInvalid when the summary contains a
// newline or ends with a period, and nil otherwise. An empty summary is
// accepted, since it violates neither rule.
func validateFunctionSummary(summary string) error {
	if strings.Contains(summary, "\n") {
		return fmt.Errorf("%w: summary contains a newline", arrow.ErrInvalid)
	}
	// HasSuffix is safe on the empty string, whereas indexing the last byte
	// directly would panic when summary is "".
	if strings.HasSuffix(summary, ".") {
		return fmt.Errorf("%w: summary ends with a point", arrow.ErrInvalid)
	}
	return nil
}
// validateFunctionDescription checks the formatting of a function doc
// description.
//
// It returns an error wrapping arrow.ErrInvalid when the description ends
// with a newline, or when any of its newline-separated lines is longer than
// 78 bytes. An empty description is valid.
func validateFunctionDescription(desc string) error {
	const maxLineSize = 78

	if strings.HasSuffix(desc, "\n") {
		return fmt.Errorf("%w: description ends with a newline", arrow.ErrInvalid)
	}

	lines := strings.Split(desc, "\n")
	for i := 0; i < len(lines); i++ {
		if len(lines[i]) <= maxLineSize {
			continue
		}
		return fmt.Errorf("%w: description line length exceeds %d characters", arrow.ErrInvalid, maxLineSize)
	}
	return nil
}
// baseFunction is the base class for compute functions. Function
// implementations should embed this baseFunction and will contain
// a collection of "kernels" which are implementations of the function
// for specific argument types. Selecting a viable kernel for
// executing the function is referred to as "dispatching".
//
// baseFunction itself only stores the metadata below; kernels live in
// the embedding type.
type baseFunction struct {
	// name identifies the function and is included in error messages.
	name string
	// kind is the function category reported by Kind().
	kind FuncKind
	// arity is the expected argument count, enforced by checkArity.
	arity Arity
	// doc is the documentation checked by Validate.
	doc FunctionDoc
	// defaultOpts is returned by DefaultOptions(); it is nil unless the
	// embedding type sets it.
	defaultOpts FunctionOptions
}
// Name returns the name the function was constructed with.
func (b *baseFunction) Name() string {
	return b.name
}

// Kind returns the category of the function.
func (b *baseFunction) Kind() FuncKind {
	return b.kind
}

// Arity returns the expected argument count of the function.
func (b *baseFunction) Arity() Arity {
	return b.arity
}

// Doc returns the documentation attached to the function.
func (b *baseFunction) Doc() FunctionDoc {
	return b.doc
}

// DefaultOptions returns the stored default options, which may be nil.
func (b *baseFunction) DefaultOptions() FunctionOptions {
	return b.defaultOpts
}
// Validate checks that the function's documentation is consistent with its
// arity and follows the summary/description formatting rules.
//
// A function with an empty summary is treated as undocumented and is always
// considered valid. Otherwise the number of argument names must equal the
// arity, or be one more than it for a varargs function.
func (b *baseFunction) Validate() error {
	doc := &b.doc
	if doc.Summary == "" {
		return nil
	}

	nNames, want := len(doc.ArgNames), b.arity.NArgs
	namesMatch := nNames == want || (b.arity.IsVarArgs && nNames == want+1)
	if !namesMatch {
		return fmt.Errorf("in function '%s': number of argument names for function doc != function arity", b.name)
	}

	err := validateFunctionSummary(doc.Summary)
	if err != nil {
		return err
	}
	return validateFunctionDescription(doc.Description)
}
// checkOptions reports an error wrapping arrow.ErrInvalid when fn's
// documentation marks options as required but opts is nil.
func checkOptions(fn Function, opts FunctionOptions) error {
	if opts != nil {
		return nil
	}
	if !fn.Doc().OptionsRequired {
		return nil
	}
	return fmt.Errorf("%w: function '%s' cannot be called without options", arrow.ErrInvalid, fn.Name())
}
// checkArity verifies that nargs is an acceptable argument count for the
// function: at least arity.NArgs for a varargs function, exactly
// arity.NArgs otherwise. Failures wrap arrow.ErrInvalid.
func (b *baseFunction) checkArity(nargs int) error {
	want := b.arity.NArgs

	if b.arity.IsVarArgs {
		if nargs < want {
			return fmt.Errorf("%w: varargs function '%s' needs at least %d arguments, but only %d passed",
				arrow.ErrInvalid, b.name, want, nargs)
		}
		return nil
	}

	if nargs != want {
		return fmt.Errorf("%w: function '%s' accepts %d arguments but %d passed",
			arrow.ErrInvalid, b.name, want, nargs)
	}
	return nil
}
// kernelType is a type constraint interface that is used for funcImpl
// generic definitions. It will be extended as other kernel types
// are defined.
//
// Currently ScalarKernels and VectorKernels are allowed to be used.
type kernelType interface {
	exec.ScalarKernel | exec.VectorKernel
	// specifying the Kernel interface here allows us to utilize
	// the methods of the Kernel interface on the generic
	// constrained type
	exec.Kernel
}
// funcImpl is the basic implementation for any functions that use kernels
// i.e. all except for Meta functions.
//
// KT is the concrete kernel type stored by the function.
type funcImpl[KT kernelType] struct {
	baseFunction
	// kernels holds the registered kernels by value; DispatchExact searches
	// them in insertion order and hands out pointers into this slice.
	kernels []KT
}
// DispatchExact returns a pointer to the first registered kernel whose
// signature matches the given argument types.
//
// An arity mismatch is reported as returned by checkArity; if no kernel
// matches, the error wraps arrow.ErrNotImplemented. In both cases the
// returned pointer is nil.
func (fi *funcImpl[KT]) DispatchExact(vals ...arrow.DataType) (*KT, error) {
	err := fi.checkArity(len(vals))
	if err != nil {
		return nil, err
	}

	for idx := range fi.kernels {
		if !fi.kernels[idx].GetSig().MatchesInputs(vals) {
			continue
		}
		// Point into the slice so callers get the stored kernel, not a copy.
		return &fi.kernels[idx], nil
	}

	return nil, fmt.Errorf("%w: function '%s' has no kernel matching input types %s",
		arrow.ErrNotImplemented, fi.name, arrow.TypesToString(vals))
}
// NumKernels returns the number of kernels registered with the function.
func (fi *funcImpl[KT]) NumKernels() int {
	return len(fi.kernels)
}

// Kernels returns a freshly allocated slice of pointers to the registered
// kernels, in registration order. The pointers refer to the function's own
// kernel storage rather than to copies.
func (fi *funcImpl[KT]) Kernels() []*KT {
	ptrs := make([]*KT, 0, len(fi.kernels))
	for idx := range fi.kernels {
		ptrs = append(ptrs, &fi.kernels[idx])
	}
	return ptrs
}
// A ScalarFunction is a function that executes element-wise operations
// on arrays or scalars, and therefore whose results generally do not
// depend on the order of the values in the arguments. Accepts and returns
// arrays that are all of the same size. These functions roughly correspond
// to the functions used in most SQL expressions.
//
// Its kernels are exec.ScalarKernel values held by the embedded funcImpl.
type ScalarFunction struct {
	funcImpl[exec.ScalarKernel]
}
// NewScalarFunction builds a ScalarFunction with the given name, arity and
// documentation. The result has kind FuncScalar, no kernels and no default
// options.
func NewScalarFunction(name string, arity Arity, doc FunctionDoc) *ScalarFunction {
	fn := new(ScalarFunction)
	fn.baseFunction = baseFunction{
		name:  name,
		kind:  FuncScalar,
		arity: arity,
		doc:   doc,
	}
	return fn
}
// SetDefaultOptions stores opts as the value subsequently returned by
// DefaultOptions.
func (s *ScalarFunction) SetDefaultOptions(opts FunctionOptions) {
	s.baseFunction.defaultOpts = opts
}
// DispatchExact returns the registered scalar kernel whose signature
// matches the given argument types.
//
// If the argument count is wrong or no kernel matches, the error from the
// underlying lookup is returned together with a nil exec.Kernel.
func (s *ScalarFunction) DispatchExact(vals ...arrow.DataType) (exec.Kernel, error) {
	kernel, err := s.funcImpl.DispatchExact(vals...)
	if err != nil {
		// Return a literal nil: converting the nil *exec.ScalarKernel to the
		// exec.Kernel interface would yield a non-nil interface value that
		// wraps a nil pointer.
		return nil, err
	}
	return kernel, nil
}

// DispatchBest selects a kernel for the given argument types. It currently
// performs no implicit casting and simply defers to DispatchExact.
func (s *ScalarFunction) DispatchBest(vals ...arrow.DataType) (exec.Kernel, error) {
	return s.DispatchExact(vals...)
}
// AddNewKernel builds a scalar kernel from the given input types, output
// type and exec/init functions, and appends it to the function's kernels.
// This assumes default null handling (intersection of validity bitmaps).
//
// The number of input types must satisfy the function's arity, and a
// varargs function must be given exactly one input type; otherwise an
// error is returned and no kernel is added.
func (s *ScalarFunction) AddNewKernel(inTypes []exec.InputType, outType exec.OutputType, execFn exec.ArrayKernelExec, init exec.KernelInitFn) error {
	err := s.checkArity(len(inTypes))
	if err != nil {
		return err
	}

	varArgs := s.arity.IsVarArgs
	if varArgs && len(inTypes) != 1 {
		return fmt.Errorf("%w: varargs signatures must have exactly one input type", arrow.ErrInvalid)
	}

	kernel := exec.NewScalarKernelWithSig(&exec.KernelSignature{
		InputTypes: inTypes,
		OutType:    outType,
		IsVarArgs:  varArgs,
	}, execFn, init)
	s.kernels = append(s.kernels, kernel)
	return nil
}
// AddKernel appends the given kernel to the function's kernels. The kernel
// is passed and stored by value, so the caller may modify and re-add the
// same kernel variable to register further kernels.
//
// The kernel's input type count must satisfy the function's arity, and a
// varargs function only accepts kernels whose signature is varargs too.
func (s *ScalarFunction) AddKernel(k exec.ScalarKernel) error {
	sig := k.Signature
	if err := s.checkArity(len(sig.InputTypes)); err != nil {
		return err
	}

	if !sig.IsVarArgs && s.arity.IsVarArgs {
		return fmt.Errorf("%w: function accepts varargs but kernel signature does not", arrow.ErrInvalid)
	}

	s.kernels = append(s.kernels, k)
	return nil
}
// Execute uses the passed in context, function options and arguments to eagerly
// execute the function using kernel dispatch, batch iteration and memory
// allocation details as defined by the kernel.
//
// If opts is nil, then the DefaultOptions() will be used.
//
// All of the work is delegated to execInternal.
// NOTE(review): the meaning of the -1 argument is defined by execInternal,
// which is not visible here; presumably "no explicit length" - confirm.
func (s *ScalarFunction) Execute(ctx context.Context, opts FunctionOptions, args ...Datum) (Datum, error) {
	return execInternal(ctx, s, opts, -1, args...)
}
// VectorFunction is a function whose kernels are exec.VectorKernel values.
// Functions of this type are constructed with kind FuncVector.
type VectorFunction struct {
	funcImpl[exec.VectorKernel]
}
// NewVectorFunction builds a VectorFunction with the given name, arity and
// documentation. The result has kind FuncVector, no kernels and no default
// options.
func NewVectorFunction(name string, arity Arity, doc FunctionDoc) *VectorFunction {
	fn := new(VectorFunction)
	fn.baseFunction = baseFunction{
		name:  name,
		kind:  FuncVector,
		arity: arity,
		doc:   doc,
	}
	return fn
}
// SetDefaultOptions stores opts as the value subsequently returned by
// DefaultOptions.
func (f *VectorFunction) SetDefaultOptions(opts FunctionOptions) {
	f.baseFunction.defaultOpts = opts
}
// DispatchExact returns the registered vector kernel whose signature
// matches the given argument types.
//
// If the argument count is wrong or no kernel matches, the error from the
// underlying lookup is returned together with a nil exec.Kernel.
func (f *VectorFunction) DispatchExact(vals ...arrow.DataType) (exec.Kernel, error) {
	kernel, err := f.funcImpl.DispatchExact(vals...)
	if err != nil {
		// Return a literal nil: converting the nil *exec.VectorKernel to the
		// exec.Kernel interface would yield a non-nil interface value that
		// wraps a nil pointer.
		return nil, err
	}
	return kernel, nil
}

// DispatchBest selects a kernel for the given argument types. It currently
// performs no implicit casting and simply defers to DispatchExact.
func (f *VectorFunction) DispatchBest(vals ...arrow.DataType) (exec.Kernel, error) {
	return f.DispatchExact(vals...)
}
// AddNewKernel builds a vector kernel from the given input types, output
// type and exec/init functions, and appends it to the function's kernels.
//
// The number of input types must satisfy the function's arity, and a
// varargs function must be given exactly one input type; otherwise an
// error is returned and no kernel is added.
func (f *VectorFunction) AddNewKernel(inTypes []exec.InputType, outType exec.OutputType, execFn exec.ArrayKernelExec, init exec.KernelInitFn) error {
	if err := f.checkArity(len(inTypes)); err != nil {
		return err
	}

	if f.arity.IsVarArgs && len(inTypes) != 1 {
		// Message spelling matches ScalarFunction.AddNewKernel ("varargs").
		return fmt.Errorf("%w: varargs signatures must have exactly one input type", arrow.ErrInvalid)
	}

	sig := &exec.KernelSignature{
		InputTypes: inTypes,
		OutType:    outType,
		IsVarArgs:  f.arity.IsVarArgs,
	}
	f.kernels = append(f.kernels, exec.NewVectorKernelWithSig(sig, execFn, init))
	return nil
}
// AddKernel appends the given kernel to the function's kernels. The kernel
// is passed and stored by value.
//
// The kernel's input type count must satisfy the function's arity, and a
// varargs function only accepts kernels whose signature is varargs too.
func (f *VectorFunction) AddKernel(kernel exec.VectorKernel) error {
	sig := kernel.Signature
	if err := f.checkArity(len(sig.InputTypes)); err != nil {
		return err
	}

	if !sig.IsVarArgs && f.arity.IsVarArgs {
		return fmt.Errorf("%w: function accepts varargs but kernel signature does not", arrow.ErrInvalid)
	}

	f.kernels = append(f.kernels, kernel)
	return nil
}
// Execute runs the function on the given arguments with the given options
// by delegating all of the work to execInternal.
//
// NOTE(review): the meaning of the -1 argument and the handling of nil opts
// are defined by execInternal, which is not visible here - confirm there.
func (f *VectorFunction) Execute(ctx context.Context, opts FunctionOptions, args ...Datum) (Datum, error) {
	return execInternal(ctx, f, opts, -1, args...)
}
// MetaFunctionImpl is the signature needed for implementing a MetaFunction
// which is a function that dispatches to another function instead.
//
// It receives the context, the effective options (the function's default
// options when the caller passed nil) and the call arguments.
type MetaFunctionImpl func(context.Context, FunctionOptions, ...Datum) (Datum, error)
// MetaFunction is a function which dispatches to other functions, the impl
// must not be nil.
//
// For Array, ChunkedArray and Scalar datums, this may rely on the execution
// of concrete function types, but this must handle other Datum kinds on its
// own.
//
// A MetaFunction holds no kernels: NumKernels reports 0 and both dispatch
// methods return an error wrapping arrow.ErrNotImplemented.
type MetaFunction struct {
	baseFunction
	// impl is invoked by Execute once arity and options have been checked.
	impl MetaFunctionImpl
}
// NewMetaFunction constructs a new MetaFunction which will call the provided
// impl for dispatching with the expected arity.
//
// The returned function has kind FuncMeta and no default options.
//
// Will panic if impl is nil.
func NewMetaFunction(name string, arity Arity, doc FunctionDoc, impl MetaFunctionImpl) *MetaFunction {
	if impl == nil {
		panic("arrow/compute: cannot construct MetaFunction with nil impl")
	}

	return &MetaFunction{
		baseFunction: baseFunction{
			name:  name,
			arity: arity,
			doc:   doc,
			// Set the kind explicitly: left unset it would be the zero
			// value of FuncKind, so Kind() would report FuncScalar.
			kind: FuncMeta,
		},
		impl: impl,
	}
}
// NumKernels always returns 0, since a MetaFunction holds no kernels.
func (MetaFunction) NumKernels() int {
	return 0
}

// DispatchExact is unsupported for a MetaFunction: it always returns a nil
// kernel and an error wrapping arrow.ErrNotImplemented.
func (m *MetaFunction) DispatchExact(...arrow.DataType) (exec.Kernel, error) {
	err := fmt.Errorf("%w: dispatch for metafunction", arrow.ErrNotImplemented)
	return nil, err
}

// DispatchBest is unsupported for a MetaFunction and fails exactly like
// DispatchExact.
func (m *MetaFunction) DispatchBest(...arrow.DataType) (exec.Kernel, error) {
	return m.DispatchExact()
}
// Execute validates the argument count and the presence of required
// options, then invokes the function's impl.
//
// When opts is nil the function's default options (possibly nil as well)
// are passed to impl instead. A failed check returns a nil Datum and the
// error without calling impl.
func (m *MetaFunction) Execute(ctx context.Context, opts FunctionOptions, args ...Datum) (Datum, error) {
	err := m.checkArity(len(args))
	if err == nil {
		err = checkOptions(m, opts)
	}
	if err != nil {
		return nil, err
	}

	effective := opts
	if effective == nil {
		effective = m.defaultOpts
	}
	return m.impl(ctx, effective, args...)
}
|