File: sync.py

package info (click to toggle)
python-b2sdk 2.8.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,020 kB
  • sloc: python: 30,902; sh: 13; makefile: 8
file content (346 lines) | stat: -rw-r--r-- 14,597 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
######################################################################
#
# File: b2sdk/_internal/sync/sync.py
#
# Copyright 2019 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import concurrent.futures as futures
import logging
from enum import Enum, unique
from typing import cast

from ..bounded_queue_executor import BoundedQueueExecutor
from ..scan.exception import InvalidArgument
from ..scan.folder import AbstractFolder, B2Folder, LocalFolder
from ..scan.path import AbstractPath
from ..scan.policies import DEFAULT_SCAN_MANAGER, ScanPoliciesManager
from ..scan.scan import zip_folders
from ..transfer.outbound.upload_source import UploadMode
from .encryption_provider import (
    SERVER_DEFAULT_SYNC_ENCRYPTION_SETTINGS_PROVIDER,
    AbstractSyncEncryptionSettingsProvider,
)
from .exception import IncompleteSync
from .policy import CompareVersionMode, NewerFileSyncMode
from .policy_manager import POLICY_MANAGER, SyncPolicyManager
from .report import SyncReport

logger = logging.getLogger(__name__)


def count_files(local_folder, reporter, policies_manager):
    """
    Tally the files of a local folder on the reporter, one file at a time.

    :param b2sdk._internal.scan.folder.AbstractFolder local_folder: folder whose files are counted
    :param reporter: object that receives ``update_total(1)`` for every file
                     and ``end_total()`` once the scan is over
    :param policies_manager: forwarded to ``local_folder.all_files`` as ``policies_manager``
    """
    # The scan itself deliberately gets no reporter (``None``): broken symlinks
    # will be reported during the next pass, when the source and destination
    # files are compared.
    scanned_files = local_folder.all_files(None, policies_manager=policies_manager)
    for _scanned in scanned_files:
        reporter.update_total(1)
    reporter.end_total()


@unique
class KeepOrDeleteMode(Enum):
    """How old versions of files on the destination are dealt with."""

    #: delete the old version as soon as the new one has been uploaded
    DELETE = 301
    #: keep the old versions of the file for a configurable number of days
    #: before deleting them, always keeping the newest version
    KEEP_BEFORE_DELETE = 302
    #: keep old versions of the file, do not delete anything
    NO_DELETE = 303


