1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
|
######################################################################
#
# File: b2sdk/_internal/sync/sync.py
#
# Copyright 2019 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations
import concurrent.futures as futures
import logging
from enum import Enum, unique
from typing import cast
from ..bounded_queue_executor import BoundedQueueExecutor
from ..scan.exception import InvalidArgument
from ..scan.folder import AbstractFolder, B2Folder, LocalFolder
from ..scan.path import AbstractPath
from ..scan.policies import DEFAULT_SCAN_MANAGER, ScanPoliciesManager
from ..scan.scan import zip_folders
from ..transfer.outbound.upload_source import UploadMode
from .encryption_provider import (
SERVER_DEFAULT_SYNC_ENCRYPTION_SETTINGS_PROVIDER,
AbstractSyncEncryptionSettingsProvider,
)
from .exception import IncompleteSync
from .policy import CompareVersionMode, NewerFileSyncMode
from .policy_manager import POLICY_MANAGER, SyncPolicyManager
from .report import SyncReport
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
def count_files(local_folder, reporter, policies_manager):
    """
    Walk a local folder and announce every file found to the reporter.

    The reporter receives one ``update_total(1)`` call per file, followed
    by a single ``end_total()`` call once the walk is finished.

    :param b2sdk._internal.scan.folder.AbstractFolder local_folder: a folder object.
    :param reporter: reporter object
    :param policies_manager: forwarded to ``local_folder.all_files`` as ``policies_manager``
    """
    # No reporter is handed to all_files on purpose. Broken symlinks will be
    # reported during the next pass when the source and dest files are compared.
    scanned_files = local_folder.all_files(None, policies_manager=policies_manager)
    for _scanned in scanned_files:
        reporter.update_total(1)
    reporter.end_total()
@unique
class KeepOrDeleteMode(Enum):
    """Mode of dealing with old versions of files on the destination"""

    #: delete the old version as soon as the new one has been uploaded
    DELETE = 301

    #: keep the old versions of the file for a configurable number of days
    #: before deleting them, always keeping the newest version
    KEEP_BEFORE_DELETE = 302

    #: keep old versions of the file, do not delete anything
    NO_DELETE = 303
