1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
|
from __future__ import unicode_literals
import os
from unittest import skipUnless
from django.core.management import call_command
from django.db import connections
from django.test import TestCase
from django.contrib.gis.gdal import HAS_GDAL
from django.contrib.gis.geometry.test_data import TEST_DATA
from django.contrib.gis.tests.utils import HAS_SPATIAL_DB
from django.utils.six import StringIO
if HAS_GDAL:
from django.contrib.gis.gdal import Driver
from django.contrib.gis.utils.ogrinspect import ogrinspect
from .models import AllOGRFields
@skipUnless(HAS_GDAL and HAS_SPATIAL_DB, "GDAL and spatial db are required.")
class InspectDbTests(TestCase):
    """Checks on what the ``inspectdb`` management command emits for geometry columns."""

    def test_geom_columns(self):
        """
        Run ``inspectdb`` restricted to the ``inspectapp_`` tables and verify
        that the generated source declares the geometry fields and manager.
        """
        def only_inspectapp_tables(table_name):
            # Limit introspection to tables whose name starts with 'inspectapp_'.
            return table_name.startswith('inspectapp_')

        captured = StringIO()
        call_command(
            'inspectdb',
            table_name_filter=only_inspectapp_tables,
            stdout=captured,
        )
        generated = captured.getvalue()

        if connections['default'].ops.oracle:
            # On the Oracle backend only generic geometry fields are expected.
            field_fragments = (
                'geom = models.GeometryField(',
                'point = models.GeometryField(',
            )
        else:
            field_fragments = (
                'geom = models.PolygonField()',
                'point = models.PointField()',
            )

        for fragment in field_fragments:
            self.assertIn(fragment, generated)
        self.assertIn('objects = models.GeoManager()', generated)
@skipUnless(HAS_GDAL and HAS_SPATIAL_DB, "GDAL and spatial db are required.")
class OGRInspectTest(TestCase):
    """
    Checks on the model source produced by ``ogrinspect`` and by the
    ``ogrinspect`` management command.
    """
    # NOTE(review): the expected field lines below start with a single space;
    # the leading whitespace may have been collapsed in this copy of the file.
    # Confirm against the indentation ogrinspect actually emits.

    # Cap on the diff length unittest prints when an equality check fails.
    maxDiff = 1024

    def test_poly(self):
        """Inspect the polygon shapefile and compare the whole generated module."""
        source_path = os.path.join(TEST_DATA, 'test_poly', 'test_poly.shp')
        generated = ogrinspect(source_path, 'MyModel')

        expected_lines = (
            '# This is an auto-generated Django model module created by ogrinspect.',
            'from django.contrib.gis.db import models',
            '',
            'class MyModel(models.Model):',
            ' float = models.FloatField()',
            ' int = models.FloatField()',
            ' str = models.CharField(max_length=80)',
            ' geom = models.PolygonField(srid=-1)',
            ' objects = models.GeoManager()',
        )
        self.assertEqual(generated, '\n'.join(expected_lines))

    def test_date_field(self):
        """Inspect the cities shapefile, whose layer includes a date column."""
        source_path = os.path.join(TEST_DATA, 'cities', 'cities.shp')
        generated = ogrinspect(source_path, 'City')

        expected_lines = (
            '# This is an auto-generated Django model module created by ogrinspect.',
            'from django.contrib.gis.db import models',
            '',
            'class City(models.Model):',
            ' name = models.CharField(max_length=80)',
            ' population = models.FloatField()',
            ' density = models.FloatField()',
            ' created = models.DateField()',
            ' geom = models.PointField(srid=-1)',
            ' objects = models.GeoManager()',
        )
        self.assertEqual(generated, '\n'.join(expected_lines))

    def test_time_field(self):
        """Inspect a live database table so that time-typed columns are covered."""
        # This is only exercised on PostGIS for now: MySQL complains about
        # permissions, and getting SpatiaLite/Oracle support compiled into
        # GDAL is very difficult.
        on_postgis = connections['default'].ops.postgis
        if not on_postgis:
            self.skipTest("This database does not support 'ogrinspect'ion")

        # A None result means GDAL lacks the required driver support.
        connection_string = get_ogr_db_string()
        if not connection_string:
            self.skipTest("Your GDAL installation does not support PostGIS databases")

        # GDAL cannot currently write OGRTime fields to shapefiles, so the
        # inspection has to run against a real database table instead.
        generated = ogrinspect(
            connection_string,
            'Measurement',
            layer_key=AllOGRFields._meta.db_table,
            decimal=['f_decimal'],
        )

        expected_header = (
            '# This is an auto-generated Django model module created by ogrinspect.\n'
            'from django.contrib.gis.db import models\n'
            '\n'
            'class Measurement(models.Model):\n'
        )
        self.assertTrue(generated.startswith(expected_header))

        # Field order may differ (GDAL version, etc.), so each field line is
        # only checked for presence.
        expected_fields = (
            ' f_decimal = models.DecimalField(max_digits=0, decimal_places=0)',
            ' f_int = models.IntegerField()',
            ' f_datetime = models.DateTimeField()',
            ' f_time = models.TimeField()',
            ' f_float = models.FloatField()',
            ' f_char = models.CharField(max_length=10)',
            ' f_date = models.DateField()',
        )
        for field_line in expected_fields:
            self.assertIn(field_line, generated)

        expected_footer = (
            ' geom = models.PolygonField()\n'
            ' objects = models.GeoManager()'
        )
        self.assertTrue(generated.endswith(expected_footer))

    def test_management_command(self):
        """Run the ``ogrinspect`` command and check that it prints the model class."""
        captured = StringIO()
        source_path = os.path.join(TEST_DATA, 'cities', 'cities.shp')
        call_command('ogrinspect', source_path, 'City', stdout=captured)
        self.assertIn('class City(models.Model):', captured.getvalue())
