1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
|
# -*- coding: utf-8 -*-
#
# Copyright (C) 2011 Odd Simon Simonsen <oddsimons@gmail.com>
# Copyright (C) 2012 Steffen Hoffmann <hoff.st@web.de>
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution.
#
from __future__ import with_statement
import shutil
import tempfile
import unittest
from trac import __version__ as trac_version
from trac.db import Table, Column, Index
from trac.db.api import DatabaseManager
from trac.test import EnvironmentStub
from tractags import db_default
from tractags.db import TagSetup
class TagSetupTestCase(unittest.TestCase):
def setUp(self):
self.env = EnvironmentStub(enable=['trac.*'])
self.env.path = tempfile.mkdtemp()
self.db_mgr = DatabaseManager(self.env)
def tearDown(self):
self.env.shutdown()
shutil.rmtree(self.env.path)
# Helpers
def _get_cursor_description(self, cursor):
# Cursors don't look the same across Trac versions
if trac_version < '0.12':
return cursor.description
else:
return cursor.cursor.description
def _revert_tractags_schema_init(self):
with self.env.db_transaction as db:
db("DROP TABLE IF EXISTS tags")
db("DROP TABLE IF EXISTS tags_change")
db("DELETE FROM system WHERE name='tags_version'")
db("DELETE FROM permission WHERE action %s" % db.like(),
('TAGS_%',))
def get_db_version(self):
for version, in self.env.db_query("""
SELECT value FROM system
WHERE name='tags_version'
"""):
return int(version)
# Tests
def test_new_install(self):
setup = TagSetup(self.env)
# Current tractags schema is setup with enabled component anyway.
# Revert these changes for clean install testing.
self._revert_tractags_schema_init()
self.assertEquals(0, setup.get_schema_version())
self.assertTrue(setup.environment_needs_upgrade())
setup.upgrade_environment()
self.assertFalse(setup.environment_needs_upgrade())
with self.env.db_query as db:
cursor = db.cursor()
cursor.execute("SELECT * FROM tags")
cols = [col[0] for col in self._get_cursor_description(cursor)]
self.assertEquals([], cursor.fetchall())
self.assertEquals(['tagspace', 'name', 'tag'], cols)
self.assertEquals(db_default.schema_version, self.get_db_version())
def test_upgrade_schema_v1(self):
# Ancient, unversioned schema - wiki only.
schema = [
Table('wiki_namespace')[
Column('name'),
Column('namespace'),
Index(['name', 'namespace']),
]
]
setup = TagSetup(self.env)
# Current tractags schema is setup with enabled component anyway.
# Revert these changes for clean install testing.
self._revert_tractags_schema_init()
connector = self.db_mgr._get_connector()[0]
with self.env.db_transaction as db:
for table in schema:
for stmt in connector.to_sql(table):
db(stmt)
# Populate table with migration test data.
db("""INSERT INTO wiki_namespace (name, namespace)
VALUES ('WikiStart', 'tag')""")
tags = self.env.db_query("SELECT * FROM wiki_namespace")
self.assertEquals([('WikiStart', 'tag')], tags)
self.assertEquals(1, setup.get_schema_version())
self.assertTrue(setup.environment_needs_upgrade())
setup.upgrade_environment()
self.assertFalse(setup.environment_needs_upgrade())
with self.env.db_query as db:
cursor = db.cursor()
cursor.execute("SELECT * FROM tags")
tags = cursor.fetchall()
cols = [col[0] for col in self._get_cursor_description(cursor)]
# Db content should be migrated.
self.assertEquals([('wiki', 'WikiStart', 'tag')], tags)
self.assertEquals(['tagspace', 'name', 'tag'], cols)
self.assertEquals(db_default.schema_version, self.get_db_version())
def test_upgrade_schema_v2(self):
# Just register a current, but unversioned schema.
schema = [
Table('tags', key=('tagspace', 'name', 'tag'))[
Column('tagspace'),
Column('name'),
Column('tag'),
Index(['tagspace', 'name']),
Index(['tagspace', 'tag']),
]
]
setup = TagSetup(self.env)
# Current tractags schema is setup with enabled component anyway.
# Revert these changes for clean install testing.
self._revert_tractags_schema_init()
connector = self.db_mgr._get_connector()[0]
with self.env.db_transaction as db:
for table in schema:
for stmt in connector.to_sql(table):
db(stmt)
# Populate table with test data.
db("""INSERT INTO tags (tagspace, name, tag)
VALUES ('wiki', 'WikiStart', 'tag')""")
tags = self.env.db_query("SELECT * FROM tags")
self.assertEquals([('wiki', 'WikiStart', 'tag')], tags)
self.assertEquals(2, setup.get_schema_version())
self.assertTrue(setup.environment_needs_upgrade())
setup.upgrade_environment()
self.assertFalse(setup.environment_needs_upgrade())
with self.env.db_query as db:
cursor = db.cursor()
cursor.execute("SELECT * FROM tags")
tags = cursor.fetchall()
cols = [col[0] for col in self._get_cursor_description(cursor)]
# Db should be unchanged.
self.assertEquals([('wiki', 'WikiStart', 'tag')], tags)
self.assertEquals(['tagspace', 'name', 'tag'], cols)
self.assertEquals(db_default.schema_version, self.get_db_version())
def test_upgrade_schema_v3(self):
# Add table for tag change records to the schema.
schema = [
Table('tags', key=('tagspace', 'name', 'tag'))[
Column('tagspace'),
Column('name'),
Column('tag'),
Index(['tagspace', 'name']),
Index(['tagspace', 'tag']),
]
]
setup = TagSetup(self.env)
# Current tractags schema is setup with enabled component anyway.
# Revert these changes for clean install testing.
self._revert_tractags_schema_init()
connector = self.db_mgr._get_connector()[0]
with self.env.db_transaction as db:
for table in schema:
for stmt in connector.to_sql(table):
db(stmt)
# Preset system db table with old version.
db("""INSERT INTO system (name, value)
VALUES ('tags_version', '3')""")
self.assertEquals(3, setup.get_schema_version())
self.assertTrue(setup.environment_needs_upgrade())
setup.upgrade_environment()
self.assertFalse(setup.environment_needs_upgrade())
with self.env.db_query as db:
cursor = db.cursor()
cursor.execute("SELECT * FROM tags_change")
cols = [col[0] for col in self._get_cursor_description(cursor)]
self.assertEquals(['tagspace', 'name', 'time', 'author',
'oldtags', 'newtags'], cols)
self.assertEquals(db_default.schema_version, self.get_db_version())
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TagSetupTestCase))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
|