1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
|
"""async-rasterio.py
Operate on a raster dataset window-by-window using asyncio's event loop
and thread executor.
Simulates a CPU-bound thread situation where multiple threads can improve
performance.
"""
import asyncio
import numpy as np
import rasterio
from rasterio._example import compute
def main(infile, outfile, with_threads=False):
with rasterio.Env():
# Open the source dataset.
with rasterio.open(infile) as src:
# Create a destination dataset based on source params. The
# destination will be tiled, and we'll "process" the tiles
# concurrently.
meta = src.meta
meta.update(blockxsize=256, blockysize=256, tiled='yes')
with rasterio.open(outfile, 'w', **meta) as dst:
loop = asyncio.get_event_loop()
# With the exception of the ``yield from`` statement,
# process_window() looks like callback-free synchronous
# code. With a coroutine, we can keep the read, compute,
# and write statements close together for
# maintainability. As in the concurrent-cpu-bound.py
# example, all of the speedup is provided by
# distributing raster computation across multiple
# threads. The difference here is that we're submitting
# jobs to the thread pool asynchronously.
@asyncio.coroutine
def process_window(window):
# Read a window of data.
data = src.read(window=window)
# We run the raster computation in a separate thread
# and pause until the computation finishes, letting
# other coroutines advance.
#
# The _example.compute function modifies no Python
# objects and releases the GIL. It can execute
# concurrently.
result = np.zeros(data.shape, dtype=data.dtype)
if with_threads:
yield from loop.run_in_executor(
None, compute, data, result)
else:
compute(data, result)
dst.write(result, window=window)
# Queue up the loop's tasks.
tasks = [asyncio.Task(process_window(window))
for ij, window in dst.block_windows(1)]
# Wait for all the tasks to finish, and close.
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(
description="Concurrent raster processing demo")
parser.add_argument(
'input',
metavar='INPUT',
help="Input file name")
parser.add_argument(
'output',
metavar='OUTPUT',
help="Output file name")
parser.add_argument(
'--with-workers',
action='store_true',
help="Run with a pool of worker threads")
args = parser.parse_args()
main(args.input, args.output, args.with_workers)
|