File: execution.py

package info (click to toggle)
ufo-tofu 0.13.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,388 kB
  • sloc: python: 15,892; makefile: 7
file content (200 lines) | stat: -rw-r--r-- 8,366 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import gi
import logging
import networkx as nx
gi.require_version('Ufo', '1.0')
from gi.repository import Ufo
from PyQt5.QtCore import QObject, pyqtSignal
from qtpynodeeditor import PortType
from threading import Thread
from tofu.flow.models import ARRAY_DATA_TYPE, UFO_DATA_TYPE, UfoTaskModel
from tofu.flow.util import FlowError


LOG = logging.getLogger(__name__)


class UfoExecutor(QObject):

    """Class holding GPU resources and organizing UFO graph execution."""

    number_of_inputs_changed = pyqtSignal(int)  # Number of inputs has been determined
    processed_signal = pyqtSignal(int)  # Image has been processed
    execution_started = pyqtSignal()  # Graph execution started
    execution_finished = pyqtSignal()  # Graph execution finished
    exception_occured = pyqtSignal(str)  # str() of an exception raised during execution

    def __init__(self):
        super().__init__(parent=None)
        # GPU resources are created once and shared by every scheduler we spawn
        self._resources = Ufo.Resources()
        self._reset()
        # If True only log the exception and emit the signal but don't re-raise it in the executing
        # thread
        self.swallow_run_exceptions = False

    def _reset(self):
        """Clear per-run state (abort flag, schedulers, processed-image counter)."""
        self._aborted = False
        self._schedulers = []
        self.num_generated = 0

    def abort(self):
        """Abort all running schedulers and always signal that execution finished."""
        LOG.debug('Execution aborted')
        try:
            self._aborted = True
            for scheduler in self._schedulers:
                scheduler.abort()
        finally:
            self.execution_finished.emit()

    def on_processed(self, ufo_task):
        """GObject callback fired when *ufo_task* generates an image.

        Re-emits the event as a Qt signal carrying the running image count.
        """
        self.processed_signal.emit(self.num_generated)
        self.num_generated += 1

    def setup_ufo_graph(self, graph, gpu=None, region=None, signalling_model=None):
        """Create an Ufo.TaskGraph mirroring the UFO-capable part of *graph*.

        Only edges whose both endpoints provide ``create_ufo_task`` are
        translated. *region* is forwarded to task creation, *gpu* (if given) is
        assigned to every GPU-using task and the task created from
        *signalling_model* gets its 'generated' signal connected to
        :meth:`on_processed`. Returns the constructed Ufo.TaskGraph.
        """
        ufo_graph = Ufo.TaskGraph()
        ufo_tasks = {}
        for source, dest, ports in graph.edges.data():
            if hasattr(source, 'create_ufo_task') and hasattr(dest, 'create_ufo_task'):
                # Create each task only once even if the model has several edges
                if dest not in ufo_tasks:
                    ufo_tasks[dest] = dest.create_ufo_task(region=region)
                if source not in ufo_tasks:
                    ufo_tasks[source] = source.create_ufo_task(region=region)
                ufo_graph.connect_nodes_full(ufo_tasks[source],
                                             ufo_tasks[dest],
                                             ports[PortType.input])
                LOG.debug(f'{source.name}->{dest.name}@{ports[PortType.input]}')
                if source == signalling_model:
                    ufo_tasks[source].connect('generated', self.on_processed)

        if gpu is not None:
            for task in ufo_tasks.values():
                if task.uses_gpu():
                    task.set_proc_node(gpu)

        return ufo_graph

    def _run_ufo_graph(self, ufo_graph, use_fixed_scheduler):
        """Run *ufo_graph* with a (fixed) scheduler; meant to run in a worker thread.

        On error the abort flag is set so that remaining batches are skipped,
        the exception is logged and emitted; it is re-raised unless
        :attr:`swallow_run_exceptions` is True.
        """
        LOG.debug(f'Executing graph, fixed scheduler: {use_fixed_scheduler}')

        try:
            scheduler = Ufo.FixedScheduler() if use_fixed_scheduler else Ufo.Scheduler()
            self._schedulers.append(scheduler)
            scheduler.set_resources(self._resources)
            scheduler.run(ufo_graph)
            LOG.info(f'Execution time: {scheduler.props.time} s')
        except Exception as e:
            # Do not continue execution of other batches
            self._aborted = True
            LOG.error(e, exc_info=True)
            self.exception_occured.emit(str(e))
            if not self.swallow_run_exceptions:
                # Bare raise preserves the original traceback
                raise

    def check_graph(self, graph):
        """
        Check that *graph* starts with an UfoTaskModel and ends with either that or an UfoModel
        but no UfoTaskModel successor exists (there can be only one UFO path in the graph).

        Raises :class:`FlowError` on any violation.
        """
        roots = [n for n in graph.nodes if graph.in_degree(n) == 0]
        leaves = [n for n in graph.nodes if graph.out_degree(n) == 0]

        for root in roots:
            for leave in leaves:
                for path in nx.simple_paths.all_simple_paths(graph, root, leave):
                    if not isinstance(path[0], UfoTaskModel):
                        raise FlowError('Flow must start with an UFO node')
                    ufo_ended = False
                    for (i, succ) in enumerate(path[1:]):
                        model = path[i]
                        edge_data = graph.get_edge_data(model, succ)
                        if len(edge_data) > 1:
                            # There cannot be multiple edges between nodes
                            # (f-string so the offending nodes are named in the message)
                            raise FlowError(f'Multiple edges not allowed but detected '
                                            f'between {model} and {succ}')
                        out_index = edge_data[0]['output']
                        # We don't need to check if input data type is ARRAY_DATA_TYPE because
                        # UFO_DATA_TYPE cannot be connected to ARRAY_DATA_TYPE in the scene
                        if ufo_ended:
                            # From now on only non-UFO tasks are allowed
                            if model.data_type['output'][out_index] != ARRAY_DATA_TYPE:
                                raise FlowError('After a non-UFO node cannot come another UFO node')
                        elif model.data_type['output'][out_index] != UFO_DATA_TYPE:
                            # Output is non-UFO, UFO ends here
                            ufo_ended = True

    def run(self, graph):
        """Validate *graph*, split it into GPU batches and execute them asynchronously.

        Execution happens in a daemon thread; progress and errors are reported
        via the class signals. Raises :class:`FlowError` on an invalid graph.
        """
        self._reset()
        self.check_graph(graph)
        gpus = self._resources.get_gpu_nodes()

        # The root model advertising the largest 'number' of inputs defines the
        # overall progress range and gets the 'generated' callback attached.
        num_inputs = -1
        signalling_model = None
        for model in graph.nodes:
            if graph.in_degree(model) == 0:
                # Guard the comparison as well: without 'number' there is no
                # 'current' to compare (previously this raised NameError for a
                # first root without 'number' and reused a stale value otherwise)
                if 'number' in model:
                    current = model['number']
                    if current > num_inputs:
                        num_inputs = current
                        signalling_model = model

        batches = [[(None, None)]]
        gpu_splitting_model = None
        gpu_splitting_models = get_gpu_splitting_models(graph)
        if len(gpu_splitting_models) > 1:
            # There cannot be multiple splitting models
            raise FlowError('Only one gpu splitting model is allowed')
        elif gpu_splitting_models:
            gpu_splitting_model = gpu_splitting_models[0]
            batches = gpu_splitting_model.split_gpu_work(self._resources.get_gpu_nodes())

        for model in graph.nodes:
            # Reset internal model state
            if hasattr(model, 'reset_batches'):
                model.reset_batches()

        LOG.debug(f'{len(batches)} batches: {batches}')
        if signalling_model:
            self.number_of_inputs_changed.emit(len(batches) * num_inputs)
            LOG.debug(f'Number of inputs: {len(batches) * num_inputs}, defined '
                      f'by {signalling_model}')

        def execute_batches():
            # Runs in a worker thread: execute every batch, each batch possibly
            # spread over several GPUs in parallel threads
            self.execution_started.emit()
            try:
                for (i, parallel_batch) in enumerate(batches):
                    LOG.info(f'starting batch {i}: {parallel_batch}')
                    threads = []
                    for gpu_index, region in parallel_batch:
                        if self._aborted:
                            break
                        gpu = None if gpu_index is None else gpus[gpu_index]
                        ufo_graph = self.setup_ufo_graph(graph, gpu=gpu, region=region,
                                                         signalling_model=signalling_model)
                        # Fixed scheduler is required when work is split among GPUs
                        t = Thread(target=self._run_ufo_graph,
                                   args=(ufo_graph,
                                         len(gpu_splitting_models) > 0))
                        t.daemon = True
                        threads.append(t)
                        t.start()
                    for t in threads:
                        t.join()
                    if self._aborted:
                        break
            except Exception as e:
                LOG.error(e, exc_info=True)
                self.exception_occured.emit(str(e))
                raise
            finally:
                self.execution_finished.emit()

        gt = Thread(target=execute_batches)
        gt.daemon = True
        gt.start()


def get_gpu_splitting_models(graph):
    """Return the UfoTaskModel nodes of *graph* that can split their work among GPUs."""
    return [node for node in graph.nodes
            if isinstance(node, UfoTaskModel) and node.can_split_gpu_work]