#!/usr/bin/python
# Copyright (C) 2012 Canonical
#
# Authors:
# Didier Roche <didrocks@ubuntu.com>
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation; either version 3 of the License,
# or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
import configparser
import json
import os
import re
import shutil
import stat
import subprocess
import tempfile
import time
import unittest
class MigrationTests(unittest.TestCase):
    '''Functional tests running the user-session-migration tool as a subprocess.

    Each test points the tool at a data directory through environment
    variables, runs it and checks its stdout/stderr, the files touched by
    the migration scripts and the state file the tool writes.
    '''

    # process-wide variables overridden by setup_env()
    _ENV_KEYS = ('DESKTOP_SESSION', 'XDG_CURRENT_DESKTOP', 'XDG_DATA_HOME', 'XDG_DATA_DIRS')

    def setUp(self):
        self.config = configparser.RawConfigParser()
        self.tmpdir = None
        self.output_files = None
        self.maxDiff = None
        # setup_env() rewrites os.environ: put the previous values back once the test is over
        saved_env = {key: os.environ.get(key) for key in self._ENV_KEYS}
        self.addCleanup(self._restore_env, saved_env)
        # tearDown() is skipped when setUp() raises, cleanups are not: don't leak the temp dir
        self.addCleanup(self.clean_env)
        self.setup_env()

    def tearDown(self):
        self.clean_env()

    @staticmethod
    def _restore_env(saved_env):
        '''Restore the environment variables saved in setUp (None means "was unset")'''
        for key, value in saved_env.items():
            if value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = value

    def clean_env(self):
        '''Clean setup files if present'''
        if self.tmpdir:
            try:
                shutil.rmtree(self.tmpdir)
            except OSError:
                pass
            self.tmpdir = None
        if self.output_files:
            for output_file in self.output_files:
                try:
                    os.remove(output_file)
                except OSError:
                    pass
            self.output_files = None

    def setup_env(self, test_name='main', systemtemp=False):
        '''Setup the env for a particular test domain

        test_name: name of the directory under data/ (next to this file) used as system data dir
        systemtemp: when True, use a non existing "system" directory of the temp dir
                    instead; no expected output files are loaded in that case
        '''
        # if we already called setup_env, clean it first
        self.clean_env()
        self.tmpdir = tempfile.mkdtemp()

        os.environ['DESKTOP_SESSION'] = 'migtestsession'
        home_migration_dir = os.path.join(self.tmpdir, 'home')
        self.migration_legacy_home_file = os.path.join(home_migration_dir, 'user_session_migration-migtestsession')
        os.environ['XDG_CURRENT_DESKTOP'] = 'my:desktop'
        self.migration_home_file = os.path.join(home_migration_dir, 'user-session-migration', 'my:desktop.state')
        os.environ['XDG_DATA_HOME'] = home_migration_dir

        if systemtemp:
            system_data_path = os.path.join(self.tmpdir, 'system')
            os.environ['XDG_DATA_DIRS'] = system_data_path
        else:
            system_data_path = os.path.abspath(os.path.join(
                os.path.dirname(__file__), 'data', test_name))
            os.environ['XDG_DATA_DIRS'] = system_data_path
            # loading the expected result: maps every file a script may touch
            # to whether a full migration run is expected to create it
            with open(os.path.join(system_data_path, 'output_files'), encoding='utf-8') as f:
                self.output_files = json.load(f)
        self.script_path = os.path.join(system_data_path, 'user-session-migration', 'scripts')

    def run_migration(self, command=None, verbose=True, additional_params=None):
        '''helper to run migration tool

        command: argv list to run, defaults to the tool in ../src
        verbose: append --verbose to the command line
        additional_params: extra arguments appended last

        Return a (stdout, stderr) tuple decoded as UTF-8.
        '''
        if command:
            # work on a copy so that the caller's list is never modified
            command = list(command)
        else:
            command = [os.path.join(os.path.dirname(__file__), '..', 'src', 'user-session-migration')]
        if verbose:
            command.append('--verbose')
        if additional_params:
            command.extend(additional_params)
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return (result.stdout.decode('UTF-8'), result.stderr.decode('UTF-8'))

    def _check_output_files(self, should_exist):
        '''Check every known output file exists exactly when should_exist(path, created) is true

        "created" is the value stored for that path in the output_files description.
        The path is used as failure message to tell which file is wrong.
        '''
        for output_file, created in self.output_files.items():
            self.assertEqual(os.path.isfile(output_file), bool(should_exist(output_file, created)), output_file)

    def _migrated_scripts(self):
        '''Return the sorted script names listed as migrated in the loaded state file'''
        return sorted(elem for elem in self.config.get('State', 'migrated').split(';') if elem)

    def test_no_output_default(self):
        '''Test that by default, there is no output'''
        self.setup_env(systemtemp=True)
        (stdout, stderr) = self.run_migration(verbose=False)
        self.assertEqual(stdout, '')
        self.assertEqual(stderr, '')
        # nothing should be created
        self.assertFalse(os.path.isfile(self.migration_home_file))
        self.assertFalse(os.path.isfile(self.migration_legacy_home_file))

    def test_detect_dirs_doesn_t_exist(self):
        '''Test that some system dirs doesn't exist and nothing is done (but that multiple directories are detected)'''
        self.setup_env(systemtemp=True)
        os.environ['XDG_DATA_DIRS'] = '{}:{}'.format(os.environ['XDG_DATA_DIRS'], os.path.join(self.tmpdir, 'system2'))
        (stdout, stderr) = self.run_migration()
        self.assertEqual(stdout, f"""Using state file '{self.migration_home_file}'
Directory '{os.path.join(self.tmpdir, 'system', 'user-session-migration/scripts')}' does not exist, nothing to do
Directory '{os.path.join(self.tmpdir, 'system2', 'user-session-migration/scripts')}' does not exist, nothing to do
""")
        self.assertEqual(stderr, '')
        # nothing should be created
        self.assertFalse(os.path.isfile(self.migration_home_file))

    def test_migration(self):
        '''Test that the migration happens as expected'''
        # timestamps are compared as whole seconds with strict inequalities, hence the sleeps
        before_run_timestamp = int(time.time())
        time.sleep(1)  # for the timing test
        (stdout, stderr) = self.run_migration()

        # ensure scripts are executed in the right order
        self.assertEqual(stdout, f"""Using state file '{self.migration_home_file}'
Using '{self.script_path}' directory
Executing: {os.path.join(self.script_path, '01_test.sh')}
Executing: {os.path.join(self.script_path, '02_test.sh')}
Executing: {os.path.join(self.script_path, '08_test.sh')}
Executing: {os.path.join(self.script_path, '10_test.sh')}
Executing: {os.path.join(self.script_path, '15_skipping_test.py')}
'{os.path.join(self.script_path, '15_skipping_test.py')}' exited with SKIP exit code
stdout:
stderr:
Executing: {os.path.join(self.script_path, '20_failing_test.sh')}
""")
        # 08_test.sh is not marked as executable so it should fail to execute
        self.assertEqual(stderr, f"""Failed to execute child process “{os.path.join(self.script_path, '08_test.sh')}” (Permission denied)
stdout: (null)
stderr: (null)
'{os.path.join(self.script_path, '20_failing_test.sh')}' exited with an error: Child process exited with code 20
stdout:
stderr:
""")

        # ensure that the scripts have indeed been run/not run and created the expected touched file
        self._check_output_files(lambda path, created: created)

        # ensure that the scripts are marked as done and exit under right session name (encoded in migration_home_file)
        self.config.read(self.migration_home_file)
        time.sleep(1)  # for the timing test
        # check the timestamp
        after_run_timestamp = int(time.time())
        timestamp = self.config.getint('State', 'timestamp')
        self.assertLess(timestamp, after_run_timestamp)
        self.assertGreater(timestamp, before_run_timestamp)
        # ensure that the scripts are stamped as migrated
        self.assertEqual(self._migrated_scripts(), ['01_test.sh', '02_test.sh', '10_test.sh'])

    def test_migration_with_dry_run(self):
        '''Test that the migration claimed to happen as expected in dry run mode'''
        (stdout, stderr) = self.run_migration(additional_params=["--dry-run"])

        # ensure scripts are executed in the right order
        self.assertEqual(stdout, f"""Using state file '{self.migration_home_file}'
Using '{self.script_path}' directory
Executing: {os.path.join(self.script_path, '01_test.sh')}
Executing: {os.path.join(self.script_path, '02_test.sh')}
Executing: {os.path.join(self.script_path, '08_test.sh')}
Executing: {os.path.join(self.script_path, '10_test.sh')}
Executing: {os.path.join(self.script_path, '15_skipping_test.py')}
Executing: {os.path.join(self.script_path, '20_failing_test.sh')}
""")
        # ensure scripts 8 is not executed (and so no failure)
        self.assertEqual(stderr, '')

        # ensure that the scripts have indeed not been run
        self._check_output_files(lambda path, created: False)
        # ensure that the scripts are not marked as done
        self.assertFalse(os.path.isfile(self.migration_home_file))

    def check_subsequent_runs_no_effect(self, migration_file):
        '''Check that a second run is a no-op and leaves migration_file untouched

        migration_file: the state file the tool is expected to report and use
        '''
        (stdout, stderr) = self.run_migration()
        self.assertNotEqual(stdout, '')
        self.assertNotEqual(stderr, '')
        self.assertTrue(os.path.isfile(migration_file))
        with open(migration_file) as f:
            home_file = f.read()

        (stdout, stderr) = self.run_migration()
        self.assertEqual(stdout, f"""Using state file '{migration_file}'
Directory '{self.script_path}' all uptodate, nothing to do
""")
        self.assertEqual(stderr, '')
        with open(migration_file) as f:
            second_home_file = f.read()
        self.assertEqual(home_file, second_home_file)

    def test_subsequent_runs_no_effect(self):
        '''Test that subsequent runs doesn't have any effect'''
        self.check_subsequent_runs_no_effect(self.migration_home_file)

    def test_subsequent_runs_no_effect_with_legacy_file(self):
        '''Test that subsequent runs have no effect when both state files exist (the legacy one is used)'''
        # create both state files empty, legacy one first
        for state_file in (self.migration_legacy_home_file, self.migration_home_file):
            os.makedirs(os.path.dirname(state_file), exist_ok=True)
            os.close(os.open(state_file, os.O_CREAT))
        self.check_subsequent_runs_no_effect(self.migration_legacy_home_file)

    def test_subsequent_runs_with_new_script(self):
        '''Test subsequent runs with a new script'''
        self.run_migration()

        # remove all migration content (to ensure later that scripts are not reexecuted)
        for output_file, created in self.output_files.items():
            if created:
                os.remove(output_file)

        # add a script to be run in the same directory
        time.sleep(1)  # ensure that the directory is touched after the first run
        new_script = os.path.join(self.script_path, '08_testexecute.sh')
        shutil.copy(os.path.join(self.script_path, '08_test.sh'), new_script)
        try:
            os.chmod(new_script, stat.S_IRUSR | stat.S_IXUSR)
            before_second_run = int(time.time())
            time.sleep(1)  # for the timing test
            (stdout, stderr) = self.run_migration()
            time.sleep(1)  # for the timing test
            after_second_run = int(time.time())
            self.config.read(self.migration_home_file)
        finally:
            # the copy lives in the data directory next to this file, not in
            # the temp dir: remove it whatever happens
            os.remove(new_script)

        # check the debug output; paths are escaped as they are not regular expressions
        files = '(01|02|10)'
        self.assertIsNotNone(re.match(f"""Using state file '{re.escape(self.migration_home_file)}'
Using '{re.escape(self.script_path)}' directory
File '{files}_test.sh already migrated, skipping
File '{files}_test.sh already migrated, skipping
File '{files}_test.sh already migrated, skipping
Executing: {re.escape(os.path.join(self.script_path, '08_test.sh'))}
Executing: {re.escape(os.path.join(self.script_path, '08_testexecute.sh'))}
""", stdout), stdout)
        self.assertEqual(stderr, f"""Failed to execute child process “{os.path.join(self.script_path, '08_test.sh')}” (Permission denied)
stdout: (null)
stderr: (null)
'{os.path.join(self.script_path, '20_failing_test.sh')}' exited with an error: Child process exited with code 20
stdout:
stderr:
""")

        # inverse the condition to ensure only the latest copied script has been executed
        self._check_output_files(lambda path, created: not created)

        # check that the timestamp is from the second run
        timestamp = self.config.getint('State', 'timestamp')
        self.assertLess(timestamp, after_second_run)
        self.assertGreater(timestamp, before_second_run)
        # ensure that the new script is stamped as migrated
        self.assertEqual(self._migrated_scripts(), ['01_test.sh', '02_test.sh', '08_testexecute.sh', '10_test.sh'])

    def test_run_only_one_script(self):
        '''Test that you can run one script only'''
        (stdout, stderr) = self.run_migration(additional_params=["--file={}".format(os.path.join(self.script_path, '10_test.sh'))])

        # check the output that only this file was ran
        self.assertEqual(stdout, f"""Using state file '{self.migration_home_file}'
Executing: {os.path.join(self.script_path, '10_test.sh')}
""")
        self.assertEqual(stderr, '')

        # check that only this touched file has been created
        self._check_output_files(lambda path, created: '10' in path)
        # nothing should be logged
        self.assertFalse(os.path.isfile(self.migration_home_file))
if __name__ == "__main__":
    # run the whole suite when this file is executed directly
    unittest.main()