1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
from tests.base_test import BaseTest
from tests import config
from core import modules
from core.sessions import SessionURL
from testfixtures import log_capture
from core import messages
import logging
import os
import subprocess
class FileBzip(BaseTest):
# Create and bzip2 binary files for the test
binstring = [
b'\\xe0\\xf5\\xfe\\xe2\\xbd\\x0c\\xbc\\x9b\\xa0\\x8f\\xed?\\xa1\\xe1',
b'\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x06\\x00\\x00\\x00'
]
uncompressed = [
os.path.join(config.base_folder, 'test_file_bzip2', 'binfile0'),
os.path.join(config.base_folder, 'test_file_bzip2', 'binfile1')
]
compressed = [
os.path.join(config.base_folder, 'test_file_bzip2', 'binfile0.bz2'),
os.path.join(config.base_folder, 'test_file_bzip2', 'binfile1.bz2')
]
def setUp(self):
session = SessionURL(self.url, self.password, volatile = True)
modules.load_modules(session)
subprocess.check_output("""
BASE_FOLDER="{config.base_folder}/test_file_bzip2/"
rm -rf "$BASE_FOLDER"
mkdir -p "$BASE_FOLDER/"
echo -n '\\xe0\\xf5\\xfe\\xe2\\xbd\\x0c\\xbc\\x9b\\xa0\\x8f\\xed?\\xa1\\xe1' > "$BASE_FOLDER/binfile0"
echo -n '\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x06\\x00\\x00\\x00' > "$BASE_FOLDER/binfile1"
bzip2 "$BASE_FOLDER/binfile0"
bzip2 "$BASE_FOLDER/binfile1"
chown www-data: -R "$BASE_FOLDER/"
""".format(
config = config
), shell=True)
self.run_argv = modules.loaded['file_bzip2'].run_argv
def test_compress_decompress(self):
# Decompress and check test file
self.assertTrue(self.run_argv(["--decompress", self.compressed[0]]));
self.assertEqual(
subprocess.check_output('cat "%s"' % self.uncompressed[0], shell=True),
self.binstring[0]
)
# Let's re-compress it, and decompress and check again
self.assertTrue(self.run_argv([self.uncompressed[0]]))
self.assertTrue(self.run_argv(["--decompress", self.compressed[0]]));
self.assertEqual(
subprocess.check_output('cat "%s"' % self.uncompressed[0], shell=True),
self.binstring[0]
)
# Recompress it keeping the original file
self.assertTrue(self.run_argv([self.uncompressed[0], '--keep']))
# Check the existance of the original file and remove it
subprocess.check_call('stat -c %%a "%s"' % self.uncompressed[0], shell=True)
subprocess.check_call('rm "%s"' % self.uncompressed[0], shell=True)
#Do the same check
self.assertTrue(self.run_argv(["--decompress", self.compressed[0]]));
self.assertEqual(
subprocess.check_output('cat "%s"' % self.uncompressed[0], shell=True),
self.binstring[0]
)
def test_compress_decompress_multiple(self):
for index in range(0, len(self.compressed)):
# Decompress and check test file
self.assertTrue(self.run_argv(["--decompress", self.compressed[index]]));
self.assertEqual(
subprocess.check_output('cat "%s"' % self.uncompressed[index], shell=True),
self.binstring[index]
)
# Let's re-compress it, and decompress and check again
self.assertTrue(self.run_argv([self.uncompressed[index]]))
self.assertTrue(self.run_argv(["--decompress", self.compressed[index]]));
self.assertEqual(
subprocess.check_output('cat "%s"' % self.uncompressed[index], shell=True),
self.binstring[index]
)
@log_capture()
def test_already_exists(self, log_captured):
# Decompress keeping it and check test file
self.assertTrue(self.run_argv(["--decompress", self.compressed[0], '--keep']));
self.assertEqual(
subprocess.check_output('cat "%s"' % self.uncompressed[0], shell=True),
self.binstring[0]
)
# Do it again and trigger that the file decompressed already exists
self.assertIsNone(self.run_argv(["--decompress", self.compressed[0]]));
self.assertEqual(log_captured.records[-1].msg,
"File '%s' already exists, skipping decompressing" % self.uncompressed[0])
# Compress and trigger that the file compressed already exists
self.assertIsNone(self.run_argv([self.uncompressed[0]]));
self.assertEqual(log_captured.records[-1].msg,
"File '%s' already exists, skipping compressing" % self.compressed[0])
@log_capture()
def test_wrong_ext(self, log_captured):
# Decompress it and check test file
self.assertTrue(self.run_argv(["--decompress", self.compressed[0]]));
self.assertEqual(
subprocess.check_output('cat "%s"' % self.uncompressed[0], shell=True),
self.binstring[0]
)
# Decompress the decompressed, wrong ext
self.assertIsNone(self.run_argv(["--decompress", self.uncompressed[0]]));
self.assertEqual(log_captured.records[-1].msg,
"Unknown suffix, skipping decompressing")
@log_capture()
def test_unexistant(self, log_captured):
# Decompress it and check test file
self.assertIsNone(self.run_argv(["--decompress", 'bogus']));
self.assertEqual(log_captured.records[-1].msg,
"Skipping file '%s', check existance and permission" % 'bogus')
|