1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""dbtable is a module that simplifies accessing database tables"""
# Copyright 2002, 2003 St James Software
#
# This file is part of jToolkit.
#
# jToolkit is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# jToolkit is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with jToolkit; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from jToolkit import cidict
class DBTable(object):
"""DBTable is an abstraction of a table and its related data"""
def __init__(self, db, tablename, columnlist, rowidcols, orderbycols):
"""initialises the DBTable
columnlist is a list of (columnname, columntype) tuples
rowidcols and orderbycols can either be a single column name string or a list of them"""
self.db = db
self.tablename = tablename
self.columnlist = [column[0] for column in columnlist]
self.columntypes = dict([(column[0], column[1]) for column in columnlist])
self.rowidcols = rowidcols
if isinstance(self.rowidcols, basestring): self.rowidcols = [self.rowidcols]
self.orderbycols = orderbycols
if isinstance(self.orderbycols, basestring): self.orderbycols = [self.orderbycols]
def getrowid(self, argdict):
"""retrieves the rowid from the arguments"""
rowid = [argdict.get(rowidcol, None) for rowidcol in self.rowidcols]
if len(self.rowidcols) == 1:
return rowid[0]
else:
if reduce(int.__and__, [value is None for value in rowid]):
return None
else:
return tuple(rowid)
def rowidclause(self, rowid):
"""returns a where clause that matches the rowid"""
if len(self.rowidcols) == 1:
return self.db.equalsphrase(self.rowidcols[0], rowid)
else:
return " and ".join([self.db.equalsphrase(self.rowidcols[n], rowid[n]) for n in range(len(self.rowidcols))])
def rowidparamstring(self, rowid):
"""returns a param string (for a url) that matches the rowid"""
if len(self.rowidcols) == 1:
if isinstance(rowid, unicode): rowid = rowid.encode(self.db.encoding)
return "%s=%s" % (self.rowidcols[0], rowid)
else:
newrowid = []
for n in range(len(rowid)):
if isinstance(rowid[n], unicode): newrowid.append(rowid[n].encode(self.db.encoding))
else: newrowid.append(rowid[n])
return "&".join(["%s=%s" % (self.rowidcols[n], rowid[n]) for n in range(len(self.rowidcols))])
def orderbyclause(self):
"""returns the order by clause"""
if len(self.orderbycols) > 0:
return " order by " + ",".join(self.orderbycols)
else:
return ""
def getsqlcolumns(self):
"""returns the list of columns joined with commas (for sql statements)"""
return ", ".join(self.columnlist)
def getrecord(self, rowid):
"""retrieves the record corresponding to the given rowid"""
sql = "select %s from %s where %s" % (self.getsqlcolumns(), self.tablename, self.rowidclause(rowid))
record = self.db.singlerowdict(sql, self.db.lower, self.db.nonetoblank)
for key, value in record.iteritems():
if value is None: record[key] = ''
return record
def rowidexists(self, rowid):
"""returns where a record exists with the given rowid"""
sql = "select count(*) from %s where %s" % (self.tablename, self.rowidclause(rowid))
count = int(self.db.singlevalue(sql))
return count > 0
def filtermatchessome(self, filter):
"""returns whether any rows exist for the given filter"""
sql = "select count(*) from %s %s" % (self.tablename, self.getfilterclause(filter))
count = int(self.db.singlevalue(sql))
return count > 0
def getdefaultrecord(self):
"""retrieves a default record for new categories"""
# if sensible values are needed here, this method can be overridden
return dict([(column, '') for column in self.columnlist])
def getfilterclause(self, filter):
"""returns a where clause for the given filter"""
if filter is None:
return " "
elif isinstance(filter, basestring):
return filter
else:
return self.db.catclauses([filter.getfilterclause(self.db)])
def getsomerecord(self, filter = None):
"""retrieves a single rowid & record when you don't know which one you want..."""
sql = "select %s from %s %s %s" % (self.getsqlcolumns(), self.tablename, self.getfilterclause(filter), self.orderbyclause())
record = self.db.singlerowdict(sql, self.db.lower, self.db.nonetoblank)
rowid = self.getrowid(record)
return rowid, record
def gettablerows(self, filter = None):
"""retrieves all the rows in the table"""
sql = "select %s from %s %s %s" % (self.getsqlcolumns(), self.tablename, self.getfilterclause(filter), self.orderbyclause())
return self.db.allrowdicts(sql, self.db.lower, self.db.nonetoblank)
def countrows(self, filter = None):
"""counts the number of rows in the table matching the filter"""
sql = "select count(*) from %s %s" % (self.tablename, self.getfilterclause(filter))
return int(self.db.singlevalue(sql))
def getsometablerows(self, minRow = None, maxRow = None, filter = None):
"""retrieves all the rows in the table"""
sql = "select %s from %s %s %s" % (self.getsqlcolumns(), self.tablename, self.getfilterclause(filter), self.orderbyclause())
return self.db.somerowdicts(sql, self.db.lower, self.db.nonetoblank, minrow=minRow, maxrow=maxRow)
def addrow(self, argdict):
"""add the row to the table"""
valuesdict = cidict.filterdict(cidict.cidict(argdict), self.columnlist)
self.db.insert(self.tablename, valuesdict)
def modifyrow(self, argdict):
"""modify the row in the table"""
rowid = self.getrowid(argdict)
if rowid is None:
raise TypeError, "rowid is None in modifylog"
valuesdict = self.getrecord(rowid)
newvaluesdict = cidict.filterdict(argdict, valuesdict)
updatedict = cidict.subtractdicts(newvaluesdict, valuesdict)
self.db.update(self.tablename, self.rowidcols, rowid, updatedict)
def deleterow(self, argdict):
"""delete the row in the table"""
rowid = self.getrowid(argdict)
if rowid is None:
raise TypeError, "rowid is None in deletelog"
self.db.delete(self.tablename, self.rowidcols, rowid)
def createtable(self):
"""creates the table represented by self, using tablename and columnlist and returning success"""
sql = "create table %s (" % self.tablename
sql += ", ".join([column+" "+self.db.dbtypename(self.columntypes[column]) for column in self.columnlist])
sql += ")"
try:
self.db.execute(sql)
return 1
except Exception:
return 0
def droptable(self):
"""drops the table represented by self, using tablename and returning success"""
sql = "drop table %s" % self.tablename
try:
self.db.execute(sql)
return 1
except Exception:
return 0
|