File: abc.py

package info (click to toggle)
jupyter-server 2.15.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,068 kB
  • sloc: python: 21,064; makefile: 186; sh: 25; javascript: 14
file content (29 lines) | stat: -rw-r--r-- 921 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from abc import ABC, abstractmethod
from typing import Any


class KernelWebsocketConnectionABC(ABC):
    """Abstract contract for a kernel websocket connection.

    A concrete subclass acts as the bridge between the websocket API
    exposed by Jupyter Server and the ZMQ socket interface of a kernel.
    Only the minimal set of hooks is declared here; all four methods
    below are abstract and must be overridden before the class can be
    instantiated.
    """

    # Declared (annotation only, no value assigned in this class).
    # NOTE(review): presumably the websocket handler this connection
    # serves -- confirm against the concrete implementations.
    websocket_handler: Any

    @abstractmethod
    async def connect(self):
        """Attach the kernel websocket to the kernel's ZMQ connections.

        Declared as a coroutine in this interface.
        """

    @abstractmethod
    async def disconnect(self):
        """Detach the kernel websocket from the kernel's ZMQ connections.

        Declared as a coroutine in this interface.
        """

    @abstractmethod
    def handle_incoming_message(self, incoming_msg: str) -> None:
        """Route a message received on the websocket to the right ZMQ channel.

        Parameters
        ----------
        incoming_msg : str
            The message as it arrived over the websocket.
        """

    @abstractmethod
    def handle_outgoing_message(self, stream: str, outgoing_msg: list[Any]) -> None:
        """Route a message coming from ZMQ out to the kernel websocket.

        Parameters
        ----------
        stream : str
            NOTE(review): presumably identifies the ZMQ channel the
            message originated from -- confirm against implementations.
        outgoing_msg : list[Any]
            The outgoing ZMQ message, as a list of parts.
        """