1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
|
import numpy as np
from ..core import indexing
from ..core.utils import Frozen, FrozenDict
from ..core.variable import Variable
from .common import AbstractDataStore, BackendArray
from .file_manager import CachingFileManager
from .locks import HDF5_LOCK, NETCDFC_LOCK, SerializableLock, combine_locks, ensure_lock
# PyNIO can invoke netCDF libraries internally
# Add a dedicated lock just in case NCL as well isn't thread-safe.
NCL_LOCK = SerializableLock()
PYNIO_LOCK = combine_locks([HDF5_LOCK, NETCDFC_LOCK, NCL_LOCK])
class NioArrayWrapper(BackendArray):
def __init__(self, variable_name, datastore):
self.datastore = datastore
self.variable_name = variable_name
array = self.get_array()
self.shape = array.shape
self.dtype = np.dtype(array.typecode())
def get_array(self, needs_lock=True):
ds = self.datastore._manager.acquire(needs_lock)
return ds.variables[self.variable_name]
def __getitem__(self, key):
return indexing.explicit_indexing_adapter(
key, self.shape, indexing.IndexingSupport.BASIC, self._getitem
)
def _getitem(self, key):
with self.datastore.lock:
array = self.get_array(needs_lock=False)
if key == () and self.ndim == 0:
return array.get_value()
return array[key]
class NioDataStore(AbstractDataStore):
"""Store for accessing datasets via PyNIO"""
def __init__(self, filename, mode="r", lock=None, **kwargs):
import Nio
if lock is None:
lock = PYNIO_LOCK
self.lock = ensure_lock(lock)
self._manager = CachingFileManager(
Nio.open_file, filename, lock=lock, mode=mode, kwargs=kwargs
)
# xarray provides its own support for FillValue,
# so turn off PyNIO's support for the same.
self.ds.set_option("MaskedArrayMode", "MaskedNever")
@property
def ds(self):
return self._manager.acquire()
def open_store_variable(self, name, var):
data = indexing.LazilyOuterIndexedArray(NioArrayWrapper(name, self))
return Variable(var.dimensions, data, var.attributes)
def get_variables(self):
return FrozenDict(
(k, self.open_store_variable(k, v)) for k, v in self.ds.variables.items()
)
def get_attrs(self):
return Frozen(self.ds.attributes)
def get_dimensions(self):
return Frozen(self.ds.dimensions)
def get_encoding(self):
return {
"unlimited_dims": {k for k in self.ds.dimensions if self.ds.unlimited(k)}
}
def close(self):
self._manager.close()
|