1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
|
######################################################################
#
# File: b2sdk/v1/sync/sync.py
#
# Copyright 2021 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations
from b2sdk import v2
from b2sdk.v2 import exception as v2_exception
from .file_to_path_translator import make_files_from_paths, make_paths_from_files
from .scan_policies import DEFAULT_SCAN_MANAGER, wrap_if_necessary as scan_wrap_if_necessary
from .encryption_provider import wrap_if_necessary as encryption_wrap_if_necessary
from ..exception import DestFileNewer
# Override to change "policies_manager" default argument
def zip_folders(folder_a, folder_b, reporter, policies_manager=DEFAULT_SCAN_MANAGER):
return v2.zip_folders(
folder_a, folder_b, reporter, policies_manager=scan_wrap_if_necessary(policies_manager)
)
# Override to change "policies_manager" default arguments
# and to wrap encryption_settings_providers in argument name translators
class Synchronizer(v2.Synchronizer):
def __init__(
self,
max_workers,
policies_manager=DEFAULT_SCAN_MANAGER,
dry_run=False,
allow_empty_source=False,
newer_file_mode=v2.NewerFileSyncMode.RAISE_ERROR,
keep_days_or_delete=v2.KeepOrDeleteMode.NO_DELETE,
compare_version_mode=v2.CompareVersionMode.MODTIME,
compare_threshold=None,
keep_days=None,
sync_policy_manager: v2.SyncPolicyManager = v2.POLICY_MANAGER,
):
super().__init__(
max_workers,
scan_wrap_if_necessary(policies_manager),
dry_run,
allow_empty_source,
newer_file_mode,
keep_days_or_delete,
compare_version_mode,
compare_threshold,
keep_days,
sync_policy_manager,
)
def make_folder_sync_actions(
self,
source_folder,
dest_folder,
now_millis,
reporter,
policies_manager=DEFAULT_SCAN_MANAGER,
encryption_settings_provider=v2.SERVER_DEFAULT_SYNC_ENCRYPTION_SETTINGS_PROVIDER,
):
return super()._make_folder_sync_actions(
source_folder,
dest_folder,
now_millis,
reporter,
scan_wrap_if_necessary(policies_manager),
encryption_wrap_if_necessary(encryption_settings_provider),
)
# override to retain a public method
def make_file_sync_actions(
self,
sync_type,
source_file,
dest_file,
source_folder,
dest_folder,
now_millis,
encryption_settings_provider: v2.AbstractSyncEncryptionSettingsProvider = v2.SERVER_DEFAULT_SYNC_ENCRYPTION_SETTINGS_PROVIDER,
):
"""
Yields the sequence of actions needed to sync the two files
:param str sync_type: synchronization type
:param b2sdk.v1.File source_file: source file object
:param b2sdk.v1.File dest_file: destination file object
:param b2sdk.v1.AbstractFolder source_folder: a source folder object
:param b2sdk.v1.AbstractFolder dest_folder: a destination folder object
:param int now_millis: current time in milliseconds
:param b2sdk.v1.AbstractSyncEncryptionSettingsProvider encryption_settings_provider: encryption setting provider
"""
dest_path, source_path = make_paths_from_files(dest_file, source_file, sync_type)
return self._make_file_sync_actions(
sync_type,
source_path,
dest_path,
source_folder,
dest_folder,
now_millis,
encryption_wrap_if_necessary(encryption_settings_provider),
)
# override to raise old style DestFileNewer exceptions
def _make_file_sync_actions(
self,
sync_type,
source_path,
dest_path,
source_folder,
dest_folder,
now_millis,
encryption_settings_provider: v2.AbstractSyncEncryptionSettingsProvider = v2.SERVER_DEFAULT_SYNC_ENCRYPTION_SETTINGS_PROVIDER,
):
"""
Yields the sequence of actions needed to sync the two files
:param str sync_type: synchronization type
:param b2sdk.v1.AbstractSyncPath source_path: source file object
:param b2sdk.v1.AbstractSyncPath dest_path: destination file object
:param b2sdk.v1.AbstractFolder source_folder: a source folder object
:param b2sdk.v1.AbstractFolder dest_folder: a destination folder object
:param int now_millis: current time in milliseconds
:param b2sdk.v1.AbstractSyncEncryptionSettingsProvider encryption_settings_provider: encryption setting provider
"""
try:
yield from super()._make_file_sync_actions(
sync_type,
source_path,
dest_path,
source_folder,
dest_folder,
now_millis,
encryption_wrap_if_necessary(encryption_settings_provider),
)
except v2_exception.DestFileNewer as ex:
dest_file, source_file = make_files_from_paths(ex.dest_path, ex.source_path, sync_type)
raise DestFileNewer(dest_file, source_file, ex.dest_prefix, ex.source_prefix)
|