File: test_cache_store.py

package info (click to toggle)
zarr 3.1.5-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,068 kB
  • sloc: python: 31,589; makefile: 10
file content (864 lines) | stat: -rw-r--r-- 34,273 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
"""
Tests for the dual-store cache implementation.
"""

import asyncio
import time

import pytest

from zarr.abc.store import Store
from zarr.core.buffer.core import default_buffer_prototype
from zarr.core.buffer.cpu import Buffer as CPUBuffer
from zarr.experimental.cache_store import CacheStore
from zarr.storage import MemoryStore


class TestCacheStore:
    """Test the dual-store cache implementation."""

    @pytest.fixture
    def source_store(self) -> MemoryStore:
        """Create a source store with some test data."""
        return MemoryStore()

    @pytest.fixture
    def cache_store(self) -> MemoryStore:
        """Create an empty cache store."""
        return MemoryStore()

    @pytest.fixture
    def cached_store(self, source_store: Store, cache_store: Store) -> CacheStore:
        """Create a cached store instance."""
        return CacheStore(source_store, cache_store=cache_store, key_insert_times={})

    async def test_basic_caching(self, cached_store: CacheStore, source_store: Store) -> None:
        """Test basic cache functionality."""
        # Store some data
        test_data = CPUBuffer.from_bytes(b"test data")
        await cached_store.set("test_key", test_data)

        # Verify it's in both stores
        assert await source_store.exists("test_key")
        assert await cached_store._cache.exists("test_key")

        # Retrieve and verify caching works
        result = await cached_store.get("test_key", default_buffer_prototype())
        assert result is not None
        assert result.to_bytes() == b"test data"

    async def test_cache_miss_and_population(
        self, cached_store: CacheStore, source_store: Store
    ) -> None:
        """Test cache miss and subsequent population."""
        # Put data directly in source store (bypassing cache)
        test_data = CPUBuffer.from_bytes(b"source data")
        await source_store.set("source_key", test_data)

        # First access should miss cache but populate it
        result = await cached_store.get("source_key", default_buffer_prototype())
        assert result is not None
        assert result.to_bytes() == b"source data"

        # Verify data is now in cache
        assert await cached_store._cache.exists("source_key")

    async def test_cache_expiration(self) -> None:
        """Test cache expiration based on max_age_seconds."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(
            source_store,
            cache_store=cache_store,
            max_age_seconds=1,  # 1 second expiration
            key_insert_times={},
        )

        # Store data
        test_data = CPUBuffer.from_bytes(b"expiring data")
        await cached_store.set("expire_key", test_data)

        # Should be fresh initially (if _is_key_fresh method exists)
        if hasattr(cached_store, "_is_key_fresh"):
            assert cached_store._is_key_fresh("expire_key")

            # Wait for expiration
            await asyncio.sleep(1.1)

            # Should now be stale
            assert not cached_store._is_key_fresh("expire_key")
        else:
            # Skip freshness check if method doesn't exist
            await asyncio.sleep(1.1)
            # Just verify the data is still accessible
            result = await cached_store.get("expire_key", default_buffer_prototype())
            assert result is not None

    async def test_cache_set_data_false(self, source_store: Store, cache_store: Store) -> None:
        """Test behavior when cache_set_data=False."""
        cached_store = CacheStore(
            source_store, cache_store=cache_store, cache_set_data=False, key_insert_times={}
        )

        test_data = CPUBuffer.from_bytes(b"no cache data")
        await cached_store.set("no_cache_key", test_data)

        # Data should be in source but not cache
        assert await source_store.exists("no_cache_key")
        assert not await cache_store.exists("no_cache_key")

    async def test_delete_removes_from_both_stores(self, cached_store: CacheStore) -> None:
        """Test that delete removes from both source and cache."""
        test_data = CPUBuffer.from_bytes(b"delete me")
        await cached_store.set("delete_key", test_data)

        # Verify in both stores
        assert await cached_store._store.exists("delete_key")
        assert await cached_store._cache.exists("delete_key")

        # Delete
        await cached_store.delete("delete_key")

        # Verify removed from both
        assert not await cached_store._store.exists("delete_key")
        assert not await cached_store._cache.exists("delete_key")

    async def test_exists_checks_source_store(
        self, cached_store: CacheStore, source_store: Store
    ) -> None:
        """Test that exists() checks the source store (source of truth)."""
        # Put data directly in source
        test_data = CPUBuffer.from_bytes(b"exists test")
        await source_store.set("exists_key", test_data)

        # Should exist even though not in cache
        assert await cached_store.exists("exists_key")

    async def test_list_operations(self, cached_store: CacheStore, source_store: Store) -> None:
        """Test listing operations delegate to source store."""
        # Add some test data
        test_data = CPUBuffer.from_bytes(b"list test")
        await cached_store.set("list/item1", test_data)
        await cached_store.set("list/item2", test_data)
        await cached_store.set("other/item3", test_data)

        # Test list_dir
        list_items = [key async for key in cached_store.list_dir("list/")]
        assert len(list_items) >= 2  # Should include our items

        # Test list_prefix
        prefix_items = [key async for key in cached_store.list_prefix("list/")]
        assert len(prefix_items) >= 2

    async def test_stale_cache_refresh(self) -> None:
        """Test that stale cache entries are refreshed from source."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(
            source_store, cache_store=cache_store, max_age_seconds=1, key_insert_times={}
        )

        # Store initial data
        old_data = CPUBuffer.from_bytes(b"old data")
        await cached_store.set("refresh_key", old_data)

        # Wait for expiration
        await asyncio.sleep(1.1)

        # Update source store directly (simulating external update)
        new_data = CPUBuffer.from_bytes(b"new data")
        await source_store.set("refresh_key", new_data)

        # Access should refresh from source when cache is stale
        result = await cached_store.get("refresh_key", default_buffer_prototype())
        assert result is not None
        assert result.to_bytes() == b"new data"

    async def test_infinity_max_age(self, cached_store: CacheStore) -> None:
        """Test that 'infinity' max_age means cache never expires."""
        # Skip test if _is_key_fresh method doesn't exist
        if not hasattr(cached_store, "_is_key_fresh"):
            pytest.skip("_is_key_fresh method not implemented")

        test_data = CPUBuffer.from_bytes(b"eternal data")
        await cached_store.set("eternal_key", test_data)

        # Should always be fresh
        assert cached_store._is_key_fresh("eternal_key")

        # Even after time passes
        await asyncio.sleep(0.1)
        assert cached_store._is_key_fresh("eternal_key")

    async def test_cache_returns_cached_data_for_performance(
        self, cached_store: CacheStore, source_store: Store
    ) -> None:
        """Test that cache returns cached data for performance, even if not in source."""
        # Skip test if key_insert_times attribute doesn't exist
        if not hasattr(cached_store, "key_insert_times"):
            pytest.skip("key_insert_times attribute not implemented")

        # Put data in cache but not source (simulates orphaned cache entry)
        test_data = CPUBuffer.from_bytes(b"orphaned data")
        await cached_store._cache.set("orphan_key", test_data)
        cached_store.key_insert_times["orphan_key"] = time.monotonic()

        # Cache should return data for performance (no source verification)
        result = await cached_store.get("orphan_key", default_buffer_prototype())
        assert result is not None
        assert result.to_bytes() == b"orphaned data"

        # Cache entry should remain (performance optimization)
        assert await cached_store._cache.exists("orphan_key")
        assert "orphan_key" in cached_store.key_insert_times

    async def test_cache_coherency_through_expiration(self) -> None:
        """Test that cache coherency is managed through cache expiration, not source verification."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(
            source_store,
            cache_store=cache_store,
            max_age_seconds=1,  # Short expiration for coherency
        )

        # Add data to both stores
        test_data = CPUBuffer.from_bytes(b"original data")
        await cached_store.set("coherency_key", test_data)

        # Remove from source (simulating external deletion)
        await source_store.delete("coherency_key")

        # Cache should still return cached data (performance optimization)
        result = await cached_store.get("coherency_key", default_buffer_prototype())
        assert result is not None
        assert result.to_bytes() == b"original data"

        # Wait for cache expiration
        await asyncio.sleep(1.1)

        # Now stale cache should be refreshed from source
        result = await cached_store.get("coherency_key", default_buffer_prototype())
        assert result is None  # Key no longer exists in source

    async def test_cache_info(self, cached_store: CacheStore) -> None:
        """Test cache_info method returns correct information."""
        # Test initial state
        info = cached_store.cache_info()

        # Check all expected keys are present
        expected_keys = {
            "cache_store_type",
            "max_age_seconds",
            "max_size",
            "current_size",
            "cache_set_data",
            "tracked_keys",
            "cached_keys",
        }
        assert set(info.keys()) == expected_keys

        # Check initial values
        assert info["cache_store_type"] == "MemoryStore"
        assert info["max_age_seconds"] == "infinity"
        assert info["max_size"] is None  # Default unlimited
        assert info["current_size"] == 0
        assert info["cache_set_data"] is True
        assert info["tracked_keys"] == 0
        assert info["cached_keys"] == 0

        # Add some data and verify tracking
        test_data = CPUBuffer.from_bytes(b"test data for cache info")
        await cached_store.set("info_test_key", test_data)

        # Check updated info
        updated_info = cached_store.cache_info()
        assert updated_info["tracked_keys"] == 1
        assert updated_info["cached_keys"] == 1
        assert updated_info["current_size"] > 0  # Should have some size now

    async def test_cache_info_with_max_size(self) -> None:
        """Test cache_info with max_size configuration."""
        source_store = MemoryStore()
        cache_store = MemoryStore()

        # Create cache with specific max_size and max_age
        cached_store = CacheStore(
            source_store,
            cache_store=cache_store,
            max_size=1024,
            max_age_seconds=300,
            key_insert_times={},
        )

        info = cached_store.cache_info()
        assert info["max_size"] == 1024
        assert info["max_age_seconds"] == 300
        assert info["current_size"] == 0

    async def test_clear_cache(self, cached_store: CacheStore) -> None:
        """Test clear_cache method clears all cache data and tracking."""
        # Add some test data
        test_data1 = CPUBuffer.from_bytes(b"test data 1")
        test_data2 = CPUBuffer.from_bytes(b"test data 2")

        await cached_store.set("clear_test_1", test_data1)
        await cached_store.set("clear_test_2", test_data2)

        # Verify data is cached
        info_before = cached_store.cache_info()
        assert info_before["tracked_keys"] == 2
        assert info_before["cached_keys"] == 2
        assert info_before["current_size"] > 0

        # Verify data exists in cache
        assert await cached_store._cache.exists("clear_test_1")
        assert await cached_store._cache.exists("clear_test_2")

        # Clear the cache
        await cached_store.clear_cache()

        # Verify cache is cleared
        info_after = cached_store.cache_info()
        assert info_after["tracked_keys"] == 0
        assert info_after["cached_keys"] == 0
        assert info_after["current_size"] == 0

        # Verify data is removed from cache store (if it supports clear)
        if hasattr(cached_store._cache, "clear"):
            # If cache store supports clear, all data should be gone
            assert not await cached_store._cache.exists("clear_test_1")
            assert not await cached_store._cache.exists("clear_test_2")

        # Verify data still exists in source store
        assert await cached_store._store.exists("clear_test_1")
        assert await cached_store._store.exists("clear_test_2")

    async def test_max_age_infinity(self) -> None:
        """Test cache with infinite max age."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(source_store, cache_store=cache_store, max_age_seconds="infinity")

        # Add data and verify it never expires
        test_data = CPUBuffer.from_bytes(b"test data")
        await cached_store.set("test_key", test_data)

        # Even after time passes, key should be fresh
        assert cached_store._is_key_fresh("test_key")

    async def test_max_age_numeric(self) -> None:
        """Test cache with numeric max age."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(
            source_store,
            cache_store=cache_store,
            max_age_seconds=1,  # 1 second
        )

        # Add data
        test_data = CPUBuffer.from_bytes(b"test data")
        await cached_store.set("test_key", test_data)

        # Key should be fresh initially
        assert cached_store._is_key_fresh("test_key")

        # Manually set old timestamp to test expiration
        cached_store.key_insert_times["test_key"] = time.monotonic() - 2  # 2 seconds ago

        # Key should now be stale
        assert not cached_store._is_key_fresh("test_key")

    async def test_cache_set_data_disabled(self) -> None:
        """Test cache behavior when cache_set_data is False."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(source_store, cache_store=cache_store, cache_set_data=False)

        # Set data
        test_data = CPUBuffer.from_bytes(b"test data")
        await cached_store.set("test_key", test_data)

        # Data should be in source but not in cache
        assert await source_store.exists("test_key")
        assert not await cache_store.exists("test_key")

        # Cache info should show no cached data
        info = cached_store.cache_info()
        assert info["cache_set_data"] is False
        assert info["cached_keys"] == 0

    async def test_eviction_with_max_size(self) -> None:
        """Test LRU eviction when max_size is exceeded."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(
            source_store,
            cache_store=cache_store,
            max_size=100,  # Small cache size
        )

        # Add data that exceeds cache size
        small_data = CPUBuffer.from_bytes(b"a" * 40)  # 40 bytes
        medium_data = CPUBuffer.from_bytes(b"b" * 40)  # 40 bytes
        large_data = CPUBuffer.from_bytes(b"c" * 40)  # 40 bytes (would exceed 100 byte limit)

        # Set first two items
        await cached_store.set("key1", small_data)
        await cached_store.set("key2", medium_data)

        # Cache should have 2 items
        info = cached_store.cache_info()
        assert info["cached_keys"] == 2
        assert info["current_size"] == 80

        # Add third item - should trigger eviction of first item
        await cached_store.set("key3", large_data)

        # Cache should still have items but first one may be evicted
        info = cached_store.cache_info()
        assert info["current_size"] <= 100

    async def test_value_exceeds_max_size(self) -> None:
        """Test behavior when a single value exceeds max_size."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(
            source_store,
            cache_store=cache_store,
            max_size=50,  # Small cache size
        )

        # Try to cache data larger than max_size
        large_data = CPUBuffer.from_bytes(b"x" * 100)  # 100 bytes > 50 byte limit
        await cached_store.set("large_key", large_data)

        # Data should be in source but not cached
        assert await source_store.exists("large_key")
        info = cached_store.cache_info()
        assert info["cached_keys"] == 0
        assert info["current_size"] == 0

    async def test_get_nonexistent_key(self) -> None:
        """Test getting a key that doesn't exist in either store."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(source_store, cache_store=cache_store)

        # Try to get nonexistent key
        result = await cached_store.get("nonexistent", default_buffer_prototype())
        assert result is None

        # Should not create any cache entries
        info = cached_store.cache_info()
        assert info["cached_keys"] == 0

    async def test_delete_both_stores(self) -> None:
        """Test that delete removes from both source and cache stores."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(source_store, cache_store=cache_store)

        # Add data
        test_data = CPUBuffer.from_bytes(b"test data")
        await cached_store.set("test_key", test_data)

        # Verify it's in both stores
        assert await source_store.exists("test_key")
        assert await cache_store.exists("test_key")

        # Delete
        await cached_store.delete("test_key")

        # Verify it's removed from both
        assert not await source_store.exists("test_key")
        assert not await cache_store.exists("test_key")

        # Verify tracking is updated
        info = cached_store.cache_info()
        assert info["cached_keys"] == 0

    async def test_invalid_max_age_seconds(self) -> None:
        """Test that invalid max_age_seconds values raise ValueError."""
        source_store = MemoryStore()
        cache_store = MemoryStore()

        with pytest.raises(ValueError, match="max_age_seconds string value must be 'infinity'"):
            CacheStore(source_store, cache_store=cache_store, max_age_seconds="invalid")

    async def test_unlimited_cache_size(self) -> None:
        """Test behavior when max_size is None (unlimited)."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(
            source_store,
            cache_store=cache_store,
            max_size=None,  # Unlimited cache
        )

        # Add large amounts of data
        for i in range(10):
            large_data = CPUBuffer.from_bytes(b"x" * 1000)  # 1KB each
            await cached_store.set(f"large_key_{i}", large_data)

        # All should be cached since there's no size limit
        info = cached_store.cache_info()
        assert info["cached_keys"] == 10
        assert info["current_size"] == 10000  # 10 * 1000 bytes

    async def test_evict_key_exception_handling(self) -> None:
        """Test exception handling in _evict_key method."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(source_store, cache_store=cache_store, max_size=100)

        # Add some data
        test_data = CPUBuffer.from_bytes(b"test data")
        await cached_store.set("test_key", test_data)

        # Manually corrupt the tracking to trigger exception
        # Remove from one structure but not others to create inconsistency
        del cached_store._cache_order["test_key"]

        # Try to evict - should handle the KeyError gracefully
        await cached_store._evict_key("test_key")

        # Should still work and not crash
        info = cached_store.cache_info()
        assert isinstance(info, dict)

    async def test_get_no_cache_delete_tracking(self) -> None:
        """Test _get_no_cache when key doesn't exist and needs cleanup."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(source_store, cache_store=cache_store)

        # First, add key to cache tracking but not to source
        test_data = CPUBuffer.from_bytes(b"test data")
        await cache_store.set("phantom_key", test_data)
        await cached_store._cache_value("phantom_key", test_data)

        # Verify it's in tracking
        assert "phantom_key" in cached_store._cache_order
        assert "phantom_key" in cached_store.key_insert_times

        # Now try to get it - since it's not in source, should clean up tracking
        result = await cached_store._get_no_cache("phantom_key", default_buffer_prototype())
        assert result is None

        # Should have cleaned up tracking
        assert "phantom_key" not in cached_store._cache_order
        assert "phantom_key" not in cached_store.key_insert_times

    async def test_accommodate_value_no_max_size(self) -> None:
        """Test _accommodate_value early return when max_size is None."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(
            source_store,
            cache_store=cache_store,
            max_size=None,  # No size limit
        )

        # This should return early without doing anything
        await cached_store._accommodate_value(1000000)  # Large value

        # Should not affect anything since max_size is None
        info = cached_store.cache_info()
        assert info["current_size"] == 0

    async def test_concurrent_set_operations(self) -> None:
        """Test that concurrent set operations don't corrupt cache size tracking."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(source_store, cache_store=cache_store, max_size=1000)

        # Create 10 concurrent set operations
        async def set_data(key: str) -> None:
            data = CPUBuffer.from_bytes(b"x" * 50)
            await cached_store.set(key, data)

        # Run concurrently
        await asyncio.gather(*[set_data(f"key_{i}") for i in range(10)])

        info = cached_store.cache_info()
        # Expected: 10 keys * 50 bytes = 500 bytes
        assert info["cached_keys"] == 10
        assert info["current_size"] == 500  # WOULD FAIL due to race condition

    async def test_concurrent_eviction_race(self) -> None:
        """Test concurrent evictions don't corrupt size tracking."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(source_store, cache_store=cache_store, max_size=200)

        # Fill cache to near capacity
        data = CPUBuffer.from_bytes(b"x" * 80)
        await cached_store.set("key1", data)
        await cached_store.set("key2", data)

        # Now trigger two concurrent sets that both need to evict
        async def set_large(key: str) -> None:
            large_data = CPUBuffer.from_bytes(b"y" * 100)
            await cached_store.set(key, large_data)

        await asyncio.gather(set_large("key3"), set_large("key4"))

        info = cached_store.cache_info()
        # Size should be consistent with tracked keys
        assert info["current_size"] <= 200  # Might pass
        # But verify actual cache store size matches tracking
        total_size = sum(cached_store._key_sizes.get(k, 0) for k in cached_store._cache_order)
        assert total_size == info["current_size"]  # WOULD FAIL

    async def test_concurrent_get_and_evict(self) -> None:
        """Test get operations during eviction don't cause corruption."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(source_store, cache_store=cache_store, max_size=100)

        # Setup
        data = CPUBuffer.from_bytes(b"x" * 40)
        await cached_store.set("key1", data)
        await cached_store.set("key2", data)

        # Concurrent: read key1 while adding key3 (triggers eviction)
        async def read_key() -> None:
            for _ in range(100):
                await cached_store.get("key1", default_buffer_prototype())

        async def write_key() -> None:
            for i in range(10):
                new_data = CPUBuffer.from_bytes(b"y" * 40)
                await cached_store.set(f"new_{i}", new_data)

        await asyncio.gather(read_key(), write_key())

        # Verify consistency
        info = cached_store.cache_info()
        assert info["current_size"] <= 100
        assert len(cached_store._cache_order) == len(cached_store._key_sizes)

    async def test_eviction_actually_deletes_from_cache_store(self) -> None:
        """Test that eviction removes keys from cache_store, not just tracking."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(source_store, cache_store=cache_store, max_size=100)

        # Add data that will be evicted
        data1 = CPUBuffer.from_bytes(b"x" * 60)
        data2 = CPUBuffer.from_bytes(b"y" * 60)

        await cached_store.set("key1", data1)

        # Verify key1 is in cache_store
        assert await cache_store.exists("key1")

        # Add key2, which should evict key1
        await cached_store.set("key2", data2)

        # Check tracking - key1 should be removed
        assert "key1" not in cached_store._cache_order
        assert "key1" not in cached_store._key_sizes

        # CRITICAL: key1 should also be removed from cache_store
        assert not await cache_store.exists("key1"), (
            "Evicted key still exists in cache_store! _evict_key doesn't actually delete."
        )

        # But key1 should still exist in source store
        assert await source_store.exists("key1")

    async def test_eviction_no_orphaned_keys(self) -> None:
        """Test that eviction doesn't leave orphaned keys in cache_store."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(source_store, cache_store=cache_store, max_size=150)

        # Add multiple keys that will cause evictions
        for i in range(10):
            data = CPUBuffer.from_bytes(b"x" * 60)
            await cached_store.set(f"key_{i}", data)

        # Check tracking
        info = cached_store.cache_info()
        tracked_keys = info["cached_keys"]

        # Count actual keys in cache_store
        actual_keys = 0
        async for _ in cache_store.list():
            actual_keys += 1

        # Cache store should have same number of keys as tracking
        assert actual_keys == tracked_keys, (
            f"Cache store has {actual_keys} keys but tracking shows {tracked_keys}. "
            f"Eviction doesn't delete from cache_store!"
        )

    async def test_size_accounting_with_key_updates(self) -> None:
        """Test that updating the same key replaces size instead of accumulating."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(source_store, cache_store=cache_store, max_size=500)

        # Set initial value
        data1 = CPUBuffer.from_bytes(b"x" * 100)
        await cached_store.set("same_key", data1)

        info1 = cached_store.cache_info()
        assert info1["current_size"] == 100

        # Update with different size
        data2 = CPUBuffer.from_bytes(b"y" * 200)
        await cached_store.set("same_key", data2)

        info2 = cached_store.cache_info()

        # Should be 200, not 300 (update replaces, doesn't accumulate)
        assert info2["current_size"] == 200, (
            f"Expected size 200 but got {info2['current_size']}. "
            "Updating same key should replace, not accumulate."
        )

    async def test_all_tracked_keys_exist_in_cache_store(self) -> None:
        """Test invariant: all keys in tracking should exist in cache_store."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(source_store, cache_store=cache_store, max_size=500)

        # Add some data
        for i in range(5):
            data = CPUBuffer.from_bytes(b"x" * 50)
            await cached_store.set(f"key_{i}", data)

        # Every key in tracking should exist in cache_store
        for key in cached_store._cache_order:
            assert await cache_store.exists(key), (
                f"Key '{key}' is tracked but doesn't exist in cache_store"
            )

        # Every key in _key_sizes should exist in cache_store
        for key in cached_store._key_sizes:
            assert await cache_store.exists(key), (
                f"Key '{key}' has size tracked but doesn't exist in cache_store"
            )

    # Additional coverage tests for 100% coverage

    async def test_cache_store_requires_delete_support(self) -> None:
        """Test that CacheStore validates cache_store supports deletes."""
        from unittest.mock import MagicMock

        # Create a mock store that doesn't support deletes
        source_store = MemoryStore()
        cache_store = MagicMock()
        cache_store.supports_deletes = False

        with pytest.raises(ValueError, match="does not support deletes"):
            CacheStore(store=source_store, cache_store=cache_store)

    async def test_evict_key_exception_handling_with_real_error(
        self, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Test _evict_key exception handling when deletion fails."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(store=source_store, cache_store=cache_store, max_size=100)

        # Set up a key in tracking
        buffer = CPUBuffer.from_bytes(b"test data")
        await cached_store.set("test_key", buffer)

        # Mock the cache delete to raise an exception
        async def failing_delete(key: str) -> None:
            raise RuntimeError("Simulated cache deletion failure")

        monkeypatch.setattr(cache_store, "delete", failing_delete)

        # Attempt to evict should raise the exception
        with pytest.raises(RuntimeError, match="Simulated cache deletion failure"):
            async with cached_store._lock:
                await cached_store._evict_key("test_key")

    async def test_cache_stats_method(self) -> None:
        """Test cache_stats method returns correct statistics."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(store=source_store, cache_store=cache_store, max_size=1000)

        # Initially, stats should be zero
        stats = cached_store.cache_stats()
        assert stats["hits"] == 0
        assert stats["misses"] == 0
        assert stats["evictions"] == 0
        assert stats["total_requests"] == 0
        assert stats["hit_rate"] == 0.0

        # Perform some operations
        buffer = CPUBuffer.from_bytes(b"x" * 100)

        # Write to source store directly to avoid affecting stats
        await source_store.set("key1", buffer)

        # First get is a miss (not in cache yet)
        result1 = await cached_store.get("key1", default_buffer_prototype())
        assert result1 is not None

        # Second get is a hit (now in cache)
        result2 = await cached_store.get("key1", default_buffer_prototype())
        assert result2 is not None

        stats = cached_store.cache_stats()
        assert stats["hits"] == 1
        assert stats["misses"] == 1
        assert stats["total_requests"] == 2
        assert stats["hit_rate"] == 0.5

    async def test_cache_stats_with_evictions(self) -> None:
        """Test cache_stats tracks evictions correctly."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(
            store=source_store,
            cache_store=cache_store,
            max_size=150,  # Small size to force eviction
        )

        # Add items that will trigger eviction
        buffer1 = CPUBuffer.from_bytes(b"x" * 100)
        buffer2 = CPUBuffer.from_bytes(b"y" * 100)

        await cached_store.set("key1", buffer1)
        await cached_store.set("key2", buffer2)  # Should evict key1

        stats = cached_store.cache_stats()
        assert stats["evictions"] == 1

    def test_repr_method(self) -> None:
        """Test __repr__ returns useful string representation."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(
            store=source_store, cache_store=cache_store, max_age_seconds=60, max_size=1024
        )

        repr_str = repr(cached_store)

        # Check that repr contains key information
        assert "CacheStore" in repr_str
        assert "max_age_seconds=60" in repr_str
        assert "max_size=1024" in repr_str
        assert "current_size=0" in repr_str
        assert "cached_keys=0" in repr_str

    async def test_cache_stats_zero_division_protection(self) -> None:
        """Test cache_stats handles zero requests correctly."""
        source_store = MemoryStore()
        cache_store = MemoryStore()
        cached_store = CacheStore(store=source_store, cache_store=cache_store)

        # With no requests, hit_rate should be 0.0 (not NaN or error)
        stats = cached_store.cache_stats()
        assert stats["hit_rate"] == 0.0
        assert stats["total_requests"] == 0