1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
|
# -*- coding: utf-8 -*-
# vim: ts=4 sw=4 tw=88 et ai si
#
# Copyright (c) 2012-2014 Intel, Inc.
# License: GPLv2
# Author: Artem Bityutskiy <artem.bityutskiy@linux.intel.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2,
# as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
"""
This test verifies the base bmap creation and copying API functionality. It
generates a random sparse file, then creates a bmap for this file and copies it
to a different file using the bmap. Then it compares the original random sparse
file and the copy and verifies that they are identical.
"""
# Disable the following pylint recommendations:
# * Too many public methods (R0904)
# * Too many local variables (R0914)
# * Too many statements (R0915)
# pylint: disable=R0904
# pylint: disable=R0914
# pylint: disable=R0915
import os
import sys
import tempfile
import filecmp
import subprocess
from itertools import zip_longest
from tests import helpers
from bmaptool import BmapHelpers, BmapCreate, Filemap
# This is a work-around for Centos 6
try:
import unittest2 as unittest # pylint: disable=F0401
except ImportError:
import unittest
class Error(Exception):
"""A class for exceptions generated by this test."""
pass
def _compare_holes(file1, file2):
"""
Make sure that files 'file1' and 'file2' have holes at the same places.
The 'file1' and 'file2' arguments may be full file paths or file objects.
"""
filemap1 = Filemap.filemap(file1)
filemap2 = Filemap.filemap(file2)
iterator1 = filemap1.get_unmapped_ranges(0, filemap1.blocks_cnt)
iterator2 = filemap2.get_unmapped_ranges(0, filemap2.blocks_cnt)
iterator = zip_longest(iterator1, iterator2)
for range1, range2 in iterator:
if range1 != range2:
raise Error(
"mismatch for hole %d-%d, it is %d-%d in file2"
% (range1[0], range1[1], range2[0], range2[1])
)
def _generate_compressed_files(file_path, delete=True):
"""
This is a generator which yields compressed versions of a file
'file_path'.
The 'delete' argument specifies whether the compressed files that this
generator yields have to be automatically deleted.
"""
# Make sure the temporary files start with the same name as 'file_obj' in
# order to simplify debugging.
prefix = os.path.splitext(os.path.basename(file_path))[0] + "."
# Put the temporary files in the directory with 'file_obj'
directory = os.path.dirname(file_path)
compressors = [
("bzip2", None, ".bz2", "-c -k"),
("pbzip2", None, ".p.bz2", "-c -k"),
("gzip", None, ".gz", "-c"),
("pigz", None, ".p.gz", "-c -k"),
("xz", None, ".xz", "-c -k"),
("lzop", None, ".lzo", "-c -k"),
("lz4", None, ".lz4", "-c -k"),
("zstd", None, ".zst", "-c -k"),
# The "-P -C /" trick is used to avoid silly warnings:
# "tar: Removing leading `/' from member names"
("bzip2", "tar", ".tar.bz2", "-c -j -O -P -C /"),
("gzip", "tar", ".tar.gz", "-c -z -O -P -C /"),
("xz", "tar", ".tar.xz", "-c -J -O -P -C /"),
("lzop", "tar", ".tar.lzo", "-c --lzo -O -P -C /"),
("lz4", "tar", ".tar.lz4", "-c -Ilz4 -O -P -C /"),
("zstd", "tar", ".tar.zst", "-c -Izstd -O -P -C /"),
("zip", None, ".zip", "-q -j -"),
]
for decompressor, archiver, suffix, options in compressors:
if not BmapHelpers.program_is_available(decompressor):
continue
if archiver and not BmapHelpers.program_is_available(archiver):
continue
tmp_file_obj = tempfile.NamedTemporaryFile(
"wb+", prefix=prefix, delete=delete, dir=directory, suffix=suffix
)
if archiver:
args = archiver + " " + options + " " + file_path
else:
args = decompressor + " " + options + " " + file_path
child_process = subprocess.Popen(
args,
shell=True,
stdout=tmp_file_obj,
stderr=subprocess.DEVNULL,
)
child_process.wait()
tmp_file_obj.flush()
yield tmp_file_obj.name
tmp_file_obj.close()
def _do_test(image, image_size, delete=True):
"""
A basic test for the bmap creation and copying functionality. It first
generates a bmap for file 'image', and then copies the sparse file to a
different file, and then checks that the original file and the copy are
identical.
The 'image_size' argument is size of the image in bytes. The 'delete'
argument specifies whether the temporary files that this function creates
have to be automatically deleted.
"""
try:
Filemap.filemap(image)
except Filemap.ErrorNotSupp as e:
sys.stderr.write("%s\n" % e)
return
# Make sure the temporary files start with the same name as 'image' in
# order to simplify debugging.
prefix = os.path.splitext(os.path.basename(image))[0] + "."
# Put the temporary files in the directory with the image
directory = os.path.dirname(image)
# Create and open a temporary file for a copy of the image
f_copy = tempfile.NamedTemporaryFile(
"wb+", prefix=prefix, delete=delete, dir=directory, suffix=".copy"
)
# Create and open 2 temporary files for the bmap
f_bmap1 = tempfile.NamedTemporaryFile(
"w+", prefix=prefix, delete=delete, dir=directory, suffix=".bmap1"
)
f_bmap2 = tempfile.NamedTemporaryFile(
"w+", prefix=prefix, delete=delete, dir=directory, suffix=".bmap2"
)
image_chksum = helpers.calculate_chksum(image)
#
# Pass 1: generate the bmap, copy and compare
#
# Create bmap for the random sparse file
creator = BmapCreate.BmapCreate(image, f_bmap1.name)
creator.generate()
helpers.copy_and_verify_image(
image, f_copy.name, f_bmap1.name, image_chksum, image_size
)
# Make sure that holes in the copy are identical to holes in the random
# sparse file.
_compare_holes(image, f_copy.name)
#
# Pass 2: same as pass 1, but use file objects instead of paths
#
creator = BmapCreate.BmapCreate(image, f_bmap2)
creator.generate()
helpers.copy_and_verify_image(
image, f_copy.name, f_bmap2.name, image_chksum, image_size
)
_compare_holes(image, f_copy.name)
# Make sure the bmap files generated at pass 1 and pass 2 are identical
assert filecmp.cmp(f_bmap1.name, f_bmap2.name, False)
#
# Pass 3: test compressed files copying with bmap
#
for compressed in _generate_compressed_files(image, delete=delete):
helpers.copy_and_verify_image(
compressed, f_copy.name, f_bmap1.name, image_chksum, image_size
)
# Test without setting the size
helpers.copy_and_verify_image(
compressed, f_copy.name, f_bmap1.name, image_chksum, None
)
# Append a "file:" prefix to make BmapCopy use urllib
compressed = "file:" + compressed
helpers.copy_and_verify_image(
compressed, f_copy.name, f_bmap1.name, image_chksum, image_size
)
helpers.copy_and_verify_image(
compressed, f_copy.name, f_bmap1.name, image_chksum, None
)
#
# Pass 5: copy without bmap and make sure it is identical to the original
# file.
helpers.copy_and_verify_image(image, f_copy.name, None, image_chksum, image_size)
helpers.copy_and_verify_image(image, f_copy.name, None, image_chksum, None)
#
# Pass 6: test compressed files copying without bmap
#
for compressed in _generate_compressed_files(image, delete=delete):
helpers.copy_and_verify_image(
compressed, f_copy.name, f_bmap1.name, image_chksum, image_size
)
# Test without setting the size
helpers.copy_and_verify_image(
compressed, f_copy.name, f_bmap1.name, image_chksum, None
)
# Append a "file:" prefix to make BmapCopy use urllib
helpers.copy_and_verify_image(
compressed, f_copy.name, f_bmap1.name, image_chksum, image_size
)
helpers.copy_and_verify_image(
compressed, f_copy.name, f_bmap1.name, image_chksum, None
)
# Close temporary files, which will also remove them
f_copy.close()
f_bmap1.close()
f_bmap2.close()
class TestCreateCopy(unittest.TestCase):
"""
The test class for this unit tests. Basically executes the '_do_test()'
function for different sparse files.
"""
def test(self): # pylint: disable=R0201
"""
The test entry point. Executes the '_do_test()' function for files of
different sizes, holes distribution and format.
"""
# Delete all the test-related temporary files automatically
delete = True
# Create all the test-related temporary files in current directory
directory = "."
iterator = helpers.generate_test_files(delete=delete, directory=directory)
for f_image, image_size, _, _ in iterator:
assert image_size == os.path.getsize(f_image.name)
_do_test(f_image.name, image_size, delete=delete)
|