1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
|
from contextlib import contextmanager
import filecmp
import os
import posix
import stat
import sys
import sysconfig
import time
import unittest
from ..xattr import get_all
from ..logger import setup_logging
from ..platform import umount
try:
import llfuse
# Does this version of llfuse support ns precision?
have_fuse_mtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')
except ImportError:
have_fuse_mtime_ns = False
has_lchflags = hasattr(os, 'lchflags')
# The mtime get/set precision varies on different OS and Python versions
if 'HAVE_FUTIMENS' in getattr(posix, '_have_functions', []):
st_mtime_ns_round = 0
elif 'HAVE_UTIMES' in sysconfig.get_config_vars():
st_mtime_ns_round = -6
else:
st_mtime_ns_round = -9
if sys.platform.startswith('netbsd'):
st_mtime_ns_round = -4 # only >1 microsecond resolution here?
# Ensure that the loggers exist for all tests
setup_logging()
def no_selinux(x):
# selinux fails our FUSE tests, thus ignore selinux xattrs
SELINUX_KEY = 'security.selinux'
if isinstance(x, dict):
return {k: v for k, v in x.items() if k != SELINUX_KEY}
if isinstance(x, list):
return [k for k in x if k != SELINUX_KEY]
class BaseTestCase(unittest.TestCase):
"""
"""
assert_in = unittest.TestCase.assertIn
assert_not_in = unittest.TestCase.assertNotIn
assert_equal = unittest.TestCase.assertEqual
assert_not_equal = unittest.TestCase.assertNotEqual
assert_raises = unittest.TestCase.assertRaises
assert_true = unittest.TestCase.assertTrue
@contextmanager
def assert_creates_file(self, path):
self.assert_true(not os.path.exists(path), '{} should not exist'.format(path))
yield
self.assert_true(os.path.exists(path), '{} should exist'.format(path))
def assert_dirs_equal(self, dir1, dir2, **kwargs):
diff = filecmp.dircmp(dir1, dir2)
self._assert_dirs_equal_cmp(diff, **kwargs)
def _assert_dirs_equal_cmp(self, diff, ignore_bsdflags=False, ignore_xattrs=False):
self.assert_equal(diff.left_only, [])
self.assert_equal(diff.right_only, [])
self.assert_equal(diff.diff_files, [])
self.assert_equal(diff.funny_files, [])
for filename in diff.common:
path1 = os.path.join(diff.left, filename)
path2 = os.path.join(diff.right, filename)
s1 = os.lstat(path1)
s2 = os.lstat(path2)
# Assume path2 is on FUSE if st_dev is different
fuse = s1.st_dev != s2.st_dev
attrs = ['st_mode', 'st_uid', 'st_gid', 'st_rdev']
if has_lchflags and not ignore_bsdflags:
attrs.append('st_flags')
if not fuse or not os.path.isdir(path1):
# dir nlink is always 1 on our fuse filesystem
attrs.append('st_nlink')
d1 = [filename] + [getattr(s1, a) for a in attrs]
d2 = [filename] + [getattr(s2, a) for a in attrs]
# ignore st_rdev if file is not a block/char device, fixes #203
if not stat.S_ISCHR(d1[1]) and not stat.S_ISBLK(d1[1]):
d1[4] = None
if not stat.S_ISCHR(d2[1]) and not stat.S_ISBLK(d2[1]):
d2[4] = None
# Older versions of llfuse do not support ns precision properly
if fuse and not have_fuse_mtime_ns:
d1.append(round(s1.st_mtime_ns, -4))
d2.append(round(s2.st_mtime_ns, -4))
else:
d1.append(round(s1.st_mtime_ns, st_mtime_ns_round))
d2.append(round(s2.st_mtime_ns, st_mtime_ns_round))
if not ignore_xattrs:
d1.append(no_selinux(get_all(path1, follow_symlinks=False)))
d2.append(no_selinux(get_all(path2, follow_symlinks=False)))
self.assert_equal(d1, d2)
for sub_diff in diff.subdirs.values():
self._assert_dirs_equal_cmp(sub_diff, ignore_bsdflags=ignore_bsdflags, ignore_xattrs=ignore_xattrs)
@contextmanager
def fuse_mount(self, location, mountpoint, *options):
os.mkdir(mountpoint)
args = ['mount', location, mountpoint] + list(options)
self.cmd(*args, fork=True)
self.wait_for_mount(mountpoint)
yield
umount(mountpoint)
os.rmdir(mountpoint)
# Give the daemon some time to exit
time.sleep(.2)
def wait_for_mount(self, path, timeout=5):
"""Wait until a filesystem is mounted on `path`
"""
timeout += time.time()
while timeout > time.time():
if os.path.ismount(path):
return
time.sleep(.1)
raise Exception('wait_for_mount(%s) timeout' % path)
class changedir:
def __init__(self, dir):
self.dir = dir
def __enter__(self):
self.old = os.getcwd()
os.chdir(self.dir)
def __exit__(self, *args, **kw):
os.chdir(self.old)
class environment_variable:
def __init__(self, **values):
self.values = values
self.old_values = {}
def __enter__(self):
for k, v in self.values.items():
self.old_values[k] = os.environ.get(k)
if v is None:
os.environ.pop(k, None)
else:
os.environ[k] = v
def __exit__(self, *args, **kw):
for k, v in self.old_values.items():
if v is None:
os.environ.pop(k, None)
else:
os.environ[k] = v
class FakeInputs:
"""Simulate multiple user inputs, can be used as input() replacement"""
def __init__(self, inputs):
self.inputs = inputs
def __call__(self, prompt=None):
if prompt is not None:
print(prompt, end='')
try:
return self.inputs.pop(0)
except IndexError:
raise EOFError from None
|