1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
|
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cassandra import cluster
from cassandra.cluster import ContinuousPagingOptions
from cassandra.datastax.graph.fluent import DseGraph
from cassandra.graph import VertexProperty
from tests.integration import greaterthanorequaldse68
from tests.integration.advanced.graph import (
GraphUnitTestCase, ClassicGraphSchema, CoreGraphSchema,
VertexLabel, GraphTestConfiguration
)
from tests.integration import greaterthanorequaldse60
from tests.integration.advanced.graph.fluent import (
BaseExplicitExecutionTest, create_traversal_profiles, check_equality_base)
import unittest
@greaterthanorequaldse60
@GraphTestConfiguration.generate_tests(traversal=True)
class BatchStatementTests(BaseExplicitExecutionTest):
def setUp(self):
super(BatchStatementTests, self).setUp()
self.ep_graphson2, self.ep_graphson3 = create_traversal_profiles(self.cluster, self.graph_name)
def _test_batch_with_schema(self, schema, graphson):
"""
Sends a Batch statement and verifies it has succeeded with a schema created
@since 1.1.0
@jira_ticket PYTHON-789
@expected_result ValueError is arisen
@test_category dse graph
"""
self._send_batch_and_read_results(schema, graphson)
def _test_batch_without_schema(self, schema, graphson):
"""
Sends a Batch statement and verifies it has succeeded without a schema created
@since 1.1.0
@jira_ticket PYTHON-789
@expected_result ValueError is arisen
@test_category dse graph
"""
if schema is not ClassicGraphSchema:
raise unittest.SkipTest('schema-less is only for classic graphs')
self._send_batch_and_read_results(schema, graphson, use_schema=False)
def _test_batch_with_schema_add_all(self, schema, graphson):
"""
Sends a Batch statement and verifies it has succeeded with a schema created.
Uses :method:`dse_graph.query._BatchGraphStatement.add_all` to add the statements
instead of :method:`dse_graph.query._BatchGraphStatement.add`
@since 1.1.0
@jira_ticket PYTHON-789
@expected_result ValueError is arisen
@test_category dse graph
"""
self._send_batch_and_read_results(schema, graphson, add_all=True)
def _test_batch_without_schema_add_all(self, schema, graphson):
"""
Sends a Batch statement and verifies it has succeeded without a schema created
Uses :method:`dse_graph.query._BatchGraphStatement.add_all` to add the statements
instead of :method:`dse_graph.query._BatchGraphStatement.add`
@since 1.1.0
@jira_ticket PYTHON-789
@expected_result ValueError is arisen
@test_category dse graph
"""
if schema is not ClassicGraphSchema:
raise unittest.SkipTest('schema-less is only for classic graphs')
self._send_batch_and_read_results(schema, graphson, add_all=True, use_schema=False)
def test_only_graph_traversals_are_accepted(self):
"""
Verifies that ValueError is risen if the parameter add is not a traversal
@since 1.1.0
@jira_ticket PYTHON-789
@expected_result ValueError is arisen
@test_category dse graph
"""
batch = DseGraph.batch()
self.assertRaises(ValueError, batch.add, '{"@value":{"step":[["addV","poc_int"],'
'["property","bigint1value",{"@value":12,"@type":"g:Int32"}]]},'
'"@type":"g:Bytecode"}')
another_batch = DseGraph.batch()
self.assertRaises(ValueError, batch.add, another_batch)
def _send_batch_and_read_results(self, schema, graphson, add_all=False, use_schema=True):
traversals = []
datatypes = schema.fixtures.datatypes()
values = {}
g = self.fetch_traversal_source(graphson)
ep = self.get_execution_profile(graphson)
batch = DseGraph.batch(session=self.session,
execution_profile=self.get_execution_profile(graphson, traversal=True))
for data in datatypes.values():
typ, value, deserializer = data
vertex_label = VertexLabel([typ])
property_name = next(iter(vertex_label.non_pk_properties.keys()))
values[property_name] = value
if use_schema or schema is CoreGraphSchema:
schema.create_vertex_label(self.session, vertex_label, execution_profile=ep)
traversal = g.addV(str(vertex_label.label)).property('pkid', vertex_label.id).property(property_name, value)
if not add_all:
batch.add(traversal)
traversals.append(traversal)
if add_all:
batch.add_all(traversals)
self.assertEqual(len(datatypes), len(batch))
batch.execute()
vertices = self.execute_traversal(g.V(), graphson)
self.assertEqual(len(vertices), len(datatypes), "g.V() returned {}".format(vertices))
# Iterate over all the vertices and check that they match the original input
for vertex in vertices:
schema.ensure_properties(self.session, vertex, execution_profile=ep)
key = [k for k in list(vertex.properties.keys()) if k != 'pkid'][0].replace("value", "")
original = values[key]
self._check_equality(original, vertex)
def _check_equality(self, original, vertex):
for key in vertex.properties:
if key == 'pkid':
continue
value = vertex.properties[key].value \
if isinstance(vertex.properties[key], VertexProperty) else vertex.properties[key][0].value
check_equality_base(self, original, value)
class ContinuousPagingOptionsForTests(ContinuousPagingOptions):
def __init__(self,
page_unit=ContinuousPagingOptions.PagingUnit.ROWS, max_pages=1, # max_pages=1
max_pages_per_second=0, max_queue_size=4):
super(ContinuousPagingOptionsForTests, self).__init__(page_unit, max_pages, max_pages_per_second,
max_queue_size)
def reset_paging_options():
cluster.ContinuousPagingOptions = ContinuousPagingOptions
@greaterthanorequaldse68
@GraphTestConfiguration.generate_tests(schema=CoreGraphSchema)
class GraphPagingTest(GraphUnitTestCase):
def setUp(self):
super(GraphPagingTest, self).setUp()
self.addCleanup(reset_paging_options)
self.ep_graphson2, self.ep_graphson3 = create_traversal_profiles(self.cluster, self.graph_name)
def _setup_data(self, schema, graphson):
self.execute_graph(
"schema.vertexLabel('person').ifNotExists().partitionBy('name', Text).property('age', Int).create();",
graphson)
for i in range(100):
self.execute_graph("g.addV('person').property('name', 'batman-{}')".format(i), graphson)
def _test_cont_paging_is_enabled_by_default(self, schema, graphson):
"""
Test that graph paging is automatically enabled with a >=6.8 cluster.
@jira_ticket PYTHON-1045
@expected_result the default continuous paging options are used
@test_category dse graph
"""
# with traversals... I don't have access to the response future... so this is a hack to ensure paging is on
cluster.ContinuousPagingOptions = ContinuousPagingOptionsForTests
ep = self.get_execution_profile(graphson, traversal=True)
self._setup_data(schema, graphson)
self.session.default_fetch_size = 10
g = DseGraph.traversal_source(self.session, execution_profile=ep)
results = g.V().toList()
self.assertEqual(len(results), 10) # only 10 results due to our hack
def _test_cont_paging_can_be_disabled(self, schema, graphson):
"""
Test that graph paging can be disabled.
@jira_ticket PYTHON-1045
@expected_result the default continuous paging options are not used
@test_category dse graph
"""
# with traversals... I don't have access to the response future... so this is a hack to ensure paging is on
cluster.ContinuousPagingOptions = ContinuousPagingOptionsForTests
ep = self.get_execution_profile(graphson, traversal=True)
ep = self.session.execution_profile_clone_update(ep, continuous_paging_options=None)
self._setup_data(schema, graphson)
self.session.default_fetch_size = 10
g = DseGraph.traversal_source(self.session, execution_profile=ep)
results = g.V().toList()
self.assertEqual(len(results), 100) # 100 results since paging is disabled
def _test_cont_paging_with_custom_options(self, schema, graphson):
"""
Test that we can specify custom paging options.
@jira_ticket PYTHON-1045
@expected_result we get only the desired number of results
@test_category dse graph
"""
ep = self.get_execution_profile(graphson, traversal=True)
ep = self.session.execution_profile_clone_update(ep,
continuous_paging_options=ContinuousPagingOptions(max_pages=1))
self._setup_data(schema, graphson)
self.session.default_fetch_size = 10
g = DseGraph.traversal_source(self.session, execution_profile=ep)
results = g.V().toList()
self.assertEqual(len(results), 10) # only 10 results since paging is disabled
|