1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
|
# Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 of the
# License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
# 02110-1301 USA
"""
How to run the tests in this file
==================================
1. Copy the settings.py.in file and rename the copy to settings.py. Adapt the
settings.py file you have just created to match the settings of
your test environment.
2. Open a console and cd to the directory where these files live
(plugins/migration/unit-tests).
3. Execute this command to run the tests:
python -m unittest test_wbcopytables
"""
import unittest
import types
import os
import sys
import fnmatch
import subprocess
import hashlib
import functools
import logging
import re
import platform
import settings
_this_dir = os.path.abspath(os.path.dirname(__file__))

# A password value as a shell would read it: a run of non-whitespace characters
# and/or quoted segments. Consuming whole quoted segments is what lets a quoted
# password that contains spaces be scrambled completely instead of only up to
# its first space.
_pwd_value = r"""(?:"[^"]*"|'[^']*'|\S)*"""

# Matches the value that follows any of the recognized password options. The
# option itself sits in a lookbehind so only the value gets replaced.
_pwd_regex = re.compile('|'.join(
    r'(?<=\s%s)%s' % (option, _pwd_value)
    for option in ('-p', '--source-password=', '--target-password=', '--password=')
))


def scramble_pwd(cmd, replacement='***'):
    """Return a copy of a command line with its password values masked.

    The values of the ``-p``, ``--source-password=``, ``--target-password=``
    and ``--password=`` options are replaced. An option is only recognized
    when it is preceded by whitespace. Quoted values (single or double
    quotes) are replaced as a whole, including any spaces inside the quotes.

    Args:
        cmd: A string with the command line to be sanitized.
        replacement: The text that is put in place of each password value.

    Returns:
        The command line string with every recognized password value replaced.
    """
    return _pwd_regex.sub(replacement, cmd)
