File: manager.py

package info (click to toggle)
python-qtconsole 5.6.1-4
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 1,184 kB
  • sloc: python: 7,222; makefile: 180; sh: 36
file content (85 lines) | stat: -rw-r--r-- 2,602 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
""" Defines a KernelManager (and its restarter) that provides signals and slots.
"""

from qtpy import QtCore

# Local imports
from traitlets import Bool, DottedObjectName

from jupyter_client import KernelManager
from jupyter_client.restarter import KernelRestarter

from .kernel_mixins import QtKernelManagerMixin, QtKernelRestarterMixin


class QtKernelRestarter(KernelRestarter, QtKernelRestarterMixin):
    """Kernel restarter whose polling is driven by a Qt timer.

    The timer is created lazily on the first call to :meth:`start` and is
    reused by later start/stop cycles.
    """

    def start(self):
        """Begin (or resume) periodic polling.

        ``time_to_dead`` is treated as a duration in seconds and converted
        to the whole number of milliseconds passed to ``QTimer.start``.
        """
        if self._timer is None:
            # First start: build the timer and wire it to poll() once only.
            self._timer = QtCore.QTimer()
            self._timer.timeout.connect(self.poll)
        self._timer.start(round(self.time_to_dead * 1000))

    def stop(self):
        """Stop periodic polling; a no-op if polling was never started."""
        # ``_timer`` is still None until start() has run at least once, so
        # guard against stopping a restarter that has not been started.
        if self._timer is not None:
            self._timer.stop()

    def poll(self):
        """Timer slot: delegate to the parent class's ``poll``."""
        super().poll()

    def reset_count(self):
        """Reset the restart counter (``_restart_count``) to zero."""
        self._restart_count = 0


class QtKernelManager(KernelManager, QtKernelManagerMixin):
    """KernelManager that announces kernel restarts through a Qt signal."""

    client_class = DottedObjectName('qtconsole.client.QtKernelClient')
    autorestart = Bool(True, config=True)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Raised by the restarter callback and lowered again once the
        # ``kernel_restarted`` signal has been emitted.
        self._is_restarting = False

    def start_restarter(self):
        """Create the restarter on first use, then start it.

        Nothing happens unless ``autorestart`` is enabled and a kernel is
        present.
        """
        if not self.autorestart or not self.has_kernel:
            return

        if self._restarter is None:
            self._restarter = QtKernelRestarter(
                kernel_manager=self, parent=self, log=self.log
            )
            # Lets us flag that a restart is in progress.
            self._restarter.add_callback(self._handle_kernel_restarting)

        self._restarter.start()

    def stop_restarter(self):
        """Stop the restarter if ``autorestart`` is on and one exists."""
        if self.autorestart and self._restarter is not None:
            self._restarter.stop()

    def post_start_kernel(self, **kw):
        """Run the parent hook, then announce a pending restart, if any."""
        super().post_start_kernel(**kw)
        if not self._is_restarting:
            return
        self.kernel_restarted.emit()
        self._is_restarting = False

    def reset_autorestart_count(self):
        """Zero the restarter's restart counter when a restarter exists."""
        if not self._restarter:
            return
        self._restarter.reset_count()

    async def _async_post_start_kernel(self, **kw):
        """Async counterpart of :meth:`post_start_kernel`.

        Needed for Jupyter-client 8+, where `start_kernel` doesn't call
        `post_start_kernel` directly.
        """
        await super()._async_post_start_kernel(**kw)
        if not self._is_restarting:
            return
        self.kernel_restarted.emit()
        self._is_restarting = False

    def _handle_kernel_restarting(self):
        """Restarter callback: record that a restart is under way."""
        self._is_restarting = True