File: _download_chunking.py

package info (click to toggle)
python-azure-storage 20181109%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 76,472 kB
  • sloc: python: 28,724; makefile: 204; sh: 1
file content (178 lines) | stat: -rw-r--r-- 7,264 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import threading


def _download_blob_chunks(blob_service, container_name, blob_name, snapshot,
                          download_size, block_size, progress, start_range, end_range,
                          stream, max_connections, progress_callback, validate_content,
                          lease_id, if_modified_since, if_unmodified_since, if_match,
                          if_none_match, timeout, operation_context):
    """Download the byte range of a blob chunk by chunk and write it to ``stream``.

    The range is split at ``block_size`` boundaries starting from ``start_range``.
    With ``max_connections > 1`` the chunks are fetched by a thread pool of that
    size and written at their own offsets; otherwise they are fetched one after
    another and appended to ``stream`` in order.

    :param blob_service: object whose ``_get_blob`` method fetches one chunk.
    :param container_name: container name forwarded to every ``_get_blob`` call.
    :param blob_name: blob name forwarded to every ``_get_blob`` call.
    :param snapshot: snapshot value forwarded to every ``_get_blob`` call.
    :param download_size: total reported as the second argument of
        ``progress_callback``.
    :param block_size: size of each chunk; the last chunk may be shorter.
    :param progress: initial value of the running progress total.
    :param start_range: offset of the first chunk.
    :param end_range: upper bound of the download; chunk offsets stay strictly
        below it and each request ends at most at ``end_range - 1``.
    :param stream: destination of the downloaded data. The parallel path calls
        ``tell()`` and ``seek()`` on it, so it must support both in that case.
    :param max_connections: number of worker threads; values above 1 select the
        parallel downloader.
    :param progress_callback: optional callable invoked after each chunk as
        ``progress_callback(total_so_far, download_size)``; may be ``None``.
    :param validate_content: forwarded to every ``_get_blob`` call.
    :param lease_id: forwarded to every ``_get_blob`` call.
    :param if_modified_since: forwarded to every ``_get_blob`` call.
    :param if_unmodified_since: forwarded to every ``_get_blob`` call.
    :param if_match: forwarded to ``_get_blob``; the downloader replaces it with
        the etag of each response it receives.
    :param if_none_match: forwarded to every ``_get_blob`` call.
    :param timeout: forwarded to every ``_get_blob`` call.
    :param operation_context: forwarded to ``_get_blob`` as ``_context``.
    :return: None. An exception raised while processing a chunk propagates to
        the caller.
    """
    # Decide once; the same answer picks both the downloader and the driver loop.
    use_parallel = max_connections > 1
    downloader_class = _ParallelBlobChunkDownloader if use_parallel else _SequentialBlobChunkDownloader

    downloader = downloader_class(
        blob_service,
        container_name,
        blob_name,
        snapshot,
        download_size,
        block_size,
        progress,
        start_range,
        end_range,
        stream,
        progress_callback,
        validate_content,
        lease_id,
        if_modified_since,
        if_unmodified_since,
        if_match,
        if_none_match,
        timeout,
        operation_context,
    )

    if use_parallel:
        # Only imported when the parallel path is actually taken.
        import concurrent.futures

        # Leaving the ``with`` block shuts the pool down and joins its worker
        # threads, so no threads are left behind once the download has
        # finished or failed.
        with concurrent.futures.ThreadPoolExecutor(max_connections) as executor:
            # map() is lazy about results: list() drains it so that every chunk
            # is waited for and the first chunk failure is re-raised here.
            list(executor.map(downloader.process_chunk, downloader.get_chunk_offsets()))
    else:
        for chunk in downloader.get_chunk_offsets():
            downloader.process_chunk(chunk)


