File: show_processmanagerwindow.py

package info (click to toggle)
python-processview 1.5.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,192 kB
  • sloc: python: 1,362; makefile: 6
file content (118 lines) | stat: -rw-r--r-- 3,086 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from __future__ import annotations

import time
import numpy
import threading
from silx.gui import qt
from processview.gui.processmanager import ProcessManagerWindow
from processview.core.superviseprocess import SuperviseProcess
from processview.core.dataset import Dataset
from processview.core.dataset import DatasetIdentifier
from processview.core.manager import DatasetState
from processview.gui.processmanager import ProcessManager
import datetime


class _DummyDataset(Dataset):
    """Placeholder dataset for the demo: it only carries a display name."""

    def __init__(self, name):
        super().__init__()
        self.__name = name

    def __str__(self) -> str:
        """Return the name given at construction."""
        return self.__name

    def get_identifier(self) -> DatasetIdentifier:
        """Return a new ``_DummyIdentifier`` describing this dataset.

        The metadata holds the dataset name plus creation and modification
        times, both sampled from the clock at call time.
        """
        metadata = {"name": self.__name}
        metadata["creation_time"] = datetime.datetime.now()
        metadata["modification_time"] = datetime.datetime.now()
        return _DummyIdentifier(self, metadata=metadata)


class _DummyIdentifier(DatasetIdentifier):
    """Identifier used by the demo datasets, compared and hashed by name.

    ``name()`` is not defined in this class; it is inherited from
    ``DatasetIdentifier``.
    """

    def to_str(self):
        """Return the string form of the identifier (same as ``str(self)``)."""
        return str(self)

    def __str__(self):
        return self.name()

    def __eq__(self, other):
        """Two identifiers are equal when their names are equal.

        Objects that are not ``DatasetIdentifier`` instances are never equal:
        ``NotImplemented`` is returned so Python falls back to its default
        comparison instead of failing on a missing ``name`` attribute.
        """
        if not isinstance(other, DatasetIdentifier):
            return NotImplemented
        return self.name() == other.name()

    def __hash__(self):
        # Kept consistent with __eq__: equal names give equal hashes.
        return hash(self.name())


# Qt application object; it is created first, before the window below.
app = qt.QApplication([])

# Top-level window of the demo (no parent widget), shown immediately.
window = ProcessManagerWindow(parent=None)
window.show()

# Processes that dataset states get attached to. `p1` is the one used by
# CreateNewDataset; UpdateDataset picks among `manager.get_processes()`.
# NOTE(review): presumably SuperviseProcess instances register themselves with
# the process manager on creation - confirm against processview.
p1 = SuperviseProcess(name="process1")
p2 = SuperviseProcess(name="process2")
p3 = SuperviseProcess(name="process3")


# Manager object the worker threads below notify and query.
# NOTE(review): presumably the same shared state the window displays - confirm.
manager = ProcessManager()


class RecursiveThread(threading.Thread):
    """Thread calling :meth:`process` periodically until :meth:`stop` is called.

    Subclasses must override :meth:`process`.

    :param execute_each: delay, in seconds, between two calls to ``process``.
    """

    def __init__(self, execute_each: float):
        super().__init__()
        self.running = True
        self.execute_each = execute_each
        # Set by `stop` so that a pending wait in `run` ends right away
        # instead of lasting for the rest of the period.
        self._stop_event = threading.Event()

    def run(self):
        """Thread loop: wait ``execute_each`` seconds, then call ``process``.

        The loop ends as soon as a stop is requested; ``process`` is not
        called again once the thread has been asked to stop.
        """
        while self.running:
            # Event.wait returns True when the event was set, i.e. on stop().
            stop_requested = self._stop_event.wait(self.execute_each)
            if stop_requested or not self.running:
                break
            self.process()

    def process(self):
        """Periodic job, to be implemented by subclasses.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()

    def stop(self):
        """Ask the loop to end and wait up to 2 seconds for the thread.

        Calling it on a thread that is not running (never started or already
        finished) only records the stop request.
        """
        self.running = False
        self._stop_event.set()
        if self.is_alive():
            self.join(2)


class CreateNewDataset(RecursiveThread):
    """Periodic worker that registers one new dummy dataset per tick."""

    def process(self):
        """Create a dataset named ``scan <random integer>`` and notify the
        manager that it is pending for process ``p1``."""
        scan_number = numpy.random.randint(0, 999999)
        new_dataset = _DummyDataset(f"scan {scan_number}")
        manager.notify_dataset_state(
            dataset=new_dataset,
            process=p1,
            state=DatasetState.PENDING,
        )


class UpdateDataset(RecursiveThread):
    """Periodic worker that gives a random state to a random known dataset."""

    def process(self):
        """Pick a dataset, a state and a process at random and notify the
        manager of that combination; do nothing while no dataset exists."""
        known_datasets = manager.get_datasets()
        if len(known_datasets) > 0:
            # Draw order is kept: dataset first, then state, then process.
            picked_dataset = numpy.random.choice(known_datasets)
            picked_state = numpy.random.choice(DatasetState)
            picked_process = numpy.random.choice(manager.get_processes())
            manager.notify_dataset_state(
                dataset=picked_dataset,
                state=picked_state,
                process=picked_process,
            )


# Periodic workers: one creates a dataset every 3 seconds, the other updates
# a random dataset every second.
create_new_dataset_thread = CreateNewDataset(execute_each=3)
update_dataset_thread = UpdateDataset(execute_each=1)
_workers = (create_new_dataset_thread, update_dataset_thread)

for _worker in _workers:
    _worker.start()

# Run the Qt event loop; the call returns once the loop exits.
app.exec_()

# Event loop is over: ask both workers to stop, in the order they were started.
for _worker in _workers:
    _worker.stop()