File: sqlite_rtree_test.py

package info (click to toggle)
mapnik 2.2.0+ds1-7
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 30,288 kB
  • ctags: 18,382
  • sloc: cpp: 115,128; python: 9,298; xml: 5,692; ansic: 3,726; makefile: 160; sh: 159; lisp: 13
file content (159 lines) | stat: -rw-r--r-- 5,277 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#!/usr/bin/env python

from nose.tools import *
from utilities import execution_path, run_all
from Queue import Queue
import threading

import os, mapnik
import sqlite3

def setup():
    """Change the working directory before the tests run.

    The data paths in this module are relative, so running the tests from
    some other directory only works after a chdir() to whatever
    ``execution_path('.')`` resolves to (presumably this module's directory).
    """
    test_dir = execution_path('.')
    os.chdir(test_dir)

# How many threads open the datasource at the same time in test_rtree_creation.
NUM_THREADS = 10
# Feature count the tests expect from TABLE; also compared against the number
# of rows found in the spatial index table.
TOTAL = 245
# SQLite database under test (relative path, hence the chdir() in setup()).
DB = '../data/sqlite/world.sqlite'
# Table inside DB that the datasources read from.
TABLE = 'world_merc'

def create_ds():
    """Open a SQLite datasource on DB/TABLE and fetch all of its features.

    Serves as the thread target in test_rtree_creation; the fetched features
    are discarded and nothing is returned.
    """
    params = dict(file=DB, table=TABLE)
    source = mapnik.SQLite(**params)
    source.all_features()  # result intentionally unused

