1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
import asyncio
import io
import shutil
import sys
from prompt_toolkit import HTML
from prompt_toolkit.application import Application
from prompt_toolkit.layout import (
FormattedTextControl,
HSplit,
Layout,
VSplit,
Window,
)
from molotov import __version__
from molotov.ui.controllers import (
RunStatus,
SimpleController,
TerminalController,
create_key_bindings,
)
TITLE = HTML(
f"<b>Molotov v{__version__}</b> ~ Happy Breaking 🥛🔨 ~ <i>Ctrl+C to abort</i>"
)
class MolotovApp:
def __init__(
self,
refresh_interval=0.3,
max_lines=25,
simple_console=False,
single_process=True,
):
term_size = shutil.get_terminal_size((80, 25))
if max_lines > (term_size.lines - 10):
max_lines = term_size.lines - 10
self.max_lines = max_lines
self.title = TITLE
self.single_process = single_process
if not simple_console:
try:
sys.stdin.fileno()
except io.UnsupportedOperation:
# This is not a terminal
simple_console = True
self.simple_console = simple_console
if simple_console:
controller_klass = SimpleController
else:
controller_klass = TerminalController
if max_lines >= 0:
self.terminal = controller_klass(max_lines, single_process)
self.errors = controller_klass(max_lines, single_process)
else:
self.terminal = self.errors = None
self.status = RunStatus()
self.key_bindings = create_key_bindings()
self.refresh_interval = refresh_interval
self._running = False
async def refresh_console(self):
while self._running:
for line in self.terminal.dump():
sys.stdout.write(line)
sys.stdout.flush()
await asyncio.sleep(0)
for line in self.errors.dump():
sys.stdout.write(line)
sys.stdout.flush()
await asyncio.sleep(self.refresh_interval)
async def start(self):
self._running = True
if self.simple_console:
self.task = asyncio.ensure_future(self.refresh_console())
return
title_toolbar = Window(
FormattedTextControl(lambda: self.title),
height=1,
style="class:title",
)
bottom_toolbar = Window(
content=self.status,
style="class:bottom-toolbar",
height=1,
)
if self.terminal is not None:
terminal = Window(content=self.terminal, height=self.max_lines + 2)
errors = Window(content=self.errors, height=self.max_lines + 2)
splits = [
title_toolbar,
VSplit([terminal, Window(width=4, char=" || "), errors]),
bottom_toolbar,
]
else:
splits = [title_toolbar, bottom_toolbar]
self.app = Application(
min_redraw_interval=0.05,
layout=Layout(HSplit(splits)),
style=None,
key_bindings=self.key_bindings,
refresh_interval=self.refresh_interval,
color_depth=None,
output=None,
input=None,
erase_when_done=True,
)
def _handle_exception(*args, **kw):
pass
self.app._handle_exception = _handle_exception
self.task = asyncio.ensure_future(self.app.run_async())
async def stop(self):
self._running = False
await asyncio.sleep(0)
if not self.simple_console:
try:
self.app.exit()
except Exception:
pass
# shows back the cursor
sys.stdout.write("\033[?25h")
sys.stdout.flush()
else:
await self.task
self.terminal.close()
self.errors.close()
|