class Synchronizer:
    """
    Copies multiple "files" from source to destination.  Optionally
    deletes or hides destination files that the source does not have.

    The synchronizer can copy files:

    - From a B2 bucket to a local destination.
    - From a local source to a B2 bucket.
    - From one B2 bucket to another.
    - Between different folders in the same B2 bucket.
      It will sync only the latest versions of files.

    Syncing between two local folders is not supported.

    By default, the synchronizer:

    - Fails when the specified source directory doesn't exist or is empty.
      (see ``allow_empty_source`` argument)
    - Uses ``NewerFileSyncMode.RAISE_ERROR`` for destination files that are
      newer than on the source.
      (see ``newer_file_mode`` argument)
    - Doesn't delete a file if it's present on the destination but not on the source.
      (see ``keep_days_or_delete`` and ``keep_days`` arguments)
    - Compares files based on modification time.
      (see ``compare_version_mode`` and ``compare_threshold`` arguments)
    """

    def __init__(
        self,
        max_workers,
        policies_manager=DEFAULT_SCAN_MANAGER,
        dry_run=False,
        allow_empty_source=False,
        newer_file_mode=NewerFileSyncMode.RAISE_ERROR,
        keep_days_or_delete=KeepOrDeleteMode.NO_DELETE,
        compare_version_mode=CompareVersionMode.MODTIME,
        compare_threshold=None,
        keep_days=None,
        sync_policy_manager: SyncPolicyManager = POLICY_MANAGER,
        upload_mode: UploadMode = UploadMode.FULL,
        absolute_minimum_part_size: int | None = None,
    ):
        """
        Initialize synchronizer class and validate arguments

        :param int max_workers: max number of workers
        :param policies_manager: object which decides which files to process
        :param bool dry_run: test mode, does not actually transfer/delete when enabled
        :param bool allow_empty_source: if True, do not check whether source folder is empty
        :param b2sdk.v2.NewerFileSyncMode newer_file_mode: setting which determines handling for destination files newer than on the source
        :param b2sdk.v2.KeepOrDeleteMode keep_days_or_delete: setting which determines if we should delete or not delete or keep for `keep_days`
        :param b2sdk.v2.CompareVersionMode compare_version_mode: how to compare the source and destination files to find new ones
        :param int compare_threshold: must not be negative; ``None`` (the default) is treated as 0
        :param int keep_days: must be given (not ``None``) if keep_days_or_delete is `b2sdk.v2.KeepOrDeleteMode.KEEP_BEFORE_DELETE`
        :param SyncPolicyManager sync_policy_manager: object which decides what to do with each file (upload, download, delete, copy, hide etc)
        :param b2sdk.v2.UploadMode upload_mode: determines how file uploads are handled
        :param int absolute_minimum_part_size: minimum file part size for large files
        :raises InvalidArgument: if any of the arguments fails validation
        """
        self.newer_file_mode = newer_file_mode
        self.keep_days_or_delete = keep_days_or_delete
        self.keep_days = keep_days
        self.compare_version_mode = compare_version_mode
        # ``None`` (and any other falsy value) is normalized to 0.
        self.compare_threshold = compare_threshold or 0
        self.dry_run = dry_run
        self.allow_empty_source = allow_empty_source
        self.policies_manager = (
            policies_manager  # actually it should be called scan_policies_manager
        )
        self.sync_policy_manager = sync_policy_manager
        self.max_workers = max_workers
        self.upload_mode = upload_mode
        self.absolute_minimum_part_size = absolute_minimum_part_size
        self._validate()

    def _validate(self):
        """
        Check the settings stored by ``__init__``.

        :raises InvalidArgument: if ``compare_threshold`` is negative, if one of
            the mode settings is not a member of its enum, or if ``keep_days``
            is ``None`` while ``keep_days_or_delete`` is ``KEEP_BEFORE_DELETE``
        """
        # Zero is accepted here; only negative thresholds are rejected.
        if self.compare_threshold < 0:
            raise InvalidArgument('compare_threshold', 'must be a positive integer')

        if self.newer_file_mode not in tuple(NewerFileSyncMode):
            raise InvalidArgument(
                'newer_file_mode',
                'must be one of :%s' % NewerFileSyncMode.__members__,
            )

        if self.keep_days_or_delete not in tuple(KeepOrDeleteMode):
            raise InvalidArgument(
                'keep_days_or_delete',
                'must be one of :%s' % KeepOrDeleteMode.__members__,
            )

        if (
            self.keep_days_or_delete == KeepOrDeleteMode.KEEP_BEFORE_DELETE
            and self.keep_days is None
        ):
            raise InvalidArgument(
                'keep_days',
                'is required when keep_days_or_delete is %s' % KeepOrDeleteMode.KEEP_BEFORE_DELETE,
            )

        if self.compare_version_mode not in tuple(CompareVersionMode):
            raise InvalidArgument(
                'compare_version_mode',
                'must be one of :%s' % CompareVersionMode.__members__,
            )

    def sync_folders(
        self,
        source_folder: AbstractFolder,
        dest_folder: AbstractFolder,
        now_millis: int,
        reporter: SyncReport | None,
        encryption_settings_provider: AbstractSyncEncryptionSettingsProvider = SERVER_DEFAULT_SYNC_ENCRYPTION_SETTINGS_PROVIDER,
    ):
        """
        Syncs two folders.  Schedules every action produced by comparing the
        source with the destination on a dedicated executor and waits for all
        of them to finish.  What happens to old destination file versions is
        governed by the ``keep_days_or_delete`` and ``keep_days`` settings.

        :param source_folder: source folder object
        :param dest_folder: destination folder object
        :param now_millis: current time in milliseconds
        :param reporter: progress reporter, or ``None`` to sync without progress reporting
        :param encryption_settings_provider: encryption setting provider
        :raises ValueError: if neither the source nor the destination is a B2 folder
        :raises IncompleteSync: if the executor reports any exception after shutdown
        """
        source_type = source_folder.folder_type()
        dest_type = dest_folder.folder_type()

        if source_type != 'b2' and dest_type != 'b2':
            raise ValueError('Sync between two local folders is not supported!')

        # For downloads, make sure that the target directory is there.
        if dest_type == 'local' and not self.dry_run:
            cast(LocalFolder, dest_folder).ensure_present()

        if source_type == 'local' and not self.allow_empty_source:
            cast(LocalFolder, source_folder).ensure_non_empty()

        # Make an executor to count files and run all of the actions. This is
        # not the same as the executor in the API object which is used for
        # uploads. The tasks in this executor wait for uploads. Putting them
        # in the same thread pool could lead to deadlock.
        #
        # We use an executor with a bounded queue to avoid using up lots of memory
        # when syncing lots of files.
        unbounded_executor = futures.ThreadPoolExecutor(max_workers=self.max_workers)
        queue_limit = self.max_workers + 1000
        sync_executor = BoundedQueueExecutor(unbounded_executor, queue_limit=queue_limit)

        if source_type == 'local' and reporter is not None:
            # Start the thread that counts the local files. That's the operation
            # that should be fastest, and it provides scale for the progress reporting.
            sync_executor.submit(count_files, source_folder, reporter, self.policies_manager)

        # Bucket for scheduling actions.
        # For bucket-to-bucket sync, the bucket for the API calls should be the destination.
        action_bucket = None
        if dest_type == 'b2':
            action_bucket = cast(B2Folder, dest_folder).bucket
        elif source_type == 'b2':
            action_bucket = cast(B2Folder, source_folder).bucket

        # Schedule each of the actions.
        for action in self._make_folder_sync_actions(
            source_folder,
            dest_folder,
            now_millis,
            reporter,
            self.policies_manager,
            encryption_settings_provider,
        ):
            # Log through the module logger (not the root logger) so that
            # per-module logging configuration applies to this message too.
            logger.debug('scheduling action %s on bucket %s', action, action_bucket)
            sync_executor.submit(action.run, action_bucket, reporter, self.dry_run)

        # Wait for everything to finish
        sync_executor.shutdown()
        if sync_executor.get_num_exceptions() != 0:
            raise IncompleteSync('sync is incomplete')

    def _make_folder_sync_actions(
        self,
        source_folder: AbstractFolder,
        dest_folder: AbstractFolder,
        now_millis: int,
        reporter: SyncReport | None,
        policies_manager: ScanPoliciesManager = DEFAULT_SCAN_MANAGER,
        encryption_settings_provider: AbstractSyncEncryptionSettingsProvider = SERVER_DEFAULT_SYNC_ENCRYPTION_SETTINGS_PROVIDER,
    ):
        """
        Yield a sequence of actions that will sync the destination
        folder to the source folder.

        This is a generator: the argument checks below run, and their
        exceptions are raised, only once iteration starts.

        :param source_folder: source folder object
        :param dest_folder: destination folder object
        :param now_millis: current time in milliseconds
        :param reporter: reporter object, or ``None`` to skip progress reporting
        :param policies_manager: object which decides which files to process
        :param encryption_settings_provider: encryption setting provider
        :raises InvalidArgument: if ``keep_days_or_delete`` is ``KEEP_BEFORE_DELETE``
            and the destination is a local folder
        :raises ValueError: if neither the source nor the destination is a B2 folder
        """
        source_type = source_folder.folder_type()
        dest_type = dest_folder.folder_type()

        if self.keep_days_or_delete == KeepOrDeleteMode.KEEP_BEFORE_DELETE and dest_type == 'local':
            raise InvalidArgument('keep_days_or_delete', 'cannot be used for local files')

        sync_type = f'{source_type}-to-{dest_type}'
        if source_type != 'b2' and dest_type != 'b2':
            raise ValueError('Sync between two local folders is not supported!')

        total_files = 0
        total_bytes = 0
        for source_path, dest_path in zip_folders(
            source_folder,
            dest_folder,
            reporter,
            policies_manager,
        ):
            if source_path is None:
                logger.debug('determined that %s is not present on source', dest_path)
            elif dest_path is None:
                logger.debug('determined that %s is not present on destination', source_path)

            # The reporter is optional (``sync_folders`` accepts ``None``), so
            # progress is only updated when one was actually supplied.
            if source_path is not None and reporter is not None:
                if source_type == 'b2':
                    # For buckets we don't want to count files separately as it would require
                    # more API calls. Instead, we count them when comparing.
                    reporter.update_total(1)
                reporter.update_compare(1)

            for action in self._make_file_sync_actions(
                sync_type,
                source_path,
                dest_path,
                source_folder,
                dest_folder,
                now_millis,
                encryption_settings_provider,
            ):
                total_files += 1
                total_bytes += action.get_bytes()
                yield action

        if reporter is not None:
            if source_type == 'b2':
                # For buckets we don't want to count files separately as it would require
                # more API calls. Instead, we count them when comparing.
                reporter.end_total()
            reporter.end_compare(total_files, total_bytes)

    def _make_file_sync_actions(
        self,
        sync_type: str,
        source_path: AbstractPath | None,
        dest_path: AbstractPath | None,
        source_folder: AbstractFolder,
        dest_folder: AbstractFolder,
        now_millis: int,
        encryption_settings_provider: AbstractSyncEncryptionSettingsProvider = SERVER_DEFAULT_SYNC_ENCRYPTION_SETTINGS_PROVIDER,
    ):
        """
        Returns the actions needed to sync the two files, as produced by the
        policy that ``sync_policy_manager`` selects for this pair.

        :param sync_type: synchronization type, in the form ``<source type>-to-<destination type>``
        :param source_path: source file object, ``None`` if absent on the source
        :param dest_path: destination file object, ``None`` if absent on the destination
        :param source_folder: a source folder object
        :param dest_folder: a destination folder object
        :param now_millis: current time in milliseconds
        :param encryption_settings_provider: encryption setting provider
        """
        # The policy only needs to know whether immediate deletion was requested.
        delete = self.keep_days_or_delete == KeepOrDeleteMode.DELETE

        policy = self.sync_policy_manager.get_policy(
            sync_type,
            source_path,
            source_folder,
            dest_path,
            dest_folder,
            now_millis,
            delete,
            self.keep_days,
            self.newer_file_mode,
            self.compare_threshold,
            self.compare_version_mode,
            encryption_settings_provider=encryption_settings_provider,
            upload_mode=self.upload_mode,
            absolute_minimum_part_size=self.absolute_minimum_part_size,
        )
        return policy.get_all_actions()