if 'sqlite' in mapnik.DatasourceCache.plugin_names():

    def test_rtree_creation():
        """Exercise creation and use of the on-disk spatial index for DB.

        Steps:
          1. remove any existing ``DB + '.index'`` file, then open the
             datasource from NUM_THREADS threads at once and expect the
             index file to exist afterwards;
          2. if python's sqlite3 can read the index table, expect it to
             hold TOTAL rows;
          3. expect TOTAL features both with the index present and with
             ``use_spatial_index=False`` (which must not recreate the file);
          4. with ``use_spatial_index=True``, expect every feature's envelope
             to select at least one feature and the index file to exist.

        The index file is removed again at the end.
        """
        index = DB + '.index'
        # start without an index so the threads below have to produce one
        if os.path.exists(index):
            os.unlink(index)

        # open the datasource concurrently from several threads
        threads = [threading.Thread(target=create_ds) for _ in range(NUM_THREADS)]
        for worker in threads:
            worker.start()
        for worker in threads:
            worker.join()

        eq_(os.path.exists(index), True)

        # Look at the index with plain sqlite3.  Both the cursor and the
        # connection are closed even if the assertion fails, so no handle on
        # the index file is still open when it gets unlinked further down.
        conn = sqlite3.connect(index)
        try:
            cur = conn.cursor()
            try:
                # a SELECT needs no commit(); just read the count back
                cur.execute("Select count(*) from idx_%s_GEOMETRY" % TABLE.replace("'", ""))
                eq_(cur.fetchone()[0], TOTAL)
            except sqlite3.OperationalError:
                # don't worry about testing # of index records if
                # python's sqlite module does not support rtree
                pass
            finally:
                cur.close()
        finally:
            conn.close()

        # every feature is returned while the index file exists
        ds = mapnik.SQLite(file=DB, table=TABLE)
        fs = ds.all_features()
        eq_(len(fs), TOTAL)
        os.unlink(index)

        # with spatial indexing switched off the index must not reappear
        ds = mapnik.SQLite(file=DB, table=TABLE, use_spatial_index=False)
        fs = ds.all_features()
        eq_(len(fs), TOTAL)
        eq_(os.path.exists(index), False)

        # with spatial indexing on, querying by each feature's own envelope
        # has to return at least one feature
        ds = mapnik.SQLite(file=DB, table=TABLE, use_spatial_index=True)
        fs = ds.all_features()
        for feat in fs:
            query = mapnik.Query(feat.envelope())
            selected = ds.features(query)
            eq_(len(selected.features) >= 1, True)

        eq_(os.path.exists(index), True)
        os.unlink(index)

    def test_geometry_round_trip():
        """Store a WKB point in a scratch SQLite db and read it back.

        The point is written with sqlite3 (using WKB produced by mapnik),
        compared against a hand-packed WKB, read back through a mapnik
        'sqlite' datasource and finally compared with the raw row fetched via
        sqlite3.

        The scratch db and its '.index' companion are removed before the test
        starts (a leftover file from an aborted run would otherwise shift the
        autoincrement id and duplicate the metadata row) and again afterwards,
        whether or not an assertion failed.
        """
        test_db = '/tmp/mapnik-sqlite-point.db'
        index = test_db + '.index'
        ogr_metadata = True

        def remove_test_files():
            # delete whichever of the scratch files currently exist
            for path in (test_db, index):
                if os.path.exists(path):
                    os.unlink(path)

        def make_wkb_point(x, y):
            # hand-pack a little endian WKB point: byte order, type, x, y
            import struct
            byteorder = 1  # little endian
            geom_type = 1  # for a point
            return struct.pack('<bldd', byteorder, geom_type, x, y)

        remove_test_files()
        conn = sqlite3.connect(test_db)
        try:
            # create test db
            cur = conn.cursor()
            cur.execute('''
                 CREATE TABLE IF NOT EXISTS point_table
                 (id INTEGER PRIMARY KEY AUTOINCREMENT, geometry BLOB, name varchar)
                 ''')
            # optional: but nice if we want to read with ogr
            if ogr_metadata:
                cur.execute('''CREATE TABLE IF NOT EXISTS geometry_columns (
                            f_table_name VARCHAR,
                            f_geometry_column VARCHAR,
                            geometry_type INTEGER,
                            coord_dimension INTEGER,
                            srid INTEGER,
                            geometry_format VARCHAR )''')
                cur.execute('''INSERT INTO geometry_columns
                            (f_table_name, f_geometry_column, geometry_format,
                            geometry_type, coord_dimension, srid) VALUES
                            ('point_table','geometry','WKB', 1, 1, 4326)''')
            conn.commit()
            cur.close()

            # add a point as wkb (using mapnik) to match how an ogr created db looks
            x = -122  # longitude
            y = 48  # latitude
            wkt = 'POINT(%s %s)' % (x, y)
            # little endian wkb (mapnik will auto-detect and read either
            # little or big endian (XDR))
            wkb = mapnik.Path.from_wkt(wkt).to_wkb(mapnik.wkbByteOrder.NDR)
            values = (None, sqlite3.Binary(wkb), "test point")
            cur = conn.cursor()
            cur.execute('''INSERT into "point_table" (id,geometry,name) values (?,?,?)''', values)
            conn.commit()
            cur.close()

            # confirm the wkb matches a manually formed wkb
            wkb2 = make_wkb_point(x, y)
            eq_(wkb, wkb2)

            # ensure we can read this data back out properly with mapnik
            ds = mapnik.Datasource(**{'type': 'sqlite', 'file': test_db, 'table': 'point_table'})
            fs = ds.featureset()
            feat = fs.next()
            eq_(feat.id(), 1)
            eq_(feat['name'], 'test point')
            geoms = feat.geometries()
            eq_(len(geoms), 1)
            eq_(geoms.to_wkt(), 'Point(-122 48)')

            # ensure it matches data read with just sqlite
            # (a SELECT needs no commit())
            cur = conn.cursor()
            cur.execute('''SELECT * from point_table''')
            result = cur.fetchone()
            cur.close()
            feat_id = result[0]
            eq_(feat_id, 1)
            name = result[2]
            eq_(name, 'test point')
            geom_wkb_blob = result[1]
            eq_(str(geom_wkb_blob), geoms.to_wkb(mapnik.wkbByteOrder.NDR))
            new_geom = mapnik.Path.from_wkb(str(geom_wkb_blob))
            eq_(new_geom.to_wkt(), geoms.to_wkt())
        finally:
            # cleanup: release the sqlite handle first, then drop the files
            conn.close()
            remove_test_files()

if __name__ == "__main__":
    # Run directly: adjust the working directory, then pass every
    # module-level object whose name starts with "test_" to run_all.
    setup()
    module_scope = globals()
    run_all(module_scope[name] for name in dir() if name.startswith("test_"))