File: test_compression_bzip2.py

package info (click to toggle)
netcdf4-python 1.7.2-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 2,588 kB
  • sloc: python: 6,002; ansic: 854; makefile: 15; sh: 2
file content (60 lines) | stat: -rw-r--r-- 2,254 bytes | parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from typing import TYPE_CHECKING, Any
from numpy.random.mtrand import uniform
from netCDF4 import Dataset
from numpy.testing import assert_almost_equal
import os, tempfile, unittest, sys
from filter_availability import no_plugins, has_bzip2_filter
if TYPE_CHECKING:
    from netCDF4 import CompressionLevel
else:
    CompressionLevel = Any

# Length of the 1-D test variable (and of the random data written to it).
ndim = 100000
# Two scratch .nc paths: filename1 gets an uncompressed copy of the data,
# filename2 a bzip2-compressed one. delete=False so the paths survive the
# NamedTemporaryFile handle; the test's tearDown removes them.
filename1 = tempfile.NamedTemporaryFile(suffix='.nc', delete=False).name
filename2 = tempfile.NamedTemporaryFile(suffix='.nc', delete=False).name
# Random doubles in [0, 1); written to both files and compared on read-back.
array = uniform(size=(ndim,))

def write_netcdf(filename, dtype='f8', complevel: CompressionLevel = 6):
    """Write the module-level `array` to *filename* as a bzip2-compressed
    1-D variable named 'data' of length `ndim`.

    A complevel of 0 disables compression entirely (netCDF4 semantics).
    """
    nc = Dataset(filename, 'w')
    try:
        nc.createDimension('n', ndim)
        # The original passed ('n'), which is just the string 'n' — the
        # parentheses did nothing. Use an explicit 1-tuple for the
        # dimensions argument to say what was meant.
        foo = nc.createVariable('data',
                dtype, ('n',), compression='bzip2', complevel=complevel)
        foo[:] = array
    finally:
        # Close the dataset even if variable creation or the write fails,
        # so a broken run doesn't leak an open file handle.
        nc.close()


@unittest.skipIf(no_plugins or not has_bzip2_filter, "bzip2 filter not available")
class CompressionTestCase(unittest.TestCase):
    """Check that bzip2 compression round-trips the data and shrinks the file.

    unittest's loader falls back to ``runTest`` when a TestCase defines no
    ``test_*`` methods, so the single ``runTest`` method below is discovered.
    """

    def setUp(self):
        self.filename1 = filename1
        self.filename2 = filename2
        write_netcdf(self.filename1, complevel=0)  # reference: no compression
        write_netcdf(self.filename2, complevel=4)  # bzip2 at level 4

    def tearDown(self):
        # Remove the scratch files created in setUp.
        for path in (self.filename1, self.filename2):
            os.remove(path)

    def runTest(self):
        reference_size = os.stat(self.filename1).st_size
        no_filters = {'zlib': False, 'szip': False, 'zstd': False,
                      'bzip2': False, 'blosc': False, 'shuffle': False,
                      'complevel': 0, 'fletcher32': False}
        bzip2_filters = dict(no_filters, bzip2=True, complevel=4)

        # Uncompressed file: data round-trips, no filters are reported,
        # and its size matches the reference.
        nc = Dataset(self.filename1)
        assert_almost_equal(array, nc.variables['data'][:])
        assert nc.variables['data'].filters() == no_filters
        assert_almost_equal(os.stat(self.filename1).st_size, reference_size)
        nc.close()

        # Compressed file: same data, bzip2 active at level 4, and the
        # file is at least ~4% smaller than the uncompressed reference.
        nc = Dataset(self.filename2)
        assert_almost_equal(array, nc.variables['data'][:])
        assert nc.variables['data'].filters() == bzip2_filters
        assert os.stat(self.filename2).st_size < 0.96 * reference_size
        nc.close()


if __name__ == '__main__':
    # Run the suite directly; the loader picks up CompressionTestCase.runTest.
    unittest.main()