File: test_graph_implicit_execution.py

package info (click to toggle)
python-cassandra-driver 3.29.2-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,144 kB
  • sloc: python: 51,532; ansic: 768; makefile: 138; sh: 13
file content (108 lines) | stat: -rw-r--r-- 4,192 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from concurrent.futures import Future
from cassandra.datastax.graph.fluent import DseGraph

from tests.integration import requiredse, DSE_VERSION
from tests.integration.advanced import use_single_node_with_graph
from tests.integration.advanced.graph import GraphTestConfiguration
from tests.integration.advanced.graph.fluent import (
    BaseImplicitExecutionTest, create_traversal_profiles, _AbstractTraversalTest)


def setup_module():
    if DSE_VERSION:
        dse_options = {'graph': {'realtime_evaluation_timeout_in_seconds': 60}}
        use_single_node_with_graph(dse_options=dse_options)


@requiredse
@GraphTestConfiguration.generate_tests(traversal=True)
class ImplicitExecutionTest(BaseImplicitExecutionTest, _AbstractTraversalTest):
    def _test_iterate_step(self, schema, graphson):
        """
        Test to validate that the iterate() step work on all dse versions.
        @jira_ticket PYTHON-1155
        @expected_result iterate step works
        @test_category dse graph
        """

        g = self.fetch_traversal_source(graphson)
        self.execute_graph(schema.fixtures.classic(), graphson)
        g.addV('person').property('name', 'Person1').iterate()


@requiredse
@GraphTestConfiguration.generate_tests(traversal=True)
class ImplicitAsyncExecutionTest(BaseImplicitExecutionTest):
    """
    Test to validate that the traversal async execution works properly.

    @since 3.21.0
    @jira_ticket PYTHON-1129

    @test_category dse graph
    """

    def setUp(self):
        super(ImplicitAsyncExecutionTest, self).setUp()
        self.ep_graphson2, self.ep_graphson3 = create_traversal_profiles(self.cluster, self.graph_name)

    def _validate_results(self, results):
        results = list(results)
        self.assertEqual(len(results), 2)
        self.assertIn('vadas', results)
        self.assertIn('josh', results)

    def _test_promise(self, schema, graphson):
        self.execute_graph(schema.fixtures.classic(), graphson)
        g = self.fetch_traversal_source(graphson)
        traversal_future = g.V().has('name', 'marko').out('knows').values('name').promise()
        self._validate_results(traversal_future.result())

    def _test_promise_error_is_propagated(self, schema, graphson):
        self.execute_graph(schema.fixtures.classic(), graphson)
        g = DseGraph().traversal_source(self.session, 'wrong_graph', execution_profile=self.ep)
        traversal_future = g.V().has('name', 'marko').out('knows').values('name').promise()
        with self.assertRaises(Exception):
            traversal_future.result()

    def _test_promise_callback(self, schema, graphson):
        self.execute_graph(schema.fixtures.classic(), graphson)
        g = self.fetch_traversal_source(graphson)
        future = Future()

        def cb(f):
            future.set_result(f.result())

        traversal_future = g.V().has('name', 'marko').out('knows').values('name').promise()
        traversal_future.add_done_callback(cb)
        self._validate_results(future.result())

    def _test_promise_callback_on_error(self, schema, graphson):
        self.execute_graph(schema.fixtures.classic(), graphson)
        g = DseGraph().traversal_source(self.session, 'wrong_graph', execution_profile=self.ep)
        future = Future()

        def cb(f):
            try:
                f.result()
            except Exception as e:
                future.set_exception(e)

        traversal_future = g.V().has('name', 'marko').out('knows').values('name').promise()
        traversal_future.add_done_callback(cb)
        with self.assertRaises(Exception):
            future.result()