# Send the debug log to the file named in the settings when there is one;
# otherwise leave the destination to logging's own default.
logging.basicConfig(
    level=logging.DEBUG,
    **({'filename': settings.log_file} if settings.log_file else {})
)
class CopyTablesTestCase(unittest.TestCase):
    """Test case that checks the data copied by the wbcopytables executable.

    The class itself only provides the fixtures and the parameterized test
    routine (_param_test); every individual test is expected to call
    _param_test with its own data.
    """

    # Value passed to wbcopytables through its --thread-count option.
    thread_count = 1

    # (variable name, original value) of the environment variable modified by
    # setUpClass, or None when nothing was modified. The original value is None
    # when the variable was not set at all.
    _env_var_original = None

    @classmethod
    def setUpClass(cls):
        """Preparations before running all tests in this class.

        Sets the environment variable for the linker to find the wb base lib needed by
        wbcopytables' executable. Only Linux is handled at the moment; on any other
        platform the environment is left untouched.
        """
        cls._env_var_original = None
        os_name = platform.system()
        if os_name == 'Linux':
            env_var = 'LD_LIBRARY_PATH'
            # set lib_dir to dir(wbcopytables executable)/../lib/mysql-workbench
            lib_dir = os.path.join(os.path.dirname(os.path.dirname(settings.copytables_path)),
                                   'lib', 'mysql-workbench')
            sep = ':'
        elif os_name == 'Windows':
            # In our current build setup, base.dll is in the same dir than wbcopytables.exe,
            # so no need to tweak the path. Should it be needed in the future, use:
            #     env_var = 'PATH'
            #     lib_dir = os.path.dirname(settings.copytables_path)
            #     sep = ';'
            return
        else:
            # Mac OS X (and anything else): TODO. The variable to set there would be
            # DYLD_LIBRARY_PATH with ':' as separator; the lib dir is yet to be defined.
            return

        original_value = os.environ.get(env_var)
        cls._env_var_original = (env_var, original_value)
        # Assign through os.environ (instead of calling os.putenv directly) so that both the
        # process environment and os.environ stay in sync.
        os.environ[env_var] = (original_value + sep + lib_dir) if original_value else lib_dir

    def setUp(self):
        """Preparations before running each test in this class.

        Creates test database in the MySQL test servers.
        """
        for target_name, target_info in settings.mysql_instances:
            mysql_call = (settings.mysql_client +
                          " -u %(user)s -p%(password)s -h %(host)s -P %(port)d -e 'CREATE DATABASE %(database)s'" % target_info
                          )
            logging.debug('Calling the MySQL Client with command: %s' % scramble_pwd(mysql_call))
            subprocess.Popen(mysql_call, shell=True).wait()

    def _param_test(self, test_info, source_instance, source_info, target_instance, target_info):
        """Parameterized test function.

        The individual tests in this class call this function with their specific data. This test
        does the following:
            1. Connect to the given source instance and run there the associated .sql code that sets
               up the test data in the source server.
            2. Connect to the given mysql instance and run there the associated .sql code that creates
               the target table that is to be loaded with data with wbcopytables.
            3. Call wbcopytables to move the data from the source to the target server.
            4. Run mysqldump in the target server and compare its output with the expected output for
               the test. The test fails if they differ.

        The test also fails right away, before touching any server, if there is no expected data
        file for the given target instance.

        Args:
            test_info: A dictionary containing information about the test (test name, test files, etc.)
                like the one that returns the available_tests function below.
            source_instance: A string with the name of a source server instance. This name is also the
                key to that instance in the settings.source_instances dict.
            source_info: A dictionary with information about the source server instance. Is the dict
                associated to the source_instance key in settings.source_instances.
            target_instance: A string with the name of a mysql target server instance that will be used
                by the test. Like with source_instance, this is also the key to that instance settings
                dict in settings.mysql_instances.
            target_info: A dictionary with information about the mysql target server instance. This is
                the dict associated to the target_instance key in settings.mysql_instances.
        """
        expected_path = test_info['expected'][target_instance]
        if expected_path is None:
            self.fail('There is no expected data file for the %s instance' % target_instance)

        # Run the source script in the source RDBMS instance:
        logging.debug('Importing python module %s' % source_info['module'])
        __import__(source_info['module'])
        module = sys.modules[source_info['module']]

        try:
            source_conn_str = source_info['connection_string'] % source_info
            logging.debug('source_conn_str = %s' % scramble_pwd(source_conn_str))
            conn = module.connect(source_conn_str)
        except Exception:
            self.fail('Could not connect to the %s instance' % source_instance)

        # self.fail() raises, so this point is only reached with a live connection.
        logging.debug('Connected to source instance')
        cursor = conn.cursor()
        with open(test_info['source'], 'rb') as source_file:
            script = source_file.read()
        logging.debug('Executing this script in source instance: \n%s' % '\t'.join(line + '\n' for line in script.split('\n')))
        if hasattr(cursor, 'executescript'):
            logging.debug('Running the whole script in one call')
            cursor.executescript(script)
        else:
            logging.debug('Running the whole script sentence by sentence')
            for stmt in script.split(';'):
                cursor.execute(stmt)
        conn.commit()

        # Run the target script in the target MySQL instance:
        mysql_call = (settings.mysql_client + ' -u %(user)s -p%(password)s -h %(host)s -P %(port)d %(database)s < ' % target_info
                      + test_info['target'])
        logging.debug('Calling the MySQL Client with command: %s' % scramble_pwd(mysql_call))
        subprocess.Popen(mysql_call, shell=True).wait()

        # Call copytables to transfer the data from source to target:
        copytables_params = (' --pythondbapi-source="%(module)s' % source_info + '''://'%s'"''' % source_conn_str +
                             ' --source-password="%(password)s"' % source_info +
                             ' --target="%(user)s@%(host)s:%(port)d" --target-password="%(password)s"' % target_info +
                             ' --table-file="%(table_file)s"' % test_info +
                             ' --thread-count=%u' % self.thread_count
                             )
        logging.debug('Calling copytables with command: %s' % settings.copytables_path + scramble_pwd(copytables_params))
        subprocess.Popen(settings.copytables_path + copytables_params, shell=True).wait()

        # Dump the MySQL data and compare it with the expected data:
        mysqldump_call = settings.mysql_dump + ' -u %(user)s -p%(password)s -h %(host)s -P %(port)d --compact %(database)s' % target_info
        logging.debug('Calling the MySQL Dump with command: %s' % scramble_pwd(mysqldump_call))
        p = subprocess.Popen(mysqldump_call, shell=True, stdout=subprocess.PIPE)
        dumped_data = p.communicate()[0]
        dumped_hash = hashlib.md5(dumped_data).hexdigest()

        with open(expected_path, 'rb') as expected_file:
            expected_data = expected_file.read()
        expected_hash = hashlib.md5(expected_data).hexdigest()

        if dumped_hash != expected_hash:
            # Log both contents so that the difference can be inspected after the run.
            logging.error('The dumped SQL file is different from the expected one.\n' +
                          60*'-' + '\nExpected file:\n' + 60*'-' +
                          '\n%s\n' % expected_data +
                          60*'-' + '\nDumped file:\n' + 60*'-' +
                          '\n%s\n' % dumped_data + 60*'-'
                          )
        self.assertEqual(dumped_hash, expected_hash)

    def tearDown(self):
        """Clean up after running each test in this class.

        Drops the test database in the MySQL test servers.
        """
        for target_name, target_info in settings.mysql_instances:
            mysql_call = (settings.mysql_client +
                          " -u %(user)s -p%(password)s -h %(host)s -P %(port)d %(database)s -e 'DROP DATABASE %(database)s'" % target_info
                          )
            logging.debug('Calling the MySQL Client with command: %s' % scramble_pwd(mysql_call))
            subprocess.Popen(mysql_call, shell=True).wait()

    @classmethod
    def tearDownClass(cls):
        """Clean up after running all tests in this class.

        Restore original value of modified environment variables. A variable that did not
        exist before setUpClass ran is removed again instead of being set.
        """
        if not cls._env_var_original:
            return
        env_var, original_value = cls._env_var_original
        if original_value is None:
            # The variable was not set originally: there is no value to put back.
            os.environ.pop(env_var, None)
        else:
            os.environ[env_var] = original_value
