File: blockingkernelmanager.py

package info (click to toggle)
ipython 0.13.1-2%2Bdeb7u1
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 15,752 kB
  • sloc: python: 69,537; makefile: 355; lisp: 272; sh: 80; objc: 37
file content (154 lines) | stat: -rw-r--r-- 5,036 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""Implement a fully blocking kernel manager.

Useful for test suites and blocking terminal interfaces.
"""
#-----------------------------------------------------------------------------
#  Copyright (C) 2010-2011  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING.txt, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import print_function

# Stdlib
from Queue import Queue, Empty
from threading import Event

# Our own
from IPython.utils import io
from IPython.utils.traitlets import Type

from .kernelmanager import (KernelManager, SubSocketChannel, HBSocketChannel,
                           ShellSocketChannel, StdInSocketChannel)

#-----------------------------------------------------------------------------
# Functions and classes
#-----------------------------------------------------------------------------

class BlockingSubSocketChannel(SubSocketChannel):
    """Sub-socket channel that buffers messages in a queue for polling.

    Messages handed to ``call_handlers`` are stored on an internal
    ``Queue`` and retrieved with ``get_msg`` / ``get_msgs``.
    """

    def __init__(self, context, session, address=None):
        super(BlockingSubSocketChannel, self).__init__(
            context, session, address)
        # Holds every message passed to call_handlers until it is fetched.
        self._in_queue = Queue()

    def call_handlers(self, msg):
        """Put ``msg`` on the internal queue for later retrieval."""
        self._in_queue.put(msg)

    def msg_ready(self):
        """Return True if at least one message is waiting in the queue."""
        return not self._in_queue.empty()

    def get_msg(self, block=True, timeout=None):
        """Return the next queued message.

        Parameters
        ----------
        block : bool
            Whether to wait for a message when the queue is empty.
        timeout : float or None
            Seconds to wait when blocking.  ``None`` combined with
            ``block`` is replaced by a very long finite timeout.

        Raises
        ------
        Empty
            If no message is available (non-blocking) or none arrives
            before the timeout expires.
        """
        # A blocking get with timeout=None becomes uninterruptible, so
        # substitute a large finite wait in that case only.
        wait = 1e6 if (block and timeout is None) else timeout
        return self._in_queue.get(block, wait)

    def get_msgs(self):
        """Return a list of all messages that are currently queued."""
        ready = []
        try:
            while True:
                ready.append(self.get_msg(block=False))
        except Empty:
            # The queue has been drained.
            pass
        return ready


class BlockingShellSocketChannel(ShellSocketChannel):
    """Shell-socket channel whose incoming messages are queued for polling.

    ``call_handlers`` stores each message on an internal ``Queue``;
    callers fetch them with ``get_msg`` or ``get_msgs``.
    """

    def __init__(self, context, session, address=None):
        super(BlockingShellSocketChannel, self).__init__(
            context, session, address)
        # Buffer between call_handlers and the get_msg* readers.
        self._in_queue = Queue()

    def call_handlers(self, msg):
        """Append ``msg`` to the internal queue."""
        self._in_queue.put(msg)

    def msg_ready(self):
        """Tell whether a received message is waiting to be read."""
        return not self._in_queue.empty()

    def get_msg(self, block=True, timeout=None):
        """Fetch one message from the queue.

        Parameters
        ----------
        block : bool
            Wait for a message if none is queued yet.
        timeout : float or None
            Maximum number of seconds to wait when blocking.  When it is
            ``None`` and ``block`` is true, a very long finite timeout is
            used instead.

        Raises
        ------
        Empty
            If nothing is available without blocking, or nothing arrives
            within the timeout.
        """
        if timeout is None and block:
            # Never block with timeout=None: that form of Queue.get
            # becomes uninterruptible.
            return self._in_queue.get(block, 1e6)
        return self._in_queue.get(block, timeout)

    def get_msgs(self):
        """Collect and return every message that is ready right now."""
        pending = []
        try:
            while True:
                pending.append(self.get_msg(block=False))
        except Empty:
            # Nothing left in the queue.
            pass
        return pending

    

class BlockingStdInSocketChannel(StdInSocketChannel):
    """Stdin-socket channel that queues received messages for polling.

    ``call_handlers`` stores each message on an internal ``Queue``;
    callers retrieve them with ``get_msg`` or ``get_msgs``.
    """

    def __init__(self, context, session, address=None):
        super(BlockingStdInSocketChannel, self).__init__(context, session, address)
        # Buffer between call_handlers and the get_msg* readers.
        self._in_queue = Queue()

    def call_handlers(self, msg):
        """Put ``msg`` on the internal queue for later retrieval."""
        #io.rprint('[[Rep]]', msg) # dbg
        self._in_queue.put(msg)

    def get_msg(self, block=True, timeout=None):
        """Get a message if there is one that is ready.

        Parameters
        ----------
        block : bool
            Whether to wait for a message when the queue is empty.
        timeout : float or None
            Seconds to wait when blocking.  ``None`` combined with
            ``block`` is replaced by a very long finite timeout, matching
            the other blocking channels in this module.

        Raises
        ------
        Empty
            If no message is available (non-blocking) or none arrives
            before the timeout expires.
        """
        if block and timeout is None:
            # never use timeout=None, because get
            # becomes uninterruptible
            timeout = 1e6
        return self._in_queue.get(block, timeout)

    def get_msgs(self):
        """Get all messages that are currently ready."""
        msgs = []
        while True:
            try:
                msgs.append(self.get_msg(block=False))
            except Empty:
                break
        return msgs

    def msg_ready(self):
        """Is there a message that has been received?"""
        return not self._in_queue.empty()


class BlockingHBSocketChannel(HBSocketChannel):
    """Heartbeat channel with a short dead-time and a no-op handler."""

    # Quicker monitoring is wanted for this kernel, so use 1 second.
    # Values under 0.5s are unreliable and occasionally give false
    # reports of missed beats.
    time_to_dead = 1.0

    def call_handlers(self, since_last_heartbeat):
        """Accept ``since_last_heartbeat`` and take no action.

        NOTE(review): presumably invoked by the base channel when a beat
        is missed -- confirm; this override itself does nothing.
        """
        return None


class BlockingKernelManager(KernelManager):
    """Kernel manager configured with this module's blocking channels."""

    # Channel class used for each kind of channel.
    hb_channel_class = Type(BlockingHBSocketChannel)
    stdin_channel_class = Type(BlockingStdInSocketChannel)
    sub_channel_class = Type(BlockingSubSocketChannel)
    shell_channel_class = Type(BlockingShellSocketChannel)