1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
######################################################################
#
# File: b2sdk/_internal/transfer/emerge/planner/part_definition.py
#
# Copyright 2020 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations
from abc import ABCMeta, abstractmethod
from functools import partial
from typing import TYPE_CHECKING
from b2sdk._internal.stream.chained import ChainedStream
from b2sdk._internal.stream.range import wrap_with_range
from b2sdk._internal.utils import hex_sha1_of_unlimited_stream
if TYPE_CHECKING:
from b2sdk._internal.transfer.emerge.unbound_write_intent import UnboundSourceBytes
class BaseEmergePartDefinition(metaclass=ABCMeta):
    """
    Abstract base for the part definitions used by the emerge planner.

    Concrete subclasses must implement ``get_length``, ``get_part_id`` and
    ``get_execution_step``. By default a part is not hashable and reports no
    SHA-1; subclasses that can compute one override ``is_hashable`` and
    ``get_sha1``.
    """

    @abstractmethod
    def get_length(self):
        """Return the size of this part (subclasses return a byte count)."""

    @abstractmethod
    def get_part_id(self):
        """Return a value identifying this part (e.g. a SHA-1 or a tuple of ids)."""

    @abstractmethod
    def get_execution_step(self, execution_step_factory):
        """Build the execution step for this part using ``execution_step_factory``."""

    def is_hashable(self):
        """Tell whether ``get_sha1`` can provide a checksum; the base answer is ``False``."""
        return False

    def get_sha1(self):
        """Return the SHA-1 of this part; the base implementation knows none and returns ``None``."""
        return None
class UploadEmergePartDefinition(BaseEmergePartDefinition):
    """
    Part definition backed by a byte range of an upload source.

    The range starts at ``relative_offset`` within ``upload_source`` and spans
    ``length`` bytes. The part is always hashable and its part id is its SHA-1.
    """

    def __init__(self, upload_source: UnboundSourceBytes, relative_offset, length):
        self.upload_source = upload_source
        self.relative_offset = relative_offset
        self.length = length
        # SHA-1 of the range, computed lazily by get_sha1() and cached here
        self._sha1 = None

    def __repr__(self):
        return (
            f'<{self.__class__.__name__} upload_source={repr(self.upload_source)} relative_offset={self.relative_offset} '
            f'length={self.length}>'
        )

    def get_length(self):
        return self.length

    def get_part_id(self):
        return self.get_sha1()

    def is_hashable(self):
        return True

    def get_sha1(self):
        """
        Return the SHA-1 of this part, computing it on first use and caching it.

        When the part covers the whole upload source, the source's own
        ``get_content_sha1()`` is used; otherwise the range is streamed and hashed.
        """
        if self._sha1 is None:
            if self.relative_offset == 0 and self.length == self.upload_source.get_content_length():
                # this part is equal to the whole upload source - so we use `get_content_sha1()`
                # and if sha1 is already given, we skip computing it again
                self._sha1 = self.upload_source.get_content_sha1()
            else:
                with self._get_stream() as stream:
                    self._sha1, _ = hex_sha1_of_unlimited_stream(stream)
        return self._sha1

    def get_execution_step(self, execution_step_factory):
        """
        Create an upload execution step for this part.

        The factory receives ``_get_stream`` as a stream opener, together with the
        part length and SHA-1.
        """
        return execution_step_factory.create_upload_execution_step(
            self._get_stream,
            stream_length=self.length,
            stream_sha1=self.get_sha1(),
        )

    def _get_stream(self):
        """
        Open the upload source and return a stream restricted to this part's range.

        If wrapping the freshly opened file object fails, that object is closed
        before the exception propagates, so it is not leaked.
        """
        fp = self.upload_source.open()
        try:
            return wrap_with_range(
                fp, self.upload_source.get_content_length(), self.relative_offset, self.length
            )
        except BaseException:
            # ownership of `fp` was never handed to a caller - release it, then re-raise
            fp.close()
            raise
class UploadSubpartsEmergePartDefinition(BaseEmergePartDefinition):
    """
    Part definition assembled by chaining several upload subparts together.

    The part counts as hashable only if every subpart is hashable. In that case
    its part id is the SHA-1 of the chained bytes; otherwise it is the tuple of
    the subparts' ids.
    """

    def __init__(self, upload_subparts):
        self.upload_subparts = upload_subparts
        # equivalent to all(...): stops at the first non-hashable subpart
        self._is_hashable = not any(not part.is_hashable() for part in upload_subparts)
        # SHA-1 of the chained stream, filled in lazily by get_sha1()
        self._sha1 = None

    def __repr__(self):
        return f'<{self.__class__.__name__} upload_subparts={repr(self.upload_subparts)}>'

    def get_length(self):
        """Return the combined ``length`` of all subparts."""
        total = 0
        for part in self.upload_subparts:
            total += part.length
        return total

    def get_part_id(self):
        """Return the SHA-1 when hashable, else a tuple of per-subpart ids."""
        if not self.is_hashable():
            return tuple(part.get_subpart_id() for part in self.upload_subparts)
        return self.get_sha1()

    def is_hashable(self):
        return self._is_hashable

    def get_sha1(self):
        """
        Return the cached SHA-1, computing it from the chained stream on first use.

        Returns ``None`` (without reading anything) when the part is not hashable.
        """
        if self._sha1 is not None or not self.is_hashable():
            return self._sha1
        with self._get_stream() as stream:
            self._sha1, _ = hex_sha1_of_unlimited_stream(stream)
        return self._sha1

    def get_execution_step(self, execution_step_factory):
        """Create an upload step whose stream opener is bound to the factory's emerge execution."""
        stream_opener = partial(
            self._get_stream, emerge_execution=execution_step_factory.emerge_execution
        )
        return execution_step_factory.create_upload_execution_step(
            stream_opener,
            stream_length=self.get_length(),
            stream_sha1=self.get_sha1(),
        )

    def _get_stream(self, emerge_execution=None):
        """Chain the subparts' stream openers, in order, into a single stream."""
        openers = [
            part.get_stream_opener(emerge_execution=emerge_execution)
            for part in self.upload_subparts
        ]
        return ChainedStream(openers)
class CopyEmergePartDefinition(BaseEmergePartDefinition):
    """
    Part definition that copies a byte range of an existing copy source.

    The part id is ``(copy_source.file_id, relative_offset, length)``. The part
    keeps the base-class behavior of not being hashable.
    """

    def __init__(self, copy_source, relative_offset, length):
        self.copy_source = copy_source
        self.relative_offset = relative_offset
        self.length = length

    def __repr__(self):
        return (
            f'<{self.__class__.__name__} copy_source={repr(self.copy_source)} relative_offset={self.relative_offset} '
            f'length={self.length}>'
        )

    def get_length(self):
        return self.length

    def get_part_id(self):
        """Identify the part by source file id plus the copied range."""
        source_file_id = self.copy_source.file_id
        return source_file_id, self.relative_offset, self.length

    def get_execution_step(self, execution_step_factory):
        """Create a copy execution step for the source range this part covers."""
        source_range = self.copy_source.get_copy_source_range(self.relative_offset, self.length)
        return execution_step_factory.create_copy_execution_step(source_range)
|