import os
import mapnik
import pytest
from .utilities import execution_path

@pytest.fixture(scope="module")
def setup_and_teardown():
    # All of the paths used are relative, if we run the tests
    # from another directory we need to chdir()
    os.chdir(execution_path('.'))
    yield
    index = '../data/sqlite/world.sqlite.index'
    if os.path.exists(index):
        os.unlink(index)

if 'sqlite' in mapnik.DatasourceCache.plugin_names():

    def test_attachdb_with_relative_file(setup_and_teardown):
        if not os.path.exists('../data/sqlite/world.sqlite'):
            pytest.skip('Missing data')

        # The point table and index is in the qgis_spatiallite.sqlite
        # database.  If either is not found, then this fails
        ds = mapnik.SQLite(file='../data/sqlite/world.sqlite',
                           table='point',
                           attachdb='scratch@qgis_spatiallite.sqlite'
                           )
        fs = ds.featureset()
        feature = fs.next()
        assert feature['pkuid'] == 1

    test_attachdb_with_relative_file.requires_data = True

    def test_attachdb_with_multiple_files():
        if not os.path.exists('../data/sqlite/world.sqlite'):
            pytest.skip('Missing data')

        ds = mapnik.SQLite(file='../data/sqlite/world.sqlite',
                           table='attachedtest',
                           attachdb='scratch1@:memory:,scratch2@:memory:',
                           initdb='''
                create table scratch1.attachedtest (the_geom);
                create virtual table scratch2.idx_attachedtest_the_geom using rtree(pkid,xmin,xmax,ymin,ymax);
                insert into scratch2.idx_attachedtest_the_geom values (1,-7799225.5,-7778571.0,1393264.125,1417719.375);
                '''
                           )
        fs = ds.featureset()
        feature = None
        try:
            feature = fs.next()
        except StopIteration:
            pass
        # the above should not throw but will result in no features
        assert feature == None

    test_attachdb_with_multiple_files.requires_data = True

    def test_attachdb_with_absolute_file():
        if not os.path.exists('../data/sqlite/world.sqlite'):
            pytest.skip('Missing data')

        # The point table and index is in the qgis_spatiallite.sqlite
        # database.  If either is not found, then this fails
        ds = mapnik.SQLite(file=os.getcwd() + '/../data/sqlite/world.sqlite',
                           table='point',
                           attachdb='scratch@qgis_spatiallite.sqlite'
                           )
        fs = ds.featureset()
        feature = fs.next()
        assert feature['pkuid'] ==  1

    test_attachdb_with_absolute_file.requires_data = True

    def test_attachdb_with_index():
        if not os.path.exists('../data/sqlite/world.sqlite'):
            pytest.skip('Missing data')

        ds = mapnik.SQLite(file='../data/sqlite/world.sqlite',
                           table='attachedtest',
                           attachdb='scratch@:memory:',
                           initdb='''
                create table scratch.attachedtest (the_geom);
                create virtual table scratch.idx_attachedtest_the_geom using rtree(pkid,xmin,xmax,ymin,ymax);
                insert into scratch.idx_attachedtest_the_geom values (1,-7799225.5,-7778571.0,1393264.125,1417719.375);
                '''
                           )

        fs = ds.featureset()
        feature = None
        try:
            feature = fs.next()
        except StopIteration:
            pass
        assert feature ==  None

    test_attachdb_with_index.requires_data = True

    def test_attachdb_with_explicit_index():
        if not os.path.exists('../data/sqlite/world.sqlite'):
            pytest.skip('Missing data')

        ds = mapnik.SQLite(file='../data/sqlite/world.sqlite',
                           table='attachedtest',
                           index_table='myindex',
                           attachdb='scratch@:memory:',
                           initdb='''
                create table scratch.attachedtest (the_geom);
                create virtual table scratch.myindex using rtree(pkid,xmin,xmax,ymin,ymax);
                insert into scratch.myindex values (1,-7799225.5,-7778571.0,1393264.125,1417719.375);
                '''
                           )
        fs = ds.featureset()
        feature = None
        try:
            feature = fs.next()
        except StopIteration:
            pass
        assert feature ==  None

    test_attachdb_with_explicit_index.requires_data = True

    def test_attachdb_with_sql_join():
        if not os.path.exists('../data/sqlite/world.sqlite'):
            pytest.skip('Missing data')

        ds = mapnik.SQLite(file='../data/sqlite/world.sqlite',
                           table='(select * from world_merc INNER JOIN business on world_merc.iso3 = business.ISO3 limit 100)',
                           attachdb='busines@business.sqlite'
                           )
        assert len(ds.fields()) ==  29
        assert ds.fields() == ['OGC_FID',
                               'fips',
                               'iso2',
                               'iso3',
                               'un',
                               'name',
                               'area',
                               'pop2005',
                               'region',
                               'subregion',
                               'lon',
                               'lat',
                               'ISO3:1',
                               '1995',
                               '1996',
                               '1997',
                               '1998',
                               '1999',
                               '2000',
                               '2001',
                               '2002',
                               '2003',
                               '2004',
                               '2005',
                               '2006',
                               '2007',
                               '2008',
                               '2009',
                               '2010']
        assert ds.field_types() == ['int',
                                    'str',
                                    'str',
                                    'str',
                                    'int',
                                    'str',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'float',
                                    'float',
                                    'str',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int']
        fs = ds.featureset()
        feature = fs.next()
        assert feature.id() ==  1
        expected = {
            1995: 0,
            1996: 0,
            1997: 0,
            1998: 0,
            1999: 0,
            2000: 0,
            2001: 0,
            2002: 0,
            2003: 0,
            2004: 0,
            2005: 0,
            2006: 0,
            2007: 0,
            2008: 0,
            2009: 0,
            2010: 0,
            # this appears to be sqlites way of
            # automatically handling clashing column names
            'ISO3:1': 'ATG',
            'OGC_FID': 1,
            'area': 44,
            'fips': u'AC',
            'iso2': u'AG',
            'iso3': u'ATG',
            'lat': 17.078,
            'lon': -61.783,
            'name': u'Antigua and Barbuda',
            'pop2005': 83039,
            'region': 19,
            'subregion': 29,
            'un': 28
        }
        for k, v in expected.items():
            try:
                assert feature[str(k)] ==  v
            except:
                #import pdb;pdb.set_trace()
                print('invalid key/v %s/%s for: %s' % (k, v, feature))

    test_attachdb_with_sql_join.requires_data = True

    def test_attachdb_with_sql_join_count():
        if not os.path.exists('../data/sqlite/world.sqlite'):
            pytest.skip('Missing data')

        ds = mapnik.SQLite(file='../data/sqlite/world.sqlite',
                           table='(select * from world_merc INNER JOIN business on world_merc.iso3 = business.ISO3 limit 100)',
                           attachdb='busines@business.sqlite'
                           )
        assert len(ds.fields()) ==  29
        assert ds.fields() == ['OGC_FID',
                               'fips',
                               'iso2',
                               'iso3',
                               'un',
                               'name',
                               'area',
                               'pop2005',
                               'region',
                               'subregion',
                               'lon',
                               'lat',
                               'ISO3:1',
                               '1995',
                               '1996',
                               '1997',
                               '1998',
                               '1999',
                               '2000',
                               '2001',
                               '2002',
                               '2003',
                               '2004',
                               '2005',
                               '2006',
                               '2007',
                               '2008',
                               '2009',
                               '2010']
        assert ds.field_types() == ['int',
                                    'str',
                                    'str',
                                    'str',
                                    'int',
                                    'str',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'float',
                                    'float',
                                    'str',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int']
        assert len(list(ds.all_features())) ==  100

    test_attachdb_with_sql_join_count.requires_data = True

    def test_attachdb_with_sql_join_count2():
        if not os.path.exists('../data/sqlite/world.sqlite'):
            pytest.skip('Missing data')

        '''
        sqlite3 world.sqlite
        attach database 'business.sqlite' as business;
        select count(*) from world_merc INNER JOIN business on world_merc.iso3 = business.ISO3;
        '''
        ds = mapnik.SQLite(file='../data/sqlite/world.sqlite',
                           table='(select * from world_merc INNER JOIN business on world_merc.iso3 = business.ISO3)',
                           attachdb='busines@business.sqlite'
                           )
        assert len(ds.fields()) ==  29
        assert ds.fields() == ['OGC_FID',
                               'fips',
                               'iso2',
                               'iso3',
                               'un',
                               'name',
                               'area',
                               'pop2005',
                               'region',
                               'subregion',
                               'lon',
                               'lat',
                               'ISO3:1',
                               '1995',
                               '1996',
                               '1997',
                               '1998',
                               '1999',
                               '2000',
                               '2001',
                               '2002',
                               '2003',
                               '2004',
                               '2005',
                               '2006',
                               '2007',
                               '2008',
                               '2009',
                               '2010']
        assert ds.field_types() == ['int',
                                    'str',
                                    'str',
                                    'str',
                                    'int',
                                    'str',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'float',
                                    'float',
                                    'str',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int']
        assert len(list(ds.all_features())) ==  192

    test_attachdb_with_sql_join_count2.requires_data = True

    def test_attachdb_with_sql_join_count3():
        if not os.path.exists('../data/sqlite/world.sqlite'):
            pytest.skip('Missing data')

        '''
        select count(*) from (select * from world_merc where 1=1) as world_merc INNER JOIN business on world_merc.iso3 = business.ISO3;
        '''
        ds = mapnik.SQLite(file='../data/sqlite/world.sqlite',
                           table='(select * from (select * from world_merc where !intersects!) as world_merc INNER JOIN business on world_merc.iso3 = business.ISO3)',
                           attachdb='busines@business.sqlite'
                           )
        assert len(ds.fields()) ==  29
        assert ds.fields() == ['OGC_FID',
                               'fips',
                               'iso2',
                               'iso3',
                               'un',
                               'name',
                               'area',
                               'pop2005',
                               'region',
                               'subregion',
                               'lon',
                               'lat',
                               'ISO3:1',
                               '1995',
                               '1996',
                               '1997',
                               '1998',
                               '1999',
                               '2000',
                               '2001',
                               '2002',
                               '2003',
                               '2004',
                               '2005',
                               '2006',
                               '2007',
                               '2008',
                               '2009',
                               '2010']
        assert ds.field_types() == ['int',
                                    'str',
                                    'str',
                                    'str',
                                    'int',
                                    'str',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'float',
                                    'float',
                                    'str',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int']
        assert len(list(ds.all_features())) ==  192

    test_attachdb_with_sql_join_count3.requires_data = True

    def test_attachdb_with_sql_join_count4():
        if not os.path.exists('../data/sqlite/world.sqlite'):
            pytest.skip('Missing data')

        '''
        select count(*) from (select * from world_merc where 1=1) as world_merc INNER JOIN business on world_merc.iso3 = business.ISO3;
        '''
        ds = mapnik.SQLite(file='../data/sqlite/world.sqlite',
                           table='(select * from (select * from world_merc where !intersects! limit 1) as world_merc INNER JOIN business on world_merc.iso3 = business.ISO3)',
                           attachdb='busines@business.sqlite'
                           )
        assert len(ds.fields()) ==  29
        assert ds.fields() == ['OGC_FID',
                               'fips',
                               'iso2',
                               'iso3',
                               'un',
                               'name',
                               'area',
                               'pop2005',
                               'region',
                               'subregion',
                               'lon',
                               'lat',
                               'ISO3:1',
                               '1995',
                               '1996',
                               '1997',
                               '1998',
                               '1999',
                               '2000',
                               '2001',
                               '2002',
                               '2003',
                               '2004',
                               '2005',
                               '2006',
                               '2007',
                               '2008',
                               '2009',
                               '2010']
        assert ds.field_types() == ['int',
                                    'str',
                                    'str',
                                    'str',
                                    'int',
                                    'str',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'float',
                                    'float',
                                    'str',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'int']
        assert len(list(ds.all_features())) ==  1

    test_attachdb_with_sql_join_count4.requires_data = True

    def test_attachdb_with_sql_join_count5():
        if not os.path.exists('../data/sqlite/world.sqlite'):
            pytest.skip('Missing data')

        '''
        select count(*) from (select * from world_merc where 1=1) as world_merc INNER JOIN business on world_merc.iso3 = business.ISO3;
        '''
        ds = mapnik.SQLite(file='../data/sqlite/world.sqlite',
                           table='(select * from (select * from world_merc where !intersects! and 1=2) as world_merc INNER JOIN business on world_merc.iso3 = business.ISO3)',
                           attachdb='busines@business.sqlite'
                           )
        # nothing is able to join to business so we don't pick up business
        # schema
        assert len(ds.fields()) ==  12
        assert ds.fields() == ['OGC_FID',
                               'fips',
                               'iso2',
                               'iso3',
                               'un',
                               'name',
                               'area',
                               'pop2005',
                               'region',
                               'subregion',
                               'lon',
                               'lat']
        assert ds.field_types() == ['int',
                                    'str',
                                    'str',
                                    'str',
                                    'int',
                                    'str',
                                    'int',
                                    'int',
                                    'int',
                                    'int',
                                    'float',
                                    'float']
        assert len(list(ds.all_features())) ==  0

    test_attachdb_with_sql_join_count5.requires_data = True

    def test_subqueries():
        if not os.path.exists('../data/sqlite/world.sqlite'):
            pytest.skip('Missing data')

        ds = mapnik.SQLite(file='../data/sqlite/world.sqlite',
                           table='world_merc',
                           )
        fs = ds.featureset()
        feature = fs.next()
        assert feature['OGC_FID'] ==  1
        assert feature['fips'] ==  u'AC'
        assert feature['iso2'] ==  u'AG'
        assert feature['iso3'] ==  u'ATG'
        assert feature['un'] ==  28
        assert feature['name'] ==  u'Antigua and Barbuda'
        assert feature['area'] ==  44
        assert feature['pop2005'] ==  83039
        assert feature['region'] ==  19
        assert feature['subregion'] ==  29
        assert feature['lon'] ==  -61.783
        assert feature['lat'] ==  17.078

        ds = mapnik.SQLite(file='../data/sqlite/world.sqlite',
                           table='(select * from world_merc)',
                           )
        fs = ds.featureset()
        feature = fs.next()
        assert feature['OGC_FID'] ==  1
        assert feature['fips'] ==  u'AC'
        assert feature['iso2'] ==  u'AG'
        assert feature['iso3'] ==  u'ATG'
        assert feature['un'] ==  28
        assert feature['name'] ==  u'Antigua and Barbuda'
        assert feature['area'] ==  44
        assert feature['pop2005'] ==  83039
        assert feature['region'] ==  19
        assert feature['subregion'] ==  29
        assert feature['lon'] ==  -61.783
        assert feature['lat'] ==  17.078

        ds = mapnik.SQLite(file='../data/sqlite/world.sqlite',
                           table='(select OGC_FID,GEOMETRY from world_merc)',
                           )
        fs = ds.featureset()
        feature = fs.next()
        assert feature['OGC_FID'] ==  1
        assert len(feature) ==  1

        ds = mapnik.SQLite(file='../data/sqlite/world.sqlite',
                           table='(select GEOMETRY,OGC_FID,fips from world_merc)',
                           )
        fs = ds.featureset()
        feature = fs.next()
        assert feature['OGC_FID'] ==  1
        assert feature['fips'] ==  u'AC'

        # same as above, except with alias like postgres requires
        # TODO - should we try to make this work?
        # ds = mapnik.SQLite(file='../data/sqlite/world.sqlite',
        #    table='(select GEOMETRY,rowid as aliased_id,fips from world_merc) as table',
        #    key_field='aliased_id'
        #    )
        #fs = ds.featureset()
        #feature = fs.next()
        # assert feature['aliased_id'] == 1
        # assert feature['fips'] == u'AC'

        ds = mapnik.SQLite(file='../data/sqlite/world.sqlite',
                           table='(select GEOMETRY,OGC_FID,OGC_FID as rowid,fips from world_merc)',
                           )
        fs = ds.featureset()
        feature = fs.next()
        assert feature['rowid'] ==  1
        assert feature['fips'] ==  u'AC'

    test_subqueries.requires_data = True

    def test_empty_db():
        if not os.path.exists('../data/sqlite/empty.db'):
            pytest.skip('Missing data')

        ds = mapnik.SQLite(file='../data/sqlite/empty.db',
                           table='empty',
                           )
        fs = ds.featureset()
        feature = None
        try:
            feature = fs.next()
        except StopIteration:
            pass
        assert feature ==  None

    test_empty_db.requires_data = True


    def test_that_nonexistant_query_field_throws(**kwargs):
        if not os.path.exists('../data/sqlite/empty.db'):
            pytest.skip('Missing data')

        ds = mapnik.SQLite(file='../data/sqlite/empty.db',
                           table='empty',
                           )
        assert len(ds.fields()) ==  25
        assert ds.fields() == ['OGC_FID',
                               'scalerank',
                               'labelrank',
                               'featurecla',
                               'sovereignt',
                               'sov_a3',
                               'adm0_dif',
                               'level',
                               'type',
                               'admin',
                               'adm0_a3',
                               'geou_dif',
                               'name',
                               'abbrev',
                               'postal',
                               'name_forma',
                               'terr_',
                               'name_sort',
                               'map_color',
                               'pop_est',
                               'gdp_md_est',
                               'fips_10_',
                               'iso_a2',
                               'iso_a3',
                               'iso_n3']
        assert ds.field_types() ==  ['int',
                                     'int',
                                     'int',
                                     'str',
                                     'str',
                                     'str',
                                     'float',
                                     'float',
                                     'str',
                                     'str',
                                     'str',
                                     'float',
                                     'str',
                                     'str',
                                     'str',
                                     'str',
                                     'str',
                                     'str',
                                     'float',
                                     'float',
                                     'float',
                                     'float',
                                     'str',
                                     'str',
                                     'float']
        query = mapnik.Query(ds.envelope())
        for fld in ds.fields():
            query.add_property_name(fld)
        # also add an invalid one, triggering throw
        query.add_property_name('bogus')
        with pytest.raises(RuntimeError):
            ds.features(query)

    test_that_nonexistant_query_field_throws.requires_data = True

    def test_intersects_token1():
        if not os.path.exists('../data/sqlite/empty.db'):
            pytest.skip('Missing data')

        ds = mapnik.SQLite(file='../data/sqlite/empty.db',
                           table='(select * from empty where !intersects!)',
                           )
        fs = ds.featureset()
        feature = None
        try:
            feature = fs.next()
        except StopIteration:
            pass
        assert feature ==  None

    test_intersects_token1.requires_data = True

    def test_intersects_token2():
        if not os.path.exists('../data/sqlite/empty.db'):
            pytest.skip('Missing data')

        ds = mapnik.SQLite(file='../data/sqlite/empty.db',
                           table='(select * from empty where "a"!="b" and !intersects!)',
                           )
        fs = ds.featureset()
        feature = None
        try:
            feature = fs.next()
        except StopIteration:
            pass
        assert feature ==  None

    test_intersects_token2.requires_data = True

    def test_intersects_token3():
        if not os.path.exists('../data/sqlite/empty.db'):
            pytest.skip('Missing data')

        ds = mapnik.SQLite(file='../data/sqlite/empty.db',
                           table='(select * from empty where "a"!="b" and !intersects!)',
                           )
        fs = ds.featureset()
        feature = None
        try:
            feature = fs.next()
        except StopIteration:
            pass
        assert feature ==  None

    test_intersects_token3.requires_data = True

    # https://github.com/mapnik/mapnik/issues/1537
    # this works because key_field is manually set
    @pytest.mark.xfail(strict=False, reason="Fails on big-endian")
    def test_db_with_one_text_column():
        # form up an in-memory test db
        wkb = '010100000000000000000000000000000000000000'
        ds = mapnik.SQLite(file=':memory:',
                           table='test1',
                           initdb='''
                create table test1 (alias TEXT,geometry BLOB);
                insert into test1 values ("test",x'%s');
                ''' % wkb,
                           extent='-180,-60,180,60',
                           use_spatial_index=False,
                           key_field='alias'
                           )
        assert len(ds.fields()) ==  1
        assert ds.fields() ==  ['alias']
        assert ds.field_types() ==  ['str']
        fs = list(ds.all_features())
        assert len(fs) ==  1
        feat = fs[0]
        assert feat.id() ==  0  # should be 1?
        assert feat['alias'] ==  'test'
        assert feat.geometry.to_wkt() ==  'POINT(0 0)'

    def test_db_with_one_untyped_column():
        # form up an in-memory test db
        wkb = '010100000000000000000000000000000000000000'
        ds = mapnik.SQLite(file=':memory:',
                           table='test1',
                           initdb='''
                create table test1 (geometry BLOB, untyped);
                insert into test1 values (x'%s', 'untyped');
            ''' % wkb,
                           extent='-180,-60,180,60',
                           use_spatial_index=False,
                           key_field='rowid'
                           )

        # ensure the untyped column is found
        assert len(ds.fields()) ==  2
        assert ds.fields(), ['rowid' ==  'untyped']
        assert ds.field_types(), ['int' ==  'str']

    def test_db_with_one_untyped_column_using_subquery():
        # form up an in-memory test db
        wkb = '010100000000000000000000000000000000000000'
        ds = mapnik.SQLite(file=':memory:',
                           table='(SELECT rowid, geometry, untyped FROM test1)',
                           initdb='''
                create table test1 (geometry BLOB, untyped);
                insert into test1 values (x'%s', 'untyped');
            ''' % wkb,
                           extent='-180,-60,180,60',
                           use_spatial_index=False,
                           key_field='rowid'
                           )

        # ensure the untyped column is found
        assert len(ds.fields()) ==  3
        assert ds.fields(), ['rowid', 'untyped' ==  'rowid']
        assert ds.field_types(), ['int', 'str' ==  'int']

    def test_that_64bit_int_fields_work():
        if not os.path.exists('../data/sqlite/64bit_int.sqlite'):
            pytest.skip('Missing data')

        ds = mapnik.SQLite(file='../data/sqlite/64bit_int.sqlite',
                           table='int_table',
                           use_spatial_index=False
                           )
        assert len(ds.fields()) ==  3
        assert ds.fields(), ['OGC_FID', 'id' ==  'bigint']
        assert ds.field_types(), ['int', 'int' ==  'int']
        fs = ds.featureset()
        feat = fs.next()
        assert feat.id() ==  1
        assert feat['OGC_FID'] ==  1
        assert feat['bigint'] ==  2147483648
        feat = fs.next()
        assert feat.id() ==  2
        assert feat['OGC_FID'] ==  2
        assert feat['bigint'] ==  922337203685477580

    test_that_64bit_int_fields_work.requires_data = True

    def test_null_id_field():
        # silence null key warning:
        # https://github.com/mapnik/mapnik/issues/1889
        default_logging_severity = mapnik.logger.get_severity()
        mapnik.logger.set_severity(getattr(mapnik.severity_type, "None"))
        # form up an in-memory test db
        wkb = '010100000000000000000000000000000000000000'
        # note: the osm_id should be declared INTEGER PRIMARY KEY
        # but in this case we intentionally do not make this a valid pkey
        # otherwise sqlite would turn the null into a valid, serial id
        ds = mapnik.SQLite(file=':memory:',
                           table='test1',
                           initdb='''
                create table test1 (osm_id INTEGER,geometry BLOB);
                insert into test1 values (null,x'%s');
                ''' % wkb,
                           extent='-180,-60,180,60',
                           use_spatial_index=False,
                           key_field='osm_id'
                           )
        fs = ds.featureset()
        feature = None
        try:
            feature = fs.next()
        except StopIteration:
            pass
        assert feature ==  None
        mapnik.logger.set_severity(default_logging_severity)
