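# Tests for zlib compression: writing via the deprecated zlib kwarg and the newer
# compression kwarg, the shuffle filter, lossy quantization (least_significant_digit),
# fletcher32 checksums, and explicit chunksizes.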
from typing import TYPE_CHECKING, Any
from numpy.random.mtrand import uniform
from netCDF4 import Dataset
from netCDF4.utils import _quantize
from numpy.testing import assert_almost_equal
import os, tempfile, unittest
if TYPE_CHECKING:
    from netCDF4 import CompressionLevel
else:
    CompressionLevel = Any
ndim = 100000
ndim2 = 100
chunk1 = 10; chunk2 = ndim2
nfiles = 7
files = [tempfile.NamedTemporaryFile(suffix='.nc', delete=False).name for nfile in range(nfiles)]
array = uniform(size=(ndim,))
array2 = uniform(size=(ndim,ndim2))
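# least_significant_digit used for the lossy tests: data is quantized so that
# roughly this many decimal digits are retained.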
lsd = 3
def write_netcdf(filename,zlib,least_significant_digit,data,dtype='f8',shuffle=False,contiguous=False,\
                 chunksizes=None, complevel: CompressionLevel = 6, fletcher32=False):
    file = Dataset(filename,'w')
    file.createDimension('n', ndim)
    foo = file.createVariable('data',\
            dtype,('n'),zlib=zlib,least_significant_digit=least_significant_digit,\
            shuffle=shuffle,contiguous=contiguous,complevel=complevel,fletcher32=fletcher32,chunksizes=chunksizes)
    # use compression kwarg instead of deprecated zlib
    if zlib:
        compression='zlib'
    else:
        compression=None
    # anything that evaluates to False is same as None
    #compression=False
    #compression=''
    #compression=0
    #compression='gzip' # should fail
    foo2 = file.createVariable('data2',
            dtype,('n'),compression=compression,least_significant_digit=least_significant_digit, # type: ignore # mypy doesn't like compression
            shuffle=shuffle,contiguous=contiguous,complevel=complevel,fletcher32=fletcher32,chunksizes=chunksizes)
    foo[:] = data
    foo2[:] = data
    file.close()
    # reopen and read the data back to make sure the file round-trips
    file = Dataset(filename)
    data = file.variables['data'][:]
    data2 = file.variables['data2'][:]
    file.close()
def write_netcdf2(filename,zlib,least_significant_digit,data,dtype='f8',shuffle=False,contiguous=False,\
                  chunksizes=None, complevel: CompressionLevel = 6, fletcher32=False):
    file = Dataset(filename,'w')
    file.createDimension('n', ndim)
    file.createDimension('n2', ndim2)
    foo = file.createVariable('data2',\
            dtype,('n','n2'),zlib=zlib,least_significant_digit=least_significant_digit,\
            shuffle=shuffle,contiguous=contiguous,complevel=complevel,fletcher32=fletcher32,chunksizes=chunksizes)
    foo[:] = data
    file.close()
    file = Dataset(filename)
    data = file.variables['data2'][:]
    file.close()
class CompressionTestCase(unittest.TestCase):

    def setUp(self):
        self.files = files
        # no compression
        write_netcdf(self.files[0],False,None,array)
        # compressed, lossless, no shuffle.
        write_netcdf(self.files[1],True,None,array)
        # compressed, lossless, with shuffle.
        write_netcdf(self.files[2],True,None,array,shuffle=True)
        # compressed, lossy, no shuffle.
        write_netcdf(self.files[3],True,lsd,array)
        # compressed, lossy, with shuffle.
        write_netcdf(self.files[4],True,lsd,array,shuffle=True)
        # compressed, lossy, with shuffle and fletcher32 checksum.
        write_netcdf(self.files[5],True,lsd,array,shuffle=True,fletcher32=True)
        # 2-d compressed, lossy, with shuffle and fletcher32 checksum and
        # chunksizes.
        write_netcdf2(self.files[6],True,lsd,array2,shuffle=True,fletcher32=True,chunksizes=(chunk1,chunk2))

    def tearDown(self):
        # Remove the temporary files
        for file in self.files:
            os.remove(file)
    def runTest(self):
        """testing zlib and shuffle compression filters"""
        uncompressed_size = os.stat(self.files[0]).st_size
        # check uncompressed data
        f = Dataset(self.files[0])
        size = os.stat(self.files[0]).st_size
        assert_almost_equal(array,f.variables['data'][:])
        assert_almost_equal(array,f.variables['data2'][:])
        assert f.variables['data'].filters() ==\
        {'zlib':False,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':False,'complevel':0,'fletcher32':False}
        assert f.variables['data2'].filters() ==\
        {'zlib':False,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':False,'complevel':0,'fletcher32':False}
        assert_almost_equal(size,uncompressed_size)
        f.close()
        # check compressed data.
        f = Dataset(self.files[1])
        size = os.stat(self.files[1]).st_size
        assert_almost_equal(array,f.variables['data'][:])
        assert_almost_equal(array,f.variables['data2'][:])
        assert f.variables['data'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':False,'complevel':6,'fletcher32':False}
        assert f.variables['data2'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':False,'complevel':6,'fletcher32':False}
        assert size < 0.95*uncompressed_size
        f.close()
        # check compression with shuffle
        f = Dataset(self.files[2])
        size = os.stat(self.files[2]).st_size
        assert_almost_equal(array,f.variables['data'][:])
        assert_almost_equal(array,f.variables['data2'][:])
        assert f.variables['data'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':True,'complevel':6,'fletcher32':False}
        assert f.variables['data2'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':True,'complevel':6,'fletcher32':False}
        assert size < 0.85*uncompressed_size
        f.close()
        # check lossy compression without shuffle
        f = Dataset(self.files[3])
        size = os.stat(self.files[3]).st_size
        checkarray = _quantize(array,lsd)
        assert_almost_equal(checkarray,f.variables['data'][:])
        assert_almost_equal(checkarray,f.variables['data2'][:])
        assert size < 0.27*uncompressed_size
        f.close()
        # check lossy compression with shuffle
        f = Dataset(self.files[4])
        size = os.stat(self.files[4]).st_size
        assert_almost_equal(checkarray,f.variables['data'][:])
        assert_almost_equal(checkarray,f.variables['data2'][:])
        assert size < 0.20*uncompressed_size
        size_save = size
        f.close()
        # check lossy compression with shuffle and fletcher32 checksum.
        f = Dataset(self.files[5])
        size = os.stat(self.files[5]).st_size
        assert_almost_equal(checkarray,f.variables['data'][:])
        assert_almost_equal(checkarray,f.variables['data2'][:])
        assert f.variables['data'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':True,'complevel':6,'fletcher32':True}
        assert f.variables['data2'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':True,'complevel':6,'fletcher32':True}
        assert size < 0.20*uncompressed_size
        # should be slightly larger than without fletcher32
        assert size > size_save
        # check chunksizes
        f.close()
        f = Dataset(self.files[6])
        checkarray2 = _quantize(array2,lsd)
        assert_almost_equal(checkarray2,f.variables['data2'][:])
        assert f.variables['data2'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':True,'complevel':6,'fletcher32':True}
        assert f.variables['data2'].chunking() == [chunk1,chunk2]
        f.close()
if __name__ == '__main__':
    unittest.main()