1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
|
import os
import re
from io import StringIO
from django.contrib.gis.gdal import GDAL_VERSION, Driver, GDALException
from django.contrib.gis.utils.ogrinspect import ogrinspect
from django.core.management import call_command
from django.db import connection, connections
from django.test import SimpleTestCase, TestCase, skipUnlessDBFeature
from django.test.utils import modify_settings
from ..test_data import TEST_DATA
from .models import AllOGRFields
class InspectDbTests(TestCase):
def test_geom_columns(self):
"""
Test the geo-enabled inspectdb command.
"""
out = StringIO()
call_command(
'inspectdb',
table_name_filter=lambda tn: tn == 'inspectapp_allogrfields',
stdout=out
)
output = out.getvalue()
if connection.features.supports_geometry_field_introspection:
self.assertIn('geom = models.PolygonField()', output)
self.assertIn('point = models.PointField()', output)
else:
self.assertIn('geom = models.GeometryField(', output)
self.assertIn('point = models.GeometryField(', output)
@skipUnlessDBFeature("supports_3d_storage")
def test_3d_columns(self):
out = StringIO()
call_command(
'inspectdb',
table_name_filter=lambda tn: tn == 'inspectapp_fields3d',
stdout=out
)
output = out.getvalue()
if connection.features.supports_geometry_field_introspection:
self.assertIn('point = models.PointField(dim=3)', output)
if connection.features.supports_geography:
self.assertIn('pointg = models.PointField(geography=True, dim=3)', output)
else:
self.assertIn('pointg = models.PointField(dim=3)', output)
self.assertIn('line = models.LineStringField(dim=3)', output)
self.assertIn('poly = models.PolygonField(dim=3)', output)
else:
self.assertIn('point = models.GeometryField(', output)
self.assertIn('pointg = models.GeometryField(', output)
self.assertIn('line = models.GeometryField(', output)
self.assertIn('poly = models.GeometryField(', output)
@modify_settings(
INSTALLED_APPS={'append': 'django.contrib.gis'},
)
class OGRInspectTest(SimpleTestCase):
expected_srid = 'srid=-1' if GDAL_VERSION < (2, 2) else ''
maxDiff = 1024
def test_poly(self):
shp_file = os.path.join(TEST_DATA, 'test_poly', 'test_poly.shp')
model_def = ogrinspect(shp_file, 'MyModel')
expected = [
'# This is an auto-generated Django model module created by ogrinspect.',
'from django.contrib.gis.db import models',
'',
'',
'class MyModel(models.Model):',
' float = models.FloatField()',
' int = models.BigIntegerField()',
' str = models.CharField(max_length=80)',
' geom = models.PolygonField(%s)' % self.expected_srid,
]
self.assertEqual(model_def, '\n'.join(expected))
def test_poly_multi(self):
shp_file = os.path.join(TEST_DATA, 'test_poly', 'test_poly.shp')
model_def = ogrinspect(shp_file, 'MyModel', multi_geom=True)
self.assertIn('geom = models.MultiPolygonField(%s)' % self.expected_srid, model_def)
# Same test with a 25D-type geometry field
shp_file = os.path.join(TEST_DATA, 'gas_lines', 'gas_leitung.shp')
model_def = ogrinspect(shp_file, 'MyModel', multi_geom=True)
srid = '-1' if GDAL_VERSION < (2, 3) else '31253'
self.assertIn('geom = models.MultiLineStringField(srid=%s)' % srid, model_def)
def test_date_field(self):
shp_file = os.path.join(TEST_DATA, 'cities', 'cities.shp')
model_def = ogrinspect(shp_file, 'City')
expected = [
'# This is an auto-generated Django model module created by ogrinspect.',
'from django.contrib.gis.db import models',
'',
'',
'class City(models.Model):',
' name = models.CharField(max_length=80)',
' population = models.BigIntegerField()',
' density = models.FloatField()',
' created = models.DateField()',
' geom = models.PointField(%s)' % self.expected_srid,
]
self.assertEqual(model_def, '\n'.join(expected))
def test_time_field(self):
# Getting the database identifier used by OGR, if None returned
# GDAL does not have the support compiled in.
ogr_db = get_ogr_db_string()
if not ogr_db:
self.skipTest("Unable to setup an OGR connection to your database")
try:
# Writing shapefiles via GDAL currently does not support writing OGRTime
# fields, so we need to actually use a database
model_def = ogrinspect(ogr_db, 'Measurement',
layer_key=AllOGRFields._meta.db_table,
decimal=['f_decimal'])
except GDALException:
self.skipTest("Unable to setup an OGR connection to your database")
self.assertTrue(model_def.startswith(
'# This is an auto-generated Django model module created by ogrinspect.\n'
'from django.contrib.gis.db import models\n'
'\n'
'\n'
'class Measurement(models.Model):\n'
))
# The ordering of model fields might vary depending on several factors (version of GDAL, etc.)
if connection.vendor == 'sqlite':
# SpatiaLite introspection is somewhat lacking (#29461).
self.assertIn(' f_decimal = models.CharField(max_length=0)', model_def)
else:
self.assertIn(' f_decimal = models.DecimalField(max_digits=0, decimal_places=0)', model_def)
self.assertIn(' f_int = models.IntegerField()', model_def)
if not connection.ops.mariadb:
# Probably a bug between GDAL and MariaDB on time fields.
self.assertIn(' f_datetime = models.DateTimeField()', model_def)
self.assertIn(' f_time = models.TimeField()', model_def)
if connection.vendor == 'sqlite':
self.assertIn(' f_float = models.CharField(max_length=0)', model_def)
else:
self.assertIn(' f_float = models.FloatField()', model_def)
max_length = 0 if connection.vendor == 'sqlite' else 10
self.assertIn(' f_char = models.CharField(max_length=%s)' % max_length, model_def)
self.assertIn(' f_date = models.DateField()', model_def)
# Some backends may have srid=-1
self.assertIsNotNone(re.search(r' geom = models.PolygonField\(([^\)])*\)', model_def))
def test_management_command(self):
shp_file = os.path.join(TEST_DATA, 'cities', 'cities.shp')
out = StringIO()
call_command('ogrinspect', shp_file, 'City', stdout=out)
output = out.getvalue()
self.assertIn('class City(models.Model):', output)
def test_mapping_option(self):
expected = (
" geom = models.PointField(%s)\n"
"\n"
"\n"
"# Auto-generated `LayerMapping` dictionary for City model\n"
"city_mapping = {\n"
" 'name': 'Name',\n"
" 'population': 'Population',\n"
" 'density': 'Density',\n"
" 'created': 'Created',\n"
" 'geom': 'POINT',\n"
"}\n" % self.expected_srid)
shp_file = os.path.join(TEST_DATA, 'cities', 'cities.shp')
out = StringIO()
call_command('ogrinspect', shp_file, '--mapping', 'City', stdout=out)
self.assertIn(expected, out.getvalue())
def get_ogr_db_string():
"""
Construct the DB string that GDAL will use to inspect the database.
GDAL will create its own connection to the database, so we re-use the
connection settings from the Django test.
"""
db = connections.databases['default']
# Map from the django backend into the OGR driver name and database identifier
# https://gdal.org/drivers/vector/
#
# TODO: Support Oracle (OCI).
drivers = {
'django.contrib.gis.db.backends.postgis': ('PostgreSQL', "PG:dbname='%(db_name)s'", ' '),
'django.contrib.gis.db.backends.mysql': ('MySQL', 'MYSQL:"%(db_name)s"', ','),
'django.contrib.gis.db.backends.spatialite': ('SQLite', '%(db_name)s', '')
}
db_engine = db['ENGINE']
if db_engine not in drivers:
return None
drv_name, db_str, param_sep = drivers[db_engine]
# Ensure that GDAL library has driver support for the database.
try:
Driver(drv_name)
except GDALException:
return None
# SQLite/SpatiaLite in-memory databases
if db['NAME'] == ":memory:":
return None
# Build the params of the OGR database connection string
params = [db_str % {'db_name': db['NAME']}]
def add(key, template):
value = db.get(key, None)
# Don't add the parameter if it is not in django's settings
if value:
params.append(template % value)
add('HOST', "host='%s'")
add('PORT', "port='%s'")
add('USER', "user='%s'")
add('PASSWORD', "password='%s'")
return param_sep.join(params)
|