1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
|
# coding: utf-8
from __future__ import print_function
import io
import logging
import os
import platform
import sys
import time
from fcntl import fcntl
from tempfile import TemporaryFile
from unittest import mock
import pytest
import wurlitzer
from wurlitzer import (
PIPE,
STDOUT,
Wurlitzer,
c_stderr_p,
c_stdout_p,
libc,
pipes,
stop_sys_pipes,
sys_pipes,
sys_pipes_forever,
)
def printf(msg):
"""Call C printf"""
libc.printf((msg + '\n').encode('utf8'))
def printf_err(msg):
"""Cal C fprintf on stderr"""
libc.fprintf(c_stderr_p, (msg + '\n').encode('utf8'))
def test_pipes():
with pipes(stdout=PIPE, stderr=PIPE) as (stdout, stderr):
printf(u"Hellø")
printf_err(u"Hi, stdérr")
assert stdout.read() == u"Hellø\n"
assert stderr.read() == u"Hi, stdérr\n"
def test_pipe_bytes():
with pipes(encoding=None) as (stdout, stderr):
printf(u"Hellø")
printf_err(u"Hi, stdérr")
assert stdout.read() == u"Hellø\n".encode('utf8')
assert stderr.read() == u"Hi, stdérr\n".encode('utf8')
def test_forward():
stdout = io.StringIO()
stderr = io.StringIO()
with pipes(stdout=stdout, stderr=stderr) as (_stdout, _stderr):
printf(u"Hellø")
printf_err(u"Hi, stdérr")
assert _stdout is stdout
assert _stderr is stderr
assert stdout.getvalue() == u"Hellø\n"
assert stderr.getvalue() == u"Hi, stdérr\n"
def test_pipes_stderr():
stdout = io.StringIO()
with pipes(stdout=stdout, stderr=STDOUT) as (_stdout, _stderr):
printf(u"Hellø")
libc.fflush(c_stdout_p)
time.sleep(0.1)
printf_err(u"Hi, stdérr")
assert _stdout is stdout
assert _stderr is None
assert stdout.getvalue() == u"Hellø\nHi, stdérr\n"
def test_flush():
stdout = io.StringIO()
w = Wurlitzer(stdout=stdout, stderr=STDOUT)
with w:
printf_err(u"Hellø")
time.sleep(0.5)
assert stdout.getvalue().strip() == u"Hellø"
def test_sys_pipes():
stdout = io.StringIO()
stderr = io.StringIO()
with mock.patch('sys.stdout', stdout), mock.patch(
'sys.stderr', stderr
), sys_pipes():
printf(u"Hellø")
printf_err(u"Hi, stdérr")
assert stdout.getvalue() == u"Hellø\n"
assert stderr.getvalue() == u"Hi, stdérr\n"
def test_sys_pipes_check():
# pytest redirects stdout; un-redirect it for the test
with mock.patch('sys.stdout', sys.__stdout__), mock.patch(
'sys.stderr', sys.__stderr__
):
with pytest.raises(ValueError):
with sys_pipes():
pass
def test_redirect_everything():
stdout = io.StringIO()
stderr = io.StringIO()
with mock.patch('sys.stdout', stdout), mock.patch('sys.stderr', stderr):
sys_pipes_forever()
printf(u"Hellø")
printf_err(u"Hi, stdérr")
stop_sys_pipes()
assert stdout.getvalue() == u"Hellø\n"
assert stderr.getvalue() == u"Hi, stdérr\n"
def count_fds():
"""utility for counting file descriptors"""
proc_fds = '/proc/{}/fd'.format(os.getpid())
if os.path.isdir(proc_fds):
return len(proc_fds)
else:
# this is an approximate count,
# but it should at least be stable if we aren't leaking
with TemporaryFile() as tf:
return tf.fileno()
def test_fd_leak():
base_count = count_fds()
with pipes():
print('ok')
assert count_fds() == base_count
for i in range(10):
with pipes():
print('ok')
assert count_fds() == base_count
def test_buffer_full():
with pipes(stdout=None, stderr=io.StringIO()) as (stdout, stderr):
long_string = "x" * 100000 # create a long string (longer than 65536)
printf_err(long_string)
# Test never reaches here as the process hangs.
assert stderr.getvalue() == long_string + "\n"
def test_buffer_full_default():
with pipes() as (stdout, stderr):
long_string = "x" * 100000 # create a long string (longer than 65536)
printf(long_string)
# Test never reaches here as the process hangs.
assert stdout.read() == long_string + "\n"
def test_pipe_max_size():
max_pipe_size = wurlitzer._get_max_pipe_size()
if platform.system() == 'Linux':
assert 65535 <= max_pipe_size <= 1024 * 1024
else:
assert max_pipe_size is None
@pytest.mark.skipif(
wurlitzer._get_max_pipe_size() is None, reason="requires _get_max_pipe_size"
)
def test_bufsize():
default_bufsize = wurlitzer._get_max_pipe_size()
with wurlitzer.pipes() as (stdout, stderr):
assert fcntl(sys.__stdout__, wurlitzer.F_GETPIPE_SZ) == default_bufsize
assert fcntl(sys.__stderr__, wurlitzer.F_GETPIPE_SZ) == default_bufsize
bufsize = 2**18 # seems to only accept powers of two?
with wurlitzer.pipes(bufsize=bufsize) as (stdout, stderr):
assert fcntl(sys.__stdout__, wurlitzer.F_GETPIPE_SZ) == bufsize
assert fcntl(sys.__stderr__, wurlitzer.F_GETPIPE_SZ) == bufsize
def test_log_pipes(caplog):
with caplog.at_level(logging.INFO), wurlitzer.pipes(
logging.getLogger("wurlitzer.stdout"), logging.getLogger("wurlitzer.stderr")
):
printf("some stdout")
printf_err("some stderr")
stdout_logs = []
stderr_logs = []
for t in caplog.record_tuples:
if "stdout" in t[0]:
stdout_logs.append(t)
else:
stderr_logs.append(t)
assert stdout_logs == [
("wurlitzer.stdout", logging.INFO, "some stdout"),
]
assert stderr_logs == [
("wurlitzer.stderr", logging.ERROR, "some stderr"),
]
for record in caplog.records:
# check 'stream' extra
assert record.stream
assert record.name == "wurlitzer." + record.stream
def test_two_file_pipes(tmpdir):
test_stdout = tmpdir / "stdout.txt"
test_stderr = tmpdir / "stderr.txt"
with test_stdout.open("ab") as stdout_f, test_stderr.open("ab") as stderr_f:
w = Wurlitzer(stdout_f, stderr_f)
with w:
assert w.thread is None
printf("some stdout")
printf_err("some stderr")
# make sure capture stopped
printf("after stdout")
printf_err("after stderr")
with test_stdout.open() as f:
assert f.read() == "some stdout\n"
with test_stderr.open() as f:
assert f.read() == "some stderr\n"
def test_one_file_pipe(tmpdir):
test_stdout = tmpdir / "stdout.txt"
with test_stdout.open("ab") as stdout_f:
stderr = io.StringIO()
w = Wurlitzer(stdout_f, stderr)
with w as (stdout, stderr):
assert w.thread is not None
printf("some stdout")
printf_err("some stderr")
assert not w.thread.is_alive()
with test_stdout.open() as f:
assert f.read() == "some stdout\n"
assert stderr.getvalue() == "some stderr\n"
|