File: test_wbcopytables.py

package info (click to toggle)
mysql-workbench 6.3.8%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 113,932 kB
  • ctags: 87,814
  • sloc: ansic: 955,521; cpp: 427,465; python: 59,728; yacc: 59,129; xml: 54,204; sql: 7,091; objc: 965; makefile: 638; sh: 613; java: 237; perl: 30; ruby: 6; php: 1
file content (290 lines) | stat: -rw-r--r-- 14,679 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 of the
# License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
# 02110-1301  USA

"""
How to run the tests in this file
==================================

1. Copy the settings.py.in and rename it to settings.py. Adapt the
   settings.py file you have just created to match the settings of
   your test environment.
2. Open a console and cd to the directory where these files live
   (plugins/migration/unit-tests).
3. Execute this command to run the tests:
    python -m unittest test_wbcopytables

"""


import unittest
import types
import os
import sys
import fnmatch
import subprocess
import hashlib
import functools
import logging
import re
import platform

import settings

_this_dir = os.path.abspath(os.path.dirname(__file__))


# Matches the value that follows a password option (-p, --source-password=,
# --target-password= or --password=) in a command line. The value runs up to the
# first whitespace that is not enclosed in a pair of matching quotes, so a quoted
# password containing spaces is matched in full.
# NOTE: an unquoted password containing spaces cannot be told apart from the next
# argument and is therefore still only matched up to its first space.
_pwd_regex = re.compile(r"""(?:(?<=\s-p)|"""
                        r"""(?<=\s--source-password=)|"""
                        r"""(?<=\s--target-password=)|"""
                        r"""(?<=\s--password=))"""
                        r"""(?:"[^"]*"|'[^']*'|\S)*""")


def scramble_pwd(cmd, replacement='***'):
    """Return a copy of a command line with its password values masked.

    Args:
        cmd: A string with the command line (or any text) that may contain
            password options.
        replacement: The string that is put in place of each password value.

    Returns:
        The given string with every password value found by _pwd_regex replaced
        by the replacement string. The option itself (e.g. "-p") is kept.
    """
    return _pwd_regex.sub(replacement, cmd)

# Log everything from DEBUG level up. The messages go to the file named in the
# settings when one is configured; otherwise basicConfig's default destination
# is used.
if not settings.log_file:
    logging.basicConfig(level=logging.DEBUG)
else:
    logging.basicConfig(filename=settings.log_file, level=logging.DEBUG)


