1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
|
"""
Demonstrate the use of h5py in SWMR mode to write to a dataset (appending)
from one process while monitoring the growing dataset from another process.
Usage:
swmr_multiprocess.py [FILENAME [DATASETNAME]]
FILENAME: name of file to monitor. Default: swmrmp.h5
DATASETNAME: name of dataset to monitor in FILENAME. Default: data
This script will start up two processes: a writer and a reader. The writer
will open/create the file (FILENAME) in SWMR mode, create a dataset and start
appending data to it. After each append the dataset is flushed and an event is
sent to the reader process. Meanwhile the reader process will wait for events
from the writer and, when triggered, it will refresh the dataset and read its
current shape.
"""
import sys
import h5py
import numpy as np
import logging
from multiprocessing import Process, Event
class SwmrReader(Process):
    """Reader process that monitors a dataset in an HDF5 file opened in SWMR mode.

    The process blocks on a shared event. Each time the event is set it
    clears the event, refreshes the dataset and logs the dataset's current
    shape. It stops once no event arrives within ``timeout`` seconds.

    Args:
        event: Event object shared with the writer; must provide ``wait``
            and ``clear``.
        fname: Path of the HDF5 file to open for reading.
        dsetname: Name of the dataset inside the file to monitor.
        timeout: Seconds to wait for each event before giving up.
    """

    def __init__(self, event, fname, dsetname, timeout=2.0):
        super().__init__()
        self._event = event
        self._fname = fname
        self._dsetname = dsetname
        self._timeout = timeout

    def run(self):
        """Wait for the initial event, then refresh and report until events stop.

        Raises:
            TimeoutError: If the initial event does not arrive within the
                configured timeout.
            RuntimeError: If the file was opened but is not in SWMR mode.
        """
        self.log = logging.getLogger('reader')
        self.log.info("Waiting for initial event")
        # The wait must not live inside an `assert`: asserts are removed
        # under `python -O`, which would skip the wait entirely and let the
        # reader open the file before the writer has signalled.
        if not self._event.wait(self._timeout):
            raise TimeoutError(
                f"no initial event received within {self._timeout!r} s"
            )
        self._event.clear()

        self.log.info("Opening file %s", self._fname)
        # The context manager closes the file even when the SWMR check or
        # the dataset lookup fails.
        with h5py.File(self._fname, 'r', libver='latest', swmr=True) as f:
            if not f.swmr_mode:
                raise RuntimeError(
                    f"file {self._fname!r} was not opened in SWMR mode"
                )
            dset = f[self._dsetname]

            # Monitor-and-read loop: ends when an event wait times out.
            while self._event.wait(self._timeout):
                self._event.clear()
                self.log.debug("Refreshing dataset")
                dset.refresh()
                self.log.info("Read dset shape: %s", dset.shape)
class SwmrWriter(Process):
    """Writer process that creates a dataset and extends it in SWMR mode.

    Args:
        event: Event object shared with the reader; set once after SWMR
            mode is enabled and again after every flushed write.
        fname: Path of the HDF5 file to create (opened with mode ``'w'``).
        dsetname: Name of the dataset to create inside the file.
    """

    def __init__(self, event, fname, dsetname):
        super().__init__()
        self._event, self._fname, self._dsetname = event, fname, dsetname

    def run(self):
        """Create the file and dataset, enable SWMR mode, then write five blocks.

        Each pass resizes the dataset to hold one more copy of the seed
        array, writes the seed into the tail, flushes the dataset and sets
        the event. The first pass resizes to the initial length, so it
        rewrites the seed data rather than growing the dataset.
        """
        self.log = logging.getLogger('writer')
        self.log.info("Creating file %s", self._fname)
        with h5py.File(self._fname, 'w', libver='latest') as f:
            seed = np.array([1, 2, 3, 4])
            block_len = len(seed)
            dset = f.create_dataset(
                self._dsetname,
                chunks=(2,),
                maxshape=(None,),
                data=seed,
            )

            # The file starts outside SWMR mode; it is enabled only after
            # the dataset has been created.
            assert not f.swmr_mode
            self.log.info("SWMR mode")
            f.swmr_mode = True
            assert f.swmr_mode

            self.log.debug("Sending initial event")
            self._event.set()

            # Write loop
            for step in range(5):
                offset = step * block_len
                new_shape = (offset + block_len, )
                resize_msg = "Resizing dset shape: %s" % str(new_shape)
                self.log.info(resize_msg)
                dset.resize(new_shape)

                self.log.debug("Writing data")
                # Alternative left commented out in the original:
                # dset.write_direct(arr, np.s_[:], np.s_[i*len(arr):])
                dset[offset:] = seed

                self.log.debug("Flushing data")
                dset.flush()
                self.log.info("Sending event")
                self._event.set()
if __name__ == "__main__":
    # Script entry point: start one reader and one writer process that share
    # an Event, then wait for both to finish.
    logging.basicConfig(
        format='%(levelname)10s %(asctime)s %(name)10s %(message)s',
        level=logging.INFO,
    )

    # Optional positional arguments: [FILENAME [DATASETNAME]]
    fname = sys.argv[1] if len(sys.argv) > 1 else 'swmrmp.h5'
    dsetname = sys.argv[2] if len(sys.argv) > 2 else 'data'

    event = Event()
    reader = SwmrReader(event, fname, dsetname)
    writer = SwmrWriter(event, fname, dsetname)

    # The reader is started first; it blocks until the writer sets the event.
    logging.info("Starting reader")
    reader.start()
    # Fixed: this message previously said "Starting reader" a second time.
    logging.info("Starting writer")
    writer.start()

    logging.info("Waiting for writer to finish")
    writer.join()
    logging.info("Waiting for reader to finish")
    reader.join()
|