.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements.  See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership.  The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License.  You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied.  See the License for the
.. specific language governing permissions and limitations
.. under the License.

.. default-domain:: cpp
.. highlight:: cpp

================
Arrow Flight RPC
================

Arrow Flight is an RPC framework for efficient transfer of Arrow data
over the network.

.. seealso::

   :doc:`Flight protocol documentation <../format/Flight>`
        Documentation of the Flight protocol, including how to use
        Flight conceptually.

   :doc:`Flight API documentation <./api/flight>`
        C++ API documentation listing all of the various client and
        server types.

   `C++ Cookbook <https://arrow.apache.org/cookbook/cpp/flight.html>`_
        Recipes for using Arrow Flight in C++.

Writing a Flight Service
========================

Servers are subclasses of :class:`arrow::flight::FlightServerBase`. To
implement individual RPCs, override the RPC methods on this class.

.. code-block:: cpp

   class MyFlightServer : public FlightServerBase {
    public:
     Status ListFlights(const ServerCallContext& context, const Criteria* criteria,
                        std::unique_ptr<FlightListing>* listings) override {
       std::vector<FlightInfo> flights = ...;
       // Hand the result back through the out parameter.
       *listings = std::make_unique<SimpleFlightListing>(std::move(flights));
       return Status::OK();
     }
   };

Each RPC method always takes a
:class:`arrow::flight::ServerCallContext` for common parameters and
returns a :class:`arrow::Status` to indicate success or
failure. Flight-specific error codes can be returned via
:func:`arrow::flight::MakeFlightError`.

RPC methods that return a value in addition to a status will use an
out parameter, as shown above. Often, there are helper classes
providing basic implementations of these out parameters. For instance,
above, :class:`arrow::flight::SimpleFlightListing` uses a vector of
:class:`arrow::flight::FlightInfo` objects as the result of a
``ListFlights`` RPC.

To start a server, create a :class:`arrow::flight::Location` to
specify where to listen, and call
:func:`arrow::flight::FlightServerBase::Init`. This will start the
server, but won't block the rest of the program. Use
:func:`arrow::flight::FlightServerBase::SetShutdownOnSignals` to
enable stopping the server if an interrupt signal is received, then
call :func:`arrow::flight::FlightServerBase::Serve` to block until the
server stops.

.. code-block:: cpp

   std::unique_ptr<arrow::flight::FlightServerBase> server;
   // Initialize server
   // Listen to all interfaces on a free port
   arrow::flight::Location location =
       arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0).ValueOrDie();
   arrow::flight::FlightServerOptions options(location);

   // Start the server
   ARROW_CHECK_OK(server->Init(options));
   // Exit with a clean error code (0) on SIGTERM
   ARROW_CHECK_OK(server->SetShutdownOnSignals({SIGTERM}));

   std::cout << "Server listening on localhost:" << server->port() << std::endl;
   ARROW_CHECK_OK(server->Serve());

Using the Flight Client
=======================

To connect to a Flight service, create an instance of
:class:`arrow::flight::FlightClient` by calling :func:`Connect
<arrow::flight::FlightClient::Connect>`.

Each RPC method returns :class:`arrow::Result` to indicate the
success/failure of the request, and the result object if the request
succeeded. Some calls are streaming calls, so they will return a
reader and/or a writer object; the final call status isn't known until
the stream is completed.

Cancellation and Timeouts
=========================

When making a call, clients can optionally provide
:class:`FlightCallOptions <arrow::flight::FlightCallOptions>`. This
allows clients to set a timeout on calls or provide custom HTTP
headers, among other features. Also, some objects returned by client
RPC calls expose a ``Cancel`` method which allows terminating a call
early.

On the server side, no additional code is needed to implement
timeouts. For cancellation, the server needs to manually poll
:func:`ServerCallContext::is_cancelled
<arrow::flight::ServerCallContext::is_cancelled>` to check if the
client has cancelled the call, and if so, break out of any processing
the server is currently doing.
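
As a rough sketch of this polling pattern, the loop below checks a
cancellation predicate before each unit of work. It uses only the standard
library: ``is_cancelled`` is a stand-in for a call to
``ServerCallContext::is_cancelled``, and ``ProcessChunks`` is a hypothetical
helper name, not part of Arrow.

.. code-block:: cpp

   #include <cstddef>
   #include <functional>

   // Hypothetical helper: processes up to `num_chunks` units of work, checking
   // `is_cancelled` before each one. Returns the number of chunks processed.
   // In a real handler, `is_cancelled` would wrap context.is_cancelled().
   std::size_t ProcessChunks(std::size_t num_chunks,
                             const std::function<bool()>& is_cancelled,
                             const std::function<void(std::size_t)>& process) {
     std::size_t done = 0;
     for (; done < num_chunks; ++done) {
       if (is_cancelled()) break;  // The client gave up: stop doing work.
       process(done);
     }
     return done;
   }

How often to poll is a trade-off: checking once per record batch is usually
cheap compared to producing the batch.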

Enabling TLS
============

TLS can be enabled when setting up a server by providing a certificate
and key pair to :func:`FlightServerBase::Init
<arrow::flight::FlightServerBase::Init>`.

On the client side, use :func:`Location::ForGrpcTls
<arrow::flight::Location::ForGrpcTls>` to construct the
:class:`arrow::flight::Location` to connect to.

Enabling Authentication
=======================

.. warning:: Authentication is insecure without enabling TLS.

Handshake-based authentication can be enabled by implementing
:class:`ServerAuthHandler <arrow::flight::ServerAuthHandler>` and
providing this to the server during construction.

Authentication consists of two parts: on initial client connection,
the server and client authentication implementations can perform any
negotiation needed. The client authentication handler then provides a
token that will be attached to future calls. This is done by calling
:func:`Authenticate <arrow::flight::FlightClient::Authenticate>` with
the desired client authentication implementation.

On each RPC thereafter, the client handler's token is automatically
added to the call in the request headers. The server authentication
handler validates the token and provides the identity of the
client. On the server, this identity can be obtained from the
:class:`arrow::flight::ServerCallContext`.

Custom Middleware
=================

Servers and clients support custom middleware (or interceptors) that
are called on every request and can modify the request in a limited
fashion.  These can be implemented by subclassing :class:`ServerMiddleware
<arrow::flight::ServerMiddleware>` and :class:`ClientMiddleware
<arrow::flight::ClientMiddleware>`, then providing them when creating
the client or server.

Middleware are fairly limited, but they can add headers to a
request/response. On the server, they can inspect incoming headers and
fail the request; hence, they can be used to implement custom
authentication methods.
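
The header check that such a server middleware might perform can be sketched
with the standard library alone. Everything here is illustrative: the
``Headers`` alias stands in for the header container Flight passes to
middleware, ``IsAuthorized`` is a hypothetical helper, and the
``authorization`` / ``Bearer`` convention is an assumption, not something
Flight mandates.

.. code-block:: cpp

   #include <map>
   #include <string>

   // Stand-in for the incoming call headers (a key may appear more than once).
   using Headers = std::multimap<std::string, std::string>;

   // Hypothetical check: accept the call only if some "authorization" header
   // equals "Bearer <expected_token>". A middleware would fail the request
   // when this returns false.
   bool IsAuthorized(const Headers& headers, const std::string& expected_token) {
     const auto range = headers.equal_range("authorization");
     for (auto it = range.first; it != range.second; ++it) {
       if (it->second == "Bearer " + expected_token) return true;
     }
     return false;
   }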

.. _flight-best-practices:

Best practices
==============

gRPC
----

When using the default gRPC transport, options can be passed to it via
:member:`arrow::flight::FlightClientOptions::generic_options`. For example:

.. tab-set::

   .. tab-item:: C++

      .. code-block:: cpp

         auto options = FlightClientOptions::Defaults();
         // Set the period after which a keepalive ping is sent on transport.
         options.generic_options.emplace_back(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);

   .. tab-item:: Python

      .. code-block:: python

         # Set the period after which a keepalive ping is sent on transport.
         # Python takes the key string itself (the value of GRPC_ARG_KEEPALIVE_TIME_MS).
         generic_options = [("grpc.keepalive_time_ms", 60000)]
         client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)

Also see `best gRPC practices`_ and available `gRPC keys`_.

Re-use clients whenever possible
--------------------------------

Creating and closing clients requires setup and teardown on the client and
server side which can take away from actually handling RPCs. Reuse clients
whenever possible to avoid this. Note that clients are thread-safe, so a
single client can be shared across multiple threads.

Don’t round-robin load balance
------------------------------

`Round robin load balancing`_ means every client can have an open connection to
every server, causing an unexpected number of open connections and depleting
server resources.

Debugging connection issues
---------------------------

When facing unexpected disconnects on long-running connections, use netstat to
monitor the number of open connections. If the number of connections is much
greater than the number of clients, it might cause issues.

For debugging, certain environment variables enable logging in gRPC. For
example, ``env GRPC_VERBOSITY=info GRPC_TRACE=http`` will print the initial
headers (on both sides) so you can see if gRPC established the connection or
not. It will also print when a message is sent, so you can tell if the
connection is open or not.

gRPC may not report connection errors until a call is actually made.
Hence, to detect connection errors when creating a client, some sort
of dummy RPC should be made.

Memory management
-----------------

Flight tries to reuse allocations made by gRPC to avoid redundant
data copies. However, experience shows that such data is frequently
misaligned. Some use cases might require data to have data type-specific
alignment (for example, for the data buffer of an Int32 array to be aligned
on a 4-byte boundary), which can be enforced
by setting :member:`arrow::ipc::IpcReadOptions::ensure_alignment`
to :member:`arrow::ipc::Alignment::kDataTypeSpecificAlignment`.
This uses the :member:`arrow::ipc::IpcReadOptions::memory_pool`
to allocate memory with aligned addresses, but only for misaligned data.
Note that this copies the affected data received via Flight.

Unless gRPC data are copied as described above, allocations made by gRPC may not
be tracked by the Arrow memory pool, and memory usage behavior,
such as whether free memory is returned to the system, depends
on the allocator that gRPC uses (usually the system allocator).

A quick way of testing: attach to the process with a debugger and call
``malloc_trim``, or call :func:`ReleaseUnused <arrow::MemoryPool::ReleaseUnused>`
on the system pool. If memory usage drops, the system allocator was probably
holding on to memory allocated by gRPC or by the application. This can be
adjusted in platform-specific ways; see the investigation in ARROW-16697_ for
an example of how this works on Linux/glibc, where glibc malloc can be
explicitly told to dump its caches.

Excessive traffic
-----------------

gRPC will spawn threads for concurrent clients, up to its maximum thread quota.
Those threads are not necessarily cleaned up (a "cached thread pool" in Java
parlance). glibc malloc clears some per-thread state, but with the default
tuning it never clears caches in some workloads.

gRPC's default behaviour allows one server to accept many connections from many
different clients, but if requests do a lot of work (as they may under Flight),
the server may not be able to keep up. Configuring clients to retry
with backoff (and potentially connect to a different node) would give more
consistent quality of service.

.. tab-set::

   .. tab-item:: C++

      .. code-block:: cpp

         auto options = FlightClientOptions::Defaults();
         // Set the minimum time between subsequent connection attempts.
         options.generic_options.emplace_back(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 2000);

   .. tab-item:: Python

      .. code-block:: python

         # Set the minimum time between subsequent connection attempts.
         # Python takes the key string itself (the value of GRPC_ARG_MIN_RECONNECT_BACKOFF_MS).
         generic_options = [("grpc.min_reconnect_backoff_ms", 2000)]
         client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)
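
The gRPC option above only governs reconnection attempts. Retrying a failed
call with backoff is left to the application; a minimal sketch using only the
standard library might look like the following. ``BackoffDelay`` and
``RetryWithBackoff`` are hypothetical helpers, ``attempt_call`` stands in for
whatever Flight RPC is being retried, and jitter is omitted for brevity.

.. code-block:: cpp

   #include <algorithm>
   #include <chrono>
   #include <functional>
   #include <thread>

   // Delay before retry number `attempt` (0-based): base * 2^attempt, capped.
   std::chrono::milliseconds BackoffDelay(int attempt,
                                          std::chrono::milliseconds base,
                                          std::chrono::milliseconds cap) {
     std::chrono::milliseconds delay = base;
     for (int i = 0; i < attempt && delay < cap; ++i) delay *= 2;
     return std::min(delay, cap);
   }

   // Calls `attempt_call` until it returns true or `max_attempts` is reached,
   // sleeping with exponential backoff between failed attempts.
   // Returns the number of attempts made, or -1 if every attempt failed.
   int RetryWithBackoff(const std::function<bool()>& attempt_call,
                        int max_attempts, std::chrono::milliseconds base,
                        std::chrono::milliseconds cap) {
     for (int attempt = 0; attempt < max_attempts; ++attempt) {
       if (attempt_call()) return attempt + 1;
       if (attempt + 1 < max_attempts) {
         std::this_thread::sleep_for(BackoffDelay(attempt, base, cap));
       }
     }
     return -1;
   }

In practice, only errors that are safe to retry (for example, an unavailable
server) should go through such a loop, and the callable is a natural place to
switch to a different node.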


Limiting DoPut Batch Size
--------------------------

You may wish to limit the maximum batch size a client can submit to a server through
DoPut, to prevent a request from taking up too much memory on the server. On
the client-side, set :member:`arrow::flight::FlightClientOptions::write_size_limit_bytes`.
On the server-side, set the gRPC option ``GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH``.
The client-side option will return an error that can be retried with smaller batches,
while the server-side limit will close out the connection. Setting both can be wise, since
the former provides a better user experience but the latter may be necessary to defend
against impolite clients.
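
One way a client could react to the size-limit error is to re-send the data
as smaller slices. The sketch below only plans the slice boundaries, using
the standard library; ``PlanSlices`` is a hypothetical helper, and the fixed
``bytes_per_row`` estimate is a simplifying assumption. Each
``(offset, length)`` pair could then be passed to
``arrow::RecordBatch::Slice`` before writing.

.. code-block:: cpp

   #include <algorithm>
   #include <cstdint>
   #include <utility>
   #include <vector>

   // Hypothetical planner: splits `total_rows` into (offset, length) slices
   // whose estimated size (length * bytes_per_row) stays within `limit_bytes`.
   std::vector<std::pair<int64_t, int64_t>> PlanSlices(int64_t total_rows,
                                                       int64_t bytes_per_row,
                                                       int64_t limit_bytes) {
     std::vector<std::pair<int64_t, int64_t>> slices;
     if (total_rows <= 0 || bytes_per_row <= 0) return slices;
     // Always send at least one row per slice, even if one row exceeds the limit.
     const int64_t rows_per_slice =
         std::max<int64_t>(1, limit_bytes / bytes_per_row);
     for (int64_t offset = 0; offset < total_rows; offset += rows_per_slice) {
       slices.emplace_back(offset, std::min(rows_per_slice, total_rows - offset));
     }
     return slices;
   }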

Closing unresponsive connections
--------------------------------

1. A stale call can be closed using
   :member:`arrow::flight::FlightCallOptions::stop_token`. This requires recording the
   stop token at call establishment time.

   .. tab-set::

      .. tab-item:: C++

         .. code-block:: cpp

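              StopSource stop_source;
              FlightCallOptions options;
              // Record the stop token in the call options before making the call.
              options.stop_token = stop_source.token();
              // Requesting a stop cancels any call made with these options.
              stop_source.RequestStop(Status::Cancelled("StopSource"));
              flight_client->DoAction(options, {});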


2. Use call timeouts. (This is a general gRPC best practice.)

   .. tab-set::

      .. tab-item:: C++

         .. code-block:: cpp

            FlightCallOptions options;
            options.timeout = TimeoutDuration{0.2};
            Status status = client->GetFlightInfo(options, FlightDescriptor{}).status();

      .. tab-item:: Java

         .. code-block:: java

            Iterator<Result> results = client.doAction(new Action("hang"), CallOptions.timeout(200, TimeUnit.MILLISECONDS));

      .. tab-item:: Python

         .. code-block:: python

            options = pyarrow.flight.FlightCallOptions(timeout=0.2)
            result = client.do_action(action, options=options)


3. Client timeouts are not great for long-running streaming calls, where it may
   be hard to choose a timeout for the entire operation. Instead, what is often
   desired is a per-read or per-write timeout so that the operation fails if it
   isn't making progress. This can be implemented with a background thread that
   calls ``Cancel()`` on a timer, with the main thread resetting the timer every
   time an operation completes successfully. For a fully worked-out example, see
   the Cookbook.

   .. note:: There is a long standing ticket for a per-write/per-read timeout
             instead of a per call timeout (ARROW-6062_), but this is not (easily)
             possible to implement with the blocking gRPC API.
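
The timer-based approach from item 3 can be sketched with the standard
library alone. This is a minimal illustration, not the Cookbook's
implementation: ``Watchdog`` is a hypothetical class, and ``on_timeout``
stands in for a call to ``Cancel()`` on the stream reader or writer.

.. code-block:: cpp

   #include <chrono>
   #include <condition_variable>
   #include <functional>
   #include <mutex>
   #include <thread>
   #include <utility>

   // Hypothetical watchdog: invokes `on_timeout` at most once if Reset() is
   // not called within `timeout` of construction or of the previous Reset().
   class Watchdog {
    public:
     Watchdog(std::chrono::milliseconds timeout, std::function<void()> on_timeout)
         : timeout_(timeout),
           on_timeout_(std::move(on_timeout)),
           deadline_(std::chrono::steady_clock::now() + timeout),
           thread_([this] { Run(); }) {}

     ~Watchdog() {
       {
         std::lock_guard<std::mutex> lock(mutex_);
         stopped_ = true;
       }
       cv_.notify_all();
       thread_.join();
     }

     // Call after every successful read or write to push the deadline back.
     void Reset() {
       std::lock_guard<std::mutex> lock(mutex_);
       deadline_ = std::chrono::steady_clock::now() + timeout_;
       cv_.notify_all();
     }

    private:
     void Run() {
       std::unique_lock<std::mutex> lock(mutex_);
       while (!stopped_) {
         const auto deadline = deadline_;  // Copy: Reset() may move the member.
         cv_.wait_until(lock, deadline);
         if (stopped_) return;
         if (std::chrono::steady_clock::now() >= deadline_) {
           lock.unlock();
           on_timeout_();  // No progress within the timeout: cancel the call.
           return;
         }
       }
     }

     const std::chrono::milliseconds timeout_;
     std::function<void()> on_timeout_;
     std::mutex mutex_;
     std::condition_variable cv_;
     std::chrono::steady_clock::time_point deadline_;
     bool stopped_ = false;
     std::thread thread_;  // Declared last so it starts after the other members.
   };

The main thread would construct the watchdog before the first read or write,
call ``Reset()`` after each operation that completes, and let the destructor
stop the timer once the stream is finished.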

.. _best gRPC practices: https://grpc.io/docs/guides/performance/#general
.. _gRPC keys: https://grpc.github.io/grpc/cpp/group__grpc__arg__keys.html
.. _Round robin load balancing: https://github.com/grpc/grpc/blob/master/doc/load-balancing.md#round_robin
.. _ARROW-15764: https://issues.apache.org/jira/browse/ARROW-15764
.. _ARROW-16697: https://issues.apache.org/jira/browse/ARROW-16697
.. _ARROW-6062: https://issues.apache.org/jira/browse/ARROW-6062
.. _gRPC: https://grpc.io/