def available_tests(path):
    """Yield a description of every test found below a given directory.

    Walks through the directory structure rooted at path. A test is identified by the
    presence, in one and the same directory, of the files <test_name>_source.sql,
    <test_name>_target.sql, <test_name>_table_file.txt and at least one expected data
    file: either <test_name>_expected_target_data.sql or a
    <test_name>_<mysql_server_name>_expected_target_data.sql file for any of the
    mysql servers defined in settings.py.

    Nothing is yielded if path is not a directory.

    Args:
        path: A string containing the path to the directory where the tests are stored.
            The tests can be in separate directories inside <path> but all the test files
            for a given test must reside in the same directory.

    Yields:
        A dictionary with information for each individual test. The dictionary contains
        the following info:
            'test_name'  : The name of the test.
            'source'     : The .sql file to be run in the source RDBMS to set up the test
                           table(s) that wbcopytables will use to extract the data from.
            'target'     : The .sql file to be run in each target MySQL server. This file
                           usually sets up the table(s) that will contain the migrated data.
            'table_file' : The table file that wbcopytables will use.
            'expected'   : A dictionary mapping each MySQL server name (obtained from iterating
                           over the MySQL servers defined in settings.py) to the expected result
                           file, obtained from dumping the target table that contains the expected
                           data using "mysqldump --compact". The server specific file is preferred
                           over the generic one; the value is None if neither of them exists.
    """
    if not os.path.isdir(path):
        return

    server_names = [instance[0] for instance in settings.mysql_instances]
    for folder, _subfolders, file_names in os.walk(path):
        for source_file in fnmatch.filter(file_names, '*_source.sql'):
            # Strip the '_source.sql' suffix to get the name of the test.
            test_name = source_file[:-len('_source.sql')]
            target_file = test_name + '_target.sql'
            table_file = test_name + '_table_file.txt'
            generic_expected = test_name + '_expected_target_data.sql'
            specific_expected = [test_name + '_%s_expected_target_data.sql' % name
                                 for name in server_names]

            # Skip candidates that lack any of the mandatory companion files.
            if target_file not in file_names or table_file not in file_names:
                continue
            if generic_expected not in file_names and not any(
                    name in file_names for name in specific_expected):
                continue

            expected = {}
            for server_name, specific in zip(server_names, specific_expected):
                if specific in file_names:
                    chosen = specific
                elif generic_expected in file_names:
                    chosen = generic_expected
                else:
                    chosen = None
                expected[server_name] = None if chosen is None else os.path.join(folder, chosen)

            yield {
                'test_name': test_name,
                'source': os.path.join(folder, source_file),
                'target': os.path.join(folder, target_file),
                'table_file': os.path.join(folder, table_file),
                'expected': expected,
            }
def _make_copy_test(test_info, source_instance, source_info, target_instance, target_info):
    """Build a test method that runs _param_test with the given, fixed arguments.

    A plain function is returned (rather than a Python 2 style unbound method) so
    that it can be attached to the test case class under both Python 2 and Python 3.
    """
    def test(self):
        self._param_test(test_info=test_info,
                         source_instance=source_instance,
                         source_info=source_info,
                         target_instance=target_instance,
                         target_info=target_info)
    return test


# Generate the tests based on the directory structure and the files on disk:
# one test per (test found on disk, source instance, target instance) combination.
for source_instance, source_info in settings.source_instances:
    for test_info in available_tests(os.path.join(_this_dir, 'fixtures', source_instance)):
        for target_instance, target_info in settings.mysql_instances:
            # Dynamically add a meaningful test function to the test case class defined above:
            setattr(CopyTablesTestCase,
                    'test_%s_%s_%s' % (test_info['test_name'], source_instance, target_instance),
                    _make_copy_test(test_info, source_instance, source_info,
                                    target_instance, target_info)
                    )


if __name__ == '__main__':
    unittest.main()
|