File: unbound_write_intent.py

package info (click to toggle)
python-b2sdk 2.8.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,020 kB
  • sloc: python: 30,902; sh: 13; makefile: 8
file content (218 lines) | stat: -rw-r--r-- 8,454 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
######################################################################
#
# File: b2sdk/_internal/transfer/emerge/unbound_write_intent.py
#
# Copyright 2022 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import hashlib
import io
import queue
from typing import Callable, Iterator

from b2sdk._internal.transfer.emerge.exception import UnboundStreamBufferTimeout
from b2sdk._internal.transfer.emerge.write_intent import WriteIntent
from b2sdk._internal.transfer.outbound.upload_source import AbstractUploadSource


class IOWrapper(io.BytesIO):
    """
    Wrapper for BytesIO that knows when it has been read in full.

    Note that this stream should go through ``emerge_unbound``, as it's the only
    one that skips ``_get_emerge_parts`` and pushes buffers to the cloud
    exactly as they come. This way we can (somewhat) rely on check whether
    reading of this wrapper returned no more data.

    It is assumed that this object is owned by a single thread at a time.
    For that reason, no additional synchronisation is provided.
    """

    def __init__(
        self,
        data: bytes | bytearray,
        release_function: Callable[[], None],
    ):
        """
        Prepares a new ``io.BytesIO`` structure that will call
        a ``release_function`` when buffer is read in full.

        ``release_function`` can be called from another thread.
        It is called exactly once, when the read is concluded
        and the resource is about to be released

        :param data: data to be provided as a stream
        :param release_function: function to be called when resource will be released
        """
        super().__init__(data)
        self.release_function = release_function

    def close(self):
        if not self.closed:
            self.release_function()
        return super().close()


class UnboundSourceBytes(AbstractUploadSource):
    """
    Upload source that deals with a chunk of unbound data.

    It ensures that the data it provides doesn't have to be iterated
    over more than once. To do that, we have ensured that both length
    and sha1 is known. Also, it should be used only with ``emerge_unbound``,
    as it's the only plan that pushes buffers directly to the cloud.
    """

    def __init__(
        self,
        bytes_data: bytearray,
        release_function: Callable[[], None],
    ):
        """
        Prepares a new ```UploadSource`` that can be used with ``WriteIntent``.

        Calculates SHA1 and length of the data.

        :param bytes_data: data that should be uploaded, IOWrapper for this data is created.
        :param release_function: function to be called when all the ``bytes_data`` is uploaded.
        """
        self.length = len(bytes_data)
        # Prepare sha1 of the chunk upfront to ensure that nothing iterates over the stream but the upload.
        self.chunk_sha1 = hashlib.sha1(bytes_data).hexdigest()
        self.stream = IOWrapper(bytes_data, release_function)

    def get_content_sha1(self):
        return self.chunk_sha1

    def open(self):
        return self.stream

    def get_content_length(self):
        return self.length


class UnboundWriteIntentGenerator:
    """
    Generator that creates new write intents as data is streamed from an external source.

    It tries to ensure that at most ``queue_size`` buffers with size ``buffer_size_bytes``
    are allocated at any given moment.
    """

    def __init__(
        self,
        read_only_source,
        buffer_size_bytes: int,
        read_size: int,
        queue_size: int,
        queue_timeout_seconds: float,
    ):
        """
        Prepares a new intent generator for a given source.

        ``queue_size`` is handled on a best-effort basis. It's possible, in rare cases, that there will be more buffers
        available at once. With current implementation that would be the case when the whole buffer was read, but on
        the very last byte the server stopped responding and a retry is issued.

        :param read_only_source: Python object that has a ``read`` method.
        :param buffer_size_bytes: Size of a single buffer that we're to download from the source and push to the cloud.
        :param read_size: Size of a single read to be performed on ``read_only_source``.
        :param queue_size: Maximal amount of buffers that will be created.
        :param queue_timeout_seconds: Iterator will wait at most this many seconds for an empty slot
                                      for a buffer. After that time it's considered an error.
        """
        assert (
            queue_size >= 1
            and read_size > 0
            and buffer_size_bytes > 0
            and queue_timeout_seconds > 0.0
        )

        self.read_only_source = read_only_source
        self.read_size = read_size

        self.buffer_size_bytes = buffer_size_bytes
        self.buffer_limit_queue = queue.Queue(maxsize=queue_size)
        self.queue_timeout_seconds = queue_timeout_seconds

        self.buffer = bytearray()
        self.leftovers_buffer = bytearray()

    def iterator(self) -> Iterator[WriteIntent]:
        """
        Creates new ``WriteIntent`` objects as the data is pulled from the ``read_only_source``.
        """
        datastream_done = False
        offset = 0

        while not datastream_done:
            self._wait_for_free_buffer_slot()

            # In very small buffer sizes and large read sizes we could
            # land with multiple buffers read at once. This should happen
            # only in tests.
            self._trim_to_leftovers()

            while len(self.buffer) < self.buffer_size_bytes:
                data = self.read_only_source.read(self.read_size)
                if len(data) == 0:
                    datastream_done = True
                    break

                self.buffer += data
                self._trim_to_leftovers()

            # If we've just started a new buffer and got an empty read on it,
            # we have no data to send and the process is finished.
            if len(self.buffer) == 0:
                self._release_buffer()
                break

            source = UnboundSourceBytes(self.buffer, self._release_buffer)
            intent = WriteIntent(source, destination_offset=offset)
            yield intent

            offset += len(self.buffer)
            self._rotate_leftovers()

        # If we didn't stream anything, we should still provide
        # at least an empty WriteIntent, so that the file will be created.
        if offset == 0:
            source = UnboundSourceBytes(bytearray(), release_function=lambda: None)
            yield WriteIntent(source, destination_offset=offset)

    def _trim_to_leftovers(self) -> None:
        if len(self.buffer) <= self.buffer_size_bytes:
            return
        remainder = len(self.buffer) - self.buffer_size_bytes
        buffer_view = memoryview(self.buffer)
        self.leftovers_buffer += buffer_view[-remainder:]
        # This conversion has little to no implication on performance.
        self.buffer = bytearray(buffer_view[:-remainder])

    def _rotate_leftovers(self) -> None:
        self.buffer = self.leftovers_buffer
        self.leftovers_buffer = bytearray()

    def _wait_for_free_buffer_slot(self) -> None:
        # Inserted item is only a placeholder. If we fail to insert it in given time, it means
        # that system is unable to process data quickly enough. By default, this timeout is around
        # a really large value (counted in minutes, not seconds) to indicate weird behaviour.
        try:
            self.buffer_limit_queue.put(1, timeout=self.queue_timeout_seconds)
        except queue.Full:
            raise UnboundStreamBufferTimeout()

    def _release_buffer(self) -> None:
        # Pull one element from the queue of waiting elements.
        # Note that it doesn't matter which element we pull.
        # Each of them is just a placeholder. Since we know that we've put them there,
        # there is no need to actually wait. The queue should contain at least one element if we got here.
        try:
            self.buffer_limit_queue.get_nowait()
        except queue.Empty as error:  # pragma: nocover
            raise RuntimeError('Buffer pulled twice from the queue.') from error