
|
#! /usr/bin/env python
"""
usage: %(progname)s [args]
"""
import os, sys, string, time, getopt
from log import *
import odb
import sqlite
import re
# --- these are using for removing nulls from strings
# --- because sqlite can't handle them
def escape_string(str):
def subfn(m):
c = m.group(0)
return "%%%02X" % ord(c)
return re.sub("('|\0|%)",subfn,str)
def unescape_string(str):
def subfn(m):
hexnum = int(m.group(1),16)
return "%c" % hexnum
return re.sub("%(..)",subfn,str)
class Database(odb.Database):
def __init__(self,db, debug=0):
odb.Database.__init__(self, db, debug=debug)
self.SQLError = sqlite.Error
def escape(self,str):
if str is None:
return None
elif type(str) == type(""):
return string.replace(str,"'","''")
elif type(str) == type(1):
return str
else:
raise "unknown column data type: %s" % type(str)
def listTables(self, cursor=None):
if cursor is None: cursor = self.defaultCursor()
cursor.execute("select name from sqlite_master where type='table'")
rows = cursor.fetchall()
tables = []
for row in rows: tables.append(row[0])
return tables
def listIndices(self, cursor=None):
if cursor is None: cursor = self.defaultCursor()
cursor.execute("select name from sqlite_master where type='index'")
rows = cursor.fetchall()
tables = []
for row in rows: tables.append(row[0])
return tables
def listFieldsDict(self, table_name, cursor=None):
if cursor is None: cursor = self.defaultCursor()
sql = "pragma table_info(%s)" % table_name
cursor.execute(sql)
rows = cursor.fetchall()
columns = {}
for row in rows:
colname = row[1]
columns[colname] = row
return columns
def _tableCreateStatement(self, table_name, cursor=None):
if cursor is None: cursor = self.defaultCursor()
sql = "select sql from sqlite_master where type='table' and name='%s'" % table_name
print sql
cursor.execute(sql)
row = cursor.fetchone()
sqlstatement = row[0]
return sqlstatement
def alterTableToMatch(self, table):
tableName = table.getTableName()
tmpTableName = tableName + "_" + str(os.getpid())
invalidAppCols, invalidDBCols = table.checkTable(warnflag=0)
## if invalidAppCols or invalidDBCols:
## return
if not invalidAppCols and not invalidDBCols:
return
oldcols = self.listFieldsDict(tableName)
# tmpcols = oldcols.keys()
tmpcols = []
newcols = table.getAppColumnList()
for colname, coltype, options in newcols:
if oldcols.has_key(colname): tmpcols.append(colname)
tmpcolnames = string.join(tmpcols, ",")
statements = []
sql = "begin transaction"
statements.append(sql)
sql = "create temporary table %s (%s)" % (tmpTableName, tmpcolnames)
statements.append(sql)
sql = "insert into %s select %s from %s" % (tmpTableName, tmpcolnames, tableName)
statements.append(sql)
sql = "drop table %s" % tableName
statements.append(sql)
sql = table._createTableSQL()
statements.append(sql)
sql = "insert into %s(%s) select %s from %s" % (tableName, tmpcolnames, tmpcolnames, tmpTableName)
statements.append(sql)
sql = "drop table %s" % tmpTableName
statements.append(sql)
sql = "commit"
statements.append(sql)
cur = self.defaultCursor()
for statement in statements:
# print statement
cur.execute(statement)
def test():
pass
def usage(progname):
print __doc__ % vars()
def main(argv, stdout, environ):
progname = argv[0]
optlist, args = getopt.getopt(argv[1:], "", ["help", "test", "debug"])
testflag = 0
if len(args) == 0:
usage(progname)
return
for (field, val) in optlist:
if field == "--help":
usage(progname)
return
elif field == "--debug":
debugfull()
elif field == "--test":
testflag = 1
if testflag:
test()
return
if __name__ == "__main__":
main(sys.argv, sys.stdout, os.environ)
|