File: HttpRequestManager.py

package info (click to toggle)
uranium 5.0.0-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 5,304 kB
  • sloc: python: 31,765; sh: 132; makefile: 12
file content (454 lines) | stat: -rw-r--r-- 24,648 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
# Copyright (c) 2022 Ultimaker B.V.
# Uranium is released under the terms of the LGPLv3 or higher.

import json
import time
import uuid
from collections import deque
from threading import RLock
from typing import Callable, cast, Dict, Set, Union, Optional, Any

from PyQt6.QtCore import QObject, QUrl, Qt, pyqtSignal, pyqtProperty
from PyQt6.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply

from UM.Logger import Logger
from UM.TaskManagement.HttpRequestData import HttpRequestData
from UM.TaskManagement.HttpRequestScope import HttpRequestScope
from UM.TaskManagement.TaskManager import TaskManager


#
# Summary:
#
# HttpRequestManager is a wrapper for Qt's QNetworkAccessManager and make it more convenient to do the following things:
#  (1) Keep track of the HTTP requests one has issued. This is done via the HttpRequestData object. Each HttpRequestData
#      object represents an issued HTTP request.
#  (2) A request can be aborted if it hasn't been issued to QNetworkAccessManager or if it's still running by
#      QNetworkAccessManager.
#  (3) Updates on each request is done via user-specified callback functions. So, for each request, you can give
#      optional callbacks:
#       - A successful callback, invoked when the request has been finished successfully.
#       - An error callback, invoked when an error has occurred, including when a request was aborted by the user or
#         timed out.
#       - A download progress callback, invoked when there's an update on the download progress.
#       - An upload progress callback, invoked when there's an update on the upload progress.
#  (4) An optional timeout can be specified for an HTTP request. Note that this timeout is the max wait time between
#      each time the request gets a response from the server. This is handled via the download and upload progress
#      callbacks. A QTimer is used for each request to track its timeout if set. If the timer gets triggered and there
#      is indeed a timeout, the request will be aborted. All requests that are aborted due to a timeout will result in
#      invoking its error callback with an error code QNetworkReply::OperationCanceledError, but the HttpRequestData
#      will have its "is_aborted_due_to_timeout" property set to True.
#     Because of
#
# All requests are handled by QNetworkAccessManager. We consider that all the requests that are being handled by
# QNetworkAccessManager at a certain point are running concurrently.
#

