File: kernelapp.py

package info (click to toggle)
spyder-kernels 3.1.3-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,088 kB
  • sloc: python: 6,327; sh: 9; makefile: 5
file content (83 lines) | stat: -rw-r--r-- 2,363 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2009- Spyder Kernels Contributors
#
# Licensed under the terms of the MIT License
# (see spyder_kernels/__init__.py for details)
# -----------------------------------------------------------------------------

"""
Spyder kernel application.
"""

# Standard library imports
import os
from threading import Thread
import time

# Third-party imports
from ipykernel.kernelapp import IPKernelApp
import psutil
from traitlets import DottedObjectName
from traitlets.log import get_logger

# Local imports
from spyder_kernels.console.kernel import SpyderKernel


class SpyderParentPoller(Thread):
    """
    Daemon thread that terminates the program immediately when the parent
    process no longer exists.

    Parameters
    ----------
    parent_pid : int, optional
        PID of the process to watch. The default, 0, means that there is no
        process to watch, in which case the poller has nothing to do.

    Notes
    -----
    This is based on the ParentPollerUnix class from ipykernel.
    """

    def __init__(self, parent_pid=0):
        """Initialize the poller."""
        super().__init__()
        self.parent_pid = parent_pid
        # Run as a daemon so this thread never keeps the interpreter alive.
        self.daemon = True

    def run(self):
        """
        Run the poller.

        Check once per second that the process with PID ``parent_pid`` still
        exists. As soon as it doesn't, log a warning and exit the program
        with status 1.

        If ``parent_pid`` is 0 there is nothing to watch, so this returns
        instead of keeping a thread that wakes up every second for nothing.
        """
        while self.parent_pid != 0:
            if not psutil.pid_exists(self.parent_pid):
                get_logger().warning(
                    "Parent appears to have exited, shutting down."
                )
                # os._exit ends the process right away, without running
                # cleanup handlers, which is what "immediately" requires.
                os._exit(1)
            time.sleep(1.0)


class SpyderKernelApp(IPKernelApp):
    """Kernel application that runs a SpyderKernel."""

    # Dotted path of the output stream class to use (the TTYOutStream class
    # that lives in spyder_kernels.console.outstream).
    outstream_class = DottedObjectName(
        'spyder_kernels.console.outstream.TTYOutStream'
    )

    # Kernel class used by this application.
    kernel_class = SpyderKernel

    def init_pdb(self):
        """
        Do nothing, on purpose.

        This method was added in IPykernel 5.3.1 and it replaces
        the debugger used by the kernel with a new class
        introduced in IPython 7.15 during kernel's initialization.
        Therefore, it doesn't allow us to use our debugger.
        """

    def close(self):
        """
        Close the loopback socket and then close the application.

        Returns
        -------
        The value returned by the parent class ``close`` method.
        """
        # Look the socket up defensively: if there is no kernel object (for
        # instance because startup didn't get as far as creating it), an
        # attribute error here would prevent the parent class cleanup below
        # from running.
        loopback = getattr(self.kernel, "loopback_socket", None)
        if loopback and not loopback.closed:
            loopback.close()
        return super().close()

    def init_poller(self):
        """Use our own poller."""
        # The SPY_PARENT_PID env var must be set on the Spyder side. If it's
        # missing or empty we pass 0, and the poller never checks for a
        # parent in that case.
        parent_pid = int(os.environ.get("SPY_PARENT_PID") or 0)
        self.poller = SpyderParentPoller(parent_pid=parent_pid)