File: kernel_mixins.py

package info (click to toggle)
python-qtconsole 5.6.1-4
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 1,184 kB
  • sloc: python: 7,222; makefile: 180; sh: 36
file content (56 lines) | stat: -rw-r--r-- 1,813 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
"""Defines a KernelManager that provides signals and slots."""

# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

from qtpy import QtCore

from traitlets import HasTraits, Type
from .util import MetaQObjectHasTraits, SuperQObject
from .comms import CommManager


class QtKernelRestarterMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
    """ Mixin base for kernel restarters.

    The base class is built by MetaQObjectHasTraits from HasTraits and
    SuperQObject.
    """

    # Class-level default; nothing in this class assigns or reads it.
    # NOTE(review): presumably the timer object used by the concrete
    # restarter -- confirm against the subclass that sets it.
    _timer = None


class QtKernelManagerMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
    """ A KernelManager that provides signals and slots.
    """

    # Qt signal declared without arguments. Nothing in this class emits it.
    # NOTE(review): presumably emitted by the concrete manager once its
    # kernel has been restarted -- confirm against the subclass.
    kernel_restarted = QtCore.Signal()


class QtKernelClientMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
    """ A KernelClient that provides signals and slots.

    Wraps ``start_channels`` and ``stop_channels`` of the next class in the
    MRO so that each call also emits a Qt signal and creates or drops the
    ``comm_manager`` attribute.
    """

    # Emitted when the kernel client has started listening.
    started_channels = QtCore.Signal()

    # Emitted when the kernel client has stopped listening.
    stopped_channels = QtCore.Signal()

    #---------------------------------------------------------------------------
    # 'KernelClient' interface
    #---------------------------------------------------------------------------

    def __init__(self, *args, **kwargs):
        """ Forward all arguments to the parent initializer and start with no
        comm manager.
        """
        super().__init__(*args, **kwargs)
        # Stays None until start_channels() creates a CommManager.
        self.comm_manager = None

    #------ Channel management -------------------------------------------------

    def start_channels(self, *args, **kw):
        """ Reimplemented to emit signal.

        All arguments are forwarded unchanged to the parent implementation.
        After it returns, ``started_channels`` is emitted and a new
        CommManager is stored in ``comm_manager``.
        """
        super().start_channels(*args, **kw)
        self.started_channels.emit()
        # NOTE(review): the comm manager is assigned only after the signal
        # has been emitted, so a slot that runs synchronously from emit()
        # would still see the previous value (None on the first start) --
        # confirm this order is intended.
        self.comm_manager = CommManager(parent=self, kernel_client=self)

    def stop_channels(self):
        """ Reimplemented to emit signal.

        After the parent implementation returns, ``stopped_channels`` is
        emitted and ``comm_manager`` is reset to None.
        """
        super().stop_channels()
        self.stopped_channels.emit()
        # Drop the reference to the comm manager created by start_channels().
        self.comm_manager = None