File: creation.py

package info (click to toggle)
mssql-django 1.6-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 644 kB
  • sloc: python: 5,289; sh: 105; makefile: 7
file content (157 lines) | stat: -rw-r--r-- 6,030 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# Copyright (c) Microsoft Corporation.
# Licensed under the BSD license.

import binascii
import os

from django.db.utils import InterfaceError
from django.db.backends.base.creation import BaseDatabaseCreation
from django import VERSION as django_version


class DatabaseCreation(BaseDatabaseCreation):

    def cursor(self):
        if django_version >= (3, 1):
            return self.connection._nodb_cursor()

        return self.connection._nodb_connection.cursor()

    def _create_test_db(self, verbosity, autoclobber, keepdb=False):
        """
        Internal implementation - create the test db tables.
        """

        # Try to create the test DB, but if we fail due to 28000 (Login failed for user),
        #   it's probably because the user doesn't have permission to [dbo].[master],
        #   so we can proceed if we're keeping the DB anyway.
        # https://github.com/microsoft/mssql-django/issues/61
        try:
            test_database_name = super()._create_test_db(verbosity, autoclobber, keepdb)
            
            # Create required schemas for Django tests (only for 5.2+)
            if django_version >= (5, 2):
                self._create_test_schemas(test_database_name, verbosity)

            return test_database_name
        except InterfaceError as err:
            if err.args[0] == '28000' and keepdb:
                self.log('Received error %s, proceeding because keepdb=True' % (
                    err.args[1],
                ))
            else:
                raise err

    def _create_test_schemas(self, test_database_name, verbosity):
        """
        Create required schemas in test database for Django tests.
        """
        schemas_to_create = ['inspectdb_special', 'inspectdb_pascal']
        
        # Use a cursor connected to the test database
        test_settings = self.connection.settings_dict.copy()
        test_settings['NAME'] = test_database_name
        test_connection = self.connection.__class__(test_settings)
        
        try:
            with test_connection.cursor() as cursor:
                for schema in schemas_to_create:
                    try:
                        quoted_schema = self.connection.ops.quote_name(schema)
                        cursor.execute(f"CREATE SCHEMA {quoted_schema}")
                        if verbosity >= 2:
                            self.log(f'Created schema {schema} in test database {test_database_name}')
                    except Exception as e:
                        # Schema might already exist, which is fine
                        if verbosity >= 2:
                            self.log(f'Schema {schema} creation failed (might already exist): {e}')
        finally:
            test_connection.close()

    def _destroy_test_db(self, test_database_name, verbosity):
        """
        Internal implementation - remove the test db tables.
        """
        # Remove the test database to clean up after
        # ourselves. Connect to the previous database (not the test database)
        # to do so, because it's not allowed to delete a database while being
        # connected to it.
        with self.cursor() as cursor:
            to_azure_sql_db = self.connection.to_azure_sql_db
            if not to_azure_sql_db:
                cursor.execute("ALTER DATABASE %s SET SINGLE_USER WITH ROLLBACK IMMEDIATE"
                               % self.connection.ops.quote_name(test_database_name))
            cursor.execute("DROP DATABASE %s"
                           % self.connection.ops.quote_name(test_database_name))

    def sql_table_creation_suffix(self):
        suffix = []
        collation = self.connection.settings_dict['TEST'].get('COLLATION', None)
        if collation:
            suffix.append('COLLATE %s' % collation)
        return ' '.join(suffix)

    # The following code to add regex support in SQLServer is taken from django-mssql
    # see https://bitbucket.org/Manfre/django-mssql
    def enable_clr(self):
        """ Enables clr for server if not already enabled
        This function will not fail if current user doesn't have
        permissions to enable clr, and clr is already enabled
        """
        with self.cursor() as cursor:
            # check whether clr is enabled
            cursor.execute('''
            SELECT value FROM sys.configurations
            WHERE name = 'clr enabled'
            ''')
            res = None
            try:
                res = cursor.fetchone()
            except Exception:
                pass

            if not res or not res[0]:
                # if not enabled enable clr
                cursor.execute("sp_configure 'clr enabled', 1")
                cursor.execute("RECONFIGURE")

                cursor.execute("sp_configure 'show advanced options', 1")
                cursor.execute("RECONFIGURE")

                cursor.execute("sp_configure 'clr strict security', 0")
                cursor.execute("RECONFIGURE")

    def install_regex_clr(self, database_name):
        sql = '''
USE {database_name};
-- Drop and recreate the function if it already exists
IF OBJECT_ID('REGEXP_LIKE') IS NOT NULL
DROP FUNCTION [dbo].[REGEXP_LIKE]
IF EXISTS(select * from sys.assemblies where name like 'regex_clr')
DROP ASSEMBLY regex_clr
;
CREATE ASSEMBLY regex_clr
FROM 0x{assembly_hex}
WITH PERMISSION_SET = SAFE;
create function [dbo].[REGEXP_LIKE]
(
@input nvarchar(max),
@pattern nvarchar(max),
@caseSensitive int
)
RETURNS INT  AS
EXTERNAL NAME regex_clr.UserDefinedFunctions.REGEXP_LIKE
        '''.format(
            database_name=self.connection.ops.quote_name(database_name),
            assembly_hex=self.get_regex_clr_assembly_hex(),
        ).split(';')

        self.enable_clr()

        with self.cursor() as cursor:
            for s in sql:
                cursor.execute(s)

    def get_regex_clr_assembly_hex(self):
        with open(os.path.join(os.path.dirname(__file__), 'regex_clr.dll'), 'rb') as f:
            return binascii.hexlify(f.read()).decode('ascii')