File: test_custom_payload.py

package info (click to toggle)
python-cassandra-driver 3.29.2-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 5,144 kB
  • sloc: python: 51,532; ansic: 768; makefile: 136; sh: 13
file content (165 lines) | stat: -rw-r--r-- 6,327 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import unittest

from cassandra.query import (SimpleStatement, BatchStatement, BatchType)

from tests.integration import use_singledc, PROTOCOL_VERSION, local, TestCluster


def setup_module():
    """
    Module-level setup hook; delegates to ``use_singledc()`` from ``tests.integration``.

    NOTE(review): presumably this provisions the single-datacenter test cluster
    the tests in this module connect to -- confirm against ``tests.integration``.
    """
    use_singledc()

# These tests rely on the custom payload being returned, but by default C*
# ignores all payloads.
@local
class CustomPayloadTests(unittest.TestCase):
    """
    Integration tests for custom payloads sent with simple, batch and prepared statements.

    Every test submits a statement together with a custom payload and checks that the
    payload attached to the response equals the one that was sent.
    """

    def setUp(self):
        """
        Skip the test on native protocol versions below 4, otherwise create the
        cluster and session used by the test.
        """
        if PROTOCOL_VERSION < 4:
            raise unittest.SkipTest(
                "Native protocol 4.0+ is required for custom payloads, currently using %r"
                % (PROTOCOL_VERSION,))
        self.cluster = TestCluster()
        # Register the shutdown before connecting: unittest does not call tearDown()
        # when setUp() raises, but cleanups registered through addCleanup() still run,
        # so a failing connect() does not leave the cluster open.
        self.addCleanup(self.cluster.shutdown)
        self.session = self.cluster.connect()

    def test_custom_query_basic(self):
        """
        Test to validate that custom payloads work with simple queries

        creates a simple query and ensures that custom payloads are passed to C*. A custom
        query provider is used with C* so we can validate that same custom payloads are sent back
        with the results


        @since 2.6
        @jira_ticket PYTHON-280
        @expected_result valid custom payloads should be sent and received

        @test_category queries:custom_payload
        """

        # Create a simple query statement
        query = "SELECT * FROM system.local"
        statement = SimpleStatement(query)
        # Validate that various types of custom payloads are sent and received okay
        self.validate_various_custom_payloads(statement=statement)

    def test_custom_query_batching(self):
        """
        Test to validate that custom payloads work with batch queries

        creates a batch query and ensures that custom payloads are passed to C*. A custom
        query provider is used with C* so we can validate that same custom payloads are sent back
        with the results


        @since 2.6
        @jira_ticket PYTHON-280
        @expected_result valid custom payloads should be sent and received

        @test_category queries:custom_payload
        """

        # Construct Batch Statement
        batch = BatchStatement(BatchType.LOGGED)
        for i in range(10):
            batch.add(SimpleStatement("INSERT INTO test3rf.test (k, v) VALUES (%s, %s)"), (i, i))

        # Validate that various types of custom payloads are sent and received okay
        self.validate_various_custom_payloads(statement=batch)

    def test_custom_query_prepared(self):
        """
        Test to validate that custom payloads work with prepared queries

        creates a prepared query and ensures that custom payloads are passed to C*. A custom
        query provider is used with C* so we can validate that same custom payloads are sent back
        with the results


        @since 2.6
        @jira_ticket PYTHON-280
        @expected_result valid custom payloads should be sent and received

        @test_category queries:custom_payload
        """

        # Construct prepared statement
        prepared = self.session.prepare(
            """
            INSERT INTO test3rf.test (k, v) VALUES  (?, ?)
            """)

        bound = prepared.bind((1, None))

        # Validate that various custom payloads are validated correctly
        self.validate_various_custom_payloads(statement=bound)

    def validate_various_custom_payloads(self, statement):
        """
        This is a utility method that given a statement will attempt
        to submit the statement with various custom payloads. It will
        validate that the custom payloads are sent and received correctly,
        and that a payload with too many entries is rejected with a ValueError.

        @param statement The statement to validate the custom queries in conjunction with
        """

        # Simple key value
        custom_payload = {'test': b'test_return'}
        self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload)

        # Empty key and empty value
        custom_payload = {'': b''}
        self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload)

        # Space key and space value
        custom_payload = {' ': b' '}
        self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload)

        # Multi-character key value pair
        key_value = "x" * 10
        custom_payload = {key_value: key_value.encode()}
        self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload)

        # The payload still holds the single pair from the previous step, so adding
        # 65534 more entries brings it to 65535 pairs in total -- the unsigned short
        # max value, which according to the C* binary protocol v4 should be the
        # largest supported number of key value pairs.
        for i in range(65534):
            custom_payload[str(i)] = b'x'
        self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload)

        # One more entry (65536 pairs) is too many key value pairs and should fail
        custom_payload[str(65535)] = b'x'
        with self.assertRaises(ValueError):
            self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload)

    def execute_async_validate_custom_payload(self, statement, custom_payload):
        """
        This is just a simple method that submits a statement with a payload, and validates
        that the custom payload we submitted matches the one that we got back
        @param statement The statement to execute
        @param custom_payload The custom payload to submit with
        """

        # Submit the statement with our custom payload and wait for the response
        response_future = self.session.execute_async(statement, custom_payload=custom_payload)
        response_future.result()
        # Validate the payload attached to the response matches the one we sent
        returned_custom_payload = response_future.custom_payload
        self.assertEqual(custom_payload, returned_custom_payload)