File: test_upload_chunking.py

package info (click to toggle)
python-azure-storage 20181109%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 76,472 kB
  • sloc: python: 28,724; makefile: 204; sh: 1
file content (111 lines) | stat: -rw-r--r-- 4,789 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# coding: utf-8

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import os

from azure.storage.blob._upload_chunking import _SubStream
from threading import Lock
from io import (BytesIO, SEEK_SET)

from tests.testcase import (
    StorageTestCase,
)

# ------------------------------------------------------------------------------


class StorageBlobUploadChunkingTest(StorageTestCase):

    # this is a white box test that's designed to make sure _Substream behaves properly
    # when the buffer needs to be swapped out at least once
    def test_sub_stream_with_length_larger_than_buffer(self):
        data = os.urandom(12 * 1024 * 1024)

        # assuming the max size of the buffer is 4MB, this test needs to be updated if that has changed
        # the block size is 6MB for this test
        expected_data = data[0: 6 * 1024 * 1024]
        wrapped_stream = BytesIO(data)  # simulate stream given by user
        lockObj = Lock()  # simulate multi-threaded environment
        substream = _SubStream(wrapped_stream, stream_begin_index=0, length=6 * 1024 * 1024, lockObj=lockObj)

        try:
            # substream should start with position at 0
            self.assertEqual(substream.tell(), 0)

            # reading a chunk that is smaller than the buffer
            data_chunk_1 = substream.read(2 * 1024 * 1024)
            self.assertEqual(len(data_chunk_1), 2 * 1024 * 1024)

            # reading a chunk that is bigger than the data remaining in buffer, force a buffer swap
            data_chunk_2 = substream.read(4 * 1024 * 1024)
            self.assertEqual(len(data_chunk_2), 4 * 1024 * 1024)

            # assert data is consistent
            self.assertEqual(data_chunk_1 + data_chunk_2, expected_data)
            self.assertEqual(6 * 1024 * 1024, substream.tell())

            # attempt to read more than what the sub stream contains should return nothing
            empty_data = substream.read(1 * 1024 * 1024)
            self.assertEqual(0, len(empty_data))
            self.assertEqual(6 * 1024 * 1024, substream.tell())

            # test seek outside of current buffer, which is at the moment the last 2MB of data
            substream.seek(0, SEEK_SET)
            data_chunk_1 = substream.read(4 * 1024 * 1024)
            data_chunk_2 = substream.read(2 * 1024 * 1024)

            # assert data is consistent
            self.assertEqual(data_chunk_1 + data_chunk_2, expected_data)

            # test seek inside of buffer, which is at the moment the last 2MB of data
            substream.seek(4 * 1024 * 1024, SEEK_SET)
            data_chunk_2 = substream.read(2 * 1024 * 1024)

            # assert data is consistent
            self.assertEqual(data_chunk_1 + data_chunk_2, expected_data)

        finally:
            wrapped_stream.close()
            substream.close()

    # this is a white box test that's designed to make sure _Substream behaves properly
    # when block size is smaller than 4MB, thus there's no need for buffer swap
    def test_sub_stream_with_length_equal_to_buffer(self):
        data = os.urandom(6 * 1024 * 1024)

        # assuming the max size of the buffer is 4MB, this test needs to be updated if that has changed
        # the block size is 2MB for this test
        expected_data = data[0: 2 * 1024 * 1024]
        wrapped_stream = BytesIO(expected_data)  # simulate stream given by user
        lockObj = Lock()  # simulate multi-threaded environment
        substream = _SubStream(wrapped_stream, stream_begin_index=0, length=2 * 1024 * 1024, lockObj=lockObj)

        try:
            # substream should start with position at 0
            self.assertEqual(substream.tell(), 0)

            # reading a chunk that is smaller than the buffer
            data_chunk_1 = substream.read(1 * 1024 * 1024)
            self.assertEqual(len(data_chunk_1), 1 * 1024 * 1024)

            # reading a chunk that is bigger than the buffer, should not read anything beyond
            data_chunk_2 = substream.read(4 * 1024 * 1024)
            self.assertEqual(len(data_chunk_2), 1 * 1024 * 1024)

            # assert data is consistent
            self.assertEqual(data_chunk_1 + data_chunk_2, expected_data)

            # test seek
            substream.seek(1 * 1024 * 1024, SEEK_SET)
            data_chunk_2 = substream.read(1 * 1024 * 1024)

            # assert data is consistent
            self.assertEqual(data_chunk_1 + data_chunk_2, expected_data)

        finally:
            wrapped_stream.close()
            substream.close()