File: Job.py

package info (click to toggle)
uranium 5.0.0-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 5,304 kB
  • sloc: python: 31,765; sh: 132; makefile: 12
file content (134 lines) | stat: -rw-r--r-- 3,742 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# Copyright (c) 2018 Ultimaker B.V.
# Uranium is released under the terms of the LGPLv3 or higher.

import time
from typing import Any, Optional

from UM.JobQueue import JobQueue
from UM.Signal import Signal, signalemitter


@signalemitter
class Job:
    """A self-contained task that is meant to be executed in a thread.

    Subclasses provide the actual work by reimplementing run(). Scheduling is
    delegated to the JobQueue, which start() and cancel() hand this object to.

    :sa JobQueue
    """

    def __init__(self) -> None:
        super().__init__()
        # NOTE(review): this class never changes the two state flags after
        # construction; presumably the queue/thread machinery does -- confirm.
        self._running: bool = False
        self._finished: bool = False
        self._result: Any = None
        self._message: Any = None  # Not touched anywhere else in this class.
        self._error: Optional[Exception] = None

    def run(self) -> None:
        """Execute the work of this job; subclasses must reimplement this.

        :exception NotImplementedError: Always raised by this base implementation.
        """
        raise NotImplementedError()

    def getResult(self) -> Any:
        """Return the value stored through setResult().

        What kind of object this is depends on the job implementation.
        It is None for as long as no result has been set.
        """
        return self._result

    def setResult(self, result: Any) -> None:
        """Store the outcome of this job.

        Intended to be called from run() once the result is available.

        :param result: The object to hand back from getResult().
        """
        self._result = result

    def setError(self, error: Exception) -> None:
        """Record an exception that was raised while the job was executing.

        Any value other than None marks the job as having failed to execute
        properly.

        :param error: The exception to record.
        """
        self._error = error

    def start(self) -> None:
        """Queue this job for execution.

        The job is added to the JobQueue singleton, to be processed whenever
        a thread is available.

        :sa JobQueue::add()
        """
        queue = JobQueue.getInstance()
        queue.add(self)

    def cancel(self) -> None:
        """Withdraw this job from the JobQueue.

        As documented for this API, cancelling has no effect once run() has
        already been called.
        """
        queue = JobQueue.getInstance()
        queue.remove(self)

    def isRunning(self) -> bool:
        """Report whether the job is flagged as currently running.

        :return: The current value of the internal running flag.
        """
        return self._running

    def isFinished(self) -> bool:
        """Report whether the job is flagged as having finished processing.

        :return: The current value of the internal finished flag.
        """
        return self._finished

    def hasError(self) -> bool:
        """Tell whether an error was recorded for this job.

        :return: True if an error has been set, False otherwise.
        """
        recorded = self._error
        return recorded is not None

    def getError(self) -> Optional[Exception]:
        """Return the error recorded during execution, if any.

        :return: The recorded exception, or None when no error was set.
        """
        return self._error

    finished = Signal()
    """Emitted when the job has finished processing.

    :param job: :type{Job} The finished job.
    """

    progress = Signal()
    """Emitted when the job processing has progressed.

    :param job: :type{Job} The job reporting progress.
    :param amount: :type{int} The amount of progress made, from 0 to 100.
    """

    @staticmethod
    def yieldThread() -> None:
        """Give other threads a chance to run.

        This is mostly a workaround for broken python threads: it forces a
        GIL release so that a different, waiting thread can start processing.
        """
        # A zero-length sleep adds no delay but still permits a context switch.
        time.sleep(0)