File: execution_model.py

package info (click to toggle)
vtk9 9.5.2%2Bdfsg3-4
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 205,916 kB
  • sloc: cpp: 2,336,565; ansic: 327,116; python: 111,200; yacc: 4,104; java: 3,977; sh: 3,032; xml: 2,771; perl: 2,189; lex: 1,787; makefile: 178; javascript: 165; objc: 153; tcl: 59
file content (268 lines) | stat: -rw-r--r-- 10,764 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
"""Utility classes to help with the simpler Python interface
for connecting and executing pipelines."""

__all__ = ['select_ports', 'Pipeline', 'Output']

def _call(first, last, inp=None, port=0):
    """Set the input of the first filter, update the pipeline
    and return the output."""
    if inp and not first.GetNumberOfInputPorts():
        raise ValueError(f"{first.GetClassName()} does not have input ports yet an input was passed to the pipeline.")
    in_cons = []
    if first.GetNumberOfInputPorts():
        n_cons = first.GetNumberOfInputConnections(port)
        for i in range(n_cons):
            op = first.GetInputConnection(port, i)
            if op and op.GetProducer():
                op.GetProducer().Register(None)
            in_cons.append(op)
        from vtkmodules.vtkCommonExecutionModel import vtkAlgorithm
        from vtkmodules.vtkCommonExecutionModel import vtkTrivialProducer
        from collections.abc import Sequence
        if isinstance(inp, Sequence):
            if first.GetInputPortInformation(port).Has(
                    vtkAlgorithm.INPUT_IS_REPEATABLE()):
                first.RemoveAllInputConnections(port)
                for aInp in inp:
                    tp = vtkTrivialProducer()
                    tp.SetOutput(aInp)
                    first.AddInputConnection(port, tp.GetOutputPort());
        else:
            tp = vtkTrivialProducer()
            tp.SetOutput(inp)
            first.SetInputConnection(port, tp.GetOutputPort());

    output = last.update().output

    if first.GetNumberOfInputPorts():
        first.RemoveAllInputConnections(port)
        for op in in_cons:
            first.AddInputConnection(port, op)
            if op and op.GetProducer():
                op.GetProducer().UnRegister(None)

    output_copy = []
    if type(output) is not tuple:
        output = (output,)
    for opt in output:
        copy = opt.NewInstance()
        copy.ShallowCopy(opt)
        output_copy.append(copy)
    if len(output_copy) == 1:
        return output_copy[0]
    else:
        return tuple(output_copy)


class select_ports(object):
    """Helper class for selecting input and output ports when
    connecting pipeline objects with the >> operator.
    Example uses:
    # Connect a source to the second input of a filter.
    source >> select_ports(1, filter)
    # Connect the second output of a source to a filter.
    select_ports(source, 1) >> filter
    # Combination of both: Connect source to second
    # input of the filter, then connect the second
    # output of that filter to another one.
    source >>> select_ports(1, filter, 1) >> filter2
    """
    def __init__(self, *args):
        """This constructor takes 2 or 3 arguments.
        The possibilities are:
        select_ports(input_port, algorithm)
        select_ports(algorithm, output_port)
        select_ports(input_port, algorithm, output_port)
        """
        nargs = len(args)
        if nargs < 2 or nargs > 3:
            raise ValueError("Expecting 2 or 3 arguments")

        self.input_port = None
        self.output_port = None
        before_alg = True
        for arg in args:
            if hasattr(arg, "IsA") and arg.IsA("vtkAlgorithm"):
                self.algorithm = arg
                before_alg = False
            else:
                if before_alg:
                    self.input_port = arg
                else:
                    self.output_port = arg
        if not self.input_port:
            self.input_port = 0
        if not self.output_port:
            self.output_port = 0

    def SetInputConnection(self, inp):
        "Forwards to underlying algorithm and port."
        self.algorithm.SetInputConnection(self.input_port, inp)

    def AddInputConnection(self, inp):
        "Forwards to underlying algorithm and port."
        self.algorithm.AddInputConnection(self.input_port, inp)

    def GetOutputPort(self):
        "Returns the output port of the underlying algorithm."
        return self.algorithm.GetOutputPort(self.output_port)

    def GetInputPortInformation(self, port):
        return self.algorithm.GetInputPortInformation(self.input_port)

    def update(self):
        """Execute the algorithm and return the output from the selected
        output port."""
        return self.algorithm.update()

    def __rshift__(self, rhs):
        "Creates a pipeline between the underlying port and an algorithm."
        return Pipeline(self, rhs)

    def __rrshift__(self, lhs):
        """Creates a pipeline between the underlying port and an algorithm.
        This is to handle sequence >> select_ports where the port can
        accept multiple connections."""
        from collections.abc import Sequence
        if lhs is None or (isinstance(lhs, Sequence) and len(lhs == 0)):
            self.algorithm.RemoveAllInputConnections(self.input_port)
            return self
        return Pipeline(lhs, self)

    def __call__(self, inp=None):
        """Executes the underlying algorithm by passing input data to
        the selected input port. Returns a single output or a tuple
        if there are multiple outputs."""
        return _call(self.algorithm, self.algorithm, inp, self.input_port)

