File: sqlite.py

package info (click to toggle)
migrate 0.11.0-5~bpo9+1
  • links: PTS, VCS
  • area: main
  • in suites: stretch-backports
  • size: 1,000 kB
  • sloc: python: 5,853; makefile: 113; sh: 107
file content (228 lines) | stat: -rw-r--r-- 8,309 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
"""
   `SQLite`_ database specific implementations of changeset classes.

   .. _`SQLite`: http://www.sqlite.org/
"""
try:  # Python 3
    from collections import MutableMapping as DictMixin
except ImportError:  # Python 2
    from UserDict import DictMixin
from copy import copy
import re
import sqlite3

from sqlalchemy.databases import sqlite as sa_base
from sqlalchemy.schema import ForeignKeyConstraint
from sqlalchemy.schema import UniqueConstraint

from migrate import exceptions
from migrate.changeset import ansisql


SQLiteSchemaGenerator = sa_base.SQLiteDDLCompiler


class SQLiteCommon(object):

    def _not_supported(self, op):
        raise exceptions.NotSupportedError("SQLite does not support "
            "%s; see http://www.sqlite.org/lang_altertable.html" % op)


class SQLiteHelper(SQLiteCommon):

    def _filter_columns(self, cols, table):
        """Splits the string of columns and returns those only in the table.

        :param cols: comma-delimited string of table columns
        :param table: the table to check
        :return: list of columns in the table
        """
        columns = []
        for c in cols.split(","):
            if c in table.columns:
                # There was a bug in reflection of SQLite columns with
                # reserved identifiers as names (SQLite can return them
                # wrapped with double quotes), so strip double quotes.
                columns.extend(c.strip(' "'))
        return columns

    def _get_constraints(self, table):
        """Retrieve information about existing constraints of the table

        This feature is needed for recreate_table() to work properly.
        """

        data = table.metadata.bind.execute(
            """SELECT sql
               FROM sqlite_master
               WHERE
                   type='table' AND
                   name=:table_name""",
            table_name=table.name
        ).fetchone()[0]

        UNIQUE_PATTERN = "CONSTRAINT (\w+) UNIQUE \(([^\)]+)\)"
        constraints = []
        for name, cols in re.findall(UNIQUE_PATTERN, data):
            # Filter out any columns that were dropped from the table.
            columns = self._filter_columns(cols, table)
            if columns:
                constraints.extend(UniqueConstraint(*columns, name=name))

        FKEY_PATTERN = "CONSTRAINT (\w+) FOREIGN KEY \(([^\)]+)\)"
        for name, cols in re.findall(FKEY_PATTERN, data):
            # Filter out any columns that were dropped from the table.
            columns = self._filter_columns(cols, table)
            if columns:
                constraints.extend(ForeignKeyConstraint(*columns, name=name))

        return constraints

    def recreate_table(self, table, column=None, delta=None,
                       omit_constraints=None):
        table_name = self.preparer.format_table(table)

        # we remove all indexes so as not to have
        # problems during copy and re-create
        for index in table.indexes:
            index.drop()

        # reflect existing constraints
        for constraint in self._get_constraints(table):
            table.append_constraint(constraint)
        # omit given constraints when creating a new table if required
        table.constraints = set([
            cons for cons in table.constraints
            if omit_constraints is None or cons.name not in omit_constraints
        ])

        tup = sqlite3.sqlite_version_info
        if tup[0] > 3 or (tup[0] == 3 and tup[1] >= 26):
            self.append('PRAGMA legacy_alter_table = ON')
            self.execute()
        self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name)
        self.execute()
        if tup[0] > 3 or (tup[0] == 3 and tup[1] >= 26):
            self.append('PRAGMA legacy_alter_table = OFF')
            self.execute()

        insertion_string = self._modify_table(table, column, delta)

        table.create(bind=self.connection)
        self.append(insertion_string % {'table_name': table_name})
        self.execute()
        self.append('DROP TABLE migration_tmp')
        self.execute()

    def visit_column(self, delta):
        if isinstance(delta, DictMixin):
            column = delta.result_column
            table = self._to_table(delta.table)
        else:
            column = delta
            table = self._to_table(column.table)
        self.recreate_table(table,column,delta)

