File: _download_chunking.py

package info (click to toggle)
python-azure-storage 20181109%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 76,472 kB
  • sloc: python: 28,724; makefile: 204; sh: 1
file content (159 lines) | stat: -rw-r--r-- 6,685 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import threading


def _download_file_chunks(file_service, share_name, directory_name, file_name,
                          download_size, block_size, progress, start_range, end_range,
                          stream, max_connections, progress_callback, validate_content,
                          timeout, operation_context, snapshot):
    """Download the range [start_range, end_range) of a file in chunks into ``stream``.

    The range is split into pieces of ``block_size`` bytes. When
    ``max_connections`` is greater than 1 the pieces are fetched concurrently
    on a thread pool of that size and written at their matching offsets;
    otherwise they are fetched and written one after another.

    :param file_service: service object whose ``_get_file`` method fetches one range.
    :param share_name: share name, forwarded to ``_get_file``.
    :param directory_name: directory name, forwarded to ``_get_file``.
    :param file_name: file name, forwarded to ``_get_file``.
    :param download_size: total size reported as the second argument of
        ``progress_callback``.
    :param block_size: size of each chunk.
    :param progress: starting value of the running progress total.
    :param start_range: offset of the first byte to download.
    :param end_range: offset one past the last byte to download (each request
        is sent with an inclusive end of ``chunk_end - 1``).
    :param stream: destination stream; the parallel path additionally calls
        ``tell()`` and ``seek()`` on it.
    :param max_connections: number of worker threads; values above 1 select
        the parallel downloader.
    :param progress_callback: optional callable ``(current, total)`` invoked
        after each chunk is written; may be None.
    :param validate_content: forwarded to ``_get_file``.
    :param timeout: forwarded to ``_get_file``.
    :param operation_context: forwarded to ``_get_file`` as ``_context``.
    :param snapshot: forwarded to ``_get_file``.
    :return: None. Exceptions raised while processing a chunk propagate to the caller.
    """
    use_parallel = max_connections > 1
    downloader_class = _ParallelFileChunkDownloader if use_parallel else _SequentialFileChunkDownloader

    downloader = downloader_class(
        file_service,
        share_name,
        directory_name,
        file_name,
        download_size,
        block_size,
        progress,
        start_range,
        end_range,
        stream,
        progress_callback,
        validate_content,
        timeout,
        operation_context,
        snapshot,
    )

    if use_parallel:
        import concurrent.futures
        # Use the executor as a context manager so its worker threads are
        # always shut down, including when a chunk fails. Previously the pool
        # was never shut down and its threads outlived the download.
        with concurrent.futures.ThreadPoolExecutor(max_connections) as executor:
            # executor.map is lazy about results: list() drains it so every
            # chunk finishes and the first worker exception is re-raised here.
            list(executor.map(downloader.process_chunk, downloader.get_chunk_offsets()))
    else:
        for chunk in downloader.get_chunk_offsets():
            downloader.process_chunk(chunk)


class _FileChunkDownloader(object):
    """Common machinery for downloading a file range one chunk at a time.

    The range covered is [start_range, end_range): ``get_chunk_offsets`` yields
    the start offset of every chunk and ``process_chunk`` fetches one chunk,
    then hands the bytes to ``_write_to_stream`` and the byte count to
    ``_update_progress``. Both hooks are no-ops here and are meant to be
    supplied by subclasses.
    """

    def __init__(self, file_service, share_name, directory_name, file_name,
                 download_size, chunk_size, progress, start_range, end_range,
                 stream, progress_callback, validate_content, timeout, operation_context, snapshot):
        # which file to read, and through which service object
        self.file_service = file_service
        self.share_name = share_name
        self.directory_name = directory_name
        self.file_name = file_name

        # extent of the download and the size of each piece
        self.start_index = start_range
        self.file_end = end_range
        self.chunk_size = chunk_size
        self.download_size = download_size

        # where the downloaded bytes go
        self.stream = stream

        # progress reporting: running total plus the optional callback
        self.progress_total = progress
        self.progress_callback = progress_callback

        # options forwarded unchanged to every get-file request
        self.validate_content = validate_content
        self.timeout = timeout
        self.operation_context = operation_context
        self.snapshot = snapshot

    def get_chunk_offsets(self):
        """Yield the start offset of each chunk, stepping by ``chunk_size``."""
        offset = self.start_index
        while True:
            if offset >= self.file_end:
                break
            yield offset
            offset = offset + self.chunk_size

    def process_chunk(self, chunk_start):
        """Fetch the chunk starting at ``chunk_start``, then write it and report progress."""
        # the final chunk is clipped to the end of the requested range
        chunk_end = min(chunk_start + self.chunk_size, self.file_end)

        response = self._download_chunk(chunk_start, chunk_end)
        payload = response.content

        size = chunk_end - chunk_start
        if size <= 0:
            return
        self._write_to_stream(payload, chunk_start)
        self._update_progress(size)

    # hook: subclasses are expected to override this
    def _update_progress(self, length):
        pass

    # hook: subclasses are expected to override this
    def _write_to_stream(self, chunk_data, chunk_start):
        pass

    def _download_chunk(self, chunk_start, chunk_end):
        """Request bytes [chunk_start, chunk_end) from the file service."""
        # chunk_end is exclusive here, while the request takes an inclusive end
        last_byte = chunk_end - 1
        return self.file_service._get_file(
            self.share_name,
            self.directory_name,
            self.file_name,
            start_range=chunk_start,
            end_range=last_byte,
            validate_content=self.validate_content,
            timeout=self.timeout,
            _context=self.operation_context,
            snapshot=self.snapshot
        )