class CopyTablesTestCase(unittest.TestCase):
    """Test case that checks the data copied by the wbcopytables executable.

    The class itself only provides the fixtures and the parameterized test
    body (_param_test). The actual test_* methods are attached to the class
    by the module level code that follows the class definition.
    """

    # Value passed to wbcopytables through its --thread-count option.
    thread_count = 1

    # (variable name, original value) of the environment variable changed by
    # setUpClass, or None when the environment was left untouched.
    _env_var_original = None

    @classmethod
    def setUpClass(cls):
        """Preparations before running all tests in this class.

        Sets the environment variable for the linker to find the wb base lib needed by wbcopytables' executable.
        This is only done on Linux; on any other platform the environment is left untouched.
        """
        cls._env_var_original = None
        system_name = platform.system()
        if system_name == 'Linux':
            env_var = 'LD_LIBRARY_PATH'
            # set lib_dir to dir(wbcopytables executable)/../lib/mysql-workbench
            lib_dir = os.path.join(os.path.dirname(os.path.dirname(settings.copytables_path)), 'lib', 'mysql-workbench')
            sep = ':'
        elif system_name == 'Windows':
            # In our current build setup, base.dll is in the same dir than wbcopytables.exe, so no need to tweak
            # the path. If it should be set in the future, use env_var = 'PATH',
            # lib_dir = os.path.dirname(settings.copytables_path) and sep = ';' here instead of returning.
            return
        else:  # Mac OS X
            # TODO: set DYLD_LIBRARY_PATH (with ':' as separator) once the lib dir is known.
            return

        original_value = os.environ.get(env_var)
        cls._env_var_original = (env_var, original_value)
        # Assigning through os.environ also updates the process environment, so the
        # executables launched by the tests inherit the new value.
        os.environ[env_var] = (original_value + sep + lib_dir) if original_value else lib_dir

    @staticmethod
    def _call_mysql_client(mysql_call):
        """Logs the given MySQL client command (password scrambled) and runs it through the shell.

        Waits for the command to finish. Its exit status is not checked.
        """
        logging.debug('Calling the MySQL Client with command: %s' % scramble_pwd(mysql_call))
        subprocess.Popen(mysql_call, shell=True).wait()

    def setUp(self):
        """Preparations before running each test in this class.

        Creates test database in the MySQL test servers.
        """
        for target_name, target_info in settings.mysql_instances:
            mysql_call = (settings.mysql_client +
                          " -u %(user)s -p%(password)s -h %(host)s -P %(port)d -e 'CREATE DATABASE %(database)s'" % target_info
                         )
            self._call_mysql_client(mysql_call)

    def _param_test(self, test_info, source_instance, source_info, target_instance, target_info):
        """Parameterized test function.

        The individual tests in this class call this function with their specific data. This test
        does the following:
            1. Connect to the given source instance and run there the associated .sql code that sets
               up the test data in the source server.
            2. Connect to the given mysql instance and run there the associated .sql code that creates
               the target table that is to be loaded with data with wbcopytables.
            3. Call wbcopytables to move the data from the source to the target server.
            4. Run mysqldump in the target server and compare its output with the expected output for
               the test. The test fails if they differ.

        The exit status of the external commands is not checked: a failure in any of them shows up
        as a difference between the dumped data and the expected data.

        Args:
            test_info: A dictionary containing information about the test (test name, test files, etc.)
                like the one that returns the available_tests function below.
            source_instance: A string with the name of a source server instance. This name is also the
                key to that instance in the settings.source_instances dict.
            source_info: A dictionary with information about the source server instance. Is the dict
                associated to the source_instance key in settings.source_instances.
            target_instance: A string with the name of a mysql target server instance that will be used
                by the test. Like with source_instance, this is also the key to that instance settings
                dict in settings.mysql_instances.
            target_info: A dictionary with information about the mysql target server instance. This is
                the dict associated to the target_instance key in settings.mysql_instances.
        """
        # Run the source script in the source RDBMS instance:
        logging.debug('Importing python module %s' % source_info['module'])
        __import__(source_info['module'])
        module = sys.modules[source_info['module']]
        try:
            source_conn_str = source_info['connection_string'] % source_info
            logging.debug('source_conn_str = %s' % scramble_pwd(source_conn_str))
            conn = module.connect(source_conn_str)
        except Exception:
            # Keep the traceback in the log: the failure message alone does not say what went wrong.
            logging.exception('Could not connect to the %s instance' % source_instance)
            self.fail('Could not connect to the %s instance' % source_instance)
        else:
            logging.debug('Connected to source instance')
            try:
                cursor = conn.cursor()
                with open(test_info['source'], 'rb') as script_file:
                    script = script_file.read()
                logging.debug('Executing this script in source instance: \n%s' % '\t'.join(line + '\n' for line in script.split('\n')))
                if hasattr(cursor, 'executescript'):
                    logging.debug('Running the whole script in one call')
                    cursor.executescript(script)
                else:
                    logging.debug('Running the whole script sentence by sentence')
                    for stmt in script.split(';'):
                        # Splitting on ';' leaves a blank fragment after the last statement; there is
                        # nothing to execute in it.
                        if not stmt.strip():
                            continue
                        cursor.execute(stmt)
                conn.commit()
            finally:
                # wbcopytables opens its own connection, so this one is no longer needed.
                conn.close()

        # Run the target script in the target MySQL instance:
        mysql_call = (settings.mysql_client + ' -u %(user)s -p%(password)s -h %(host)s -P %(port)d %(database)s < ' % target_info
                      + test_info['target'] )
        self._call_mysql_client(mysql_call)

        # Call copytables to transfer the data from source to target:
        copytables_params = (' --pythondbapi-source="%(module)s' % source_info + '''://'%s'"''' % source_conn_str +
                             ' --source-password="%(password)s"' % source_info +
                             ' --target="%(user)s@%(host)s:%(port)d" --target-password="%(password)s"' % target_info +
                             ' --table-file="%(table_file)s"' % test_info +
                             ' --thread-count=%u' % self.thread_count
                            )
        logging.debug('Calling copytables with command: %s' % (settings.copytables_path + scramble_pwd(copytables_params)))
        subprocess.Popen(settings.copytables_path + copytables_params, shell=True).wait()

        # Dump the MySQL data and compare it with the expected data:
        mysqldump_call = settings.mysql_dump + ' -u %(user)s -p%(password)s -h %(host)s -P %(port)d --compact %(database)s' % target_info
        logging.debug('Calling the MySQL Dump with command: %s' % scramble_pwd(mysqldump_call))
        p = subprocess.Popen(mysqldump_call, shell=True, stdout=subprocess.PIPE)
        dumped_data = p.communicate()[0]
        dumped_hash = hashlib.md5(dumped_data).hexdigest()
        with open(test_info['expected'][target_instance], 'rb') as expected_file:
            expected_data = expected_file.read()
        expected_hash = hashlib.md5(expected_data).hexdigest()
        if dumped_hash != expected_hash:
            # The assertion below only reports the two hashes, so log both files in full.
            logging.error('The dumped SQL file is different from the expected one.\n' +
                          60*'-' + '\nExpected file:\n' + 60*'-' +
                          '\n%s\n' % expected_data +
                          60*'-' + '\nDumped file:\n' + 60*'-' +
                          '\n%s\n' % dumped_data + 60*'-'
                         )
        self.assertEqual(dumped_hash, expected_hash)

    def tearDown(self):
        """Clean up after running each test in this class.

        Drops the test database in the MySQL test servers.
        """
        for target_name, target_info in settings.mysql_instances:
            mysql_call = (settings.mysql_client +
                          " -u %(user)s -p%(password)s -h %(host)s -P %(port)d %(database)s -e 'DROP DATABASE %(database)s'" % target_info
                         )
            self._call_mysql_client(mysql_call)

    @classmethod
    def tearDownClass(cls):
        """Clean up after running all tests in this class.

        Restore original value of modified environment variables. A variable that did not
        exist before setUpClass ran is removed again.
        """
        if cls._env_var_original:
            env_var, original_value = cls._env_var_original
            if original_value is None:
                os.environ.pop(env_var, None)
            else:
                os.environ[env_var] = original_value


