1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
|
#!/usr/bin/env python
from vtkmodules.vtkIOFides import vtkFidesWriter, vtkFidesReader
from vtkmodules.vtkCommonDataModel import vtkPartitionedDataSetCollection
from vtkmodules.vtkCommonExecutionModel import vtkStreamingDemandDrivenPipeline
from vtkmodules.vtkFiltersCore import vtkConvertToMultiBlockDataSet, vtkConvertToPartitionedDataSetCollection
from vtkmodules.vtkFiltersParallelDIY2 import vtkRedistributeDataSetFilter
from vtkmodules.vtkImagingCore import vtkRTAnalyticSource
from vtkmodules.util.misc import vtkGetTempDir
try:
# mpi4py is not used directly here, but when VTK is built with VTK_USE_MPI
# and mpi4py has not been imported, we get the following error:
# Attempting to use an MPI routine before initializing MPICH
from mpi4py import MPI
except ImportError:
pass
import os.path
VTK_TEMP_DIR = vtkGetTempDir()
from vtk.test import Testing
class TestFidesWriterBasic(Testing.vtkTest):
    """Round-trip tests for vtkFidesWriter.

    Each test feeds the output of a vtkRTAnalyticSource to vtkFidesWriter in a
    different container form (plain data set, partitioned data set,
    partitioned data set collection, multiblock), writes it to a ``.bp`` file
    under ``VTK_TEMP_DIR`` and reads it back with vtkFidesReader to check the
    structure of what was stored.
    """

    # Size of the single partition every test expects to read back.
    # 9261 == 21**3 and 8000 == 20**3, i.e. a 21x21x21-point grid --
    # presumably the default extent of vtkRTAnalyticSource; confirm.
    _EXPECTED_CELLS = 8000
    _EXPECTED_POINTS = 9261

    def _writeToFile(self, producer, basename, chooseFields=None):
        """Write the output of ``producer`` with vtkFidesWriter.

        :param producer: algorithm whose output port 0 is connected to the writer.
        :param basename: file name, created inside ``VTK_TEMP_DIR``.
        :param chooseFields: when not ``None``, forwarded to
            ``SetChooseFieldsToWrite``; when ``None`` the writer setting is
            left untouched.
        :returns: full path of the file that was written.
        """
        filename = os.path.join(VTK_TEMP_DIR, basename)
        writer = vtkFidesWriter()
        writer.SetFileName(filename)
        writer.SetInputConnection(producer.GetOutputPort())
        if chooseFields is not None:
            writer.SetChooseFieldsToWrite(chooseFields)
        writer.Write()
        return filename

    def _checkReadBack(self, filename, expectedPointArrays):
        """Read ``filename`` with vtkFidesReader and verify its contents.

        Asserts that the reader reports exactly one time step and
        ``expectedPointArrays`` point arrays, and that the output is a
        vtkPartitionedDataSetCollection whose first partitioned data set holds
        a single partition with the expected number of cells and points.

        :param filename: path of the file to read.
        :param expectedPointArrays: value expected from
            ``reader.GetNumberOfPointArrays()`` after ``UpdateInformation()``.
        """
        reader = vtkFidesReader()
        reader.SetFileName(filename)
        reader.UpdateInformation()

        outInfo = reader.GetOutputInformation(0)
        timeStepsKey = vtkStreamingDemandDrivenPipeline.TIME_STEPS()
        self.assertTrue(outInfo.Has(timeStepsKey))
        self.assertEqual(outInfo.Length(timeStepsKey), 1)
        self.assertEqual(reader.GetNumberOfPointArrays(), expectedPointArrays)

        reader.ConvertToVTKOn()
        reader.Update()

        output = reader.GetOutputDataObject(0)
        self.assertIsInstance(output, vtkPartitionedDataSetCollection)

        partitioned = output.GetPartitionedDataSet(0)
        self.assertEqual(partitioned.GetNumberOfPartitions(), 1)

        dataset = partitioned.GetPartition(0)
        self.assertEqual(dataset.GetNumberOfCells(), self._EXPECTED_CELLS)
        self.assertEqual(dataset.GetNumberOfPoints(), self._EXPECTED_POINTS)

    def testDataSet(self):
        """Write the raw source output with field selection on but no field chosen."""
        wavelet = vtkRTAnalyticSource()
        # setting to true but not turning on an array means no array will be written
        filename = self._writeToFile(wavelet, 'testDataSet.bp', chooseFields=True)
        self._checkReadBack(filename, expectedPointArrays=0)

    def testPartitionedDataSet(self):
        """Write the output of a vtkRedistributeDataSetFilter set to 4 partitions."""
        wavelet = vtkRTAnalyticSource()

        # get the data into a vtkPartitionedDataSet
        rdsf = vtkRedistributeDataSetFilter()
        rdsf.SetInputConnection(wavelet.GetOutputPort())
        rdsf.SetNumberOfPartitions(4)

        filename = self._writeToFile(rdsf, 'testPDS.bp')
        # NOTE(review): 4 partitions are requested above, yet a single
        # full-size partition is expected on read-back -- confirm intended.
        self._checkReadBack(filename, expectedPointArrays=1)

    def testPartitionedDataSetCollection(self):
        """Write the source output converted to a partitioned data set collection."""
        wavelet = vtkRTAnalyticSource()

        toCollection = vtkConvertToPartitionedDataSetCollection()
        toCollection.SetInputConnection(wavelet.GetOutputPort())

        filename = self._writeToFile(toCollection, 'testPDSC.bp')
        self._checkReadBack(filename, expectedPointArrays=1)

    def testMultiBlockDataSet(self):
        """Write the source output converted to a multiblock data set."""
        wavelet = vtkRTAnalyticSource()

        toMultiBlock = vtkConvertToMultiBlockDataSet()
        toMultiBlock.SetInputConnection(wavelet.GetOutputPort())

        filename = self._writeToFile(toMultiBlock, 'testMB.bp')
        # vtkFidesReader always reads in a PDSC, as well as vtkFidesWriter always converts MB to PDSC
        self._checkReadBack(filename, expectedPointArrays=1)
if __name__ == "__main__":
    # Run this module's test case through VTK's test driver. The 'test'
    # string is paired with the class -- presumably the prefix used to
    # select its test methods; confirm against vtk.test.Testing.
    test_cases = [(TestFidesWriterBasic, 'test')]
    Testing.main(test_cases)
|