File: _async_random_stream.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (105 lines) | stat: -rw-r--r-- 3,142 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from typing import AsyncIterator
from io import BytesIO

from ._random_stream import get_random_bytes, _DEFAULT_LENGTH


class AsyncRandomStream(BytesIO):
    """
    File-like stream that serves ``length`` bytes without storing them all.

    Every read returns a prefix of a single reusable buffer obtained from
    ``get_random_bytes``, so the data held in memory is the size of that buffer
    (which grows only when a single read asks for more) rather than ``length``.
    The inherited ``BytesIO`` storage is left empty; ``read``, ``seek``,
    ``tell`` and ``close`` are overridden to operate on the position/remaining
    counters kept by this class.

    :param length: Total number of bytes served before the stream reports EOF.
    :param initial_buffer_length: Size of the reusable buffer; also the chunk
        size used by ``read()`` when no size is given.
    """

    def __init__(self, length, initial_buffer_length=_DEFAULT_LENGTH):
        super().__init__()
        self._base_data = get_random_bytes(initial_buffer_length)
        self._data_length = length
        self._base_buffer_length = initial_buffer_length
        self._position = 0
        self._remaining = length
        self._closed = False

    def __len__(self):
        """Return the number of bytes not yet read (not the total length)."""
        return self._remaining

    def reset(self):
        """Rewind to the start, restore the full byte budget and clear the closed flag."""
        self._position = 0
        self._remaining = self._data_length
        self._closed = False

    def read(self, size=None):
        """
        Return up to ``size`` bytes and advance the stream.

        :param size: Maximum number of bytes to return. ``None`` or a negative
            value selects the current buffer length as the chunk size.
        :returns: A prefix of the shared buffer, truncated to the bytes
            remaining; ``b""`` once the stream is exhausted.
        """
        if self._remaining <= 0:
            return b""

        # A negative size is the io-module spelling of "no explicit limit".
        # Using it as a count would make the chunk length negative, which would
        # *increase* ``_remaining`` and move ``_position`` backwards.
        if size is None or size < 0:
            chunk_length = self._base_buffer_length
        else:
            chunk_length = size
        chunk_length = min(chunk_length, self._remaining)

        # Grow the shared buffer when a single read asks for more than it holds.
        if chunk_length > self._base_buffer_length:
            self._base_data = get_random_bytes(chunk_length)
            self._base_buffer_length = chunk_length

        self._remaining -= chunk_length
        self._position += chunk_length
        return self._base_data[:chunk_length]

    def seek(self, index, whence=0):
        """
        Move the stream position and return the new absolute position.

        :param index: Offset, interpreted according to ``whence``.
        :param whence: ``0`` = from the start, ``1`` = from the current
            position, ``2`` = from the end. Any other value leaves the
            position unchanged.
        :returns: The absolute position after the seek.
        """
        if whence == 0:
            target = index
        elif whence == 1:
            target = self._position + index
        elif whence == 2:
            # The end of the stream is at ``_data_length``, so ``seek(0, 2)``
            # followed by ``tell()`` yields the total length.
            target = self._data_length + index
        else:
            # Unknown ``whence`` values are ignored rather than rejected.
            return self._position

        # A position before the start is meaningless; clamp it to 0.
        self._position = max(target, 0)
        # Keep the read budget in step with the position so that rewinding
        # (e.g. ``seek(0)`` before a retry) makes the data readable again.
        self._remaining = max(self._data_length - self._position, 0)
        return self._position

    def tell(self):
        """Return the current absolute position in the stream."""
        return self._position

    def remaining(self):
        """Return the number of bytes not yet read."""
        return self._remaining

    def close(self):
        """Set the internal closed flag; the underlying ``BytesIO`` is not closed."""
        self._closed = True


class AsyncIteratorRandomStream(AsyncIterator[bytes]):
    """
    Async random stream of bytes for methods that accept AsyncIterator as input.

    Iterating yields chunks until ``length`` bytes have been produced in total.
    Every chunk is a prefix of a single reusable buffer obtained from
    ``get_random_bytes``, so the full payload is never held in memory.

    :param length: Total number of bytes produced before iteration stops.
    :param initial_buffer_length: Size of the reusable buffer; also the chunk
        size yielded by iteration and by ``read()`` when no size is given.
    """

    def __init__(self, length, initial_buffer_length=_DEFAULT_LENGTH):
        self._base_data = get_random_bytes(initial_buffer_length)
        self._data_length = length
        self._base_buffer_length = initial_buffer_length
        self._position = 0
        self._remaining = length

    def __len__(self):
        """Return the number of bytes not yet produced (not the total length)."""
        return self._remaining

    def __aiter__(self):
        """Return ``self``; the object is its own async iterator."""
        return self

    async def __anext__(self):
        """
        Return the next chunk of bytes.

        :raises StopAsyncIteration: When no bytes remain.
        """
        if self._remaining <= 0:
            raise StopAsyncIteration
        return self.read()

    def reset(self):
        """Rewind to the start and restore the full byte budget."""
        self._position = 0
        self._remaining = self._data_length

    def read(self, size=None):
        """
        Return up to ``size`` bytes and advance the stream.

        :param size: Maximum number of bytes to return. ``None`` or a negative
            value selects the current buffer length as the chunk size.
        :returns: A prefix of the shared buffer, truncated to the bytes
            remaining; ``b""`` once the stream is exhausted.
        """
        if self._remaining <= 0:
            return b""

        # A negative size is the io-module spelling of "no explicit limit".
        # Using it as a count would make the chunk length negative, which would
        # *increase* ``_remaining`` and move ``_position`` backwards.
        if size is None or size < 0:
            chunk_length = self._base_buffer_length
        else:
            chunk_length = size
        chunk_length = min(chunk_length, self._remaining)

        # Grow the shared buffer when a single read asks for more than it holds.
        if chunk_length > self._base_buffer_length:
            self._base_data = get_random_bytes(chunk_length)
            self._base_buffer_length = chunk_length

        self._remaining -= chunk_length
        self._position += chunk_length
        return self._base_data[:chunk_length]