######################################################################
#
# File: b2sdk/v2/replication/setup.py
#
# Copyright 2025 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

from b2sdk._internal.replication.setup import *  # noqa
from b2sdk import v3 as v3
from ..bucket import Bucket
from ..application_key import ApplicationKey  # type: ignore

class ReplicationSetupHelper(v3.ReplicationSetupHelper):  # type: ignore
    # v2 wrapper around v3.ReplicationSetupHelper, typed against the v2
    # Bucket and ApplicationKey classes. Every method except _create_key
    # delegates to the parent implementation.

    def setup_source(
        self,
        source_bucket: Bucket,
        source_key: ApplicationKey,
        destination_bucket: Bucket,
        prefix: str | None = None,
        name: str | None = None,
        priority: int | None = None,
        include_existing_files: bool = False,
    ) -> Bucket:
        return super().setup_source(
            source_bucket=source_bucket,
            source_key=source_key,
            destination_bucket=destination_bucket,
            prefix=prefix,
            name=name,
            priority=priority,
            include_existing_files=include_existing_files,
        )

    @classmethod
    def _get_source_key(
        cls,
        source_bucket: Bucket,
        prefix: str,
        current_replication_configuration: ReplicationConfiguration,
    ) -> ApplicationKey:
        return super()._get_source_key(
            source_bucket=source_bucket,
            prefix=prefix,
            current_replication_configuration=current_replication_configuration,
        )

    @classmethod
    def _should_make_new_source_key(
        cls,
        current_replication_configuration: ReplicationConfiguration,
        current_source_key: ApplicationKey | None,
    ) -> bool:
        return super()._should_make_new_source_key(
            current_replication_configuration=current_replication_configuration,
            current_source_key=current_source_key,
        )

    @classmethod
    def _create_source_key(
        cls,
        name: str,
        bucket: Bucket,
        prefix: str | None = None,
    ) -> ApplicationKey:
        return super()._create_source_key(
            name=name,
            bucket=bucket,
            prefix=prefix,
        )

    @classmethod
    def _create_destination_key(
        cls,
        name: str,
        bucket: Bucket,
        prefix: str | None = None,
    ) -> ApplicationKey:
        return super()._create_destination_key(
            name=name,
            bucket=bucket,
            prefix=prefix,
        )

    @classmethod
    def _create_key(
        cls,
        name: str,
        bucket: Bucket,
        prefix: str | None = None,
        capabilities=tuple(),
    ) -> ApplicationKey:
        # Does not delegate to the parent: creates the key through the
        # bucket's API object, restricted to this single bucket's id.
        api: B2Api = bucket.api
        return api.create_key(
            capabilities=capabilities,
            key_name=name,
            bucket_id=bucket.id_,
            name_prefix=prefix,
        )