1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
import pytest
import sys
import lz4.stream
import psutil
import os
# This test requires allocating a big lump of memory. In order to
# avoid a massive memory allocation during byte compilation, we have
# to declare a variable for the size of the buffer we're going to
# create outside the scope of the function below. See:
# https://bugs.python.org/issue21074
_4GB = 0xffffffff # actually 4GB - 1B, the maximum size on 4 bytes.
# This test will be killed on Travis due to the 3GB memory limit
# there. Unfortunately psutil reports the host memory, not the memory
# available to the container, and so can't be used to detect available
# memory, so instead, as an ugly hack for detecting we're on Travis we
# check for the TRAVIS environment variable being set. This is quite
# fragile.
if os.environ.get('TRAVIS') is not None or sys.maxsize < _4GB or \
psutil.virtual_memory().available < _4GB:
huge = None
else:
try:
huge = b'\0' * _4GB
except (MemoryError, OverflowError):
huge = None
@pytest.mark.skipif(
os.environ.get('TRAVIS') is not None,
reason='Skipping test on Travis due to insufficient memory'
)
@pytest.mark.skipif(
sys.maxsize < _4GB,
reason='Py_ssize_t too small for this test'
)
@pytest.mark.skipif(
psutil.virtual_memory().available < _4GB or huge is None,
reason='Insufficient system memory for this test'
)
def test_huge_1():
data = b''
kwargs = {
'strategy': "double_buffer",
'buffer_size': lz4.stream.LZ4_MAX_INPUT_SIZE,
'store_comp_size': 4,
'dictionary': huge,
}
if psutil.virtual_memory().available < 3 * kwargs['buffer_size']:
# The internal LZ4 context will request at least 3 times buffer_size
# as memory (2 buffer_size for the double-buffer, and 1.x buffer_size
# for the output buffer)
pytest.skip('Insufficient system memory for this test')
# Triggering overflow error
message = r'^Dictionary too large for LZ4 API$'
with pytest.raises(OverflowError, match=message):
with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
proc.compress(data)
with pytest.raises(OverflowError, match=message):
with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
proc.decompress(data)
@pytest.mark.skipif(
os.environ.get('TRAVIS') is not None,
reason='Skipping test on Travis due to insufficient memory'
)
@pytest.mark.skipif(
sys.maxsize < 0xffffffff,
reason='Py_ssize_t too small for this test'
)
@pytest.mark.skipif(
psutil.virtual_memory().available < _4GB or huge is None,
reason='Insufficient system memory for this test'
)
def test_huge_2():
data = huge
kwargs = {
'strategy': "double_buffer",
'buffer_size': lz4.stream.LZ4_MAX_INPUT_SIZE,
'store_comp_size': 4,
'dictionary': b'',
}
if psutil.virtual_memory().available < 3 * kwargs['buffer_size']:
# The internal LZ4 context will request at least 3 times buffer_size
# as memory (2 buffer_size for the double-buffer, and 1.x buffer_size
# for the output buffer)
pytest.skip('Insufficient system memory for this test')
# Raising overflow error
message = r'^Input too large for LZ4 API$'
with pytest.raises(OverflowError, match=message):
with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
proc.compress(data)
# On decompression, too large input will raise LZ4StreamError
with pytest.raises(lz4.stream.LZ4StreamError):
with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
proc.decompress(data)
@pytest.mark.skipif(
os.environ.get('TRAVIS') is not None,
reason='Skipping test on Travis due to insufficient memory'
)
@pytest.mark.skipif(
sys.maxsize < 0xffffffff,
reason='Py_ssize_t too small for this test'
)
@pytest.mark.skipif(
psutil.virtual_memory().available < _4GB or huge is None,
reason='Insufficient system memory for this test'
)
def test_huge_3():
data = huge
kwargs = {
'strategy': "double_buffer",
'buffer_size': lz4.stream.LZ4_MAX_INPUT_SIZE,
'store_comp_size': 4,
'dictionary': huge,
}
if psutil.virtual_memory().available < 3 * kwargs['buffer_size']:
# The internal LZ4 context will request at least 3 times buffer_size
# as memory (2 buffer_size for the double-buffer, and 1.x buffer_size
# for the output buffer)
pytest.skip('Insufficient system memory for this test')
# Raising overflow error (during initialization because of the dictionary parameter)
message = r'^Dictionary too large for LZ4 API$'
with pytest.raises(OverflowError, match=message):
with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
proc.compress(data)
with pytest.raises(OverflowError, match=message):
with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
proc.decompress(data)
def test_dummy():
pass
|