class _BlobChunkDownloader(object):
    """Common logic for downloading a range of a blob in fixed-size chunks.

    The class computes chunk offsets, fetches each chunk through
    ``blob_service._get_blob`` and hands the data to ``_write_to_stream`` and
    ``_update_progress``. Both hooks do nothing here; subclasses are expected
    to provide them.
    """

    def __init__(self, blob_service, container_name, blob_name, snapshot, download_size,
                 chunk_size, progress, start_range, end_range, stream,
                 progress_callback, validate_content, lease_id, if_modified_since,
                 if_unmodified_since, if_match, if_none_match, timeout, operation_context):
        """Store the blob identity, range, destination and request options."""
        # which blob to read, and the service object used to read it
        self.blob_service, self.container_name = blob_service, container_name
        self.blob_name, self.snapshot = blob_name, snapshot

        # range to download and the size of each piece
        self.chunk_size, self.download_size = chunk_size, download_size
        self.start_index, self.blob_end = start_range, end_range

        # where the downloaded bytes go
        self.stream = stream

        # progress reporting: the callback and the running total
        self.progress_callback, self.progress_total = progress_callback, progress

        # options sent along with every get blob request
        self.timeout, self.operation_context = timeout, operation_context
        self.validate_content, self.lease_id = validate_content, lease_id
        self.if_modified_since, self.if_unmodified_since = if_modified_since, if_unmodified_since
        self.if_match, self.if_none_match = if_match, if_none_match

    def get_chunk_offsets(self):
        """Yield the start offset of each chunk, stepping by ``chunk_size``.

        Offsets begin at ``start_index`` and stay strictly below ``blob_end``.
        """
        offset = self.start_index
        while offset < self.blob_end:
            yield offset
            offset += self.chunk_size

    def process_chunk(self, chunk_start):
        """Fetch the chunk starting at ``chunk_start``, then write and report it.

        The chunk ends ``chunk_size`` bytes later, or at ``blob_end`` if that
        comes first. The request is always issued; writing and progress
        reporting only happen when the chunk is non-empty.
        """
        # clamp the end of the final chunk to the end of the requested range
        chunk_end = min(chunk_start + self.chunk_size, self.blob_end)

        response = self._download_chunk(chunk_start, chunk_end)
        payload = response.content

        byte_count = chunk_end - chunk_start
        if byte_count <= 0:
            return

        self._write_to_stream(payload, chunk_start)
        self._update_progress(byte_count)

    def _update_progress(self, length):
        """Hook for progress reporting; a no-op unless a subclass overrides it."""
        pass

    def _write_to_stream(self, chunk_data, chunk_start):
        """Hook for writing a chunk; a no-op unless a subclass overrides it."""
        pass

    def _download_chunk(self, chunk_start, chunk_end):
        """Request ``chunk_start`` through ``chunk_end - 1`` and return the response.

        As a side effect ``if_match`` is replaced with the etag of the response.
        """
        request_options = {
            'snapshot': self.snapshot,
            'start_range': chunk_start,
            # chunk_end is exclusive here, the request takes the last offset wanted
            'end_range': chunk_end - 1,
            'validate_content': self.validate_content,
            'lease_id': self.lease_id,
            'if_modified_since': self.if_modified_since,
            'if_unmodified_since': self.if_unmodified_since,
            'if_match': self.if_match,
            'if_none_match': self.if_none_match,
            'timeout': self.timeout,
            '_context': self.operation_context,
        }
        response = self.blob_service._get_blob(self.container_name, self.blob_name, **request_options)

        # Remember the etag so that later requests carry it as if_match and
        # can be validated as reading an unmodified blob.
        self.if_match = response.properties.etag
        return response


class _ParallelBlobChunkDownloader(_BlobChunkDownloader):
    """Chunk downloader whose hooks may be called from several threads.

    Every chunk is written at its own position in the stream, and both the
    stream and the progress total are guarded by locks.
    """

    def __init__(self, blob_service, container_name, blob_name, snapshot, download_size,
                 chunk_size, progress, start_range, end_range, stream,
                 progress_callback, validate_content, lease_id, if_modified_since,
                 if_unmodified_since, if_match, if_none_match, timeout, operation_context):
        """Initialise the base downloader, then record the stream origin and create the locks."""
        super(_ParallelBlobChunkDownloader, self).__init__(
            blob_service, container_name, blob_name, snapshot, download_size,
            chunk_size, progress, start_range, end_range, stream,
            progress_callback, validate_content, lease_id, if_modified_since,
            if_unmodified_since, if_match, if_none_match, timeout, operation_context)

        # Chunks can arrive out of order, so remember where the stream stood
        # at the start; each chunk is later placed relative to this position.
        self.stream_start = stream.tell()

        # Writing and progress reporting run concurrently; each gets its own lock.
        self.stream_lock = threading.Lock()
        self.progress_lock = threading.Lock()

    def _update_progress(self, length):
        """Add ``length`` to the running total and report it to the callback, if any."""
        if self.progress_callback is None:
            return

        # Only the update of the total is done under the lock; the callback
        # itself is invoked after the lock has been released.
        with self.progress_lock:
            self.progress_total += length
            reported_total = self.progress_total
        self.progress_callback(reported_total, self.download_size)

    def _write_to_stream(self, chunk_data, chunk_start):
        """Write ``chunk_data`` at the stream position that corresponds to ``chunk_start``."""
        # distance of this chunk from the beginning of the requested range
        relative_offset = chunk_start - self.start_index
        with self.stream_lock:
            self.stream.seek(self.stream_start + relative_offset)
            self.stream.write(chunk_data)


class _SequentialBlobChunkDownloader(_BlobChunkDownloader):
    """Chunk downloader that appends each chunk to the stream as it is processed."""

    def __init__(self, *args):
        """Pass all positional arguments straight through to the base downloader."""
        super(_SequentialBlobChunkDownloader, self).__init__(*args)

    def _update_progress(self, length):
        """Add ``length`` to the running total and report it to the callback, if any."""
        if self.progress_callback is None:
            return
        self.progress_total += length
        self.progress_callback(self.progress_total, self.download_size)

    def _write_to_stream(self, chunk_data, chunk_start):
        """Append ``chunk_data`` to the stream at its current position."""
        # chunk_start is not used: the destination stream cannot be seeked in
        # a sequential download, so data is written in the order it arrives
        self.stream.write(chunk_data)