class Synchronizer:
    """
    Copies multiple "files" from source to destination.  Optionally
    deletes or hides destination files that the source does not have.

    The synchronizer can copy files:

    - From a B2 bucket to a local destination.
    - From a local source to a B2 bucket.
    - From one B2 bucket to another.
    - Between different folders in the same B2 bucket.

    It will sync only the latest versions of files.

    By default, the synchronizer:

    - Fails when the specified source directory doesn't exist or is empty.
      (see ``allow_empty_source`` argument)
    - Fails when a destination file is newer than its source counterpart.
      (see ``newer_file_mode`` argument)
    - Doesn't delete a file if it's present on the destination but not on the source.
      (see ``keep_days_or_delete`` and ``keep_days`` arguments)
    - Compares files based on modification time.
      (see ``compare_version_mode`` and ``compare_threshold`` arguments)
    """

    def __init__(
        self,
        max_workers,
        policies_manager=DEFAULT_SCAN_MANAGER,
        dry_run=False,
        allow_empty_source=False,
        newer_file_mode=NewerFileSyncMode.RAISE_ERROR,
        keep_days_or_delete=KeepOrDeleteMode.NO_DELETE,
        compare_version_mode=CompareVersionMode.MODTIME,
        compare_threshold=None,
        keep_days=None,
        sync_policy_manager: SyncPolicyManager = POLICY_MANAGER,
        upload_mode: UploadMode = UploadMode.FULL,
        absolute_minimum_part_size: int | None = None,
    ):
        """
        Initialize synchronizer class and validate arguments

        :param int max_workers: max number of workers
        :param policies_manager: object which decides which files to process
        :param bool dry_run: test mode, does not actually transfer/delete when enabled
        :param bool allow_empty_source: if True, do not check whether source folder is empty
        :param b2sdk.v2.NewerFileSyncMode newer_file_mode: setting which determines handling for destination files newer than on the source
        :param b2sdk.v2.KeepOrDeleteMode keep_days_or_delete: setting which determines if we should delete or not delete or keep for `keep_days`
        :param b2sdk.v2.CompareVersionMode compare_version_mode: how to compare the source and destination files to find new ones
        :param int compare_threshold: must not be negative; ``None`` (the default) is treated as 0
        :param int keep_days: required (must not be ``None``) when keep_days_or_delete is `b2sdk.v2.KeepOrDeleteMode.KEEP_BEFORE_DELETE`
        :param SyncPolicyManager sync_policy_manager: object which decides what to do with each file (upload, download, delete, copy, hide etc)
        :param b2sdk.v2.UploadMode upload_mode: determines how file uploads are handled
        :param int absolute_minimum_part_size: minimum file part size for large files
        :raises InvalidArgument: when any of the arguments fails validation
        """
        self.newer_file_mode = newer_file_mode
        self.keep_days_or_delete = keep_days_or_delete
        self.keep_days = keep_days
        self.compare_version_mode = compare_version_mode
        # A missing (None) threshold is normalised to 0 so it can be compared numerically.
        self.compare_threshold = compare_threshold or 0
        self.dry_run = dry_run
        self.allow_empty_source = allow_empty_source
        self.policies_manager = (
            policies_manager  # actually it should be called scan_policies_manager
        )
        self.sync_policy_manager = sync_policy_manager
        self.max_workers = max_workers
        self.upload_mode = upload_mode
        self.absolute_minimum_part_size = absolute_minimum_part_size
        self._validate()

    def _validate(self):
        """
        Check the configuration stored on the instance.

        :raises InvalidArgument: if ``compare_threshold`` is negative, if one of the
            mode settings is not a member of its enum, or if ``keep_days`` is missing
            while ``keep_days_or_delete`` is ``KEEP_BEFORE_DELETE``
        """
        if self.compare_threshold < 0:
            raise InvalidArgument('compare_threshold', 'must be a positive integer')

        # Membership is tested against a tuple of the members, i.e. a plain
        # equality-based scan over the enum values.
        if self.newer_file_mode not in tuple(NewerFileSyncMode):
            raise InvalidArgument(
                'newer_file_mode',
                'must be one of :%s' % NewerFileSyncMode.__members__,
            )

        if self.keep_days_or_delete not in tuple(KeepOrDeleteMode):
            raise InvalidArgument(
                'keep_days_or_delete',
                'must be one of :%s' % KeepOrDeleteMode.__members__,
            )

        if (
            self.keep_days_or_delete == KeepOrDeleteMode.KEEP_BEFORE_DELETE
            and self.keep_days is None
        ):
            raise InvalidArgument(
                'keep_days',
                'is required when keep_days_or_delete is %s' % KeepOrDeleteMode.KEEP_BEFORE_DELETE,
            )

        if self.compare_version_mode not in tuple(CompareVersionMode):
            raise InvalidArgument(
                'compare_version_mode',
                'must be one of :%s' % CompareVersionMode.__members__,
            )

    def sync_folders(
        self,
        source_folder: AbstractFolder,
        dest_folder: AbstractFolder,
        now_millis: int,
        reporter: SyncReport | None,
        encryption_settings_provider: AbstractSyncEncryptionSettingsProvider = SERVER_DEFAULT_SYNC_ENCRYPTION_SETTINGS_PROVIDER,
    ):
        """
        Syncs two folders.  Schedules the actions produced for every pair of
        source/destination paths on a thread pool and waits for all of them.
        How old or extra destination file versions are treated is governed by
        the ``keep_days_or_delete`` and ``keep_days`` settings of this object.

        :param source_folder: source folder object
        :param dest_folder: destination folder object
        :param now_millis: current time in milliseconds
        :param reporter: progress reporter, may be ``None``
        :param encryption_settings_provider: encryption setting provider
        :raises ValueError: when neither the source nor the destination is a B2 folder
        :raises IncompleteSync: when at least one scheduled task raised an exception
        """
        source_type = source_folder.folder_type()
        dest_type = dest_folder.folder_type()

        if source_type != 'b2' and dest_type != 'b2':
            raise ValueError('Sync between two local folders is not supported!')

        # For downloads, make sure that the target directory is there.
        if dest_type == 'local' and not self.dry_run:
            cast(LocalFolder, dest_folder).ensure_present()

        if source_type == 'local' and not self.allow_empty_source:
            cast(LocalFolder, source_folder).ensure_non_empty()

        # Make an executor to count files and run all of the actions. This is
        # not the same as the executor in the API object which is used for
        # uploads. The tasks in this executor wait for uploads.  Putting them
        # in the same thread pool could lead to deadlock.
        #
        # We use an executor with a bounded queue to avoid using up lots of memory
        # when syncing lots of files.
        # NOTE(review): if scheduling below raises, the executor is not shut down
        # in this method - confirm whether that is intended.
        unbounded_executor = futures.ThreadPoolExecutor(max_workers=self.max_workers)
        queue_limit = self.max_workers + 1000
        sync_executor = BoundedQueueExecutor(unbounded_executor, queue_limit=queue_limit)

        if source_type == 'local' and reporter is not None:
            # Start the thread that counts the local files. That's the operation
            # that should be fastest, and it provides scale for the progress reporting.
            sync_executor.submit(count_files, source_folder, reporter, self.policies_manager)

        # Bucket for scheduling actions.
        # For bucket-to-bucket sync, the bucket for the API calls should be the destination.
        action_bucket = None
        if dest_type == 'b2':
            action_bucket = cast(B2Folder, dest_folder).bucket
        elif source_type == 'b2':
            action_bucket = cast(B2Folder, source_folder).bucket

        # Schedule each of the actions.
        for action in self._make_folder_sync_actions(
            source_folder,
            dest_folder,
            now_millis,
            reporter,
            self.policies_manager,
            encryption_settings_provider,
        ):
            # Use the module logger (not the root logger) like the rest of this module.
            logger.debug('scheduling action %s on bucket %s', action, action_bucket)
            sync_executor.submit(action.run, action_bucket, reporter, self.dry_run)

        # Wait for everything to finish
        sync_executor.shutdown()
        if sync_executor.get_num_exceptions() != 0:
            raise IncompleteSync('sync is incomplete')

    def _make_folder_sync_actions(
        self,
        source_folder: AbstractFolder,
        dest_folder: AbstractFolder,
        now_millis: int,
        reporter: SyncReport | None,
        policies_manager: ScanPoliciesManager = DEFAULT_SCAN_MANAGER,
        encryption_settings_provider: AbstractSyncEncryptionSettingsProvider = SERVER_DEFAULT_SYNC_ENCRYPTION_SETTINGS_PROVIDER,
    ):
        """
        Yield a sequence of actions that will sync the destination
        folder to the source folder.

        This is a generator, so the argument checks below only run once
        iteration starts.

        :param source_folder: source folder object
        :param dest_folder: destination folder object
        :param now_millis: current time in milliseconds
        :param reporter: reporter object, may be ``None``
        :param policies_manager: object which decides which files to process
        :param encryption_settings_provider: encryption setting provider
        :raises InvalidArgument: when ``KEEP_BEFORE_DELETE`` is used with a local destination
        :raises ValueError: when neither the source nor the destination is a B2 folder
        """
        if (
            self.keep_days_or_delete == KeepOrDeleteMode.KEEP_BEFORE_DELETE
            and dest_folder.folder_type() == 'local'
        ):
            raise InvalidArgument('keep_days_or_delete', 'cannot be used for local files')

        source_type = source_folder.folder_type()
        dest_type = dest_folder.folder_type()
        sync_type = f'{source_type}-to-{dest_type}'
        if source_type != 'b2' and dest_type != 'b2':
            raise ValueError('Sync between two local folders is not supported!')

        # Running totals over the yielded actions, handed to reporter.end_compare().
        total_files = 0
        total_bytes = 0

        for source_path, dest_path in zip_folders(
            source_folder,
            dest_folder,
            reporter,
            policies_manager,
        ):
            if source_path is None:
                logger.debug('determined that %s is not present on source', dest_path)
            elif dest_path is None:
                logger.debug('determined that %s is not present on destination', source_path)

            # The reporter is optional; only report progress when one was supplied.
            if source_path is not None and reporter is not None:
                if source_type == 'b2':
                    # For buckets we don't want to count files separately as it would require
                    # more API calls.  Instead, we count them when comparing.
                    reporter.update_total(1)
                reporter.update_compare(1)

            for action in self._make_file_sync_actions(
                sync_type,
                source_path,
                dest_path,
                source_folder,
                dest_folder,
                now_millis,
                encryption_settings_provider,
            ):
                total_files += 1
                total_bytes += action.get_bytes()
                yield action

        if reporter is not None:
            if source_type == 'b2':
                # For buckets we don't want to count files separately as it would require
                # more API calls.  Instead, we count them when comparing.
                reporter.end_total()
            reporter.end_compare(total_files, total_bytes)

    def _make_file_sync_actions(
        self,
        sync_type: str,
        source_path: AbstractPath | None,
        dest_path: AbstractPath | None,
        source_folder: AbstractFolder,
        dest_folder: AbstractFolder,
        now_millis: int,
        encryption_settings_provider: AbstractSyncEncryptionSettingsProvider = SERVER_DEFAULT_SYNC_ENCRYPTION_SETTINGS_PROVIDER,
    ):
        """
        Return the sequence of actions needed to sync the two files, as
        produced by the policy selected by ``self.sync_policy_manager``.

        :param sync_type: synchronization type, formatted as ``<source type>-to-<dest type>``
        :param source_path: source file object, ``None`` when absent on the source
        :param dest_path: destination file object, ``None`` when absent on the destination
        :param source_folder: a source folder object
        :param dest_folder: a destination folder object
        :param now_millis: current time in milliseconds
        :param encryption_settings_provider: encryption setting provider
        """
        # Only the DELETE mode maps to the boolean "delete" flag; keep_days is
        # passed to the policy separately.
        delete = self.keep_days_or_delete == KeepOrDeleteMode.DELETE

        policy = self.sync_policy_manager.get_policy(
            sync_type,
            source_path,
            source_folder,
            dest_path,
            dest_folder,
            now_millis,
            delete,
            self.keep_days,
            self.newer_file_mode,
            self.compare_threshold,
            self.compare_version_mode,
            encryption_settings_provider=encryption_settings_provider,
            upload_mode=self.upload_mode,
            absolute_minimum_part_size=self.absolute_minimum_part_size,
        )
        return policy.get_all_actions()
|