File: _modis_utils.pyx

package info (click to toggle)
python-geotiepoints 1.8.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 9,876 kB
  • sloc: python: 3,148; makefile: 111; sh: 15
file content (200 lines) | stat: -rw-r--r-- 7,219 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# cython: language_level=3, boundscheck=False, cdivision=True, wraparound=False, initializedcheck=False, nonecheck=False
from functools import wraps

cimport cython
cimport numpy as np
from libc.math cimport asin, sin, cos, sqrt, acos, M_PI
import numpy as np

np.import_array()

try:
    import dask.array as da
except ImportError:
    # if dask can't be imported then we aren't going to be given dask arrays
    da = None

try:
    import xarray as xr
except ImportError:
    xr = None


DEF EARTH_RADIUS = 6370997.0


cdef void lonlat2xyz(
        floating[:, ::1] lons,
        floating[:, ::1] lats,
        floating[:, :, ::1] xyz,
) noexcept nogil:
    """Convert lons and lats to cartesian coordinates."""
    cdef Py_ssize_t i, j, k
    cdef floating lon_rad, lat_rad
    for i in range(lons.shape[0]):
        for j in range(lons.shape[1]):
            lon_rad = deg2rad(lons[i, j])
            lat_rad = deg2rad(lats[i, j])
            xyz[i, j, 0] = EARTH_RADIUS * cos(lat_rad) * cos(lon_rad)
            xyz[i, j, 1] = EARTH_RADIUS * cos(lat_rad) * sin(lon_rad)
            xyz[i, j, 2] = EARTH_RADIUS * sin(lat_rad)


cdef void xyz2lonlat(
        floating[:, :, ::1] xyz,
        floating[:, ::1] lons,
        floating[:, ::1] lats,
        bint low_lat_z=True,
        floating thr=0.8) noexcept nogil:
    """Get longitudes from cartesian coordinates."""
    cdef Py_ssize_t i, j
    cdef np.float64_t x, y, z
    for i in range(xyz.shape[0]):
        for j in range(xyz.shape[1]):
            # 64-bit precision matters apparently
            x = <np.float64_t>xyz[i, j, 0]
            y = <np.float64_t>xyz[i, j, 1]
            z = <np.float64_t>xyz[i, j, 2]
            lons[i, j] = rad2deg(acos(x / sqrt(x ** 2 + y ** 2))) * _sign(y)
            # if we are at low latitudes - small z, then get the
            # latitudes only from z. If we are at high latitudes (close to the poles)
            # then derive the latitude using x and y:
            if low_lat_z and (z < thr * EARTH_RADIUS) and (z > -1.0 * thr * EARTH_RADIUS):
                lats[i, j] = 90 - rad2deg(acos(z / EARTH_RADIUS))
            else:
                lats[i, j] = _sign(z) * (90 - rad2deg(asin(sqrt(x ** 2 + y ** 2) / EARTH_RADIUS)))


cdef inline int _sign(floating x) noexcept nogil:
    return 1 if x > 0 else (-1 if x < 0 else 0)


cdef inline floating rad2deg(floating x) noexcept nogil:
    return x * (180.0 / M_PI)


cdef inline floating deg2rad(floating x) noexcept nogil:
    return x * (M_PI / 180.0)


def rows_per_scan_for_resolution(res):
    return {
        5000: 2,
        1000: 10,
        500: 20,
        250: 40,
    }[res]


