File: test_redis_sentinel_backend.py

package info (click to toggle)
python-dogpile.cache 1.3.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 772 kB
  • sloc: python: 5,462; makefile: 155; sh: 104
file content (109 lines) | stat: -rw-r--r-- 3,528 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from concurrent.futures import ThreadPoolExecutor
import os
from threading import Event
import time

from dogpile.testing import eq_
from dogpile.testing.fixtures import _GenericBackendFixture
from dogpile.testing.fixtures import _GenericBackendTestSuite
from dogpile.testing.fixtures import _GenericMutexTestSuite
from dogpile.testing.fixtures import _GenericSerializerTestSuite
from .test_redis_backend import _TestRedisConn as _TestRedisSentinelConn

REDIS_HOST = "127.0.0.1"
REDIS_PORT = int(os.getenv("DOGPILE_REDIS_SENTINEL_PORT", "26379"))
expect_redis_running = os.getenv("DOGPILE_REDIS_SENTINEL_PORT") is not None


class RedisSentinelTest(_TestRedisSentinelConn, _GenericBackendTestSuite):
    backend = "dogpile.cache.redis_sentinel"
    config_args = {
        "arguments": {
            "sentinels": [[REDIS_HOST, REDIS_PORT]],
            "service_name": "pifpaf",
            "db": 0,
            "distributed_lock": False,
        }
    }


class RedisSerializerTest(_GenericSerializerTestSuite, RedisSentinelTest):
    pass


class RedisSentinelDistributedMutexTest(
    _TestRedisSentinelConn, _GenericMutexTestSuite
):
    backend = "dogpile.cache.redis_sentinel"
    config_args = {
        "arguments": {
            "sentinels": [[REDIS_HOST, REDIS_PORT]],
            "service_name": "pifpaf",
            "db": 0,
            "distributed_lock": True,
        }
    }


class RedisSentinelAsyncCreationTest(
    _TestRedisSentinelConn, _GenericBackendFixture
):
    backend = "dogpile.cache.redis_sentinel"
    config_args = {
        "arguments": {
            "sentinels": [[REDIS_HOST, REDIS_PORT]],
            "service_name": "pifpaf",
            "db": 0,
            "distributed_lock": True,
            # This is the important bit:
            "thread_local_lock": False,
        }
    }

    def test_distributed_async_locks(self):
        pool = ThreadPoolExecutor(max_workers=1)
        ev = Event()

        # A simple example of how people may implement an async runner -
        # plugged into a thread pool executor.
        def asyncer(cache, key, creator, mutex):
            def _call():
                try:
                    value = creator()
                    cache.set(key, value)
                finally:
                    # If a thread-local lock is used here, this will fail
                    # because generally the async calls run in a different
                    # thread (that's the point of async creators).
                    try:
                        mutex.release()
                    except Exception:
                        pass
                    else:
                        ev.set()

            return pool.submit(_call)

        reg = self._region(
            region_args={"async_creation_runner": asyncer},
            config_args={"expiration_time": 0.1},
        )

        @reg.cache_on_arguments()
        def blah(k):
            return k * 2

        # First call adds to the cache without calling the async creator.
        eq_(blah("asd"), "asdasd")

        # Wait long enough to cause the cached value to get stale.
        time.sleep(0.3)

        # This will trigger the async runner and return the stale value.
        eq_(blah("asd"), "asdasd")

        # Wait for the the async runner to finish or timeout. If the mutex
        # release errored, then the event won't be set and we'll timeout.
        # On <= Python 3.1, wait returned nothing. So check is_set after.
        ev.wait(timeout=1.0)
        eq_(ev.is_set(), True)