1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
|
from __future__ import unicode_literals
import warnings
from django.db.backends.base.introspection import (
BaseDatabaseIntrospection, FieldInfo, TableInfo,
)
from django.db.models.indexes import Index
from django.utils.deprecation import RemovedInDjango21Warning
from django.utils.encoding import force_text
class DatabaseIntrospection(BaseDatabaseIntrospection):
    """
    Introspection for the PostgreSQL backend.

    Reads table, column, relation, index and constraint metadata by querying
    the pg_catalog tables and the information_schema views through a DB-API
    cursor supplied by the caller.
    """
    # Maps type codes to Django Field types.
    data_types_reverse = {
        16: 'BooleanField',
        17: 'BinaryField',
        20: 'BigIntegerField',
        21: 'SmallIntegerField',
        23: 'IntegerField',
        25: 'TextField',
        700: 'FloatField',
        701: 'FloatField',
        869: 'GenericIPAddressField',
        1042: 'CharField',  # blank-padded
        1043: 'CharField',
        1082: 'DateField',
        1083: 'TimeField',
        1114: 'DateTimeField',
        1184: 'DateTimeField',
        1266: 'TimeField',
        1700: 'DecimalField',
        2950: 'UUIDField',
    }

    # Table names that get_table_list() leaves out of its result.
    ignored_tables = []

    # Used by get_indexes(): one row per index on the table, carrying the name
    # of the index's first column, the raw indkey value and the unique/primary
    # flags.
    _get_indexes_query = """
        SELECT attr.attname, idx.indkey, idx.indisunique, idx.indisprimary
        FROM pg_catalog.pg_class c, pg_catalog.pg_class c2,
            pg_catalog.pg_index idx, pg_catalog.pg_attribute attr
        WHERE c.oid = idx.indrelid
            AND idx.indexrelid = c2.oid
            AND attr.attrelid = c.oid
            AND attr.attnum = idx.indkey[0]
            AND c.relname = %s"""

    def get_field_type(self, data_type, description):
        """
        Return the Django field type name for a column.

        The base class's answer is used as is, except that an IntegerField or
        BigIntegerField whose column default contains 'nextval' is reported
        as AutoField or BigAutoField respectively.
        """
        field_type = super(DatabaseIntrospection, self).get_field_type(data_type, description)
        if description.default and 'nextval' in description.default:
            auto_field_types = {
                'IntegerField': 'AutoField',
                'BigIntegerField': 'BigAutoField',
            }
            return auto_field_types.get(field_type, field_type)
        return field_type

    def get_table_list(self, cursor):
        """
        Return a list of TableInfo(name, type) for the tables and views that
        are visible in the current database.

        The type is 't' for relkind 'r' and 'v' for relkind 'v'. Names listed
        in ``ignored_tables`` are skipped.
        """
        cursor.execute("""
            SELECT c.relname, c.relkind
            FROM pg_catalog.pg_class c
            LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
            WHERE c.relkind IN ('r', 'v')
                AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
                AND pg_catalog.pg_table_is_visible(c.oid)""")
        relkind_to_type = {'r': 't', 'v': 'v'}
        return [
            TableInfo(row[0], relkind_to_type.get(row[1]))
            for row in cursor.fetchall()
            if row[0] not in self.ignored_tables
        ]

    def get_table_description(self, cursor, table_name):
        """
        Return a description of the table, with the DB-API cursor.description
        interface.

        The result is a list of FieldInfo tuples: the column name, items 1-5
        of the matching cursor.description entry, a nullable flag and the
        column default.
        """
        # As cursor.description does not return reliably the nullable property,
        # we have to query the information_schema (#7783)
        cursor.execute("""
            SELECT column_name, is_nullable, column_default
            FROM information_schema.columns
            WHERE table_name = %s""", [table_name])
        # column_name -> (is_nullable, column_default)
        field_map = {line[0]: line[1:] for line in cursor.fetchall()}
        cursor.execute("SELECT * FROM %s LIMIT 1" % self.connection.ops.quote_name(table_name))
        fields = []
        for line in cursor.description:
            # Decode the name once; it is both the FieldInfo name and the
            # lookup key into field_map.
            column_name = force_text(line[0])
            extra = field_map[column_name]
            fields.append(FieldInfo(*(
                (column_name,) +
                line[1:6] +
                (extra[0] == 'YES', extra[1])
            )))
        return fields

    def get_relations(self, cursor, table_name):
        """
        Return a dictionary of {field_name: (field_name_other_table, other_table)}
        representing all relationships to the given table.

        Only the first column of each foreign key constraint is considered.
        """
        cursor.execute("""
            SELECT c2.relname, a1.attname, a2.attname
            FROM pg_constraint con
            LEFT JOIN pg_class c1 ON con.conrelid = c1.oid
            LEFT JOIN pg_class c2 ON con.confrelid = c2.oid
            LEFT JOIN pg_attribute a1 ON c1.oid = a1.attrelid AND a1.attnum = con.conkey[1]
            LEFT JOIN pg_attribute a2 ON c2.oid = a2.attrelid AND a2.attnum = con.confkey[1]
            WHERE c1.relname = %s
                AND con.contype = 'f'""", [table_name])
        # Rows are (other_table, field_name, field_name_other_table).
        return {row[1]: (row[2], row[0]) for row in cursor.fetchall()}

    def get_key_columns(self, cursor, table_name):
        """
        Return a list of (column_name, referenced_table, referenced_column)
        rows for the FOREIGN KEY constraints of the given table.
        """
        cursor.execute("""
            SELECT kcu.column_name, ccu.table_name AS referenced_table, ccu.column_name AS referenced_column
            FROM information_schema.constraint_column_usage ccu
            LEFT JOIN information_schema.key_column_usage kcu
                ON ccu.constraint_catalog = kcu.constraint_catalog
                    AND ccu.constraint_schema = kcu.constraint_schema
                    AND ccu.constraint_name = kcu.constraint_name
            LEFT JOIN information_schema.table_constraints tc
                ON ccu.constraint_catalog = tc.constraint_catalog
                    AND ccu.constraint_schema = tc.constraint_schema
                    AND ccu.constraint_name = tc.constraint_name
            WHERE kcu.table_name = %s AND tc.constraint_type = 'FOREIGN KEY'""", [table_name])
        return list(cursor.fetchall())

    def get_indexes(self, cursor, table_name):
        """
        Return {column_name: {'primary_key': bool, 'unique': bool}} for the
        single-column indexes of the given table.

        Deprecated: emits RemovedInDjango21Warning; use get_constraints().
        """
        warnings.warn(
            "get_indexes() is deprecated in favor of get_constraints().",
            RemovedInDjango21Warning, stacklevel=2
        )
        # This query retrieves each index on the given table, including the
        # first associated field name
        cursor.execute(self._get_indexes_query, [table_name])
        indexes = {}
        for row in cursor.fetchall():
            # row[1] (idx.indkey) is stored in the DB as an array. It comes out as
            # a string of space-separated integers. This designates the field
            # indexes (1-based) of the fields that have indexes on the table.
            # Here, we skip any indexes across multiple fields.
            if ' ' in row[1]:
                continue
            flags = indexes.setdefault(row[0], {'primary_key': False, 'unique': False})
            # It's possible to have the unique and PK constraints in separate indexes.
            if row[3]:
                flags['primary_key'] = True
            if row[2]:
                flags['unique'] = True
        return indexes

    def get_constraints(self, cursor, table_name):
        """
        Retrieve any constraints or keys (unique, pk, fk, check, index) across
        one or more columns. Also retrieve the definition of expression-based
        indexes.

        Return a dictionary mapping each constraint or index name to a dict
        with the keys "columns", "primary_key", "unique", "foreign_key",
        "check", "index", "definition" and "options"; entries that come from
        the index query also carry "orders" and "type".

        The table is resolved with pg_table_is_visible(), the same rule
        get_table_list() applies, so tables outside the "public" schema are
        handled and same-named tables in other schemas are ignored.
        """
        constraints = {}
        # Loop over the key table, collecting things as constraints. The column
        # array must return column names in the same order in which they were
        # created.
        # The subquery containing generate_series can be replaced with
        # "WITH ORDINALITY" when support for PostgreSQL 9.3 is dropped.
        cursor.execute("""
            SELECT
                c.conname,
                array(
                    SELECT attname
                    FROM (
                        SELECT unnest(c.conkey) AS colid,
                            generate_series(1, array_length(c.conkey, 1)) AS arridx
                    ) AS cols
                    JOIN pg_attribute AS ca ON cols.colid = ca.attnum
                    WHERE ca.attrelid = c.conrelid
                    ORDER BY cols.arridx
                ),
                c.contype,
                (SELECT fkc.relname || '.' || fka.attname
                FROM pg_attribute AS fka
                JOIN pg_class AS fkc ON fka.attrelid = fkc.oid
                WHERE fka.attrelid = c.confrelid AND fka.attnum = c.confkey[1]),
                cl.reloptions
            FROM pg_constraint AS c
            JOIN pg_class AS cl ON c.conrelid = cl.oid
            WHERE cl.relname = %s AND pg_catalog.pg_table_is_visible(cl.oid)
        """, [table_name])
        for constraint, columns, kind, used_cols, options in cursor.fetchall():
            constraints[constraint] = {
                "columns": columns,
                "primary_key": kind == "p",
                "unique": kind in ["p", "u"],
                # used_cols is "table.column"; split once so a dot inside the
                # column name stays in the column part.
                "foreign_key": tuple(used_cols.split(".", 1)) if kind == "f" else None,
                "check": kind == "c",
                "index": False,
                "definition": None,
                "options": options,
            }
        # Now get indexes
        # The row_number() function for ordering the index fields can be
        # replaced by WITH ORDINALITY in the unnest() functions when support
        # for PostgreSQL 9.3 is dropped.
        cursor.execute("""
            SELECT
                indexname, array_agg(attname ORDER BY rnum), indisunique, indisprimary,
                array_agg(ordering ORDER BY rnum), amname, exprdef, s2.attoptions
            FROM (
                SELECT
                    row_number() OVER () as rnum, c2.relname as indexname,
                    idx.*, attr.attname, am.amname,
                    CASE
                        WHEN idx.indexprs IS NOT NULL THEN
                            pg_get_indexdef(idx.indexrelid)
                    END AS exprdef,
                    CASE am.amname
                        WHEN 'btree' THEN
                            CASE (option & 1)
                                WHEN 1 THEN 'DESC' ELSE 'ASC'
                            END
                    END as ordering,
                    c2.reloptions as attoptions
                FROM (
                    SELECT
                        *, unnest(i.indkey) as key, unnest(i.indoption) as option
                    FROM pg_index i
                ) idx
                LEFT JOIN pg_class c ON idx.indrelid = c.oid
                LEFT JOIN pg_class c2 ON idx.indexrelid = c2.oid
                LEFT JOIN pg_am am ON c2.relam = am.oid
                LEFT JOIN pg_attribute attr ON attr.attrelid = c.oid AND attr.attnum = idx.key
                WHERE c.relname = %s AND pg_catalog.pg_table_is_visible(c.oid)
            ) s2
            GROUP BY indexname, indisunique, indisprimary, amname, exprdef, attoptions;
        """, [table_name])
        for index, columns, unique, primary, orders, type_, definition, options in cursor.fetchall():
            # An index that shares its name with a constraint collected above
            # is not recorded again; the constraint entry is kept.
            if index not in constraints:
                constraints[index] = {
                    # An aggregate of [None] is reported as an empty list.
                    "columns": columns if columns != [None] else [],
                    "orders": orders if orders != [None] else [],
                    "primary_key": primary,
                    "unique": unique,
                    "foreign_key": None,
                    "check": False,
                    "index": True,
                    "type": Index.suffix if type_ == 'btree' else type_,
                    "definition": definition,
                    "options": options,
                }
        return constraints
|