import logging
from dask.utils import stringify
from .client import futures_of, wait
from .utils import sync
from .utils_comm import pack_data
from .worker import _deserialize
logger = logging.getLogger(__name__)
class ReplayExceptionScheduler:
    """A scheduler plugin that lets clients recreate task failures locally.

    Registering this extension adds the following route to the scheduler:

    * cause_of_failure
    """

    def __init__(self, scheduler):
        self.scheduler = scheduler
        # Expose the handler over the scheduler's RPC routes and register
        # this object as the "exceptions" extension.
        self.scheduler.handlers["cause_of_failure"] = self.cause_of_failure
        self.scheduler.extensions["exceptions"] = self

    def cause_of_failure(self, *args, keys=(), **kwargs):
        """
        Return details of first failed task required by set of keys

        Parameters
        ----------
        keys: list of keys known to the scheduler

        Returns
        -------
        Dictionary with:
        cause: the key that failed
        task: the definition of that key
        deps: keys that the task depends on
        """
        for raw_key in keys:
            # msgpack round-trips tuples as lists; restore tuple keys first
            key = tuple(raw_key) if isinstance(raw_key, list) else raw_key
            key = stringify(key)
            ts = self.scheduler.tasks.get(key)
            if ts is None or ts.exception_blame is None:
                continue
            blamed = ts.exception_blame
            # NOTE: cannot serialize sets, hence the list of dependency keys
            return {
                "deps": [dep.key for dep in blamed.dependencies],
                "cause": blamed.key,
                "task": blamed.run_spec,
            }
class ReplayExceptionClient:
    """
    A plugin for the client allowing replay of remote exceptions locally

    Adds the following methods (and their async variants) to the given client:

    - ``recreate_error_locally``: main user method
    - ``get_futures_error``: gets the task, its details and dependencies,
      responsible for failure of the given future.
    """

    def __init__(self, client):
        self.client = client
        self.client.extensions["exceptions"] = self
        # monkey patch: expose the public and async methods directly on the
        # client so users can call e.g. ``client.recreate_error_locally``
        self.client.recreate_error_locally = self.recreate_error_locally
        self.client._recreate_error_locally = self._recreate_error_locally
        self.client._get_futures_error = self._get_futures_error
        self.client.get_futures_error = self.get_futures_error

    @property
    def scheduler(self):
        # RPC handle to the scheduler, borrowed from the wrapped client
        return self.client.scheduler

    async def _get_futures_error(self, future):
        """Async implementation of :meth:`get_futures_error`.

        Raises
        ------
        ValueError
            If none of the futures in ``future`` has ``status == "error"``.
        """
        # only get errors for futures that errored.
        futures = [f for f in futures_of(future) if f.status == "error"]
        if not futures:
            raise ValueError("No errored futures passed")
        out = await self.scheduler.cause_of_failure(keys=[f.key for f in futures])
        deps, task = out["deps"], out["task"]
        # ``task`` arrives either as a dict of serialized components or as a
        # raw run-spec; ``_deserialize`` accepts both calling conventions.
        if isinstance(task, dict):
            function, args, kwargs = _deserialize(**task)
        else:
            function, args, kwargs = _deserialize(task=task)
        return (function, args, kwargs, deps)

    def get_futures_error(self, future):
        """
        Ask the scheduler details of the sub-task of the given failed future

        When a future evaluates to a status of "error", i.e., an exception
        was raised in a task within its graph, we can get information from
        the scheduler. This function gets the details of the specific task
        that raised the exception and led to the error, but does not fetch
        data from the cluster or execute the function.

        Parameters
        ----------
        future : future that failed, having ``status=="error"``, typically
            after an attempt to ``gather()`` shows a stack-trace.

        Returns
        -------
        Tuple:
        - the function that raised an exception
        - argument list (a tuple), may include values and keys
        - keyword arguments (a dictionary), may include values and keys
        - list of keys that the function requires to be fetched to run

        See Also
        --------
        ReplayExceptionClient.recreate_error_locally
        """
        return self.client.sync(self._get_futures_error, future)

    async def _recreate_error_locally(self, future):
        """Async implementation of :meth:`recreate_error_locally`.

        Fetches the blamed task and the data it depends on, returning the
        function plus fully-resolved arguments ready to be called locally.
        """
        await wait(future)
        function, args, kwargs, deps = await self._get_futures_error(future)
        # Gather the values of the failed task's dependencies from the
        # cluster, then substitute them for the keys inside args/kwargs.
        futures = self.client._graph_to_futures({}, deps)
        data = await self.client._gather(futures)
        args = pack_data(args, data)
        kwargs = pack_data(kwargs, data)
        return (function, args, kwargs)

    def recreate_error_locally(self, future):
        """
        For a failed calculation, perform the blamed task locally for debugging.

        This operation should be performed after a future (result of ``gather``,
        ``compute``, etc) comes back with a status of "error", if the stack-
        trace is not informative enough to diagnose the problem. The specific
        task (part of the graph pointing to the future) responsible for the
        error will be fetched from the scheduler, together with the values of
        its inputs. The function will then be executed, so that ``pdb`` can
        be used for debugging.

        Examples
        --------
        >>> future = c.submit(div, 1, 0)         # doctest: +SKIP
        >>> future.status                        # doctest: +SKIP
        'error'
        >>> c.recreate_error_locally(future)     # doctest: +SKIP
        ZeroDivisionError: division by zero

        If you're in IPython you might take this opportunity to use pdb

        >>> %pdb                                 # doctest: +SKIP
        Automatic pdb calling has been turned ON

        >>> c.recreate_error_locally(future)     # doctest: +SKIP
        ZeroDivisionError: division by zero
              1 def div(x, y):
        ----> 2     return x / y
        ipdb>

        Parameters
        ----------
        future : future or collection that failed
            The same thing as was given to ``gather``, but came back with
            an exception/stack-trace. Can also be a (persisted) dask collection
            containing any errored futures.

        Returns
        -------
        Nothing; the function runs and should raise an exception, allowing
        the debugger to run.
        """
        func, args, kwargs = sync(
            self.client.loop, self._recreate_error_locally, future
        )
        # Re-run the blamed task in this process so the exception surfaces
        # locally, where a debugger can inspect it.
        func(*args, **kwargs)