1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
# FIXME: This tests are now broken after the redesign of the log reader classes
import sys
import os
import re
import unittest
import StringIO
import wb_admin_test_main
import wb_log_reader
base_str = ("""Preamble<field1><field2><field3>
extra line""" +
"""<field1><field2><field3>extra text"""*4
)
slow_query_str = ("""This is some dummy text
to see that it can ignore the first part
# Time: 110901 12:20:56
# User@Host: root[root] @ localhost []
# Query_time: 3.000833 Lock_time: 0.000000 Rows_sent: 1 Rows_examined: 0
SET timestamp=1314897656;
SELECT sleep(3);
/usr/local/mysql/bin/mysqld, Version: 5.5.15-log (MySQL Community Server (GPL)). started with:
Tcp port: 3306 Unix socket: /tmp/mysql.sock
Time Id Command Argument
/usr/local/mysql/bin/mysqld, Version: 5.5.15-log (MySQL Community Server (GPL)). started with:
Tcp port: 3306 Unix socket: /tmp/mysql.sock
Time Id Command Argument
""" +
"""# Time: 110901 12:20:56
# User@Host: root[root] @ localhost []
# Query_time: 3.000833 Lock_time: 0.000000 Rows_sent: 1 Rows_examined: 0
SET timestamp=1314897656;
SELECT <some large query>;
<that spans several>
<lines>
"""*8)
@unittest.skip('Skipped until more relevant tests are added')
class TestQueryLogReader(wb_admin_test_main.AdminTestCase):
@classmethod
def setUpClass(cls):
cls.local_db_fixtures.load_sample_database(sample_db='world', db_name='testdb')
cls.local_db_fixtures.execute_script(os.path.join(os.path.dirname(__file__), 'fixtures/sample_general_log.sql'))
def test_general_log_creation(self):
general_log = wb_log_reader.GeneralQueryLogReader(self.local_ctrl_be, 'test.general_log')
self.assertIsInstance(general_log, wb_log_reader.GeneralQueryLogReader)
#class TestBaseReaderClass(unittest.TestCase):
# def setUp(self):
# self.regex = re.compile(r'<field(1)><field(2)><field(3)>')
# self.logFile1 = wb_log_file_reader.BaseLogFileReader(StringIO.StringIO(base_str),
# self.regex,
# 40,
# False,
# True)
# self.logFile2 = wb_log_file_reader.BaseLogFileReader(StringIO.StringIO(base_str),
# self.regex,
# 40,
# False,
# False)
#
#
# def test_start(self):
# self.assertEqual(self.logFile1.first_record_pos, 8)
# self.assertEqual(self.logFile2.first_record_pos, 8)
#
# def test_size(self):
# self.assertEqual(self.logFile1.file_size, len(base_str), 'file size mismatch')
#
# def test_shorten_query_field(self):
# data = '#'*1024
# result = self.logFile1._shorten_query_field(data)
# self.assertTrue(result.startswith('#'*256))
# self.assertTrue('1.0 KB' in result)
#
# def test_chunk_parsing1(self):
# records = self.logFile2._parse_chunk(base_str)
# self.assertEqual(records, [ ['1', '2', '3'] ]*5)
#
# def test_chunk_parsing2(self):
# records = self.logFile1._parse_chunk(base_str)
# result = [ ['1', '2', '3\nextra line'] ]
# result.extend([ ['1', '2', '3extra text'] ]*4)
# self.assertEqual(records, result)
#
# def test_read_next_chunk(self):
# log_reader = wb_log_file_reader.BaseLogFileReader(StringIO.StringIO(base_str),
# self.regex,
# 40,
# False,
# True)
# # If positioned at the EOF (this is the initial state) a call to
# # _read_next_chunk should raise an exception:
# self.assertRaises(IndexError, log_reader._read_next_chunk)
# log_reader.chunk_end = log_reader.first_record_pos
# log_reader.chunk_size = 5
# self.assertEqual(log_reader._read_next_chunk(), '<field1><field2><field3>\nextra line')
#
#
# def test_read_previous_chunk(self):
# log_reader = wb_log_file_reader.BaseLogFileReader(StringIO.StringIO(base_str),
# self.regex,
# 40000,
# False,
# True)
# self.assertEqual(log_reader._read_previous_chunk(), base_str[log_reader.first_record_pos:])
# # When the file is smaller than chunk_size, once the only existent chunk is
# # read, there are neither previous nor next chunks:
# self.assertRaises(IndexError, log_reader._read_next_chunk)
# self.assertRaises(IndexError, log_reader._read_previous_chunk)
#
# def test_adjust_chunk_start(self):
# log_reader = wb_log_file_reader.BaseLogFileReader(StringIO.StringIO(base_str),
# self.regex,
# 5,
# False,
# True)
# log_reader.chunk_start = log_reader.file_size - 3
# log_reader._adjust_chunk_start()
# self.assertEqual(log_reader.chunk_start,
# len(base_str) - 40 + log_reader.pat.search(base_str[-40:]).start())
# log_reader.chunk_start = 0
# log_reader._adjust_chunk_start()
# self.assertEqual(log_reader.chunk_start, log_reader.first_record_pos)
#
# def test_adjust_chunk_end(self):
# self.logFile2.chunk_end = self.logFile2.file_size - 2
# self.logFile2._adjust_chunk_end()
# self.assertEqual(self.logFile2.chunk_end, self.logFile2.file_size)
# self.logFile2.chunk_end = self.logFile2.first_record_pos
# self.logFile2._adjust_chunk_end()
# self.assertEqual(self.logFile2.chunk_end, self.logFile2.first_record_pos)
#
# @unittest.skip('to be done') # remove this line when ready to run this test
# def test_updated_log_file(self):
# pass
#
##=============================================================================
#class TestSlowReaderClass(unittest.TestCase):
# def setUp(self):
# self.logFile1 = wb_log_file_reader.SlowLogFileReader(StringIO.StringIO(slow_query_str))
#
# @unittest.skip('to be done') # remove this line when ready to run this test
# def test_start(self):
# self.assertEqual(self.logFile1.previous(), [])
#if __name__ == '__main__':
# unittest.main()
|