File: inprocess_terminal.py

package info (click to toggle)
ipykernel 6.30.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 1,084 kB
  • sloc: python: 9,165; makefile: 165; sh: 8
file content (67 lines) | stat: -rw-r--r-- 2,156 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""An in-process terminal example."""
import os
import sys

import tornado
from jupyter_console.ptshell import ZMQTerminalInteractiveShell

from ipykernel.inprocess.manager import InProcessKernelManager


def print_process_id():
    """Print the process id."""
    print("Process ID is:", os.getpid())


def init_asyncio_patch():
    """set default asyncio policy to be compatible with tornado
    Tornado 6 (at least) is not compatible with the default
    asyncio implementation on Windows
    Pick the older SelectorEventLoopPolicy on Windows
    if the known-incompatible default policy is in use.
    do this as early as possible to make it a low priority and overridable
    ref: https://github.com/tornadoweb/tornado/issues/2608
    FIXME: if/when tornado supports the defaults in asyncio,
           remove and bump tornado requirement for py38
    """
    if (
        sys.platform.startswith("win")
        and sys.version_info >= (3, 8)
        and tornado.version_info < (6, 1)
    ):
        import asyncio

        try:
            from asyncio import WindowsProactorEventLoopPolicy, WindowsSelectorEventLoopPolicy
        except ImportError:
            pass
            # not affected
        else:
            if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy:
                # WindowsProactorEventLoopPolicy is not compatible with tornado 6
                # fallback to the pre-3.8 default of Selector
                asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())


def main():
    """The main function."""
    print_process_id()

    # Create an in-process kernel
    # >>> print_process_id()
    # will print the same process ID as the main process
    init_asyncio_patch()
    kernel_manager = InProcessKernelManager()
    kernel_manager.start_kernel()
    kernel = kernel_manager.kernel
    kernel.gui = "qt4"
    kernel.shell.push({"foo": 43, "print_process_id": print_process_id})
    client = kernel_manager.client()
    client.start_channels()

    shell = ZMQTerminalInteractiveShell(manager=kernel_manager, client=client)
    shell.mainloop()


if __name__ == "__main__":
    main()