def get_ogr_db_string():
    """
    Construct the DB string that GDAL will use to inspect the database.

    GDAL will create its own connection to the database, so we re-use the
    connection settings from the Django test.

    Returns the OGR connection string, or None when no usable string can be
    built: the configured backend has no entry in the driver map, the GDAL
    driver cannot be instantiated, or the database is an in-memory one.
    """
    db = connections.databases['default']

    # Map from the django backend into the OGR driver name, the database
    # identifier template and the separator placed between parameters.
    # http://www.gdal.org/ogr/ogr_formats.html
    #
    # TODO: Support Oracle (OCI).
    drivers = {
        'django.contrib.gis.db.backends.postgis': ('PostgreSQL', "PG:dbname='%(db_name)s'", ' '),
        'django.contrib.gis.db.backends.mysql': ('MySQL', 'MYSQL:"%(db_name)s"', ','),
        'django.contrib.gis.db.backends.spatialite': ('SQLite', '%(db_name)s', ''),
    }

    # A backend missing from the map is reported as "no OGR string"
    # instead of failing with a KeyError.
    driver_info = drivers.get(db['ENGINE'])
    if driver_info is None:
        return None
    drv_name, db_str, param_sep = driver_info

    # Ensure that GDAL library has driver support for the database.
    # Catch Exception rather than everything, so KeyboardInterrupt and
    # SystemExit still propagate.
    try:
        Driver(drv_name)
    except Exception:
        return None

    # SQLite/Spatialite in-memory databases
    if db['NAME'] == ":memory:":
        return None

    # Build the params of the OGR database connection string
    params = [db_str % {'db_name': db['NAME']}]
    optional_params = (
        ('HOST', "host='%s'"),
        ('PORT', "port='%s'"),
        ('USER', "user='%s'"),
        ('PASSWORD', "password='%s'"),
    )
    for key, template in optional_params:
        value = db.get(key)
        # Don't add the parameter if it is not in django's settings
        if value:
            params.append(template % value)

    return param_sep.join(params)
|