1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443
|
"""
Common tools for running tests
"""
from __future__ import print_function
import functools
import os
import re
import shlex
import shutil
import sys
import tempfile
try:
# Python2
from StringIO import StringIO
except ImportError:
# Python3
from io import StringIO
from subprocess import Popen, PIPE, CalledProcessError
import yaml
def assert_raises(exception_classes, callable_obj=None, *args, **kwargs):
context = AssertRaisesContext(exception_classes)
if callable_obj is None:
return context
with context:
callable_obj(*args, **kwargs)
def assert_raises_regex(exception_classes, expected_regex, callable_obj=None, *args, **kwargs):
context = AssertRaisesContext(exception_classes, expected_regex)
if callable_obj is None:
return context
with context:
callable_obj(*args, **kwargs)
class AssertRaisesContext(object):
def __init__(self, expected, expected_regex=None):
self.expected = expected
self.expected_regex = expected_regex
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
if self.expected is None:
if exc_type is None:
return True
else:
raise
if exc_type is None:
try:
exc_name = self.expected.__name__
except AttributeError:
exc_name = str(self.expected)
raise AssertionError("{0} not raised".format(exc_name))
if not issubclass(exc_type, self.expected):
raise
if self.expected_regex is None:
return True
expected_regex = self.expected_regex
expected_regex = re.compile(expected_regex)
if not expected_regex.search(str(exc_value)):
raise AssertionError("'{0}' does not match '{1}'".format(expected_regex.pattern, str(exc_value)))
return True
class bloom_answer(object):
ASSERT_NO_QUESTION = -1
def __init__(self, answer, util_module=None):
self.answer = answer
if util_module is None:
import bloom.util as util_module
import bloom.commands.git.config as config_module
self.util_module = util_module
self.config_module = config_module
def __call__(self, msg=None):
if msg is not None:
print(msg)
assert self.answer != self.ASSERT_NO_QUESTION, \
"bloom asked a question, and it should not have"
if isinstance(self.answer, str):
print(self.answer)
return self.answer
elif isinstance(self.answer, list):
if self.answer:
print(self.answer)
return self.answer.pop(0)
else:
print('Nada')
return ''
else:
assert False, "Invalid answers given to bloom_answer"
def __enter__(self):
self.orig_util_module_input = self.util_module.safe_input
self.orig_config_module_input = self.config_module.safe_input
self.util_module.safe_input = self
self.config_module.safe_input = self
def __exit__(self, exc_type, exc_value, traceback):
self.util_module.safe_input = self.orig_util_module_input
self.config_module.safe_input = self.orig_config_module_input
class change_directory(object):
def __init__(self, directory=''):
self.directory = directory
self.original_cwd = None
def __enter__(self):
self.original_cwd = os.getcwd()
os.chdir(self.directory)
return self.directory
def __exit__(self, exc_type, exc_value, traceback):
if self.original_cwd and os.path.exists(self.original_cwd):
os.chdir(self.original_cwd)
class change_environ(object):
def __init__(self, env=None):
self.original_env = os.environ
self.new_env = dict(env) if env is not None else dict(os.environ)
def __enter__(self):
self.original_env = os.environ
os.environ = dict(self.new_env)
def __exit__(self, exc_type, exc_value, traceback):
os.environ = self.original_env
def in_temporary_directory(f):
@functools.wraps(f)
def decorated(*args, **kwds):
with temporary_directory() as directory:
try:
from inspect import getfullargspec as getargspec
except ImportError:
from inspect import getargspec
# If it takes directory of kwargs and kwds does already have
# directory, inject it
if 'directory' not in kwds and 'directory' in getargspec(f)[0]:
kwds['directory'] = directory
return f(*args, **kwds)
decorated.__name__ = f.__name__
return decorated
class redirected_stdio(object):
def __enter__(self):
self.original_stdout = sys.stdout
self.original_stderr = sys.stderr
sys.stdout = out = StringIO()
sys.stderr = err = StringIO()
return out, err
def __exit__(self, exc_type, exc_value, traceback):
sys.stdout = self.original_stdout
sys.stderr = self.original_stderr
class temporary_directory(object):
def __init__(self, prefix=''):
self.prefix = prefix
def __enter__(self):
self.original_cwd = os.getcwd()
self.temp_path = tempfile.mkdtemp(prefix=self.prefix)
os.chdir(self.temp_path)
return self.temp_path
def __exit__(self, exc_type, exc_value, traceback):
if self.temp_path and os.path.exists(self.temp_path):
shutil.rmtree(self.temp_path)
if self.original_cwd and os.path.exists(self.original_cwd):
os.chdir(self.original_cwd)
def user_bloom(cmd, args=None, directory=None, auto_assert=True,
return_io=True, silent=False, env=None):
"""Runs the given bloom cmd ('git-bloom-{cmd}') with the given args"""
assert type(cmd) == str, \
"user_bloom cmd takes str only, got " + str(type(cmd))
if args is None:
args = cmd.split()[1:]
cmd = cmd.split()[0]
assert type(args) in [list, tuple, str], \
"user_bloom args takes [list, tuple, str] only, got " + \
str(type(args))
if sys.version_info[0:2] < (3, 10):
from importlib_metadata import entry_points
else:
from importlib.metadata import entry_points
from bloom import __version__ as ver
if not cmd.startswith('git-bloom-'):
cmd = 'git-bloom-' + cmd
if type(args) != list:
if type(args) == str:
args = args.split()
args = list(args)
with change_directory(directory if directory is not None else os.getcwd()):
with redirected_stdio() as (out, err):
# importlib can't filter entry points by distribution because they
# don't compare. So get all matching entry points and filter by
# distribution version and name after.
eps = [ep for ep in entry_points(group="console_scripts", name=cmd)
if ep.dist.version == ver and ep.dist.name == 'bloom']
assert len(eps) == 1, f"Multiple entry points found for command '{cmd}'."
func = eps[0].load()
try:
with change_environ(env):
ret = func(args) or 0
except SystemExit as e:
ret = e.code
if ret != 0 and auto_assert:
raise
if not silent:
print("Command '{0}' returned '{1}':".format(cmd, ret))
print(out.getvalue(), file=sys.stdout, end='')
print(err.getvalue(), file=sys.stderr, end='')
if return_io:
return ret, out.getvalue(), err.getvalue()
return ret
def user_cd(cmd, **kwargs):
"""
Used in system tests to emulate a user changing directories
Used in place of user('cd <new_directory>')
"""
if type(cmd) is str:
assert cmd != '', "no arguments passed to cd, not allowed"
cmd = cmd.split()
new_directory = cmd[0]
assert os.path.exists(new_directory), \
"user tried to cd to '" + new_directory + "' which does not exist"
os.chdir(new_directory)
return 0
def user_echo(cmd, **kwargs):
"""
Used to emulate the user echoing something even to a file with >>
"""
assert type(cmd) is str, "user echo only takes str for the cmd argument"
cmd = shlex.split(cmd)
if len(cmd) == 1:
print(cmd[0])
elif len(cmd) == 3 and cmd[1] in ['>>', '>']:
# echo into somefile
assert not os.path.isdir(cmd[2]), \
"user tried to echo into a directory: '" + cmd[2] + "'"
if cmd[1] == '>>':
mode = 'a'
else:
mode = 'w+'
with open(cmd[2], mode) as f:
f.write(cmd[0])
return 0
def user_mkdir(cmd, **kwargs):
"""
Used in system tests to emulte a user creating a directory
"""
if type(cmd) is str:
assert cmd != '', "no arguments passed to mkdir, not allowed"
cmd = cmd.split()
if len(cmd) == 2:
assert '-p' in cmd, "two args to mkdir, neither is '-p', not allowed"
cmd.remove('-p')
mkdir_cmd = os.makedirs
else:
mkdir_cmd = os.mkdir
new_dir = cmd[0]
assert not os.path.exists(new_dir), \
"directory '" + new_dir + "' already exists"
mkdir_cmd(new_dir)
return 0
def user_touch(cmd, **kwargs):
"""
Used to emulat a user touching a file
"""
assert not os.path.exists(cmd), \
"user tried to touch a file '" + cmd + "' but it exists"
if os.path.exists(cmd):
os.utime(cmd, None)
else:
open(cmd, 'w').close()
_special_user_commands = {
'cd': user_cd,
'echo': user_echo,
'git-bloom-': user_bloom,
'mkdir': user_mkdir,
'touch': user_touch
}
def user(cmd, directory=None, auto_assert=True, return_io=False,
bash_only=False, silent=True, env=None):
"""Used in system tests to emulate a user action"""
if type(cmd) in [list, tuple]:
cmd = ' '.join(cmd)
if not bash_only:
# Handle special cases
for case in _special_user_commands:
if cmd.startswith(case):
cmd = ''.join(cmd.split(case)[1:]).strip()
return _special_user_commands[case](
cmd,
directory=directory,
auto_assert=auto_assert,
return_io=return_io,
silent=silent,
env=env
)
ret = -1
try:
p = Popen(cmd, shell=True, cwd=directory, env=env, stdout=PIPE, stderr=PIPE)
out, err = p.communicate()
if out is not None and not isinstance(out, str):
out = out.decode('utf-8')
if err is not None and not isinstance(err, str):
err = err.decode('utf-8')
ret = p.returncode
except CalledProcessError as err:
ret = err.returncode
if not silent:
print(out, file=sys.stdout, end='')
print(err, file=sys.stderr, end='')
if auto_assert:
assert ret == 0, \
"user command '" + cmd + "' returned " + str(p.returncode)
if return_io:
return ret, out, err
return ret
def set_up_fake_rosdep(staging_dir, distros={}, rules={}):
"""
Used to create a 'fake' rosdep cache from a locally generated index.
Example invocation:
.. code-block:: python
env = dict(os.environ)
env.update(set_up_fake_rosdep(
'/tmp/fake_rosdep',
{
'melodic': {
'ubuntu': ['bionic']
}
},
{
'some_rosdep_key': {
'ubuntu': ['some-package-name']
}
}))
:param staging_dir: Scratch directory in which to make infrastructure files.
:param distros: Mapping of ROS distributions to populate the index with.
:param rules: rosdep rules to populate the rosdep database with.
:returns: Environment variables which cause Bloom will use the fake cache.
"""
# Construct bare environment
rosdistro_dir = os.path.join(staging_dir, 'rosdistro')
rosdistro_index_path = os.path.join(rosdistro_dir, 'index.yaml')
sources_list_dir = os.path.join(staging_dir, 'sources.list.d')
ros_home_dir = os.path.join(staging_dir, 'ros_home')
os.makedirs(rosdistro_dir)
os.makedirs(sources_list_dir)
os.makedirs(ros_home_dir)
env = {
'BLOOM_SKIP_ROSDEP_UPDATE': '1',
'ROSDEP_SOURCE_PATH': os.path.realpath(sources_list_dir),
'ROSDISTRO_INDEX_URL': 'file://' + os.path.realpath(rosdistro_index_path),
'ROS_HOME': os.path.realpath(ros_home_dir)
}
# Create the specified distributions
for distro, platforms in distros.items():
distro_dir = os.path.join(rosdistro_dir, distro)
distro_yaml_path = os.path.join(distro_dir, 'distribution.yaml')
os.makedirs(distro_dir)
distro_yaml_data = {
'release_platforms': platforms,
'repositories': dict({}),
'type': 'distribution',
'version': 2
}
with open(distro_yaml_path, 'w') as f:
f.write(yaml.dump(distro_yaml_data))
# Index the specified distributions
rosdistro_index_data = {
'distributions': {
distro: {
'distribution': [os.path.join(distro, 'distribution.yaml')],
'distribution_cache': 'DOES-NOT-EXIST',
'distribution_status': 'active',
'distribution_type': 'ros1',
'python_version': 2
} for distro in distros.keys()
},
'type': 'index',
'version': 4
}
with open(rosdistro_index_path, 'w') as f:
f.write(yaml.dump(rosdistro_index_data))
# Create the rosdep database
rosdep_db_dir = os.path.join(rosdistro_dir, 'rosdep')
os.makedirs(rosdep_db_dir)
rosdep_db_path = os.path.join(rosdep_db_dir, 'rosdep.yaml')
with open(rosdep_db_path, 'w') as f:
f.write(yaml.dump(rules))
# Create the rosdep sources list
rosdistro_source_path = os.path.join(sources_list_dir, '50-rosdep.list')
with open(rosdistro_source_path, 'w') as f:
f.write('yaml file://' + os.path.realpath(rosdep_db_path))
# Perform the initial rosdep update
setup_env = dict(os.environ)
setup_env.update(env)
user('rosdep update', env=setup_env)
return env
|