File: test_3000_subscription.py

package info (click to toggle)
python-cx-oracle 8.3.0-3
  • links: PTS, VCS
  • area: contrib
  • in suites: bookworm, sid
  • size: 3,276 kB
  • sloc: ansic: 10,406; python: 9,358; sql: 1,724; makefile: 31
file content (138 lines) | stat: -rw-r--r-- 5,316 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#------------------------------------------------------------------------------
# Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------

"""
3000 - Module for testing subscriptions
"""

import threading

import cx_Oracle as oracledb
import test_env

class SubscriptionData(object):

    def __init__(self, num_messages_expected):
        self.condition = threading.Condition()
        self.num_messages_expected = num_messages_expected
        self.num_messages_received = 0
        self.table_operations = []
        self.row_operations = []
        self.rowids = []

    def CallbackHandler(self, message):
        if message.type != oracledb.EVENT_DEREG:
            table, = message.tables
            self.table_operations.append(table.operation)
            for row in table.rows:
                self.row_operations.append(row.operation)
                self.rowids.append(row.rowid)
            self.num_messages_received += 1
        if message.type == oracledb.EVENT_DEREG or \
                self.num_messages_received == self.num_messages_expected:
            self.condition.acquire()
            self.condition.notify()
            self.condition.release()


class TestCase(test_env.BaseTestCase):

    def test_3000_subscription(self):
        "3000 - test Subscription for insert, update, delete and truncate"

        # skip if running on the Oracle Cloud, which does not support
        # subscriptions currently
        if self.is_on_oracle_cloud():
            message = "Oracle Cloud does not support subscriptions currently"
            self.skipTest(message)

        # truncate table in order to run test in known state
        self.cursor.execute("truncate table TestTempTable")

        # expected values
        table_operations = [
            oracledb.OPCODE_INSERT,
            oracledb.OPCODE_UPDATE,
            oracledb.OPCODE_INSERT,
            oracledb.OPCODE_DELETE,
            oracledb.OPCODE_ALTER | oracledb.OPCODE_ALLROWS
        ]
        row_operations = [
            oracledb.OPCODE_INSERT,
            oracledb.OPCODE_UPDATE,
            oracledb.OPCODE_INSERT,
            oracledb.OPCODE_DELETE
        ]
        rowids = []

        # set up subscription
        data = SubscriptionData(5)
        connection = test_env.get_connection(threaded=True, events=True)
        sub = connection.subscribe(callback=data.CallbackHandler,
                                   timeout=10, qos=oracledb.SUBSCR_QOS_ROWIDS)
        sub.registerquery("select * from TestTempTable")
        connection.autocommit = True
        cursor = connection.cursor()

        # insert statement
        cursor.execute("""
                insert into TestTempTable (IntCol, StringCol)
                values (1, 'test')""")
        cursor.execute("select rowid from TestTempTable where IntCol = 1")
        rowids.extend(r for r, in cursor)

        # update statement
        cursor.execute("""
                update TestTempTable set
                    StringCol = 'update'
                where IntCol = 1""")
        cursor.execute("select rowid from TestTempTable where IntCol = 1")
        rowids.extend(r for r, in cursor)

        # second insert statement
        cursor.execute("""
                insert into TestTempTable (IntCol, StringCol)
                values (2, 'test2')""")
        cursor.execute("select rowid from TestTempTable where IntCol = 2")
        rowids.extend(r for r, in cursor)

        # delete statement
        cursor.execute("delete TestTempTable where IntCol = 2")
        rowids.append(rowids[-1])

        # truncate table
        cursor.execute("truncate table TestTempTable")

        # wait for all messages to be sent
        data.condition.acquire()
        data.condition.wait(10)

        # verify the correct messages were sent
        self.assertEqual(data.table_operations, table_operations)
        self.assertEqual(data.row_operations, row_operations)
        self.assertEqual(data.rowids, rowids)

        # test string format of subscription object is as expected
        fmt = "<cx_Oracle.Subscription on <cx_Oracle.Connection to %s@%s>>"
        expected = fmt % \
                (test_env.get_main_user(), test_env.get_connect_string())
        self.assertEqual(str(sub), expected)

    def test_3001_deprecations(self):
        "3001 - test to verify deprecations"
        connection = test_env.get_connection(threaded=True, events=True)
        self.assertRaises(oracledb.ProgrammingError, connection.subscribe,
                          ip_address='www.oracle.in',
                          ipAddress='www.oracle.in')
        self.assertRaises(oracledb.ProgrammingError, connection.subscribe,
                          grouping_class=1, groupingClass=1)
        self.assertRaises(oracledb.ProgrammingError, connection.subscribe,
                          grouping_value=3, groupingValue=3)
        self.assertRaises(oracledb.ProgrammingError, connection.subscribe,
                          grouping_type=2, groupingType=2)
        self.assertRaises(oracledb.ProgrammingError, connection.subscribe,
                          client_initiated=True, clientInitiated=True)

if __name__ == "__main__":
    test_env.run_test_cases()