File: postgreBrowse.py

package info (click to toggle)
pythoncard 0.8.2-2
  • links: PTS
  • area: main
  • in suites: wheezy
  • size: 8,452 kB
  • sloc: python: 56,787; makefile: 56; sh: 22
file content (115 lines) | stat: -rw-r--r-- 4,644 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#!/usr/bin/python

"""
Module Name: psyopg
Description: Plug in for PythonCard application dbBrowse to provide Psycopg specific functionality

Psycopg version of the mysqlBrowse class
"""
__version__="$Release $"
__date__="Sat Jun  7 13:39:12 BST 2003"
__author__="Jon Dyte <jon@totient.co.uk>"

import psycopg
from psycopg import NUMBER, STRING, INTEGER, FLOAT, DATETIME
from psycopg import BOOLEAN, ROWID, LONGINTEGER

class browse:

    # Connection should be a dictionary with at least three keys, 'username',
    # 'password', 'database' - may need to be normalised for other RDBMS
    def __init__(self, connection):
        "Setup the database connection"
        self._system_tables = []
        # Not providing a db name is guaranteed to ruin our connection
        if not connection['database']:
            raise ValueError
        self._db = psycopg.connect( "user=%s password=%s dbname=%s" % (connection['username'],
                                                                       connection['password'],
                                                                       connection['database']) )
        self._cursor=self._db.cursor()
        # This one is used in getRow
        self._tableName=''

    def getTables(self):
        "Return a list of all of the non-system tables in <database>.\
        CAVEAT: actually gets all the tables not owned by user 'postgres'"
        stmt ="SELECT t.tablename FROM pg_tables t WHERE tableowner <> \'postgres\'"
        self._cursor.execute(stmt)
        # I'm using a list comprehension here instead of a for loop,
        # either will do but I think this is more concise (unlike this comment)
        return [ x[0] for x in self._cursor.fetchall() if x[0] not in self._system_tables ]

    def getColumns(self, tableName):
        "Get the definition of the columns in tableName"
        stmt = 'select * from %s where 1=0 ' % tableName
        try:
            self._cursor.execute(stmt)
        except psycopg.Error:
            return ()
        desc = self._cursor.description
        r = []
        a = r.append
        ## shamelessly borrowed from the ZPsycopgDA for Zope
        for name, type, width, ds, p, scale, null_ok in desc:
            if type == NUMBER:
                if type == INTEGER:
                    type = INTEGER
                elif type == FLOAT:
                    type = FLOAT
                else: type = NUMBER
            elif type == BOOLEAN:
                type = BOOLEAN
            elif type == ROWID:
                type = ROWID
            elif type == DATETIME:
                type = DATETIME
            else:
                type = STRING
            a((name,type.name,0,0,0))
        return r

    def getQueryString(self, tableName):
        "Return a SQL statement which queries all of the columns in tableName"
        tableStructure=self.getColumns(tableName)
        # Construct and return the string
        return 'SELECT %s FROM %s' % (", ".join([column[0] for column in tableStructure]),
                                      tableName)
    
    def getRow(self, tableName):
        "Get a row from tableName"
        # When we upgrade to 2.2 this will be a great candidate for a
        # generator/iterator. In the meantime we use self._tableName to keep
        # track of what we are doing
        if tableName!=self._tableName:
            self._tableName=tableName
            self._cursor.execute(self.getQueryString(tableName))
        return self._cursor.fetchone()

    def getRows(self, tableName):
        "Get all of the rows from tableName"
        # When we upgrade to 2.2 this will be a great candidate for a
        # generator/iterator. In the meantime we use self._tableName to keep
        # track of what we are doing
        if tableName!=self._tableName:
            self._tableName=tableName
        self._cursor.execute(self.getQueryString(tableName))
        return self._cursor.fetchall()

if __name__ == '__main__':
    # We are in an interactive session so run our test routines
    # Connect to the database
    connection={ 'username':'<username>'
                ,'password':'<password>'
                ,'database':'<db name>'}
    dbHolder = browse(connection)
    # Return all of our table names into user_tables
    user_tables = dbHolder.getTables()

    # Print out the structure of each table and its first row
    print "--------------------------------------------------"
    for table in user_tables:
        print table
        print dbHolder.getQueryString(table)
        print dbHolder.getRow(table)
        print "--------------------------------------------------"