File: TestFidesWriterBasic.py

package info (click to toggle)
vtk9 9.5.2%2Bdfsg3-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 205,984 kB
  • sloc: cpp: 2,336,570; ansic: 327,116; python: 111,200; yacc: 4,104; java: 3,977; sh: 3,032; xml: 2,771; perl: 2,189; lex: 1,787; makefile: 181; javascript: 165; objc: 153; tcl: 59
file content (165 lines) | stat: -rwxr-xr-x 6,258 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#!/usr/bin/env python

from vtkmodules.vtkIOFides import vtkFidesWriter, vtkFidesReader
from vtkmodules.vtkCommonDataModel import vtkPartitionedDataSetCollection
from vtkmodules.vtkCommonExecutionModel import vtkStreamingDemandDrivenPipeline
from vtkmodules.vtkFiltersCore import vtkConvertToMultiBlockDataSet, vtkConvertToPartitionedDataSetCollection
from vtkmodules.vtkFiltersParallelDIY2 import vtkRedistributeDataSetFilter
from vtkmodules.vtkImagingCore import vtkRTAnalyticSource
from vtkmodules.util.misc import vtkGetTempDir
try:
    # Not directly used here, but if it's not imported when VTK_USE_MPI, we get following error:
    # Attempting to use an MPI routine before initializing MPICH
    from mpi4py import MPI
except ImportError:
    pass

import os.path
VTK_TEMP_DIR = vtkGetTempDir()

from vtk.test import Testing

class TestFidesWriterBasic(Testing.vtkTest):
    """Round-trip tests for vtkFidesWriter, verified through vtkFidesReader.

    Every test feeds a vtkRTAnalyticSource (directly, or through one
    conversion/redistribution filter) into vtkFidesWriter, writes a ``.bp``
    file under ``VTK_TEMP_DIR``, reads the file back with vtkFidesReader and
    checks the time-step information, the number of point arrays and the
    size of the single partition that comes back.
    """

    # Cell and point counts asserted on the dataset read back in every test.
    EXPECTED_CELLS = 8000
    EXPECTED_POINTS = 9261

    def _checkRoundTrip(self, producer, basename, numPointArrays,
                        chooseFieldsToWrite=False):
        """Write ``producer``'s output with vtkFidesWriter and verify the read-back.

        The method name deliberately does not start with ``test`` so that it
        is not treated as a test case of its own.

        producer            -- algorithm whose output port 0 is connected to
                               the writer.
        basename            -- file name (joined onto ``VTK_TEMP_DIR``) to
                               write to and then read from.
        numPointArrays      -- value expected from
                               ``reader.GetNumberOfPointArrays()``.
        chooseFieldsToWrite -- when true, ``SetChooseFieldsToWrite(True)`` is
                               called on the writer before writing; otherwise
                               the writer option is left untouched.
        """
        filename = os.path.join(VTK_TEMP_DIR, basename)

        writer = vtkFidesWriter()
        writer.SetFileName(filename)
        writer.SetInputConnection(producer.GetOutputPort())
        if chooseFieldsToWrite:
            # setting to true but not turning on an array means no array will be written
            writer.SetChooseFieldsToWrite(True)
        writer.Write()

        reader = vtkFidesReader()
        reader.SetFileName(filename)
        reader.UpdateInformation()

        # The file must advertise exactly one time step.
        outInfo = reader.GetOutputInformation(0)
        timeStepsKey = vtkStreamingDemandDrivenPipeline.TIME_STEPS()
        self.assertTrue(outInfo.Has(timeStepsKey))
        self.assertEqual(outInfo.Length(timeStepsKey), 1)

        # The array count is checked before the data itself is read.
        self.assertEqual(reader.GetNumberOfPointArrays(), numPointArrays)
        reader.ConvertToVTKOn()
        reader.Update()

        pdsc = reader.GetOutputDataObject(0)
        self.assertIsInstance(pdsc, vtkPartitionedDataSetCollection)
        pds = pdsc.GetPartitionedDataSet(0)
        self.assertEqual(pds.GetNumberOfPartitions(), 1)

        ds = pds.GetPartition(0)
        self.assertEqual(ds.GetNumberOfCells(), self.EXPECTED_CELLS)
        self.assertEqual(ds.GetNumberOfPoints(), self.EXPECTED_POINTS)

    def testDataSet(self):
        """Write the wavelet directly, with field selection on and no array chosen."""
        wavelet = vtkRTAnalyticSource()

        # ChooseFieldsToWrite is enabled without selecting any array, so the
        # reader is expected to report zero point arrays.
        self._checkRoundTrip(wavelet, 'testDataSet.bp', 0,
                             chooseFieldsToWrite=True)

    def testPartitionedDataSet(self):
        """Write the wavelet after vtkRedistributeDataSetFilter (4 partitions)."""
        wavelet = vtkRTAnalyticSource()
        # get the data into a vtkPartitionedDataSet
        rdsf = vtkRedistributeDataSetFilter()
        rdsf.SetInputConnection(wavelet.GetOutputPort())
        rdsf.SetNumberOfPartitions(4)

        self._checkRoundTrip(rdsf, 'testPDS.bp', 1)

    def testPartitionedDataSetCollection(self):
        """Write the wavelet after vtkConvertToPartitionedDataSetCollection."""
        wavelet = vtkRTAnalyticSource()

        toPdsc = vtkConvertToPartitionedDataSetCollection()
        toPdsc.SetInputConnection(wavelet.GetOutputPort())

        self._checkRoundTrip(toPdsc, 'testPDSC.bp', 1)

    def testMultiBlockDataSet(self):
        """Write the wavelet after vtkConvertToMultiBlockDataSet."""
        wavelet = vtkRTAnalyticSource()

        toMultiBlock = vtkConvertToMultiBlockDataSet()
        toMultiBlock.SetInputConnection(wavelet.GetOutputPort())

        # vtkFidesReader always reads in a PDSC, as well as vtkFidesWriter always converts MB to PDSC
        self._checkRoundTrip(toMultiBlock, 'testMB.bp', 1)


if __name__ == "__main__":
    # Hand the (test class, 'test') pair to VTK's Testing.main entry point.
    test_cases = [(TestFidesWriterBasic, 'test')]
    Testing.main(test_cases)