File: record_batch_writer.go

package info (click to toggle)
golang-github-apache-arrow-go 18.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 32,200 kB
  • sloc: asm: 477,547; ansic: 5,369; cpp: 759; sh: 585; makefile: 319; python: 190; sed: 5
file content (119 lines) | stat: -rw-r--r-- 3,972 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package flight

import (
	"bytes"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/ipc"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

// DataStreamWriter is an interface that represents an Arrow Flight stream
// writer that writes FlightData objects.
type DataStreamWriter interface {
	// Send writes a single FlightData message to the stream, returning an
	// error if the message could not be written.
	Send(*FlightData) error
}

// flightPayloadWriter turns IPC payloads into FlightData messages and sends
// them through a DataStreamWriter. NewRecordWriter passes it to
// ipc.NewWriterWithPayloadWriter.
//
// w is the destination stream, fd is the single FlightData value that is
// refilled and sent for each payload, and buf is the scratch buffer that each
// payload body is serialized into.
type flightPayloadWriter struct {
	w   DataStreamWriter
	fd  FlightData
	buf bytes.Buffer
}

// Start does nothing and always returns nil.
func (f *flightPayloadWriter) Start() error {
	return nil
}
// WritePayload sends one IPC payload as a FlightData message.
//
// The payload metadata becomes the message's DataHeader and the serialized
// payload body becomes its DataBody. Any FlightDescriptor or AppMetadata
// already set on the reused message is sent along unchanged.
//
// It returns the error from serializing the body, in which case nothing is
// sent, or otherwise the error from the underlying stream's Send.
func (f *flightPayloadWriter) WritePayload(payload ipc.Payload) error {
	m := payload.Meta()
	// The header bytes are borrowed from m, so the release is deferred until
	// after Send has returned.
	defer m.Release()

	// Serialize the body first so that a failure leaves the outgoing message
	// untouched and a truncated body is never sent.
	f.buf.Reset()
	if err := payload.SerializeBody(&f.buf); err != nil {
		return err
	}

	f.fd.DataHeader = m.Bytes()
	f.fd.DataBody = f.buf.Bytes()

	return f.w.Send(&f.fd)
}

// Close does nothing and always returns nil; the underlying stream is left
// untouched.
func (f *flightPayloadWriter) Close() error {
	return nil
}

// Writer is an ipc.Writer which also adds a WriteWithAppMetadata function
// in order to allow adding AppMetadata to the FlightData messages which
// are written.
type Writer struct {
	*ipc.Writer
	// pw is the payload writer that the embedded ipc.Writer was built on. It
	// is kept so the outgoing FlightData fields can be set between writes.
	pw *flightPayloadWriter
}

// WriteMetadata sends a standalone message on the stream whose only populated
// field is the supplied app metadata. It returns whatever error the
// underlying stream's Send reports.
func (w *Writer) WriteMetadata(appMetadata []byte) error {
	msg := FlightData{AppMetadata: appMetadata}
	return w.pw.w.Send(&msg)
}

// SetFlightDescriptor stores descr on the message that the flight writer
// sends, so that it goes out with the next payload written. The next call to
// Write clears it again once it returns, after which the writer no longer
// keeps its pointer to the descriptor.
func (w *Writer) SetFlightDescriptor(descr *FlightDescriptor) {
	msg := &w.pw.fd
	msg.FlightDescriptor = descr
}

// Write writes a record batch payload through the embedded ipc.Writer and
// returns any error, implementing the arrio.Writer interface. If a flight
// descriptor was pending when Write was called, it is cleared once the write
// returns.
func (w *Writer) Write(rec arrow.Record) error {
	// Nothing is pending, so there is nothing to clean up afterwards.
	if w.pw.fd.FlightDescriptor == nil {
		return w.Writer.Write(rec)
	}

	defer func() { w.pw.fd.FlightDescriptor = nil }()
	return w.Writer.Write(rec)
}

// WriteWithAppMetadata writes rec with appMeta attached as the application
// metadata of the outgoing FlightData message. The metadata is cleared again
// when the call returns, so it does not carry over to later writes.
func (w *Writer) WriteWithAppMetadata(rec arrow.Record, appMeta []byte) error {
	clearMeta := func() { w.pw.fd.AppMetadata = nil }

	w.pw.fd.AppMetadata = appMeta
	defer clearMeta()

	return w.Write(rec)
}

// NewRecordWriter builds a Writer that sends record batches as flight data
// objects over w, typically the stream supplied by a grpc stream handler.
// Any options given are forwarded to ipc.NewWriterWithPayloadWriter.
func NewRecordWriter(w DataStreamWriter, opts ...ipc.Option) *Writer {
	payloadWriter := &flightPayloadWriter{w: w}

	out := &Writer{pw: payloadWriter}
	out.Writer = ipc.NewWriterWithPayloadWriter(payloadWriter, opts...)
	return out
}

// SerializeSchema returns the serialized schema bytes for use in Arrow Flight
// protobuf messages. rec is the schema to serialize and mem is the allocator
// handed to the IPC writer.
func SerializeSchema(rec *arrow.Schema, mem memory.Allocator) []byte {
	// The spec asks for the message as laid out in Schema.fbs, but the other
	// implementations appear to send a fully serialized record batch stream
	// that simply has no rows, so the same is done here: open an IPC writer
	// for the schema and close it straight away without writing a record.
	out := new(bytes.Buffer)
	opts := []ipc.Option{ipc.WithSchema(rec), ipc.WithAllocator(mem)}

	// Whatever Close returns is discarded; the signature cannot report it.
	ipc.NewWriter(out, opts...).Close()
	return out.Bytes()
}

// MetadataWriter is the interface for types that can write application
// metadata bytes on their own; Writer implements it.
type MetadataWriter interface {
	// WriteMetadata writes the given application metadata bytes and returns
	// any error encountered.
	WriteMetadata([]byte) error
}