1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481
|
from __future__ import unicode_literals
import re
from django.core.exceptions import FieldDoesNotExist
from django.db.models.constants import LOOKUP_SEP
from django.db.models.expressions import Col, Expression
from django.db.models.lookups import Lookup, Transform
from django.db.models.sql.query import Query
from django.utils import six
gis_lookups = {}
class RasterBandTransform(Transform):
def as_sql(self, compiler, connection):
return compiler.compile(self.lhs)
class GISLookup(Lookup):
sql_template = None
transform_func = None
distance = False
band_rhs = None
band_lhs = None
def __init__(self, *args, **kwargs):
super(GISLookup, self).__init__(*args, **kwargs)
self.template_params = {}
@classmethod
def _check_geo_field(cls, opts, lookup):
"""
Utility for checking the given lookup with the given model options.
The lookup is a string either specifying the geographic field, e.g.
'point, 'the_geom', or a related lookup on a geographic field like
'address__point'.
If a BaseSpatialField exists according to the given lookup on the model
options, it will be returned. Otherwise return None.
"""
from django.contrib.gis.db.models.fields import BaseSpatialField
# This takes into account the situation where the lookup is a
# lookup to a related geographic field, e.g., 'address__point'.
field_list = lookup.split(LOOKUP_SEP)
# Reversing so list operates like a queue of related lookups,
# and popping the top lookup.
field_list.reverse()
fld_name = field_list.pop()
try:
geo_fld = opts.get_field(fld_name)
# If the field list is still around, then it means that the
# lookup was for a geometry field across a relationship --
# thus we keep on getting the related model options and the
# model field associated with the next field in the list
# until there's no more left.
while len(field_list):
opts = geo_fld.remote_field.model._meta
geo_fld = opts.get_field(field_list.pop())
except (FieldDoesNotExist, AttributeError):
return False
# Finally, make sure we got a Geographic field and return.
if isinstance(geo_fld, BaseSpatialField):
return geo_fld
else:
return False
def process_band_indices(self, only_lhs=False):
"""
Extract the lhs band index from the band transform class and the rhs
band index from the input tuple.
"""
# PostGIS band indices are 1-based, so the band index needs to be
# increased to be consistent with the GDALRaster band indices.
if only_lhs:
self.band_rhs = 1
self.band_lhs = self.lhs.band_index + 1
return
if isinstance(self.lhs, RasterBandTransform):
self.band_lhs = self.lhs.band_index + 1
else:
self.band_lhs = 1
self.band_rhs = self.rhs[1]
if len(self.rhs) == 1:
self.rhs = self.rhs[0]
else:
self.rhs = (self.rhs[0], ) + self.rhs[2:]
def get_db_prep_lookup(self, value, connection):
# get_db_prep_lookup is called by process_rhs from super class
if isinstance(value, (tuple, list)):
# First param is assumed to be the geometric object
params = [connection.ops.Adapter(value[0])] + list(value)[1:]
else:
params = [connection.ops.Adapter(value)]
return ('%s', params)
def process_rhs(self, compiler, connection):
if isinstance(self.rhs, Query):
# If rhs is some Query, don't touch it.
return super(GISLookup, self).process_rhs(compiler, connection)
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for spatial fields.')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
# Check if a band index was passed in the query argument.
if ((len(self.rhs) == 2 and not self.lookup_name == 'relate') or
(len(self.rhs) == 3 and self.lookup_name == 'relate')):
self.process_band_indices()
elif len(self.rhs) > 2:
raise ValueError('Tuple too long for lookup %s.' % self.lookup_name)
elif isinstance(self.lhs, RasterBandTransform):
self.process_band_indices(only_lhs=True)
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def get_rhs_op(self, connection, rhs):
# Unlike BuiltinLookup, the GIS get_rhs_op() implementation should return
# an object (SpatialOperator) with an as_sql() method to allow for more
# complex computations (where the lhs part can be mixed in).
return connection.ops.gis_operators[self.lookup_name]
def as_sql(self, compiler, connection):
lhs_sql, sql_params = self.process_lhs(compiler, connection)
rhs_sql, rhs_params = self.process_rhs(compiler, connection)
sql_params.extend(rhs_params)
template_params = {'lhs': lhs_sql, 'rhs': rhs_sql, 'value': '%s'}
template_params.update(self.template_params)
rhs_op = self.get_rhs_op(connection, rhs_sql)
return rhs_op.as_sql(connection, self, template_params, sql_params)
# ------------------
# Geometry operators
# ------------------
class OverlapsLeftLookup(GISLookup):
"""
The overlaps_left operator returns true if A's bounding box overlaps or is to the
left of B's bounding box.
"""
lookup_name = 'overlaps_left'
gis_lookups['overlaps_left'] = OverlapsLeftLookup
class OverlapsRightLookup(GISLookup):
"""
The 'overlaps_right' operator returns true if A's bounding box overlaps or is to the
right of B's bounding box.
"""
lookup_name = 'overlaps_right'
gis_lookups['overlaps_right'] = OverlapsRightLookup
class OverlapsBelowLookup(GISLookup):
"""
The 'overlaps_below' operator returns true if A's bounding box overlaps or is below
B's bounding box.
"""
lookup_name = 'overlaps_below'
gis_lookups['overlaps_below'] = OverlapsBelowLookup
class OverlapsAboveLookup(GISLookup):
"""
The 'overlaps_above' operator returns true if A's bounding box overlaps or is above
B's bounding box.
"""
lookup_name = 'overlaps_above'
gis_lookups['overlaps_above'] = OverlapsAboveLookup
class LeftLookup(GISLookup):
"""
The 'left' operator returns true if A's bounding box is strictly to the left
of B's bounding box.
"""
lookup_name = 'left'
gis_lookups['left'] = LeftLookup
class RightLookup(GISLookup):
"""
The 'right' operator returns true if A's bounding box is strictly to the right
of B's bounding box.
"""
lookup_name = 'right'
gis_lookups['right'] = RightLookup
class StrictlyBelowLookup(GISLookup):
"""
The 'strictly_below' operator returns true if A's bounding box is strictly below B's
bounding box.
"""
lookup_name = 'strictly_below'
gis_lookups['strictly_below'] = StrictlyBelowLookup
class StrictlyAboveLookup(GISLookup):
"""
The 'strictly_above' operator returns true if A's bounding box is strictly above B's
bounding box.
"""
lookup_name = 'strictly_above'
gis_lookups['strictly_above'] = StrictlyAboveLookup
class SameAsLookup(GISLookup):
"""
The "~=" operator is the "same as" operator. It tests actual geometric
equality of two features. So if A and B are the same feature,
vertex-by-vertex, the operator returns true.
"""
lookup_name = 'same_as'
gis_lookups['same_as'] = SameAsLookup
class ExactLookup(SameAsLookup):
# Alias of same_as
lookup_name = 'exact'
gis_lookups['exact'] = ExactLookup
class BBContainsLookup(GISLookup):
"""
The 'bbcontains' operator returns true if A's bounding box completely contains
by B's bounding box.
"""
lookup_name = 'bbcontains'
gis_lookups['bbcontains'] = BBContainsLookup
class BBOverlapsLookup(GISLookup):
"""
The 'bboverlaps' operator returns true if A's bounding box overlaps B's bounding box.
"""
lookup_name = 'bboverlaps'
gis_lookups['bboverlaps'] = BBOverlapsLookup
class ContainedLookup(GISLookup):
"""
The 'contained' operator returns true if A's bounding box is completely contained
by B's bounding box.
"""
lookup_name = 'contained'
gis_lookups['contained'] = ContainedLookup
# ------------------
# Geometry functions
# ------------------
class ContainsLookup(GISLookup):
lookup_name = 'contains'
gis_lookups['contains'] = ContainsLookup
class ContainsProperlyLookup(GISLookup):
lookup_name = 'contains_properly'
gis_lookups['contains_properly'] = ContainsProperlyLookup
class CoveredByLookup(GISLookup):
lookup_name = 'coveredby'
gis_lookups['coveredby'] = CoveredByLookup
class CoversLookup(GISLookup):
lookup_name = 'covers'
gis_lookups['covers'] = CoversLookup
class CrossesLookup(GISLookup):
lookup_name = 'crosses'
gis_lookups['crosses'] = CrossesLookup
class DisjointLookup(GISLookup):
lookup_name = 'disjoint'
gis_lookups['disjoint'] = DisjointLookup
class EqualsLookup(GISLookup):
lookup_name = 'equals'
gis_lookups['equals'] = EqualsLookup
class IntersectsLookup(GISLookup):
lookup_name = 'intersects'
gis_lookups['intersects'] = IntersectsLookup
class IsValidLookup(GISLookup):
lookup_name = 'isvalid'
sql_template = '%(func)s(%(lhs)s)'
def as_sql(self, compiler, connection):
if self.lhs.field.geom_type == 'RASTER':
raise ValueError('The isvalid lookup is only available on geometry fields.')
gis_op = connection.ops.gis_operators[self.lookup_name]
sql, params = self.process_lhs(compiler, connection)
sql, params = gis_op.as_sql(connection, self, {'func': gis_op.func, 'lhs': sql}, params)
if not self.rhs:
sql = 'NOT ' + sql
return sql, params
gis_lookups['isvalid'] = IsValidLookup
class OverlapsLookup(GISLookup):
lookup_name = 'overlaps'
gis_lookups['overlaps'] = OverlapsLookup
class RelateLookup(GISLookup):
lookup_name = 'relate'
sql_template = '%(func)s(%(lhs)s, %(rhs)s, %%s)'
pattern_regex = re.compile(r'^[012TF\*]{9}$')
def get_db_prep_lookup(self, value, connection):
if len(value) != 2:
raise ValueError('relate must be passed a two-tuple')
# Check the pattern argument
backend_op = connection.ops.gis_operators[self.lookup_name]
if hasattr(backend_op, 'check_relate_argument'):
backend_op.check_relate_argument(value[1])
else:
pattern = value[1]
if not isinstance(pattern, six.string_types) or not self.pattern_regex.match(pattern):
raise ValueError('Invalid intersection matrix pattern "%s".' % pattern)
return super(RelateLookup, self).get_db_prep_lookup(value, connection)
gis_lookups['relate'] = RelateLookup
class TouchesLookup(GISLookup):
lookup_name = 'touches'
gis_lookups['touches'] = TouchesLookup
class WithinLookup(GISLookup):
lookup_name = 'within'
gis_lookups['within'] = WithinLookup
class DistanceLookupBase(GISLookup):
distance = True
sql_template = '%(func)s(%(lhs)s, %(rhs)s) %(op)s %(value)s'
def process_rhs(self, compiler, connection):
if not isinstance(self.rhs, (tuple, list)) or not 2 <= len(self.rhs) <= 4:
raise ValueError("2, 3, or 4-element tuple required for '%s' lookup." % self.lookup_name)
elif len(self.rhs) == 4 and not self.rhs[3] == 'spheroid':
raise ValueError("For 4-element tuples the last argument must be the 'speroid' directive.")
# Check if the second parameter is a band index.
if len(self.rhs) > 2 and not self.rhs[2] == 'spheroid':
self.process_band_indices()
params = [connection.ops.Adapter(self.rhs[0])]
# Getting the distance parameter in the units of the field.
dist_param = self.rhs[1]
if hasattr(dist_param, 'resolve_expression'):
dist_param = dist_param.resolve_expression(compiler.query)
sql, expr_params = compiler.compile(dist_param)
self.template_params['value'] = sql
params.extend(expr_params)
else:
params += connection.ops.get_distance(
self.lhs.output_field, (dist_param,) + self.rhs[2:],
self.lookup_name, handle_spheroid=False
)
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, params[0], compiler)
return (rhs, params)
class DWithinLookup(DistanceLookupBase):
lookup_name = 'dwithin'
sql_template = '%(func)s(%(lhs)s, %(rhs)s, %%s)'
gis_lookups['dwithin'] = DWithinLookup
class DistanceGTLookup(DistanceLookupBase):
lookup_name = 'distance_gt'
gis_lookups['distance_gt'] = DistanceGTLookup
class DistanceGTELookup(DistanceLookupBase):
lookup_name = 'distance_gte'
gis_lookups['distance_gte'] = DistanceGTELookup
class DistanceLTLookup(DistanceLookupBase):
lookup_name = 'distance_lt'
gis_lookups['distance_lt'] = DistanceLTLookup
class DistanceLTELookup(DistanceLookupBase):
lookup_name = 'distance_lte'
gis_lookups['distance_lte'] = DistanceLTELookup
|