import asyncio
import binascii
import datetime
import ssl
import warnings
from typing import Any, Awaitable, Callable, Dict, List, Optional, Type, Union
from unittest import mock
from urllib.parse import urlparse

import pytest
import pytest_asyncio
from _pytest.fixtures import FixtureRequest
from packaging.version import Version
from redis._parsers import AsyncCommandsParser
from redis.asyncio.cluster import ClusterNode, NodesManager, RedisCluster
from redis.asyncio.connection import Connection, SSLConnection, async_timeout
from redis.asyncio.retry import Retry
from redis.backoff import (
    ExponentialBackoff,
    ExponentialWithJitterBackoff,
    NoBackoff,
)
from redis.cluster import (
    PIPELINE_BLOCKED_COMMANDS,
    PRIMARY,
    REPLICA,
    LoadBalancingStrategy,
    get_node_name,
)
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.exceptions import (
    AskError,
    ClusterDownError,
    ConnectionError,
    DataError,
    MaxConnectionsError,
    MovedError,
    NoPermissionError,
    RedisClusterException,
    RedisError,
    ResponseError,
)
from redis.utils import str_if_bytes
from tests.conftest import (
    REDIS_INFO,
    assert_resp_response,
    is_resp2_connection,
    skip_if_redis_enterprise,
    skip_if_server_version_lt,
    skip_unless_arch_bits,
)

from ..ssl_utils import get_tls_certificates
from .compat import aclosing

pytestmark = pytest.mark.onlycluster
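
# All tests in this module are marked "onlycluster" so the test harness can
# skip them when it is not running against a Redis Cluster deployment.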


default_host = "127.0.0.1"
default_port = 7000
default_cluster_slots = [
    [0, 8191, ["127.0.0.1", 7000, "node_0"], ["127.0.0.1", 7003, "node_3"]],
    [8192, 16383, ["127.0.0.1", 7001, "node_1"], ["127.0.0.1", 7002, "node_2"]],
]
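# The entries above mimic the shape of a CLUSTER SLOTS reply: each item is
# [start_slot, end_slot, [primary_host, primary_port, node_id], *replicas],
# so slots 0-8191 are served by the primary on port 7000 with a replica on
# 7003, and slots 8192-16383 by the primary on 7001 with a replica on 7002.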


class NodeProxy:
    """A class to proxy a node connection to a different port"""

    def __init__(self, addr, redis_addr):
        self.addr = addr
        self.redis_addr = redis_addr
        self.server = None
        self.task = None
        self.n_connections = 0

    async def start(self):
        # test that we can connect to redis
        async with async_timeout(2):
            _, redis_writer = await asyncio.open_connection(*self.redis_addr)
        redis_writer.close()
        self.server = await asyncio.start_server(
            self.handle, *self.addr, reuse_address=True
        )
        self.task = asyncio.create_task(self.server.serve_forever())

    async def handle(self, reader, writer):
        # establish connection to redis
        redis_reader, redis_writer = await asyncio.open_connection(*self.redis_addr)
        try:
            self.n_connections += 1
            pipe1 = asyncio.create_task(self.pipe(reader, redis_writer))
            pipe2 = asyncio.create_task(self.pipe(redis_reader, writer))
            await asyncio.gather(pipe1, pipe2)
        finally:
            redis_writer.close()
            await redis_writer.wait_closed()
            writer.close()
            await writer.wait_closed()

    async def aclose(self):
        # Awaiting a cancelled task raises CancelledError, so stop the serving
        # task first and only then shut down the listening server.
        self.task.cancel()
        try:
            await asyncio.wait_for(self.task, timeout=1)
        except (asyncio.TimeoutError, asyncio.CancelledError):
            pass
        self.server.close()
        await self.server.wait_closed()

    async def pipe(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
    ):
        while True:
            data = await reader.read(1000)
            if not data:
                break
            writer.write(data)
            await writer.drain()

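# A minimal usage sketch of NodeProxy, as exercised in test_address_remap below
# (the ports here are illustrative):
#
#     proxy = NodeProxy(("127.0.0.1", 8000), ("127.0.0.1", 7000))
#     await proxy.start()   # listen on 127.0.0.1:8000 and forward to :7000
#     ...                   # point the client at port 8000 via address_remap
#     await proxy.aclose()
#
# proxy.n_connections records how many client connections were piped through.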

@pytest_asyncio.fixture()
async def slowlog(r: RedisCluster) -> None:
    """
    Set the slowlog threshold to 0 and the max length to 128. This forces
    every command into the slowlog and allows us to test it.
    """
    # Save old values
    current_config = await r.config_get(target_nodes=r.get_primaries()[0])
    old_slower_than_value = current_config["slowlog-log-slower-than"]
    old_max_length_value = current_config["slowlog-max-len"]

    # Set the new values
    await r.config_set("slowlog-log-slower-than", 0)
    await r.config_set("slowlog-max-len", 128)

    yield

    await r.config_set("slowlog-log-slower-than", old_slower_than_value)
    await r.config_set("slowlog-max-len", old_max_length_value)


async def get_mocked_redis_client(
    cluster_slots_raise_error=False, *args, **kwargs
) -> RedisCluster:
    """
    Return a stable RedisCluster object that has a deterministic nodes and
    slots setup, to remove the problem of different IP addresses on different
    installations and machines.
    """
    cluster_slots = kwargs.pop("cluster_slots", default_cluster_slots)
    coverage_res = kwargs.pop("coverage_result", "yes")
    cluster_enabled = kwargs.pop("cluster_enabled", True)
    with mock.patch.object(ClusterNode, "execute_command") as execute_command_mock:

        async def execute_command(*_args, **_kwargs):
            if _args[0] == "CLUSTER SLOTS":
                if cluster_slots_raise_error:
                    raise ResponseError()
                else:
                    mock_cluster_slots = cluster_slots
                    return mock_cluster_slots
            elif _args[0] == "COMMAND":
                return {"get": [], "set": []}
            elif _args[0] == "INFO":
                return {"cluster_enabled": cluster_enabled}
            elif len(_args) > 1 and _args[1] == "cluster-require-full-coverage":
                return {"cluster-require-full-coverage": coverage_res}
            else:
                return await execute_command_mock(*_args, **_kwargs)

        execute_command_mock.side_effect = execute_command

        with mock.patch.object(
            AsyncCommandsParser, "initialize", autospec=True
        ) as cmd_parser_initialize:

            def cmd_init_mock(self, r: ClusterNode) -> None:
                self.commands = {
                    "get": {
                        "name": "get",
                        "arity": 2,
                        "flags": ["readonly", "fast"],
                        "first_key_pos": 1,
                        "last_key_pos": 1,
                        "step_count": 1,
                    }
                }

            cmd_parser_initialize.side_effect = cmd_init_mock

            # Create a subclass of RedisCluster that overrides __del__
            class MockedRedisCluster(RedisCluster):
                def __del__(self):
                    # Override to prevent connection cleanup attempts
                    pass

                @property
                def connection_pool(self):
                    # Required abstract property implementation
                    return self.nodes_manager.get_default_node().redis_connection.connection_pool

            return await MockedRedisCluster(*args, **kwargs)

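# Typical usage of get_mocked_redis_client in these tests (a sketch):
#
#     rc = await get_mocked_redis_client(host=default_host, port=default_port)
#     ...
#     await rc.aclose()
#
# The patched execute_command answers CLUSTER SLOTS, COMMAND, and INFO
# deterministically, so the client always sees the topology described by
# default_cluster_slots (unless custom cluster_slots are passed in).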

def mock_node_resp(node: ClusterNode, response: Any) -> ClusterNode:
    connection = mock.AsyncMock(spec=Connection)
    connection.is_connected = True
    connection.read_response.return_value = response
    while node._free:
        node._free.pop()
    node._free.append(connection)
    return node


def mock_node_resp_exc(node: ClusterNode, exc: Exception) -> ClusterNode:
    connection = mock.AsyncMock(spec=Connection)
    connection.is_connected = True
    connection.read_response.side_effect = exc
    while node._free:
        node._free.pop()
    node._free.append(connection)
    return node


def mock_all_nodes_resp(rc: RedisCluster, response: Any) -> RedisCluster:
    for node in rc.get_nodes():
        mock_node_resp(node, response)
    return rc

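# These helpers replace a node's free-connection pool with a single AsyncMock
# connection whose read_response() returns (or raises) the canned value, so no
# real server round trip happens, e.g.:
#
#     mock_node_resp(node, "OK")
#     mock_node_resp_exc(node, ConnectionError("error"))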

async def moved_redirection_helper(
    create_redis: Callable[..., RedisCluster], failover: bool = False
) -> None:
    """
    Test that the client handles a MOVED response.
    When failover is True, the redirection address belongs to a replica that
    was promoted to a primary.

    On the first call it should raise a MOVED ResponseError that points
    the client to the next server it should talk to.

    Verify that:
    1. it tries to talk to the redirected node
    2. it updates the slot's primary to the redirected node

    For a failover, also verify:
    3. the redirected node's server type updated to 'primary'
    4. the server type of the previous slot owner updated to 'replica'
    """
    rc = await create_redis(cls=RedisCluster, flushdb=False)
    slot = 12182
    redirect_node = None
    # Get the current primary that holds this slot
    prev_primary = rc.nodes_manager.get_node_from_slot(slot)
    if failover:
        if len(rc.nodes_manager.slots_cache[slot]) < 2:
            warnings.warn("Skipping this test since it requires to have a replica")
            return
        redirect_node = rc.nodes_manager.slots_cache[slot][1]
    else:
        # Use one of the primaries to be the redirected node
        redirect_node = rc.get_primaries()[0]
    r_host = redirect_node.host
    r_port = redirect_node.port
    with mock.patch.object(
        ClusterNode, "execute_command", autospec=True
    ) as execute_command:

        def moved_redirect_effect(self, *args, **options):
            def ok_response(self, *args, **options):
                assert self.host == r_host
                assert self.port == r_port

                return "MOCK_OK"

            execute_command.side_effect = ok_response
            raise MovedError(f"{slot} {r_host}:{r_port}")

        execute_command.side_effect = moved_redirect_effect
        assert await rc.execute_command("SET", "foo", "bar") == "MOCK_OK"
        slot_primary = rc.nodes_manager.slots_cache[slot][0]
        assert slot_primary == redirect_node
        if failover:
            assert rc.get_node(host=r_host, port=r_port).server_type == PRIMARY
            assert prev_primary.server_type == REPLICA


class TestRedisClusterObj:
    """
    Tests for the RedisCluster class
    """

    async def test_host_port_startup_node(self) -> None:
        """
        Test that it is possible to use host & port arguments as startup node
        args
        """
        cluster = await get_mocked_redis_client(host=default_host, port=default_port)
        assert cluster.get_node(host=default_host, port=default_port) is not None

        await cluster.aclose()

    async def test_aclosing(self) -> None:
        cluster = await get_mocked_redis_client(host=default_host, port=default_port)
        called = 0

        async def mock_aclose():
            nonlocal called
            called += 1

        with mock.patch.object(cluster, "aclose", mock_aclose):
            async with aclosing(cluster):
                pass
            assert called == 1
        await cluster.aclose()

    async def test_close_is_aclose(self) -> None:
        """
        Test that close() is a deprecated alias that delegates to aclose()
        """
        cluster = await get_mocked_redis_client(host=default_host, port=default_port)
        called = 0

        async def mock_aclose():
            nonlocal called
            called += 1

        with mock.patch.object(cluster, "aclose", mock_aclose):
            with pytest.warns(DeprecationWarning, match=r"Use aclose\(\) instead"):
                await cluster.close()
            assert called == 1
        await cluster.aclose()

    async def test_startup_nodes(self) -> None:
        """
        Test that it is possible to use startup_nodes
        argument to init the cluster
        """
        port_1 = 7000
        port_2 = 7001
        startup_nodes = [
            ClusterNode(default_host, port_1),
            ClusterNode(default_host, port_2),
        ]
        cluster = await get_mocked_redis_client(startup_nodes=startup_nodes)
        assert (
            cluster.get_node(host=default_host, port=port_1) is not None
            and cluster.get_node(host=default_host, port=port_2) is not None
        )

        await cluster.aclose()

        startup_node = ClusterNode("127.0.0.1", 16379)
        async with RedisCluster(startup_nodes=[startup_node], client_name="test") as rc:
            assert await rc.set("A", 1)
            assert await rc.get("A") == b"1"
            assert all(
                [
                    name == "test"
                    for name in (
                        await rc.client_getname(target_nodes=rc.ALL_NODES)
                    ).values()
                ]
            )

    async def test_cluster_set_get_retry_object(self, request: FixtureRequest):
        retry = Retry(NoBackoff(), 2)
        url = request.config.getoption("--redis-url")
        async with RedisCluster.from_url(url, retry=retry) as r:
            assert r.retry.get_retries() == retry.get_retries()
            assert isinstance(r.retry._backoff, NoBackoff)
            for node in r.get_nodes():
                # validate that the nodes' lower-level connections use the
                # default retry policy
                n_retry = node.acquire_connection().retry
                assert n_retry is not None
                assert n_retry._retries == 0
                assert isinstance(n_retry._backoff, NoBackoff)
            rand_cluster_node = r.get_random_node()
            existing_conn = rand_cluster_node.acquire_connection()
            # Change retry policy
            new_retry = Retry(ExponentialBackoff(), 3)
            r.set_retry(new_retry)
            assert r.retry.get_retries() == new_retry.get_retries()
            assert isinstance(r.retry._backoff, ExponentialBackoff)
            for node in r.get_nodes():
                # validate nodes lower level connections are not affected
                n_retry = node.acquire_connection().retry
                assert n_retry is not None
                assert n_retry._retries == 0
                assert isinstance(n_retry._backoff, NoBackoff)
            assert existing_conn.retry.get_retries() == 0
            new_conn = rand_cluster_node.acquire_connection()
            assert new_conn.retry._retries == 0

    async def test_cluster_retry_object(self, request: FixtureRequest) -> None:
        url = request.config.getoption("--redis-url")
        async with RedisCluster.from_url(url) as rc_default:
            # Test default retry
            retry = rc_default.retry

            # FIXME: Workaround for https://github.com/redis/redis-py/issues/3030
            host = rc_default.get_default_node().host

            assert isinstance(retry, Retry)
            assert retry._retries == 3
            assert isinstance(retry._backoff, ExponentialWithJitterBackoff)

            # validate that node connections use the default retry for
            # lower-level connections when the client is created through
            # 'from_url' without an explicit retry object
            node1_retry = rc_default.get_node(host, 16379).acquire_connection().retry
            node2_retry = rc_default.get_node(host, 16380).acquire_connection().retry
            for node_retry in (node1_retry, node2_retry):
                assert node_retry.get_retries() == 0
                assert isinstance(node_retry._backoff, NoBackoff)
                assert node_retry._supported_errors == (ConnectionError,)

        retry = Retry(ExponentialBackoff(10, 5), 5)
        async with RedisCluster.from_url(url, retry=retry) as rc_custom_retry:
            # Test custom retry
            assert rc_custom_retry.retry == retry
            # validate that node connections still use the default retry for
            # lower-level connections when the client is created through
            # 'from_url' with an explicit retry object
            node1_retry = (
                rc_custom_retry.get_node(host, 16379).acquire_connection().retry
            )
            node2_retry = (
                rc_custom_retry.get_node(host, 16380).acquire_connection().retry
            )
            for node_retry in (node1_retry, node2_retry):
                assert node_retry.get_retries() == 0
                assert isinstance(node_retry._backoff, NoBackoff)
                assert node_retry._supported_errors == (ConnectionError,)

        async with RedisCluster.from_url(
            url, cluster_error_retry_attempts=0
        ) as rc_no_retries:
            # Test no cluster retries
            assert rc_no_retries.retry.get_retries() == 0

        async with RedisCluster.from_url(
            url, retry=Retry(NoBackoff(), 0)
        ) as rc_no_retries:
            assert rc_no_retries.retry.get_retries() == 0

    async def test_empty_startup_nodes(self) -> None:
        """
        Test that an exception is raised when providing empty startup_nodes
        """
        with pytest.raises(RedisClusterException) as ex:
            RedisCluster(startup_nodes=[])

        assert str(ex.value).startswith(
            "RedisCluster requires at least one node to discover the cluster"
        ), str_if_bytes(ex.value)

    async def test_from_url(self, request: FixtureRequest) -> None:
        url = request.config.getoption("--redis-url")

        async with RedisCluster.from_url(url) as rc:
            await rc.set("a", 1)
            assert await rc.get("a") == b"1"

        rc = RedisCluster.from_url("rediss://localhost:16379")
        assert rc.connection_kwargs["connection_class"] is SSLConnection

    async def test_max_connections(
        self, create_redis: Callable[..., RedisCluster]
    ) -> None:
        rc = await create_redis(cls=RedisCluster, max_connections=10)
        for node in rc.get_nodes():
            assert node.max_connections == 10

        with mock.patch.object(Connection, "read_response") as read_response:

            async def read_response_mocked(*args: Any, **kwargs: Any) -> None:
                await asyncio.sleep(10)

            read_response.side_effect = read_response_mocked

            with pytest.raises(MaxConnectionsError):
                await asyncio.gather(
                    *(
                        rc.ping(target_nodes=RedisCluster.DEFAULT_NODE)
                        for _ in range(11)
                    )
                )

        await rc.aclose()

    async def test_execute_command_errors(self, r: RedisCluster) -> None:
        """
        Test that an exception is raised if no key is provided.
        """
        with pytest.raises(RedisClusterException) as ex:
            await r.execute_command("GET")
        assert str(ex.value).startswith(
            "No way to dispatch this command to Redis Cluster. Missing key."
        )

    async def test_execute_command_node_flag_primaries(self, r: RedisCluster) -> None:
        """
        Test command execution with nodes flag PRIMARIES
        """
        primaries = r.get_primaries()
        replicas = r.get_replicas()
        mock_all_nodes_resp(r, "PONG")
        assert await r.ping(target_nodes=RedisCluster.PRIMARIES) is True
        for primary in primaries:
            conn = primary._free.pop()
            assert conn.read_response.called is True
        for replica in replicas:
            conn = replica._free.pop()
            assert conn.read_response.called is not True

    async def test_execute_command_node_flag_replicas(self, r: RedisCluster) -> None:
        """
        Test command execution with nodes flag REPLICAS
        """
        replicas = r.get_replicas()
        assert len(replicas) != 0, "This test requires a cluster with 1 replica"

        primaries = r.get_primaries()
        mock_all_nodes_resp(r, "PONG")
        assert await r.ping(target_nodes=RedisCluster.REPLICAS) is True
        for replica in replicas:
            conn = replica._free.pop()
            assert conn.read_response.called is True
        for primary in primaries:
            conn = primary._free.pop()
            assert conn.read_response.called is not True

        await r.aclose()

    async def test_execute_command_node_flag_all_nodes(self, r: RedisCluster) -> None:
        """
        Test command execution with nodes flag ALL_NODES
        """
        mock_all_nodes_resp(r, "PONG")
        assert await r.ping(target_nodes=RedisCluster.ALL_NODES) is True
        for node in r.get_nodes():
            conn = node._free.pop()
            assert conn.read_response.called is True

    async def test_execute_command_node_flag_random(self, r: RedisCluster) -> None:
        """
        Test command execution with nodes flag RANDOM
        """
        mock_all_nodes_resp(r, "PONG")
        assert await r.ping(target_nodes=RedisCluster.RANDOM) is True
        called_count = 0
        for node in r.get_nodes():
            conn = node._free.pop()
            if conn.read_response.called is True:
                called_count += 1
        assert called_count == 1

    async def test_execute_command_default_node(self, r: RedisCluster) -> None:
        """
        Test that command execution without a node flag is executed on the
        default node
        """
        def_node = r.get_default_node()
        mock_node_resp(def_node, "PONG")
        assert await r.ping() is True
        conn = def_node._free.pop()
        assert conn.read_response.called

    async def test_ask_redirection(self, r: RedisCluster) -> None:
        """
        Test that the client handles an ASK response.

        On the first call it should raise an ASK ResponseError that points
        the client to the next server it should talk to.

        The important thing to verify is that it tries to talk to the second node.
        """
        redirect_node = r.get_nodes()[0]
        with mock.patch.object(
            ClusterNode, "execute_command", autospec=True
        ) as execute_command:

            def ask_redirect_effect(self, *args, **options):
                def ok_response(self, *args, **options):
                    assert self.host == redirect_node.host
                    assert self.port == redirect_node.port

                    return "MOCK_OK"

                execute_command.side_effect = ok_response
                raise AskError(f"12182 {redirect_node.host}:{redirect_node.port}")

            execute_command.side_effect = ask_redirect_effect

            assert await r.execute_command("SET", "foo", "bar") == "MOCK_OK"

    async def test_moved_redirection(
        self, create_redis: Callable[..., RedisCluster]
    ) -> None:
        """
        Test that the client handles MOVED response.
        """
        await moved_redirection_helper(create_redis, failover=False)

    async def test_moved_redirection_after_failover(
        self, create_redis: Callable[..., RedisCluster]
    ) -> None:
        """
        Test that the client handles MOVED response after a failover.
        """
        await moved_redirection_helper(create_redis, failover=True)

    async def test_refresh_using_specific_nodes(
        self, create_redis: Callable[..., RedisCluster]
    ) -> None:
        """
        Test making calls on specific nodes when the cluster has failed over to
        another node
        """
        node_7006 = ClusterNode(host=default_host, port=7006, server_type=PRIMARY)
        node_7007 = ClusterNode(host=default_host, port=7007, server_type=PRIMARY)
        with mock.patch.object(
            ClusterNode, "execute_command", autospec=True
        ) as execute_command:
            with mock.patch.object(
                NodesManager, "initialize", autospec=True
            ) as initialize:
                with mock.patch.multiple(
                    Connection,
                    send_packed_command=mock.DEFAULT,
                    connect=mock.DEFAULT,
                    can_read_destructive=mock.DEFAULT,
                ) as mocks:
                    # simulate 7006 as a failed node
                    def execute_command_mock(self, *args, **options):
                        if self.port == 7006:
                            execute_command.failed_calls += 1
                            raise ClusterDownError(
                                "CLUSTERDOWN The cluster is "
                                "down. Use CLUSTER INFO for "
                                "more information"
                            )
                        elif self.port == 7007:
                            execute_command.successful_calls += 1

                    def initialize_mock(self):
                        # start with all slots mapped to 7006
                        self.nodes_cache = {node_7006.name: node_7006}
                        self.default_node = node_7006
                        self.slots_cache = {}

                        for i in range(REDIS_CLUSTER_HASH_SLOTS):
                            self.slots_cache[i] = [node_7006]

                        # After the first connection fails, a reinitialize
                        # should follow the cluster to 7007
                        def map_7007(self):
                            self.nodes_cache = {node_7007.name: node_7007}
                            self.default_node = node_7007
                            self.slots_cache = {}

                            for i in range(REDIS_CLUSTER_HASH_SLOTS):
                                self.slots_cache[i] = [node_7007]

                        # Change initialize side effect for the second call
                        initialize.side_effect = map_7007

                    execute_command.side_effect = execute_command_mock
                    execute_command.successful_calls = 0
                    execute_command.failed_calls = 0
                    initialize.side_effect = initialize_mock
                    mocks["can_read_destructive"].return_value = False
                    mocks["send_packed_command"].return_value = "MOCK_OK"
                    mocks["connect"].return_value = None
                    with mock.patch.object(
                        AsyncCommandsParser, "initialize", autospec=True
                    ) as cmd_parser_initialize:

                        def cmd_init_mock(self, r: ClusterNode) -> None:
                            self.commands = {
                                "get": {
                                    "name": "get",
                                    "arity": 2,
                                    "flags": ["readonly", "fast"],
                                    "first_key_pos": 1,
                                    "last_key_pos": 1,
                                    "step_count": 1,
                                }
                            }

                        cmd_parser_initialize.side_effect = cmd_init_mock

                        rc = await create_redis(cls=RedisCluster, flushdb=False)
                        assert len(rc.get_nodes()) == 1
                        assert rc.get_node(node_name=node_7006.name) is not None

                        await rc.get("foo")

                        # Cluster should now point to 7007, and there should be
                        # one failed and one successful call
                        assert len(rc.get_nodes()) == 1
                        assert rc.get_node(node_name=node_7007.name) is not None
                        assert rc.get_node(node_name=node_7006.name) is None
                        assert execute_command.failed_calls == 1
                        assert execute_command.successful_calls == 1

    @pytest.mark.parametrize(
        "read_from_replicas,load_balancing_strategy,mocks_srv_ports",
        [
            (True, None, [7001, 7002, 7001]),
            (True, LoadBalancingStrategy.ROUND_ROBIN, [7001, 7002, 7001]),
            (True, LoadBalancingStrategy.ROUND_ROBIN_REPLICAS, [7002, 7002, 7002]),
            (True, LoadBalancingStrategy.RANDOM_REPLICA, [7002, 7002, 7002]),
            (False, LoadBalancingStrategy.ROUND_ROBIN, [7001, 7002, 7001]),
            (False, LoadBalancingStrategy.ROUND_ROBIN_REPLICAS, [7002, 7002, 7002]),
            (False, LoadBalancingStrategy.RANDOM_REPLICA, [7002, 7002, 7002]),
        ],
    )
    async def test_reading_with_load_balancing_strategies(
        self,
        read_from_replicas: bool,
        load_balancing_strategy: LoadBalancingStrategy,
        mocks_srv_ports: List[int],
    ) -> None:
        with mock.patch.multiple(
            Connection,
            send_command=mock.DEFAULT,
            read_response=mock.DEFAULT,
            _connect=mock.DEFAULT,
            can_read_destructive=mock.DEFAULT,
            on_connect=mock.DEFAULT,
        ) as mocks:
            with mock.patch.object(
                ClusterNode, "execute_command", autospec=True
            ) as execute_command:

                async def execute_command_mock_first(self, *args, **options):
                    await self.connection_class(**self.connection_kwargs).connect()
                    # Primary
                    assert self.port == mocks_srv_ports[0]
                    execute_command.side_effect = execute_command_mock_second
                    return "MOCK_OK"

                def execute_command_mock_second(self, *args, **options):
                    # Replica
                    assert self.port == mocks_srv_ports[1]
                    execute_command.side_effect = execute_command_mock_third
                    return "MOCK_OK"

                def execute_command_mock_third(self, *args, **options):
                    # Primary
                    assert self.port == mocks_srv_ports[2]
                    return "MOCK_OK"

                # We don't need to create a real cluster connection but we
                # do want RedisCluster.on_connect function to get called,
                # so we'll mock some of the Connection's functions to allow it
                execute_command.side_effect = execute_command_mock_first
                mocks["send_command"].return_value = True
                mocks["read_response"].return_value = "OK"
                mocks["_connect"].return_value = True
                mocks["can_read_destructive"].return_value = False
                mocks["on_connect"].return_value = True

                # Create a cluster that reads from replicas
                read_cluster = await get_mocked_redis_client(
                    host=default_host,
                    port=default_port,
                    read_from_replicas=read_from_replicas,
                    load_balancing_strategy=load_balancing_strategy,
                )
                assert read_cluster.read_from_replicas is read_from_replicas
                assert read_cluster.load_balancing_strategy is load_balancing_strategy
                # Check that we read from the slot's nodes according to the
                # configured load balancing strategy.
                # 'foo' belongs to slot 12182 and the slot's nodes are:
                # [(127.0.0.1,7001,primary), (127.0.0.1,7002,replica)]
                await read_cluster.get("foo")
                await read_cluster.get("foo")
                await read_cluster.get("foo")
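                # Reading from a replica requires the connection to be put into
                # READONLY mode, which the assertion below verifies.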
                mocks["send_command"].assert_has_calls([mock.call("READONLY")])

                await read_cluster.aclose()

    async def test_keyslot(self, r: RedisCluster) -> None:
        """
        Test that the method computes the correct slot in all supported cases
        """
        assert r.keyslot("foo") == 12182
        assert r.keyslot("{foo}bar") == 12182
        assert r.keyslot("{foo}") == 12182
        assert r.keyslot(1337) == 4314

        assert r.keyslot(125) == r.keyslot(b"125")
        assert r.keyslot(125) == r.keyslot("\x31\x32\x35")
        assert r.keyslot("大奖") == r.keyslot(b"\xe5\xa4\xa7\xe5\xa5\x96")
        assert r.keyslot("大奖") == r.keyslot(b"\xe5\xa4\xa7\xe5\xa5\x96")
        assert r.keyslot(1337.1234) == r.keyslot("1337.1234")
        assert r.keyslot(1337) == r.keyslot("1337")
        assert r.keyslot(b"abc") == r.keyslot("abc")

    async def test_get_node_name(self) -> None:
        assert (
            get_node_name(default_host, default_port)
            == f"{default_host}:{default_port}"
        )

    async def test_all_nodes(self, r: RedisCluster) -> None:
        """
        It should be possible to iterate over all of the cluster's nodes
        """
        nodes = [node for node in r.nodes_manager.nodes_cache.values()]

        for i, node in enumerate(r.get_nodes()):
            assert node in nodes

    async def test_all_nodes_masters(self, r: RedisCluster) -> None:
        """
        Set a list of nodes with random primaries/replicas config and it should
        be possible to iterate over all of them.
        """
        nodes = [
            node
            for node in r.nodes_manager.nodes_cache.values()
            if node.server_type == PRIMARY
        ]

        for node in r.get_primaries():
            assert node in nodes

    @pytest.mark.parametrize("error", RedisCluster.ERRORS_ALLOW_RETRY)
    async def test_cluster_down_overreaches_retry_attempts(
        self,
        error: Union[Type[TimeoutError], Type[ClusterDownError], Type[ConnectionError]],
    ) -> None:
        """
        When an error that allows retry is raised, test that the command is
        retried as many times as configured in cluster_error_retry_attempts
        and that the exception is then re-raised
        """
        with mock.patch.object(RedisCluster, "_execute_command") as execute_command:

            def raise_error(target_node, *args, **kwargs):
                execute_command.failed_calls += 1
                raise error("mocked error")

            execute_command.side_effect = raise_error

            rc = await get_mocked_redis_client(host=default_host, port=default_port)

            with pytest.raises(error):
                await rc.get("bar")
            assert execute_command.failed_calls == rc.cluster_error_retry_attempts

            await rc.aclose()

    async def test_set_default_node_success(self, r: RedisCluster) -> None:
        """
        test successful replacement of the default cluster node
        """
        default_node = r.get_default_node()
        # get a different node
        new_def_node = None
        for node in r.get_nodes():
            if node != default_node:
                new_def_node = node
                break
        r.set_default_node(new_def_node)
        assert r.get_default_node() == new_def_node

    async def test_set_default_node_failure(self, r: RedisCluster) -> None:
        """
        test failed replacement of the default cluster node
        """
        default_node = r.get_default_node()
        new_def_node = ClusterNode("1.1.1.1", 1111)
        with pytest.raises(DataError):
            r.set_default_node(None)
        with pytest.raises(DataError):
            r.set_default_node(new_def_node)
        assert r.get_default_node() == default_node

    async def test_get_node_from_key(self, r: RedisCluster) -> None:
        """
        Test that get_node_from_key function returns the correct node
        """
        key = "bar"
        slot = r.keyslot(key)
        slot_nodes = r.nodes_manager.slots_cache.get(slot)
        primary = slot_nodes[0]
        assert r.get_node_from_key(key, replica=False) == primary
        replica = r.get_node_from_key(key, replica=True)
        if replica is not None:
            assert replica.server_type == REPLICA
            assert replica in slot_nodes

    @skip_if_redis_enterprise()
    async def test_not_require_full_coverage_cluster_down_error(
        self, r: RedisCluster
    ) -> None:
        """
        When require_full_coverage is set to False (the default client config)
        and not all slots are covered, if one of the nodes has the
        'cluster-require-full-coverage' config set to 'yes', some key-based
        commands should throw ClusterDownError
        """
        node = r.get_node_from_key("foo")
        missing_slot = r.keyslot("foo")
        assert await r.set("foo", "bar") is True
        try:
            assert all(await r.cluster_delslots(missing_slot))
            with pytest.raises(ClusterDownError):
                await r.exists("foo")
        except ResponseError as e:
            assert "CLUSTERDOWN" in str(e)
        finally:
            try:
                # Add back the missing slot
                assert await r.cluster_addslots(node, missing_slot) is True
                # Make sure we are not getting ClusterDownError anymore
                assert await r.exists("foo") == 1
            except ResponseError as e:
                if f"Slot {missing_slot} is already busy" in str(e):
                    # It can happen if the test failed to delete this slot
                    pass
                else:
                    raise e

    async def test_can_run_concurrent_commands(self, request: FixtureRequest) -> None:
        url = request.config.getoption("--redis-url")
        rc = RedisCluster.from_url(url)
        assert all(
            await asyncio.gather(
                *(rc.echo("i", target_nodes=RedisCluster.ALL_NODES) for i in range(100))
            )
        )
        await rc.aclose()

    def test_replace_cluster_node(self, r: RedisCluster) -> None:
        prev_default_node = r.get_default_node()
        r.replace_default_node()
        assert r.get_default_node() != prev_default_node
        r.replace_default_node(prev_default_node)
        assert r.get_default_node() == prev_default_node

    async def test_default_node_is_replaced_after_exception(self, r):
        curr_default_node = r.get_default_node()
        # CLUSTER NODES command is being executed on the default node
        nodes = await r.cluster_nodes()
        assert "myself" in nodes.get(curr_default_node.name).get("flags")
        # Mock connection error for the default node
        mock_node_resp_exc(curr_default_node, ConnectionError("error"))
        # Test that the command succeeds from a different node
        nodes = await r.cluster_nodes()
        assert "myself" not in nodes.get(curr_default_node.name).get("flags")
        assert r.get_default_node() != curr_default_node
        # Rollback to the old default node
        r.replace_default_node(curr_default_node)

    async def test_address_remap(self, create_redis, master_host):
        """Test that we can create a rediscluster object with
        a host-port remapper and map connections through proxy objects
        """

        # we remap the first n nodes
        offset = 1000
        n = 6
        hostname, master_port = master_host
        ports = [master_port + i for i in range(n)]

        def address_remap(address):
            # remap the first n node ports to our local proxies
            host, port = address
            if int(port) in ports:
                host, port = "127.0.0.1", int(port) + offset
            return host, port

        # create the proxies
        proxies = [
            NodeProxy(("127.0.0.1", port + offset), (hostname, port)) for port in ports
        ]
        await asyncio.gather(*[p.start() for p in proxies])
        try:
            # create cluster:
            r = await create_redis(
                cls=RedisCluster, flushdb=False, address_remap=address_remap
            )
            try:
                assert await r.ping() is True
                assert await r.set("byte_string", b"giraffe")
                assert await r.get("byte_string") == b"giraffe"
            finally:
                await r.aclose()
        finally:
            await asyncio.gather(*[p.aclose() for p in proxies])

        # verify that the proxies were indeed used
        n_used = sum((1 if p.n_connections else 0) for p in proxies)
        assert n_used > 1


class TestClusterRedisCommands:
    """
    Tests for RedisCluster unique commands
    """

    async def test_get_and_set(self, r: RedisCluster) -> None:
        # get and set can't be tested independently of each other
        assert await r.get("a") is None
        byte_string = b"value"
        integer = 5
        unicode_string = chr(3456) + "abcd" + chr(3421)
        assert await r.set("byte_string", byte_string)
        assert await r.set("integer", 5)
        assert await r.set("unicode_string", unicode_string)
        assert await r.get("byte_string") == byte_string
        assert await r.get("integer") == str(integer).encode()
        assert (await r.get("unicode_string")).decode("utf-8") == unicode_string

    @pytest.mark.parametrize(
        "load_balancing_strategy",
        [
            LoadBalancingStrategy.ROUND_ROBIN,
            LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
            LoadBalancingStrategy.RANDOM_REPLICA,
        ],
    )
    async def test_get_and_set_with_load_balanced_client(
        self, create_redis, load_balancing_strategy: LoadBalancingStrategy
    ) -> None:
        r = await create_redis(
            cls=RedisCluster,
            load_balancing_strategy=load_balancing_strategy,
        )

        # get and set can't be tested independently of each other
        assert await r.get("a") is None

        byte_string = b"value"
        assert await r.set("byte_string", byte_string)

        # run the get command for the same key several times
        # to iterate over the read nodes
        assert await r.get("byte_string") == byte_string
        assert await r.get("byte_string") == byte_string
        assert await r.get("byte_string") == byte_string

    async def test_mget_nonatomic(self, r: RedisCluster) -> None:
        assert await r.mget_nonatomic([]) == []
        assert await r.mget_nonatomic(["a", "b"]) == [None, None]
        await r.set("a", "1")
        await r.set("b", "2")
        await r.set("c", "3")

        assert await r.mget_nonatomic("a", "other", "b", "c") == [
            b"1",
            None,
            b"2",
            b"3",
        ]

    async def test_mset_nonatomic(self, r: RedisCluster) -> None:
        d = {"a": b"1", "b": b"2", "c": b"3", "d": b"4"}
        assert await r.mset_nonatomic(d)
        for k, v in d.items():
            assert await r.get(k) == v

    async def test_config_set(self, r: RedisCluster) -> None:
        assert await r.config_set("slowlog-log-slower-than", 0)

    async def test_cluster_config_resetstat(self, r: RedisCluster) -> None:
        await r.ping(target_nodes="all")
        all_info = await r.info(target_nodes="all")
        prior_commands_processed = -1
        for node_info in all_info.values():
            prior_commands_processed = node_info["total_commands_processed"]
            assert prior_commands_processed >= 1
        await r.config_resetstat(target_nodes="all")
        all_info = await r.info(target_nodes="all")
        for node_info in all_info.values():
            reset_commands_processed = node_info["total_commands_processed"]
            assert reset_commands_processed < prior_commands_processed

    async def test_client_setname(self, r: RedisCluster) -> None:
        node = r.get_random_node()
        await r.client_setname("redis_py_test", target_nodes=node)
        client_name = await r.client_getname(target_nodes=node)
        assert_resp_response(r, client_name, "redis_py_test", b"redis_py_test")

    async def test_exists(self, r: RedisCluster) -> None:
        d = {"a": b"1", "b": b"2", "c": b"3", "d": b"4"}
        await r.mset_nonatomic(d)
        assert await r.exists(*d.keys()) == len(d)

    async def test_delete(self, r: RedisCluster) -> None:
        d = {"a": b"1", "b": b"2", "c": b"3", "d": b"4"}
        await r.mset_nonatomic(d)
        assert await r.delete(*d.keys()) == len(d)
        assert await r.delete(*d.keys()) == 0

    async def test_touch(self, r: RedisCluster) -> None:
        d = {"a": b"1", "b": b"2", "c": b"3", "d": b"4"}
        await r.mset_nonatomic(d)
        assert await r.touch(*d.keys()) == len(d)

    async def test_unlink(self, r: RedisCluster) -> None:
        d = {"a": b"1", "b": b"2", "c": b"3", "d": b"4"}
        await r.mset_nonatomic(d)
        assert await r.unlink(*d.keys()) == len(d)
        # Unlink is non-blocking so we sleep before
        # verifying the deletion
        await asyncio.sleep(0.1)
        assert await r.unlink(*d.keys()) == 0

    async def test_initialize_before_execute_multi_key_command(
        self, request: FixtureRequest
    ) -> None:
        # Test for issue https://github.com/redis/redis-py/issues/2437
        url = request.config.getoption("--redis-url")
        r = RedisCluster.from_url(url)
        assert 0 == await r.exists("a", "b", "c")
        await r.aclose()

    @skip_if_redis_enterprise()
    async def test_cluster_myid(self, r: RedisCluster) -> None:
        node = r.get_random_node()
        myid = await r.cluster_myid(node)
        assert len(myid) == 40

    @skip_if_server_version_lt("7.2.0")
    @skip_if_redis_enterprise()
    async def test_cluster_myshardid(self, r: RedisCluster) -> None:
        node = r.get_random_node()
        myshardid = await r.cluster_myshardid(node)
        assert len(myshardid) == 40

    @skip_if_redis_enterprise()
    async def test_cluster_slots(self, r: RedisCluster) -> None:
        mock_all_nodes_resp(r, default_cluster_slots)
        cluster_slots = await r.cluster_slots()
        assert isinstance(cluster_slots, dict)
        assert len(default_cluster_slots) == len(cluster_slots)
        assert cluster_slots.get((0, 8191)) is not None
        assert cluster_slots.get((0, 8191)).get("primary") == ("127.0.0.1", 7000)

    @skip_if_redis_enterprise()
    async def test_cluster_addslots(self, r: RedisCluster) -> None:
        node = r.get_random_node()
        mock_node_resp(node, "OK")
        assert await r.cluster_addslots(node, 1, 2, 3) is True

    @skip_if_server_version_lt("7.0.0")
    @skip_if_redis_enterprise()
    async def test_cluster_addslotsrange(self, r: RedisCluster):
        node = r.get_random_node()
        mock_node_resp(node, "OK")
        assert await r.cluster_addslotsrange(node, 1, 5)

    @skip_if_redis_enterprise()
    async def test_cluster_countkeysinslot(self, r: RedisCluster) -> None:
        node = r.nodes_manager.get_node_from_slot(1)
        mock_node_resp(node, 2)
        assert await r.cluster_countkeysinslot(1) == 2

    async def test_cluster_count_failure_report(self, r: RedisCluster) -> None:
        mock_all_nodes_resp(r, 0)
        assert await r.cluster_count_failure_report("node_0") == 0

    @skip_if_redis_enterprise()
    async def test_cluster_delslots(self) -> None:
        cluster_slots = [
            [0, 8191, ["127.0.0.1", 7000, "node_0"]],
            [8192, 16383, ["127.0.0.1", 7001, "node_1"]],
        ]
        r = await get_mocked_redis_client(
            host=default_host, port=default_port, cluster_slots=cluster_slots
        )
        mock_all_nodes_resp(r, "OK")
        node0 = r.get_node(default_host, 7000)
        node1 = r.get_node(default_host, 7001)
        assert await r.cluster_delslots(0, 8192) == [True, True]
        assert node0._free.pop().read_response.called
        assert node1._free.pop().read_response.called

        await r.aclose()

    @skip_if_server_version_lt("7.0.0")
    @skip_if_redis_enterprise()
    async def test_cluster_delslotsrange(self):
        r = await get_mocked_redis_client(host=default_host, port=default_port)
        mock_all_nodes_resp(r, "OK")
        node = r.get_random_node()
        await r.cluster_addslots(node, 1, 2, 3, 4, 5)
        assert await r.cluster_delslotsrange(1, 5)
        assert node._free.pop().read_response.called
        await r.aclose()

    @skip_if_redis_enterprise()
    async def test_cluster_failover(self, r: RedisCluster) -> None:
        node = r.get_random_node()
        mock_node_resp(node, "OK")
        assert await r.cluster_failover(node) is True
        assert await r.cluster_failover(node, "FORCE") is True
        assert await r.cluster_failover(node, "TAKEOVER") is True
        with pytest.raises(RedisError):
            await r.cluster_failover(node, "FORCT")

    @skip_if_redis_enterprise()
    async def test_cluster_info(self, r: RedisCluster) -> None:
        info = await r.cluster_info()
        assert isinstance(info, dict)
        assert info["cluster_state"] == "ok"

    @skip_if_redis_enterprise()
    async def test_cluster_keyslot(self, r: RedisCluster) -> None:
        mock_all_nodes_resp(r, 12182)
        assert await r.cluster_keyslot("foo") == 12182

    @skip_if_redis_enterprise()
    async def test_cluster_meet(self, r: RedisCluster) -> None:
        node = r.get_default_node()
        mock_node_resp(node, "OK")
        assert await r.cluster_meet("127.0.0.1", 6379) is True

    @skip_if_redis_enterprise()
    async def test_cluster_nodes(self, r: RedisCluster) -> None:
        response = (
            "c8253bae761cb1ecb2b61857d85dfe455a0fec8b 172.17.0.7:7006 "
            "slave aa90da731f673a99617dfe930306549a09f83a6b 0 "
            "1447836263059 5 connected\n"
            "9bd595fe4821a0e8d6b99d70faa660638a7612b3 172.17.0.7:7008 "
            "master - 0 1447836264065 0 connected\n"
            "aa90da731f673a99617dfe930306549a09f83a6b 172.17.0.7:7003 "
            "myself,master - 0 0 2 connected 5461-10922\n"
            "1df047e5a594f945d82fc140be97a1452bcbf93e 172.17.0.7:7007 "
            "slave 19efe5a631f3296fdf21a5441680f893e8cc96ec 0 "
            "1447836262556 3 connected\n"
            "4ad9a12e63e8f0207025eeba2354bcf4c85e5b22 172.17.0.7:7005 "
            "master - 0 1447836262555 7 connected 0-5460\n"
            "19efe5a631f3296fdf21a5441680f893e8cc96ec 172.17.0.7:7004 "
            "master - 0 1447836263562 3 connected 10923-16383\n"
            "fbb23ed8cfa23f17eaf27ff7d0c410492a1093d6 172.17.0.7:7002 "
            "master,fail - 1447829446956 1447829444948 1 disconnected\n"
        )
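        # Each CLUSTER NODES line has the form:
        #   <id> <ip:port> <flags> <primary-id> <ping-sent> <pong-recv>
        #   <config-epoch> <link-state> [<slot> | <slot-range> ...]
        # and cluster_nodes() parses it into a dict keyed by the node's address.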
        mock_all_nodes_resp(r, response)
        nodes = await r.cluster_nodes()
        assert len(nodes) == 7
        assert nodes.get("172.17.0.7:7006") is not None
        assert (
            nodes.get("172.17.0.7:7006").get("node_id")
            == "c8253bae761cb1ecb2b61857d85dfe455a0fec8b"
        )

    @skip_if_redis_enterprise()
    async def test_cluster_nodes_importing_migrating(self, r: RedisCluster) -> None:
        response = (
            "488ead2fcce24d8c0f158f9172cb1f4a9e040fe5 127.0.0.1:16381@26381 "
            "master - 0 1648975557664 3 connected 10923-16383\n"
            "8ae2e70812db80776f739a72374e57fc4ae6f89d 127.0.0.1:16380@26380 "
            "master - 0 1648975555000 2 connected 1 5461-10922 ["
            "2-<-ed8007ccfa2d91a7b76f8e6fba7ba7e257034a16]\n"
            "ed8007ccfa2d91a7b76f8e6fba7ba7e257034a16 127.0.0.1:16379@26379 "
            "myself,master - 0 1648975556000 1 connected 0 2-5460 ["
            "2->-8ae2e70812db80776f739a72374e57fc4ae6f89d]\n"
        )
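        # Bracketed entries in the slot list describe in-flight resharding:
        # "[<slot>-<-<node-id>]" marks a slot being imported from that node and
        # "[<slot>->-<node-id>]" marks one being migrated to it, which is what
        # the "migrations" assertions below check.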
        mock_all_nodes_resp(r, response)
        nodes = await r.cluster_nodes()
        assert len(nodes) == 3
        node_16379 = nodes.get("127.0.0.1:16379")
        node_16380 = nodes.get("127.0.0.1:16380")
        node_16381 = nodes.get("127.0.0.1:16381")
        assert node_16379.get("migrations") == [
            {
                "slot": "2",
                "node_id": "8ae2e70812db80776f739a72374e57fc4ae6f89d",
                "state": "migrating",
            }
        ]
        assert node_16379.get("slots") == [["0"], ["2", "5460"]]
        assert node_16380.get("migrations") == [
            {
                "slot": "2",
                "node_id": "ed8007ccfa2d91a7b76f8e6fba7ba7e257034a16",
                "state": "importing",
            }
        ]
        assert node_16380.get("slots") == [["1"], ["5461", "10922"]]
        assert node_16381.get("slots") == [["10923", "16383"]]
        assert node_16381.get("migrations") == []

    @skip_if_redis_enterprise()
    async def test_cluster_replicate(self, r: RedisCluster) -> None:
        node = r.get_random_node()
        all_replicas = r.get_replicas()
        mock_all_nodes_resp(r, "OK")
        assert await r.cluster_replicate(node, "c8253bae761cb61857d") is True
        results = await r.cluster_replicate(all_replicas, "c8253bae761cb61857d")
        if isinstance(results, dict):
            for res in results.values():
                assert res is True
        else:
            assert results is True

    @skip_if_redis_enterprise()
    async def test_cluster_reset(self, r: RedisCluster) -> None:
        mock_all_nodes_resp(r, "OK")
        assert await r.cluster_reset() is True
        assert await r.cluster_reset(False) is True
        all_results = await r.cluster_reset(False, target_nodes="all")
        for res in all_results.values():
            assert res is True

    @skip_if_redis_enterprise()
    async def test_cluster_save_config(self, r: RedisCluster) -> None:
        node = r.get_random_node()
        all_nodes = r.get_nodes()
        mock_all_nodes_resp(r, "OK")
        assert await r.cluster_save_config(node) is True
        all_results = await r.cluster_save_config(all_nodes)
        for res in all_results.values():
            assert res is True

    @skip_if_redis_enterprise()
    async def test_cluster_get_keys_in_slot(self, r: RedisCluster) -> None:
        response = ["{foo}1", "{foo}2"]
        node = r.nodes_manager.get_node_from_slot(12182)
        mock_node_resp(node, response)
        keys = await r.cluster_get_keys_in_slot(12182, 4)
        assert keys == response

    @skip_if_redis_enterprise()
    async def test_cluster_set_config_epoch(self, r: RedisCluster) -> None:
        mock_all_nodes_resp(r, "OK")
        assert await r.cluster_set_config_epoch(3) is True
        all_results = await r.cluster_set_config_epoch(3, target_nodes="all")
        for res in all_results.values():
            assert res is True

    @skip_if_redis_enterprise()
    async def test_cluster_setslot(self, r: RedisCluster) -> None:
        node = r.get_random_node()
        mock_node_resp(node, "OK")
        assert await r.cluster_setslot(node, "node_0", 1218, "IMPORTING") is True
        assert await r.cluster_setslot(node, "node_0", 1218, "NODE") is True
        assert await r.cluster_setslot(node, "node_0", 1218, "MIGRATING") is True
        with pytest.raises(RedisError):
            await r.cluster_failover(node, "STABLE")
        with pytest.raises(RedisError):
            await r.cluster_failover(node, "STATE")

    async def test_cluster_setslot_stable(self, r: RedisCluster) -> None:
        node = r.nodes_manager.get_node_from_slot(12182)
        mock_node_resp(node, "OK")
        assert await r.cluster_setslot_stable(12182) is True
        assert node._free.pop().read_response.called

    @skip_if_redis_enterprise()
    async def test_cluster_replicas(self, r: RedisCluster) -> None:
        response = [
            b"01eca22229cf3c652b6fca0d09ff6941e0d2e3 "
            b"127.0.0.1:6377@16377 slave "
            b"52611e796814b78e90ad94be9d769a4f668f9a 0 "
            b"1634550063436 4 connected",
            b"r4xfga22229cf3c652b6fca0d09ff69f3e0d4d "
            b"127.0.0.1:6378@16378 slave "
            b"52611e796814b78e90ad94be9d769a4f668f9a 0 "
            b"1634550063436 4 connected",
        ]
        mock_all_nodes_resp(r, response)
        replicas = await r.cluster_replicas("52611e796814b78e90ad94be9d769a4f668f9a")
        assert replicas.get("127.0.0.1:6377") is not None
        assert replicas.get("127.0.0.1:6378") is not None
        assert (
            replicas.get("127.0.0.1:6378").get("node_id")
            == "r4xfga22229cf3c652b6fca0d09ff69f3e0d4d"
        )

    @skip_if_server_version_lt("7.0.0")
    async def test_cluster_links(self, r: RedisCluster):
        node = r.get_random_node()
        res = await r.cluster_links(node)
        if is_resp2_connection(r):
            links_to = sum(x.count(b"to") for x in res)
            links_for = sum(x.count(b"from") for x in res)
            assert links_to == links_for
            for i in range(0, len(res) - 1, 2):
                assert res[i][3] == res[i + 1][3]
        else:
            links_to = len(list(filter(lambda x: x[b"direction"] == b"to", res)))
            links_for = len(list(filter(lambda x: x[b"direction"] == b"from", res)))
            assert links_to == links_for
            for i in range(0, len(res) - 1, 2):
                assert res[i][b"node"] == res[i + 1][b"node"]

    @skip_if_redis_enterprise()
    async def test_readonly(self) -> None:
        r = await get_mocked_redis_client(host=default_host, port=default_port)
        mock_all_nodes_resp(r, "OK")
        assert await r.readonly() is True
        all_replicas_results = await r.readonly(target_nodes="replicas")
        for res in all_replicas_results.values():
            assert res is True
        for replica in r.get_replicas():
            assert replica._free.pop().read_response.called

        await r.aclose()

    @skip_if_redis_enterprise()
    async def test_readwrite(self) -> None:
        r = await get_mocked_redis_client(host=default_host, port=default_port)
        mock_all_nodes_resp(r, "OK")
        assert await r.readwrite() is True
        all_replicas_results = await r.readwrite(target_nodes="replicas")
        for res in all_replicas_results.values():
            assert res is True
        for replica in r.get_replicas():
            assert replica._free.pop().read_response.called

        await r.aclose()

    @skip_if_redis_enterprise()
    async def test_bgsave(self, r: RedisCluster) -> None:
        try:
            assert await r.bgsave()
            await asyncio.sleep(0.3)
            assert await r.bgsave(True)
        except ResponseError as e:
            if "Background save already in progress" not in e.__str__():
                raise

    async def test_info(self, r: RedisCluster) -> None:
        # Map keys to same slot
        await r.set("x{1}", 1)
        await r.set("y{1}", 2)
        await r.set("z{1}", 3)
        # Get node that handles the slot
        slot = r.keyslot("x{1}")
        node = r.nodes_manager.get_node_from_slot(slot)
        # Run info on that node
        info = await r.info(target_nodes=node)
        assert isinstance(info, dict)
        assert info["db0"]["keys"] == 3

    async def _init_slowlog_test(self, r: RedisCluster, node: ClusterNode) -> str:
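        # Set slowlog-log-slower-than to 0 so that every command gets logged,
        # and return the previous threshold so the caller can restore it later.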
        slowlog_lim = await r.config_get("slowlog-log-slower-than", target_nodes=node)
        assert (
            await r.config_set("slowlog-log-slower-than", 0, target_nodes=node) is True
        )
        return slowlog_lim["slowlog-log-slower-than"]

    async def _teardown_slowlog_test(
        self, r: RedisCluster, node: ClusterNode, prev_limit: str
    ) -> None:
        assert (
            await r.config_set("slowlog-log-slower-than", prev_limit, target_nodes=node)
            is True
        )

    async def test_slowlog_get(
        self, r: RedisCluster, slowlog: Optional[List[Dict[str, Union[int, bytes]]]]
    ) -> None:
        unicode_string = chr(3456) + "abcd" + chr(3421)
        node = r.get_node_from_key(unicode_string)
        slowlog_limit = await self._init_slowlog_test(r, node)
        assert await r.slowlog_reset(target_nodes=node)
        await r.get(unicode_string)
        slowlog = await r.slowlog_get(target_nodes=node)
        assert isinstance(slowlog, list)
        commands = [log["command"] for log in slowlog]

        get_command = b" ".join((b"GET", unicode_string.encode("utf-8")))
        assert get_command in commands
        assert b"SLOWLOG RESET" in commands

        # the order should be ['GET <uni string>', 'SLOWLOG RESET'],
        # but if other clients are executing commands at the same time, there
        # could be commands before, between, or after, so just check that
        # the two we care about are in the appropriate order.
        assert commands.index(get_command) < commands.index(b"SLOWLOG RESET")

        # make sure other attributes are typed correctly
        assert isinstance(slowlog[0]["start_time"], int)
        assert isinstance(slowlog[0]["duration"], int)
        # rollback the slowlog limit to its original value
        await self._teardown_slowlog_test(r, node, slowlog_limit)

    async def test_slowlog_get_limit(
        self, r: RedisCluster, slowlog: Optional[List[Dict[str, Union[int, bytes]]]]
    ) -> None:
        assert await r.slowlog_reset()
        node = r.get_node_from_key("foo")
        slowlog_limit = await self._init_slowlog_test(r, node)
        await r.get("foo")
        slowlog = await r.slowlog_get(1, target_nodes=node)
        assert isinstance(slowlog, list)
        # only one command, based on the number we passed to slowlog_get()
        assert len(slowlog) == 1
        await self._teardown_slowlog_test(r, node, slowlog_limit)

    async def test_slowlog_length(self, r: RedisCluster, slowlog: None) -> None:
        await r.get("foo")
        node = r.nodes_manager.get_node_from_slot(key_slot(b"foo"))
        slowlog_len = await r.slowlog_len(target_nodes=node)
        assert isinstance(slowlog_len, int)

    async def test_time(self, r: RedisCluster) -> None:
        t = await r.time(target_nodes=r.get_primaries()[0])
        assert len(t) == 2
        assert isinstance(t[0], int)
        assert isinstance(t[1], int)

    @skip_if_server_version_lt("4.0.0")
    async def test_memory_usage(self, r: RedisCluster) -> None:
        await r.set("foo", "bar")
        assert isinstance(await r.memory_usage("foo"), int)

    @skip_if_server_version_lt("4.0.0")
    @skip_if_redis_enterprise()
    async def test_memory_malloc_stats(self, r: RedisCluster) -> None:
        assert await r.memory_malloc_stats()

    @skip_if_server_version_lt("4.0.0")
    @skip_if_redis_enterprise()
    async def test_memory_stats(self, r: RedisCluster) -> None:
        # put a key into the current db to make sure that "db.<current-db>"
        # has data
        await r.set("foo", "bar")
        node = r.nodes_manager.get_node_from_slot(key_slot(b"foo"))
        stats = await r.memory_stats(target_nodes=node)
        assert isinstance(stats, dict)
        for key, value in stats.items():
            if key.startswith("db."):
                assert not isinstance(value, list)

    @skip_if_server_version_lt("4.0.0")
    async def test_memory_help(self, r: RedisCluster) -> None:
        with pytest.raises(NotImplementedError):
            await r.memory_help()

    @skip_if_server_version_lt("4.0.0")
    async def test_memory_doctor(self, r: RedisCluster) -> None:
        with pytest.raises(NotImplementedError):
            await r.memory_doctor()

    @skip_if_redis_enterprise()
    async def test_lastsave(self, r: RedisCluster) -> None:
        node = r.get_primaries()[0]
        assert isinstance(await r.lastsave(target_nodes=node), datetime.datetime)

    async def test_cluster_echo(self, r: RedisCluster) -> None:
        node = r.get_primaries()[0]
        assert await r.echo("foo bar", target_nodes=node) == b"foo bar"

    @skip_if_server_version_lt("1.0.0")
    async def test_debug_segfault(self, r: RedisCluster) -> None:
        with pytest.raises(NotImplementedError):
            await r.debug_segfault()

    async def test_config_resetstat(self, r: RedisCluster) -> None:
        node = r.get_primaries()[0]
        await r.ping(target_nodes=node)
        prior_commands_processed = int(
            (await r.info(target_nodes=node))["total_commands_processed"]
        )
        assert prior_commands_processed >= 1
        await r.config_resetstat(target_nodes=node)
        reset_commands_processed = int(
            (await r.info(target_nodes=node))["total_commands_processed"]
        )
        assert reset_commands_processed < prior_commands_processed

    @skip_if_server_version_lt("6.2.0")
    async def test_client_trackinginfo(self, r: RedisCluster) -> None:
        node = r.get_primaries()[0]
        res = await r.client_trackinginfo(target_nodes=node)
        assert len(res) > 2
        assert "prefixes" in res or b"prefixes" in res

    @skip_if_server_version_lt("2.9.50")
    async def test_client_pause(self, r: RedisCluster) -> None:
        node = r.get_primaries()[0]
        assert await r.client_pause(1, target_nodes=node)
        assert await r.client_pause(timeout=1, target_nodes=node)
        with pytest.raises(RedisError):
            await r.client_pause(timeout="not an integer", target_nodes=node)

    @skip_if_server_version_lt("6.2.0")
    @skip_if_redis_enterprise()
    async def test_client_unpause(self, r: RedisCluster) -> None:
        assert await r.client_unpause()

    @skip_if_server_version_lt("5.0.0")
    async def test_client_id(self, r: RedisCluster) -> None:
        node = r.get_primaries()[0]
        assert await r.client_id(target_nodes=node) > 0

    @skip_if_server_version_lt("5.0.0")
    async def test_client_unblock(self, r: RedisCluster) -> None:
        node = r.get_primaries()[0]
        myid = await r.client_id(target_nodes=node)
        assert not await r.client_unblock(myid, target_nodes=node)
        assert not await r.client_unblock(myid, error=True, target_nodes=node)
        assert not await r.client_unblock(myid, error=False, target_nodes=node)

    @skip_if_server_version_lt("6.0.0")
    async def test_client_getredir(self, r: RedisCluster) -> None:
        node = r.get_primaries()[0]
        assert isinstance(await r.client_getredir(target_nodes=node), int)
        assert await r.client_getredir(target_nodes=node) == -1

    @skip_if_server_version_lt("6.2.0")
    async def test_client_info(self, r: RedisCluster) -> None:
        node = r.get_primaries()[0]
        info = await r.client_info(target_nodes=node)
        assert isinstance(info, dict)
        assert "addr" in info

    @skip_if_server_version_lt("2.6.9")
    async def test_client_kill(
        self, r: RedisCluster, create_redis: Callable[..., RedisCluster]
    ) -> None:
        node = r.get_primaries()[0]
        r2 = await create_redis(cls=RedisCluster, flushdb=False)
        await r.client_setname("redis-py-c1", target_nodes="all")
        await r2.client_setname("redis-py-c2", target_nodes="all")
        clients = [
            client
            for client in await r.client_list(target_nodes=node)
            if client.get("name") in ["redis-py-c1", "redis-py-c2"]
        ]
        assert len(clients) == 2
        clients_by_name = {client.get("name"): client for client in clients}

        client_addr = clients_by_name["redis-py-c2"].get("addr")
        assert await r.client_kill(client_addr, target_nodes=node) is True

        clients = [
            client
            for client in await r.client_list(target_nodes=node)
            if client.get("name") in ["redis-py-c1", "redis-py-c2"]
        ]
        assert len(clients) == 1
        assert clients[0].get("name") == "redis-py-c1"
        await r2.aclose()

    @skip_if_server_version_lt("2.6.0")
    async def test_cluster_bitop_not_empty_string(self, r: RedisCluster) -> None:
        await r.set("{foo}a", "")
        await r.bitop("not", "{foo}r", "{foo}a")
        assert await r.get("{foo}r") is None

    @skip_if_server_version_lt("2.6.0")
    async def test_cluster_bitop_not(self, r: RedisCluster) -> None:
        test_str = b"\xaa\x00\xff\x55"
        correct = ~0xAA00FF55 & 0xFFFFFFFF
        await r.set("{foo}a", test_str)
        await r.bitop("not", "{foo}r", "{foo}a")
        assert int(binascii.hexlify(await r.get("{foo}r")), 16) == correct

    @skip_if_server_version_lt("2.6.0")
    async def test_cluster_bitop_not_in_place(self, r: RedisCluster) -> None:
        test_str = b"\xaa\x00\xff\x55"
        correct = ~0xAA00FF55 & 0xFFFFFFFF
        await r.set("{foo}a", test_str)
        await r.bitop("not", "{foo}a", "{foo}a")
        assert int(binascii.hexlify(await r.get("{foo}a")), 16) == correct

    @skip_if_server_version_lt("2.6.0")
    async def test_cluster_bitop_single_string(self, r: RedisCluster) -> None:
        test_str = b"\x01\x02\xff"
        await r.set("{foo}a", test_str)
        await r.bitop("and", "{foo}res1", "{foo}a")
        await r.bitop("or", "{foo}res2", "{foo}a")
        await r.bitop("xor", "{foo}res3", "{foo}a")
        assert await r.get("{foo}res1") == test_str
        assert await r.get("{foo}res2") == test_str
        assert await r.get("{foo}res3") == test_str

    @skip_if_server_version_lt("2.6.0")
    async def test_cluster_bitop_string_operands(self, r: RedisCluster) -> None:
        await r.set("{foo}a", b"\x01\x02\xff\xff")
        await r.set("{foo}b", b"\x01\x02\xff")
        await r.bitop("and", "{foo}res1", "{foo}a", "{foo}b")
        await r.bitop("or", "{foo}res2", "{foo}a", "{foo}b")
        await r.bitop("xor", "{foo}res3", "{foo}a", "{foo}b")
        assert int(binascii.hexlify(await r.get("{foo}res1")), 16) == 0x0102FF00
        assert int(binascii.hexlify(await r.get("{foo}res2")), 16) == 0x0102FFFF
        assert int(binascii.hexlify(await r.get("{foo}res3")), 16) == 0x000000FF

    @skip_if_server_version_lt("6.2.0")
    async def test_cluster_copy(self, r: RedisCluster) -> None:
        assert await r.copy("{foo}a", "{foo}b") == 0
        await r.set("{foo}a", "bar")
        assert await r.copy("{foo}a", "{foo}b") == 1
        assert await r.get("{foo}a") == b"bar"
        assert await r.get("{foo}b") == b"bar"

    @skip_if_server_version_lt("6.2.0")
    async def test_cluster_copy_and_replace(self, r: RedisCluster) -> None:
        await r.set("{foo}a", "foo1")
        await r.set("{foo}b", "foo2")
        assert await r.copy("{foo}a", "{foo}b") == 0
        assert await r.copy("{foo}a", "{foo}b", replace=True) == 1

    @skip_if_server_version_lt("6.2.0")
    async def test_cluster_lmove(self, r: RedisCluster) -> None:
        await r.rpush("{foo}a", "one", "two", "three", "four")
        assert await r.lmove("{foo}a", "{foo}b")
        assert await r.lmove("{foo}a", "{foo}b", "right", "left")

    @skip_if_server_version_lt("6.2.0")
    async def test_cluster_blmove(self, r: RedisCluster) -> None:
        await r.rpush("{foo}a", "one", "two", "three", "four")
        assert await r.blmove("{foo}a", "{foo}b", 5)
        assert await r.blmove("{foo}a", "{foo}b", 1, "RIGHT", "LEFT")

    async def test_cluster_msetnx(self, r: RedisCluster) -> None:
        d = {"{foo}a": b"1", "{foo}b": b"2", "{foo}c": b"3"}
        assert await r.msetnx(d)
        d2 = {"{foo}a": b"x", "{foo}d": b"4"}
        assert not await r.msetnx(d2)
        for k, v in d.items():
            assert await r.get(k) == v
        assert await r.get("{foo}d") is None

    async def test_cluster_rename(self, r: RedisCluster) -> None:
        await r.set("{foo}a", "1")
        assert await r.rename("{foo}a", "{foo}b")
        assert await r.get("{foo}a") is None
        assert await r.get("{foo}b") == b"1"

    async def test_cluster_renamenx(self, r: RedisCluster) -> None:
        await r.set("{foo}a", "1")
        await r.set("{foo}b", "2")
        assert not await r.renamenx("{foo}a", "{foo}b")
        assert await r.get("{foo}a") == b"1"
        assert await r.get("{foo}b") == b"2"

    # LIST COMMANDS
    async def test_cluster_blpop(self, r: RedisCluster) -> None:
        await r.rpush("{foo}a", "1", "2")
        await r.rpush("{foo}b", "3", "4")
        assert_resp_response(
            r,
            await r.blpop(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}b", b"3"),
            [b"{foo}b", b"3"],
        )
        assert_resp_response(
            r,
            await r.blpop(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}b", b"4"),
            [b"{foo}b", b"4"],
        )
        assert_resp_response(
            r,
            await r.blpop(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}a", b"1"),
            [b"{foo}a", b"1"],
        )
        assert_resp_response(
            r,
            await r.blpop(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}a", b"2"),
            [b"{foo}a", b"2"],
        )
        assert await r.blpop(["{foo}b", "{foo}a"], timeout=1) is None
        await r.rpush("{foo}c", "1")
        assert_resp_response(
            r, await r.blpop("{foo}c", timeout=1), (b"{foo}c", b"1"), [b"{foo}c", b"1"]
        )

    async def test_cluster_brpop(self, r: RedisCluster) -> None:
        await r.rpush("{foo}a", "1", "2")
        await r.rpush("{foo}b", "3", "4")
        assert_resp_response(
            r,
            await r.brpop(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}b", b"4"),
            [b"{foo}b", b"4"],
        )
        assert_resp_response(
            r,
            await r.brpop(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}b", b"3"),
            [b"{foo}b", b"3"],
        )
        assert_resp_response(
            r,
            await r.brpop(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}a", b"2"),
            [b"{foo}a", b"2"],
        )
        assert_resp_response(
            r,
            await r.brpop(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}a", b"1"),
            [b"{foo}a", b"1"],
        )
        assert await r.brpop(["{foo}b", "{foo}a"], timeout=1) is None
        await r.rpush("{foo}c", "1")
        assert_resp_response(
            r, await r.brpop("{foo}c", timeout=1), (b"{foo}c", b"1"), [b"{foo}c", b"1"]
        )

    async def test_cluster_brpoplpush(self, r: RedisCluster) -> None:
        await r.rpush("{foo}a", "1", "2")
        await r.rpush("{foo}b", "3", "4")
        assert await r.brpoplpush("{foo}a", "{foo}b") == b"2"
        assert await r.brpoplpush("{foo}a", "{foo}b") == b"1"
        assert await r.brpoplpush("{foo}a", "{foo}b", timeout=1) is None
        assert await r.lrange("{foo}a", 0, -1) == []
        assert await r.lrange("{foo}b", 0, -1) == [b"1", b"2", b"3", b"4"]

    async def test_cluster_brpoplpush_empty_string(self, r: RedisCluster) -> None:
        await r.rpush("{foo}a", "")
        assert await r.brpoplpush("{foo}a", "{foo}b") == b""

    async def test_cluster_rpoplpush(self, r: RedisCluster) -> None:
        await r.rpush("{foo}a", "a1", "a2", "a3")
        await r.rpush("{foo}b", "b1", "b2", "b3")
        assert await r.rpoplpush("{foo}a", "{foo}b") == b"a3"
        assert await r.lrange("{foo}a", 0, -1) == [b"a1", b"a2"]
        assert await r.lrange("{foo}b", 0, -1) == [b"a3", b"b1", b"b2", b"b3"]

    async def test_cluster_sdiff(self, r: RedisCluster) -> None:
        await r.sadd("{foo}a", "1", "2", "3")
        assert await r.sdiff("{foo}a", "{foo}b") == {b"1", b"2", b"3"}
        await r.sadd("{foo}b", "2", "3")
        assert await r.sdiff("{foo}a", "{foo}b") == {b"1"}

    async def test_cluster_sdiffstore(self, r: RedisCluster) -> None:
        await r.sadd("{foo}a", "1", "2", "3")
        assert await r.sdiffstore("{foo}c", "{foo}a", "{foo}b") == 3
        assert await r.smembers("{foo}c") == {b"1", b"2", b"3"}
        await r.sadd("{foo}b", "2", "3")
        assert await r.sdiffstore("{foo}c", "{foo}a", "{foo}b") == 1
        assert await r.smembers("{foo}c") == {b"1"}

    async def test_cluster_sinter(self, r: RedisCluster) -> None:
        await r.sadd("{foo}a", "1", "2", "3")
        assert await r.sinter("{foo}a", "{foo}b") == set()
        await r.sadd("{foo}b", "2", "3")
        assert await r.sinter("{foo}a", "{foo}b") == {b"2", b"3"}

    async def test_cluster_sinterstore(self, r: RedisCluster) -> None:
        await r.sadd("{foo}a", "1", "2", "3")
        assert await r.sinterstore("{foo}c", "{foo}a", "{foo}b") == 0
        assert await r.smembers("{foo}c") == set()
        await r.sadd("{foo}b", "2", "3")
        assert await r.sinterstore("{foo}c", "{foo}a", "{foo}b") == 2
        assert await r.smembers("{foo}c") == {b"2", b"3"}

    async def test_cluster_smove(self, r: RedisCluster) -> None:
        await r.sadd("{foo}a", "a1", "a2")
        await r.sadd("{foo}b", "b1", "b2")
        assert await r.smove("{foo}a", "{foo}b", "a1")
        assert await r.smembers("{foo}a") == {b"a2"}
        assert await r.smembers("{foo}b") == {b"b1", b"b2", b"a1"}

    async def test_cluster_sunion(self, r: RedisCluster) -> None:
        await r.sadd("{foo}a", "1", "2")
        await r.sadd("{foo}b", "2", "3")
        assert set(await r.sunion("{foo}a", "{foo}b")) == {b"1", b"2", b"3"}

    async def test_cluster_sunionstore(self, r: RedisCluster) -> None:
        await r.sadd("{foo}a", "1", "2")
        await r.sadd("{foo}b", "2", "3")
        assert await r.sunionstore("{foo}c", "{foo}a", "{foo}b") == 3
        assert set(await r.smembers("{foo}c")) == {b"1", b"2", b"3"}

    @skip_if_server_version_lt("6.2.0")
    async def test_cluster_zdiff(self, r: RedisCluster) -> None:
        await r.zadd("{foo}a", {"a1": 1, "a2": 2, "a3": 3})
        await r.zadd("{foo}b", {"a1": 1, "a2": 2})
        assert await r.zdiff(["{foo}a", "{foo}b"]) == [b"a3"]
        response = await r.zdiff(["{foo}a", "{foo}b"], withscores=True)
        assert_resp_response(r, response, [b"a3", b"3"], [[b"a3", 3.0]])

    @skip_if_server_version_lt("6.2.0")
    async def test_cluster_zdiffstore(self, r: RedisCluster) -> None:
        await r.zadd("{foo}a", {"a1": 1, "a2": 2, "a3": 3})
        await r.zadd("{foo}b", {"a1": 1, "a2": 2})
        assert await r.zdiffstore("{foo}out", ["{foo}a", "{foo}b"])
        assert await r.zrange("{foo}out", 0, -1) == [b"a3"]
        response = await r.zrange("{foo}out", 0, -1, withscores=True)
        assert_resp_response(r, response, [(b"a3", 3.0)], [[b"a3", 3.0]])

    @skip_if_server_version_lt("6.2.0")
    async def test_cluster_zinter(self, r: RedisCluster) -> None:
        await r.zadd("{foo}a", {"a1": 1, "a2": 2, "a3": 1})
        await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2})
        await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4})
        assert await r.zinter(["{foo}a", "{foo}b", "{foo}c"]) == [b"a3", b"a1"]
        # invalid aggregation
        with pytest.raises(DataError):
            await r.zinter(
                ["{foo}a", "{foo}b", "{foo}c"], aggregate="foo", withscores=True
            )
        # aggregate with SUM
        response = await r.zinter(["{foo}a", "{foo}b", "{foo}c"], withscores=True)
        assert_resp_response(
            r, response, [(b"a3", 8), (b"a1", 9)], [[b"a3", 8], [b"a1", 9]]
        )
        # aggregate with MAX
        response = await r.zinter(
            ["{foo}a", "{foo}b", "{foo}c"], aggregate="MAX", withscores=True
        )
        assert_resp_response(
            r, response, [(b"a3", 5), (b"a1", 6)], [[b"a3", 5], [b"a1", 6]]
        )
        # aggregate with MIN
        response = await r.zinter(
            ["{foo}a", "{foo}b", "{foo}c"], aggregate="MIN", withscores=True
        )
        assert_resp_response(
            r, response, [(b"a1", 1), (b"a3", 1)], [[b"a1", 1], [b"a3", 1]]
        )
        # with weights
        res = await r.zinter({"{foo}a": 1, "{foo}b": 2, "{foo}c": 3}, withscores=True)
        assert_resp_response(
            r, res, [(b"a3", 20), (b"a1", 23)], [[b"a3", 20], [b"a1", 23]]
        )

    async def test_cluster_zinterstore_sum(self, r: RedisCluster) -> None:
        await r.zadd("{foo}a", {"a1": 1, "a2": 1, "a3": 1})
        await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2})
        await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4})
        assert await r.zinterstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"]) == 2
        assert_resp_response(
            r,
            await r.zrange("{foo}d", 0, -1, withscores=True),
            [(b"a3", 8), (b"a1", 9)],
            [[b"a3", 8.0], [b"a1", 9.0]],
        )

    async def test_cluster_zinterstore_max(self, r: RedisCluster) -> None:
        await r.zadd("{foo}a", {"a1": 1, "a2": 1, "a3": 1})
        await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2})
        await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4})
        assert (
            await r.zinterstore(
                "{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MAX"
            )
            == 2
        )
        assert_resp_response(
            r,
            await r.zrange("{foo}d", 0, -1, withscores=True),
            [(b"a3", 5), (b"a1", 6)],
            [[b"a3", 5.0], [b"a1", 6.0]],
        )

    async def test_cluster_zinterstore_min(self, r: RedisCluster) -> None:
        await r.zadd("{foo}a", {"a1": 1, "a2": 2, "a3": 3})
        await r.zadd("{foo}b", {"a1": 2, "a2": 3, "a3": 5})
        await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4})
        assert (
            await r.zinterstore(
                "{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MIN"
            )
            == 2
        )
        assert_resp_response(
            r,
            await r.zrange("{foo}d", 0, -1, withscores=True),
            [(b"a1", 1), (b"a3", 3)],
            [[b"a1", 1.0], [b"a3", 3.0]],
        )

    async def test_cluster_zinterstore_with_weight(self, r: RedisCluster) -> None:
        await r.zadd("{foo}a", {"a1": 1, "a2": 1, "a3": 1})
        await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2})
        await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4})
        assert (
            await r.zinterstore("{foo}d", {"{foo}a": 1, "{foo}b": 2, "{foo}c": 3}) == 2
        )
        assert_resp_response(
            r,
            await r.zrange("{foo}d", 0, -1, withscores=True),
            [(b"a3", 20), (b"a1", 23)],
            [[b"a3", 20.0], [b"a1", 23.0]],
        )

    @skip_if_server_version_lt("4.9.0")
    async def test_cluster_bzpopmax(self, r: RedisCluster) -> None:
        await r.zadd("{foo}a", {"a1": 1, "a2": 2})
        await r.zadd("{foo}b", {"b1": 10, "b2": 20})
        assert_resp_response(
            r,
            await r.bzpopmax(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}b", b"b2", 20),
            [b"{foo}b", b"b2", 20],
        )
        assert_resp_response(
            r,
            await r.bzpopmax(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}b", b"b1", 10),
            [b"{foo}b", b"b1", 10],
        )
        assert_resp_response(
            r,
            await r.bzpopmax(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}a", b"a2", 2),
            [b"{foo}a", b"a2", 2],
        )
        assert_resp_response(
            r,
            await r.bzpopmax(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}a", b"a1", 1),
            [b"{foo}a", b"a1", 1],
        )
        assert await r.bzpopmax(["{foo}b", "{foo}a"], timeout=1) is None
        await r.zadd("{foo}c", {"c1": 100})
        assert_resp_response(
            r,
            await r.bzpopmax("{foo}c", timeout=1),
            (b"{foo}c", b"c1", 100),
            [b"{foo}c", b"c1", 100],
        )

    @skip_if_server_version_lt("4.9.0")
    async def test_cluster_bzpopmin(self, r: RedisCluster) -> None:
        await r.zadd("{foo}a", {"a1": 1, "a2": 2})
        await r.zadd("{foo}b", {"b1": 10, "b2": 20})
        assert_resp_response(
            r,
            await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}b", b"b1", 10),
            [b"{foo}b", b"b1", 10],
        )
        assert_resp_response(
            r,
            await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}b", b"b2", 20),
            [b"{foo}b", b"b2", 20],
        )
        assert_resp_response(
            r,
            await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}a", b"a1", 1),
            [b"{foo}a", b"a1", 1],
        )
        assert_resp_response(
            r,
            await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}a", b"a2", 2),
            [b"{foo}a", b"a2", 2],
        )
        assert await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1) is None
        await r.zadd("{foo}c", {"c1": 100})
        assert_resp_response(
            r,
            await r.bzpopmin("{foo}c", timeout=1),
            (b"{foo}c", b"c1", 100),
            [b"{foo}c", b"c1", 100],
        )

    @skip_if_server_version_lt("6.2.0")
    async def test_cluster_zrangestore(self, r: RedisCluster) -> None:
        await r.zadd("{foo}a", {"a1": 1, "a2": 2, "a3": 3})
        assert await r.zrangestore("{foo}b", "{foo}a", 0, 1)
        assert await r.zrange("{foo}b", 0, -1) == [b"a1", b"a2"]
        assert await r.zrangestore("{foo}b", "{foo}a", 1, 2)
        assert await r.zrange("{foo}b", 0, -1) == [b"a2", b"a3"]
        assert_resp_response(
            r,
            await r.zrange("{foo}b", 0, -1, withscores=True),
            [(b"a2", 2), (b"a3", 3)],
            [[b"a2", 2.0], [b"a3", 3.0]],
        )
        # reversed order
        assert await r.zrangestore("{foo}b", "{foo}a", 1, 2, desc=True)
        assert await r.zrange("{foo}b", 0, -1) == [b"a1", b"a2"]
        # by score
        assert await r.zrangestore(
            "{foo}b", "{foo}a", 2, 1, byscore=True, offset=0, num=1, desc=True
        )
        assert await r.zrange("{foo}b", 0, -1) == [b"a2"]
        # by lex
        assert await r.zrangestore(
            "{foo}b", "{foo}a", "[a2", "(a3", bylex=True, offset=0, num=1
        )
        assert await r.zrange("{foo}b", 0, -1) == [b"a2"]

    @skip_if_server_version_lt("6.2.0")
    async def test_cluster_zunion(self, r: RedisCluster) -> None:
        await r.zadd("{foo}a", {"a1": 1, "a2": 1, "a3": 1})
        await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2})
        await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4})
        # sum
        assert await r.zunion(["{foo}a", "{foo}b", "{foo}c"]) == [
            b"a2",
            b"a4",
            b"a3",
            b"a1",
        ]
        assert_resp_response(
            r,
            await r.zunion(["{foo}a", "{foo}b", "{foo}c"], withscores=True),
            [(b"a2", 3), (b"a4", 4), (b"a3", 8), (b"a1", 9)],
            [[b"a2", 3.0], [b"a4", 4.0], [b"a3", 8.0], [b"a1", 9.0]],
        )
        # max
        assert_resp_response(
            r,
            await r.zunion(
                ["{foo}a", "{foo}b", "{foo}c"], aggregate="MAX", withscores=True
            ),
            [(b"a2", 2), (b"a4", 4), (b"a3", 5), (b"a1", 6)],
            [[b"a2", 2.0], [b"a4", 4.0], [b"a3", 5.0], [b"a1", 6.0]],
        )
        # min
        assert_resp_response(
            r,
            await r.zunion(
                ["{foo}a", "{foo}b", "{foo}c"], aggregate="MIN", withscores=True
            ),
            [(b"a1", 1), (b"a2", 1), (b"a3", 1), (b"a4", 4)],
            [[b"a1", 1.0], [b"a2", 1.0], [b"a3", 1.0], [b"a4", 4.0]],
        )
        # with weight
        assert_resp_response(
            r,
            await r.zunion({"{foo}a": 1, "{foo}b": 2, "{foo}c": 3}, withscores=True),
            [(b"a2", 5), (b"a4", 12), (b"a3", 20), (b"a1", 23)],
            [[b"a2", 5.0], [b"a4", 12.0], [b"a3", 20.0], [b"a1", 23.0]],
        )

    async def test_cluster_zunionstore_sum(self, r: RedisCluster) -> None:
        await r.zadd("{foo}a", {"a1": 1, "a2": 1, "a3": 1})
        await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2})
        await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4})
        assert await r.zunionstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"]) == 4
        assert_resp_response(
            r,
            await r.zrange("{foo}d", 0, -1, withscores=True),
            [(b"a2", 3), (b"a4", 4), (b"a3", 8), (b"a1", 9)],
            [[b"a2", 3.0], [b"a4", 4.0], [b"a3", 8.0], [b"a1", 9.0]],
        )

    async def test_cluster_zunionstore_max(self, r: RedisCluster) -> None:
        await r.zadd("{foo}a", {"a1": 1, "a2": 1, "a3": 1})
        await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2})
        await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4})
        assert (
            await r.zunionstore(
                "{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MAX"
            )
            == 4
        )
        assert_resp_response(
            r,
            await r.zrange("{foo}d", 0, -1, withscores=True),
            [(b"a2", 2), (b"a4", 4), (b"a3", 5), (b"a1", 6)],
            [[b"a2", 2.0], [b"a4", 4.0], [b"a3", 5.0], [b"a1", 6.0]],
        )

    async def test_cluster_zunionstore_min(self, r: RedisCluster) -> None:
        await r.zadd("{foo}a", {"a1": 1, "a2": 2, "a3": 3})
        await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 4})
        await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4})
        assert (
            await r.zunionstore(
                "{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MIN"
            )
            == 4
        )
        assert_resp_response(
            r,
            await r.zrange("{foo}d", 0, -1, withscores=True),
            [(b"a1", 1), (b"a2", 2), (b"a3", 3), (b"a4", 4)],
            [[b"a1", 1.0], [b"a2", 2.0], [b"a3", 3.0], [b"a4", 4.0]],
        )

    async def test_cluster_zunionstore_with_weight(self, r: RedisCluster) -> None:
        await r.zadd("{foo}a", {"a1": 1, "a2": 1, "a3": 1})
        await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2})
        await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4})
        assert (
            await r.zunionstore("{foo}d", {"{foo}a": 1, "{foo}b": 2, "{foo}c": 3}) == 4
        )
        assert_resp_response(
            r,
            await r.zrange("{foo}d", 0, -1, withscores=True),
            [(b"a2", 5), (b"a4", 12), (b"a3", 20), (b"a1", 23)],
            [[b"a2", 5.0], [b"a4", 12.0], [b"a3", 20.0], [b"a1", 23.0]],
        )

    @skip_if_server_version_lt("2.8.9")
    async def test_cluster_pfcount(self, r: RedisCluster) -> None:
        members = {b"1", b"2", b"3"}
        await r.pfadd("{foo}a", *members)
        assert await r.pfcount("{foo}a") == len(members)
        members_b = {b"2", b"3", b"4"}
        await r.pfadd("{foo}b", *members_b)
        assert await r.pfcount("{foo}b") == len(members_b)
        assert await r.pfcount("{foo}a", "{foo}b") == len(members_b.union(members))

    @skip_if_server_version_lt("2.8.9")
    async def test_cluster_pfmerge(self, r: RedisCluster) -> None:
        mema = {b"1", b"2", b"3"}
        memb = {b"2", b"3", b"4"}
        memc = {b"5", b"6", b"7"}
        await r.pfadd("{foo}a", *mema)
        await r.pfadd("{foo}b", *memb)
        await r.pfadd("{foo}c", *memc)
        await r.pfmerge("{foo}d", "{foo}c", "{foo}a")
        assert await r.pfcount("{foo}d") == 6
        await r.pfmerge("{foo}d", "{foo}b")
        assert await r.pfcount("{foo}d") == 7

    async def test_cluster_sort_store(self, r: RedisCluster) -> None:
        await r.rpush("{foo}a", "2", "3", "1")
        assert await r.sort("{foo}a", store="{foo}sorted_values") == 3
        assert await r.lrange("{foo}sorted_values", 0, -1) == [b"1", b"2", b"3"]

    # GEO COMMANDS
    @skip_if_server_version_lt("6.2.0")
    async def test_cluster_geosearchstore(self, r: RedisCluster) -> None:
        values = (2.1909389952632, 41.433791470673, "place1") + (
            2.1873744593677,
            41.406342043777,
            "place2",
        )
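        # geoadd() takes a flat sequence of (longitude, latitude, member) triples.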

        await r.geoadd("{foo}barcelona", values)
        await r.geosearchstore(
            "{foo}places_barcelona",
            "{foo}barcelona",
            longitude=2.191,
            latitude=41.433,
            radius=1000,
        )
        assert await r.zrange("{foo}places_barcelona", 0, -1) == [b"place1"]

    @skip_unless_arch_bits(64)
    @skip_if_server_version_lt("6.2.0")
    async def test_geosearchstore_dist(self, r: RedisCluster) -> None:
        values = (2.1909389952632, 41.433791470673, "place1") + (
            2.1873744593677,
            41.406342043777,
            "place2",
        )

        await r.geoadd("{foo}barcelona", values)
        await r.geosearchstore(
            "{foo}places_barcelona",
            "{foo}barcelona",
            longitude=2.191,
            latitude=41.433,
            radius=1000,
            storedist=True,
        )
        # instead of saving the geo score, the distance is saved.
        assert await r.zscore("{foo}places_barcelona", "place1") == 88.05060698409301

    @skip_if_server_version_lt("3.2.0")
    async def test_cluster_georadius_store(self, r: RedisCluster) -> None:
        values = (2.1909389952632, 41.433791470673, "place1") + (
            2.1873744593677,
            41.406342043777,
            "place2",
        )

        await r.geoadd("{foo}barcelona", values)
        await r.georadius(
            "{foo}barcelona", 2.191, 41.433, 1000, store="{foo}places_barcelona"
        )
        assert await r.zrange("{foo}places_barcelona", 0, -1) == [b"place1"]

    @skip_unless_arch_bits(64)
    @skip_if_server_version_lt("3.2.0")
    async def test_cluster_georadius_store_dist(self, r: RedisCluster) -> None:
        values = (2.1909389952632, 41.433791470673, "place1") + (
            2.1873744593677,
            41.406342043777,
            "place2",
        )

        await r.geoadd("{foo}barcelona", values)
        await r.georadius(
            "{foo}barcelona", 2.191, 41.433, 1000, store_dist="{foo}places_barcelona"
        )
        # instead of saving the geo score, the distance is saved.
        assert await r.zscore("{foo}places_barcelona", "place1") == 88.05060698409301

    async def test_cluster_dbsize(self, r: RedisCluster) -> None:
        d = {"a": b"1", "b": b"2", "c": b"3", "d": b"4"}
        assert await r.mset_nonatomic(d)
        assert await r.dbsize(target_nodes="primaries") == len(d)

    async def test_cluster_keys(self, r: RedisCluster) -> None:
        assert await r.keys() == []
        keys_with_underscores = {b"test_a", b"test_b"}
        keys = keys_with_underscores.union({b"testc"})
        for key in keys:
            await r.set(key, 1)
        assert (
            set(await r.keys(pattern="test_*", target_nodes="primaries"))
            == keys_with_underscores
        )
        assert set(await r.keys(pattern="test*", target_nodes="primaries")) == keys

    # SCAN COMMANDS
    @skip_if_server_version_lt("2.8.0")
    async def test_cluster_scan(self, r: RedisCluster) -> None:
        await r.set("a", 1)
        await r.set("b", 2)
        await r.set("c", 3)

        for target_nodes, nodes in zip(
            ["primaries", "replicas"], [r.get_primaries(), r.get_replicas()]
        ):
            cursors, keys = await r.scan(target_nodes=target_nodes)
            assert sorted(keys) == [b"a", b"b", b"c"]
            assert sorted(cursors.keys()) == sorted(node.name for node in nodes)
            assert all(cursor == 0 for cursor in cursors.values())

            cursors, keys = await r.scan(match="a*", target_nodes=target_nodes)
            assert sorted(keys) == [b"a"]
            assert sorted(cursors.keys()) == sorted(node.name for node in nodes)
            assert all(cursor == 0 for cursor in cursors.values())

    @skip_if_server_version_lt("6.0.0")
    async def test_cluster_scan_type(self, r: RedisCluster) -> None:
        await r.sadd("a-set", 1)
        await r.sadd("b-set", 1)
        await r.sadd("c-set", 1)
        await r.hset("a-hash", "foo", 2)
        await r.lpush("a-list", "aux", 3)

        for target_nodes, nodes in zip(
            ["primaries", "replicas"], [r.get_primaries(), r.get_replicas()]
        ):
            cursors, keys = await r.scan(_type="SET", target_nodes=target_nodes)
            assert sorted(keys) == [b"a-set", b"b-set", b"c-set"]
            assert sorted(cursors.keys()) == sorted(node.name for node in nodes)
            assert all(cursor == 0 for cursor in cursors.values())

            cursors, keys = await r.scan(
                _type="SET", match="a*", target_nodes=target_nodes
            )
            assert sorted(keys) == [b"a-set"]
            assert sorted(cursors.keys()) == sorted(node.name for node in nodes)
            assert all(cursor == 0 for cursor in cursors.values())

    @skip_if_server_version_lt("2.8.0")
    async def test_cluster_scan_iter(self, r: RedisCluster) -> None:
        keys_all = []
        keys_1 = []
        for i in range(100):
            s = str(i)
            await r.set(s, 1)
            keys_all.append(s.encode("utf-8"))
            if s.startswith("1"):
                keys_1.append(s.encode("utf-8"))
        keys_all.sort()
        keys_1.sort()

        for target_nodes in ["primaries", "replicas"]:
            keys = [key async for key in r.scan_iter(target_nodes=target_nodes)]
            assert sorted(keys) == keys_all

            keys = [
                key async for key in r.scan_iter(match="1*", target_nodes=target_nodes)
            ]
            assert sorted(keys) == keys_1

    async def test_cluster_randomkey(self, r: RedisCluster) -> None:
        node = r.get_node_from_key("{foo}")
        assert await r.randomkey(target_nodes=node) is None
        for key in ("{foo}a", "{foo}b", "{foo}c"):
            await r.set(key, 1)
        assert await r.randomkey(target_nodes=node) in (b"{foo}a", b"{foo}b", b"{foo}c")

    @skip_if_server_version_lt("6.0.0")
    @skip_if_redis_enterprise()
    async def test_acl_log(
        self, r: RedisCluster, create_redis: Callable[..., RedisCluster]
    ) -> None:
        key = "{cache}:"
        node = r.get_node_from_key(key)
        username = "redis-py-user"

        await r.acl_setuser(
            username,
            enabled=True,
            reset=True,
            commands=["+get", "+set", "+select", "+cluster", "+command", "+info"],
            keys=["{cache}:*"],
            nopass=True,
            target_nodes="primaries",
        )
        await r.acl_log_reset(target_nodes=node)

        user_client = await create_redis(
            cls=RedisCluster, flushdb=False, username=username
        )

        # Valid operation and key
        assert await user_client.set("{cache}:0", 1)
        assert await user_client.get("{cache}:0") == b"1"

        # Invalid key
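        # (same hash slot thanks to the {cache} tag, but outside the permitted
        # "{cache}:*" key pattern, so the ACL rejects it)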
        with pytest.raises(NoPermissionError):
            await user_client.get("{cache}violated_cache:0")

        # Invalid operation
        with pytest.raises(NoPermissionError):
            await user_client.hset("{cache}:0", "hkey", "hval")

        assert isinstance(await r.acl_log(target_nodes=node), list)
        if Version(REDIS_INFO.get("version", "0")) >= Version("7.2.0"):
            assert len(await r.acl_log(target_nodes=node)) == 3
        else:
            assert len(await r.acl_log(target_nodes=node)) == 2
        assert len(await r.acl_log(count=1, target_nodes=node)) == 1
        assert isinstance((await r.acl_log(target_nodes=node))[0], dict)
        assert "client-info" in (await r.acl_log(count=1, target_nodes=node))[0]
        assert await r.acl_log_reset(target_nodes=node)

        await r.acl_deluser(username, target_nodes="primaries")

        await user_client.aclose()


class TestNodesManager:
    """
    Tests for the NodesManager class
    """

    async def test_load_balancer(self, r: RedisCluster) -> None:
        n_manager = r.nodes_manager
        lb = n_manager.read_load_balancer
        slot_1 = 1257
        slot_2 = 8975
        node_1 = ClusterNode(default_host, 6379, PRIMARY)
        node_2 = ClusterNode(default_host, 6378, REPLICA)
        node_3 = ClusterNode(default_host, 6377, REPLICA)
        node_4 = ClusterNode(default_host, 6376, PRIMARY)
        node_5 = ClusterNode(default_host, 6375, REPLICA)
        n_manager.slots_cache = {
            slot_1: [node_1, node_2, node_3],
            slot_2: [node_4, node_5],
        }
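        # Each slots_cache entry is [primary, replica, ...]; index 0 is the
        # primary, so replica indexes start at 1.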
        primary1_name = n_manager.slots_cache[slot_1][0].name
        primary2_name = n_manager.slots_cache[slot_2][0].name
        list1_size = len(n_manager.slots_cache[slot_1])
        list2_size = len(n_manager.slots_cache[slot_2])

        # default load balancing strategy: LoadBalancingStrategy.ROUND_ROBIN
        # slot 1
        assert lb.get_server_index(primary1_name, list1_size) == 0
        assert lb.get_server_index(primary1_name, list1_size) == 1
        assert lb.get_server_index(primary1_name, list1_size) == 2
        assert lb.get_server_index(primary1_name, list1_size) == 0

        # slot 2
        assert lb.get_server_index(primary2_name, list2_size) == 0
        assert lb.get_server_index(primary2_name, list2_size) == 1
        assert lb.get_server_index(primary2_name, list2_size) == 0

        lb.reset()
        assert lb.get_server_index(primary1_name, list1_size) == 0
        assert lb.get_server_index(primary2_name, list2_size) == 0

        # reset the indexes before load balancing strategy test
        lb.reset()
        # load balancing strategy: LoadBalancingStrategy.ROUND_ROBIN_REPLICAS
        for i in [1, 2, 1]:
            srv_index = lb.get_server_index(
                primary1_name,
                list1_size,
                load_balancing_strategy=LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
            )
            assert srv_index == i

        # reset the indexes before load balancing strategy test
        lb.reset()
        # load balancing strategy: LoadBalancingStrategy.RANDOM_REPLICA
        for i in range(5):
            srv_index = lb.get_server_index(
                primary1_name,
                list1_size,
                load_balancing_strategy=LoadBalancingStrategy.RANDOM_REPLICA,
            )

            assert srv_index > 0 and srv_index <= 2

    async def test_init_slots_cache_not_all_slots_covered(self) -> None:
        """
        Test that if not all slots are covered it should raise an exception
        """
        # Missing slot 5460
        cluster_slots = [
            [0, 5459, ["127.0.0.1", 7000], ["127.0.0.1", 7003]],
            [5461, 10922, ["127.0.0.1", 7001], ["127.0.0.1", 7004]],
            [10923, 16383, ["127.0.0.1", 7002], ["127.0.0.1", 7005]],
        ]
        with pytest.raises(RedisClusterException) as ex:
            rc = await get_mocked_redis_client(
                host=default_host,
                port=default_port,
                cluster_slots=cluster_slots,
                require_full_coverage=True,
            )
            await rc.aclose()
        assert str(ex.value).startswith(
            "All slots are not covered after query all startup_nodes."
        )

    async def test_init_slots_cache_not_require_full_coverage_success(self) -> None:
        """
        When require_full_coverage is set to False and not all slots are
        covered, the cluster client initialization should succeed
        """
        # Missing slot 5460
        cluster_slots = [
            [0, 5459, ["127.0.0.1", 7000], ["127.0.0.1", 7003]],
            [5461, 10922, ["127.0.0.1", 7001], ["127.0.0.1", 7004]],
            [10923, 16383, ["127.0.0.1", 7002], ["127.0.0.1", 7005]],
        ]

        rc = await get_mocked_redis_client(
            host=default_host,
            port=default_port,
            cluster_slots=cluster_slots,
            require_full_coverage=False,
        )

        assert 5460 not in rc.nodes_manager.slots_cache

        await rc.aclose()

    async def test_init_slots_cache(self) -> None:
        """
        Test that the slots cache can be initialized and that all slots are covered
        """
        good_slots_resp = [
            [0, 5460, ["127.0.0.1", 7000], ["127.0.0.2", 7003]],
            [5461, 10922, ["127.0.0.1", 7001], ["127.0.0.2", 7004]],
            [10923, 16383, ["127.0.0.1", 7002], ["127.0.0.2", 7005]],
        ]

        rc = await get_mocked_redis_client(
            host=default_host, port=default_port, cluster_slots=good_slots_resp
        )
        n_manager = rc.nodes_manager
        assert len(n_manager.slots_cache) == REDIS_CLUSTER_HASH_SLOTS
        for slot_info in good_slots_resp:
            all_hosts = ["127.0.0.1", "127.0.0.2"]
            all_ports = [7000, 7001, 7002, 7003, 7004, 7005]
            slot_start = slot_info[0]
            slot_end = slot_info[1]
            for i in range(slot_start, slot_end + 1):
                assert len(n_manager.slots_cache[i]) == len(slot_info[2:])
                assert n_manager.slots_cache[i][0].host in all_hosts
                assert n_manager.slots_cache[i][1].host in all_hosts
                assert n_manager.slots_cache[i][0].port in all_ports
                assert n_manager.slots_cache[i][1].port in all_ports

        assert len(n_manager.nodes_cache) == 6

        await rc.aclose()

    async def test_init_slots_cache_cluster_mode_disabled(self) -> None:
        """
        Test that creating a RedisCluster fails if one of the startup nodes
        has cluster mode disabled
        """
        with pytest.raises(RedisClusterException) as e:
            rc = await get_mocked_redis_client(
                cluster_slots_raise_error=True,
                host=default_host,
                port=default_port,
                cluster_enabled=False,
            )
            await rc.aclose()
        assert "Cluster mode is not enabled on this node" in str(e.value)

    async def test_empty_startup_nodes(self) -> None:
        """
        It should not be possible to create a node manager with no nodes
        specified
        """
        with pytest.raises(RedisClusterException):
            await NodesManager([], False, {}).initialize()

    async def test_wrong_startup_nodes_type(self) -> None:
        """
        If something other than a list-type iterable is provided, it should
        fail
        """
        with pytest.raises(RedisClusterException):
            await NodesManager({}, False, {}).initialize()

    async def test_init_slots_cache_slots_collision(self) -> None:
        """
        Test that if 2 nodes do not agree on the same slots setup it should
        raise an error. In this test both nodes will say that the first
        slots block should be bound to different servers.
        """
        with mock.patch.object(
            ClusterNode, "execute_command", autospec=True
        ) as execute_command:

            async def mocked_execute_command(self, *args, **kwargs):
                """
                Helper function to return custom slots cache data from
                different redis nodes
                """
                if self.port == 7000:
                    result = [
                        [0, 5460, ["127.0.0.1", 7000], ["127.0.0.1", 7003]],
                        [5461, 10922, ["127.0.0.1", 7001], ["127.0.0.1", 7004]],
                    ]

                elif self.port == 7001:
                    result = [
                        [0, 5460, ["127.0.0.1", 7001], ["127.0.0.1", 7003]],
                        [5461, 10922, ["127.0.0.1", 7000], ["127.0.0.1", 7004]],
                    ]
                else:
                    result = []

                if args[0] == "CLUSTER SLOTS":
                    return result
                elif args[0] == "INFO":
                    return {"cluster_enabled": True}
                elif args[1] == "cluster-require-full-coverage":
                    return {"cluster-require-full-coverage": "yes"}

            execute_command.side_effect = mocked_execute_command

            with pytest.raises(RedisClusterException) as ex:
                node_1 = ClusterNode("127.0.0.1", 7000)
                node_2 = ClusterNode("127.0.0.1", 7001)
                async with RedisCluster(startup_nodes=[node_1, node_2]):
                    ...
            assert str(ex.value).startswith(
                "startup_nodes could not agree on a valid slots cache"
            ), str(ex.value)

    async def test_cluster_one_instance(self) -> None:
        """
        If the cluster consists of only one node, there are some hacks that must
        be validated to work.
        """
        node = ClusterNode(default_host, default_port)
        cluster_slots = [[0, 16383, ["", default_port]]]
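        # An empty host in the CLUSTER SLOTS reply means "the node we asked",
        # so the client should fall back to the startup node's address
        # (the single-node "hack" this test covers).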
        rc = await get_mocked_redis_client(
            startup_nodes=[node], cluster_slots=cluster_slots
        )

        n = rc.nodes_manager
        assert len(n.nodes_cache) == 1
        n_node = rc.get_node(node_name=node.name)
        assert n_node is not None
        assert n_node == node
        assert n_node.server_type == PRIMARY
        assert len(n.slots_cache) == REDIS_CLUSTER_HASH_SLOTS
        for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
            assert n.slots_cache[i] == [n_node]

        await rc.aclose()

    async def test_init_with_down_node(self) -> None:
        """
        If I can't connect to one of the nodes, everything should still work.
        But if I can't connect to any of the nodes, an exception should be raised.
        """
        with mock.patch.object(
            ClusterNode, "execute_command", autospec=True
        ) as execute_command:

            async def mocked_execute_command(self, *args, **kwargs):
                if self.port == 7000:
                    raise ConnectionError("mock connection error for 7000")

                if args[0] == "CLUSTER SLOTS":
                    return [
                        [0, 8191, ["127.0.0.1", 7001, "node_1"]],
                        [8192, 16383, ["127.0.0.1", 7002, "node_2"]],
                    ]
                elif args[0] == "INFO":
                    return {"cluster_enabled": True}
                elif args[1] == "cluster-require-full-coverage":
                    return {"cluster-require-full-coverage": "yes"}

            execute_command.side_effect = mocked_execute_command

            node_1 = ClusterNode("127.0.0.1", 7000)
            node_2 = ClusterNode("127.0.0.1", 7001)

            # If all startup nodes fail to connect, a connection error should be
            # raised
            with pytest.raises(RedisClusterException) as e:
                async with RedisCluster(startup_nodes=[node_1]):
                    ...
            assert "Redis Cluster cannot be connected" in str(e.value)

            with mock.patch.object(
                AsyncCommandsParser, "initialize", autospec=True
            ) as cmd_parser_initialize:

                def cmd_init_mock(self, r: ClusterNode) -> None:
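                    # Give the parser a minimal command table so cluster
                    # initialization does not depend on a real COMMAND reply
                    # from the mocked nodes.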
                    self.commands = {
                        "GET": {
                            "name": "get",
                            "arity": 2,
                            "flags": ["readonly", "fast"],
                            "first_key_pos": 1,
                            "last_key_pos": 1,
                            "step_count": 1,
                        }
                    }

                cmd_parser_initialize.side_effect = cmd_init_mock
                # When at least one startup node is reachable, the cluster
                # initialization should succeed
                async with RedisCluster(startup_nodes=[node_1, node_2]) as rc:
                    assert rc.get_node(host=default_host, port=7001) is not None
                    assert rc.get_node(host=default_host, port=7002) is not None

    @pytest.mark.parametrize("dynamic_startup_nodes", [True, False])
    async def test_init_slots_dynamic_startup_nodes(self, dynamic_startup_nodes):
        rc = await get_mocked_redis_client(
            host="my@DNS.com",
            port=7000,
            cluster_slots=default_cluster_slots,
            dynamic_startup_nodes=dynamic_startup_nodes,
        )
        # Nodes are taken from default_cluster_slots
        discovered_nodes = [
            "127.0.0.1:7000",
            "127.0.0.1:7001",
            "127.0.0.1:7002",
            "127.0.0.1:7003",
        ]
        startup_nodes = list(rc.nodes_manager.startup_nodes.keys())
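        # With dynamic_startup_nodes=True the manager replaces the seed node with
        # the nodes discovered from CLUSTER SLOTS; with False it keeps only the
        # node it was originally given.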
        if dynamic_startup_nodes is True:
            assert sorted(startup_nodes) == sorted(discovered_nodes)
        else:
            assert startup_nodes == ["my@DNS.com:7000"]


class TestClusterPipeline:
    """Tests for the ClusterPipeline class."""

    async def test_blocked_arguments(self, r: RedisCluster) -> None:
        """Test handling for blocked pipeline arguments."""

        with pytest.raises(RedisClusterException) as ex:
            r.pipeline(shard_hint=True)

        assert str(ex.value) == "shard_hint is deprecated in cluster mode"

    async def test_blocked_methods(self, r: RedisCluster) -> None:
        """Test handling for blocked pipeline commands."""
        pipeline = r.pipeline()
        for command in PIPELINE_BLOCKED_COMMANDS:
            command = command.replace(" ", "_").lower()
            if command == "mset_nonatomic":
                continue

            with pytest.raises(RedisClusterException) as exc:
                getattr(pipeline, command)()

            assert str(exc.value) == (
                f"ERROR: Calling pipelined function {command} is blocked "
                "when running redis in cluster mode..."
            )

    async def test_empty_stack(self, r: RedisCluster) -> None:
        """If a pipeline is executed with no commands it should return a empty list."""
        p = r.pipeline()
        result = await p.execute()
        assert result == []

    async def test_redis_cluster_pipeline(self, r: RedisCluster) -> None:
        """Test that we can use a pipeline with the RedisCluster class"""
        result = await (
            r.pipeline()
            .set("A", 1)
            .get("A")
            .hset("K", "F", "V")
            .hgetall("K")
            .mset_nonatomic({"A": 2, "B": 3})
            .get("A")
            .get("B")
            .delete("A", "B", "K")
            .execute()
        )
        assert result == [True, b"1", 1, {b"F": b"V"}, True, True, b"2", b"3", 1, 1, 1]

    async def test_cluster_pipeline_execution_zero_cluster_err_retries(
        self, r: RedisCluster
    ) -> None:
        """
        Test that a cluster pipeline executes successfully at least once when
        cluster_error_retry_attempts is set to 0
        """
        r.cluster_error_retry_attempts = 0
        result = await r.pipeline().set("A", 1).get("A").delete("A").execute()
        assert result == [True, b"1", 1]

    async def test_multi_key_operation_with_a_single_slot(
        self, r: RedisCluster
    ) -> None:
        """Test multi key operation with a single slot."""
        pipe = r.pipeline()
        pipe.set("a{foo}", 1)
        pipe.set("b{foo}", 2)
        pipe.set("c{foo}", 3)
        pipe.get("a{foo}")
        pipe.get("b{foo}")
        pipe.get("c{foo}")

        res = await pipe.execute()
        assert res == [True, True, True, b"1", b"2", b"3"]

    async def test_multi_key_operation_with_multi_slots(self, r: RedisCluster) -> None:
        """Test multi key operation with more than one slot."""
        pipe = r.pipeline()
        pipe.set("a{foo}", 1)
        pipe.set("b{foo}", 2)
        pipe.set("c{foo}", 3)
        pipe.set("bar", 4)
        pipe.set("bazz", 5)
        pipe.get("a{foo}")
        pipe.get("b{foo}")
        pipe.get("c{foo}")
        pipe.get("bar")
        pipe.get("bazz")
        res = await pipe.execute()
        assert res == [True, True, True, True, True, b"1", b"2", b"3", b"4", b"5"]

    async def test_cluster_down_error(self, r: RedisCluster) -> None:
        """
        Test that the pipeline retries the number of times specified in the
        retry object before raising an error.
        """
        key = "foo"
        node = r.get_node_from_key(key, False)

        parse_response_orig = node.parse_response
        with mock.patch.object(
            ClusterNode, "parse_response", autospec=True
        ) as parse_response_mock:

            async def parse_response(
                self, connection: Connection, command: str, **kwargs: Any
            ) -> Any:
                if command == "GET":
                    raise ClusterDownError("error")
                return await parse_response_orig(connection, command, **kwargs)

            parse_response_mock.side_effect = parse_response

            # For each ClusterDownError, 4 commands are launched: INFO, CLUSTER SLOTS,
            # COMMAND, and GET. Before any errors occur, the first 3 have already been run.
            async with r.pipeline() as pipe:
                with pytest.raises(ClusterDownError):
                    await pipe.get(key).execute()
                assert node.parse_response.await_count == 3 * r.retry.get_retries() + 1

    async def test_connection_error_not_raised(self, r: RedisCluster) -> None:
        """Test ConnectionError handling with raise_on_error=False."""
        key = "foo"
        node = r.get_node_from_key(key, False)

        parse_response_orig = node.parse_response
        with mock.patch.object(
            ClusterNode, "parse_response", autospec=True
        ) as parse_response_mock:

            async def parse_response(
                self, connection: Connection, command: str, **kwargs: Any
            ) -> Any:
                if command == "GET":
                    raise ConnectionError("error")
                return await parse_response_orig(connection, command, **kwargs)

            parse_response_mock.side_effect = parse_response

            async with r.pipeline() as pipe:
                res = await pipe.get(key).get(key).execute(raise_on_error=False)
                assert node.parse_response.await_count
                assert isinstance(res[0], ConnectionError)

    async def test_connection_error_raised(self, r: RedisCluster) -> None:
        """Test ConnectionError handling with raise_on_error=True."""
        key = "foo"
        node = r.get_node_from_key(key, False)

        parse_response_orig = node.parse_response
        with mock.patch.object(
            ClusterNode, "parse_response", autospec=True
        ) as parse_response_mock:

            async def parse_response(
                self, connection: Connection, command: str, **kwargs: Any
            ) -> Any:
                if command == "GET":
                    raise ConnectionError("error")
                return await parse_response_orig(connection, command, **kwargs)

            parse_response_mock.side_effect = parse_response

            async with r.pipeline() as pipe:
                with pytest.raises(ConnectionError):
                    await pipe.get(key).get(key).execute(raise_on_error=True)

    async def test_asking_error(self, r: RedisCluster) -> None:
        """Test AskError handling."""
        key = "foo"
        first_node = r.get_node_from_key(key, False)
        ask_node = None
        for node in r.get_nodes():
            if node != first_node:
                ask_node = node
                break
        ask_msg = f"{r.keyslot(key)} {ask_node.host}:{ask_node.port}"

        async with r.pipeline() as pipe:
            mock_node_resp_exc(first_node, AskError(ask_msg))
            mock_node_resp(ask_node, "MOCK_OK")
            res = await pipe.get(key).execute()
            assert first_node._free.pop().read_response.await_count
            assert ask_node._free.pop().read_response.await_count
            assert res == ["MOCK_OK"]

    async def test_error_is_truncated(self, r) -> None:
        """
        Test that the offending command is truncated correctly in the pipeline
        error message.
        """
        key = "a" * 50
        a_value = "a" * 20
        b_value = "b" * 20

        async with r.pipeline() as pipe:
            pipe.set(key, 1)
            pipe.hset(key, mapping={"field_a": a_value, "field_b": b_value})
            pipe.expire(key, 100)

            with pytest.raises(Exception) as ex:
                await pipe.execute()

            expected = f"Command # 2 (HSET {key} field_a {a_value} field_b...) of pipeline caused error: "
            assert str(ex.value).startswith(expected)

    async def test_moved_redirection_on_slave_with_default(
        self, r: RedisCluster
    ) -> None:
        """Test MovedError handling."""
        key = "foo"
        await r.set("foo", "bar")
        # set read_from_replicas to True
        r.read_from_replicas = True
        primary = r.get_node_from_key(key, False)
        moved_error = f"{r.keyslot(key)} {primary.host}:{primary.port}"

        parse_response_orig = primary.parse_response
        with mock.patch.object(
            ClusterNode, "parse_response", autospec=True
        ) as parse_response_mock:

            async def parse_response(
                self, connection: Connection, command: str, **kwargs: Any
            ) -> Any:
                if command == "GET" and self.port != primary.port:
                    raise MovedError(moved_error)

                return await parse_response_orig(connection, command, **kwargs)

            parse_response_mock.side_effect = parse_response

            async with r.pipeline() as readwrite_pipe:
                assert r.reinitialize_counter == 0
                readwrite_pipe.get(key).get(key)
                assert r.reinitialize_counter == 0
                assert await readwrite_pipe.execute() == [b"bar", b"bar"]

    async def test_readonly_pipeline_from_readonly_client(
        self, r: RedisCluster
    ) -> None:
        """Test that the pipeline uses replicas for read_from_replicas clients."""
        # Enable reading from replicas on the cluster client
        r.read_from_replicas = True
        key = "bar"
        await r.set(key, "foo")

        async with r.pipeline() as pipe:
            mock_all_nodes_resp(r, "MOCK_OK")
            assert await pipe.get(key).get(key).execute() == ["MOCK_OK", "MOCK_OK"]
            slot_nodes = r.nodes_manager.slots_cache[r.keyslot(key)]
            executed_on_replica = False
            for node in slot_nodes:
                if node.server_type == REPLICA:
                    if node._free.pop().read_response.await_count:
                        executed_on_replica = True
                        break
            assert executed_on_replica

    @pytest.mark.parametrize(
        "load_balancing_strategy",
        [
            LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
            LoadBalancingStrategy.RANDOM_REPLICA,
        ],
    )
    async def test_readonly_pipeline_with_reading_from_replicas_strategies(
        self, r: RedisCluster, load_balancing_strategy: LoadBalancingStrategy
    ) -> None:
        """
        Test that the pipeline uses replicas for different replica-based
        load balancing strategies.
        """
        # Set the load balancing strategy
        r.load_balancing_strategy = load_balancing_strategy
        key = "bar"
        await r.set(key, "foo")

        async with r.pipeline() as pipe:
            mock_all_nodes_resp(r, "MOCK_OK")
            assert await pipe.get(key).get(key).execute() == ["MOCK_OK", "MOCK_OK"]
            slot_nodes = r.nodes_manager.slots_cache[r.keyslot(key)]
            executed_on_replicas_only = True
            for node in slot_nodes:
                if node.server_type == PRIMARY:
                    if node._free.pop().read_response.await_count > 0:
                        executed_on_replicas_only = False
                        break
            assert executed_on_replicas_only

    async def test_can_run_concurrent_pipelines(self, r: RedisCluster) -> None:
        """Test that the pipeline can be used concurrently."""
        await asyncio.gather(
            *(self.test_redis_cluster_pipeline(r) for _ in range(100)),
            *(self.test_multi_key_operation_with_a_single_slot(r) for _ in range(100)),
            *(self.test_multi_key_operation_with_multi_slots(r) for _ in range(100)),
        )

    @pytest.mark.onlycluster
    async def test_pipeline_with_default_node_error_command(self, create_redis):
        """
        Test that the default node is replaced when it raises a relevant exception.
        """
        r = await create_redis(cls=RedisCluster, flushdb=False)
        curr_default_node = r.get_default_node()
        err = ConnectionError("error")
        cmd_count = await r.command_count()
        mock_node_resp_exc(curr_default_node, err)
        async with r.pipeline(transaction=False) as pipe:
            pipe.command_count()
            result = await pipe.execute(raise_on_error=False)
            assert result[0] == err
            assert r.get_default_node() != curr_default_node
            pipe.command_count()
            result = await pipe.execute(raise_on_error=False)
            assert result[0] == cmd_count


@pytest.mark.ssl
class TestSSL:
    """
    Tests for SSL connections.

    These tests rely on the --redis-ssl-url option to build the client and
    connect to the appropriate port.
    """

    @pytest_asyncio.fixture()
    def create_client(self, request: FixtureRequest) -> Callable[..., RedisCluster]:
        ssl_url = request.config.option.redis_ssl_url
        ssl_host, ssl_port = urlparse(ssl_url)[1].split(":")
        self.client_cert, self.client_key, self.ca_cert = get_tls_certificates(
            "cluster"
        )

        async def _create_client(mocked: bool = True, **kwargs: Any) -> RedisCluster:
            if mocked:
                with mock.patch.object(
                    ClusterNode, "execute_command", autospec=True
                ) as execute_command_mock:
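                    # Stub the commands issued during cluster initialization
                    # (INFO, CLUSTER SLOTS, COMMAND) so that a single
                    # "ssl_node" covering all 16384 slots is discovered without
                    # sending real commands to the server.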

                    async def execute_command(self, *args, **kwargs):
                        if args[0] == "INFO":
                            return {"cluster_enabled": True}
                        if args[0] == "CLUSTER SLOTS":
                            return [[0, 16383, [ssl_host, ssl_port, "ssl_node"]]]
                        if args[0] == "COMMAND":
                            return {
                                "ping": {
                                    "name": "ping",
                                    "arity": -1,
                                    "flags": ["stale", "fast"],
                                    "first_key_pos": 0,
                                    "last_key_pos": 0,
                                    "step_count": 0,
                                }
                            }
                        raise NotImplementedError()

                    execute_command_mock.side_effect = execute_command

                    rc = await RedisCluster(host=ssl_host, port=ssl_port, **kwargs)

                assert len(rc.get_nodes()) == 1
                node = rc.get_default_node()
                assert node.port == int(ssl_port)
                return rc

            return await RedisCluster(host=ssl_host, port=ssl_port, **kwargs)

        return _create_client

    async def test_ssl_connection_without_ssl(
        self, create_client: Callable[..., Awaitable[RedisCluster]]
    ) -> None:
        with pytest.raises(RedisClusterException) as e:
            await create_client(mocked=False, ssl=False)
        e = e.value.__cause__
        assert "Connection closed by server" in str(e)

    async def test_ssl_with_invalid_cert(
        self, create_client: Callable[..., Awaitable[RedisCluster]]
    ) -> None:
        with pytest.raises(RedisClusterException) as e:
            await create_client(mocked=False, ssl=True)
        e = e.value.__cause__.__context__
        assert "SSL: CERTIFICATE_VERIFY_FAILED" in str(e)

    async def test_ssl_connection(
        self, create_client: Callable[..., Awaitable[RedisCluster]]
    ) -> None:
        async with await create_client(ssl=True, ssl_cert_reqs="none") as rc:
            assert await rc.ping()

    @pytest.mark.parametrize(
        "ssl_ciphers",
        [
            "AES256-SHA:DHE-RSA-AES256-SHA:AES128-SHA:DHE-RSA-AES128-SHA",
            "ECDHE-ECDSA-AES256-GCM-SHA384",
            "ECDHE-RSA-AES128-GCM-SHA256",
        ],
    )
    async def test_ssl_connection_tls12_custom_ciphers(
        self, ssl_ciphers, create_client: Callable[..., Awaitable[RedisCluster]]
    ) -> None:
        async with await create_client(
            ssl=True,
            ssl_cert_reqs="none",
            ssl_min_version=ssl.TLSVersion.TLSv1_2,
            ssl_ciphers=ssl_ciphers,
        ) as rc:
            assert await rc.ping()

    async def test_ssl_connection_tls12_custom_ciphers_invalid(
        self, create_client: Callable[..., Awaitable[RedisCluster]]
    ) -> None:
        async with await create_client(
            ssl=True,
            ssl_cert_reqs="none",
            ssl_min_version=ssl.TLSVersion.TLSv1_2,
            ssl_ciphers="foo:bar",
        ) as rc:
            with pytest.raises(RedisClusterException) as e:
                assert await rc.ping()
            assert "Redis Cluster cannot be connected" in str(e.value)

    @pytest.mark.parametrize(
        "ssl_ciphers",
        [
            "TLS_CHACHA20_POLY1305_SHA256",
            "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256",
        ],
    )
    async def test_ssl_connection_tls13_custom_ciphers(
        self, ssl_ciphers, create_client: Callable[..., Awaitable[RedisCluster]]
    ) -> None:
        # TLS 1.3 cipher suites cannot be configured via ssl_ciphers
        # (set_ciphers() only applies to TLS 1.2 and below), so the connection
        # attempt is expected to fail
        async with await create_client(
            ssl=True,
            ssl_cert_reqs="none",
            ssl_min_version=ssl.TLSVersion.TLSv1_2,
            ssl_ciphers=ssl_ciphers,
        ) as rc:
            with pytest.raises(RedisClusterException) as e:
                assert await rc.ping()
            assert "Redis Cluster cannot be connected" in str(e.value)

    async def test_validating_self_signed_certificate(
        self, create_client: Callable[..., Awaitable[RedisCluster]]
    ) -> None:
        # ssl_check_hostname=False is used to skip hostname verification in the
        # test environment, where the server certificate is self-signed and its
        # hostname does not match the address the cluster connects to: the cert
        # is issued for 'localhost', but during cluster initialization
        # 'localhost' is resolved to 127.0.0.1.
        # In production code, ssl_check_hostname should be set to True to
        # ensure proper hostname verification.
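        # A minimal production-style sketch (illustrative only; the host and
        # certificate paths are placeholders):
        #
        #     rc = RedisCluster(
        #         host="redis.example.com",
        #         port=6379,
        #         ssl=True,
        #         ssl_ca_certs="/etc/ssl/certs/ca.pem",
        #         ssl_cert_reqs="required",
        #         ssl_check_hostname=True,
        #     )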
        async with await create_client(
            ssl=True,
            ssl_ca_certs=self.ca_cert,
            ssl_cert_reqs="required",
            ssl_certfile=self.client_cert,
            ssl_keyfile=self.client_key,
            ssl_check_hostname=False,
        ) as rc:
            assert await rc.ping()

    async def test_validating_self_signed_string_certificate(
        self, create_client: Callable[..., Awaitable[RedisCluster]]
    ) -> None:
        with open(self.ca_cert) as f:
            cert_data = f.read()

        # ssl_check_hostname=False is used to skip hostname verification in the
        # test environment, where the server certificate is self-signed and its
        # hostname does not match the address the cluster connects to: the cert
        # is issued for 'localhost', but during cluster initialization
        # 'localhost' is resolved to 127.0.0.1.
        # In production code, ssl_check_hostname should be set to True to
        # ensure proper hostname verification.
        async with await create_client(
            ssl=True,
            ssl_ca_data=cert_data,
            ssl_cert_reqs="required",
            ssl_check_hostname=False,
            ssl_certfile=self.client_cert,
            ssl_keyfile=self.client_key,
        ) as rc:
            assert await rc.ping()
