File: test_multiprocessing.py

package info (click to toggle)
python-redis 6.4.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 9,432 kB
  • sloc: python: 60,318; sh: 179; makefile: 128
file content (211 lines) | stat: -rw-r--r-- 7,518 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import contextlib
import multiprocessing

import pytest
import redis
from redis.connection import Connection, ConnectionPool
from redis.exceptions import ConnectionError

from .conftest import _get_client


@contextlib.contextmanager
def exit_callback(callback, *args):
    """
    Context manager that invokes ``callback(*args)`` when the ``with`` block
    is left, whether it finished normally or raised.

    Nothing is yielded to the ``with`` statement and an exception raised
    inside the block is not suppressed.
    """
    with contextlib.ExitStack() as cleanup:
        # Registered callbacks run when the stack unwinds, i.e. as soon as
        # the caller's ``with`` body is done.
        cleanup.callback(callback, *args)
        yield


class TestMultiprocessing:
    """
    Tests for connections, connection pools and clients that are created in
    a parent process and then used from a child process.
    """

    # The default start method is not 'fork' on every platform: macOS
    # defaults to 'spawn' and, since Python 3.14, other POSIX systems
    # default to 'forkserver'.
    # The code in this module does not work with those methods,
    # hence the explicit change to 'fork'.
    # See https://github.com/python/cpython/issues/125714
    if multiprocessing.get_start_method() in ["forkserver", "spawn"]:
        _mp_context = multiprocessing.get_context(method="fork")
    else:
        _mp_context = multiprocessing.get_context()

    @staticmethod
    def _join_child(proc, timeout=3):
        """
        Wait up to ``timeout`` seconds for ``proc`` and assert it exited
        with code 0.

        A child that is still running after the timeout is killed and
        reaped so a hung child cannot outlive the test; the assertion then
        fails on its non-zero exit code.
        """
        proc.join(timeout)
        if proc.is_alive():
            proc.kill()
            proc.join()
        assert proc.exitcode == 0

    # Test connection sharing between forks.
    # See issue #1085 for details.

    # use a multi-connection client as that's the only type that is
    # actually fork/process-safe
    @pytest.fixture()
    def r(self, request):
        """Return a redis client that is not a single-connection client."""
        return _get_client(redis.Redis, request=request, single_connection_client=False)

    def test_close_connection_in_child(self, master_host):
        """
        A connection owned by a parent and closed by a child doesn't
        destroy the file descriptors so a parent can still use it.
        """
        conn = Connection(host=master_host[0], port=master_host[1])
        conn.send_command("ping")
        assert conn.read_response() == b"PONG"

        def target(conn):
            conn.send_command("ping")
            assert conn.read_response() == b"PONG"
            conn.disconnect()

        proc = self._mp_context.Process(target=target, args=(conn,))
        proc.start()
        self._join_child(proc)

        # The connection was created in the parent but disconnected in the
        # child. The child called socket.close() but did not call
        # socket.shutdown() because it wasn't the "owning" process.
        # Therefore the connection still works in the parent.
        conn.send_command("ping")
        assert conn.read_response() == b"PONG"

    def test_close_connection_in_parent(self, master_host):
        """
        A connection owned by a parent is unusable by a child if the parent
        (the owning process) closes the connection.
        """
        conn = Connection(host=master_host[0], port=master_host[1])
        conn.send_command("ping")
        assert conn.read_response() == b"PONG"

        def target(conn, ev):
            ev.wait()
            # the parent closed the connection. because it also created the
            # connection, the connection is shutdown and the child
            # cannot use it.
            with pytest.raises(ConnectionError):
                conn.send_command("ping")

        # Create the event from the same context as the process so that the
        # synchronization primitive and the child share one start method.
        ev = self._mp_context.Event()
        proc = self._mp_context.Process(target=target, args=(conn, ev))
        proc.start()

        conn.disconnect()
        ev.set()

        self._join_child(proc)

    @pytest.mark.parametrize("max_connections", [2, None])
    def test_release_parent_connection_from_pool_in_child_process(
        self, max_connections, master_host
    ):
        """
        A connection owned by a parent should not decrease the _created_connections
        counter in child when released - when the child process starts to use the
        pool it resets all the counters that have been set in the parent process.
        """

        pool = ConnectionPool.from_url(
            f"redis://{master_host[0]}:{master_host[1]}",
            max_connections=max_connections,
        )

        parent_conn = pool.get_connection()

        def target(pool, parent_conn):
            with exit_callback(pool.disconnect):
                child_conn = pool.get_connection()
                assert child_conn.pid != parent_conn.pid
                pool.release(child_conn)
                assert pool._created_connections == 1
                assert child_conn in pool._available_connections
                # Releasing the parent's connection in the child must leave
                # the child's bookkeeping untouched.
                pool.release(parent_conn)
                assert pool._created_connections == 1
                assert child_conn in pool._available_connections
                assert parent_conn not in pool._available_connections

        proc = self._mp_context.Process(target=target, args=(pool, parent_conn))
        proc.start()
        self._join_child(proc)

    @pytest.mark.parametrize("max_connections", [1, 2, None])
    def test_pool(self, max_connections, master_host):
        """
        A child will create its own connections when using a pool created
        by a parent.
        """
        pool = ConnectionPool.from_url(
            f"redis://{master_host[0]}:{master_host[1]}",
            max_connections=max_connections,
        )

        conn = pool.get_connection()
        main_conn_pid = conn.pid
        with exit_callback(pool.release, conn):
            conn.send_command("ping")
            assert conn.read_response() == b"PONG"

        def target(pool):
            with exit_callback(pool.disconnect):
                conn = pool.get_connection()
                assert conn.pid != main_conn_pid
                with exit_callback(pool.release, conn):
                    assert conn.send_command("ping") is None
                    assert conn.read_response() == b"PONG"

        proc = self._mp_context.Process(target=target, args=(pool,))
        proc.start()
        self._join_child(proc)

        # Check that connection is still alive after fork process has exited
        # and disconnected the connections in its pool
        conn = pool.get_connection()
        with exit_callback(pool.release, conn):
            assert conn.send_command("ping") is None
            assert conn.read_response() == b"PONG"

    @pytest.mark.parametrize("max_connections", [1, 2, None])
    def test_close_pool_in_main(self, max_connections, master_host):
        """
        A child process that uses the same pool as its parent isn't affected
        when the parent disconnects all connections within the pool.
        """
        pool = ConnectionPool.from_url(
            f"redis://{master_host[0]}:{master_host[1]}",
            max_connections=max_connections,
        )

        conn = pool.get_connection()
        assert conn.send_command("ping") is None
        assert conn.read_response() == b"PONG"

        def target(pool, disconnect_event):
            conn = pool.get_connection()
            with exit_callback(pool.release, conn):
                assert conn.send_command("ping") is None
                assert conn.read_response() == b"PONG"
                # Block until the parent has disconnected its pool, then
                # verify the child's connection is still usable.
                disconnect_event.wait()
                assert conn.send_command("ping") is None
                assert conn.read_response() == b"PONG"

        # Create the event from the same context as the process so that the
        # synchronization primitive and the child share one start method.
        ev = self._mp_context.Event()

        proc = self._mp_context.Process(target=target, args=(pool, ev))
        proc.start()

        pool.disconnect()
        ev.set()
        self._join_child(proc)

    def test_redis_client(self, r):
        "A redis client created in a parent can also be used in a child"
        assert r.ping() is True

        def target(client):
            assert client.ping() is True
            del client

        proc = self._mp_context.Process(target=target, args=(r,))
        proc.start()
        self._join_child(proc)

        assert r.ping() is True