class _ParallelFileChunkDownloader(_FileChunkDownloader):
    """Chunk downloader whose hooks can be called from several threads.

    Chunks may finish in any order, so each one is written at its own offset
    relative to where the stream stood when the downloader was created. This
    requires the destination stream to support ``tell()`` and ``seek()``.
    """

    def __init__(self, file_service, share_name, directory_name, file_name,
                 download_size, chunk_size, progress, start_range, end_range,
                 stream, progress_callback, validate_content, timeout, operation_context, snapshot):
        super(_ParallelFileChunkDownloader, self).__init__(
            file_service,
            share_name,
            directory_name,
            file_name,
            download_size,
            chunk_size,
            progress,
            start_range,
            end_range,
            stream,
            progress_callback,
            validate_content,
            timeout,
            operation_context,
            snapshot,
        )

        # worker threads share the stream and the progress counter,
        # so each gets its own lock
        self.stream_lock = threading.Lock()
        self.progress_lock = threading.Lock()

        # remember where the stream currently stands; every chunk is
        # positioned relative to this point when it is written
        self.stream_start = stream.tell()

    def _update_progress(self, length):
        """Add ``length`` to the running total and report it, if a callback is set."""
        if self.progress_callback is None:
            return
        with self.progress_lock:
            self.progress_total += length
            reported_total = self.progress_total
        # the callback runs outside the lock, using the total captured above
        self.progress_callback(reported_total, self.download_size)

    def _write_to_stream(self, chunk_data, chunk_start):
        """Write ``chunk_data`` at the stream position that matches ``chunk_start``."""
        relative_offset = chunk_start - self.start_index
        destination = self.stream_start + relative_offset
        # seek + write must happen as one unit, otherwise another thread
        # could move the stream position in between
        with self.stream_lock:
            self.stream.seek(destination)
            self.stream.write(chunk_data)


class _SequentialFileChunkDownloader(_FileChunkDownloader):
    """Chunk downloader that appends each chunk to the stream as it is processed.

    No locking and no seeking are performed, so chunks land in the stream in
    the order in which ``process_chunk`` is called.
    """

    def __init__(self, file_service, share_name, directory_name, file_name, download_size, chunk_size, progress,
                 start_range, end_range, stream, progress_callback, validate_content, timeout, operation_context,
                 snapshot):
        super(_SequentialFileChunkDownloader, self).__init__(
            file_service,
            share_name,
            directory_name,
            file_name,
            download_size,
            chunk_size,
            progress,
            start_range,
            end_range,
            stream,
            progress_callback,
            validate_content,
            timeout,
            operation_context,
            snapshot,
        )

    def _update_progress(self, length):
        """Add ``length`` to the running total and report it, if a callback is set."""
        if self.progress_callback is None:
            return
        self.progress_total += length
        self.progress_callback(self.progress_total, self.download_size)

    def _write_to_stream(self, chunk_data, chunk_start):
        """Append ``chunk_data`` to the stream at its current position."""
        # chunk_start is deliberately unused: the sequential path never seeks
        # the destination stream, it only appends
        self.stream.write(chunk_data)