File: task.py

package info (click to toggle)
fs-uae-arcade 3.1.63-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 24,456 kB
  • sloc: python: 56,011; makefile: 170
file content (115 lines) | stat: -rw-r--r-- 2,828 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import threading
import traceback
from .signal import Signal


local_tasks = threading.local()


class TaskFailure(Exception):
    def __init__(self, message):
        Exception.__init__(self, message)
        self.message = message


class TaskStopped(Exception):
    def __init__(self):
        Exception.__init__(self)


class Task(object):

    Failure = TaskFailure
    Stopped = TaskStopped

    def __init__(self, name: str) -> None:
        self.__name = name
        self.stop_flag = False
        self.started = Signal()
        self.succeeded = Signal()
        self.progressed = Signal()
        self.failed = Signal()
        self.stopped = Signal()
        self.finished = Signal()

    def get_task_name(self):
        return self.__name

    def stop(self):
        self.stop_flag = True

    def stop_check(self):
        if self.stop_flag:
            print("raising TaskStopped for", self)
            raise TaskStopped()

    def start(self):
        threading.Thread(
            target=self.__run,
            name="TaskThread({0})".format(self.get_task_name()),
        ).start()

    def __run(self):
        local_tasks.task = self
        try:
            print(self, "starting")
            self.started()
            try:
                self.run()
            except TaskStopped:
                self.stopped()
            except TaskFailure as e:
                print(self, "failed", e.message)
                self.failed(e.message)
            except Exception as e:
                print(self, "failed", repr(str(e)))
                traceback.print_exc()
                self.failed(
                    "Task: {}\nError: {}\nMessage: {}\n\n"
                    "See the log file for more details.".format(
                        self.get_task_name(), type(e).__name__, str(e)
                    )
                )
            else:
                print(self, "succeeded")
                self.succeeded()
            self.finished.notify()
        finally:
            del local_tasks.task

    def set_progress(self, text):
        print(" --", text, "--")
        self.progressed.notify(text)

    def run(self):
        raise NotImplementedError("Task.run is not implemented")

    def __repr__(self):
        return '<Task "{0}">'.format(self.get_task_name())


class CurrentTaskProxy(object):
    @property
    def task(self):
        try:
            t = local_tasks.task
        except AttributeError:
            t = null_task
        return t

    @property
    def stop_flag(self):
        return self.task.stop_flag

    def set_progress(self, text):
        self.task.set_progress(text)

    def set_download_indicator(self, progress):
        pass

    def set_upload_indicator(self, progress):
        pass


null_task = Task("Null Task")
current_task = CurrentTaskProxy()