File: test_shapely.py

package info (click to toggle)
psycopg3 3.3.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,836 kB
  • sloc: python: 46,657; sh: 403; ansic: 149; makefile: 73
file content (142 lines) | stat: -rw-r--r-- 4,556 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import pytest

import psycopg
from psycopg.pq import Format
from psycopg.adapt import PyFormat
from psycopg.types import TypeInfo

pytest.importorskip("shapely")

from shapely.geometry import MultiPolygon, Point, Polygon

from psycopg.types.shapely import register_shapely, shapely_version

if shapely_version >= (2, 0):
    from shapely import get_srid, set_srid
else:

    def set_srid(obj, srid):  # type: ignore[no-redef]
        return obj

    def get_srid(obj):  # type: ignore[no-redef]
        raise NotImplementedError


pytestmark = [
    pytest.mark.postgis,
    pytest.mark.crdb("skip"),
]

SAMPLE_POINT = Point(1.2, 3.4)
SAMPLE_POLYGON = Polygon([(0, 0), (1, 1), (1, 0)])
SAMPLE_POLYGON_4326 = set_srid(SAMPLE_POLYGON, 4326)

# real example, with CRS and "holes"
MULTIPOLYGON_GEOJSON = """
{
   "type":"MultiPolygon",
   "crs":{
      "type":"name",
      "properties":{
         "name":"EPSG:3857"
      }
   },
   "coordinates":[
      [
         [
            [89574.61111389, 6894228.638802719],
            [89576.815239808, 6894208.60747024],
            [89576.904295401, 6894207.820852726],
            [89577.99522641, 6894208.022080451],
            [89577.961830563, 6894209.229446936],
            [89589.227363031, 6894210.601454523],
            [89594.615226386, 6894161.849595264],
            [89600.314784314, 6894111.37846976],
            [89651.187791607, 6894116.774968589],
            [89648.49385993, 6894140.226914071],
            [89642.92788539, 6894193.423936413],
            [89639.721884055, 6894224.08372821],
            [89589.283022777, 6894218.431048969],
            [89588.192091767, 6894230.248628867],
            [89574.61111389, 6894228.638802719]
         ],
         [
            [89610.344670435, 6894182.466199101],
            [89625.985058891, 6894184.258949757],
            [89629.547282597, 6894153.270030369],
            [89613.918026089, 6894151.458993318],
            [89610.344670435, 6894182.466199101]
         ]
      ]
   ]
}"""

SAMPLE_POINT_GEOJSON = '{"type":"Point","coordinates":[1.2, 3.4]}'


@pytest.fixture
def shapely_conn(conn, svcconn):
    try:
        with svcconn.transaction():
            svcconn.execute("create extension if not exists postgis")
    except psycopg.Error as e:
        pytest.skip(f"can't create extension postgis: {e}")

    info = TypeInfo.fetch(conn, "geometry")
    assert info
    register_shapely(info, conn)
    return conn


def test_no_adapter(conn):
    point = Point(1.2, 3.4)
    with pytest.raises(psycopg.ProgrammingError, match="cannot adapt type 'Point'"):
        conn.execute("SELECT pg_typeof(%s)", [point]).fetchone()[0]


def test_no_info_error(conn):
    from psycopg.types.shapely import register_shapely

    with pytest.raises(TypeError, match="postgis.*extension"):
        register_shapely(None, conn)  # type: ignore[arg-type]


@pytest.mark.parametrize("fmt_in", PyFormat)
@pytest.mark.parametrize("obj", ["SAMPLE_POINT", "SAMPLE_POLYGON"])
def test_with_adapter(shapely_conn, obj, fmt_in):
    obj = globals()[obj]
    with shapely_conn.cursor() as cur:
        cur.execute(f"SELECT pg_typeof(%{fmt_in.value})", [obj])
        assert cur.fetchone()[0] == "geometry"


@pytest.mark.parametrize("fmt_in", PyFormat)
@pytest.mark.parametrize("fmt_out", Format)
@pytest.mark.parametrize(
    "obj, srid",
    [("SAMPLE_POINT", 0), ("SAMPLE_POLYGON", 0), ("SAMPLE_POLYGON_4326", 4326)],
)
def test_write_read_shape(shapely_conn, fmt_in, fmt_out, obj, srid):
    obj = globals()[obj]
    with shapely_conn.cursor(binary=fmt_out) as cur:
        cur.execute("drop table if exists sample_geoms")
        cur.execute("create table sample_geoms(id SERIAL PRIMARY KEY, geom geometry)")
        cur.execute(f"insert into sample_geoms(geom) VALUES(%{fmt_in.value})", (obj,))
        cur.execute("select geom from sample_geoms")
        result = cur.fetchone()[0]
        assert result == obj
        if shapely_version >= (2, 0):
            assert get_srid(result) == srid


@pytest.mark.parametrize("fmt_out", Format)
def test_match_geojson(shapely_conn, fmt_out):
    with shapely_conn.cursor(binary=fmt_out) as cur:
        cur.execute("select ST_GeomFromGeoJSON(%s)", (SAMPLE_POINT_GEOJSON,))
        result = cur.fetchone()[0]
        # clone the coordinates to have a list instead of a shapely wrapper
        assert result.coords[:] == SAMPLE_POINT.coords[:]

        cur.execute("select ST_GeomFromGeoJSON(%s)", (MULTIPOLYGON_GEOJSON,))
        result = cur.fetchone()[0]
        assert isinstance(result, MultiPolygon)