| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 
 | ######################################################################
#
# File: test/unit/internal/transfer/downloader/test_parallel.py
#
# Copyright 2024 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
import hashlib
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from unittest.mock import Mock
import pytest
from requests import RequestException
def mock_download_response_factory(apiver_module, bucket, file_size: int = 0):
    hasher = hashlib.sha1()
    dummy_data = b'dummy'
    file_content = (dummy_data * (file_size // len(dummy_data) + 1))[:file_size]
    file_version = bucket.upload_bytes(file_content, f'dummy_file_{file_size}.txt')
    hasher.update(file_content)
    url = bucket.api.session.get_download_url_by_name(bucket.name, file_version.file_name)
    response = bucket.api.services.session.download_file_from_url(url).__enter__()
    return response, apiver_module.DownloadVersionFactory(bucket.api).from_response_headers(
        response.headers
    )
@pytest.fixture
def thread_pool():
    with ThreadPoolExecutor(max_workers=10) as executor:
        yield executor
@pytest.fixture
def output_file():
    return BytesIO()
@pytest.fixture
def downloader(apiver_module, thread_pool):
    return apiver_module.ParallelDownloader(
        min_part_size=10,
        force_chunk_size=5,
        thread_pool=thread_pool,
    )
def test_download_empty_file(apiver_module, b2api, bucket, downloader, output_file):
    file_size = 0
    mock_response, download_version = mock_download_response_factory(
        apiver_module, bucket, file_size=file_size
    )
    mock_response.close = Mock(side_effect=mock_response.close)
    bytes_written, hash_hex = downloader.download(
        output_file, mock_response, download_version, b2api.session
    )
    assert bytes_written == file_size
    assert hash_hex == 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
    assert output_file.getvalue() == b''
    mock_response.close.assert_called_once()
def test_download_file(apiver_module, b2api, bucket, downloader, output_file):
    file_size = 100
    mock_response, download_version = mock_download_response_factory(
        apiver_module, bucket, file_size=file_size
    )
    mock_response.close = Mock(side_effect=mock_response.close)
    bytes_written, hash_hex = downloader.download(
        output_file, mock_response, download_version, b2api.session
    )
    assert bytes_written == file_size
    assert hash_hex == '7804df8c623573ccfc1993e04981006e5bc30383'
    assert output_file.getvalue() == b'dummy' * 20
    mock_response.close.assert_called_once()
def test_download_file__data_stream_error__in_first_response(
    apiver_module, b2api, bucket, downloader, output_file
):
    """
    Test that the downloader handles a stream error in the first response.
    """
    file_size = 100
    mock_response, download_version = mock_download_response_factory(
        apiver_module, bucket, file_size=file_size
    )
    def iter_content(chunk_size=1, decode_unicode=False):
        yield b'DUMMY'
        raise RequestException('stream error')
        yield  # noqa
    mock_response.iter_content = iter_content
    bytes_written, hash_hex = downloader.download(
        output_file, mock_response, download_version, b2api.session
    )
    assert bytes_written == file_size
    assert output_file.getvalue() == b'DUMMY' + b'dummy' * 19
def test_download_file__data_stream_error__persistent_errors(
    apiver_module, b2api, bucket, downloader, output_file
):
    file_size = 1000
    mock_response, download_version = mock_download_response_factory(
        apiver_module, bucket, file_size=file_size
    )
    # Ensure that follow-up requests also return errors
    def iter_content(chunk_size=1, decode_unicode=False):
        yield b'd'
        raise RequestException('stream error')
    mock_response.iter_content = iter_content
    bucket.api.services.session.download_file_from_url = Mock(return_value=mock_response)
    with pytest.raises(RequestException):
        downloader.download(output_file, mock_response, download_version, b2api.session)
def test_download_file__data_stream_error__multiple_errors_recovery(
    apiver_module, b2api, bucket, downloader, output_file
):
    """Test downloader handles multiple half-failed requests and still downlaods entire file."""
    # This works since each part is attempted up to 15 times before giving up
    file_size = 100
    mock_response, download_version = mock_download_response_factory(
        apiver_module, bucket, file_size=file_size
    )
    def first_iter_content(chunk_size=1, decode_unicode=False):
        yield mock_response.raw.read(1)
        raise RequestException('stream error')
    mock_response.iter_content = first_iter_content
    download_func = bucket.api.services.session.download_file_from_url
    def download_func_mock(*args, **kwargs):
        response = download_func(*args, **kwargs).__enter__()
        def iter_content(chunk_size=1, decode_unicode=False):
            yield response.raw.read(1).upper()
            raise RequestException('stream error')
        response.iter_content = iter_content
        return response
    bucket.api.services.session.download_file_from_url = download_func_mock
    bytes_written, hash_hex = downloader.download(
        output_file, mock_response, download_version, b2api.session
    )
    assert bytes_written == file_size
    assert output_file.getvalue() == b'dUMMY' + b'DUMMY' * 19
 |