File: blocking.py

package info (click to toggle)
ipython 2.3.0-2
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 28,032 kB
  • ctags: 15,433
  • sloc: python: 73,792; makefile: 428; sh: 297
file content (58 lines) | stat: -rw-r--r-- 2,128 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
""" Implements a fully blocking kernel client.

Useful for test suites and blocking terminal interfaces.
"""
#-----------------------------------------------------------------------------
#  Copyright (C) 2012  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING.txt, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

# IPython imports
from IPython.utils.io import raw_print
from IPython.utils.traitlets import Type
from IPython.kernel.blocking.channels import BlockingChannelMixin

# Local imports
from .channels import (
    InProcessShellChannel,
    InProcessIOPubChannel,
    InProcessStdInChannel,
)
from .client import InProcessKernelClient

#-----------------------------------------------------------------------------
# Blocking kernel manager
#-----------------------------------------------------------------------------

class BlockingInProcessShellChannel(BlockingChannelMixin, InProcessShellChannel):
    pass

class BlockingInProcessIOPubChannel(BlockingChannelMixin, InProcessIOPubChannel):
    pass

class BlockingInProcessStdInChannel(BlockingChannelMixin, InProcessStdInChannel):

    def call_handlers(self, msg):
        """ Overridden for the in-process channel.

        This methods simply calls raw_input directly.
        """
        msg_type = msg['header']['msg_type']
        if msg_type == 'input_request':
            _raw_input = self.client.kernel._sys_raw_input
            prompt = msg['content']['prompt']
            raw_print(prompt, end='')
            self.input(_raw_input())

class BlockingInProcessKernelClient(InProcessKernelClient):

    # The classes to use for the various channels.
    shell_channel_class = Type(BlockingInProcessShellChannel)
    iopub_channel_class = Type(BlockingInProcessIOPubChannel)
    stdin_channel_class = Type(BlockingInProcessStdInChannel)