1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, TypedDict
import pytest
import zarr
from zarr.core.buffer import Buffer, cpu, default_buffer_prototype
from zarr.storage import LocalStore, LoggingStore
from zarr.testing.store import StoreTests
if TYPE_CHECKING:
from pathlib import Path
from zarr.abc.store import Store
class StoreKwargs(TypedDict):
    """Keyword arguments used to construct the ``LoggingStore`` under test."""

    # Store instance that the logging wrapper is built around.
    store: LocalStore
    # Name of the logging level handed to the wrapper, e.g. "DEBUG".
    log_level: str
class TestLoggingStore(StoreTests[LoggingStore[LocalStore], cpu.Buffer]):
    """Exercise a ``LoggingStore`` wrapping a ``LocalStore``.

    Inherits from ``StoreTests`` and adds checks specific to the logging
    wrapper (repr/str, default handler, ``_is_open`` setter).
    """

    # store_cls is needed to do an isinstance check, so can't be a subscripted generic
    store_cls = LoggingStore  # type: ignore[assignment]
    buffer_cls = cpu.Buffer

    async def get(self, store: LoggingStore[LocalStore], key: str) -> Buffer:
        """Read ``key`` directly from the wrapped store's root directory."""
        return self.buffer_cls.from_bytes((store._store.root / key).read_bytes())

    async def set(self, store: LoggingStore[LocalStore], key: str, value: Buffer) -> None:
        """Write ``value`` to ``key`` directly under the wrapped store's root directory."""
        path = store._store.root / key
        # exist_ok avoids the race between a separate exists() check and mkdir()
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(value.to_bytes())

    @pytest.fixture
    def store_kwargs(self, tmp_path: Path) -> StoreKwargs:
        """Return constructor kwargs: a ``LocalStore`` rooted at ``tmp_path`` and DEBUG logging."""
        return {"store": LocalStore(str(tmp_path)), "log_level": "DEBUG"}

    @pytest.fixture
    def open_kwargs(self, tmp_path: Path) -> dict[str, type[LocalStore] | str]:
        """Return kwargs naming the wrapped store class, its root path and the log level."""
        return {"store_cls": LocalStore, "root": str(tmp_path), "log_level": "DEBUG"}

    @pytest.fixture
    def store(self, store_kwargs: StoreKwargs) -> LoggingStore[LocalStore]:
        """Build the store under test from ``store_kwargs``."""
        return self.store_cls(**store_kwargs)

    def test_store_supports_writes(self, store: LoggingStore[LocalStore]) -> None:
        """The wrapper reports write support."""
        assert store.supports_writes

    def test_store_supports_listing(self, store: LoggingStore[LocalStore]) -> None:
        """The wrapper reports listing support."""
        assert store.supports_listing

    def test_store_repr(self, store: LoggingStore[LocalStore]) -> None:
        """``repr`` names the wrapped store class and its ``file://`` location."""
        assert f"{store!r}" == f"LoggingStore(LocalStore, 'file://{store._store.root.as_posix()}')"

    def test_store_str(self, store: LoggingStore[LocalStore]) -> None:
        """``str`` is the wrapped store's location prefixed with ``logging-``."""
        assert str(store) == f"logging-file://{store._store.root.as_posix()}"

    async def test_default_handler(
        self, local_store: LocalStore, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """With no handlers on the root logger, ``set`` log messages appear on stdout."""
        root_logger = logging.getLogger()
        # Store and then remove existing handlers to enter default handler code path
        handlers = root_logger.handlers[:]
        for h in handlers:
            root_logger.removeHandler(h)
        try:
            # Test logs are sent to stdout
            wrapped = LoggingStore(store=local_store)
            buffer = default_buffer_prototype().buffer
            res = await wrapped.set("foo/bar/c/0", buffer.from_bytes(b"\x01\x02\x03\x04"))  # type: ignore[func-returns-value]
            assert res is None
            captured = capsys.readouterr()
            # NOTE(review): `captured` is an (out, err) pair, so this length check
            # appears to always hold -- confirm what it was meant to verify.
            assert len(captured) == 2
            assert "Calling LocalStore.set" in captured.out
            assert "Finished LocalStore.set" in captured.out
        finally:
            # Restore handlers even if the body above fails, so the root logger
            # is not left stripped for the tests that run afterwards.
            for h in handlers:
                root_logger.addHandler(h)

    def test_is_open_setter_raises(self, store: LoggingStore[LocalStore]) -> None:
        """Test that a user cannot change `_is_open` without opening the underlying store."""
        with pytest.raises(
            NotImplementedError, match="LoggingStore must be opened via the `_open` method"
        ):
            store._is_open = True
@pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=["store"])
async def test_logging_store(store: Store, caplog: pytest.LogCaptureFixture) -> None:
    """Check that ``set`` and ``list`` on a wrapped store each log a start and a finish record."""
    wrapped = LoggingStore(store=store, log_level="DEBUG")
    buffer = default_buffer_prototype().buffer

    # --- set: expect exactly one "Calling" and one "Finished" record ---
    caplog.clear()
    payload = buffer.from_bytes(b"\x01\x02\x03\x04")
    res = await wrapped.set("foo/bar/c/0", payload)  # type: ignore[func-returns-value]
    assert res is None
    set_records = caplog.record_tuples
    assert len(set_records) == 2
    for logger_name, _level, _message in set_records:
        # the logger name is expected to embed the wrapped store's string form
        assert str(store) in logger_name
    set_started, set_finished = set_records
    assert f"Calling {type(store).__name__}.set" in set_started[2]
    assert f"Finished {type(store).__name__}.set" in set_finished[2]

    # --- list: same pair of records, and the key written above is reported ---
    caplog.clear()
    keys = [k async for k in wrapped.list()]
    assert keys == ["foo/bar/c/0"]
    list_records = caplog.record_tuples
    assert len(list_records) == 2
    for logger_name, _level, _message in list_records:
        assert str(store) in logger_name
    list_started, list_finished = list_records
    assert f"Calling {type(store).__name__}.list" in list_started[2]
    assert f"Finished {type(store).__name__}.list" in list_finished[2]
@pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=["store"])
async def test_logging_store_counter(store: Store) -> None:
    """Check the per-method call counts recorded after creating and filling a small array."""
    wrapped = LoggingStore(store=store, log_level="DEBUG")

    arr = zarr.create(shape=(10,), store=wrapped, overwrite=True)
    arr[:] = 1

    assert wrapped.counter["set"] == 2
    # none of the listing methods should have been called
    for listing_method in ("list", "list_dir", "list_prefix"):
        assert wrapped.counter[listing_method] == 0

    # With delete support the overwrite goes through delete_dir and needs no get
    # (get would be 1 if overwrite=False); without it, one get happens instead.
    expected_get, expected_delete_dir = (0, 1) if store.supports_deletes else (1, 0)
    assert wrapped.counter["get"] == expected_get
    assert wrapped.counter["delete_dir"] == expected_delete_dir
|