1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
|
import asyncio
import os
import re
import sys
import unittest
import multiprocess
from molotov.tests.support import catch_output, dedicatedloop
from molotov.ui.console import Console
OUTPUT = """\
one
two
3
<style fg="gray">TypeError\\("unsupported operand type(.*)?
<style fg="gray">TypeError\\("unsupported operand type.*"""
# pre-forked variable
_CONSOLE = Console(interval=0.0)
_PROC = []
def run_worker(input):
if os.getpid() not in _PROC:
_PROC.append(os.getpid())
_CONSOLE.print("hello")
try:
3 + "" # noqa
except Exception:
_CONSOLE.print_error("meh")
with catch_output() as (stdout, stderr):
loop = asyncio.new_event_loop()
fut = asyncio.ensure_future(_CONSOLE.start(), loop=loop)
loop.run_until_complete(fut)
loop.close()
stdout = stdout.read()
assert stdout == "", stdout
class TestConsole(unittest.TestCase):
@unittest.skipIf("GITHUB_ACTIONS" in os.environ, "GH action")
@dedicatedloop
@unittest.skip("Skip test that fails under Debian but passes upstream")
def test_simple_usage(self):
test_loop = asyncio.get_event_loop()
console = Console(interval=0.0)
written = []
def _write(data):
written.append(data)
console.terminal.write = _write
console.errors.write = _write
async def add_lines():
console.print("one")
console.print("two")
console.print("3")
try:
1 + "e" # noqa
except Exception as e:
console.print_error(e)
console.print_error(e, sys.exc_info()[2])
await asyncio.sleep(0.2)
await console.stop()
with catch_output() as (stdout, stderr):
adder = asyncio.ensure_future(add_lines())
displayer = asyncio.ensure_future(console.start())
test_loop.run_until_complete(asyncio.gather(adder, displayer))
test_loop.close()
output = "".join(written)
self.assertTrue(re.match(OUTPUT, output, re.S | re.M) is not None, output)
@unittest.skipIf(os.name == "nt", "win32")
@unittest.skipIf("GITHUB_ACTIONS" in os.environ, "GH action")
@dedicatedloop
@unittest.skip("Skip test that fails under Debian but passes upstream")
def test_multiprocess(self):
test_loop = asyncio.get_event_loop()
# now let's try with several processes
pool = multiprocess.Pool(3)
try:
inputs = [1] * 3
pool.map(run_worker, inputs)
finally:
pool.close()
async def stop():
await asyncio.sleep(1)
await _CONSOLE.stop()
with catch_output() as (stdout, stderr):
stop = asyncio.ensure_future(stop())
display = asyncio.ensure_future(_CONSOLE.start())
test_loop.run_until_complete(asyncio.gather(stop, display))
output = stdout.read()
for pid in _PROC:
self.assertTrue("[%d]" % pid in output)
test_loop.close()
|