1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
|
from __future__ import annotations
from typing import TYPE_CHECKING, Any, TypedDict
import pytest
from zarr.abc.store import ByteRequest, Store
from zarr.core.buffer import Buffer
from zarr.core.buffer.cpu import Buffer as CPUBuffer
from zarr.core.buffer.cpu import buffer_prototype
from zarr.storage import LocalStore, WrapperStore
from zarr.testing.store import StoreTests
if TYPE_CHECKING:
from pathlib import Path
from zarr.core.buffer.core import BufferPrototype
class StoreKwargs(TypedDict):
store: LocalStore
class OpenKwargs(TypedDict):
store_cls: type[LocalStore]
root: str
# TODO: fix this warning
@pytest.mark.filterwarnings(
"ignore:coroutine 'ClientCreatorContext.__aexit__' was never awaited:RuntimeWarning"
)
class TestWrapperStore(StoreTests[WrapperStore[Any], Buffer]):
store_cls = WrapperStore
buffer_cls = CPUBuffer
async def get(self, store: WrapperStore[LocalStore], key: str) -> Buffer:
return self.buffer_cls.from_bytes((store._store.root / key).read_bytes())
async def set(self, store: WrapperStore[LocalStore], key: str, value: Buffer) -> None:
parent = (store._store.root / key).parent
if not parent.exists():
parent.mkdir(parents=True)
(store._store.root / key).write_bytes(value.to_bytes())
@pytest.fixture
def store_kwargs(self, tmp_path: Path) -> StoreKwargs:
return {"store": LocalStore(str(tmp_path))}
@pytest.fixture
def open_kwargs(self, tmp_path: Path) -> OpenKwargs:
return {"store_cls": LocalStore, "root": str(tmp_path)}
def test_store_supports_writes(self, store: WrapperStore[LocalStore]) -> None:
assert store.supports_writes
def test_store_supports_listing(self, store: WrapperStore[LocalStore]) -> None:
assert store.supports_listing
def test_store_repr(self, store: WrapperStore[LocalStore]) -> None:
assert f"{store!r}" == f"WrapperStore(LocalStore, 'file://{store._store.root.as_posix()}')"
def test_store_str(self, store: WrapperStore[LocalStore]) -> None:
assert str(store) == f"wrapping-file://{store._store.root.as_posix()}"
def test_check_writeable(self, store: WrapperStore[LocalStore]) -> None:
"""
Test _check_writeable() runs without errors.
"""
store._check_writable()
def test_close(self, store: WrapperStore[LocalStore]) -> None:
"Test store can be closed"
store.close()
assert not store._is_open
def test_is_open_setter_raises(self, store: WrapperStore[LocalStore]) -> None:
"""
Test that a user cannot change `_is_open` without opening the underlying store.
"""
with pytest.raises(
NotImplementedError, match="WrapperStore must be opened via the `_open` method"
):
store._is_open = True
# TODO: work out where warning is coming from and fix
@pytest.mark.filterwarnings(
"ignore:coroutine 'ClientCreatorContext.__aexit__' was never awaited:RuntimeWarning"
)
@pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=True)
async def test_wrapped_set(store: Store, capsys: pytest.CaptureFixture[str]) -> None:
# define a class that prints when it sets
class NoisySetter(WrapperStore[Store]):
async def set(self, key: str, value: Buffer) -> None:
print(f"setting {key}")
await super().set(key, value)
key = "foo"
value = CPUBuffer.from_bytes(b"bar")
store_wrapped = NoisySetter(store)
await store_wrapped.set(key, value)
captured = capsys.readouterr()
assert f"setting {key}" in captured.out
assert await store_wrapped.get(key, buffer_prototype) == value
@pytest.mark.filterwarnings("ignore:Unclosed client session:ResourceWarning")
@pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=True)
async def test_wrapped_get(store: Store, capsys: pytest.CaptureFixture[str]) -> None:
# define a class that prints when it sets
class NoisyGetter(WrapperStore[Any]):
async def get(
self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None
) -> None:
print(f"getting {key}")
await super().get(key, prototype=prototype, byte_range=byte_range)
key = "foo"
value = CPUBuffer.from_bytes(b"bar")
store_wrapped = NoisyGetter(store)
await store_wrapped.set(key, value)
await store_wrapped.get(key, buffer_prototype)
captured = capsys.readouterr()
assert f"getting {key}" in captured.out
|