1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
|
"""Tests for the utils module."""
import io
import os
import os.path
import shutil
import tempfile
import requests
from requests_toolbelt.downloadutils import stream
from requests_toolbelt.downloadutils import tee
try:
from unittest import mock
except ImportError:
import mock
import pytest
from . import get_betamax
preserve_bytes = {'preserve_exact_body_bytes': True}
def test_get_download_file_path_uses_content_disposition():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
filename = 'github3.py-0.7.1-py2.py3-none-any.whl'
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'})
path = stream.get_download_file_path(r, None)
r.close()
assert path == filename
def test_get_download_file_path_directory():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
filename = 'github3.py-0.7.1-py2.py3-none-any.whl'
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'})
path = stream.get_download_file_path(r, tempfile.tempdir)
r.close()
assert path == os.path.join(tempfile.tempdir, filename)
def test_get_download_file_path_specific_file():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'})
path = stream.get_download_file_path(r, '/arbitrary/file.path')
r.close()
assert path == '/arbitrary/file.path'
def test_stream_response_to_file_uses_content_disposition():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
filename = 'github3.py-0.7.1-py2.py3-none-any.whl'
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'},
stream=True)
stream.stream_response_to_file(r)
assert os.path.exists(filename)
os.unlink(filename)
def test_stream_response_to_specific_filename():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
filename = 'github3.py.whl'
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'},
stream=True)
stream.stream_response_to_file(r, path=filename)
assert os.path.exists(filename)
os.unlink(filename)
def test_stream_response_to_directory():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
td = tempfile.mkdtemp()
try:
filename = 'github3.py-0.7.1-py2.py3-none-any.whl'
expected_path = os.path.join(td, filename)
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'},
stream=True)
stream.stream_response_to_file(r, path=td)
assert os.path.exists(expected_path)
finally:
shutil.rmtree(td)
def test_stream_response_to_existing_file():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
filename = 'github3.py.whl'
with open(filename, 'w') as f_existing:
f_existing.write('test')
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'},
stream=True)
try:
stream.stream_response_to_file(r, path=filename)
except stream.exc.StreamingError as e:
assert str(e).startswith('File already exists:')
else:
assert False, "Should have raised a FileExistsError"
finally:
os.unlink(filename)
def test_stream_response_to_file_like_object():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
file_obj = io.BytesIO()
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'},
stream=True)
stream.stream_response_to_file(r, path=file_obj)
assert 0 < file_obj.tell()
def test_stream_response_to_file_chunksize():
s = requests.Session()
recorder = get_betamax(s)
url = ('https://api.github.com/repos/sigmavirus24/github3.py/releases/'
'assets/37944')
class FileWrapper(io.BytesIO):
def __init__(self):
super(FileWrapper, self).__init__()
self.chunk_sizes = []
def write(self, data):
self.chunk_sizes.append(len(data))
return super(FileWrapper, self).write(data)
file_obj = FileWrapper()
chunksize = 1231
with recorder.use_cassette('stream_response_to_file', **preserve_bytes):
r = s.get(url, headers={'Accept': 'application/octet-stream'},
stream=True)
stream.stream_response_to_file(r, path=file_obj, chunksize=chunksize)
assert 0 < file_obj.tell()
assert len(file_obj.chunk_sizes) >= 1
assert file_obj.chunk_sizes[0] == chunksize
@pytest.fixture
def streamed_response(chunks=None):
chunks = chunks or [b'chunk'] * 8
response = mock.MagicMock()
response.raw.stream.return_value = chunks
return response
def test_tee(streamed_response):
response = streamed_response
expected_len = len('chunk') * 8
fileobject = io.BytesIO()
assert expected_len == sum(len(c) for c in tee.tee(response, fileobject))
assert fileobject.getvalue() == b'chunkchunkchunkchunkchunkchunkchunkchunk'
def test_tee_rejects_StringIO():
fileobject = io.StringIO()
with pytest.raises(TypeError):
# The generator needs to be iterated over before the exception will be
# raised
sum(len(c) for c in tee.tee(None, fileobject))
def test_tee_to_file(streamed_response):
response = streamed_response
expected_len = len('chunk') * 8
assert expected_len == sum(
len(c) for c in tee.tee_to_file(response, 'tee.txt')
)
assert os.path.exists('tee.txt')
os.remove('tee.txt')
def test_tee_to_bytearray(streamed_response):
response = streamed_response
arr = bytearray()
expected_arr = bytearray(b'chunk' * 8)
expected_len = len(expected_arr)
assert expected_len == sum(
len(c) for c in tee.tee_to_bytearray(response, arr)
)
assert expected_arr == arr
def test_tee_to_bytearray_only_accepts_bytearrays():
with pytest.raises(TypeError):
tee.tee_to_bytearray(None, object())
|