File: impl_executor.py

package info (click to toggle)
python-taskflow 5.12.0-2
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 3,564 kB
  • sloc: python: 28,241; sh: 269; makefile: 24
file content (360 lines) | stat: -rw-r--r-- 15,243 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
# -*- coding: utf-8 -*-

#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import abc
import contextlib
import functools
import itertools
import threading

from oslo_utils import excutils
from oslo_utils import timeutils

from taskflow.conductors import base
from taskflow import exceptions as excp
from taskflow.listeners import logging as logging_listener
from taskflow import logging
from taskflow import states
from taskflow.types import timing as tt
from taskflow.utils import iter_utils
from taskflow.utils import misc

# Module-level fallback logger: conductors use it only when neither the
# ``log`` constructor argument nor the class-level ``LOG`` attribute is set.
LOG = logging.getLogger(__name__)


class ExecutorConductor(base.Conductor, metaclass=abc.ABCMeta):
    """Dispatches jobs from blocking :py:meth:`.run` method to some executor.

    This conductor iterates over jobs in the provided jobboard (waiting for
    the given timeout if no jobs exist) and attempts to claim them, work on
    those jobs using an executor (potentially blocking further work from being
    claimed and consumed) and then consume those work units after
    completion. This process will repeat until the conductor has been stopped
    or other critical error occurs.

    NOTE(harlowja): consumption occurs even if an engine fails to run due to
    an atom failure. This is only skipped when an execution failure or
    a storage failure occurs which are *usually* correctable by re-running on
    a different conductor (storage failures and execution failures may be
    transient issues that can be worked around by later execution). If a job
    after completing can not be consumed or abandoned the conductor relies
    upon the jobboard capabilities to automatically abandon these jobs.
    """

    LOG = None
    """
    Logger that will be used for listening to events (if none then the module
    level logger will be used instead).
    """

    REFRESH_PERIODICITY = 30
    """
    Every 30 seconds the jobboard will be resynced (if for some reason
    a watch or set of watches was not received) using the `ensure_fresh`
    option to ensure this (for supporting jobboard backends only).
    """

    #: Default timeout used to idle/wait when no jobs have been found.
    WAIT_TIMEOUT = 0.5

    MAX_SIMULTANEOUS_JOBS = -1
    """
    Default maximum number of jobs that can be in progress at the same time.

    Negative or zero values imply no limit (do note that if an executor is
    used that is built on a queue, as most are, that this will imply that the
    queue will contain a potentially large & unfinished backlog of
    submitted jobs). This *may* get better someday if
    https://bugs.python.org/issue22737 is ever implemented and released.
    """

    #: Exceptions that will **not** cause consumption to occur.
    NO_CONSUME_EXCEPTIONS = (
        excp.ExecutionFailure,
        excp.StorageFailure,
    )

    _event_factory = threading.Event
    """This attribute *can* be overridden by subclasses (for example if
       an eventlet *green* event works better for the conductor user)."""

    EVENTS_EMITTED = (
        'compilation_start', 'compilation_end',
        'preparation_start', 'preparation_end',
        'validation_start', 'validation_end',
        'running_start', 'running_end',
        'job_consumed', 'job_abandoned', 'job_trashed',
    )
    """Events will be emitted for each of the events above.  The event is
       emitted to listeners registered with the conductor.
    """

    def __init__(self, name, jobboard,
                 persistence=None, engine=None,
                 engine_options=None, wait_timeout=None,
                 log=None, max_simultaneous_jobs=None):
        """Creates a conductor that dispatches claimed jobs to an executor.

        :param name: passed through to the base conductor.
        :param jobboard: passed through to the base conductor; jobs are
                         iterated, claimed and finished via this object.
        :param persistence: passed through to the base conductor.
        :param engine: passed through to the base conductor.
        :param engine_options: passed through to the base conductor.
        :param wait_timeout: idle timeout used when no job was dispatched
                             (``WAIT_TIMEOUT`` is used when not provided).
        :param log: logger to use; when ``None`` the class level ``LOG`` and
                    then the module level logger are tried (in that order).
        :param max_simultaneous_jobs: maximum number of in-progress jobs;
                                      when ``None`` the class level
                                      ``MAX_SIMULTANEOUS_JOBS`` is used.
        """
        super().__init__(
            name, jobboard, persistence=persistence,
            engine=engine, engine_options=engine_options)
        self._wait_timeout = tt.convert_to_timeout(
            value=wait_timeout, default_value=self.WAIT_TIMEOUT,
            event_factory=self._event_factory)
        self._dead = self._event_factory()
        self._log = misc.pick_first_not_none(log, self.LOG, LOG)
        # A ``None`` argument falls back to the class level default here (the
        # class attribute must be reached via ``self``; a bare name lookup
        # from inside a method would raise ``NameError``).
        self._max_simultaneous_jobs = int(
            misc.pick_first_not_none(max_simultaneous_jobs,
                                     self.MAX_SIMULTANEOUS_JOBS))
        # Futures of jobs that were submitted and have not yet finished.
        self._dispatched = set()

    def _executor_factory(self):
        """Creates an executor to be used during dispatching."""
        raise excp.NotImplementedError("This method must be implemented but"
                                       " it has not been")

    def stop(self):
        """Requests the conductor to stop dispatching.

        This method can be used to request that a conductor stop its
        consumption & dispatching loop.

        The method returns immediately regardless of whether the conductor has
        been stopped.
        """
        self._wait_timeout.interrupt()

    @property
    def dispatching(self):
        """Whether or not the dispatching loop is still dispatching."""
        return not self._dead.is_set()

    def _listeners_from_job(self, job, engine):
        """Returns the base listeners plus a logging listener for the engine.

        The logging listener is bound to this conductor's selected logger.
        """
        listeners = super()._listeners_from_job(job, engine)
        listeners.append(logging_listener.LoggingListener(engine,
                                                          log=self._log))
        return listeners

    def _dispatch_job(self, job):
        """Runs the engine for the given job through all of its stages.

        The engine is compiled, prepared, validated and then run, with a
        ``<stage>_start`` and ``<stage>_end`` notification emitted around
        each stage.

        :param job: the (already claimed) job to run.
        :returns: ``True`` if the job should be consumed afterwards,
                  ``False`` if it should not be (a no-consume failure
                  occurred or the engine ended up suspended).
        """
        engine = self._engine_from_job(job)
        listeners = self._listeners_from_job(job, engine)
        with contextlib.ExitStack() as stack:
            for listener in listeners:
                stack.enter_context(listener)
            self._log.debug("Dispatching engine for job '%s'", job)
            consume = True
            details = {
                'job': job,
                'engine': engine,
                'conductor': self,
            }

            def _run_engine():
                # Request suspension (at most once) as soon as the conductor
                # is noticed to be stopped, then keep iterating so that the
                # engine can wind down.
                has_suspended = False
                for _state in engine.run_iter():
                    if not has_suspended and self._wait_timeout.is_stopped():
                        self._log.info("Conductor stopped, requesting "
                                       "suspension of engine running "
                                       "job %s", job)
                        engine.suspend()
                        has_suspended = True

            stages = [
                (engine.compile, 'compilation'),
                (engine.prepare, 'preparation'),
                (engine.validate, 'validation'),
                (_run_engine, 'running'),
            ]
            try:
                for stage_func, event_name in stages:
                    self._notifier.notify("%s_start" % event_name, details)
                    stage_func()
                    self._notifier.notify("%s_end" % event_name, details)
            except excp.WrappedFailure as e:
                # Only skip consumption when *every* wrapped failure is one
                # of the no-consume kinds.
                if all(f.check(*self.NO_CONSUME_EXCEPTIONS) for f in e):
                    consume = False
                if self._log.isEnabledFor(logging.WARNING):
                    if consume:
                        self._log.warning(
                            "Job execution failed (consumption"
                            " proceeding): %s [%s failures]", job, len(e))
                    else:
                        self._log.warning(
                            "Job execution failed (consumption being"
                            " skipped): %s [%s failures]", job, len(e))
                    # Show the failure/s + traceback (if possible)...
                    for i, f in enumerate(e, 1):
                        self._log.warning("%s. %s", i,
                                          f.pformat(traceback=True))
            except self.NO_CONSUME_EXCEPTIONS:
                self._log.warning("Job execution failed (consumption being"
                                  " skipped): %s", job, exc_info=True)
                consume = False
            except Exception:
                self._log.warning(
                    "Job execution failed (consumption proceeding): %s",
                    job, exc_info=True)
            else:
                if engine.storage.get_flow_state() == states.SUSPENDED:
                    self._log.info("Job execution was suspended: %s", job)
                    consume = False
                else:
                    self._log.info("Job completed successfully: %s", job)
            return consume

    def _try_finish_job(self, job, consume, trash=False):
        """Consumes, trashes or abandons the job and notifies listeners.

        Job failures and not-found errors raised by the jobboard are logged
        and swallowed.

        :param job: the job to finish.
        :param consume: when true the job is consumed (this takes
                        precedence over ``trash``).
        :param trash: when true (and ``consume`` is false) the job is
                      trashed; otherwise the job is abandoned.
        """
        details = {
            'job': job,
            'conductor': self,
            'persistence': self._persistence,
        }
        try:
            if consume:
                self._jobboard.consume(job, self._name)
                self._notifier.notify("job_consumed", details)
            elif trash:
                self._jobboard.trash(job, self._name)
                self._notifier.notify("job_trashed", details)
            else:
                self._jobboard.abandon(job, self._name)
                self._notifier.notify("job_abandoned", details)
        except (excp.JobFailure, excp.NotFound):
            if consume:
                self._log.warning("Failed job consumption: %s", job,
                                  exc_info=True)
            elif trash:
                self._log.warning("Failed job trashing: %s", job,
                                  exc_info=True)
            else:
                self._log.warning("Failed job abandonment: %s", job,
                                  exc_info=True)

    def _on_job_done(self, job, fut):
        """Future done-callback that finishes the job that was dispatched.

        The job is consumed when the dispatch returned a true value, trashed
        when the dispatch raised an unexpected exception and abandoned
        otherwise. The future is always removed from the in-progress set.
        """
        consume = False
        trash = False
        try:
            consume = fut.result()
        except KeyboardInterrupt:
            with excutils.save_and_reraise_exception():
                self._log.warning("Job dispatching interrupted: %s", job)
        except Exception:
            self._log.warning("Job dispatching failed: %s", job,
                              exc_info=True)
            trash = True
        try:
            self._try_finish_job(job, consume, trash)
        finally:
            self._dispatched.discard(fut)

    def _can_claim_more_jobs(self, job):
        """Predicate deciding whether another job may be claimed right now.

        The ``job`` argument is unused; it only exists so that this method
        can be used as an ``itertools.takewhile`` predicate over jobs.
        """
        if self._wait_timeout.is_stopped():
            return False
        if self._max_simultaneous_jobs <= 0:
            # Zero or negative means no limit.
            return True
        return len(self._dispatched) < self._max_simultaneous_jobs

    def _run_until_dead(self, executor, max_dispatches=None):
        """Claims and submits jobs until stopped or the limit is reached.

        :param executor: executor that claimed jobs are submitted to.
        :param max_dispatches: dispatch limit; ``None`` (or a negative
                               value) means run without a limit and zero
                               means do no work at all.
        :raises StopIteration: when the dispatch limit has been reached.
        """
        total_dispatched = 0
        if max_dispatches is None:
            # NOTE(TheSriram): if max_dispatches is not set,
            # then the  conductor will run indefinitely, and not
            # stop after 'n' number of dispatches
            max_dispatches = -1
        dispatch_gen = iter_utils.iter_forever(max_dispatches)
        is_stopped = self._wait_timeout.is_stopped
        try:
            # Don't even do any work in the first place...
            if max_dispatches == 0:
                raise StopIteration
            fresh_period = timeutils.StopWatch(
                duration=self.REFRESH_PERIODICITY)
            fresh_period.start()
            while not is_stopped():
                any_dispatched = False
                # Periodically force the jobboard to resync itself.
                ensure_fresh = fresh_period.expired()
                if ensure_fresh:
                    fresh_period.restart()
                job_it = itertools.takewhile(
                    self._can_claim_more_jobs,
                    self._jobboard.iterjobs(ensure_fresh=ensure_fresh))
                for job in job_it:
                    self._log.debug("Trying to claim job: %s", job)
                    try:
                        self._jobboard.claim(job, self._name)
                    except (excp.UnclaimableJob, excp.NotFound):
                        self._log.debug("Job already claimed or"
                                        " consumed: %s", job)
                    else:
                        try:
                            fut = executor.submit(self._dispatch_job, job)
                        except RuntimeError:
                            # Give the claimed job back before re-raising.
                            with excutils.save_and_reraise_exception():
                                self._log.warning("Job dispatch submitting"
                                                  " failed: %s", job)
                                self._try_finish_job(job, False)
                        else:
                            fut.job = job
                            self._dispatched.add(fut)
                            any_dispatched = True
                            fut.add_done_callback(
                                functools.partial(self._on_job_done, job))
                            total_dispatched = next(dispatch_gen)
                if not any_dispatched and not is_stopped():
                    self._wait_timeout.wait()
        except StopIteration:
            # This will be raised from 'dispatch_gen' if it reaches its
            # max dispatch number (which implies we should do no more work).
            with excutils.save_and_reraise_exception():
                if max_dispatches >= 0 and total_dispatched >= max_dispatches:
                    self._log.info("Maximum dispatch limit of %s reached",
                                   max_dispatches)

    def run(self, max_dispatches=None):
        self._dead.clear()
        self._dispatched.clear()
        try:
            self._jobboard.register_entity(self.conductor)
            with self._executor_factory() as executor:
                self._run_until_dead(executor,
                                     max_dispatches=max_dispatches)
        except StopIteration:
            # Dispatch limit reached; this is a normal way to finish.
            pass
        except KeyboardInterrupt:
            with excutils.save_and_reraise_exception():
                self._log.warning("Job dispatching interrupted")
        finally:
            self._dead.set()

    # Inherit the docs, so we can reference them in our class docstring,
    # if we don't do this sphinx gets confused...
    run.__doc__ = base.Conductor.run.__doc__

    def wait(self, timeout=None):
        """Waits for the conductor to gracefully exit.

        This method waits for the conductor to gracefully exit. An optional
        timeout can be provided, which will cause the method to return
        within the specified timeout. If the timeout is reached, the returned
        value will be ``False``, otherwise it will be ``True``.

        :param timeout: Maximum number of seconds that the :meth:`wait` method
                        should block for.
        """
        return self._dead.wait(timeout)