File: operation.py

package info (click to toggle)
freedombox 26.2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 82,976 kB
  • sloc: python: 48,504; javascript: 1,736; xml: 481; makefile: 290; sh: 167; php: 32
file content (263 lines) | stat: -rw-r--r-- 9,493 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Utilities to run operations and show their progress or failures."""

import enum
import logging
import threading
from collections import OrderedDict
from typing import Callable

from plinth.utils import SafeFormatter

from . import app as app_module

logger = logging.getLogger(__name__)


class Operation:
    """Represent an ongoing or finished activity."""

    class State(enum.Enum):
        """Various states of an operation."""

        WAITING = 'waiting'
        RUNNING = 'running'
        COMPLETED = 'completed'

    def __init__(self, op_id: str, app_id: str, name: str, target: Callable,
                 args: list | None = None, kwargs: dict | None = None,
                 show_message: bool = True, show_notification: bool = False,
                 thread_data: dict | None = None,
                 on_complete: Callable | None = None):
        """Initialize to no operation."""
        self.op_id = op_id
        self.app_id = app_id
        self.name = name
        self.show_message = show_message
        self.show_notification = show_notification

        self.target = target
        self.args = args or []
        self.kwargs = kwargs or {}
        self.on_complete = on_complete

        self.state = Operation.State.WAITING
        self.return_value = None
        self._message: str | None = None
        self.exception: Exception | None = None

        # Operation specific data
        self.thread_data: dict = thread_data or {}

        self.thread = threading.Thread(target=self._catch_thread_errors)
        self.start_event = threading.Event()
        setattr(self.thread, '_operation', self)
        self._update_notification()

    def __str__(self):
        """Return a string representation of the operation."""
        return f'Operation: {self.app_id}: {self.name}'

    def _catch_thread_errors(self):
        """Collect exceptions when running in a thread."""
        self._update_notification()
        try:
            self.return_value = self.target(*self.args, **self.kwargs)
        except Exception as exception:
            logger.exception('Error: %s, %s', self, exception)
            self.exception = exception
        finally:
            self.state = Operation.State.COMPLETED
            self._update_notification()
            # Notify
            if self.on_complete:
                self.on_complete(self)

    def run(self):
        """Run a specified operation in a thread."""
        logger.info('%s: running', str(self))
        self.state = Operation.State.RUNNING
        self.thread.start()
        self.start_event.set()

    def join(self):
        """Block the current thread until the operation is completed.

        Raise an exception if the thread encountered an exception.
        """
        self.start_event.wait()
        self.thread.join()
        if self.exception:
            raise self.exception

        return self.return_value

    @staticmethod
    def get_operation() -> 'Operation':
        """Return the operation associated with this thread."""
        thread = threading.current_thread()
        return thread._operation  # type: ignore [attr-defined]

    def on_update(self, message: str | None = None,
                  exception: Exception | None = None):
        """Call from within the thread to update the progress of operation."""
        if message:
            self._message = message

        if exception:
            self.exception = exception

        self._update_notification()

    @property
    def message(self) -> str | None:
        """Return a message about status of the operation."""
        from django.utils.translation import gettext_noop
        if self._message:  # Progress has been set by the operation itself
            return self._message

        if self.exception:  # Operation resulted in a error.
            return gettext_noop('Error: {name}: {exception}')

        if self.state == Operation.State.WAITING:
            return gettext_noop('Waiting to start: {name}')

        if self.state == Operation.State.RUNNING:
            return '{name}'  # No translation needed

        if self.state == Operation.State.COMPLETED:
            return gettext_noop('Finished: {name}')

        return None

    @property
    def translated_message(self) -> str:
        """Return a message about status of operation after translating.

        Must be called from a web request (UI) thread with user language set so
        that localization is done properly.
        """
        from django.utils.translation import gettext
        message = gettext(self.message)
        data = {'name': gettext(self.name), 'exception': str(self.exception)}
        if self.app_id:
            data['app_name'] = app_module.App.get(self.app_id).info.name

        try:
            message = SafeFormatter().vformat(message, [], data)
        except (KeyError, AttributeError) as error:
            logger.warning(
                'Operation missing required key during translation: %s', error)

        return message

    def _update_notification(self) -> None:
        """Show an updated notification if needed."""
        if not self.show_notification:
            return

        from plinth.notification import Notification
        severity = 'info' if not self.exception else 'error'
        app = app_module.App.get(self.app_id)
        data = {
            'app_name': str(app.info.name),
            'app_icon': app.info.icon,
            'app_icon_filename': app.info.icon_filename,
            'state': self.state.value,
            'exception': str(self.exception) if self.exception else None,
            'name': 'translate:' + str(self.name),
        }
        Notification.update_or_create(
            id=self.app_id + '-operation', app_id=self.app_id,
            severity=severity, title=app.info.name, message=self.message,
            body_template='operation-notification.html', data=data,
            group='admin', dismissed=False)


class OperationsManager:
    """Global handler for all operations and their results."""

    def __init__(self) -> None:
        """Initialize the object."""
        self._operations: OrderedDict[str, Operation] = OrderedDict()
        self._current_operation: Operation | None = None

        # Assume that operations manager will be called from various threads
        # including the callback called from the threads it creates. Ensure
        # that properties don't get corrupted due to race conditions when
        # called from different threads by locking all code that updates them.
        # It is re-entrant lock, meaning it can be re-obtained without blocking
        # when done from the same thread which holds the lock.
        self._lock = threading.RLock()

    def new(self, op_id: str, *args, **kwargs) -> Operation:
        """Create a new operation instance and add to global list."""
        with self._lock:
            if (op_id in self._operations and self._operations[op_id].state !=
                    Operation.State.COMPLETED):
                raise KeyError('Operation in progress/scheduled')

            kwargs['on_complete'] = self._on_operation_complete
            operation = Operation(op_id, *args, **kwargs)
            self._operations[op_id] = operation
            logger.info('%s: added', operation)
            self._schedule_next()
            return operation

    def get(self, op_id: str) -> Operation:
        """Return an operation with given operation ID."""
        with self._lock:
            return self._operations[op_id]

    def _on_operation_complete(self, operation: Operation):
        """Trigger next operation. Called from within previous thread."""
        logger.debug('%s: on_complete called', operation)
        with self._lock:
            self._current_operation = None
            if not operation.show_message:
                # No need to keep it lingering for later collection
                del self._operations[operation.op_id]

            self._schedule_next()

    def _schedule_next(self) -> None:
        """Schedule the next available operation."""
        with self._lock:
            if self._current_operation:
                return

            for operation in self._operations.values():
                if operation.state == Operation.State.WAITING:
                    logger.debug('%s: scheduling', operation)
                    self._current_operation = operation
                    operation.run()
                    break

    def filter(self, app_id: str) -> list[Operation]:
        """Return operations matching a pattern."""
        with self._lock:
            return [
                operation for operation in self._operations.values()
                if operation.app_id == app_id
            ]

    def collect_results(self, app_id: str) -> list[Operation]:
        """Return the finished operations for an app."""
        results: list[Operation] = []
        remaining: OrderedDict[str, Operation] = OrderedDict()

        with self._lock:
            for operation in self._operations.values():
                if (operation.app_id == app_id
                        and operation.state == Operation.State.COMPLETED):
                    results.append(operation)
                else:
                    remaining[operation.op_id] = operation

            if results:
                self._operations = remaining

        return results


manager = OperationsManager()