1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
|
#!/usr/bin/python
"""
Module Name: psyopg
Description: Plug in for PythonCard application dbBrowse to provide Psycopg specific functionality
Psycopg version of the mysqlBrowse class
"""
__version__="$Release $"
__date__="Sat Jun 7 13:39:12 BST 2003"
__author__="Jon Dyte <jon@totient.co.uk>"
import psycopg
from psycopg import NUMBER, STRING, INTEGER, FLOAT, DATETIME
from psycopg import BOOLEAN, ROWID, LONGINTEGER
class browse:
# Connection should be a dictionary with at least three keys, 'username',
# 'password', 'database' - may need to be normalised for other RDBMS
def __init__(self, connection):
"Setup the database connection"
self._system_tables = []
# Not providing a db name is guaranteed to ruin our connection
if not connection['database']:
raise ValueError
self._db = psycopg.connect( "user=%s password=%s dbname=%s" % (connection['username'],
connection['password'],
connection['database']) )
self._cursor=self._db.cursor()
# This one is used in getRow
self._tableName=''
def getTables(self):
"Return a list of all of the non-system tables in <database>.\
CAVEAT: actually gets all the tables not owned by user 'postgres'"
stmt ="SELECT t.tablename FROM pg_tables t WHERE tableowner <> \'postgres\'"
self._cursor.execute(stmt)
# I'm using a list comprehension here instead of a for loop,
# either will do but I think this is more concise (unlike this comment)
return [ x[0] for x in self._cursor.fetchall() if x[0] not in self._system_tables ]
def getColumns(self, tableName):
"Get the definition of the columns in tableName"
stmt = 'select * from %s where 1=0 ' % tableName
try:
self._cursor.execute(stmt)
except psycopg.Error:
return ()
desc = self._cursor.description
r = []
a = r.append
## shamelessly borrowed from the ZPsycopgDA for Zope
for name, type, width, ds, p, scale, null_ok in desc:
if type == NUMBER:
if type == INTEGER:
type = INTEGER
elif type == FLOAT:
type = FLOAT
else: type = NUMBER
elif type == BOOLEAN:
type = BOOLEAN
elif type == ROWID:
type = ROWID
elif type == DATETIME:
type = DATETIME
else:
type = STRING
a((name,type.name,0,0,0))
return r
def getQueryString(self, tableName):
"Return a SQL statement which queries all of the columns in tableName"
tableStructure=self.getColumns(tableName)
# Construct and return the string
return 'SELECT %s FROM %s' % (", ".join([column[0] for column in tableStructure]),
tableName)
def getRow(self, tableName):
"Get a row from tableName"
# When we upgrade to 2.2 this will be a great candidate for a
# generator/iterator. In the meantime we use self._tableName to keep
# track of what we are doing
if tableName!=self._tableName:
self._tableName=tableName
self._cursor.execute(self.getQueryString(tableName))
return self._cursor.fetchone()
def getRows(self, tableName):
"Get all of the rows from tableName"
# When we upgrade to 2.2 this will be a great candidate for a
# generator/iterator. In the meantime we use self._tableName to keep
# track of what we are doing
if tableName!=self._tableName:
self._tableName=tableName
self._cursor.execute(self.getQueryString(tableName))
return self._cursor.fetchall()
if __name__ == '__main__':
# We are in an interactive session so run our test routines
# Connect to the database
connection={ 'username':'<username>'
,'password':'<password>'
,'database':'<db name>'}
dbHolder = browse(connection)
# Return all of our table names into user_tables
user_tables = dbHolder.getTables()
# Print out the structure of each table and its first row
print "--------------------------------------------------"
for table in user_tables:
print table
print dbHolder.getQueryString(table)
print dbHolder.getRow(table)
print "--------------------------------------------------"
|