File: test_wrapper.py

package info (click to toggle)
zarr 3.1.5-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,068 kB
  • sloc: python: 31,589; makefile: 10
file content (125 lines) | stat: -rw-r--r-- 4,566 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from __future__ import annotations

from typing import TYPE_CHECKING, Any, TypedDict

import pytest

from zarr.abc.store import ByteRequest, Store
from zarr.core.buffer import Buffer
from zarr.core.buffer.cpu import Buffer as CPUBuffer
from zarr.core.buffer.cpu import buffer_prototype
from zarr.storage import LocalStore, WrapperStore
from zarr.testing.store import StoreTests

if TYPE_CHECKING:
    from pathlib import Path

    from zarr.core.buffer.core import BufferPrototype


class StoreKwargs(TypedDict):
    """Keyword arguments produced by the ``store_kwargs`` fixture."""

    # Store instance the fixture places under the "store" key.
    store: LocalStore


class OpenKwargs(TypedDict):
    """Keyword arguments produced by the ``open_kwargs`` fixture."""

    # Store class the fixture places under the "store_cls" key.
    store_cls: type[LocalStore]
    # Root directory, given as a string path.
    root: str


# TODO: fix this warning
@pytest.mark.filterwarnings(
    "ignore:coroutine 'ClientCreatorContext.__aexit__' was never awaited:RuntimeWarning"
)
class TestWrapperStore(StoreTests[WrapperStore[Any], Buffer]):
    """
    Run the ``StoreTests`` suite against ``WrapperStore``.

    The fixtures below supply a ``LocalStore`` rooted at pytest's ``tmp_path``.
    The ``get`` and ``set`` helpers read and write files under the wrapped
    store's ``root`` directly instead of going through the wrapper's own API.
    """

    store_cls = WrapperStore
    buffer_cls = CPUBuffer

    async def get(self, store: WrapperStore[LocalStore], key: str) -> Buffer:
        """
        Read the bytes stored at ``key`` straight from the wrapped store's root directory.
        """
        path = store._store.root / key
        return self.buffer_cls.from_bytes(path.read_bytes())

    async def set(self, store: WrapperStore[LocalStore], key: str, value: Buffer) -> None:
        """
        Write ``value`` to ``key`` under the wrapped store's root directory.

        Missing parent directories are created first.
        """
        path = store._store.root / key
        # exist_ok=True replaces a separate exists() check, so there is no window
        # between checking for the directory and creating it.
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(value.to_bytes())

    @pytest.fixture
    def store_kwargs(self, tmp_path: Path) -> StoreKwargs:
        """
        Keyword arguments holding a ``LocalStore`` rooted at ``tmp_path``.
        """
        return {"store": LocalStore(str(tmp_path))}

    @pytest.fixture
    def open_kwargs(self, tmp_path: Path) -> OpenKwargs:
        """
        Keyword arguments naming ``LocalStore`` as the store class and ``tmp_path`` as its root.
        """
        return {"store_cls": LocalStore, "root": str(tmp_path)}

    def test_store_supports_writes(self, store: WrapperStore[LocalStore]) -> None:
        """
        Test that the store reports write support.
        """
        assert store.supports_writes

    def test_store_supports_listing(self, store: WrapperStore[LocalStore]) -> None:
        """
        Test that the store reports listing support.
        """
        assert store.supports_listing

    def test_store_repr(self, store: WrapperStore[LocalStore]) -> None:
        """
        Test that the repr names the wrapper, the wrapped class and the file URL of the root.
        """
        assert f"{store!r}" == f"WrapperStore(LocalStore, 'file://{store._store.root.as_posix()}')"

    def test_store_str(self, store: WrapperStore[LocalStore]) -> None:
        """
        Test that ``str(store)`` is the file URL of the root prefixed with ``wrapping-``.
        """
        assert str(store) == f"wrapping-file://{store._store.root.as_posix()}"

    def test_check_writeable(self, store: WrapperStore[LocalStore]) -> None:
        """
        Test _check_writable() runs without errors.
        """
        store._check_writable()

    def test_close(self, store: WrapperStore[LocalStore]) -> None:
        """
        Test store can be closed.
        """
        store.close()
        assert not store._is_open

    def test_is_open_setter_raises(self, store: WrapperStore[LocalStore]) -> None:
        """
        Test that a user cannot change `_is_open` without opening the underlying store.
        """
        with pytest.raises(
            NotImplementedError, match="WrapperStore must be opened via the `_open` method"
        ):
            store._is_open = True


# TODO: work out where warning is coming from and fix
@pytest.mark.filterwarnings(
    "ignore:coroutine 'ClientCreatorContext.__aexit__' was never awaited:RuntimeWarning"
)
@pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=True)
async def test_wrapped_set(store: Store, capsys: pytest.CaptureFixture[str]) -> None:
    """
    Test that a ``WrapperStore`` subclass can hook ``set`` and the value is still stored.
    """

    class NoisySetter(WrapperStore[Store]):
        """Wrapper that prints the key before delegating each ``set`` call."""

        async def set(self, key: str, value: Buffer) -> None:
            print(f"setting {key}")
            await super().set(key, value)

    name = "foo"
    payload = CPUBuffer.from_bytes(b"bar")
    noisy = NoisySetter(store)

    await noisy.set(name, payload)

    # The overridden method must have run (its message reached stdout) ...
    assert f"setting {name}" in capsys.readouterr().out
    # ... and the write must still have reached the wrapped store.
    roundtripped = await noisy.get(name, buffer_prototype)
    assert roundtripped == payload


@pytest.mark.filterwarnings("ignore:Unclosed client session:ResourceWarning")
@pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=True)
async def test_wrapped_get(store: Store, capsys: pytest.CaptureFixture[str]) -> None:
    """
    Test that a ``WrapperStore`` subclass can hook ``get`` and still return the stored value.
    """

    # define a class that prints when it gets
    class NoisyGetter(WrapperStore[Any]):
        async def get(
            self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None
        ) -> Buffer | None:
            print(f"getting {key}")
            # Hand the wrapped store's result back to the caller; dropping it
            # would make every read through this wrapper yield None.
            return await super().get(key, prototype=prototype, byte_range=byte_range)

    key = "foo"
    value = CPUBuffer.from_bytes(b"bar")
    store_wrapped = NoisyGetter(store)
    await store_wrapped.set(key, value)
    fetched = await store_wrapped.get(key, buffer_prototype)
    captured = capsys.readouterr()
    assert f"getting {key}" in captured.out
    # The override must be transparent: the caller gets back what was written.
    assert fetched == value