1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411
|
"""Tests of MemoryFile and ZippedMemoryFile"""
import os
from io import BytesIO
import pytest
import fiona
from fiona import supported_drivers
from fiona.drvsupport import _driver_supports_mode
from fiona.errors import DriverError
from fiona.io import MemoryFile, ZipMemoryFile
from fiona.meta import supports_vsi
from .conftest import requires_gdal2, requires_gpkg
@pytest.fixture(scope='session')
def profile_first_coutwildrnp_shp(path_coutwildrnp_shp):
with fiona.open(path_coutwildrnp_shp) as col:
return col.profile, next(iter(col))
@pytest.fixture(scope='session')
def data_coutwildrnp_json(path_coutwildrnp_json):
with open(path_coutwildrnp_json, 'rb') as f:
return f.read()
def test_memoryfile_ext():
"""File extensions are handled"""
assert MemoryFile(ext=".geojson").name.endswith(".geojson")
def test_memoryfile_bare_ext():
"""File extensions without a leading . are handled"""
assert MemoryFile(ext="geojson").name.endswith(".geojson")
def test_memoryfile_init(data_coutwildrnp_json):
"""In-memory GeoJSON file can be read"""
with MemoryFile(data_coutwildrnp_json) as memfile:
with memfile.open() as collection:
assert len(collection) == 67
def test_memoryfile_incr_init(data_coutwildrnp_json):
"""In-memory GeoJSON file written in 2 parts can be read"""
with MemoryFile() as memfile:
memfile.write(data_coutwildrnp_json[:1000])
memfile.write(data_coutwildrnp_json[1000:])
with memfile.open() as collection:
assert len(collection) == 67
def test_zip_memoryfile(bytes_coutwildrnp_zip):
"""In-memory zipped Shapefile can be read"""
with ZipMemoryFile(bytes_coutwildrnp_zip) as memfile:
with memfile.open('coutwildrnp.shp') as collection:
assert len(collection) == 67
def test_zip_memoryfile_infer_layer_name(bytes_coutwildrnp_zip):
"""In-memory zipped Shapefile can be read with the default layer"""
with ZipMemoryFile(bytes_coutwildrnp_zip) as memfile:
with memfile.open() as collection:
assert len(collection) == 67
def test_open_closed():
"""Get an exception when opening a dataset on a closed MemoryFile"""
memfile = MemoryFile()
memfile.close()
assert memfile.closed
with pytest.raises(OSError):
memfile.open()
def test_open_closed_zip():
"""Get an exception when opening a dataset on a closed ZipMemoryFile"""
memfile = ZipMemoryFile()
memfile.close()
assert memfile.closed
with pytest.raises(OSError):
memfile.open()
def test_write_memoryfile(profile_first_coutwildrnp_shp):
"""In-memory GeoJSON can be written"""
profile, first = profile_first_coutwildrnp_shp
profile['driver'] = 'GeoJSON'
with MemoryFile() as memfile:
with memfile.open(**profile) as col:
col.write(first)
memfile.seek(0)
data = memfile.read()
with MemoryFile(data) as memfile:
with memfile.open() as col:
assert len(col) == 1
@requires_gdal2
def test_memoryfile_write_extension(profile_first_coutwildrnp_shp):
"""In-memory shapefile gets an .shp extension by default"""
profile, first = profile_first_coutwildrnp_shp
profile['driver'] = 'ESRI Shapefile'
with MemoryFile() as memfile:
with memfile.open(**profile) as col:
col.write(first)
assert memfile.name.endswith(".shp")
def test_memoryfile_open_file_or_bytes_read(path_coutwildrnp_json):
"""Test MemoryFile.open when file_or_bytes has a read attribute """
with open(path_coutwildrnp_json, 'rb') as f:
with MemoryFile(f) as memfile:
with memfile.open() as collection:
assert len(collection) == 67
def test_memoryfile_bytesio(data_coutwildrnp_json):
"""GeoJSON file stored in BytesIO can be read"""
with fiona.open(BytesIO(data_coutwildrnp_json)) as collection:
assert len(collection) == 67
def test_memoryfile_fileobj(path_coutwildrnp_json):
"""GeoJSON file in an open file object can be read"""
with open(path_coutwildrnp_json, 'rb') as f:
with fiona.open(f) as collection:
assert len(collection) == 67
def test_memoryfile_len(data_coutwildrnp_json):
"""Test MemoryFile.__len__ """
with MemoryFile() as memfile:
assert len(memfile) == 0
memfile.write(data_coutwildrnp_json)
assert len(memfile) == len(data_coutwildrnp_json)
def test_memoryfile_tell(data_coutwildrnp_json):
"""Test MemoryFile.tell() """
with MemoryFile() as memfile:
assert memfile.tell() == 0
memfile.write(data_coutwildrnp_json)
assert memfile.tell() == len(data_coutwildrnp_json)
def test_write_bytesio(profile_first_coutwildrnp_shp):
"""GeoJSON can be written to BytesIO"""
profile, first = profile_first_coutwildrnp_shp
profile['driver'] = 'GeoJSON'
with BytesIO() as fout:
with fiona.open(fout, 'w', **profile) as col:
col.write(first)
fout.seek(0)
data = fout.read()
with MemoryFile(data) as memfile:
with memfile.open() as col:
assert len(col) == 1
@requires_gpkg
def test_read_multilayer_memoryfile(path_coutwildrnp_json, tmpdir):
"""Test read access to multilayer dataset in from file-like object"""
with fiona.open(path_coutwildrnp_json, "r") as src:
schema = src.schema
features = list(src)
path = os.path.join(tmpdir, "test.gpkg")
with fiona.open(path, "w", driver="GPKG", schema=schema, layer="layer1") as dst:
dst.writerecords(features[0:5])
with fiona.open(path, "w", driver="GPKG", schema=schema, layer="layer2") as dst:
dst.writerecords(features[5:])
with open(path, "rb") as f:
with fiona.open(f, layer="layer1") as src:
assert src.name == "layer1"
assert len(src) == 5
# Bug reported in #781 where this next section would fail
with open(path, "rb") as f:
with fiona.open(f, layer="layer2") as src:
assert src.name == "layer2"
assert len(src) == 62
def test_append_bytesio_exception(data_coutwildrnp_json):
"""Append is not supported, see #1027."""
with pytest.raises(OSError):
fiona.open(BytesIO(data_coutwildrnp_json), "a")
def test_mapinfo_raises():
"""Reported to be a crasher in #937"""
driver = "MapInfo File"
schema = {"geometry": "Point", "properties": {"position": "str"}}
with BytesIO() as fout:
with pytest.raises(OSError):
with fiona.open(fout, "w", driver=driver, schema=schema) as collection:
collection.write(
{
"type": "Feature",
"geometry": {"type": "Point", "coordinates": (0, 0)},
"properties": {"position": "x"},
}
)
# TODO remove exclusion of MapInfo File once testdata_generator is fixed
@pytest.mark.parametrize(
"driver",
[
driver
for driver in supported_drivers
if _driver_supports_mode(driver, "w")
and supports_vsi(driver)
and driver not in {"MapInfo File", "TileDB"}
],
)
def test_write_memoryfile_drivers(driver, testdata_generator):
""" Test if driver is able to write to memoryfile """
range1 = list(range(0, 5))
schema, crs, records1, _, _ = testdata_generator(driver, range1, [])
with MemoryFile() as memfile:
with memfile.open(driver=driver, crs="OGC:CRS84", schema=schema) as c:
c.writerecords(records1)
with memfile.open(driver=driver) as c:
assert driver == c.driver
items = list(c)
assert len(items) == len(range1)
def test_multiple_layer_memoryfile(testdata_generator):
""" Test ability to create multiple layers in memoryfile"""
driver = "GPKG"
range1 = list(range(0, 5))
range2 = list(range(5, 10))
schema, crs, records1, records2, _ = testdata_generator(driver, range1, range2)
with MemoryFile() as memfile:
with memfile.open(mode='w', driver=driver, schema=schema, layer="layer1") as c:
c.writerecords(records1)
with memfile.open(mode='w', driver=driver, schema=schema, layer="layer2") as c:
c.writerecords(records2)
with memfile.open(driver=driver, layer="layer1") as c:
assert driver == c.driver
items = list(c)
assert len(items) == len(range1)
with memfile.open(driver=driver, layer="layer2") as c:
assert driver == c.driver
items = list(c)
assert len(items) == len(range1)
# TODO remove exclusion of MapInfo File once testdata_generator is fixed
@pytest.mark.parametrize(
"driver",
[
driver
for driver in supported_drivers
if _driver_supports_mode(driver, "a")
and supports_vsi(driver)
and driver not in {"MapInfo File", "TileDB"}
],
)
def test_append_memoryfile_drivers(driver, testdata_generator):
"""Test if driver is able to append to memoryfile"""
range1 = list(range(0, 5))
range2 = list(range(5, 10))
schema, crs, records1, records2, _ = testdata_generator(driver, range1, range2)
with MemoryFile() as memfile:
with memfile.open(driver=driver, crs="OGC:CRS84", schema=schema) as c:
c.writerecords(records1)
# The parquet dataset does not seem to support append mode
if driver == "Parquet":
with memfile.open(driver=driver) as c:
assert driver == c.driver
items = list(c)
assert len(items) == len(range1)
else:
with memfile.open(mode='a', driver=driver, schema=schema) as c:
c.writerecords(records2)
with memfile.open(driver=driver) as c:
assert driver == c.driver
items = list(c)
assert len(items) == len(range1 + range2)
def test_memoryfile_driver_does_not_support_vsi():
"""An exception is raised with a driver that does not support VSI"""
if "FileGDB" not in supported_drivers:
pytest.skip("FileGDB driver not available")
with pytest.raises(DriverError):
with MemoryFile() as memfile:
with memfile.open(driver="FileGDB"):
pass
@pytest.mark.parametrize('mode', ['r', 'a'])
def test_modes_on_non_existing_memoryfile(mode):
"""Non existing memoryfile cannot opened in r or a mode"""
with MemoryFile() as memfile:
with pytest.raises(IOError):
with memfile.open(mode=mode):
pass
def test_write_mode_on_non_existing_memoryfile(profile_first_coutwildrnp_shp):
"""Exception is raised if a memoryfile is opened in write mode on a non empty memoryfile"""
profile, first = profile_first_coutwildrnp_shp
profile['driver'] = 'GeoJSON'
with MemoryFile() as memfile:
with memfile.open(**profile) as col:
col.write(first)
with pytest.raises(IOError):
with memfile.open(mode="w"):
pass
@requires_gpkg
def test_read_multilayer_memoryfile(path_coutwildrnp_json, tmpdir):
"""Test read access to multilayer dataset in from file-like object"""
with fiona.open(path_coutwildrnp_json, "r") as src:
schema = src.schema
features = list(src)
path = os.path.join(tmpdir, "test.gpkg")
with fiona.open(path, "w", driver="GPKG", schema=schema, layer="layer1") as dst:
dst.writerecords(features[0:5])
with fiona.open(path, "w", driver="GPKG", schema=schema, layer="layer2") as dst:
dst.writerecords(features[5:])
with open(path, "rb") as f:
with fiona.open(f, layer="layer1") as src:
assert src.name == "layer1"
assert len(src) == 5
# Bug reported in #781 where this next section would fail
with open(path, "rb") as f:
with fiona.open(f, layer="layer2") as src:
assert src.name == "layer2"
assert len(src) == 62
def test_allow_unsupported_drivers(monkeypatch):
"""Test if allow unsupported drivers works as expected"""
# We delete a known working driver from fiona.drvsupport so that we can use it
monkeypatch.delitem(fiona.drvsupport.supported_drivers, "GPKG")
schema = {"geometry": "Polygon", "properties": {}}
# Test that indeed we can't create a file without allow_unsupported_drivers
with pytest.raises(DriverError):
with MemoryFile() as memfile:
with memfile.open(mode="w", driver="GPKG", schema=schema):
pass
# Test that we can create file with allow_unsupported_drivers=True
try:
with MemoryFile() as memfile:
with memfile.open(
mode="w", driver="GPKG", schema=schema, allow_unsupported_drivers=True
):
pass
except Exception as e:
assert (
False
), f"Using allow_unsupported_drivers=True should not raise an exception: {e}"
def test_listdir_zipmemoryfile(bytes_coutwildrnp_zip):
"""Test list directories of a zipped memory file."""
with ZipMemoryFile(bytes_coutwildrnp_zip) as memfile:
assert sorted(memfile.listdir()) == [
"coutwildrnp.dbf",
"coutwildrnp.prj",
"coutwildrnp.shp",
"coutwildrnp.shx",
]
def test_listlayers_zipmemoryfile(bytes_coutwildrnp_zip):
"""Test layers of a zipped memory file."""
with ZipMemoryFile(bytes_coutwildrnp_zip) as memfile:
assert memfile.listlayers() == ["coutwildrnp"]
def test_listdir_gdbzipmemoryfile(bytes_testopenfilegdb_zip):
"""Test list directories of a zipped GDB memory file."""
with ZipMemoryFile(bytes_testopenfilegdb_zip, ext=".gdb.zip") as memfile:
assert memfile.listdir() == [
"testopenfilegdb.gdb",
]
def test_listdir_gdbzipmemoryfile_bis(bytes_testopenfilegdb_zip):
"""Test list directories of a zipped GDB memory file."""
with ZipMemoryFile(bytes_testopenfilegdb_zip, filename="temp.gdb.zip") as memfile:
assert memfile.listdir() == [
"testopenfilegdb.gdb",
]
|