Advanced Topics
===============
Using Dask
----------
Scanning and combining datasets can be computationally intensive and may
require a lot of bandwidth for some data formats. Where the target data
contains many input files, it makes sense to parallelise the job with
dask and maybe distribute the workload on a cluster to get additional
CPUs and network performance.
Simple parallel
~~~~~~~~~~~~~~~
The simplest case is for processing many individual files in parallel.
Let's say you have a list of input files; you will need to encapsulate
the processing on each in a single function. In this mode, it is typical
to save the single-file outputs to files, although returning them is OK
too (especially if you mean to combine them immediately).
Here is an example for HDF5 files. The caller should make sure the
storage options and any parameters needed for the transformer are in place.

.. code-block:: python

    import dask
    import fsspec
    import kerchunk.hdf
    import ujson

    def process(url, outputfile, storage_options_in=None, storage_options_out=None):
        # Scan one HDF5 file and write its reference set to ``outputfile``
        transformer = kerchunk.hdf.SingleHdf5ToZarr(
            url, storage_options=storage_options_in
        )
        refs = transformer.translate()
        with fsspec.open(outputfile, mode="wt", **(storage_options_out or {})) as f:
            ujson.dump(refs, f)

    # infilenames and outfilenames are equal-length lists of input and output URLs
    tasks = [dask.delayed(process)(u, o)
             for u, o in zip(infilenames, outfilenames)]
    dask.compute(tasks)

Tree reduction
~~~~~~~~~~~~~~
In some cases, the combine process can itself be slow or memory hungry.
In such cases, it is useful to combine the single-file reference sets in
batches (which removes much of the redundancy between them) and then
combine the results of the batches. This technique is known as tree
reduction. An example of doing this by hand can be seen `here`_.

.. _here: https://gist.github.com/peterm790/5f901453ed7ac75ac28ed21a7138dcf8

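
As a rough illustration of the batching idea only, here is a minimal pure-Python sketch. The names ``tree_reduce`` and ``merge_dicts`` are hypothetical and not part of ``kerchunk``; ``merge_dicts`` is a stand-in for the real combine step, and the toy reference dicts are made up for the example.

```python
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")


def tree_reduce(items: Sequence[T], combine: Callable[[List[T]], T], batch_size: int) -> T:
    """Combine ``items`` in batches, then combine the batch results, until one remains."""
    if batch_size < 2:
        raise ValueError("batch_size must be at least 2")
    level = list(items)
    if not level:
        raise ValueError("nothing to combine")
    while len(level) > 1:
        # Each pass shrinks the list by roughly a factor of batch_size
        level = [
            combine(level[i:i + batch_size])
            for i in range(0, len(level), batch_size)
        ]
    return level[0]


def merge_dicts(batch):
    """Hypothetical stand-in for the real combine step: merge a batch of dicts."""
    merged = {}
    for refs in batch:
        merged.update(refs)
    return merged


# Toy single-file reference sets, one key each
single_file_refs = [{f"time/{i}": [f"file{i}.nc", 0, 100]} for i in range(10)]
combined = tree_reduce(single_file_refs, merge_dicts, batch_size=4)
```

In a real workflow the ``combine`` callable would presumably wrap a ``MultiZarrToZarr`` call, and the batches within one pass could be run in parallel, for example as ``dask.delayed`` tasks.
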
We also provide :func:`kerchunk.combine.auto_dask` as a convenience. This
function is a one-stop call to process the individual inputs, combine
them in batches, and then combine the results of those batches into a
final combined reference set.
The ``auto_dask`` function takes a number of dicts as arguments, and users
should consult the docstrings of the specific class which decodes the
input files, and also of :class:`kerchunk.combine.MultiZarrToZarr`. Note that
any "preprocessing" for ``MultiZarrToZarr`` will be performed *before* the
batch stage, and any "postprocessing" only *after* the final combine.
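
A call might look roughly like the following sketch. The wrapper name ``combine_with_auto_dask`` is hypothetical, and the choices of HDF5 inputs, anonymous S3 access and a ``"time"`` concatenation dimension are assumptions made for illustration; check the ``auto_dask`` docstring for the arguments your version accepts.

```python
def combine_with_auto_dask(urls, n_batches=4):
    """Scan ``urls`` as HDF5 and combine them along "time" using auto_dask."""
    # Imports are inside the function so the sketch can be read without
    # kerchunk and dask installed; both are required to actually call it.
    from kerchunk.combine import auto_dask
    from kerchunk.hdf import SingleHdf5ToZarr

    return auto_dask(
        urls,
        single_driver=SingleHdf5ToZarr,
        # Passed to the single-file driver for every input (assumed: anonymous S3)
        single_kwargs={"storage_options": {"anon": True}},
        # Passed to MultiZarrToZarr at the combine stages (assumed: concat on "time")
        mzz_kwargs={"concat_dims": ["time"]},
        n_batches=n_batches,
        remote_protocol="s3",
        remote_options={"anon": True},
    )
```

To run on a distributed cluster, create the Dask client before calling the function; a reasonable starting point for ``n_batches`` is the number of workers.
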
Archive Files
-------------
It is often convenient to distribute datasets by wrapping multiple files
into an archive, such as ZIP or TAR. If those files are of formats supported by
``kerchunk``, they can be scanned directly with something like:

.. code-block:: python

    import kerchunk.netCDF3

    transformer = kerchunk.netCDF3.NetCDF3ToZarr(
        "tar://myfile.nc::file://archive.tar",
        inline_threshold=0
    )
    out = transformer.translate()

where "myfile.nc" is a member file of the local archive.

.. note::

    We have turned off inlining here, because it is not yet supported when
    scanning archive members directly; support for this will come later.
    Inlining can instead be done afterwards with
    :func:`kerchunk.utils.do_inline`.

At this point, the generated references contain URLs of the form
"tar://myfile.nc::file://archive.tar", which are problematic for loading.
We can transform them to point to byte ranges in the original tar file
instead, and then transform back to nominal form, ready to use.
We may automate these steps in the future.

.. code-block:: python

    import kerchunk.utils

    out2 = kerchunk.utils.dereference_archives(out)
    # optional: out2 = kerchunk.utils.do_inline(out2, 100)
    final = kerchunk.utils.consolidate(out2)

Now the references are all "file://archive.tar", and the reference set
can be used directly or in combining.
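
Pointing at ranges of the archive works because an uncompressed tar stores each member as one contiguous run of bytes. The sketch below illustrates this with the standard library only; it is not a description of how ``dereference_archives`` is implemented, and the payload is made-up data.

```python
import io
import tarfile

# Build a small uncompressed tar archive in memory with one member
payload = b"pretend this is a netCDF file"
buffer = io.BytesIO()
with tarfile.open(fileobj=buffer, mode="w") as tar:
    info = tarfile.TarInfo(name="myfile.nc")
    info.size = len(payload)
    tar.addfile(info, io.BytesIO(payload))
archive_bytes = buffer.getvalue()

# Look up where the member's data starts inside the archive
with tarfile.open(fileobj=io.BytesIO(archive_bytes), mode="r:") as tar:
    member = tar.getmember("myfile.nc")
    offset, size = member.offset_data, member.size

# The member can now be read as a plain byte range of the archive
member_bytes = archive_bytes[offset:offset + size]
```

A reference of the form ``[url, offset, length]`` into the member can therefore be rewritten as a reference into the archive by adding the member's data offset.
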

.. warning::

    For ZIP archives, only uncompressed members can be accessed this way.

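
One way to check in advance which members of a ZIP file are stored uncompressed is to inspect their compression type with the standard library. This is only a sketch: the helper name ``uncompressed_members`` and the member names are hypothetical.

```python
import io
import zipfile

# Build an in-memory ZIP with one stored and one deflated member
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, mode="w") as zf:
    zf.writestr("stored.nc", b"x" * 1000, compress_type=zipfile.ZIP_STORED)
    zf.writestr("deflated.nc", b"x" * 1000, compress_type=zipfile.ZIP_DEFLATED)


def uncompressed_members(fileobj):
    """Return the names of ZIP members stored without compression."""
    with zipfile.ZipFile(fileobj) as zf:
        return [i.filename for i in zf.infolist()
                if i.compress_type == zipfile.ZIP_STORED]


usable = uncompressed_members(io.BytesIO(buffer.getvalue()))
```
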
Parquet Storage
---------------
JSON is very convenient as a storage format for references, because it is
simple, human-readable and ubiquitously supported. However, it is not the most
efficient in terms of storage size or parsing speed. For Python, in particular,
it comes with the added downside of repeated strings becoming separate Python
string instances, greatly inflating memory footprint at load time.
To overcome these problems, and in particular keep down the memory use for the
end-user of kerchunked data, we can convert references to be stored in parquet,
and use them with ``fsspec.implementations.reference.ReferenceFileSystem``,
which can load parquet references lazily (see ``LazyReferenceMapper`` in the
same module).
The principal benefits of the parquet path are:

- much more compact storage, typically 2x smaller than compressed JSON or 10x
  smaller than uncompressed
- correspondingly faster instantiation of a filesystem, since much of that time
  is taken by loading in the bytes of the references
- smaller in-memory size (e.g., a Python int requires 28 bytes, but an int in
  an array needs 4 or 8)
- optional lazy loading, by partitioning the references into files by key; only
  the variables you actually access need to have their references loaded
- optional dictionary encoding of URLs in the case that there are many repeated
  URLs (many references per target file). In this format, each unique URL is only
  stored in memory once.

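
The last three points can be illustrated with a small standard-library sketch. It shows the general ideas of dictionary encoding and typed arrays only, and is not the actual on-disk or in-memory layout used by the parquet references; the URLs and offsets are made up.

```python
import sys
from array import array

# A reference set in which many chunks live in the same target file
urls = ["s3://bucket/a.nc"] * 3 + ["s3://bucket/b.nc"] * 2
offsets = [0, 100, 200, 0, 100]

# Dictionary encoding: keep each unique URL once, plus a small integer code per row
unique_urls = sorted(set(urls))
code_of = {u: i for i, u in enumerate(unique_urls)}
codes = array("i", (code_of[u] for u in urls))

# Decoding a row is a single lookup
decoded = [unique_urls[c] for c in codes]

# Offsets in a typed array take a fixed number of bytes each,
# whereas every Python int is a full object
packed_offsets = array("q", offsets)
bytes_per_packed = packed_offsets.itemsize
bytes_per_object = sys.getsizeof(offsets[1])
```
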
The only access point to the new parquet storage is
:func:`kerchunk.df.refs_to_dataframe`, which transforms an existing kerchunk
reference set (in memory as dicts or in a JSON file) to parquet. Careful reading
of the docstring is recommended, to understand the options. More options may
be added.

.. note::

    For now, :class:`kerchunk.combine.MultiZarrToZarr` only operates on JSON/dict
    input. Therefore, ``refs_to_dataframe`` can only be used on the final output
    reference set. For a very large merge of many/large inputs, this may mean
    that the combine step requires a lot of memory, as will converting the
    output to parquet. However, the end-user should be able to access data via
    these references with much smaller memory requirements.

A concrete workflow may be something like the following. Note that
:func:`kerchunk.combine.auto_dask` can execute the first three stages in
one go and may be faster, if you have a Dask cluster available.

.. code-block:: python

    import os

    import fsspec
    import xarray as xr
    from fsspec.implementations.reference import (
        LazyReferenceMapper,
        ReferenceFileSystem,
    )
    from kerchunk import combine, df, hdf

    # location_of_data: glob or list of URLs of the input HDF5 files;
    # pass any storage options the inputs need to open_files
    files = fsspec.open_files(location_of_data)

    # Create LazyReferenceMapper to pass to MultiZarrToZarr
    fs = fsspec.filesystem("file")
    os.makedirs("combined.parq")
    out = LazyReferenceMapper.create(record_size=1000, root="combined.parq", fs=fs)

    # Create references from input files
    single_ref_sets = []
    for of in files:
        with of as f:
            single_ref_sets.append(
                hdf.SingleHdf5ToZarr(f, url=of.full_name).translate()
            )

    # Combine along the "time" dimension, writing into the lazy mapper
    out_dict = combine.MultiZarrToZarr(
        single_ref_sets,
        remote_protocol="s3",
        concat_dims=["time"],
        remote_options={"anon": True},
        out=out
    ).translate()

    out.flush()

    df.refs_to_dataframe(out_dict, "combined.parq")

    # Open the parquet references lazily and hand them to xarray
    fs = ReferenceFileSystem(
        "combined.parq", remote_protocol="s3", target_protocol="file", lazy=True)
    ds = xr.open_dataset(
        fs.get_mapper(), engine="zarr",
        backend_kwargs={"consolidated": False}
    )

At this point, xarray has loaded the metadata and coordinates only, so the
main reference files corresponding to the data variables have not been touched.
Even for a very large reference set, the memory use at this point should be below 500 MB.
As you access the variables of ``ds``, they will be loaded on demand and cached.
If using ``dask``, workers will also only load those references they need.