File: scan.py

package info (click to toggle)
python-b2sdk 2.8.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,020 kB
  • sloc: python: 30,902; sh: 13; makefile: 8
file content (118 lines) | stat: -rw-r--r-- 3,937 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
######################################################################
#
# File: b2sdk/_internal/scan/scan.py
#
# Copyright 2022 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

from abc import ABCMeta, abstractclassmethod, abstractmethod
from collections import Counter
from dataclasses import dataclass, field
from typing import ClassVar

from ..file_version import FileVersion
from .folder import AbstractFolder
from .path import AbstractPath
from .policies import DEFAULT_SCAN_MANAGER, ScanPoliciesManager
from .report import ProgressReport


def zip_folders(
    folder_a: AbstractFolder,
    folder_b: AbstractFolder,
    reporter: ProgressReport,
    policies_manager: ScanPoliciesManager = DEFAULT_SCAN_MANAGER,
) -> tuple[AbstractPath | None, AbstractPath | None]:
    """
    Iterate over all of the files in the union of two folders,
    matching file names.

    Each item is a pair (file_a, file_b) with the corresponding file
    in both folders.  Either file (but not both) will be None if the
    file is in only one folder.

    :param b2sdk._internal.scan.folder.AbstractFolder folder_a: first folder object.
    :param b2sdk._internal.scan.folder.AbstractFolder folder_b: second folder object.
    :param reporter: reporter object
    :param policies_manager: policies manager object
    :return: yields two element tuples
    """

    iter_a = folder_a.all_files(reporter, policies_manager)
    iter_b = folder_b.all_files(reporter)

    current_a = next(iter_a, None)
    current_b = next(iter_b, None)

    while current_a is not None or current_b is not None:
        if current_a is None:
            yield (None, current_b)
            current_b = next(iter_b, None)
        elif current_b is None:
            yield (current_a, None)
            current_a = next(iter_a, None)
        elif current_a.relative_path < current_b.relative_path:
            yield (current_a, None)
            current_a = next(iter_a, None)
        elif current_b.relative_path < current_a.relative_path:
            yield (None, current_b)
            current_b = next(iter_b, None)
        else:
            assert current_a.relative_path == current_b.relative_path
            yield (current_a, current_b)
            current_a = next(iter_a, None)
            current_b = next(iter_b, None)

    reporter.close()


@dataclass(frozen=True)
class AbstractScanResult(metaclass=ABCMeta):
    """
    Some attributes of files which are meaningful for monitoring and troubleshooting.
    """

    @abstractclassmethod
    def from_files(cls, *files: AbstractPath | None) -> AbstractScanResult:
        pass


@dataclass
class AbstractScanReport(metaclass=ABCMeta):
    """
    Aggregation of valuable information about files after scanning.
    """

    SCAN_RESULT_CLASS: ClassVar[type] = AbstractScanResult

    @abstractmethod
    def add(self, *files: AbstractPath | None) -> None:
        pass


@dataclass
class CountAndSampleScanReport(AbstractScanReport):
    """
    Scan report which groups and counts files by their `AbstractScanResult` and
    also stores first and last seen examples of such files.
    """

    counter_by_status: Counter = field(default_factory=Counter)
    samples_by_status_first: dict[AbstractScanResult, tuple[FileVersion, ...]] = field(
        default_factory=dict
    )
    samples_by_status_last: dict[AbstractScanResult, tuple[FileVersion, ...]] = field(
        default_factory=dict
    )

    def add(self, *files: AbstractPath | None) -> None:
        status = self.SCAN_RESULT_CLASS.from_files(*files)
        self.counter_by_status[status] += 1

        sample = tuple(file and file.selected_version for file in files)
        self.samples_by_status_first.setdefault(status, sample)
        self.samples_by_status_last[status] = sample