1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
|
# -*- coding: utf-8 -*-
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import sys
import time
logging.basicConfig(level=logging.ERROR)
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir,
os.pardir))
sys.path.insert(0, top_dir)
import taskflow.engines
from taskflow import exceptions
from taskflow.patterns import unordered_flow as uf
from taskflow import task
from taskflow.tests import utils
from taskflow.types import failure
import example_utils as eu # noqa
# INTRO: In this example we create two tasks which can trigger exceptions
# based on various inputs to show how to analyze the thrown exceptions for
# which types were thrown and handle the different types in different ways.
#
# This is especially important if a set of tasks run in parallel and each of
# those tasks may fail while running. This creates a scenario where multiple
# exceptions have been thrown and those exceptions need to be handled in a
# unified manner. Since an engine does not currently know how to resolve
# those exceptions (someday it could) the code using that engine and activating
# the flows and tasks using that engine will currently have to deal with
# catching those exceptions (and silencing them if this is desired).
#
# NOTE(harlowja): The engine *will* trigger rollback even under multiple
# exceptions being thrown, but at the end of that rollback the engine will
# rethrow these exceptions to the code that called the run() method; allowing
# that code to do further cleanups (if desired).
class FirstException(Exception):
"""Exception that first task raises."""
class SecondException(Exception):
"""Exception that second task raises."""
class FirstTask(task.Task):
def execute(self, sleep1, raise1):
time.sleep(sleep1)
if not isinstance(raise1, bool):
raise TypeError('Bad raise1 value: %r' % raise1)
if raise1:
raise FirstException('First task failed')
class SecondTask(task.Task):
def execute(self, sleep2, raise2):
time.sleep(sleep2)
if not isinstance(raise2, bool):
raise TypeError('Bad raise2 value: %r' % raise2)
if raise2:
raise SecondException('Second task failed')
def run(**store):
# Creates a flow, each task in the flow will examine the kwargs passed in
# here and based on those kwargs it will behave in a different manner
# while executing; this allows for the calling code (see below) to show
# different usages of the failure catching and handling mechanism.
flow = uf.Flow('flow').add(
FirstTask(),
SecondTask()
)
try:
with utils.wrap_all_failures():
taskflow.engines.run(flow, store=store,
engine='parallel')
except exceptions.WrappedFailure as ex:
unknown_failures = []
for a_failure in ex:
if a_failure.check(FirstException):
print("Got FirstException: %s" % a_failure.exception_str)
elif a_failure.check(SecondException):
print("Got SecondException: %s" % a_failure.exception_str)
else:
print("Unknown failure: %s" % a_failure)
unknown_failures.append(a_failure)
failure.Failure.reraise_if_any(unknown_failures)
eu.print_wrapped("Raise and catch first exception only")
run(sleep1=0.0, raise1=True,
sleep2=0.0, raise2=False)
# NOTE(imelnikov): in general, sleeping does not guarantee that we'll have both
# task running before one of them fails, but with current implementation this
# works most of times, which is enough for our purposes here (as an example).
eu.print_wrapped("Raise and catch both exceptions")
run(sleep1=1.0, raise1=True,
sleep2=1.0, raise2=True)
eu.print_wrapped("Handle one exception, and re-raise another")
try:
run(sleep1=1.0, raise1=True,
sleep2=1.0, raise2='boom')
except TypeError as ex:
print("As expected, TypeError is here: %s" % ex)
else:
assert False, "TypeError expected"
|