"""Helpers for executing a notebook, staging its assets and bundling the result for the cache."""
from pathlib import Path
import shutil
import traceback
from typing import Any, List, Optional, Union
import attr
from nbclient import execute as executenb
from nbclient.client import CellExecutionError, CellTimeoutError
from nbformat import NotebookNode
from jupyter_cache.base import CacheBundleIn, ProjectNb
from jupyter_cache.cache.main import NbArtifacts
from jupyter_cache.utils import Timer, to_relative_paths
@attr.s()
class ExecutionResult:
    """Outcome of a single notebook execution.

    This is the value returned by ``single_nb_execution``: it carries the
    executed notebook together with timing and, when execution failed, the
    captured error details.
    """

    # The notebook object that was passed to the executor (executed in place).
    nb: NotebookNode = attr.ib()
    # Directory the kernel was asked to run in; ``None`` when the caller gave none.
    cwd: Optional[str] = attr.ib()
    # Elapsed time measured around the execution, reported as seconds.
    time: float = attr.ib()
    # The cell error or cell timeout that interrupted execution, if any.
    err: Optional[Union[CellExecutionError, CellTimeoutError]] = attr.ib(default=None)
    # Formatted traceback text captured alongside ``err``, if any.
    exc_string: Optional[str] = attr.ib(default=None)
def single_nb_execution(
    nb: NotebookNode,
    cwd: Optional[str],
    timeout: Optional[int],
    allow_errors: bool,
    meta_override: bool = True,
    record_timing: bool = False,
    **kwargs: Any,
) -> ExecutionResult:
    """Execute notebook in place.

    A ``CellExecutionError`` or ``CellTimeoutError`` raised during execution is
    caught and reported on the returned result instead of being propagated.

    :param nb: The notebook to execute; it is handed to nbclient as-is.
    :param cwd: If supplied, the kernel will run in this directory.
    :param timeout: The time to wait (in seconds) for output from executions.
        If a cell execution takes longer, the resulting ``CellTimeoutError``
        is captured on the result.
    :param allow_errors: If ``False`` when a cell raises an error the
        execution is stopped and the ``CellExecutionError`` is captured
        on the result.
    :param meta_override: If ``True`` then timeout and allow_errors may be overridden
        by equivalent keys in nb.metadata.execution
    :param record_timing: Forwarded unchanged to nbclient's ``record_timing`` option.
    :param kwargs: Additional keyword arguments to pass to the ``NotebookClient``.

    :returns: An ``ExecutionResult`` holding the notebook, ``cwd``, the elapsed
        time, and the captured error with its formatted traceback
        (both ``None`` when no such error occurred).
    """
    # Notebook-level metadata takes precedence over the caller's arguments.
    if meta_override and "execution" in nb.metadata:
        if "timeout" in nb.metadata.execution:
            timeout = nb.metadata.execution.timeout
        if "allow_errors" in nb.metadata.execution:
            allow_errors = nb.metadata.execution.allow_errors

    error = exc_string = None

    # TODO nbclient with record_timing=True will add execution data to each cell
    timer = Timer()
    with timer:
        try:
            executenb(
                nb,
                cwd=cwd,
                timeout=timeout,
                allow_errors=allow_errors,
                record_timing=record_timing,
                **kwargs,
            )
        except (CellExecutionError, CellTimeoutError) as err:
            error = err
            # format_exc() already returns one string, so no join is needed.
            exc_string = traceback.format_exc()

    # The elapsed time is read only after the timer context has been exited.
    return ExecutionResult(nb, cwd, timer.last_split, error, exc_string)
def copy_assets(uri: str, assets: List[str], folder: str) -> List[Path]:
    """Copy notebook assets into the folder the notebook will be executed in.

    Each asset is placed under ``folder`` at the relative location computed by
    ``to_relative_paths`` against the parent directory of ``uri``; missing
    parent directories are created first.

    :param uri: Path of the notebook the assets belong to.
    :param assets: Paths of the asset files to copy.
    :param folder: Destination folder for the copies.
    :returns: The destination paths, in the same order as ``assets``.
    """
    target_root = Path(folder)
    notebook_dir = Path(uri).parent
    copied: List[Path] = []
    for source, relative in zip(assets, to_relative_paths(assets, notebook_dir)):
        destination = target_root / relative
        # The relative location may be nested, so make sure its directory exists.
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(source, destination)
        copied.append(destination)
    return copied
def create_cache_bundle(
    project_nb: ProjectNb,
    execdir: Optional[str],
    asset_files: Optional[List[Path]],
    exec_time: float,
    exec_tb: Optional[str],
) -> CacheBundleIn:
    """Create a cache bundle to save.

    :param project_nb: Supplies the notebook (``.nb``) and its URI (``.uri``).
    :param execdir: Directory the notebook was executed in, or ``None``.
    :param asset_files: Files copied into ``execdir`` before execution; these
        are excluded from the collected artifacts. May be ``None``.
    :param exec_time: Execution time, stored under ``"execution_seconds"``.
    :param exec_tb: Traceback text to store with the bundle, or ``None``.
    :returns: The bundle; its artifacts are ``None`` unless both ``execdir``
        and ``asset_files`` are given.
    """
    artifacts = None
    if execdir is not None and asset_files is not None:
        # Build the exclusion set once so each globbed path is checked in O(1)
        # rather than by scanning the asset list every time.
        excluded = set(asset_files)
        # TODO retrieve assets that have changed file mtime?
        artifacts = NbArtifacts(
            [p for p in Path(execdir).glob("**/*") if p not in excluded],
            execdir,
        )

    return CacheBundleIn(
        project_nb.nb,
        project_nb.uri,
        artifacts=artifacts,
        data={"execution_seconds": exec_time},
        traceback=exec_tb,
    )
|