#!/usr/bin/env python
import os
import sys
from itertools import groupby
from nose.tools import eq_, raises
from nose.plugins.skip import SkipTest
import mapnik
from .utilities import execution_path, run_all
PYTHON3 = sys.version_info[0] == 3
if PYTHON3:
xrange = range
def setup():
    # All of the paths used are relative; if the tests are run
    # from another directory we need to chdir() first.
os.chdir(execution_path('.'))
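# Sanity check: warn, but do not fail, when no datasource plugins are loaded.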
def test_that_datasources_exist():
if len(mapnik.DatasourceCache.plugin_names()) == 0:
print('***NOTICE*** - no datasource plugins have been loaded')
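# Querying a GDAL datasource whose VRT refers to missing files should raise
# RuntimeError; stdout/stderr are temporarily redirected to /dev/null so the
# expected error message does not pollute the test output.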
# adapted from raster_symboliser_test#test_dataraster_query_point
@raises(RuntimeError)
def test_vrt_referring_to_missing_files():
srs = '+init=epsg:32630'
if 'gdal' in mapnik.DatasourceCache.plugin_names():
lyr = mapnik.Layer('dataraster')
lyr.datasource = mapnik.Gdal(
file='../data/raster/missing_raster.vrt',
band=1,
)
lyr.srs = srs
_map = mapnik.Map(256, 256, srs)
_map.layers.append(lyr)
        x, y = 556113.0, 4381428.0  # center of extent of raster
_map.zoom_all()
        # Fancy stuff to suppress the expected error output:
        # open two fds on /dev/null (use _ so the comprehension does not
        # clobber the x coordinate under Python 2)
        null_fds = [os.open(os.devnull, os.O_RDWR) for _ in xrange(2)]
# save the current file descriptors to a tuple
save = os.dup(1), os.dup(2)
# put /dev/null fds on 1 and 2
os.dup2(null_fds[0], 1)
os.dup2(null_fds[1], 2)
# *** run the function ***
try:
            # Should raise RuntimeError here
list(_map.query_point(0, x, y))
finally:
# restore file descriptors so I can print the results
os.dup2(save[0], 1)
os.dup2(save[1], 2)
# close the temporary fds
os.close(null_fds[0])
os.close(null_fds[1])
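# Check the field list and describe() metadata reported by the shape plugin.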
def test_field_listing():
if 'shape' in mapnik.DatasourceCache.plugin_names():
if not os.path.exists('../data/shp/poly.shp'):
raise SkipTest
ds = mapnik.Shapefile(file='../data/shp/poly.shp')
fields = ds.fields()
eq_(fields, ['AREA', 'EAS_ID', 'PRFEDEA'])
desc = ds.describe()
eq_(desc['geometry_type'], mapnik.DataGeometryType.Polygon)
eq_(desc['name'], 'shape')
eq_(desc['type'], mapnik.DataType.Vector)
eq_(desc['encoding'], 'utf-8')
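# poly.shp is expected to contain exactly 10 features.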
def test_total_feature_count_shp():
if 'shape' in mapnik.DatasourceCache.plugin_names():
if not os.path.exists('../data/shp/poly.shp'):
raise SkipTest
ds = mapnik.Shapefile(file='../data/shp/poly.shp')
features = ds.all_features()
num_feats = len(list(features))
eq_(num_feats, 10)
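# Read points.geojson through OGR and verify its metadata and feature count.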
def test_total_feature_count_json():
if 'ogr' in mapnik.DatasourceCache.plugin_names():
if not os.path.exists('../data/json/points.geojson'):
raise SkipTest
ds = mapnik.Ogr(file='../data/json/points.geojson', layer_by_index=0)
desc = ds.describe()
eq_(desc['geometry_type'], mapnik.DataGeometryType.Point)
eq_(desc['name'], 'ogr')
eq_(desc['type'], mapnik.DataType.Vector)
eq_(desc['encoding'], 'utf-8')
features = ds.all_features()
num_feats = len(list(features))
eq_(num_feats, 5)
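# Open the first table of world.sqlite and verify metadata and feature count.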
def test_sqlite_reading():
if 'sqlite' in mapnik.DatasourceCache.plugin_names():
if not os.path.exists('../data/sqlite/world.sqlite'):
raise SkipTest
ds = mapnik.SQLite(
file='../data/sqlite/world.sqlite',
table_by_index=0)
desc = ds.describe()
eq_(desc['geometry_type'], mapnik.DataGeometryType.Polygon)
eq_(desc['name'], 'sqlite')
eq_(desc['type'], mapnik.DataType.Vector)
eq_(desc['encoding'], 'utf-8')
features = ds.all_features()
num_feats = len(list(features))
eq_(num_feats, 245)
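# OGR can also be handed GeoJSON inline as a string rather than a file path.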
def test_reading_json_from_string():
if not os.path.exists('../data/json/points.geojson'):
raise SkipTest
    # read the GeoJSON into a string (named json_str to avoid shadowing
    # the stdlib json module)
    with open('../data/json/points.geojson', 'r') as f:
        json_str = f.read()
    if 'ogr' in mapnik.DatasourceCache.plugin_names():
        # the OGR GeoJSON driver accepts inline JSON in place of a filename
        ds = mapnik.Ogr(file=json_str, layer_by_index=0)
features = ds.all_features()
num_feats = len(list(features))
eq_(num_feats, 5)
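# Every feature's envelope should be contained by, and intersect,
# the datasource's overall envelope.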
def test_feature_envelope():
if 'shape' in mapnik.DatasourceCache.plugin_names():
if not os.path.exists('../data/shp/poly.shp'):
raise SkipTest
ds = mapnik.Shapefile(file='../data/shp/poly.shp')
features = ds.all_features()
for feat in features:
env = feat.envelope()
contains = ds.envelope().contains(env)
eq_(contains, True)
            intersects = ds.envelope().intersects(env)
            eq_(intersects, True)
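# Spot-check the attribute values, field names and field types of the
# first feature in poly.shp.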
def test_feature_attributes():
if 'shape' in mapnik.DatasourceCache.plugin_names():
if not os.path.exists('../data/shp/poly.shp'):
raise SkipTest
ds = mapnik.Shapefile(file='../data/shp/poly.shp')
features = list(ds.all_features())
feat = features[0]
attrs = {'PRFEDEA': u'35043411', 'EAS_ID': 168, 'AREA': 215229.266}
eq_(feat.attributes, attrs)
eq_(ds.fields(), ['AREA', 'EAS_ID', 'PRFEDEA'])
eq_(ds.field_types(), ['float', 'int', 'str'])
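# An OGR layer can be selected with a SQL query instead of a name or index;
# this WHERE clause should match exactly one feature.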
def test_ogr_layer_by_sql():
if 'ogr' in mapnik.DatasourceCache.plugin_names():
if not os.path.exists('../data/shp/poly.shp'):
raise SkipTest
ds = mapnik.Ogr(file='../data/shp/poly.shp',
layer_by_sql='SELECT * FROM poly WHERE EAS_ID = 168')
features = ds.all_features()
num_feats = len(list(features))
eq_(num_feats, 1)
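# Hit-test the rendered map point-by-point on a 4px grid, run-length encode
# the NAME attribute of each hit, and compare the head and tail of the
# encoded sequence against expected values.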
def test_hit_grid():
def rle_encode(l):
""" encode a list of strings with run-length compression """
return ["%d:%s" % (len(list(group)), name)
for name, group in groupby(l)]
if not os.path.exists('../data/good_maps/agg_poly_gamma_map.xml'):
raise SkipTest
m = mapnik.Map(256, 256)
try:
mapnik.load_map(m, '../data/good_maps/agg_poly_gamma_map.xml')
m.zoom_all()
join_field = 'NAME'
fg = [] # feature grid
for y in xrange(0, 256, 4):
for x in xrange(0, 256, 4):
featureset = m.query_map_point(0, x, y)
added = False
for feature in featureset:
fg.append(feature[join_field])
added = True
if not added:
fg.append('')
hit_list = '|'.join(rle_encode(fg))
eq_(hit_list[:16], '730:|2:Greenland')
eq_(hit_list[-12:], '1:Chile|812:')
except RuntimeError as e:
# only test datasources that we have installed
        if 'Could not create datasource' not in str(e):
            raise
if __name__ == '__main__':
setup()
exit(run_all(eval(x) for x in dir() if x.startswith("test_")))