Dask Dataframe and SQL
======================

SQL is a method for executing tabular computation on database servers.
Similar operations can be done on Dask Dataframes. Users commonly wish
to link the two together.

This document describes the connection between Dask and SQL databases
and serves to clarify several of the questions that we commonly
receive from users.

.. contents::
    :local:
    :depth: 1
    :backlinks: top

Does Dask implement SQL?
------------------------

The short answer is "no". Dask has no parser or query planner for SQL
queries. However, the Pandas API, which is largely identical for
Dask Dataframes, has many analogues to SQL operations. A good
description for mapping SQL onto Pandas syntax can be found in the
`pandas docs`_.

.. _pandas docs: https://pandas.pydata.org/docs/getting_started/comparison/comparison_with_sql.html
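
As an illustration of these analogues, the sketch below writes a simple
filtered aggregation as SQL (in a comment) and then with the Dask Dataframe
API. The table name, column names and data are invented for the example.

.. code-block:: python

    import pandas as pd
    import dask.dataframe as dd

    # Hypothetical data standing in for a table called "orders"
    pdf = pd.DataFrame({
        "customer": ["a", "b", "a", "c", "b"],
        "amount": [10.0, -5.0, 7.5, 3.0, 20.0],
    })
    orders = dd.from_pandas(pdf, npartitions=2)

    # SELECT customer, SUM(amount) AS total
    # FROM orders WHERE amount > 0 GROUP BY customer
    totals = (
        orders[orders["amount"] > 0]
        .groupby("customer")["amount"]
        .sum()
        .compute()
    )

Nothing is evaluated until ``compute()`` is called, at which point the result
is an ordinary Pandas series indexed by ``customer``.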

The following packages may be of interest:

- `dask-sql`_ adds a SQL query engine on top of Dask.
  In addition to working on CPU, it offers experimental support for CUDA-enabled GPUs through RAPIDS libraries such as `cuDF`_.

- `FugueSQL`_ provides a unified interface to run SQL code on a variety of different computing frameworks.
  Specifying ``DaskExecutionEngine`` or ``DaskSQLExecutionEngine`` as the execution engine for queries allows them to be computed using Dask or dask-sql, respectively.

.. _dask-sql: https://dask-sql.readthedocs.io/en/latest/
.. _cuDF: https://docs.rapids.ai/api/cudf/stable/
.. _FugueSQL: https://fugue-tutorials.readthedocs.io/en/latest/tutorials/fugue_sql/index.html

Database or Dask?
-----------------

A database server is able to process tabular data and produce results just like
Dask Dataframe. Why would you choose to use one over the other?

These days a database server can be a sharded/distributed system, capable of
handling tables with millions of rows. Most database implementations are
geared towards row-wise retrieval and (atomic) updates of small subsets of a
table. Configuring a database to be fast for a particular
sort of query can be challenging, but assuming all your data is already in the
database, it may well be the best solution - particularly if you understand
something about SQL query plan optimisation. A SQL implementation can
very efficiently analyse a query to only extract a small part of a table
for consideration, when the rest is excluded by conditionals.

Dask is much more flexible than a database, and designed explicitly
to work with larger-than-memory datasets, in parallel, and potentially distributed
across a cluster. If your workflow is not well suited to SQL, use Dask. If
your database server struggles with volume, Dask may do better. It is
best to profile your queries
(and keep in mind other users of the resources!). If you need
to combine data from different sources, Dask may be your best option.

You may find the Dask API easier to use than writing SQL (if you
are already used to Pandas), and the diagnostic feedback more useful.
These points are arguably in Dask's favour.

Loading from SQL with read_sql_table or read_sql_query
------------------------------------------------------

Dask allows you to build dataframes from SQL tables and queries using the
functions :func:`dask.dataframe.read_sql_table` and :func:`dask.dataframe.read_sql_query`.
These are based on the `Pandas version`_, share most of its arguments, and use SQLAlchemy
for the actual handling of the queries. You may need to install additional
driver packages for your chosen database server.

.. _Pandas version: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_table.html

Since Dask is designed to work with larger-than-memory datasets, or be distributed
on a cluster, the following are the main differences versus Pandas to watch out for:

- Dask does not support arbitrary text queries, only whole tables and SQLAlchemy
  `sql expressions`_

- the ``con`` argument must be a `URI string`_, not an SQLAlchemy engine/connection

- partitioning information is *required*, which can be as simple as providing
  an index column argument, or can be more explicit (see below)

- the ``chunksize`` argument is not used, since the partitioning must be via an
  index column

.. _URI string: https://docs.sqlalchemy.org/en/13/core/engines.html#database-urls
.. _sql expressions: https://docs.sqlalchemy.org/en/13/core/tutorial.html

If you need something more flexible than this, or the
method fails for you (e.g., on type inference), then skip to the next section.
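
A minimal sketch of a table load is shown here, assuming a throwaway SQLite
file as a stand-in for a real database server. The table name ``readings``,
its columns and the file path are invented for the example; SQLite needs no
extra driver package, but SQLAlchemy must be installed.

.. code-block:: python

    import os
    import sqlite3
    import tempfile

    import dask.dataframe as dd

    # Build a small throwaway SQLite database (stand-in for a real server)
    path = os.path.join(tempfile.mkdtemp(), "example.db")
    with sqlite3.connect(path) as conn:
        conn.execute("CREATE TABLE readings (id INTEGER PRIMARY KEY, value REAL)")
        conn.executemany(
            "INSERT INTO readings VALUES (?, ?)",
            [(i, i * 0.5) for i in range(100)],
        )

    # con is a URI string, not an engine; index_col drives the partitioning
    uri = "sqlite:///" + path
    ddf = dd.read_sql_table("readings", uri, index_col="id", npartitions=4)
    total = ddf["value"].sum().compute()

Each of the four partitions is fetched by its own query when ``compute()``
runs, so the server sees several smaller queries rather than one large one.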

Why the differences
^^^^^^^^^^^^^^^^^^^

Dask is intended to make processing large volumes of data possible, including
potentially distributing that processing across a cluster. For the retrieval of
data from SQL servers, this means that the query must be partitionable: that
each partition can be fetched independently of others and not dependent on
some global state, and that the definitions of the tasks must be serialisable,
i.e., can be represented as a stream of bytes communicated to workers.

The constraints mean that we cannot directly accept SQLAlchemy engines
or connection objects, since they have internal state (buffers, etc.)
that cannot be serialised. A `URI string`_ must be used, from which a
fresh engine can be created on the workers.
Similarly, we cannot accommodate chunked queries
which rely on the internal state of a database cursor; nor LIMIT/OFFSET
queries, which are not guaranteed to be repeatable, and involve scanning
the whole query on the server (which is very inefficient).

**If** your data is small enough not to require Dask's out-of-core and/or
distributed capabilities, then you are probably better off using Pandas or SQLAlchemy
directly.

Index Column
^^^^^^^^^^^^

We need a way to turn a single main query into sub-queries for each
partition. For most reasonable database tables, there should be an obvious
column which can be used for partitioning - it is probably numeric,
and should certainly be indexed in the database. The latter condition
is important, since many simultaneous queries will hit your server once
Dask starts to compute.

By providing just a column name for the ``index_col`` argument, you imply that the
column is numeric, and Dask guesses a reasonable partitioning by evenly
splitting the space between minimum and maximum values into ``npartitions``
intervals. You can also provide the min/max that you would like to
consider (the ``limits`` argument) so that Dask doesn't need to query for these.
Alternatively, you can have Dask fetch the first few rows (5 by default) and use
them to guess the typical bytes/row, and base the partitioning size on
this. Needless to say, the results will vary a lot unless the table is
unusually homogeneous.
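
The idea of splitting one query into per-partition sub-queries can be
sketched in plain Python. This is only an illustration of the even split
described above, not Dask's actual implementation; the helper name
``partition_clauses`` is hypothetical.

.. code-block:: python

    def partition_clauses(column, lower, upper, npartitions):
        """Return one WHERE clause per partition, evenly splitting [lower, upper]."""
        step = (upper - lower) / npartitions
        bounds = [lower + i * step for i in range(npartitions)] + [upper]
        clauses = []
        for i, (lo, hi) in enumerate(zip(bounds[:-1], bounds[1:])):
            # The last interval is closed so the maximum value is not dropped
            op = "<=" if i == npartitions - 1 else "<"
            clauses.append(f"{column} >= {lo} AND {column} {op} {hi}")
        return clauses

    clauses = partition_clauses("id", 0, 100, 4)

Every clause filters on the same column, which is why that column should be
indexed in the database: each partition then becomes a cheap range scan.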

Specific partitioning
^^^^^^^^^^^^^^^^^^^^^

In some cases, you may have a very good idea of how to partition the data,
for example based on a column that has a finite number of unique values
or categories. This enables using string columns, or anything with a
natural ordering, for the index column, not only numerical types.

In this case, you would provide a specific set of ``divisions``,
the start/end values of the index column for each partition. For example,
if a column happened to contain a random ID in hex string format, then you
could specify 16 partitions with

.. code-block:: python

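    import dask.dataframe as dd
    # con is your database URI string; 17 boundaries define 16 partitions
    df = dd.read_sql_table("mytable", con, index_col="hexID",
                           divisions=list("0123456789abcdefh"))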

so the first partition would have IDs with values ``"0" <= hexID < "1"``, i.e.,
leading character "0".
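
To make the boundary arithmetic concrete, the short sketch below pairs up
consecutive division values and looks up which partition a given ID would
fall into. The sample ID ``"3fa9"`` is made up, and the lookup with
``bisect`` is only an illustration of the ordering rule.

.. code-block:: python

    import bisect

    divisions = list("0123456789abcdefh")

    # Consecutive pairs of boundaries describe one partition each
    intervals = list(zip(divisions[:-1], divisions[1:]))

    # String comparison decides the partition: "3" <= "3fa9" < "4"
    partition_index = bisect.bisect_right(divisions, "3fa9") - 1

The final boundary ``"h"`` is there only because it sorts after every
lowercase hex digit, so the last partition also captures IDs starting
with ``"f"``.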

SQLAlchemy expressions
^^^^^^^^^^^^^^^^^^^^^^

Since we only send the database connection URI and not the engine object,
we cannot rely on SQLAlchemy's table class inference and ORM to conduct queries. However, we can
use the "select" `sql expressions`_, which only get formatted into a text query at
the point of execution.

.. code-block:: python

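    from dask.dataframe import read_sql_query
    from sqlalchemy import sql

    number = sql.column("number")
    name = sql.column("name")
    # Columns are passed positionally; the list form select([...]) is the
    # legacy 1.x style and is rejected by SQLAlchemy 2.0
    s1 = sql.select(
        number, name, sql.func.length(name).label("lenname")
    ).select_from(sql.table("test"))
    # db is the database URI string
    data = read_sql_query(
        s1, db, npartitions=2, index_col=number
    )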

Here we have also demonstrated the use of the function ``length`` to
perform an operation server-side. Note that it is necessary to *label* such
operations. You can also use a labelled operation as the index column,
so long as it is also
in the set of selected columns. If it is used for the index/partitioning, the
column should still be indexed in the database, for performance.
One of the most important functions to consider is ``cast``, which specifies the
output data type or conversion in the database, if pandas is having
trouble inferring the data type.
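
A small sketch of ``cast`` follows, assuming a hypothetical ``price`` column
that the database stores as text. Only the expression is built and printed
here; no database connection is needed to inspect the generated SQL.

.. code-block:: python

    from sqlalchemy import Float, cast, column, select, table

    # Hypothetical table "test" whose "price" column is stored as text
    price = cast(column("price"), Float).label("price")
    query = select(column("number"), price).select_from(table("test"))

    # Printing the expression shows the SQL that would be sent to the server
    print(query)

Because the cast result is labelled, the column arrives in the dataframe
under a predictable name and with a numeric type chosen by the server.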

Be warned that SQLAlchemy expressions take some time to get
used to. You can practice with Pandas first, reading only the first small
chunk of a query, until things look right. You can find a more complete
object-oriented example in `this gist`_.

.. _this gist: https://gist.github.com/quasiben/08a7f291039db2b04c2e28e1a6c21e3b

Load from SQL, manual approaches
--------------------------------

If ``read_sql_table`` is not sufficient for your needs, you can try one of
the following methods.

From Map functions
^^^^^^^^^^^^^^^^^^

Often you know more about your data and server than the generic approach above
allows. Indeed, some database-like servers may simply not be supported by
SQLAlchemy, or provide an alternate API which is better optimised.

If you already have a way to fetch data from the database in partitions,
then you can use :func:`dask.dataframe.from_map` and construct a
dataframe this way. It might look something like:

.. code-block:: python

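   import dask.dataframe as dd

   def fetch_partition(part):
       # Open a fresh connection inside the task, since connection
       # objects cannot be serialised and sent to workers
       conn = establish_connection()
       # Run this partition's query on that connection
       df = fetch_query(conn, base_query.format(part))
       return df.astype(known_types)

   ddf = dd.from_map(fetch_partition,
                     parts,
                     meta=known_types,
                     divisions=div_from_parts(parts))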

Here you must provide your own functions for setting up a connection to the server,
your own query, and a way to format that query to be specific to each partition.
For example, you might have ranges or specific unique values with a WHERE
clause. The ``known_types`` here is used both to transform each dataframe partition and to
provide ``meta``, which helps consistency and avoids Dask having to analyse one partition
up front to guess the columns/types; you may also want to explicitly set the index.
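
A runnable variant of this pattern is sketched below, assuming a throwaway
SQLite file and the standard-library ``sqlite3`` driver in place of a real
server. The table ``readings``, the chunk size and the explicit ``meta``
frame are all invented for the example.

.. code-block:: python

    import os
    import sqlite3
    import tempfile

    import dask.dataframe as dd
    import pandas as pd

    path = os.path.join(tempfile.mkdtemp(), "example.db")
    with sqlite3.connect(path) as conn:
        conn.execute("CREATE TABLE readings (id INTEGER PRIMARY KEY, value REAL)")
        conn.executemany(
            "INSERT INTO readings VALUES (?, ?)",
            [(i, float(i)) for i in range(100)],
        )

    CHUNK = 50
    QUERY = "SELECT id, value FROM readings WHERE id >= ? AND id < ? ORDER BY id"

    def fetch_partition(start):
        # Each task opens (and closes) its own connection
        conn = sqlite3.connect(path)
        try:
            return pd.read_sql_query(
                QUERY, conn, params=(start, start + CHUNK), index_col="id"
            )
        finally:
            conn.close()

    # Empty frame describing the columns, dtypes and index of every partition
    meta = pd.DataFrame(
        {"value": pd.Series(dtype="float64")},
        index=pd.Index([], dtype="int64", name="id"),
    )

    starts = [0, 50]
    ddf = dd.from_map(fetch_partition, starts, meta=meta, divisions=(0, 50, 99))

The ``divisions`` tuple holds the lower bound of each partition plus the
largest index value overall, which lets Dask skip partitions when you select
on the index.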

Stream via client
^^^^^^^^^^^^^^^^^

In some cases, the workers may not have access to data, but the client does;
or the initial loading time of the data is not important, so long as the
dataset is then held in cluster memory and available for Dask Dataframe
queries. It is possible to construct the dataframe by uploading chunks of
data from the client. See a complete example of how to do this `here`_.

.. _here: https://stackoverflow.com/questions/62818473/why-dasks-read-sql-table-requires-a-index-col-parameter/62821858#62821858
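
One way this could look is sketched below. It is not necessarily the approach
taken in the linked answer: it assumes an in-memory SQLite database visible
only to the client, reads it in chunks with Pandas, and turns each chunk
into one Dask partition. The table and chunk size are invented.

.. code-block:: python

    import sqlite3

    import dask.dataframe as dd
    import pandas as pd

    # In-memory database that only the client process can see
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE readings (id INTEGER PRIMARY KEY, value REAL)")
    conn.executemany(
        "INSERT INTO readings VALUES (?, ?)",
        [(i, float(i)) for i in range(100)],
    )

    # Read the result in chunks on the client, one Dask partition per chunk
    chunks = pd.read_sql_query(
        "SELECT id, value FROM readings ORDER BY id",
        conn,
        index_col="id",
        chunksize=25,
    )
    ddf = dd.concat([dd.from_pandas(chunk, npartitions=1) for chunk in chunks])
    conn.close()

With a distributed cluster attached, calling ``ddf.persist()`` afterwards
would move the partitions into cluster memory. Note that all data passes
through the client, so loading is serial.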


Access data files directly
^^^^^^^^^^^^^^^^^^^^^^^^^^

Some database systems such as Apache Hive store their data in a location
and format that may be directly accessible to Dask, such as parquet files
on S3 or HDFS. In cases where your SQL query would read whole datasets and pass
them to Dask, the streaming of data from the database is very likely the
bottleneck, and it's probably faster to read the source data files directly.

Query pushdown?
---------------

If you define a query based on a database table, then only use some columns
of the output, you may expect that Dask is able to tell the database server
to only send some of the table's data. Dask is not currently able to
do this "pushdown" optimisation, and you would need to change your query using
the SQL expression syntax.
We may be able to resolve this in the future (:issue:`6388`).
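
Until then, the selection has to be written into the load step by hand. The
sketch below shows two ways of doing so, assuming a throwaway SQLite file
with an invented ``readings`` table: the ``columns`` argument of
``read_sql_table``, and a SQLAlchemy expression that carries both the column
list and a row filter.

.. code-block:: python

    import os
    import sqlite3
    import tempfile

    import dask.dataframe as dd
    import sqlalchemy as sa

    path = os.path.join(tempfile.mkdtemp(), "example.db")
    with sqlite3.connect(path) as conn:
        conn.execute(
            "CREATE TABLE readings (id INTEGER PRIMARY KEY, value REAL, note TEXT)"
        )
        conn.executemany(
            "INSERT INTO readings VALUES (?, ?, ?)",
            [(i, float(i), "n/a") for i in range(100)],
        )
    uri = "sqlite:///" + path

    # Only "value" (plus the index) is requested from the server
    narrow = dd.read_sql_table(
        "readings", uri, index_col="id", columns=["value"], npartitions=2
    )

    # Column selection and row filter both written into the SQL expression
    id_, value = sa.column("id"), sa.column("value")
    query = (
        sa.select(id_, value)
        .where(value >= 90)
        .select_from(sa.table("readings"))
    )
    filtered = dd.read_sql_query(query, uri, index_col="id", npartitions=2)

In both cases the ``note`` column never leaves the database, and in the
second case neither do the rows excluded by the ``WHERE`` condition.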

If the divisions on your dataframe are well defined, then selections on the
index may successfully avoid reading irrelevant partitions.