1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
|
"""wrappers for stdout/stderr forwarding over zmq
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2013 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
import os
import threading
import time
import uuid
from io import StringIO, UnsupportedOperation
import zmq
from session import extract_header
from IPython.utils import py3compat
#-----------------------------------------------------------------------------
# Globals
#-----------------------------------------------------------------------------
MASTER = 0
CHILD = 1
#-----------------------------------------------------------------------------
# Stream classes
#-----------------------------------------------------------------------------
class OutStream(object):
"""A file like object that publishes the stream to a 0MQ PUB socket."""
# The time interval between automatic flushes, in seconds.
_subprocess_flush_limit = 256
flush_interval = 0.05
topic=None
def __init__(self, session, pub_socket, name, pipe=True):
self.encoding = 'UTF-8'
self.session = session
self.pub_socket = pub_socket
self.name = name
self.topic = b'stream.' + py3compat.cast_bytes(name)
self.parent_header = {}
self._new_buffer()
self._buffer_lock = threading.Lock()
self._master_pid = os.getpid()
self._master_thread = threading.current_thread().ident
self._pipe_pid = os.getpid()
self._pipe_flag = pipe
if pipe:
self._setup_pipe_in()
def _setup_pipe_in(self):
"""setup listening pipe for subprocesses"""
ctx = self.pub_socket.context
# use UUID to authenticate pipe messages
self._pipe_uuid = uuid.uuid4().bytes
self._pipe_in = ctx.socket(zmq.PULL)
self._pipe_in.linger = 0
self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
self._pipe_poller = zmq.Poller()
self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
def _setup_pipe_out(self):
# must be new context after fork
ctx = zmq.Context()
self._pipe_pid = os.getpid()
self._pipe_out = ctx.socket(zmq.PUSH)
self._pipe_out_lock = threading.Lock()
self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
def _is_master_process(self):
return os.getpid() == self._master_pid
def _is_master_thread(self):
return threading.current_thread().ident == self._master_thread
def _have_pipe_out(self):
return os.getpid() == self._pipe_pid
def _check_mp_mode(self):
"""check for forks, and switch to zmq pipeline if necessary"""
if not self._pipe_flag or self._is_master_process():
return MASTER
else:
if not self._have_pipe_out():
self._flush_buffer()
# setup a new out pipe
self._setup_pipe_out()
return CHILD
def set_parent(self, parent):
self.parent_header = extract_header(parent)
def close(self):
self.pub_socket = None
def _flush_from_subprocesses(self):
"""flush possible pub data from subprocesses into my buffer"""
if not self._pipe_flag or not self._is_master_process():
return
for i in range(self._subprocess_flush_limit):
if self._pipe_poller.poll(0):
msg = self._pipe_in.recv_multipart()
if msg[0] != self._pipe_uuid:
continue
else:
self._buffer.write(msg[1].decode(self.encoding, 'replace'))
# this always means a flush,
# so reset our timer
self._start = 0
else:
break
def flush(self):
"""trigger actual zmq send"""
if self.pub_socket is None:
raise ValueError(u'I/O operation on closed file')
mp_mode = self._check_mp_mode()
if mp_mode != CHILD:
# we are master
if not self._is_master_thread():
# sub-threads must not trigger flush,
# but at least they can force the timer.
self._start = 0
return
self._flush_from_subprocesses()
data = self._flush_buffer()
if data:
content = {u'name':self.name, u'data':data}
msg = self.session.send(self.pub_socket, u'stream', content=content,
parent=self.parent_header, ident=self.topic)
if hasattr(self.pub_socket, 'flush'):
# socket itself has flush (presumably ZMQStream)
self.pub_socket.flush()
else:
with self._pipe_out_lock:
string = self._flush_buffer()
tracker = self._pipe_out.send_multipart([
self._pipe_uuid,
string.encode(self.encoding, 'replace'),
], copy=False, track=True)
try:
tracker.wait(1)
except:
pass
def isatty(self):
return False
def __next__(self):
raise IOError('Read not supported on a write only stream.')
if not py3compat.PY3:
next = __next__
def read(self, size=-1):
raise IOError('Read not supported on a write only stream.')
def readline(self, size=-1):
raise IOError('Read not supported on a write only stream.')
def fileno(self):
raise UnsupportedOperation("IOStream has no fileno.")
def write(self, string):
if self.pub_socket is None:
raise ValueError('I/O operation on closed file')
else:
# Make sure that we're handling unicode
if not isinstance(string, unicode):
string = string.decode(self.encoding, 'replace')
is_child = (self._check_mp_mode() == CHILD)
self._buffer.write(string)
if is_child:
# newlines imply flush in subprocesses
# mp.Pool cannot be trusted to flush promptly (or ever),
# and this helps.
if '\n' in string:
self.flush()
# do we want to check subprocess flushes on write?
# self._flush_from_subprocesses()
current_time = time.time()
if self._start < 0:
self._start = current_time
elif current_time - self._start > self.flush_interval:
self.flush()
def writelines(self, sequence):
if self.pub_socket is None:
raise ValueError('I/O operation on closed file')
else:
for string in sequence:
self.write(string)
def _flush_buffer(self):
"""clear the current buffer and return the current buffer data"""
data = u''
if self._buffer is not None:
data = self._buffer.getvalue()
self._buffer.close()
self._new_buffer()
return data
def _new_buffer(self):
self._buffer = StringIO()
self._start = -1
|