File: clientwrap.py

package info (click to toggle)
python-papermill 2.6.0-3.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,216 kB
  • sloc: python: 4,977; makefile: 17; sh: 5
file content (110 lines) | stat: -rw-r--r-- 4,346 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import asyncio
import sys

from nbclient import NotebookClient
from nbclient.exceptions import CellExecutionError
from traitlets import Bool, Instance


class PapermillNotebookClient(NotebookClient):
    """
    Notebook client that executes the code cells of a notebook and updates
    their outputs, reporting progress to a papermill execution manager.

    Configurable traits
    -------------------
    log_output : bool
        When true, cell outputs are also emitted through ``self.log``.
    stdout_file, stderr_file : object
        Optional targets for ``stdout`` / ``stderr`` stream outputs. Only
        ``write(content)`` and ``flush()`` are called on them.
    """

    log_output = Bool(False).tag(config=True)
    stdout_file = Instance(object, default_value=None).tag(config=True)
    stderr_file = Instance(object, default_value=None).tag(config=True)

    def __init__(self, nb_man, km=None, raise_on_iopub_timeout=True, **kw):
        """Initializes the execution manager.

        Parameters
        ----------
        nb_man : NotebookExecutionManager
            Notebook execution manager wrapper being executed. Its ``nb``
            attribute is the notebook handed to the parent client.
        km : KernelManager (optional)
            Optional kernel manager. If none is provided, a kernel manager will
            be created.
        raise_on_iopub_timeout : bool (optional)
            Forwarded unchanged to ``NotebookClient``; defaults to ``True`` here.
        **kw
            Any further keyword arguments are forwarded to ``NotebookClient``.
        """
        super().__init__(nb_man.nb, km=km, raise_on_iopub_timeout=raise_on_iopub_timeout, **kw)
        self.nb_man = nb_man

    def execute(self, **kwargs):
        """
        Wraps the parent class process call slightly.

        Resets the execution trackers, starts a kernel, runs every cell through
        `papermill_execute_cells`, then records the kernel's ``language_info``
        and the widget state in the notebook metadata.

        Parameters
        ----------
        **kwargs
            Forwarded to ``setup_kernel``.

        Returns
        -------
        The executed notebook (``self.nb``).
        """
        self.reset_execution_trackers()

        # See https://bugs.python.org/issue37373 :(
        # Compare the version tuple as a whole so the workaround also applies
        # to any later major version instead of only to 3.x.
        # NOTE(review): asyncio event loop policies are reported as deprecated
        # in recent Python releases -- confirm and revisit this workaround.
        if sys.version_info >= (3, 8) and sys.platform.startswith('win'):
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

        with self.setup_kernel(**kwargs):
            self.log.info(f"Executing notebook with kernel: {self.kernel_name}")
            self.papermill_execute_cells()
            # Ask the kernel to describe itself and persist the language info
            # in the notebook metadata.
            info_msg = self.wait_for_reply(self.kc.kernel_info())
            self.nb.metadata['language_info'] = info_msg['content']['language_info']
            self.set_widgets_metadata()

        return self.nb

    def papermill_execute_cells(self):
        """
        This function replaces cell execution with its own wrapper.

        We are doing this for the following reasons:

        1. Execution stops at the first cell that raises a
           `CellExecutionError`, but the error is not re-raised here; it is
           handed to the execution manager instead. This allows us to save the
           notebook with the traceback even though a `CellExecutionError` was
           encountered.

        2. We want to write the notebook as cells are executed. We inject our
           logic for that here.

        3. We want to include timing and execution status information with the
           metadata of each cell.
        """
        # Execute each cell and update the output in real time.
        for index, cell in enumerate(self.nb.cells):
            try:
                self.nb_man.cell_start(cell, index)
                self.execute_cell(cell, index)
            except CellExecutionError as ex:
                # Report the failure and stop; later cells are not executed.
                self.nb_man.cell_exception(self.nb.cells[index], cell_index=index, exception=ex)
                break
            finally:
                # Runs for successful and failed cells alike (including just
                # before the ``break`` above takes effect).
                self.nb_man.cell_complete(self.nb.cells[index], cell_index=index)

    def log_output_message(self, output):
        """
        Process a given output. May log it in the configured logger and/or write it into
        the configured stdout/stderr files.

        Stream outputs named ``stdout`` are logged at INFO level and written to
        ``stdout_file``; those named ``stderr`` are logged at WARNING level and
        written to ``stderr_file``. Any other output carrying ``text/plain``
        data is logged at INFO level when ``log_output`` is set.

        Parameters
        ----------
        output : nbformat.notebooknode.NotebookNode
            The output to process.
        """
        if output.output_type == "stream":
            content = "".join(output.text)
            if output.name == "stdout":
                if self.log_output:
                    self.log.info(content)
                if self.stdout_file:
                    self.stdout_file.write(content)
                    # Flush right away so the file follows execution live.
                    self.stdout_file.flush()
            elif output.name == "stderr":
                if self.log_output:
                    # In case users want to redirect stderr differently, pipe to warning
                    self.log.warning(content)
                if self.stderr_file:
                    self.stderr_file.write(content)
                    self.stderr_file.flush()
        elif self.log_output and ("data" in output and "text/plain" in output.data):
            self.log.info("".join(output.data['text/plain']))

    def process_message(self, *arg, **kwargs):
        """
        Process a message through the parent class, then autosave the notebook
        and, when logging or stream files are configured, report the output.

        All arguments are forwarded to ``NotebookClient.process_message`` and
        its result is returned unchanged.
        """
        output = super().process_message(*arg, **kwargs)
        # Autosave is requested for every processed message, whether or not
        # the message produced an output.
        self.nb_man.autosave_cell()
        if output and (self.log_output or self.stderr_file or self.stdout_file):
            self.log_output_message(output)
        return output