1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
|
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import sys
logging.basicConfig(level=logging.ERROR)
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir,
os.pardir))
sys.path.insert(0, top_dir)
import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier
ANY = notifier.Notifier.ANY
import example_utils as eu # noqa
# INTRO: This example shows how a graph flow and linear flow can be used
# together to execute dependent & non-dependent tasks by going through the
# steps required to build a simplistic car (an assembly line if you will). It
# also shows how raw functions can be wrapped into a task object instead of
# being forced to use the more *heavy* task base class. This is useful in
# scenarios where pre-existing code has functions that you easily want to
# plug-in to taskflow, without requiring a large amount of code changes.
def build_frame():
return 'steel'
def build_engine():
return 'honda'
def build_doors():
return '2'
def build_wheels():
return '4'
# These just return true to indiciate success, they would in the real work
# do more than just that.
def install_engine(frame, engine):
return True
def install_doors(frame, windows_installed, doors):
return True
def install_windows(frame, doors):
return True
def install_wheels(frame, engine, engine_installed, wheels):
return True
def trash(**kwargs):
eu.print_wrapped("Throwing away pieces of car!")
def startup(**kwargs):
# If you want to see the rollback function being activated try uncommenting
# the following line.
#
# raise ValueError("Car not verified")
return True
def verify(spec, **kwargs):
# If the car is not what we ordered throw away the car (trigger reversion).
for key, value in kwargs.items():
if spec[key] != value:
raise Exception("Car doesn't match spec!")
return True
# These two functions connect into the state transition notification emission
# points that the engine outputs, they can be used to log state transitions
# that are occurring, or they can be used to suspend the engine (or perform
# other useful activities).
def flow_watch(state, details):
print('Flow => %s' % state)
def task_watch(state, details):
print('Task {} => {}'.format(details.get('task_name'), state))
flow = lf.Flow("make-auto").add(
task.FunctorTask(startup, revert=trash, provides='ran'),
# A graph flow allows automatic dependency based ordering, the ordering
# is determined by analyzing the symbols required and provided and ordering
# execution based on a functioning order (if one exists).
gf.Flow("install-parts").add(
task.FunctorTask(build_frame, provides='frame'),
task.FunctorTask(build_engine, provides='engine'),
task.FunctorTask(build_doors, provides='doors'),
task.FunctorTask(build_wheels, provides='wheels'),
# These *_installed outputs allow for other tasks to depend on certain
# actions being performed (aka the components were installed), another
# way to do this is to link() the tasks manually instead of creating
# an 'artificial' data dependency that accomplishes the same goal the
# manual linking would result in.
task.FunctorTask(install_engine, provides='engine_installed'),
task.FunctorTask(install_doors, provides='doors_installed'),
task.FunctorTask(install_windows, provides='windows_installed'),
task.FunctorTask(install_wheels, provides='wheels_installed')),
task.FunctorTask(verify, requires=['frame',
'engine',
'doors',
'wheels',
'engine_installed',
'doors_installed',
'windows_installed',
'wheels_installed']))
# This dictionary will be provided to the tasks as a specification for what
# the tasks should produce, in this example this specification will influence
# what those tasks do and what output they create. Different tasks depend on
# different information from this specification, all of which will be provided
# automatically by the engine to those tasks.
spec = {
"frame": 'steel',
"engine": 'honda',
"doors": '2',
"wheels": '4',
# These are used to compare the result product, a car without the pieces
# installed is not a car after all.
"engine_installed": True,
"doors_installed": True,
"windows_installed": True,
"wheels_installed": True,
}
engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
# This registers all (ANY) state transitions to trigger a call to the
# flow_watch function for flow state transitions, and registers the
# same all (ANY) state transitions for task state transitions.
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)
eu.print_wrapped("Building a car")
engine.run()
# Alter the specification and ensure that the reverting logic gets triggered
# since the resultant car that will be built by the build_wheels function will
# build a car with 4 doors only (not 5), this will cause the verification
# task to mark the car that is produced as not matching the desired spec.
spec['doors'] = 5
engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)
eu.print_wrapped("Building a wrong car that doesn't match specification")
try:
engine.run()
except Exception as e:
eu.print_wrapped("Flow failed: %s" % e)
|