1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
|
#------------------------------------------------------------------------------
# aq.py (Section 10.1)
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
# Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------
import cx_Oracle
import decimal
import db_config
# Names of the database objects this script creates, uses and drops.
BOOK_TYPE_NAME = "UDT_BOOK"
QUEUE_NAME = "BOOKS"
QUEUE_TABLE_NAME = "BOOK_QUEUE_TABLE"

# Open the connection with the credentials from db_config and get a cursor
# for the DDL / administrative calls below.
con = cx_Oracle.connect(
    user=db_config.user,
    password=db_config.pw,
    dsn=db_config.dsn,
)
cur = con.cursor()
# Cleanup
#
# Remove whatever a previous run left behind: the queue, its queue table and
# the payload object type (in that order, because the queue table depends on
# the type and the queue depends on the table).
#
# Each step runs in its own PL/SQL sub-block and tolerates only its own
# "does not exist" error:
#   ORA-24010  queue does not exist
#   ORA-24002  queue table does not exist
#   ORA-04043  object (type) does not exist
# With a single shared handler, a missing queue would abort the block before
# the queue table and type were dropped, so a partially failed earlier run
# would make the "create type" below fail.  Any other error is re-raised.
cleanup_sql = """
begin
    begin
        dbms_aqadm.stop_queue('{queue}');
        dbms_aqadm.drop_queue('{queue}');
    exception when others then
        if sqlcode <> -24010 then
            raise;
        end if;
    end;

    begin
        dbms_aqadm.drop_queue_table('{queue_table}');
    exception when others then
        if sqlcode <> -24002 then
            raise;
        end if;
    end;

    begin
        execute immediate 'drop type {type_name}';
    exception when others then
        if sqlcode <> -4043 then
            raise;
        end if;
    end;
end;""".format(
    queue=QUEUE_NAME,
    queue_table=QUEUE_TABLE_NAME,
    type_name=BOOK_TYPE_NAME,
)
cur.execute(cleanup_sql)
# Create the object type used as the queue payload: a book with a title,
# its authors and a price.
print("Creating books type UDT_BOOK...")
create_type_sql = """
create type {} as object (
title varchar2(100),
authors varchar2(100),
price number(5,2)
);""".format(BOOK_TYPE_NAME)
cur.execute(create_type_sql)
# Create the queue table (typed by the book object type), create the queue
# on top of it, then start the queue.  The calls are issued in this order.
print("Creating queue table...")
aq_setup_calls = (
    ("dbms_aqadm.create_queue_table", (QUEUE_TABLE_NAME, BOOK_TYPE_NAME)),
    ("dbms_aqadm.create_queue", (QUEUE_NAME, QUEUE_TABLE_NAME)),
    ("dbms_aqadm.start_queue", (QUEUE_NAME,)),
)
for proc_name, proc_args in aq_setup_calls:
    cur.callproc(proc_name, proc_args)

# Look up the payload type and get a queue handle bound to it.
booksType = con.gettype(BOOK_TYPE_NAME)
queue = con.queue(QUEUE_NAME, booksType)
# Enqueue a few messages: one book object per (title, authors, price) row.
print("Enqueuing messages...")
BOOK_DATA = [
    (
        "The Fellowship of the Ring",
        "Tolkien, J.R.R.",
        decimal.Decimal("10.99"),
    ),
    (
        "Harry Potter and the Philosopher's Stone",
        "Rowling, J.K.",
        decimal.Decimal("7.99"),
    ),
]
for book_title, book_authors, book_price in BOOK_DATA:
    # Build a fresh payload object and fill in its attributes.
    book = booksType.newobject()
    book.TITLE = book_title
    book.AUTHORS = book_authors
    book.PRICE = book_price
    print(book_title)
    # Wrap the payload in message properties, enqueue it and commit.
    message = con.msgproperties(payload=book)
    queue.enqOne(message)
    con.commit()
# Dequeue the messages, printing each book title, until the queue hands
# back nothing.  The dequeue wait option is set to DEQ_NO_WAIT first.
print("\nDequeuing messages...")
queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
message = queue.deqOne()
while message:
    print(message.payload.TITLE)
    con.commit()
    message = queue.deqOne()
print("\nDone.")
|