File: report.py

package info (click to toggle)
python-b2sdk 2.8.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,020 kB
  • sloc: python: 30,902; sh: 13; makefile: 8
file content (250 lines) | stat: -rw-r--r-- 7,328 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
######################################################################
#
# File: b2sdk/_internal/scan/report.py
#
# Copyright 2022 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import logging
import re
import threading
import time
from dataclasses import dataclass
from io import TextIOWrapper

from ..utils import format_and_scale_number
from ..utils.escape import escape_control_chars

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Matches the literal four-character Windows extended path prefix (two backslashes,
# a question mark, one backslash).  The pattern is not anchored, so it matches that
# sequence wherever it occurs in a string, not only at the start.
_REMOVE_EXTENDED_PATH_PREFIX = re.compile(r'\\\\\?\\')


def _safe_path_print(path: str) -> str:
    """
    Build a printable representation of a path.

    The Windows extended path prefix is stripped first, purely for readability
    (since Windows 10 the prefix is not needed), and the remaining text is then
    passed through ``escape_control_chars``.

    :param path: a path to print
    :return: a path that can be printed
    """
    without_prefix = _REMOVE_EXTENDED_PATH_PREFIX.sub('', path)
    return escape_control_chars(without_prefix)


@dataclass
class ProgressReport:
    """
    Handle reporting progress.

    A single status line is written to ``stdout`` and rewritten in place, while
    messages and collected warnings are printed on lines of their own.

    This class is THREAD SAFE, so it can be used from parallel scan threads.
    """

    # Minimum time (in seconds) between displayed updates
    UPDATE_INTERVAL = 0.1

    stdout: TextIOWrapper  # standard output file object
    no_progress: bool  # if True, do not show progress

    def __post_init__(self):
        """
        Initialise counters, flags and the lock, then draw the first progress line.
        """
        self.start_time = time.time()

        self.count = 0
        self.total_done = False
        self.total_count = 0

        self.closed = False
        self.lock = threading.Lock()
        self.current_line = ''
        self.encoding_warning_was_already_printed = False
        self.warnings = []
        self.errors_encountered = False
        self._last_update_time = 0
        # Draw the first progress line last, so that every attribute already exists
        # when _update_progress() (or an override of it) runs.
        self._update_progress()

    def close(self):
        """
        Perform a clean-up.

        Blanks the progress line (unless progress is disabled), marks the report as
        closed and prints every collected warning on its own line.  Calling it again
        is a no-op, so leaving a ``with`` block and then calling ``close()``
        explicitly does not print the warnings twice.
        """
        with self.lock:
            if self.closed:
                return
            if not self.no_progress:
                self._print_line('', False)
            self.closed = True
            for warning in self.warnings:
                self._print_line(warning, True)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def has_errors_or_warnings(self) -> bool:
        """
        Check if there are any errors or warnings.

        :return: True if there are any errors or warnings
        """
        return self.errors_encountered or bool(self.warnings)

    def error(self, message: str) -> None:
        """
        Print an error, gracefully interleaving it with a progress bar.

        Also records that an error was encountered.

        :param message: an error message
        """
        self.print_completion(message)
        self.errors_encountered = True

    def print_completion(self, message: str) -> None:
        """
        Remove the progress bar, print a message, and put the progress
        bar back.

        :param message: the message to print on its own line
        """
        with self.lock:
            self._print_line(message, True)
            # Reset the throttle so the progress line is redrawn immediately.
            self._last_update_time = 0
            self._update_progress()

    def update_count(self, delta: int) -> None:
        """
        Report that items have been processed.

        :param delta: number of items processed since the last call
        """
        with self.lock:
            self.count += delta
            self._update_progress()

    def _update_progress(self):
        """
        Redraw the progress line, at most once per ``UPDATE_INTERVAL``.

        Does nothing when the report is closed or progress display is disabled.
        Callers are expected to hold ``self.lock``.
        """
        if self.closed or self.no_progress:
            return

        now = time.time()
        if now - self._last_update_time < self.UPDATE_INTERVAL:
            return

        self._last_update_time = now
        # Use the same timestamp for throttling and for the rate.
        time_delta = now - self.start_time
        rate = 0 if time_delta == 0 else int(self.count / time_delta)

        message = ' count: %d/%d   %s' % (
            self.count,
            self.total_count,
            format_and_scale_number(rate, '/s'),
        )

        self._print_line(message, False)

    def _print_line(self, line: str, newline: bool) -> None:
        """
        Print a line to stdout.

        The line is padded with spaces so that it fully overwrites whatever was
        left on the current terminal line.  If stdout cannot encode the text, a
        one-time warning is written, the line is written with non-ASCII characters
        backslash-escaped, and the problem is logged.

        :param line: a string without a carriage return or newline character in it.
        :param newline: True if the output should move to a new line after this one.
        """
        padding = len(self.current_line) - len(line)
        if padding > 0:
            line += ' ' * padding
        try:
            self.stdout.write(line)
        except UnicodeEncodeError as encode_error:
            if not self.encoding_warning_was_already_printed:
                self.encoding_warning_was_already_printed = True
                self.stdout.write(
                    f'!WARNING! this terminal cannot properly handle progress reporting.  encoding is {self.stdout.encoding}.\n'
                )
            self.stdout.write(line.encode('ascii', 'backslashreplace').decode())
            logger.warning(
                f'could not output the following line with encoding {self.stdout.encoding} on stdout due to {encode_error}: {line}'
            )
        if newline:
            self.stdout.write('\n')
            self.current_line = ''
        else:
            # Return to the start of the line so the next write overwrites this one.
            self.stdout.write('\r')
            self.current_line = line
        self.stdout.flush()

    def update_total(self, delta: int) -> None:
        """
        Report that more files have been found for comparison.

        :param delta: number of files found since the last check
        """
        with self.lock:
            self.total_count += delta
            self._update_progress()

    def end_total(self) -> None:
        """
        Total files count is done. Can proceed to step 2.
        """
        with self.lock:
            self.total_done = True
            self._update_progress()

    def local_access_error(self, path: str) -> None:
        """
        Add a file access error message to the list of warnings.

        :param path: file path
        """
        self.warnings.append(
            f'WARNING: {_safe_path_print(path)} could not be accessed (broken symlink?)'
        )

    def local_permission_error(self, path: str) -> None:
        """
        Add a permission error message to the list of warnings.

        :param path: file path
        """
        self.warnings.append(
            f'WARNING: {_safe_path_print(path)} could not be accessed (no permissions to read?)'
        )

    def symlink_skipped(self, path: str) -> None:
        """
        Notification that a symlink was skipped.  This implementation does nothing.

        :param path: file path
        """
        pass

    def circular_symlink_skipped(self, path: str) -> None:
        """
        Add a circular symlink error message to the list of warnings.

        :param path: file path
        """
        self.warnings.append(
            f'WARNING: {_safe_path_print(path)} is a circular symlink, which was already visited. Skipping.'
        )

    def invalid_name(self, path: str, error: str) -> None:
        """
        Add an invalid filename error message to the list of warnings.

        :param path: file path
        :param error: description of the problem, included in the warning text
        """
        self.warnings.append(
            f'WARNING: {_safe_path_print(path)} path contains invalid name ({error}). Skipping.'
        )


def sample_report_run():
    """
    Generate a sample report.

    Drives a ``ProgressReport`` attached to ``sys.stdout`` through twenty steps,
    pausing 0.2 seconds in each one: the total grows on every step and the
    processed count grows on every even-numbered step.  The report is then
    finalised and closed.
    """
    import sys

    step_count = 20
    progress = ProgressReport(stdout=sys.stdout, no_progress=False)

    for step in range(step_count):
        progress.update_total(1)
        time.sleep(0.2)
        is_even_step = not step % 2
        if is_even_step:
            progress.update_count(1)

    progress.end_total()
    progress.close()