1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
|
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent.futures import Future
from cassandra.datastax.graph.fluent import DseGraph
from tests.integration import requiredse, DSE_VERSION
from tests.integration.advanced import use_single_node_with_graph
from tests.integration.advanced.graph import GraphTestConfiguration
from tests.integration.advanced.graph.fluent import (
BaseImplicitExecutionTest, create_traversal_profiles, _AbstractTraversalTest)
def setup_module():
if DSE_VERSION:
dse_options = {'graph': {'realtime_evaluation_timeout_in_seconds': 60}}
use_single_node_with_graph(dse_options=dse_options)
@requiredse
@GraphTestConfiguration.generate_tests(traversal=True)
class ImplicitExecutionTest(BaseImplicitExecutionTest, _AbstractTraversalTest):
def _test_iterate_step(self, schema, graphson):
"""
Test to validate that the iterate() step work on all dse versions.
@jira_ticket PYTHON-1155
@expected_result iterate step works
@test_category dse graph
"""
g = self.fetch_traversal_source(graphson)
self.execute_graph(schema.fixtures.classic(), graphson)
g.addV('person').property('name', 'Person1').iterate()
@requiredse
@GraphTestConfiguration.generate_tests(traversal=True)
class ImplicitAsyncExecutionTest(BaseImplicitExecutionTest):
"""
Test to validate that the traversal async execution works properly.
@since 3.21.0
@jira_ticket PYTHON-1129
@test_category dse graph
"""
def setUp(self):
super(ImplicitAsyncExecutionTest, self).setUp()
self.ep_graphson2, self.ep_graphson3 = create_traversal_profiles(self.cluster, self.graph_name)
def _validate_results(self, results):
results = list(results)
self.assertEqual(len(results), 2)
self.assertIn('vadas', results)
self.assertIn('josh', results)
def _test_promise(self, schema, graphson):
self.execute_graph(schema.fixtures.classic(), graphson)
g = self.fetch_traversal_source(graphson)
traversal_future = g.V().has('name', 'marko').out('knows').values('name').promise()
self._validate_results(traversal_future.result())
def _test_promise_error_is_propagated(self, schema, graphson):
self.execute_graph(schema.fixtures.classic(), graphson)
g = DseGraph().traversal_source(self.session, 'wrong_graph', execution_profile=self.ep)
traversal_future = g.V().has('name', 'marko').out('knows').values('name').promise()
with self.assertRaises(Exception):
traversal_future.result()
def _test_promise_callback(self, schema, graphson):
self.execute_graph(schema.fixtures.classic(), graphson)
g = self.fetch_traversal_source(graphson)
future = Future()
def cb(f):
future.set_result(f.result())
traversal_future = g.V().has('name', 'marko').out('knows').values('name').promise()
traversal_future.add_done_callback(cb)
self._validate_results(future.result())
def _test_promise_callback_on_error(self, schema, graphson):
self.execute_graph(schema.fixtures.classic(), graphson)
g = DseGraph().traversal_source(self.session, 'wrong_graph', execution_profile=self.ep)
future = Future()
def cb(f):
try:
f.result()
except Exception as e:
future.set_exception(e)
traversal_future = g.V().has('name', 'marko').out('knows').values('name').promise()
traversal_future.add_done_callback(cb)
with self.assertRaises(Exception):
future.result()
|