File: concurrency.rst

package info (click to toggle)
rasterio 1.4.4-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 22,744 kB
  • sloc: python: 22,881; sh: 795; makefile: 275; xml: 29
file content (176 lines) | stat: -rw-r--r-- 6,641 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
Concurrent processing
=====================

Rasterio affords concurrent processing of raster data. Python's global
interpreter lock (GIL) is released when calling GDAL's ``GDALRasterIO()``
function, which means that Python threads can read and write concurrently.

The Numpy library also often releases the GIL, e.g., in applying
universal functions to arrays, and this makes it possible to distribute
processing of an array across cores of a processor.

This means that it is possible to parallelize tasks that need to be performed
for a set of windows/pixels in the raster. Reading, writing and processing can
always be done concurrently. But it depends on the hardware and where the
bottlenecks are, how much of a speedup can be obtained. In the case that the
processing function releases the GIL, multiple threads processing
simultaneously can lead to further speedups.

.. note::
    If you wish to do multiprocessing that is not trivially parallelizable
    across very large images that do not fit in memory, or if you wish to
    do multiprocessing across multiple machines. You might want to have a
    look at `dask <https://dask.org/>`__ and in particular this
    `example <https://examples.dask.org/applications/satellite-imagery-geotiff.html>`__.

The Cython function below, included in Rasterio's ``_example`` module,
simulates a GIL-releasing CPU-intensive raster processing function. You can
also easily create GIL-releasing functions by using `numba <https://numba.pydata.org/>`__

.. code-block:: python

    # cython: boundscheck=False

    import numpy as np


    def compute(unsigned char[:, :, :] input):
        """reverses bands inefficiently

        Given input and output uint8 arrays, fakes an CPU-intensive
        computation.
        """
        cdef int I, J, K
        cdef int i, j, k, l
        cdef double val
        I = input.shape[0]
        J = input.shape[1]
        K = input.shape[2]
        output = np.empty((I, J, K), dtype='uint8')
        cdef unsigned char[:, :, :] output_view = output
        with nogil:
            for i in range(I):
                for j in range(J):
                    for k in range(K):
                        val = <double>input[i, j, k]
                        for l in range(2000):
                            val += 1.0
                        val -= 2000.0
                        output_view[~i, j, k] = <unsigned char>val
        return output

Here is the program in examples/thread_pool_executor.py. It is set up in such
a way that at most 1 thread is reading and at most 1 thread is writing at the
same time. Processing is not protected by a lock and can be done by multiple
threads simultaneously.

.. code-block:: python

    """thread_pool_executor.py

    Operate on a raster dataset window-by-window using a ThreadPoolExecutor.

    Simulates a CPU-bound thread situation where multiple threads can improve
    performance.

    With -j 4, the program returns in about 1/4 the time as with -j 1.
    """

    import concurrent.futures
    import threading

    import rasterio
    from rasterio._example import compute


    def main(infile, outfile, num_workers=4):
        """Process infile block-by-block and write to a new file

        The output is the same as the input, but with band order
        reversed.
        """

        with rasterio.open(infile) as src:

            # Create a destination dataset based on source params. The
            # destination will be tiled, and we'll process the tiles
            # concurrently.
            profile = src.profile
            profile.update(blockxsize=128, blockysize=128, tiled=True)

            with rasterio.open(outfile, "w", **src.profile) as dst:
                windows = [window for ij, window in dst.block_windows()]

                # We cannot write to the same file from multiple threads
                # without causing race conditions. To safely read/write
                # from multiple threads, we use a lock to protect the
                # DatasetReader/Writer
                read_lock = threading.Lock()
                write_lock = threading.Lock()

                def process(window):
                    with read_lock:
                        src_array = src.read(window=window)

                    # The computation can be performed concurrently
                    result = compute(src_array)

                    with write_lock:
                        dst.write(result, window=window)

                # We map the process() function over the list of
                # windows.
                with concurrent.futures.ThreadPoolExecutor(
                    max_workers=num_workers
                ) as executor:
                    executor.map(process, windows)

The code above simulates a CPU-intensive calculation that runs faster when
spread over multiple cores using  :class:`concurrent.futures.ThreadPoolExecutor`
compared to the case of one concurrent job (``-j 1``),

.. code-block:: console

   $ time python examples/thread_pool_executor.py tests/data/RGB.byte.tif /tmp/test.tif -j 1

   real    0m4.277s
   user    0m4.356s
   sys     0m0.184s

we get over 3x speed up with four concurrent jobs.

.. code-block:: console

   $ time python examples/thread_pool_executor.py tests/data/RGB.byte.tif /tmp/test.tif -j 4

   real    0m1.251s
   user    0m4.402s
   sys     0m0.168s

If the function that you'd like to map over raster windows doesn't release the
GIL, you unfortunately cannot simply replace
:class:`~concurrent.futures.ThreadPoolExecutor` with
:class:`~concurrent.futures.ProcessPoolExecutor`.
:class:`.DatasetReader`/:class:`.DatasetWriter` cannot be shared by multiple
processes, which means that each process needs to open the file separately.
You can do all the reading and writing from the main thread, as shown in this
next example, at the expense of serializing arrays.

.. code-block:: python

    arrays = [src.read(window=window) for window in windows]

    with concurrent.futures.ProcessPoolExecutor(
        max_workers=num_workers
    ) as executor:
        futures = executor.map(compute, arrays)
        for window, result in zip(windows, futures):
            dst.write(result, window=window)

.. attention::
   Deadlocks are easy to produce if we fork after GDAL drivers have been
   registered. The only safe time to fork is before entering
   a `rasterio.env.Env` context. Python's "spawn" mode is the safest option.
   It's the default for Windows and macOS, but won't be the default for other
   POSIX platforms until Python 3.14. Python 3.12 warns if we fork a process
   when multiple threads are detected.