File: monitoring.py

package info (click to toggle)
python-b2sdk 2.8.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,020 kB
  • sloc: python: 30,902; sh: 13; makefile: 8
file content (231 lines) | stat: -rw-r--r-- 8,508 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
######################################################################
#
# File: b2sdk/_internal/replication/monitoring.py
#
# Copyright 2022 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import sys
import threading
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from queue import Queue
from typing import ClassVar, Iterator

from ..api import B2Api
from ..bucket import Bucket
from ..encryption.setting import EncryptionMode
from ..file_lock import NO_RETENTION_FILE_SETTING, LegalHold
from ..scan.folder import B2Folder
from ..scan.path import B2Path
from ..scan.policies import DEFAULT_SCAN_MANAGER, ScanPoliciesManager
from ..scan.report import ProgressReport
from ..scan.scan import (
    AbstractScanReport,
    AbstractScanResult,
    CountAndSampleScanReport,
    zip_folders,
)
from .setting import ReplicationRule
from .types import ReplicationStatus


@dataclass(frozen=True)
class ReplicationScanResult(AbstractScanResult):
    """
    Replication-relevant attributes of a source file, a destination file, or both.

    Every attribute defaults to ``None``, meaning "not determined": source
    attributes are only filled when a source file is given, the destination
    attribute only when a destination file is given, and the comparison
    attributes only when both are given.

    Please be aware that only latest file versions are inspected, so any
    previous file versions are not represented in these results.
    """

    # attributes describing the source file
    source_replication_status: ReplicationStatus | None = None
    source_has_hide_marker: bool | None = None
    source_encryption_mode: EncryptionMode | None = None
    source_has_large_metadata: bool | None = None
    source_has_file_retention: bool | None = None
    source_has_legal_hold: bool | None = None

    # attributes describing the destination file
    destination_replication_status: ReplicationStatus | None = None

    # attributes comparing the source file with the destination file
    metadata_differs: bool | None = None
    hash_differs: bool | None = None

    LARGE_METADATA_SIZE: ClassVar[int] = 2048

    @classmethod
    def from_files(
        cls, source_file: B2Path | None = None, destination_file: B2Path | None = None
    ) -> ReplicationScanResult:
        """
        Build a result from an optional source path and an optional destination path.

        :param source_file: path in the source bucket, or ``None`` if absent
        :param destination_file: path in the destination bucket, or ``None`` if absent
        :return: a new result; attributes that cannot be derived from the given
            paths are left as ``None``
        """
        kwargs = {}
        source_version = None
        destination_version = None

        if source_file:
            source_version = source_file.selected_version
            kwargs['source_replication_status'] = source_version.replication_status
            # a path that is not visible has a hide marker as its latest version
            kwargs['source_has_hide_marker'] = not source_file.is_visible()
            kwargs['source_encryption_mode'] = source_version.server_side_encryption.mode
            kwargs['source_has_large_metadata'] = source_version.has_large_header
            # identity check against the shared "no retention" setting object
            kwargs['source_has_file_retention'] = (
                source_version.file_retention is not NO_RETENTION_FILE_SETTING
            )
            kwargs['source_has_legal_hold'] = source_version.legal_hold is LegalHold.ON

        if destination_file:
            destination_version = destination_file.selected_version
            kwargs['destination_replication_status'] = destination_version.replication_status

        if source_file and destination_file:
            kwargs['metadata_differs'] = source_version.file_info != destination_version.file_info
            # either checksum disagreeing counts as different content
            kwargs['hash_differs'] = (
                source_version.content_md5 != destination_version.content_md5
                or source_version.content_sha1 != destination_version.content_sha1
            )

        return cls(**kwargs)


@dataclass
class ReplicationReport(CountAndSampleScanReport):
    """
    Scan report for replication monitoring.

    Reuses the aggregation of ``CountAndSampleScanReport`` and only selects
    ``ReplicationScanResult`` as the per-file result class.
    """

    # NOTE(review): presumably the inherited ``add`` builds results via
    # ``SCAN_RESULT_CLASS.from_files(...)`` -- verify in CountAndSampleScanReport
    SCAN_RESULT_CLASS = ReplicationScanResult


