1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
|
import numpy as np
from ..core import indexing
from ..core.utils import Frozen, FrozenDict
from ..core.variable import Variable
from .common import AbstractDataStore, BackendArray
from .file_manager import CachingFileManager
from .locks import HDF5_LOCK, NETCDFC_LOCK, combine_locks, ensure_lock
# psuedonetcdf can invoke netCDF libraries internally
PNETCDF_LOCK = combine_locks([HDF5_LOCK, NETCDFC_LOCK])
class PncArrayWrapper(BackendArray):
def __init__(self, variable_name, datastore):
self.datastore = datastore
self.variable_name = variable_name
array = self.get_array()
self.shape = array.shape
self.dtype = np.dtype(array.dtype)
def get_array(self, needs_lock=True):
ds = self.datastore._manager.acquire(needs_lock)
return ds.variables[self.variable_name]
def __getitem__(self, key):
return indexing.explicit_indexing_adapter(
key, self.shape, indexing.IndexingSupport.OUTER_1VECTOR, self._getitem
)
def _getitem(self, key):
with self.datastore.lock:
array = self.get_array(needs_lock=False)
return array[key]
class PseudoNetCDFDataStore(AbstractDataStore):
"""Store for accessing datasets via PseudoNetCDF"""
@classmethod
def open(cls, filename, lock=None, mode=None, **format_kwargs):
from PseudoNetCDF import pncopen
keywords = {"kwargs": format_kwargs}
# only include mode if explicitly passed
if mode is not None:
keywords["mode"] = mode
if lock is None:
lock = PNETCDF_LOCK
manager = CachingFileManager(pncopen, filename, lock=lock, **keywords)
return cls(manager, lock)
def __init__(self, manager, lock=None):
self._manager = manager
self.lock = ensure_lock(lock)
@property
def ds(self):
return self._manager.acquire()
def open_store_variable(self, name, var):
data = indexing.LazilyOuterIndexedArray(PncArrayWrapper(name, self))
attrs = {k: getattr(var, k) for k in var.ncattrs()}
return Variable(var.dimensions, data, attrs)
def get_variables(self):
return FrozenDict(
(k, self.open_store_variable(k, v)) for k, v in self.ds.variables.items()
)
def get_attrs(self):
return Frozen({k: getattr(self.ds, k) for k in self.ds.ncattrs()})
def get_dimensions(self):
return Frozen(self.ds.dimensions)
def get_encoding(self):
return {
"unlimited_dims": {
k for k in self.ds.dimensions if self.ds.dimensions[k].isunlimited()
}
}
def close(self):
self._manager.close()
|