def scanline_mapblocks(func):
    """Convert dask array inputs to appropriate map_blocks calls.

    This function, applied as a decorator, will call the wrapped function
    using dask's ``map_blocks``. It will rechunk dask array inputs when
    necessary to make sure that the input chunks are entire scanlines to
    avoid incorrect interpolation.

    """
    @wraps(func)
    def _wrapper(*args, coarse_resolution=None, fine_resolution=None, **kwargs):
        if coarse_resolution is None or fine_resolution is None:
            raise ValueError("'coarse_resolution' and 'fine_resolution' are required keyword arguments.")
        first_arr = [arr for arr in args if hasattr(arr, "ndim")][0]
        if first_arr.ndim != 2 or first_arr.ndim != 2:
            raise ValueError("Expected 2D input arrays.")
        if hasattr(first_arr, "compute"):
            # assume it is dask or xarray with dask, ensure proper chunk size
            # if DataArray get just the dask array
            dask_args = _extract_dask_arrays_from_args(args)
            rows_per_scan = rows_per_scan_for_resolution(coarse_resolution)
            rechunked_args = _rechunk_dask_arrays_if_needed(dask_args, rows_per_scan)
            results = _call_map_blocks_interp(
                func,
                coarse_resolution,
                fine_resolution,
                *rechunked_args,
                **kwargs
            )
            if hasattr(first_arr, "dims"):
                # recreate DataArrays
                results = _results_to_data_arrays(first_arr.dims, *results)
            return results
        return func(
            *args,
            coarse_resolution=coarse_resolution,
            fine_resolution=fine_resolution,
            **kwargs
        )

    return _wrapper


def _extract_dask_arrays_from_args(args):
    return [arr_obj.data if hasattr(arr_obj, "dims") else arr_obj for arr_obj in args]


def _call_map_blocks_interp(func, coarse_resolution, fine_resolution, *args, **kwargs):
    first_arr = [arr for arr in args if hasattr(arr, "ndim")][0]
    res_factor = coarse_resolution // fine_resolution
    new_row_chunks = tuple(x * res_factor for x in first_arr.chunks[0])
    fine_pixels_per_1km = {250: 4, 500: 2, 1000: 1}[fine_resolution]
    fine_scan_width = 1354 * fine_pixels_per_1km
    new_col_chunks = (fine_scan_width,)
    wrapped_func = _map_blocks_handler(func)
    res = da.map_blocks(wrapped_func, *args,
                        coarse_resolution=coarse_resolution,
                        fine_resolution=fine_resolution,
                        **kwargs,
                        new_axis=[0],
                        chunks=(2, new_row_chunks, new_col_chunks),
                        dtype=first_arr.dtype,
                        meta=np.empty((2, 2, 2), dtype=first_arr.dtype))
    return tuple(res[idx] for idx in range(res.shape[0]))


def _results_to_data_arrays(dims, *results):
    new_results = []
    for result in results:
        if not isinstance(result, da.Array):
            continue
        data_arr = xr.DataArray(result, dims=dims)
        new_results.append(data_arr)
    return new_results


def _rechunk_dask_arrays_if_needed(args, rows_per_scan: int):
    # take current chunk size and get a relatively similar chunk size
    first_arr = [arr for arr in args if hasattr(arr, "ndim")][0]
    row_chunks = first_arr.chunks[0]
    col_chunks = first_arr.chunks[1]
    num_rows = first_arr.shape[0]
    num_cols = first_arr.shape[1]
    good_row_chunks = all(x % rows_per_scan == 0 for x in row_chunks)
    good_col_chunks = len(col_chunks) == 1 and col_chunks[0] != num_cols
    all_orig_chunks = [arr.chunks for arr in args if hasattr(arr, "chunks")]

    if num_rows % rows_per_scan != 0:
        raise ValueError("Input longitude/latitude data does not consist of "
                         "whole scans (10 rows per scan).")
    all_same_chunks = all(
        all_orig_chunks[0] == some_chunks
        for some_chunks in all_orig_chunks[1:]
    )
    if good_row_chunks and good_col_chunks and all_same_chunks:
        return args

    new_row_chunks = (row_chunks[0] // rows_per_scan) * rows_per_scan
    new_args = [arr.rechunk((new_row_chunks, -1)) if hasattr(arr, "chunks") else arr for arr in args]
    return new_args


def _map_blocks_handler(func):
    @wraps(func)
    def _map_blocks_wrapper(*args, **kwargs):
        results = func(*args, **kwargs)
        return np.concatenate(
            tuple(result[np.newaxis] for result in results),
            axis=0)
    return _map_blocks_wrapper