1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273
|
Dask Dataframe and SQL
======================
SQL is a method for executing tabular computation on database servers.
Similar operations can be done on Dask Dataframes. Users commonly wish
to link the two together.
This document describes the connection between Dask and SQL-databases
and serves to clarify several of the questions that we commonly
receive from users.
.. contents::
:local:
:depth: 1
:backlinks: top
Does Dask implement SQL?
------------------------
The short answer is "no". Dask has no parser or query planner for SQL
queries. However, the Pandas API, which is largely identical for
Dask Dataframes, has many analogues to SQL operations. A good
description for mapping SQL onto Pandas syntax can be found in the
`pandas docs`_.
.. _pandas docs: https://pandas.pydata.org/docs/getting_started/comparison/comparison_with_sql.html
The following packages may be of interest:
- `dask-sql`_ adds a SQL query engine on top of Dask.
In addition to working on CPU, it offers experimental support for CUDA-enabled GPUs through RAPIDS libraries such as `cuDF`_.
- `FugueSQL`_ provides a unified interface to run SQL code on a variety of different computing frameworks.
Specifying ``DaskExecutionEngine`` or ``DaskSQLExecutionEngine`` as the execution engine for queries allows them to be computed using Dask or dask-sql, respectively.
.. _dask-sql: https://dask-sql.readthedocs.io/en/latest/
.. _cuDF: https://docs.rapids.ai/api/cudf/stable/
.. _FugueSQL: https://fugue-tutorials.readthedocs.io/en/latest/tutorials/fugue_sql/index.html
Database or Dask?
-----------------
A database server is able to process tabular data and produce results just like
Dask Dataframe. Why would you choose to use one over the other?
These days a database server can be a sharded/distributed system, capable of
handling tables with millions of rows. Most database implementations are
geared towards row-wise retrieval and (atomic) updates of small subset of a
table. Configuring a database to be fast for a particular
sort of query can be challenging, but assuming all your data is already in the
database, it may well be the best solution - particularly if you understand
something about SQL query plan optimisation. A SQL implementation can
very efficiently analyse a query to only extract a small part of a table
for consideration, when the rest is excluded by conditionals.
Dask is much more flexible than a database, and designed explicitly
to work with larger-than-memory datasets, in parallel, and potentially distributed
across a cluster. If your workflow is not well suited to SQL, use dask. If
your database server struggles with volume, dask may do better. It
would be best to profile your queries
(and keep in mind other users of the resources!). If you need
to combine data from different sources, dask may be your best option.
You may find the dask API easier to use than writing SQL (if you
are already used to Pandas), and the diagnostic feedback more useful.
These points can debatably be in Dask's favour.
Loading from SQL with read_sql_table or read_sql_query
------------------------------------------------------
Dask allows you to build dataframes from SQL tables and queries using the
function :func:`dask.dataframe.read_sql_table` and :func:`dask.dataframe.read_sql_query`,
based on the `Pandas version`_, sharing most arguments, and using SQLAlchemy
for the actual handling of the queries. You may need to install additional
driver packages for your chosen database server.
.. _Pandas version: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_table.html
Since Dask is designed to work with larger-than-memory datasets, or be distributed
on a cluster, the following are the main differences versus Pandas to watch out for
- Dask does not support arbitrary text queries, only whole tables and SQLAlchemy
`sql expressions`_
- the con argument must be a `URI string`_, not an SQLAlchemy engine/connection
- partitioning information is *required*, which can be as simple as providing
an index column argument, or can be more explicit (see below)
- the chunksize argument is not used, since the partitioning must be via an
index column
.. _URI string: https://docs.sqlalchemy.org/en/13/core/engines.html#database-urls
.. _sql expressions: https://docs.sqlalchemy.org/en/13/core/tutorial.html
If you need something more flexible than this, or the
method fails for you (e.g., on type inference), then skip to the next section.
Why the differences
^^^^^^^^^^^^^^^^^^^
Dask is intended to make processing large volumes of data possible, including
potentially distributing that processing across a cluster. For the retrieval of
data from SQL servers, this means that the query must be partitionable: that
each partition can be fetched independently of others and not dependent on
some global state, and that the definitions of the tasks must be serialisable,
i.e., can be represented as a stream of bytes communicated to workers.
The constraints mean that we cannot directly accept SQLAlchemy engines
or connection objects, since they have internal state (buffers, etc.)
that cannot be serialised. A `URI string`_ must be used, which can be
recreated into a fresh engine on the workers.
Similarly, we cannot accommodate chunked queries
which rely on the internal state of a database cursor; nor LIMIT/OFFSET
queries, which are not guaranteed to be repeatable, and involve scanning
the whole query on th server (which is very inefficient).
**If** your data is small enough not to require Dask's out-of-core and/or
distributed capabilities, then you are probably better to use Pandas or SQLAlchemy
directly.
Index Column
^^^^^^^^^^^^
We need a way to turn a single main query into sub-queries for each
partition. For most reasonable database tables, there should be an obvious
column which can be used for partitioning - it is probably numeric,
and should certainly be indexed in the database. The latter condition
is important, since many simultaneous queries will hit your server once
Dask starts to compute.
By providing just a column name for the index argument, you imply that the
column is numeric, and Dask guesses a reasonable partitioning by evenly
splitting the space between minimum and maximum values into ``npartitions``
intervals. You can also provide the max/min that you would like to
consider so that Dask doesn't need to query for these. Alternatively,
you can have Dask fetch the first few row (5 by default) and use
them to guess the typical bytes/row, and base the partitioning size on
this. Needless to say, the results will vary a lot for tables that are
not uncommonly homogeneous.
Specific partitioning
^^^^^^^^^^^^^^^^^^^^^
In some cases, you may have a very good idea of how to partition the data,
for example based on a column that has a finite number of unique values
or categories. This enables using string columns, or anything with a
natural ordering, for the index column, not only numerical types.
In this case, you would provide a specific set of ``divisions``,
the start/end values of the index column for each partition. For example,
if a column happened to contain a random ID in hex string format, then you
could specify 16 partitions with
.. code-block:: python
df = read_sql_table("mytable", divisions=list("0123456789abcdefh"),
index_col="hexID")
so the first partition would have IDs with values ``"0" <= hexID < "1"``, i.e.,
leading character "0".
SQLAlchemy expressions
^^^^^^^^^^^^^^^^^^^^^^
Since we only send the database connection URI and not the engine object,
we cannot rely on SQLAlchemy's table class inference and ORM to conduct queries. However, we can
use the "select" `sql expressions`_, which only get formatted into a text query at
the point of execution.
.. code-block:: python
from sqlalchemy import sql
number = sql.column("number")
name = sql.column("name")
s1 = sql.select([
number, name, sql.func.length(name).label("lenname")
]
).select_from(sql.table("test"))
data = read_sql_query(
s1, db, npartitions=2, index_col=number
)
Here we have also demonstrated the use of the function ``length`` to
perform an operation server-side. Note that it is necessary to *label* such
operations, but you can use them for the index column,
so long as it is also
in the set of selected columns. If using for the index/partitioning, the
column should still be indexed in the database, for performance.
One of the most important functions to consider is ``cast`` to specify the
output data type or conversion in the database, if pandas is having
trouble inferring the data type.
You should be warned, that SQLAlchemy expressions take some time to get
used to, and you can practice with Pandas first, reading only the first small
chunk of a query, until things look right. You can find a more complete
object-oriented example in `this gist`_
.. _this gist: https://gist.github.com/quasiben/08a7f291039db2b04c2e28e1a6c21e3b
Load from SQL, manual approaches
--------------------------------
If ``read_sql_table`` is not sufficient for your needs, you can try one of
the following methods.
From Map functions
^^^^^^^^^^^^^^^^^^
Often you know more about your data and server than the generic approach above
allows. Indeed, some database-like servers may simply not be supported by
SQLAlchemy, or provide an alternate API which is better optimised.
If you already have a way to fetch data from the database in partitions,
then you can use :func:`dask.dataframe.from_map` and construct a
dataframe this way. It might look something like.
.. code-block:: python
import dask.dataframe as dd
def fetch_partition(part):
conn = establish_connection()
df = fetch_query(base_query.format(part))
return df.astype(known_types)
ddf = dd.from_map(fetch_partition,
parts,
meta=known_types,
divisions=div_from_parts(parts))
Where you must provide your own functions for setting up a connection to the server,
your own query, and a way to format that query to be specific to each partition.
For example, you might have ranges or specific unique values with a WHERE
clause. The ``known_types`` here is used to transform the dataframe partition and provide
a ``meta``, to help for consistency and avoid Dask having to analyse one partition
up front to guess the columns/types; you may also want to explicitly set the index.
Stream via client
^^^^^^^^^^^^^^^^^
In some cases, the workers may not have access to data, but the client does;
or the initial loading time of the data is not important, so long as the
dataset is then held in cluster memory and available for dask-dataframe
queries. It is possible to construct the dataframe by uploading chunks of
data from the client:
See a complete example of how to do this `here`_
.. _here: https://stackoverflow.com/questions/62818473/why-dasks-read-sql-table-requires-a-index-col-parameter/62821858#62821858
Access data files directly
^^^^^^^^^^^^^^^^^^^^^^^^^^
Some database systems such as Apache Hive store their data in a location
and format that may be directly accessible to Dask, such as parquet files
on S3 or HDFS. In cases where your SQL query would read whole datasets and pass
them to Dask, the streaming of data from the database is very likely the
bottleneck, and it's probably faster to read the source data files directly.
Query pushdown?
---------------
If you define a query based on a database table, then only use some columns
of the output, you may expect that Dask is able to tell the database server
to only send some of the table's data. Dask is not currently able to
do this "pushdown" optimisation, and you would need to change your query using
the SQL expression syntax.
We may be able to resolve this in the future (:issue:`6388`).
If the divisions on your dataframe are well defined, then selections on the
index may successfully avoid reading irrelevant partitions.
|