def available_tests(path):
    """Generates the description of every test stored under a directory.

    The directory tree rooted at the given path is walked looking for files named
    <test_name>_source.sql. Such a file is taken as a test only if the same
    directory also holds <test_name>_target.sql, <test_name>_table_file.txt and at
    least one expected result file: either <test_name>_expected_target_data.sql or
    a <test_name>_<mysql_server_name>_expected_target_data.sql file for one of the
    mysql servers defined in settings.py.

    Nothing is yielded if the given path is not a directory.

    Args:
        path: A string containing the path to the directory where the tests are stored.
            The tests can be in separate directories inside <path> but all the test files
            for a given test must reside in the same directory.

    Yields:
        A dictionary for each test found, with these keys:
            'test_name' : The name of the test.
            'source' : Path to the <test_name>_source.sql file.
            'target' : Path to the <test_name>_target.sql file.
            'table_file' : Path to the <test_name>_table_file.txt file.
            'expected' : A dictionary mapping each MySQL server name (first item of
                each entry in settings.mysql_instances) to the path of its expected
                result file. The server specific file is preferred over the generic
                one; the value is None if neither of them exists.
    """
    if not os.path.isdir(path):
        return

    source_suffix = '_source.sql'
    server_names = [instance[0] for instance in settings.mysql_instances]

    for root, dirs, files in os.walk(path):
        for source_file in fnmatch.filter(files, '*' + source_suffix):
            test_name = source_file[:-len(source_suffix)]
            target_file = test_name + '_target.sql'
            table_file = test_name + '_table_file.txt'
            generic_expected = test_name + '_expected_target_data.sql'
            server_expected = [test_name + '_%s_expected_target_data.sql' % name for name in server_names]

            # Both auxiliary files are mandatory...
            if target_file not in files or table_file not in files:
                continue
            # ...and so is at least one expected result file.
            if generic_expected not in files and not any(name in files for name in server_expected):
                continue

            expected = {}
            for server_name, specific_expected in zip(server_names, server_expected):
                if specific_expected in files:
                    chosen = specific_expected
                elif generic_expected in files:
                    chosen = generic_expected
                else:
                    chosen = None
                expected[server_name] = os.path.join(root, chosen) if chosen is not None else None

            yield {
                'test_name': test_name,
                'source': os.path.join(root, source_file),
                'target': os.path.join(root, target_file),
                'table_file': os.path.join(root, table_file),
                'expected': expected,
            }



# Attach one test method to CopyTablesTestCase for each combination of source
# instance, test found in that instance's fixtures directory and MySQL target
# instance. Every generated method runs _param_test with its own set of arguments.
for source_instance, source_info in settings.source_instances:
    for test_info in available_tests(os.path.join(_this_dir, 'fixtures', source_instance)):
        for target_instance, target_info in settings.mysql_instances:
            _test_name = 'test_%s_%s_%s' % (test_info['test_name'], source_instance, target_instance)
            # partial freezes the current loop values, so each method keeps its own arguments:
            _test_body = functools.partial(CopyTablesTestCase._param_test,
                                           test_info=test_info,
                                           source_instance=source_instance,
                                           source_info=source_info,
                                           target_instance=target_instance,
                                           target_info=target_info)
            # The three argument form of MethodType (Python 2) wraps the callable as a
            # method of the class with no instance bound yet.
            setattr(CopyTablesTestCase, _test_name,
                    types.MethodType(_test_body, None, CopyTablesTestCase))


# Run the tests of this module when the file is executed as a script:
if __name__ == '__main__':
    unittest.main()