File: wrapped_exception.py

package info (click to toggle)
python-taskflow 5.12.0-2
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 3,564 kB
  • sloc: python: 28,241; sh: 269; makefile: 24
file content (128 lines) | stat: -rw-r--r-- 4,732 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# -*- coding: utf-8 -*-

#    Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import logging
import os
import sys
import time


logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)


import taskflow.engines
from taskflow import exceptions
from taskflow.patterns import unordered_flow as uf
from taskflow import task
from taskflow.tests import utils
from taskflow.types import failure

import example_utils as eu  # noqa

# INTRO: In this example we create two tasks which can trigger exceptions
# based on various inputs to show how to analyze the thrown exceptions for
# which types were thrown and handle the different types in different ways.
#
# This is especially important if a set of tasks run in parallel and each of
# those tasks may fail while running. This creates a scenario where multiple
# exceptions have been thrown and those exceptions need to be handled in a
# unified manner. Since an engine does not currently know how to resolve
# those exceptions (someday it could) the code using that engine and activating
# the flows and tasks using that engine will currently have to deal with
# catching those exceptions (and silencing them if this is desired).
#
# NOTE(harlowja): The engine *will* trigger rollback even under multiple
# exceptions being thrown, but at the end of that rollback the engine will
# rethrow these exceptions to the code that called the run() method; allowing
# that code to do further cleanups (if desired).


class FirstException(Exception):
    """Exception that first task raises."""


class SecondException(Exception):
    """Exception that second task raises."""


class FirstTask(task.Task):
    def execute(self, sleep1, raise1):
        time.sleep(sleep1)
        if not isinstance(raise1, bool):
            raise TypeError('Bad raise1 value: %r' % raise1)
        if raise1:
            raise FirstException('First task failed')


class SecondTask(task.Task):
    def execute(self, sleep2, raise2):
        time.sleep(sleep2)
        if not isinstance(raise2, bool):
            raise TypeError('Bad raise2 value: %r' % raise2)
        if raise2:
            raise SecondException('Second task failed')


def run(**store):
    # Creates a flow, each task in the flow will examine the kwargs passed in
    # here and based on those kwargs it will behave in a different manner
    # while executing; this allows for the calling code (see below) to show
    # different usages of the failure catching and handling mechanism.
    flow = uf.Flow('flow').add(
        FirstTask(),
        SecondTask()
    )
    try:
        with utils.wrap_all_failures():
            taskflow.engines.run(flow, store=store,
                                 engine='parallel')
    except exceptions.WrappedFailure as ex:
        unknown_failures = []
        for a_failure in ex:
            if a_failure.check(FirstException):
                print("Got FirstException: %s" % a_failure.exception_str)
            elif a_failure.check(SecondException):
                print("Got SecondException: %s" % a_failure.exception_str)
            else:
                print("Unknown failure: %s" % a_failure)
                unknown_failures.append(a_failure)
        failure.Failure.reraise_if_any(unknown_failures)


eu.print_wrapped("Raise and catch first exception only")
run(sleep1=0.0, raise1=True,
    sleep2=0.0, raise2=False)

# NOTE(imelnikov): in general, sleeping does not guarantee that we'll have both
# task running before one of them fails, but with current implementation this
# works most of times, which is enough for our purposes here (as an example).
eu.print_wrapped("Raise and catch both exceptions")
run(sleep1=1.0, raise1=True,
    sleep2=1.0, raise2=True)

eu.print_wrapped("Handle one exception, and re-raise another")
try:
    run(sleep1=1.0, raise1=True,
        sleep2=1.0, raise2='boom')
except TypeError as ex:
    print("As expected, TypeError is here: %s" % ex)
else:
    assert False, "TypeError expected"