class Pipeline(object):
    """Pipeline objects are created when 2 or more algorithms are
    connected with the >> operator. They store the first and last
    algorithms in the pipeline and enable connecting more algorithms
    and executing the pipeline. One should not have to create Pipeline
    objects directly. They are created by the use of the >> operator."""

    PIPELINE = 0
    ALGORITHM = 1
    DATA = 2
    UNKNOWN = 3

    def __init__(self, lhs, rhs):
        """Create a pipeline object that connects two objects of the
        following type: data object, pipeline object, algorithm object."""
        left_type = self._determine_type(lhs)
        right_type = self._determine_type(rhs)
        if right_type == Pipeline.ALGORITHM:
            rhs_alg = rhs
        elif right_type == Pipeline.PIPELINE:
            rhs_alg = rhs.first
        else:
            raise TypeError(
              f"unsupported operand type(s) for >>: {type(lhs).__name__} and {type(rhs).__name__}")

        from collections.abc import Sequence
        if isinstance(lhs, Sequence):
            for inp in lhs:
                self._connect(inp, rhs, rhs_alg, "AddInputConnection")
        else:
            self._connect(lhs, rhs, rhs_alg, "SetInputConnection")

    def _connect(self, lhs, rhs, rhs_alg, connect_method):
        from vtkmodules.vtkCommonExecutionModel import vtkAlgorithm
        inInfo = rhs_alg.GetInputPortInformation(0)
        if inInfo.Has(vtkAlgorithm.INPUT_IS_REPEATABLE()):
            connect_method = 'AddInputConnection'

        left_type = self._determine_type(lhs)
        right_type = self._determine_type(rhs)
        if left_type == Pipeline.UNKNOWN:
            raise TypeError(
              f"unsupported operand type(s) for >>: {type(lhs).__name__} and {type(rhs).__name__}")
        if right_type == Pipeline.ALGORITHM:
            if left_type == Pipeline.ALGORITHM:
                getattr(rhs_alg, connect_method)(lhs.GetOutputPort())
                self.first = lhs
                self.last = rhs
            elif left_type == Pipeline.PIPELINE:
                getattr(rhs_alg, connect_method)(lhs.last.GetOutputPort())
                self.first = lhs.first
                self.last = rhs
            elif left_type == Pipeline.DATA:
                from vtkmodules.vtkCommonExecutionModel import vtkTrivialProducer
                source = vtkTrivialProducer()
                source.SetOutput(lhs)
                getattr(rhs_alg, connect_method)(source.GetOutputPort())
                self.first = source
                self.last = rhs
        elif right_type == Pipeline.PIPELINE:
            if left_type == Pipeline.ALGORITHM:
                self.first = lhs
                self.last = rhs.last
                getattr(rhs_alg, connect_method)(lhs.GetOutputPort())
            elif left_type == Pipeline.PIPELINE:
                getattr(rhs_alg, connect_method)(lhs.last.GetOutputPort())
                self.first = lhs.first
                self.last = rhs.last
            elif left_type == Pipeline.DATA:
                from vtkmodules.vtkCommonExecutionModel import vtkTrivialProducer
                source = vtkTrivialProducer()
                source.SetOutput(lhs)
                getattr(rhs_alg, connect_method)(source.GetOutputPort())
                self.first = source
                self.last = rhs.last

    def _determine_type(self, arg):
        if type(arg) is Pipeline:
            return Pipeline.PIPELINE
        if hasattr(arg, "SetInputConnection"):
            return Pipeline.ALGORITHM
        if hasattr(arg, "IsA") and arg.IsA("vtkDataObject"):
            return Pipeline.DATA
        return Pipeline.UNKNOWN

    def update(self, **kwargs):
        """Update the pipeline and return the last algorithm's
        output."""
        return self.last.update()

    def __call__(self, inp=None):
        """Sets the input of the first filter, update the pipeline
        and returns the output. A single data object or a tuple
        of data objects (when there are multiple outputs) are
        returned."""
        return _call(self.first, self.last, inp)

    def __rshift__(self, rhs):
        """Used to connect two pipeline items. The left side can
        be a data object, an algorithm or a pipeline. The right
        side can be an algorithm or a pipeline."""
        return Pipeline(self, rhs)

    def __rrshift__(self, lhs):
        """Creates a pipeline between a sequence input and a pipeline."""
        from collections.abc import Sequence
        if lhs is None or (isinstance(lhs, Sequence) and len(lhs) == 0):
            self.first.RemoveAllInputConnections(0)
            return self
        return Pipeline(lhs, self)

class Output(object):
    """Helper object to represent the output of an algorithms as
    returned by the update() method. Implements the output property
    enabling calling update().output."""
    def __init__(self, algorithm, **kwargs):
        self.algorithm = algorithm
        self.algorithm.Update()

    @property
    def output(self):
        """Returns a single data object or a tuple of data objects
        if there are multiple outputs."""
        if self.algorithm.GetNumberOfOutputPorts() == 1:
            return self.algorithm.GetOutputDataObject(0)
        else:
            outputs = []
            nOutputs = self.algorithm.GetNumberOfOutputPorts()
            for i in range(nOutputs):
                outputs.append(self.algorithm.GetOutputDataObject(i))
        return tuple(outputs)