#
# A dedicated manager that processes and schedules HTTP requests. It provides public APIs for issuing HTTP requests
# and the results, successful or not, will be communicated back via callback functions. For each request, 2 callback
# functions can be optionally specified:
#
#  - callback: This function will be invoked when a request finishes. (bound to QNetworkReply.finished signal)
#        Its signature should be "def callback(QNetworkReply) -> None" or other compatible form.
#
#  - error_callback: This function will be invoked when a request fails. (bound to QNetworkReply.error signal)
#        Its signature should be "def callback(QNetworkReply, QNetworkReply.NetworkError) -> None" or other compatible
#        form.
#
#  - download_progress_callback: This function will be invoked whenever the download progress changed. (bound to
#       QNetworkReply.downloadProgress signal)
#       Its signature should be "def callback(bytesReceived: int, bytesTotal: int) -> None" or other compatible form.
#
#  - upload_progress_callback: This function will be invoked whenever the upload progress changed. (bound to
#       QNetworkReply.downloadProgress signal)
#       Its signature should be "def callback(bytesSent: int, bytesTotal: int) -> None" or other compatible form.
#
#  - timeout (EXPERIMENTAL): The timeout is seconds for a request. This is the timeout since the request was first
#       issued to the QNetworkManager. NOTE that this timeout is NOT the timeout between each response from the other
#       party, but the timeout for the complete request. So, if you have a very slow network which takes 2 hours to
#       download a 1MB file, and for this request you set a timeout of 10 minutes, the request will be aborted after
#       10 minutes if it's not finished.
#
class HttpRequestManager(TaskManager):

    __instance = None  # type: Optional[HttpRequestManager]

    internetReachableChanged = pyqtSignal(bool)

    @classmethod
    def getInstance(cls, *args, **kwargs) -> "HttpRequestManager":
        if cls.__instance is None:
            cls.__instance = cls(*args, **kwargs)
        return cls.__instance

    def __init__(self, max_concurrent_requests: int = 4, parent: Optional["QObject"] = None,
                 enable_request_benchmarking: bool = False) -> None:
        if HttpRequestManager.__instance is not None:
            raise RuntimeError("Try to create singleton '%s' more than once" % self.__class__.__name__)

        super().__init__(parent)
        HttpRequestManager.__instance = self

        self._network_manager = QNetworkAccessManager(self)
        self._account_manager = None
        self._is_internet_reachable = True

        # All the requests that have been issued to the QNetworkManager are considered as running concurrently. This
        # number defines the max number of requests that will be issued to the QNetworkManager.
        self._max_concurrent_requests = max_concurrent_requests

        # A FIFO queue for the pending requests.
        self._request_queue = deque()  # type: deque

        # A set of all currently in progress requests
        self._requests_in_progress = set()  # type: Set[HttpRequestData]
        self._request_lock = RLock()
        self._process_requests_scheduled = False

        # Debug options
        #
        # Enabling benchmarking will make the manager to time how much time it takes for a request from start to finish
        # and log them.
        self._enable_request_benchmarking = enable_request_benchmarking

    @pyqtProperty(bool, notify = internetReachableChanged)
    def isInternetReachable(self) -> bool:
        return self._is_internet_reachable

    # Public API for creating an HTTP GET request.
    # Returns an HttpRequestData instance that represents this request.
    def get(self, url: str,
            headers_dict: Optional[Dict[str, str]] = None,
            callback: Optional[Callable[["QNetworkReply"], None]] = None,
            error_callback: Optional[Callable[["QNetworkReply", "QNetworkReply.NetworkError"], None]] = None,
            download_progress_callback: Optional[Callable[[int, int], None]] = None,
            upload_progress_callback: Optional[Callable[[int, int], None]] = None,
            timeout: Optional[float] = None,
            scope: Optional[HttpRequestScope] = None) -> "HttpRequestData":
        return self._createRequest("get", url, headers_dict = headers_dict,
                                   callback = callback, error_callback = error_callback,
                                   download_progress_callback = download_progress_callback,
                                   upload_progress_callback = upload_progress_callback,
                                   timeout = timeout,
                                   scope = scope)

    # Public API for creating an HTTP PUT request.
    # Returns an HttpRequestData instance that represents this request.
    def put(self, url: str,
            headers_dict: Optional[Dict[str, str]] = None,
            data: Optional[Union[bytes, bytearray]] = None,
            callback: Optional[Callable[["QNetworkReply"], None]] = None,
            error_callback: Optional[Callable[["QNetworkReply", "QNetworkReply.NetworkError"], None]] = None,
            download_progress_callback: Optional[Callable[[int, int], None]] = None,
            upload_progress_callback: Optional[Callable[[int, int], None]] = None,
            timeout: Optional[float] = None,
            scope: Optional[HttpRequestScope] = None) -> "HttpRequestData":
        return self._createRequest("put", url, headers_dict = headers_dict, data = data,
                                   callback = callback, error_callback = error_callback,
                                   download_progress_callback = download_progress_callback,
                                   upload_progress_callback = upload_progress_callback,
                                   timeout = timeout,
                                   scope = scope)

    # Public API for creating an HTTP POST request. Returns a unique request ID for this request.
    # Returns an HttpRequestData instance that represents this request.
    def post(self, url: str,
             headers_dict: Optional[Dict[str, str]] = None,
             data: Optional[Union[bytes, bytearray]] = None,
             callback: Optional[Callable[["QNetworkReply"], None]] = None,
             error_callback: Optional[Callable[["QNetworkReply", "QNetworkReply.NetworkError"], None]] = None,
             download_progress_callback: Optional[Callable[[int, int], None]] = None,
             upload_progress_callback: Optional[Callable[[int, int], None]] = None,
             timeout: Optional[float] = None,
             scope: Optional[HttpRequestScope] = None) -> "HttpRequestData":
        return self._createRequest("post", url, headers_dict = headers_dict, data = data,
                                   callback = callback, error_callback = error_callback,
                                   download_progress_callback = download_progress_callback,
                                   upload_progress_callback = upload_progress_callback,
                                   timeout = timeout,
                                   scope = scope)

    # Public API for creating an HTTP DELETE request.
    # Returns an HttpRequestData instance that represents this request.
    def delete(self, url: str,
               headers_dict: Optional[Dict[str, str]] = None,
               callback: Optional[Callable[["QNetworkReply"], None]] = None,
               error_callback: Optional[Callable[["QNetworkReply", "QNetworkReply.NetworkError"], None]] = None,
               download_progress_callback: Optional[Callable[[int, int], None]] = None,
               upload_progress_callback: Optional[Callable[[int, int], None]] = None,
               timeout: Optional[float] = None,
               scope: Optional[HttpRequestScope] = None) -> "HttpRequestData":
        return self._createRequest("deleteResource", url, headers_dict=headers_dict,
                                   callback=callback, error_callback=error_callback,
                                   download_progress_callback=download_progress_callback,
                                   upload_progress_callback=upload_progress_callback,
                                   timeout=timeout,
                                   scope=scope)

    # Public API for aborting a given HttpRequestData. If the request is not pending or in progress, nothing
    # will be done.
    def abortRequest(self, request: "HttpRequestData") -> None:
        with self._request_lock:
            # If the request is currently pending, just remove it from the pending queue.
            if request in self._request_queue:
                self._request_queue.remove(request)

            # If the request is currently in progress, abort it.
            if request in self._requests_in_progress:
                if request.reply is not None and request.reply.isRunning():
                    request.reply.abort()
                    Logger.log("d", "%s aborted", request)

    @staticmethod
    def readJSON(reply: QNetworkReply) -> Any:
        """ Read a Json response into a Python object (list, dict, str depending on json type)

        :return: Python object representing the Json or None in case of error
        """
        try:
            return json.loads(HttpRequestManager.readText(reply))
        except json.decoder.JSONDecodeError:
            Logger.log("w", "Received invalid JSON: " + str(reply.url()))
            return None

    @staticmethod
    def readText(reply: QNetworkReply) -> str:
        """Decode raw reply bytes as utf-8"""
        return bytes(reply.readAll()).decode("utf-8")

    @staticmethod
    def replyIndicatesSuccess(reply: QNetworkReply, error: Optional["QNetworkReply.NetworkError"] = None) -> bool:
        """Returns whether reply status code indicates success and error is None"""
        return error is None and 200 <= reply.attribute(QNetworkRequest.Attribute.HttpStatusCodeAttribute) < 300

    @staticmethod
    def safeHttpStatus(reply: Optional[QNetworkReply]):
        """Returns the status code or -1 if there isn't any"""
        if reply is None:
            return -1

        return reply.attribute(QNetworkRequest.Attribute.HttpStatusCodeAttribute) or -1

    @staticmethod
    def qt_network_error_name(error: QNetworkReply.NetworkError):
        """String representation of a NetworkError, eg 'ProtocolInvalidOperationError'"""

        for k, v in QNetworkReply.NetworkError.__dict__.items():
            if v == error:
                return k
        return "Unknown Qt Network error"

    # This function creates a HttpRequestData with the given data and puts it into the pending request queue.
    # If no request processing call has been scheduled, it will schedule it too.
    # Returns an HttpRequestData instance that represents this request.
    def _createRequest(self, http_method: str, url: str,
                       headers_dict: Optional[Dict[str, str]] = None,
                       data: Optional[Union[bytes, bytearray]] = None,
                       callback: Optional[Callable[["QNetworkReply"], None]] = None,
                       error_callback: Optional[Callable[["QNetworkReply", "QNetworkReply.NetworkError"], None]] = None,
                       download_progress_callback: Optional[Callable[[int, int], None]] = None,
                       upload_progress_callback: Optional[Callable[[int, int], None]] = None,
                       timeout: Optional[float] = None,
                       scope: Optional[HttpRequestScope] = None ) -> "HttpRequestData":
        # Sanity checks
        if timeout is not None and timeout <= 0:
            raise ValueError("Timeout must be a positive number if provided, but [%s] was given" % timeout)

        request = QNetworkRequest(QUrl(url))

        # Set headers
        if headers_dict is not None:
            for key, value in headers_dict.items():
                request.setRawHeader(key.encode("utf-8"), value.encode("utf-8"))

        if scope is not None:
            scope.requestHook(request)

        # Generate a unique request ID
        request_id = uuid.uuid4().hex

        # Create the request data
        request_data = HttpRequestData(request_id,
                                       http_method = http_method,
                                       request = request,
                                       data = data,
                                       manager_timeout_callback = self._onRequestTimeout,
                                       callback = callback,
                                       error_callback = error_callback,
                                       download_progress_callback = download_progress_callback,
                                       upload_progress_callback = upload_progress_callback,
                                       timeout = timeout)

        with self._request_lock:
            self._request_queue.append(request_data)

            # Schedule a call to process pending requests in the queue
            if not self._process_requests_scheduled:
                self.callLater(0, self._processNextRequestsInQueue)
                self._process_requests_scheduled = True

        return request_data

    # For easier debugging, so you know when the call is triggered by the timeout timer.
    def _onRequestTimeout(self, request_data: "HttpRequestData") -> None:
        Logger.log("d", "Request [%s] timeout.", self)

        # Make typing happy
        if request_data.reply is None:
            return

        with self._request_lock:
            if request_data not in self._requests_in_progress:
                return

            request_data.reply.abort()
            request_data.is_aborted_due_to_timeout = True

    # Processes the next requests in the pending queue. This function will issue as many requests to the QNetworkManager
    # as possible but limited by the value "_max_concurrent_requests". It stops if there is no more pending requests.
    def _processNextRequestsInQueue(self) -> None:
        # Process all requests until the max concurrent number is hit or there's no more requests to process.
        while True:
            with self._request_lock:
                # Do nothing if there's no more requests to process
                if not self._request_queue:
                    self._process_requests_scheduled = False
                    return

                # Do not exceed the max request limit
                if len(self._requests_in_progress) >= self._max_concurrent_requests:
                    self._process_requests_scheduled = False
                    return

                # Fetch the next request and process
                next_request_data = self._request_queue.popleft()
            self._processRequest(cast(HttpRequestData, next_request_data))

    # Processes the given HttpRequestData by issuing the request using QNetworkAccessManager and moves the
    # request into the currently in-progress list.
    def _processRequest(self, request_data: "HttpRequestData") -> None:
        now = time.time()

        # Get the right http_method function and prepare arguments.
        method = getattr(self._network_manager, request_data.http_method)
        args = [request_data.request]
        if request_data.data is not None:
            args.append(request_data.data)

        # Issue the request and add the reply into the currently in-progress requests set
        reply = method(*args)
        request_data.reply = reply

        # Connect callback signals
        reply.errorOccurred.connect(lambda err, rd = request_data: self._onRequestError(rd, err), type = Qt.ConnectionType.QueuedConnection)
        reply.finished.connect(lambda rd = request_data: self._onRequestFinished(rd), type = Qt.ConnectionType.QueuedConnection)

        # Only connect download/upload progress callbacks when necessary to reduce CPU usage.
        if request_data.download_progress_callback is not None or request_data.timeout is not None:
            reply.downloadProgress.connect(request_data.onDownloadProgressCallback, type = Qt.ConnectionType.QueuedConnection)
        if request_data.upload_progress_callback is not None or request_data.timeout is not None:
            reply.uploadProgress.connect(request_data.onUploadProgressCallback, type = Qt.ConnectionType.QueuedConnection)

        with self._request_lock:
            self._requests_in_progress.add(request_data)
            request_data.setStartTime(now)

    def _onRequestError(self, request_data: "HttpRequestData", error: "QNetworkReply.NetworkError") -> None:
        error_string = None
        if request_data.reply is not None:
            error_string = request_data.reply.errorString()

        if error == QNetworkReply.NetworkError.UnknownNetworkError or QNetworkReply.NetworkError.HostNotFoundError:
            self._setInternetReachable(False)
            # manager seems not always able to recover from a total loss of network access, so re-create it
            self._network_manager = QNetworkAccessManager(self)

        # Use peek() to retrieve the reply's body instead of readAll(), because readAll consumes the content
        reply_body = request_data.reply.peek(request_data.reply.bytesAvailable())  # unlike readAll(), peek doesn't consume the content
        Logger.log("d", "%s got an QNetworkReplyError %s. The server returned: %s", request_data, error_string, reply_body)

        with self._request_lock:
            # Safeguard: make sure that we have the reply in the currently in-progress requests set
            if request_data not in self._requests_in_progress:
                # TODO: ERROR, should not happen
                Logger.log("e", "%s not found in the in-progress set", request_data)
                pass
            else:
                # Disconnect callback signals
                if request_data.reply is not None:
                    if request_data.download_progress_callback is not None:
                        request_data.reply.downloadProgress.disconnect(request_data.onDownloadProgressCallback)
                    if request_data.upload_progress_callback is not None:
                        request_data.reply.uploadProgress.disconnect(request_data.onUploadProgressCallback)

                    request_data.setDone()
                    self._requests_in_progress.remove(request_data)

        # Schedule the error callback if there is one
        if request_data.error_callback is not None:
            self.callLater(0, request_data.error_callback, request_data.reply, error)

        # Continue to process the next request
        self._processNextRequestsInQueue()

    def _onRequestFinished(self, request_data: "HttpRequestData") -> None:
        # See https://doc.qt.io/archives/qt-5.10/qnetworkreply.html#abort
        # Calling QNetworkReply.abort() will also trigger finished(), so we need to know if a request was finished or
        # aborted. This can be done by checking if the error is QNetworkReply.OperationCanceledError. If a request was
        # aborted due to timeout, the request's HttpRequestData.is_aborted_due_to_timeout will be set to True.
        #
        # We do nothing if the request was aborted or and error was detected because an error callback will also
        # be triggered by Qt.
        reply = request_data.reply
        if reply is not None:
            reply_error = reply.error()  # error() must only be called once
            if reply_error != QNetworkReply.NetworkError.NoError:
                if reply_error == QNetworkReply.NetworkError.OperationCanceledError:
                    Logger.log("d", "%s was aborted, do nothing", request_data)

                # stop processing for any kind of error
                return

        # No error? Internet is reachable
        self._setInternetReachable(True)

        if self._enable_request_benchmarking:
            time_spent = None  # type: Optional[float]
            if request_data.start_time is not None:
                time_spent = time.time() - request_data.start_time
            Logger.log("d", "Request [%s] finished, took %s seconds, pending for %s seconds",
                       request_data, time_spent, request_data.pending_time)

        with self._request_lock:
            # Safeguard: make sure that we have the reply in the currently in-progress requests set.
            if request_data not in self._requests_in_progress:
                # This can happen if a request has been aborted. The finished() signal will still be triggered at the
                # end. In this case, do nothing with this request.
                Logger.log("e", "%s not found in the in-progress set", request_data)
            else:
                # Disconnect callback signals
                if reply is not None:
                    # Even after the request was successfully finished, an error may still be emitted if
                    # the network connection is lost seconds later. Bug in Qt? Fixes CURA-7349
                    reply.errorOccurred.disconnect()

                    if request_data.download_progress_callback is not None:
                        reply.downloadProgress.disconnect(request_data.onDownloadProgressCallback)
                    if request_data.upload_progress_callback is not None:
                        reply.uploadProgress.disconnect(request_data.onUploadProgressCallback)

                request_data.setDone()
                self._requests_in_progress.remove(request_data)

        # Schedule the callback if there is one
        if request_data.callback is not None:
            self.callLater(0, request_data.callback, reply)

        # Continue to process the next request
        self._processNextRequestsInQueue()

    def _setInternetReachable(self, reachable: bool):
        if reachable != self._is_internet_reachable:
            self._is_internet_reachable = reachable
            self.internetReachableChanged.emit(reachable)