1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
|
from __future__ import annotations
import os
import warnings
import numpy as np
from xarray.backends.common import (
BACKEND_ENTRYPOINTS,
AbstractDataStore,
BackendArray,
BackendEntrypoint,
_normalize_path,
)
from xarray.backends.locks import SerializableLock, ensure_lock
from xarray.backends.store import StoreBackendEntrypoint
from xarray.core import indexing
from xarray.core.utils import Frozen, FrozenDict, close_on_error, module_available
from xarray.core.variable import Variable
# FIXME: Add a dedicated lock, even if ecCodes is supposed to be thread-safe
# in most circumstances. See:
# https://confluence.ecmwf.int/display/ECC/Frequently+Asked+Questions
ECCODES_LOCK = SerializableLock()
class CfGribArrayWrapper(BackendArray):
def __init__(self, datastore, array):
self.datastore = datastore
self.shape = array.shape
self.dtype = array.dtype
self.array = array
def __getitem__(self, key):
return indexing.explicit_indexing_adapter(
key, self.shape, indexing.IndexingSupport.BASIC, self._getitem
)
def _getitem(self, key):
with self.datastore.lock:
return self.array[key]
class CfGribDataStore(AbstractDataStore):
"""
Implements the ``xr.AbstractDataStore`` read-only API for a GRIB file.
"""
def __init__(self, filename, lock=None, **backend_kwargs):
try:
import cfgrib
# cfgrib throws a RuntimeError if eccodes is not installed
except (ImportError, RuntimeError) as err:
warnings.warn(
"Failed to load cfgrib - most likely there is a problem accessing the ecCodes library. "
"Try `import cfgrib` to get the full error message"
)
raise err
if lock is None:
lock = ECCODES_LOCK
self.lock = ensure_lock(lock)
self.ds = cfgrib.open_file(filename, **backend_kwargs)
def open_store_variable(self, name, var):
if isinstance(var.data, np.ndarray):
data = var.data
else:
wrapped_array = CfGribArrayWrapper(self, var.data)
data = indexing.LazilyIndexedArray(wrapped_array)
encoding = self.ds.encoding.copy()
encoding["original_shape"] = var.data.shape
return Variable(var.dimensions, data, var.attributes, encoding)
def get_variables(self):
return FrozenDict(
(k, self.open_store_variable(k, v)) for k, v in self.ds.variables.items()
)
def get_attrs(self):
return Frozen(self.ds.attributes)
def get_dimensions(self):
return Frozen(self.ds.dimensions)
def get_encoding(self):
dims = self.get_dimensions()
return {"unlimited_dims": {k for k, v in dims.items() if v is None}}
class CfgribfBackendEntrypoint(BackendEntrypoint):
available = module_available("cfgrib")
def guess_can_open(self, filename_or_obj):
try:
_, ext = os.path.splitext(filename_or_obj)
except TypeError:
return False
return ext in {".grib", ".grib2", ".grb", ".grb2"}
def open_dataset(
self,
filename_or_obj,
*,
mask_and_scale=True,
decode_times=True,
concat_characters=True,
decode_coords=True,
drop_variables=None,
use_cftime=None,
decode_timedelta=None,
lock=None,
indexpath="{path}.{short_hash}.idx",
filter_by_keys={},
read_keys=[],
encode_cf=("parameter", "time", "geography", "vertical"),
squeeze=True,
time_dims=("time", "step"),
):
filename_or_obj = _normalize_path(filename_or_obj)
store = CfGribDataStore(
filename_or_obj,
indexpath=indexpath,
filter_by_keys=filter_by_keys,
read_keys=read_keys,
encode_cf=encode_cf,
squeeze=squeeze,
time_dims=time_dims,
lock=lock,
)
store_entrypoint = StoreBackendEntrypoint()
with close_on_error(store):
ds = store_entrypoint.open_dataset(
store,
mask_and_scale=mask_and_scale,
decode_times=decode_times,
concat_characters=concat_characters,
decode_coords=decode_coords,
drop_variables=drop_variables,
use_cftime=use_cftime,
decode_timedelta=decode_timedelta,
)
return ds
BACKEND_ENTRYPOINTS["cfgrib"] = CfgribfBackendEntrypoint
|