import numpy as np
from .. import conventions
from ..core import indexing
from ..core.dataset import Dataset
from ..core.utils import Frozen, FrozenDict, close_on_error
from ..core.variable import Variable
from .common import AbstractDataStore, BackendArray
from .locks import SerializableLock, ensure_lock

# FIXME: Add a dedicated lock, even if ecCodes is supposed to be thread-safe
# in most circumstances. See:
# https://confluence.ecmwf.int/display/ECC/Frequently+Asked+Questions
ECCODES_LOCK = SerializableLock()


class CfGribArrayWrapper(BackendArray):
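    """Lazily indexed wrapper around an on-disk cfgrib array.

    Reads go through the owning datastore's lock, since ecCodes is not
    guaranteed to be thread-safe in all circumstances.
    """
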
def __init__(self, datastore, array):
self.datastore = datastore
self.shape = array.shape
self.dtype = array.dtype
        self.array = array

    def __getitem__(self, key):
        # Route xarray's generic indexers through the explicit-indexing
        # adapter: it decomposes them into an outer (orthogonal) indexer
        # that the underlying array supports, applying any remainder in
        # memory after the data has been read.
return indexing.explicit_indexing_adapter(
key, self.shape, indexing.IndexingSupport.OUTER, self._getitem
        )

    def _getitem(self, key):
        # Hold the datastore lock while reading; ecCodes is not guaranteed
        # to be thread-safe in all circumstances (see the FIXME above).
        with self.datastore.lock:
            return self.array[key]


class CfGribDataStore(AbstractDataStore):
"""
Implements the ``xr.AbstractDataStore`` read-only API for a GRIB file.
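
    A minimal usage sketch (``example.grib`` is a hypothetical local GRIB
    file; requires the ``cfgrib`` package and its ecCodes bindings)::

        store = CfGribDataStore("example.grib")
        variables = store.get_variables()  # mapping of name -> Variable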
"""
def __init__(self, filename, lock=None, **backend_kwargs):
import cfgrib
if lock is None:
lock = ECCODES_LOCK
self.lock = ensure_lock(lock)
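        # Any remaining backend_kwargs (indexpath, filter_by_keys, read_keys,
        # encode_cf, squeeze, time_dims) are forwarded verbatim to cfgrib.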
        self.ds = cfgrib.open_file(filename, **backend_kwargs)

    def open_store_variable(self, name, var):
        if isinstance(var.data, np.ndarray):
            data = var.data
        else:
            # Defer I/O until the variable is actually indexed: wrap the
            # on-disk array so xarray can apply outer indexing lazily.
            wrapped_array = CfGribArrayWrapper(self, var.data)
            data = indexing.LazilyOuterIndexedArray(wrapped_array)
encoding = self.ds.encoding.copy()
encoding["original_shape"] = var.data.shape
        return Variable(var.dimensions, data, var.attributes, encoding)

    def get_variables(self):
return FrozenDict(
(k, self.open_store_variable(k, v)) for k, v in self.ds.variables.items()
        )

    def get_attrs(self):
        return Frozen(self.ds.attributes)

    def get_dimensions(self):
        return Frozen(self.ds.dimensions)

    def get_encoding(self):
        dims = self.get_dimensions()
        # Dimensions whose size is reported as None are treated as unlimited.
        encoding = {"unlimited_dims": {k for k, v in dims.items() if v is None}}
        return encoding


def open_backend_dataset_cfgrib(
filename_or_obj,
*,
mask_and_scale=True,
decode_times=None,
concat_characters=None,
decode_coords=None,
drop_variables=None,
use_cftime=None,
decode_timedelta=None,
lock=None,
indexpath="{path}.{short_hash}.idx",
    filter_by_keys=None,
    read_keys=None,
encode_cf=("parameter", "time", "geography", "vertical"),
squeeze=True,
time_dims=("time", "step"),
):
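    """Open a GRIB file with cfgrib and decode it into an ``xr.Dataset``.

    Read-only. The decoding keyword arguments mirror those accepted by
    ``conventions.decode_cf_variables``.
    """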
    # Use fresh containers rather than shared mutable default arguments.
    if filter_by_keys is None:
        filter_by_keys = {}
    if read_keys is None:
        read_keys = []

    store = CfGribDataStore(
filename_or_obj,
indexpath=indexpath,
filter_by_keys=filter_by_keys,
read_keys=read_keys,
encode_cf=encode_cf,
squeeze=squeeze,
time_dims=time_dims,
lock=lock,
    )

    with close_on_error(store):
        vars, attrs = store.load()
        file_obj = store
        encoding = store.get_encoding()

        # Apply CF decoding (masking/scaling, time decoding, coordinate
        # detection) to the raw variables loaded from the store.
        vars, attrs, coord_names = conventions.decode_cf_variables(
            vars,
            attrs,
            mask_and_scale=mask_and_scale,
            decode_times=decode_times,
            concat_characters=concat_characters,
            decode_coords=decode_coords,
            drop_variables=drop_variables,
            use_cftime=use_cftime,
            decode_timedelta=decode_timedelta,
        )

        ds = Dataset(vars, attrs=attrs)
        ds = ds.set_coords(coord_names.intersection(vars))
        # Keep a reference to the store so the underlying GRIB file can be
        # closed when the Dataset is.
        ds._file_obj = file_obj
        ds.encoding = encoding

    return ds
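

# A minimal usage sketch (hypothetical file name and key filter; requires
# the cfgrib package and its ecCodes bindings to be installed):
#
#     ds = open_backend_dataset_cfgrib(
#         "era5-levels-members.grib",
#         filter_by_keys={"typeOfLevel": "isobaricInhPa"},
#     )
#
# In normal use this function is reached indirectly, e.g. through
# ``xr.open_dataset(filename, engine="cfgrib")``.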