File: chained.py

package info
python-b2sdk 2.8.0-1
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,020 kB
  • sloc: python: 30,902; sh: 13; makefile: 8
file content (156 lines) | stat: -rw-r--r-- 5,266 bytes
######################################################################
#
# File: b2sdk/_internal/stream/chained.py
#
# Copyright 2020 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import io
from abc import ABCMeta, abstractmethod

from b2sdk._internal.stream.base import ReadOnlyStreamMixin


class ChainedStream(ReadOnlyStreamMixin, io.IOBase):
    """Chains multiple streams in single stream, sort of what :py:class:`itertools.chain` does for iterators.

    Cleans up buffers of underlying streams when closed.

    Can be seeked to beginning (when retrying upload, for example).
    Closes underlying streams as soon as they reaches EOF, but clears their buffers
    when the chained stream is closed for underlying streams that follow
    :py:class:`b2sdk.v2.StreamOpener` cleanup interface, for example
    :py:class:`b2sdk.v2.CachedBytesStreamOpener`
    """

    def __init__(self, stream_openers):
        """
        :param list stream_openers: list of callables that return opened streams
        """
        stream_openers = list(stream_openers)
        if not stream_openers:
            raise ValueError('stream_openers cannot be empty')
        self.stream_openers = stream_openers
        self._stream_openers_iterator = iter(self.stream_openers)
        self._current_stream = None
        self._pos = 0
        super().__init__()

    @property
    def stream(self):
        """Return currently processed stream."""
        if self._current_stream is None:
            self._next_stream()
        return self._current_stream

    def _reset_chain(self):
        if self._current_stream is not None:
            self._current_stream.close()
            self._current_stream = None
        self._stream_openers_iterator = iter(self.stream_openers)
        self._pos = 0

    def _next_stream(self):
        next_stream_opener = next(self._stream_openers_iterator, None)
        if next_stream_opener is not None:
            if self._current_stream is not None:
                self._current_stream.close()
            self._current_stream = next_stream_opener()

    def seekable(self):
        return True

    def tell(self):
        return self._pos

    def seek(self, pos, whence=0):
        """
        Resets the stream to the beginning.

        :param int pos: only allowed value is ``0``
        :param int whence: only allowed value is ``0``
        """
        if pos != 0 or whence != 0:
            raise io.UnsupportedOperation('Chained stream can only be seeked to beginning')

        self._reset_chain()

        return self.tell()

    def readable(self):
        return True

    def read(self, size=None):
        """
        Read at most `size` bytes from the underlying streams, or all available data if `size` is None or negative.
        Streams are opened only when their data is needed, and may be left open and partially read so that
        subsequent calls to this method can continue reading where the previous one stopped.

        :param int,None size: number of bytes to read. If omitted, ``None``, or negative,
                    data is read and returned until EOF of the final stream is reached

        :return: data read from the stream
        """
        byte_arrays = []

        if size is None or size < 0:  # check for None first: comparing None with 0 would raise TypeError
            while 1:
                current_stream = self.stream
                buff = current_stream.read()
                byte_arrays.append(buff)
                if not buff:
                    self._next_stream()
                    if self.stream is current_stream:
                        break
        else:
            remaining = size
            while 1:
                current_stream = self.stream
                buff = current_stream.read(remaining)
                byte_arrays.append(buff)
                remaining -= len(buff)
                if remaining == 0:
                    # no need to open any other streams - we're satisfied
                    break
                if not buff:
                    self._next_stream()
                    if self.stream is current_stream:
                        break

        # at least one buffer was collected above; skip the join when there is only one
        if len(byte_arrays) == 1:
            data = byte_arrays[0]
        else:
            data = b''.join(byte_arrays)
        self._pos += len(data)
        return data

    def close(self):
        if self._current_stream is not None:
            self._current_stream.close()
        for stream_opener in self.stream_openers:
            if hasattr(stream_opener, 'cleanup'):
                stream_opener.cleanup()
        super().close()
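

# Illustrative usage sketch (not part of the original b2sdk module): chain two
# in-memory byte streams and read across their boundary. Plain lambdas are enough
# here, since ChainedStream only needs callables that return opened streams.
def _example_chained_stream_usage():
    stream = ChainedStream([
        lambda: io.BytesIO(b'abc'),
        lambda: io.BytesIO(b'def'),
    ])
    assert stream.read(4) == b'abcd'  # drains the first stream, takes one byte from the second
    assert stream.read() == b'ef'  # reads the rest until EOF of the final stream
    stream.seek(0)  # rewinding rebuilds the chain from the first opener
    assert stream.read() == b'abcdef'
    stream.close()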


class StreamOpener(metaclass=ABCMeta):
    """Abstract class to define stream opener with cleanup."""

    @abstractmethod
    def __call__(self):
        """Create or open the stream to read and return.

        Can be called multiple times, but streamed data may be cached and reused.
        """

    def cleanup(self):
        """Clean up stream opener after chained stream closes.

        Can be used for cleaning cached data that are stored in memory
        to allow resetting chained stream without getting this data more than once,
        eg. data downloaded from external source.
        """