File: test_control_connection.py

package info (click to toggle)
python-cassandra-driver 3.29.2-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 5,144 kB
  • sloc: python: 51,532; ansic: 768; makefile: 136; sh: 13
file content (128 lines) | stat: -rw-r--r-- 4,785 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
#
#
#
from cassandra import InvalidRequest

import unittest


from cassandra.protocol import ConfigurationException
from tests.integration import use_singledc, PROTOCOL_VERSION, TestCluster, greaterthanorequalcass40, notdse
from tests.integration.datatype_utils import update_datatypes


def setup_module():
    use_singledc()
    update_datatypes()


class ControlConnectionTests(unittest.TestCase):
    def setUp(self):
        if PROTOCOL_VERSION < 3:
            raise unittest.SkipTest(
                "Native protocol 3,0+ is required for UDTs using %r"
                % (PROTOCOL_VERSION,))
        self.cluster = TestCluster()

    def tearDown(self):
        try:
            self.session.execute("DROP KEYSPACE keyspacetodrop ")
        except (ConfigurationException, InvalidRequest):
            # we already removed the keyspace.
            pass
        self.cluster.shutdown()

    def test_drop_keyspace(self):
        """
        Test to validate that dropping a keyspace with user defined types doesn't kill the control connection.


        Creates a keyspace, and populates with a user defined type. It then records the control_connection's id. It
        will then drop the keyspace and get the id of the control_connection again. They should be the same. If they are
        not dropping the keyspace likely caused the control connection to be rebuilt.

        @since 2.7.0
        @jira_ticket PYTHON-358
        @expected_result the control connection is not killed

        @test_category connection
        """

        self.session = self.cluster.connect()
        self.session.execute("""
            CREATE KEYSPACE keyspacetodrop
            WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1' }
            """)
        self.session.set_keyspace("keyspacetodrop")
        self.session.execute("CREATE TYPE user (age int, name text)")
        self.session.execute("CREATE TABLE mytable (a int PRIMARY KEY, b frozen<user>)")
        cc_id_pre_drop = id(self.cluster.control_connection._connection)
        self.session.execute("DROP KEYSPACE keyspacetodrop")
        cc_id_post_drop = id(self.cluster.control_connection._connection)
        self.assertEqual(cc_id_post_drop, cc_id_pre_drop)

    def test_get_control_connection_host(self):
        """
        Test to validate Cluster.get_control_connection_host() metadata

        @since 3.5.0
        @jira_ticket PYTHON-583
        @expected_result the control connection metadata should accurately reflect cluster state.

        @test_category metadata
        """

        host = self.cluster.get_control_connection_host()
        self.assertEqual(host, None)

        self.session = self.cluster.connect()
        cc_host = self.cluster.control_connection._connection.host

        host = self.cluster.get_control_connection_host()
        self.assertEqual(host.address, cc_host)
        self.assertEqual(host.is_up, True)

        # reconnect and make sure that the new host is reflected correctly
        self.cluster.control_connection._reconnect()
        new_host = self.cluster.get_control_connection_host()
        self.assertNotEqual(host, new_host)

    @notdse
    @greaterthanorequalcass40
    def test_control_connection_port_discovery(self):
        """
        Test to validate that the correct port is discovered when peersV2 is used (C* 4.0+).

        Unit tests already validate that the port can be picked up (or not) from the query. This validates
        it picks up the correct port from a real server and is able to connect.
        """
        self.cluster = TestCluster()

        host = self.cluster.get_control_connection_host()
        self.assertEqual(host, None)

        self.session = self.cluster.connect()
        cc_endpoint = self.cluster.control_connection._connection.endpoint

        host = self.cluster.get_control_connection_host()
        self.assertEqual(host.endpoint, cc_endpoint)
        self.assertEqual(host.is_up, True)
        hosts = self.cluster.metadata.all_hosts()
        self.assertEqual(3, len(hosts))

        for host in hosts:
            self.assertEqual(9042, host.broadcast_rpc_port)
            self.assertEqual(7000, host.broadcast_port)