File: bench_diskless.py

package info (click to toggle)
netcdf4-python 1.7.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,588 kB
  • sloc: python: 6,002; ansic: 854; makefile: 15; sh: 2
file content (72 lines) | stat: -rw-r--r-- 2,988 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# benchmark diskless reads and writes (uncompressed only; zlib is never enabled below).
# tests three file formats: NETCDF4, NETCDF3_CLASSIC and NETCDF3_64BIT.
from typing import TYPE_CHECKING, Any, Literal
from numpy.random.mtrand import uniform
import netCDF4
from timeit import Timer
import os, sys
if TYPE_CHECKING:
    from netCDF4 import Format as NCFormat
else:
    NCFormat = Any

# create an n1dim by n2dim by n3dim random array.
n1dim = 30
n2dim = 15
n3dim = 73
n4dim = 144
ntrials = 10
sys.stdout.write('reading and writing a %s by %s by %s by %s random array ..\n'%(n1dim,n2dim,n3dim,n4dim))
array = uniform(size=(n1dim,n2dim,n3dim,n4dim))

def write_netcdf(filename, zlib=False, least_significant_digit=None, format: NCFormat='NETCDF4',closeit=False):
    file = netCDF4.Dataset(filename,'w',format=format,diskless=True,persist=True)
    file.createDimension('n1', n1dim)
    file.createDimension('n2', n2dim)
    file.createDimension('n3', n3dim)
    file.createDimension('n4', n4dim)
    foo = file.createVariable('data',\
          'f8',('n1','n2','n3','n4'),zlib=zlib,least_significant_digit=None)
    foo.testme="hi I am an attribute"
    foo.testme1="hi I am an attribute"
    foo.testme2="hi I am an attribute"
    foo.testme3="hi I am an attribute"
    foo.testme4="hi I am an attribute"
    foo.testme5="hi I am an attribute"
    foo[:] = array
    if closeit: file.close()
    return file

def read_netcdf(ncfile):
    data = ncfile.variables['data'][:]

for format in ['NETCDF4','NETCDF3_CLASSIC','NETCDF3_64BIT']:
    sys.stdout.write('testing file format %s ...\n' % format)
    # writing, no compression.
    t = Timer("write_netcdf('test1.nc',closeit=True,format='%s')" % format,"from __main__ import write_netcdf")
    sys.stdout.write('writing took %s seconds\n' %\
            repr(sum(t.repeat(ntrials,1))/ntrials))
    # test reading.
    ncfile = write_netcdf('test1.nc',format=format)  # type: ignore
    t = Timer("read_netcdf(ncfile)","from __main__ import read_netcdf,ncfile")
    sys.stdout.write('reading took %s seconds\n' %
            repr(sum(t.repeat(ntrials,1))/ntrials))

# test diskless=True in nc_open
format: Literal["NETCDF3_CLASSIC"] = 'NETCDF3_CLASSIC'  # mypy should know this but it needs help...
trials=50
sys.stdout.write('test caching of file in memory on open for %s\n' % format)
sys.stdout.write('testing file format %s ...\n' % format)
write_netcdf('test1.nc',format=format,closeit=True)
ncfile = netCDF4.Dataset('test1.nc',diskless=False)
t = Timer("read_netcdf(ncfile)","from __main__ import read_netcdf,ncfile")
sys.stdout.write('reading (from disk) took %s seconds\n' %
            repr(sum(t.repeat(ntrials,1))/ntrials))
ncfile.close()
ncfile = netCDF4.Dataset('test1.nc',diskless=True)
# setting diskless=True should cache the file in memory,
# resulting in faster reads.
t = Timer("read_netcdf(ncfile)","from __main__ import read_netcdf,ncfile")
sys.stdout.write('reading (cached in memory) took %s seconds\n' %
            repr(sum(t.repeat(ntrials,1))/ntrials))
ncfile.close()