File: large_file_upload_state.py

package info (click to toggle)
python-b2sdk 2.8.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,020 kB
  • sloc: python: 30,902; sh: 13; makefile: 8
file content (73 lines) | stat: -rw-r--r-- 2,114 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
######################################################################
#
# File: b2sdk/_internal/transfer/outbound/large_file_upload_state.py
#
# Copyright 2020 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import threading


class LargeFileUploadState:
    """
    Shared progress and error state for the upload of one large file.

    The tasks uploading the individual parts report into a single instance
    of this class: byte counts are summed and the running total is forwarded
    to a ProgressListener, and a failure can be recorded as an error message
    that is queried later.

    This class is THREAD SAFE: every method does its work while holding
    ``self.lock``.
    """

    def __init__(self, file_progress_listener):
        """
        :param b2sdk.v2.AbstractProgressListener file_progress_listener: listener that receives the aggregated byte count. Use :py:class:`b2sdk.v2.DoNothingProgressListener` to disable.
        """
        self.file_progress_listener = file_progress_listener
        # Running total of bytes reported through update_part_bytes().
        self.bytes_completed = 0
        # None until set_error() stores a message.
        self.error_message = None
        # Starts empty; none of the methods of this class read or write it.
        self.part_number_to_part_state = {}
        # Reentrant on purpose: get_error_message() calls has_error() while
        # it is already holding the lock.
        self.lock = threading.RLock()

    def set_error(self, message):
        """
        Record an error message, replacing any message stored earlier.

        :param str message: an error message
        """
        with self.lock:
            self.error_message = message

    def has_error(self):
        """
        Tell whether an error message has been recorded.

        :rtype: bool
        """
        with self.lock:
            failed = self.error_message is not None
        return failed

    def get_error_message(self):
        """
        Fetch the recorded error message.

        Only valid once an error has been set; this is checked with an
        ``assert``.

        :return: an error message
        :rtype: str
        """
        with self.lock:
            assert self.has_error()
            message = self.error_message
        return message

    def update_part_bytes(self, bytes_delta):
        """
        Add to the byte total and report the new total to the listener.

        The listener is called while the lock is still held.

        :param int bytes_delta: number of bytes to add to the progress
        """
        with self.lock:
            new_total = self.bytes_completed + bytes_delta
            self.bytes_completed = new_total
            self.file_progress_listener.bytes_completed(new_total)