File: part_definition.py

package info (click to toggle)
python-b2sdk 2.8.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,020 kB
  • sloc: python: 30,902; sh: 13; makefile: 8
file content (155 lines) | stat: -rw-r--r-- 5,136 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
######################################################################
#
# File: b2sdk/_internal/transfer/emerge/planner/part_definition.py
#
# Copyright 2020 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

from abc import ABCMeta, abstractmethod
from functools import partial
from typing import TYPE_CHECKING

from b2sdk._internal.stream.chained import ChainedStream
from b2sdk._internal.stream.range import wrap_with_range
from b2sdk._internal.utils import hex_sha1_of_unlimited_stream

if TYPE_CHECKING:
    from b2sdk._internal.transfer.emerge.unbound_write_intent import UnboundSourceBytes


class BaseEmergePartDefinition(metaclass=ABCMeta):
    """Abstract interface shared by all emerge part definitions.

    Subclasses must provide the part length, a part identifier and a way to
    build an execution step.  SHA1 support is optional: the defaults defined
    here describe a part that is not hashable and has no SHA1.
    """

    @abstractmethod
    def get_length(self):
        """Return the length of this part."""

    @abstractmethod
    def get_part_id(self):
        """Return an identifier of this part."""

    @abstractmethod
    def get_execution_step(self, execution_step_factory):
        """Build the execution step for this part with ``execution_step_factory``."""

    def get_sha1(self):
        """Return the SHA1 of this part; the base implementation always gives ``None``."""
        return None

    def is_hashable(self):
        """Tell whether this part is hashable; the base implementation says ``False``."""
        return False


class UploadEmergePartDefinition(BaseEmergePartDefinition):
    """Part definition describing a byte range taken from one upload source.

    The SHA1 of the part is computed lazily and remembered in ``_sha1``.
    """

    def __init__(self, upload_source: UnboundSourceBytes, relative_offset, length):
        """
        :param upload_source: source object the part is read from
        :param relative_offset: offset of the part inside ``upload_source``
        :param length: length of the part
        """
        self._sha1 = None  # filled in by the first `get_sha1()` call
        self.upload_source = upload_source
        self.relative_offset = relative_offset
        self.length = length

    def __repr__(self):
        class_name = self.__class__.__name__
        return (
            f'<{class_name} upload_source={self.upload_source!r} '
            f'relative_offset={self.relative_offset} length={self.length}>'
        )

    def get_length(self):
        """Return the length given at construction time."""
        return self.length

    def get_part_id(self):
        """Return the SHA1 of the part, which serves as its identifier."""
        return self.get_sha1()

    def is_hashable(self):
        """Always ``True`` for this kind of part."""
        return True

    def get_sha1(self):
        """Return the SHA1 of the part, computing it on first use."""
        if self._sha1 is not None:
            return self._sha1

        source = self.upload_source
        covers_whole_source = (
            self.relative_offset == 0 and self.length == source.get_content_length()
        )
        if covers_whole_source:
            # the part is the entire upload source, so ask the source for its
            # SHA1 - if it is already known there, it is not computed again
            self._sha1 = source.get_content_sha1()
        else:
            # only a fragment of the source: hash exactly the ranged stream
            with self._get_stream() as stream:
                digest, _ = hex_sha1_of_unlimited_stream(stream)
                self._sha1 = digest
        return self._sha1

    def get_execution_step(self, execution_step_factory):
        """Create an upload execution step for this part.

        The factory receives the stream opener (not an opened stream), the
        part length and the part SHA1.
        """
        create_step = execution_step_factory.create_upload_execution_step
        return create_step(
            self._get_stream,
            stream_length=self.length,
            stream_sha1=self.get_sha1(),
        )

    def _get_stream(self):
        """Open the upload source and wrap it with the range of this part."""
        source = self.upload_source
        opened = source.open()
        source_length = source.get_content_length()
        return wrap_with_range(opened, source_length, self.relative_offset, self.length)


class UploadSubpartsEmergePartDefinition(BaseEmergePartDefinition):
    """Part definition built by chaining the streams of several upload subparts.

    The part is hashable only when every subpart reports itself as hashable;
    in that case the SHA1 is computed lazily and remembered in ``_sha1``.
    """

    def __init__(self, upload_subparts):
        """
        :param upload_subparts: subparts making up this part, in stream order
        """
        self.upload_subparts = upload_subparts
        # decided once, at construction time
        self._is_hashable = all(part.is_hashable() for part in upload_subparts)
        self._sha1 = None

    def __repr__(self):
        class_name = self.__class__.__name__
        return f'<{class_name} upload_subparts={self.upload_subparts!r}>'

    def get_length(self):
        """Return the sum of the ``length`` attributes of all subparts."""
        return sum(part.length for part in self.upload_subparts)

    def get_part_id(self):
        """Return the SHA1 when hashable, otherwise a tuple of the subpart ids."""
        if not self.is_hashable():
            return tuple(part.get_subpart_id() for part in self.upload_subparts)
        return self.get_sha1()

    def is_hashable(self):
        """Return the hashability flag established in ``__init__``."""
        return self._is_hashable

    def get_sha1(self):
        """Return the SHA1 of the chained stream, or ``None`` when not hashable."""
        if self._sha1 is not None or not self.is_hashable():
            return self._sha1
        # hashable and not computed yet: hash the chained stream once
        with self._get_stream() as stream:
            digest, _ = hex_sha1_of_unlimited_stream(stream)
            self._sha1 = digest
        return self._sha1

    def get_execution_step(self, execution_step_factory):
        """Create an upload execution step for this part.

        The stream opener handed to the factory is bound to the factory's
        ``emerge_execution``.
        """
        create_step = execution_step_factory.create_upload_execution_step
        stream_opener = partial(
            self._get_stream, emerge_execution=execution_step_factory.emerge_execution
        )
        return create_step(
            stream_opener,
            stream_length=self.get_length(),
            stream_sha1=self.get_sha1(),
        )

    def _get_stream(self, emerge_execution=None):
        """Return a ``ChainedStream`` over the stream openers of all subparts."""
        stream_openers = [
            part.get_stream_opener(emerge_execution=emerge_execution)
            for part in self.upload_subparts
        ]
        return ChainedStream(stream_openers)


class CopyEmergePartDefinition(BaseEmergePartDefinition):
    """Part definition describing a byte range taken from a copy source.

    It keeps the inherited defaults for hashing (not hashable, no SHA1).
    """

    def __init__(self, copy_source, relative_offset, length):
        """
        :param copy_source: source object providing ``file_id`` and ``get_copy_source_range()``
        :param relative_offset: offset of the part inside ``copy_source``
        :param length: length of the part
        """
        self.copy_source = copy_source
        self.relative_offset = relative_offset
        self.length = length

    def __repr__(self):
        class_name = self.__class__.__name__
        return (
            f'<{class_name} copy_source={self.copy_source!r} '
            f'relative_offset={self.relative_offset} length={self.length}>'
        )

    def get_length(self):
        """Return the length given at construction time."""
        return self.length

    def get_part_id(self):
        """Return the ``(file_id, relative_offset, length)`` triple identifying this part."""
        source_file_id = self.copy_source.file_id
        return source_file_id, self.relative_offset, self.length

    def get_execution_step(self, execution_step_factory):
        """Create a copy execution step for the source range of this part."""
        create_step = execution_step_factory.create_copy_execution_step
        source_range = self.copy_source.get_copy_source_range(self.relative_offset, self.length)
        return create_step(source_range)