File: test_compression.py

from typing import TYPE_CHECKING, Any
from numpy.random import uniform
from netCDF4 import Dataset
from netCDF4.utils import _quantize
from numpy.testing import assert_almost_equal
import os, tempfile, unittest
if TYPE_CHECKING:
    from netCDF4 import CompressionLevel
else:
    CompressionLevel = Any
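# Note: CompressionLevel exists only in netCDF4's type stubs (there it is a
# Literal of the valid deflate levels 0-9), hence the TYPE_CHECKING guard
# above with an Any fallback at runtime.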

ndim = 100000
ndim2 = 100
chunk1 = 10
chunk2 = ndim2
nfiles = 7
files = [tempfile.NamedTemporaryFile(suffix='.nc', delete=False).name for _ in range(nfiles)]
array = uniform(size=(ndim,))
array2 = uniform(size=(ndim,ndim2))
lsd = 3
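
# For orientation, a minimal sketch of the rounding implied by
# least_significant_digit (mirroring netCDF4.utils._quantize for positive lsd,
# per the netCDF4 docs: data are quantized as around(scale*data)/scale, with
# scale the smallest power of two that retains 10**-lsd precision). The helper
# name is ours and is illustrative only; the assertions below use _quantize
# itself.
def _quantize_sketch(data, least_significant_digit):
    import numpy as np
    bits = int(np.ceil(np.log2(10.0**least_significant_digit)))  # lsd=3 -> 10 bits
    scale = 2.0**bits                                            # lsd=3 -> 1024.0
    return np.around(scale*data)/scale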

def write_netcdf(filename,zlib,least_significant_digit,data,dtype='f8',shuffle=False,contiguous=False,
                 chunksizes=None, complevel: CompressionLevel = 6, fletcher32=False):
    file = Dataset(filename,'w')
    file.createDimension('n', ndim)
    # 'data' exercises the legacy zlib kwarg.
    foo = file.createVariable('data',
            dtype,('n',),zlib=zlib,least_significant_digit=least_significant_digit,
            shuffle=shuffle,contiguous=contiguous,complevel=complevel,fletcher32=fletcher32,chunksizes=chunksizes)
    # 'data2' exercises the compression kwarg, which supersedes the deprecated
    # zlib kwarg used above.
    if zlib:
        compression='zlib'
    else:
        compression=None
        # anything that evaluates to False is the same as None:
        #compression=False
        #compression=''
        #compression=0
        # an unrecognized name is rejected:
        #compression='gzip' # should fail
    foo2 = file.createVariable('data2',
            dtype,('n',),compression=compression,least_significant_digit=least_significant_digit,  # type: ignore  # mypy doesn't like compression
            shuffle=shuffle,contiguous=contiguous,complevel=complevel,fletcher32=fletcher32,chunksizes=chunksizes)
    foo[:] = data
    foo2[:] = data
    file.close()
    # read the data back in (the values are discarded; this only checks that
    # the file round-trips without raising)
    file = Dataset(filename)
    data = file.variables['data'][:]
    data2 = file.variables['data2'][:]
    file.close()
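
# Illustrative only (not part of the original test): in netCDF4 >= 1.6 the
# compression kwarg exercised above also accepts other filter names --
# e.g. 'szip', 'zstd' or 'bzip2' -- provided the underlying netCDF-C/HDF5
# build ships those filters; unrecognized or unavailable names raise an error.
def _example_compression_kwarg(filename, name='zlib'):
    nc = Dataset(filename, 'w')
    nc.createDimension('n', ndim)
    v = nc.createVariable('v', 'f8', ('n',), compression=name)
    v[:] = array
    nc.close()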

def write_netcdf2(filename,zlib,least_significant_digit,data,dtype='f8',shuffle=False,contiguous=False,
                 chunksizes=None, complevel: CompressionLevel = 6, fletcher32=False):
    # 2-d variant of write_netcdf, used to exercise explicit chunksizes.
    file = Dataset(filename,'w')
    file.createDimension('n', ndim)
    file.createDimension('n2', ndim2)
    foo = file.createVariable('data2',
            dtype,('n','n2'),zlib=zlib,least_significant_digit=least_significant_digit,
            shuffle=shuffle,contiguous=contiguous,complevel=complevel,fletcher32=fletcher32,chunksizes=chunksizes)
    foo[:] = data
    file.close()
    # read the data back in (values discarded; round-trip check only)
    file = Dataset(filename)
    data = file.variables['data2'][:]
    file.close()
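
# Illustrative only: the chunk layout written by write_netcdf2 can be read
# back with Variable.chunking(), which returns the string 'contiguous' for
# contiguous storage or a list of per-dimension chunk lengths (here it is
# asserted to be [chunk1, chunk2] in runTest below).
def _example_read_chunking(filename):
    nc = Dataset(filename, 'r')
    layout = nc.variables['data2'].chunking()
    nc.close()
    return layout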

class CompressionTestCase(unittest.TestCase):

    def setUp(self):
        self.files = files
        # no compression
        write_netcdf(self.files[0],False,None,array)
        # compressed, lossless, no shuffle.
        write_netcdf(self.files[1],True,None,array)
        # compressed, lossless, with shuffle.
        write_netcdf(self.files[2],True,None,array,shuffle=True)
        # compressed, lossy, no shuffle.
        write_netcdf(self.files[3],True,lsd,array)
        # compressed, lossy, with shuffle.
        write_netcdf(self.files[4],True,lsd,array,shuffle=True)
        # compressed, lossy, with shuffle and fletcher32 checksum.
        write_netcdf(self.files[5],True,lsd,array,shuffle=True,fletcher32=True)
        # 2-d compressed, lossy, with shuffle and fletcher32 checksum and
        # chunksizes.
        write_netcdf2(self.files[6],True,lsd,array2,shuffle=True,fletcher32=True,chunksizes=(chunk1,chunk2))

    def tearDown(self):
        # Remove the temporary files
        for file in self.files:
            os.remove(file)

    def runTest(self):
        """testing zlib and shuffle compression filters"""
        uncompressed_size = os.stat(self.files[0]).st_size
        # check uncompressed data
        f = Dataset(self.files[0])
        size = os.stat(self.files[0]).st_size
        assert_almost_equal(array,f.variables['data'][:])
        assert_almost_equal(array,f.variables['data2'][:])
        assert f.variables['data'].filters() ==\
        {'zlib':False,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':False,'complevel':0,'fletcher32':False}
        assert f.variables['data2'].filters() ==\
        {'zlib':False,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':False,'complevel':0,'fletcher32':False}
        assert_almost_equal(size,uncompressed_size)
        f.close()
        # check compressed data.
        f = Dataset(self.files[1])
        size = os.stat(self.files[1]).st_size
        assert_almost_equal(array,f.variables['data'][:])
        assert_almost_equal(array,f.variables['data2'][:])
        assert f.variables['data'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':False,'complevel':6,'fletcher32':False}
        assert f.variables['data2'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':False,'complevel':6,'fletcher32':False}
        assert size < 0.95*uncompressed_size
        f.close()
        # check compression with shuffle
        f = Dataset(self.files[2])
        size = os.stat(self.files[2]).st_size
        assert_almost_equal(array,f.variables['data'][:])
        assert_almost_equal(array,f.variables['data2'][:])
        assert f.variables['data'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':True,'complevel':6,'fletcher32':False}
        assert f.variables['data2'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':True,'complevel':6,'fletcher32':False}
        assert size < 0.85*uncompressed_size
        f.close()
        # check lossy compression without shuffle
        f = Dataset(self.files[3])
        size = os.stat(self.files[3]).st_size
        checkarray = _quantize(array,lsd)
        assert_almost_equal(checkarray,f.variables['data'][:])
        assert_almost_equal(checkarray,f.variables['data2'][:])
        assert size < 0.27*uncompressed_size
        f.close()
        # check lossy compression with shuffle
        f = Dataset(self.files[4])
        size = os.stat(self.files[4]).st_size
        assert_almost_equal(checkarray,f.variables['data'][:])
        assert_almost_equal(checkarray,f.variables['data2'][:])
        assert size < 0.20*uncompressed_size
        size_save = size
        f.close()
        # check lossy compression with shuffle and fletcher32 checksum.
        f = Dataset(self.files[5])
        size = os.stat(self.files[5]).st_size
        assert_almost_equal(checkarray,f.variables['data'][:])
        assert_almost_equal(checkarray,f.variables['data2'][:])
        assert f.variables['data'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':True,'complevel':6,'fletcher32':True}
        assert f.variables['data2'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':True,'complevel':6,'fletcher32':True}
        assert size < 0.20*uncompressed_size
        # should be slightly larger than without fletcher32
        assert size > size_save
        f.close()
        # check chunksizes
        f = Dataset(self.files[6])
        checkarray2 = _quantize(array2,lsd)
        assert_almost_equal(checkarray2,f.variables['data2'][:])
        assert f.variables['data2'].filters() ==\
        {'zlib':True,'szip':False,'zstd':False,'bzip2':False,'blosc':False,'shuffle':True,'complevel':6,'fletcher32':True}
        assert f.variables['data2'].chunking() == [chunk1,chunk2]
        f.close()

if __name__ == '__main__':
    unittest.main()