@dataclass
class ReplicationMonitor:
    """
    Calculates source and (optionally) destination replication statistics.

    :param b2sdk.v2.Bucket bucket: replication source bucket
    :param b2sdk.v2.ReplicationRule rule: replication rule to be monitored;
    should belong to `bucket`'s replication configuration
    :param b2sdk.v2.B2Api destination_api: B2Api instance for destination
    bucket; if destination bucket is on the same account as source bucket,
    omit this parameter and then source bucket's B2Api will be used
    :param b2sdk.v2.ProgressReport report: instance of ProgressReport which will report
    scanning progress, by default to stdout
    :param b2sdk.v2.ScanPoliciesManager scan_policies_manager: a strategy to scan
    files, so that several files that match some criteria may be omitted
    :raises ValueError: if `bucket` has no replication configuration, or `rule`
    is not one of its replication rules
    :rtype: b2sdk.v2.ReplicationMonitor
    """

    bucket: Bucket
    rule: ReplicationRule
    destination_api: B2Api | None = None  # if None -> will use `api` of source (bucket)
    report: ProgressReport = field(default_factory=lambda: ProgressReport(sys.stdout, False))
    scan_policies_manager: ScanPoliciesManager = DEFAULT_SCAN_MANAGER

    # class-level configuration (ClassVar keeps these out of the dataclass fields)
    REPORT_CLASS: ClassVar[type[AbstractScanReport]] = ReplicationReport
    B2_FOLDER_CLASS: ClassVar[type[B2Folder]] = B2Folder
    QUEUE_SIZE: ClassVar[int] = 20_000  # bound on items buffered between scan threads

    def __post_init__(self):
        if not self.bucket.replication:
            raise ValueError(f'Bucket {self.bucket} has no replication configuration')

        if self.rule not in self.bucket.replication.rules:
            raise ValueError(f'Rule {self.rule} is not a rule from {self.bucket}')

    @property
    def source_api(self) -> B2Api:
        """API object of the source bucket."""
        return self.bucket.api

    @property
    def source_folder(self) -> B2Folder:
        """Folder over the rule's file name prefix in the source bucket."""
        return self.B2_FOLDER_CLASS(
            bucket_name=self.bucket.name,
            folder_name=self.rule.file_name_prefix,
            api=self.source_api,
        )

    @property
    def destination_bucket(self) -> Bucket:
        """
        Destination bucket of the rule, looked up by id on every access.

        Uses `destination_api` when given, otherwise the source bucket's API.
        """
        destination_api = self.destination_api or self.source_api
        bucket_id = self.rule.destination_bucket_id
        return destination_api.get_bucket_by_id(bucket_id)

    @property
    def destination_folder(self) -> B2Folder:
        """Folder over the rule's file name prefix in the destination bucket."""
        destination_bucket = self.destination_bucket
        return self.B2_FOLDER_CLASS(
            bucket_name=destination_bucket.name,
            folder_name=self.rule.file_name_prefix,
            api=destination_bucket.api,
        )

    def iter_pairs(self) -> Iterator[tuple[B2Path | None, B2Path | None]]:
        """
        Iterate over files in source and destination and yield pairs.
        Required for replication inspection in-depth.

        Return pair of (source B2Path, destination B2Path). Source or destination
        path may be missing if there's no corresponding destination/source file.
        """
        yield from zip_folders(
            self.source_folder,
            self.destination_folder,
            reporter=self.report,
            policies_manager=self.scan_policies_manager,
        )

    def scan(self, scan_destination: bool = True) -> AbstractScanReport:
        """
        Scan source bucket (only, or with destination) and return replication report.

        No destination scan may give limited replication information, since it only
        checks files on the source bucket without checking whether
        they were really replicated to destination. It may be handy though
        if there is no access to replication destination.

        Listing (producer) and report aggregation (consumer) run in two worker
        threads connected by a bounded queue. If either side raises, the other
        side is unblocked and the exception is re-raised from this method,
        instead of the scan hanging forever.
        """
        report = self.REPORT_CLASS()
        queue = Queue(maxsize=self.QUEUE_SIZE)
        # set by the consumer when it fails, so the producer stops listing early
        consumer_failed = threading.Event()

        def iter_items():
            # Each item is a tuple of positional arguments for ``report.add``.
            # A generator function, so all listing work happens in the producer thread.
            if scan_destination:
                yield from self.iter_pairs()
            else:
                for path in self.source_folder.all_files(
                    policies_manager=self.scan_policies_manager,
                    reporter=self.report,
                ):
                    yield (path,)

        def fill_queue():
            try:
                for items in iter_items():
                    if consumer_failed.is_set():
                        break
                    queue.put(items, block=True)
            finally:
                # Always emit the "end of queue" marker, even if listing raised;
                # otherwise the consumer would wait on ``queue.get`` forever.
                queue.put(None, block=True)

        def consume_queue():
            try:
                while True:
                    items = queue.get(block=True)
                    if items is None:  # using None as "end of queue" marker
                        return
                    report.add(*items)
            except BaseException:
                consumer_failed.set()
                # Keep draining until the end marker, so a producer blocked on a
                # full queue can make progress and finish.
                while queue.get(block=True) is not None:
                    pass
                raise

        with ThreadPoolExecutor(max_workers=2) as thread_pool:
            futures = [
                thread_pool.submit(fill_queue),
                thread_pool.submit(consume_queue),
            ]

            for future in futures:
                future.result()

        return report