1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
|
######################################################################
#
# File: b2sdk/_internal/scan/report.py
#
# Copyright 2022 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations
import logging
import re
import threading
import time
from dataclasses import dataclass
from io import TextIOWrapper
from ..utils import format_and_scale_number
from ..utils.escape import escape_control_chars
logger = logging.getLogger(__name__)
_REMOVE_EXTENDED_PATH_PREFIX = re.compile(r'\\\\\?\\')
def _safe_path_print(path: str) -> str:
"""
Print a path, escaping control characters if necessary.
Windows extended path prefix is removed from the path before printing for better readability.
Since Windows 10 the prefix is not needed.
:param path: a path to print
:return: a path that can be printed
"""
return escape_control_chars(_REMOVE_EXTENDED_PATH_PREFIX.sub('', path))
@dataclass
class ProgressReport:
"""
Handle reporting progress.
This class is THREAD SAFE, so it can be used from parallel scan threads.
"""
# Minimum time between displayed updates
UPDATE_INTERVAL = 0.1
stdout: TextIOWrapper # standard output file object
no_progress: bool # if True, do not show progress
def __post_init__(self):
self.start_time = time.time()
self.count = 0
self.total_done = False
self.total_count = 0
self.closed = False
self.lock = threading.Lock()
self.current_line = ''
self.encoding_warning_was_already_printed = False
self._last_update_time = 0
self._update_progress()
self.warnings = []
self.errors_encountered = False
def close(self):
"""
Perform a clean-up.
"""
with self.lock:
if not self.no_progress:
self._print_line('', False)
self.closed = True
for warning in self.warnings:
self._print_line(warning, True)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def has_errors_or_warnings(self) -> bool:
"""
Check if there are any errors or warnings.
:return: True if there are any errors or warnings
"""
return self.errors_encountered or bool(self.warnings)
def error(self, message: str) -> None:
"""
Print an error, gracefully interleaving it with a progress bar.
:param message: an error message
"""
self.print_completion(message)
self.errors_encountered = True
def print_completion(self, message: str) -> None:
"""
Remove the progress bar, prints a message, and puts the progress
bar back.
:param message: an error message
"""
with self.lock:
self._print_line(message, True)
self._last_update_time = 0
self._update_progress()
def update_count(self, delta: int) -> None:
"""
Report that items have been processed.
"""
with self.lock:
self.count += delta
self._update_progress()
def _update_progress(self):
if self.closed or self.no_progress:
return
now = time.time()
interval = now - self._last_update_time
if interval < self.UPDATE_INTERVAL:
return
self._last_update_time = now
time_delta = time.time() - self.start_time
rate = 0 if time_delta == 0 else int(self.count / time_delta)
message = ' count: %d/%d %s' % (
self.count,
self.total_count,
format_and_scale_number(rate, '/s'),
)
self._print_line(message, False)
def _print_line(self, line: str, newline: bool) -> None:
"""
Print a line to stdout.
:param line: a string without a \r or \n in it.
:param newline: True if the output should move to a new line after this one.
"""
if len(line) < len(self.current_line):
line += ' ' * (len(self.current_line) - len(line))
try:
self.stdout.write(line)
except UnicodeEncodeError as encode_error:
if not self.encoding_warning_was_already_printed:
self.encoding_warning_was_already_printed = True
self.stdout.write(
f'!WARNING! this terminal cannot properly handle progress reporting. encoding is {self.stdout.encoding}.\n'
)
self.stdout.write(line.encode('ascii', 'backslashreplace').decode())
logger.warning(
f'could not output the following line with encoding {self.stdout.encoding} on stdout due to {encode_error}: {line}'
)
if newline:
self.stdout.write('\n')
self.current_line = ''
else:
self.stdout.write('\r')
self.current_line = line
self.stdout.flush()
def update_total(self, delta: int) -> None:
"""
Report that more files have been found for comparison.
:param delta: number of files found since the last check
"""
with self.lock:
self.total_count += delta
self._update_progress()
def end_total(self) -> None:
"""
Total files count is done. Can proceed to step 2.
"""
with self.lock:
self.total_done = True
self._update_progress()
def local_access_error(self, path: str) -> None:
"""
Add a file access error message to the list of warnings.
:param path: file path
"""
self.warnings.append(
f'WARNING: {_safe_path_print(path)} could not be accessed (broken symlink?)'
)
def local_permission_error(self, path: str) -> None:
"""
Add a permission error message to the list of warnings.
:param path: file path
"""
self.warnings.append(
f'WARNING: {_safe_path_print(path)} could not be accessed (no permissions to read?)'
)
def symlink_skipped(self, path: str) -> None:
pass
def circular_symlink_skipped(self, path: str) -> None:
"""
Add a circular symlink error message to the list of warnings.
:param path: file path
"""
self.warnings.append(
f'WARNING: {_safe_path_print(path)} is a circular symlink, which was already visited. Skipping.'
)
def invalid_name(self, path: str, error: str) -> None:
"""
Add an invalid filename error message to the list of warnings.
:param path: file path
"""
self.warnings.append(
f'WARNING: {_safe_path_print(path)} path contains invalid name ({error}). Skipping.'
)
def sample_report_run():
"""
Generate a sample report.
"""
import sys
report = ProgressReport(sys.stdout, False)
for i in range(20):
report.update_total(1)
time.sleep(0.2)
if i % 2 == 0:
report.update_count(1)
report.end_total()
report.close()
|