File: test_logging.py

package info (click to toggle)
zarr 3.1.6-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,264 kB
  • sloc: python: 33,347; makefile: 11
file content (170 lines) | stat: -rw-r--r-- 6,908 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, TypedDict

import pytest

import zarr
from zarr.core.buffer import Buffer, cpu, default_buffer_prototype
from zarr.storage import LocalStore, LoggingStore
from zarr.testing.store import StoreTests

if TYPE_CHECKING:
    from pathlib import Path

    from zarr.abc.store import Store


class StoreKwargs(TypedDict):
    store: LocalStore
    log_level: str


class TestLoggingStore(StoreTests[LoggingStore[LocalStore], cpu.Buffer]):
    # store_cls is needed to do an isinstance check, so can't be a subscripted generic
    store_cls = LoggingStore  # type: ignore[assignment]
    buffer_cls = cpu.Buffer

    async def get(self, store: LoggingStore[LocalStore], key: str) -> Buffer:
        return self.buffer_cls.from_bytes((store._store.root / key).read_bytes())

    async def set(self, store: LoggingStore[LocalStore], key: str, value: Buffer) -> None:
        parent = (store._store.root / key).parent
        if not parent.exists():
            parent.mkdir(parents=True)
        (store._store.root / key).write_bytes(value.to_bytes())

    @pytest.fixture
    def store_kwargs(self, tmp_path: Path) -> StoreKwargs:
        return {"store": LocalStore(str(tmp_path)), "log_level": "DEBUG"}

    @pytest.fixture
    def open_kwargs(self, tmp_path: Path) -> dict[str, type[LocalStore] | str]:
        return {"store_cls": LocalStore, "root": str(tmp_path), "log_level": "DEBUG"}

    @pytest.fixture
    def store(self, store_kwargs: StoreKwargs) -> LoggingStore[LocalStore]:
        return self.store_cls(**store_kwargs)

    def test_store_supports_writes(self, store: LoggingStore[LocalStore]) -> None:
        assert store.supports_writes

    def test_store_supports_listing(self, store: LoggingStore[LocalStore]) -> None:
        assert store.supports_listing

    def test_store_repr(self, store: LoggingStore[LocalStore]) -> None:
        assert f"{store!r}" == f"LoggingStore(LocalStore, 'file://{store._store.root.as_posix()}')"

    def test_store_str(self, store: LoggingStore[LocalStore]) -> None:
        assert str(store) == f"logging-file://{store._store.root.as_posix()}"

    async def test_default_handler(
        self, local_store: LocalStore, capsys: pytest.CaptureFixture[str]
    ) -> None:
        # Store and then remove existing handlers to enter default handler code path
        handlers = logging.getLogger().handlers[:]
        for h in handlers:
            logging.getLogger().removeHandler(h)
        # Test logs are sent to stdout
        wrapped = LoggingStore(store=local_store)
        buffer = default_buffer_prototype().buffer
        res = await wrapped.set("foo/bar/c/0", buffer.from_bytes(b"\x01\x02\x03\x04"))  # type: ignore[func-returns-value]
        assert res is None
        captured = capsys.readouterr()
        assert len(captured) == 2
        assert "Calling LocalStore.set" in captured.out
        assert "Finished LocalStore.set" in captured.out
        # Restore handlers
        for h in handlers:
            logging.getLogger().addHandler(h)

    def test_is_open_setter_raises(self, store: LoggingStore[LocalStore]) -> None:
        "Test that a user cannot change `_is_open` without opening the underlying store."
        with pytest.raises(
            NotImplementedError, match="LoggingStore must be opened via the `_open` method"
        ):
            store._is_open = True

    async def test_with_read_only_round_trip(self, local_store: LocalStore) -> None:
        """
        Ensure that LoggingStore.with_read_only returns another LoggingStore with
        the requested read_only state, preserves logging configuration, and does
        not change the original store.
        """
        # Start from a read-only underlying store
        ro_store = local_store.with_read_only(read_only=True)
        wrapped_ro = LoggingStore(store=ro_store, log_level="INFO")
        assert wrapped_ro.read_only

        buf = default_buffer_prototype().buffer.from_bytes(b"0123")

        # Cannot write through the read-only wrapper
        with pytest.raises(
            ValueError, match="store was opened in read-only mode and does not support writing"
        ):
            await wrapped_ro.set("foo", buf)

        # Create a writable wrapper
        writer = wrapped_ro.with_read_only(read_only=False)
        assert isinstance(writer, LoggingStore)
        assert not writer.read_only
        # logging configuration is preserved
        assert writer.log_level == wrapped_ro.log_level
        assert writer.log_handler == wrapped_ro.log_handler

        # Writes via the writable wrapper succeed
        await writer.set("foo", buf)
        out = await writer.get("foo", prototype=default_buffer_prototype())
        assert out is not None
        assert out.to_bytes() == buf.to_bytes()

        # The original wrapper remains read-only
        assert wrapped_ro.read_only
        with pytest.raises(
            ValueError, match="store was opened in read-only mode and does not support writing"
        ):
            await wrapped_ro.set("bar", buf)


@pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=["store"])
async def test_logging_store(store: Store, caplog: pytest.LogCaptureFixture) -> None:
    wrapped = LoggingStore(store=store, log_level="DEBUG")
    buffer = default_buffer_prototype().buffer

    caplog.clear()
    res = await wrapped.set("foo/bar/c/0", buffer.from_bytes(b"\x01\x02\x03\x04"))  # type: ignore[func-returns-value]
    assert res is None
    assert len(caplog.record_tuples) == 2
    for tup in caplog.record_tuples:
        assert str(store) in tup[0]
    assert f"Calling {type(store).__name__}.set" in caplog.record_tuples[0][2]
    assert f"Finished {type(store).__name__}.set" in caplog.record_tuples[1][2]

    caplog.clear()
    keys = [k async for k in wrapped.list()]
    assert keys == ["foo/bar/c/0"]
    assert len(caplog.record_tuples) == 2
    for tup in caplog.record_tuples:
        assert str(store) in tup[0]
    assert f"Calling {type(store).__name__}.list" in caplog.record_tuples[0][2]
    assert f"Finished {type(store).__name__}.list" in caplog.record_tuples[1][2]


@pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=["store"])
async def test_logging_store_counter(store: Store) -> None:
    wrapped = LoggingStore(store=store, log_level="DEBUG")

    arr = zarr.create(shape=(10,), store=wrapped, overwrite=True)
    arr[:] = 1

    assert wrapped.counter["set"] == 2
    assert wrapped.counter["list"] == 0
    assert wrapped.counter["list_dir"] == 0
    assert wrapped.counter["list_prefix"] == 0
    if store.supports_deletes:
        assert wrapped.counter["get"] == 0  # 1 if overwrite=False
        assert wrapped.counter["delete_dir"] == 1
    else:
        assert wrapped.counter["get"] == 1
        assert wrapped.counter["delete_dir"] == 0