File: controllers.py

package info (click to toggle)
python-molotov 2.7-3
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 8,268 kB
  • sloc: python: 4,121; makefile: 60
file content (156 lines) | stat: -rw-r--r-- 4,447 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import os
import queue
import signal
from datetime import datetime

import humanize
import multiprocess
from prompt_toolkit import HTML
from prompt_toolkit.formatted_text import to_formatted_text
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.layout.controls import UIContent, UIControl


def create_key_bindings():
    """Build the key bindings used by the controllers.

    Two shortcuts are registered:

    * ``c-l`` clears the renderer of the application that received the key.
    * ``c-c`` sends ``SIGTERM`` to the current process.

    Returns:
        The populated ``KeyBindings`` instance.
    """

    def _clear(event):
        event.app.renderer.clear()

    def _interrupt(event):
        # Signal ourselves rather than raising, so the process-level
        # SIGTERM handling (whatever is installed) takes over.
        os.kill(os.getpid(), signal.SIGTERM)

    bindings = KeyBindings()
    for keys, handler in (("c-l", _clear), ("c-c", _interrupt)):
        bindings.add(keys)(handler)
    return bindings


class BaseController(UIControl):
    """Common base for the UI controls that receive output lines.

    Subclasses must implement ``write`` (store one chunk of text) and
    ``create_content`` (render the stored text); both raise
    ``NotImplementedError`` here.
    """

    def __init__(self, max_lines=25, add_style=True):
        """Initialise the controller.

        Args:
            max_lines: Stored as ``self.max_lines`` for subclasses to use.
            add_style: When true, ``write_line`` wraps text in
                ``<style ...>`` markup; when false, plain text is emitted.
        """
        super().__init__()
        # Remember which process created the controller so write_line can
        # tell when it is being called from a different process.
        self._creator = os.getpid()
        self.max_lines = max_lines
        self._add_style = add_style
        self._key_bindings = create_key_bindings()

    def create_content(self, width, height):
        """Render the control; must be provided by subclasses."""
        raise NotImplementedError

    def is_focusable(self):
        """Report that this control can take focus."""
        return True

    def get_key_bindings(self):
        """Return the key bindings created for this controller."""
        return self._key_bindings

    def write(self, data):
        """Store one chunk of text; must be provided by subclasses."""
        raise NotImplementedError

    def write_line(self, data, fg=None):
        """Write ``data`` followed by a newline, optionally decorated.

        Args:
            data: Text of the line.
            fg: Optional foreground colour. Only applied when styling is
                enabled, by wrapping ``data`` in a ``<style fg=...>`` tag.

        When called from a process other than the one that created the
        controller, the line is prefixed with a ``[P:<pid>]`` header
        (rendered in grey when styling is enabled).
        """
        if self._add_style and fg is not None:
            data = f'<style fg="{fg}">{data}</style>'

        # pid header
        if os.getpid() != self._creator:
            # The plain-text header previously lacked its closing bracket
            # ("[P:123 msg"); both variants now read "[P:123] msg".
            pid_tag = f"[P:{os.getpid()}]"
            if self._add_style:
                pid_tag = f'<style fg="#cecece">{pid_tag}</style>'
            data = f"{pid_tag} {data}"

        self.write(f"{data}\n")


class TerminalController(BaseController):
    """Styled controller that keeps the latest lines for on-screen display.

    Lines are held in a plain list, or in a list obtained from a
    ``multiprocess.Manager`` when ``single_process`` is false.
    """

    def __init__(self, max_lines=25, single_process=True):
        """Create the backing store.

        Args:
            max_lines: Maximum number of stored chunks kept by ``write``.
            single_process: When false, a ``multiprocess.Manager`` is started
                and its list is used as the store.
        """
        super().__init__(max_lines, add_style=True)
        self._closed = False
        self.single_process = single_process
        if single_process:
            self.data = []
        else:
            self.manager = multiprocess.Manager()
            self.data = self.manager.list()

    def close(self):
        """Mark the controller closed and shut the manager down, if any."""
        self._closed = True
        if self.single_process:
            return
        self.manager.shutdown()

    def write(self, data):
        """Append ``data`` and trim the store to the last ``max_lines`` items.

        Does nothing once the controller is closed. A ``BrokenPipeError``
        raised while touching the store is ignored.
        """
        if self._closed:
            return
        try:
            self.data.append(data)
            overflowing = len(self.data) > self.max_lines
            if overflowing:
                self.data[:] = self.data[-self.max_lines :]
        except BrokenPipeError:
            pass

    def create_content(self, width, height):
        """Render the stored text as one formatted line per ``\\n`` segment.

        The output always starts with an empty line. When the controller is
        closed, a fixed "data stream closed!" message is shown instead of
        the stored data.
        """
        chunks = ["\n"]
        if self._closed:
            chunks.append("data stream closed!")
        else:
            chunks.extend(self.data)

        # Chunks carry their own newlines, so join first and split afterwards
        # to get one entry per displayed line.
        rendered = [
            to_formatted_text(HTML(text)) for text in "".join(chunks).split("\n")
        ]

        def get_line(i: int):
            return rendered[i]

        return UIContent(
            get_line=get_line, line_count=len(rendered), show_cursor=False
        )


class SimpleController(BaseController):
    """Unstyled controller that queues written data until it is dumped."""

    def __init__(self, max_lines, single_process=True):
        """Create the queue.

        Args:
            max_lines: Forwarded to ``BaseController``.
            single_process: Selects ``queue.Queue`` when true and
                ``multiprocess.Queue`` otherwise.
        """
        super().__init__(max_lines, add_style=False)
        queue_factory = queue.Queue if single_process else multiprocess.Queue
        self.data = queue_factory()

    def close(self):
        """Do nothing; present so callers can close any controller."""
        return None

    def write(self, data):
        """Put ``data`` on the queue."""
        self.data.put(data)

    def dump(self):
        """Yield queued items until the queue reports itself empty."""
        while True:
            if self.data.empty():
                return
            yield self.data.get()


class RunStatus(BaseController):
    """One-line status bar showing run counters and elapsed time."""

    def __init__(self, max_lines=25):
        """Start with no counters and record the creation time."""
        super().__init__(max_lines)
        self._started = datetime.now()
        self._status = {}

    def update(self, results):
        """Merge ``results`` into the stored counters."""
        self._status.update(results)

    def formatted(self):
        """Return the status line as formatted text.

        Counters missing from the stored status are shown as ``0``; the
        elapsed time is measured from when this object was created.
        """
        delta = datetime.now() - self._started
        markup = (
            f'<style fg="green" bg="#cecece">SUCCESS: {self._status.get("OK", 0)} </style>'
            f'<style fg="red" bg="#cecece"> FAILED: {self._status.get("FAILED", 0)} </style>'
            f' WORKERS: {self._status.get("WORKER", 0)}'
            f' PROCESSES: {self._status.get("PROCESS", 0)} '
            f'<style fg="blue" bg="#cecece"> ELAPSED: {humanize.precisedelta(delta)}</style>'
        )
        return to_formatted_text(HTML(markup))

    def create_content(self, width: int, height: int) -> UIContent:
        """Expose the status as a single-line ``UIContent``.

        The line is recomputed on every request, regardless of the index.
        """
        return UIContent(
            get_line=lambda _index: self.formatted(),
            line_count=1,
            show_cursor=False,
        )