1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
|
"""Dependency utilities
Authors:
* Min RK
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2013 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
from types import ModuleType
from IPython.parallel.client.asyncresult import AsyncResult
from IPython.parallel.error import UnmetDependency
from IPython.parallel.util import interactive
from IPython.utils import py3compat
from IPython.utils.py3compat import string_types
from IPython.utils.pickleutil import can, uncan
class depend(object):
"""Dependency decorator, for use with tasks.
`@depend` lets you define a function for engine dependencies
just like you use `apply` for tasks.
Examples
--------
::
@depend(df, a,b, c=5)
def f(m,n,p)
view.apply(f, 1,2,3)
will call df(a,b,c=5) on the engine, and if it returns False or
raises an UnmetDependency error, then the task will not be run
and another engine will be tried.
"""
def __init__(self, _wrapped_f, *args, **kwargs):
self.f = _wrapped_f
self.args = args
self.kwargs = kwargs
def __call__(self, f):
return dependent(f, self.f, *self.args, **self.kwargs)
class dependent(object):
"""A function that depends on another function.
This is an object to prevent the closure used
in traditional decorators, which are not picklable.
"""
def __init__(self, _wrapped_f, _wrapped_df, *dargs, **dkwargs):
self.f = _wrapped_f
name = getattr(_wrapped_f, '__name__', 'f')
if py3compat.PY3:
self.__name__ = name
else:
self.func_name = name
self.df = _wrapped_df
self.dargs = dargs
self.dkwargs = dkwargs
def check_dependency(self):
if self.df(*self.dargs, **self.dkwargs) is False:
raise UnmetDependency()
def __call__(self, *args, **kwargs):
return self.f(*args, **kwargs)
if not py3compat.PY3:
@property
def __name__(self):
return self.func_name
@interactive
def _require(*modules, **mapping):
"""Helper for @require decorator."""
from IPython.parallel.error import UnmetDependency
from IPython.utils.pickleutil import uncan
user_ns = globals()
for name in modules:
try:
exec('import %s' % name, user_ns)
except ImportError:
raise UnmetDependency(name)
for name, cobj in mapping.items():
user_ns[name] = uncan(cobj, user_ns)
return True
def require(*objects, **mapping):
"""Simple decorator for requiring local objects and modules to be available
when the decorated function is called on the engine.
Modules specified by name or passed directly will be imported
prior to calling the decorated function.
Objects other than modules will be pushed as a part of the task.
Functions can be passed positionally,
and will be pushed to the engine with their __name__.
Other objects can be passed by keyword arg.
Examples::
In [1]: @require('numpy')
...: def norm(a):
...: return numpy.linalg.norm(a,2)
In [2]: foo = lambda x: x*x
In [3]: @require(foo)
...: def bar(a):
...: return foo(1-a)
"""
names = []
for obj in objects:
if isinstance(obj, ModuleType):
obj = obj.__name__
if isinstance(obj, string_types):
names.append(obj)
elif hasattr(obj, '__name__'):
mapping[obj.__name__] = obj
else:
raise TypeError("Objects other than modules and functions "
"must be passed by kwarg, but got: %s" % type(obj)
)
for name, obj in mapping.items():
mapping[name] = can(obj)
return depend(_require, *names, **mapping)
class Dependency(set):
"""An object for representing a set of msg_id dependencies.
Subclassed from set().
Parameters
----------
dependencies: list/set of msg_ids or AsyncResult objects or output of Dependency.as_dict()
The msg_ids to depend on
all : bool [default True]
Whether the dependency should be considered met when *all* depending tasks have completed
or only when *any* have been completed.
success : bool [default True]
Whether to consider successes as fulfilling dependencies.
failure : bool [default False]
Whether to consider failures as fulfilling dependencies.
If `all=success=True` and `failure=False`, then the task will fail with an ImpossibleDependency
as soon as the first depended-upon task fails.
"""
all=True
success=True
failure=True
def __init__(self, dependencies=[], all=True, success=True, failure=False):
if isinstance(dependencies, dict):
# load from dict
all = dependencies.get('all', True)
success = dependencies.get('success', success)
failure = dependencies.get('failure', failure)
dependencies = dependencies.get('dependencies', [])
ids = []
# extract ids from various sources:
if isinstance(dependencies, string_types + (AsyncResult,)):
dependencies = [dependencies]
for d in dependencies:
if isinstance(d, string_types):
ids.append(d)
elif isinstance(d, AsyncResult):
ids.extend(d.msg_ids)
else:
raise TypeError("invalid dependency type: %r"%type(d))
set.__init__(self, ids)
self.all = all
if not (success or failure):
raise ValueError("Must depend on at least one of successes or failures!")
self.success=success
self.failure = failure
def check(self, completed, failed=None):
"""check whether our dependencies have been met."""
if len(self) == 0:
return True
against = set()
if self.success:
against = completed
if failed is not None and self.failure:
against = against.union(failed)
if self.all:
return self.issubset(against)
else:
return not self.isdisjoint(against)
def unreachable(self, completed, failed=None):
"""return whether this dependency has become impossible."""
if len(self) == 0:
return False
against = set()
if not self.success:
against = completed
if failed is not None and not self.failure:
against = against.union(failed)
if self.all:
return not self.isdisjoint(against)
else:
return self.issubset(against)
def as_dict(self):
"""Represent this dependency as a dict. For json compatibility."""
return dict(
dependencies=list(self),
all=self.all,
success=self.success,
failure=self.failure
)
__all__ = ['depend', 'require', 'dependent', 'Dependency']
|