class SQLiteColumnGenerator(SQLiteSchemaGenerator,
                            ansisql.ANSIColumnGenerator,
                            # at the end so we get the normal
                            # visit_column by default
                            SQLiteHelper,
                            SQLiteCommon
                            ):
    """SQLite ColumnGenerator"""

    def _modify_table(self, table, column, delta):
        columns = ' ,'.join(map(
                self.preparer.format_column,
                [c for c in table.columns if c.name!=column.name]))
        return ('INSERT INTO %%(table_name)s (%(cols)s) '
                'SELECT %(cols)s from migration_tmp')%{'cols':columns}

    def visit_column(self,column):
        if column.foreign_keys:
            SQLiteHelper.visit_column(self,column)
        else:
            super(SQLiteColumnGenerator,self).visit_column(column)

class SQLiteColumnDropper(SQLiteHelper, ansisql.ANSIColumnDropper):
    """SQLite ColumnDropper"""

    def _modify_table(self, table, column, delta):

        columns = ' ,'.join(map(self.preparer.format_column, table.columns))
        return 'INSERT INTO %(table_name)s SELECT ' + columns + \
            ' from migration_tmp'

    def visit_column(self,column):
        # For SQLite, we *have* to remove the column here so the table
        # is re-created properly.
        column.remove_from_table(column.table,unset_table=False)
        super(SQLiteColumnDropper,self).visit_column(column)


class SQLiteSchemaChanger(SQLiteHelper, ansisql.ANSISchemaChanger):
    """SQLite SchemaChanger"""

    def _modify_table(self, table, column, delta):
        return 'INSERT INTO %(table_name)s SELECT * from migration_tmp'

    def visit_index(self, index):
        """Does not support ALTER INDEX"""
        self._not_supported('ALTER INDEX')


class SQLiteConstraintGenerator(ansisql.ANSIConstraintGenerator, SQLiteHelper, SQLiteCommon):

    def visit_migrate_primary_key_constraint(self, constraint):
        tmpl = "CREATE UNIQUE INDEX %s ON %s ( %s )"
        cols = ', '.join(map(self.preparer.format_column, constraint.columns))
        tname = self.preparer.format_table(constraint.table)
        name = self.get_constraint_name(constraint)
        msg = tmpl % (name, tname, cols)
        self.append(msg)
        self.execute()

    def _modify_table(self, table, column, delta):
        return 'INSERT INTO %(table_name)s SELECT * from migration_tmp'

    def visit_migrate_foreign_key_constraint(self, *p, **k):
        self.recreate_table(p[0].table)

    def visit_migrate_unique_constraint(self, *p, **k):
        self.recreate_table(p[0].table)


class SQLiteConstraintDropper(ansisql.ANSIColumnDropper,
                              SQLiteHelper,
                              ansisql.ANSIConstraintCommon):

    def _modify_table(self, table, column, delta):
        return 'INSERT INTO %(table_name)s SELECT * from migration_tmp'

    def visit_migrate_primary_key_constraint(self, constraint):
        tmpl = "DROP INDEX %s "
        name = self.get_constraint_name(constraint)
        msg = tmpl % (name)
        self.append(msg)
        self.execute()

    def visit_migrate_foreign_key_constraint(self, *p, **k):
        self.recreate_table(p[0].table, omit_constraints=[p[0].name])

    def visit_migrate_check_constraint(self, *p, **k):
        self._not_supported('ALTER TABLE DROP CONSTRAINT')

    def visit_migrate_unique_constraint(self, *p, **k):
        self.recreate_table(p[0].table, omit_constraints=[p[0].name])


# TODO: technically primary key is a NOT NULL + UNIQUE constraint, should add NOT NULL to index

class SQLiteDialect(ansisql.ANSIDialect):
    columngenerator = SQLiteColumnGenerator
    columndropper = SQLiteColumnDropper
    schemachanger = SQLiteSchemaChanger
    constraintgenerator = SQLiteConstraintGenerator
    constraintdropper = SQLiteConstraintDropper