#!/usr/local/bin/python
#ident "@(#) $Id: PgSQLTestCases.py,v 1.25 2002/12/01 04:59:25 ballie01 Exp $"
#-----------------------------------------------------------------------+
# Name:		PgSQLTestCases.py					|
#									|
# Synopsys:								|
#									|
# Description:	PgSQLTestCases contains the functional test cases for	|
#		the PgSQL Python -> PostgreSQL DB-API 2.0 compliant in-	|
#		terface module.						|
#=======================================================================|
# Copyright 2000 by Billy G. Allie.					|
# All rights reserved.							|
#									|
# Permission to use, copy, modify, and distribute this software and its	|
# documentation for any purpose and without fee is hereby granted, pro-	|
# vided that the above copyright notice appear in all copies and that	|
# both that copyright notice and this permission notice appear in sup-	|
# porting documentation, and that the copyright owner's name not be	|
# used in advertising or publicity pertaining to distribution of the	|
# software without specific, written prior permission.			|
#									|
# THE AUTHOR(S) DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,	|
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS.  IN	|
# NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY SPECIAL, INDIRECT OR	|
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS	|
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE	|
# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE	|
# USE OR PERFORMANCE OF THIS SOFTWARE.					|
#=======================================================================|
# Revision History:							|
#									|
# Date      Ini Description						|
# --------- --- ------------------------------------------------------- |
# 30NOV2002 bga Updated tests for PostgreSQL 7.3.			|
# 10NOV2002 bga Added an additional PgNumeric class check.		|
# 27OCT2002 gh  Don't check for the obsolete displaysize field in       |
#               cursor.description. Also don't check the backend enco-  |
#               ding.                                                   |
# 08SEP2002 gh  Added tests for PgResultSet.				|
# 11AUG2002 bga	Added additional tests for the PgNumeric class.		|
# 03AUG2002 gh  Added test for bug #589370 (wether it is possible to	|
#		use OID in a query that doesn't return any rows).	|
# 29JUL2002 gh  Added a few tests for PgNumeric (which currently fail).	|
#		Simplified the construction of testcases by a lot and	|
#		fixed a bug where there were two methods named		|
#		CheckConnectionObject (thus hiding the first one).	|
# 16APR2002 gh  Updated the PostgreSQL version specific tests to cope	|
#		with PostgreSQL 7.2.x.					|
# 09SEP2001 bga Modified tests to reflect changes to PgVersion object.	|
#		In particulare, a PgVersion object no longer has a	|
#		__dict__ attribute, but now acts as a mapping object.	|
#		The coerce method of the PgVersion object will always	|
#		succeed, but will return a 'special' PgVersion Object	|
#		that contains any error information if the coersion	|
#		failed.							|
# 08SEP2001 gh  Added more tests for PgVersion object.			|	
# 03SEP2001 gh  Added more checks for PgMoney methods.			|
# 29JUL2001 bga Added test to check for correct handling of non-print-	|
#		able characters in the CHAR types.			|
# 12JUN2001 bga Added more tests relating to closed Connection and	|
#		Cursor objects.  Also added test for corrected circular	|
#		references with weak references (Python 2.1+ only).	|
# 10JUN2001 bga Modified tests to reflect changes to PgSQL.py.		|
# 09JUN2001 bga Fixed test cases to reflect corrected implementation of	|
#		the PgInt2 and PgInt8 pow() function.			|
# 04JUN2001 bga	Fixed a reversed assertion in the CheckPgVer test.	|
# 03JUN2001 bga	Modified test to account for the new C implementations	|
#		of PgInt2 and PgInt8.					|
# 27MAY2001 bga Expanded the number of tests.  Primarily, added tests	|
#		to check out the emulated numerical types, PgInt8,	|
#		PgInt2, and PgMoney.					|
# 21MAY2001 bga Changed assert's to use the unittest assert_() method.	|
#	    ---	Discovered and fixed some really nasty bugs.		|
# 18APR2001 bga	Changed version check so that 7.1<anything> is treated	|
#		as version 7.1.						|
# 09MAR2001 bga Added version check for PostgreSQL 7.1beta.		|
#		Added tests to check the 'pyformat' paramstyle.		|
# 13DEC2000 bga Added version check for PostgreSQL 7.0.3.		|
# 09OCT2000 bga Fixed another section for v6.5.x compatability.		|
# 08OCT2000 bga Added code to check for the PostgreSQL version and mod-	|
#		ify the test(s) accordingly.				|
# 01OCT2000 bga Initial release by Billy G. Allie.			|
#-----------------------------------------------------------------------+
import sys
import unittest
import types
import string
from pyPgSQL import PgSQL

version = sys.version_info
version = ((((version[0] * 100) + version[1]) * 100) + version[2])

class DBAPICompliance(unittest.TestCase):
    def CheckAPILevel(self):
        self.assertEqual(PgSQL.apilevel, '2.0',
			 'apilevel is %s, should be 2.0' % PgSQL.apilevel)

    def CheckThreadSafety(self):
        self.assertEqual(PgSQL.threadsafety, 1,
			 'threadsafety is %d, should be 1' % PgSQL.threadsafety)

    def CheckParamStyle(self):
        self.assertEqual(PgSQL.paramstyle, 'pyformat',
			 'paramstyle is "%s", should be "pyformat"' %
			 PgSQL.paramstyle)

    def CheckWarning(self):
        self.assert_(issubclass(PgSQL.Warning, StandardError),
                     'Warning is not a subclass of StandardError')

    def CheckError(self):
        self.failUnless(issubclass(PgSQL.Error, StandardError),
                        'Error is not a subclass of StandardError')

    def CheckInterfaceError(self):
        self.failUnless(issubclass(PgSQL.InterfaceError, PgSQL.Error),
                        'InterfaceError is not a subclass of Error')

    def CheckDatabaseError(self):
        self.failUnless(issubclass(PgSQL.DatabaseError, PgSQL.Error),
                        'DatabaseError is not a subclass of Error')

    def CheckDataError(self):
        self.failUnless(issubclass(PgSQL.DataError, PgSQL.DatabaseError),
                        'DataError is not a subclass of DatabaseError')

    def CheckOperationalError(self):
        self.failUnless(issubclass(PgSQL.OperationalError, PgSQL.DatabaseError),
                        'OperationalError is not a subclass of DatabaseError')

    def CheckIntegrityError(self):
        self.failUnless(issubclass(PgSQL.IntegrityError, PgSQL.DatabaseError),
                        'IntegrityError is not a subclass of DatabaseError')

    def CheckInternalError(self):
        self.failUnless(issubclass(PgSQL.InternalError, PgSQL.DatabaseError),
                        'InternalError is not a subclass of DatabaseError')

    def CheckProgrammingError(self):
        self.failUnless(issubclass(PgSQL.ProgrammingError, PgSQL.DatabaseError),
                        'ProgrammingError is not a subclass of DatabaseError')

    def CheckNotSupportedError(self):
        self.failUnless(issubclass(PgSQL.NotSupportedError,
				   PgSQL.DatabaseError),
                        'NotSupportedError is not a subclass of DatabaseError')

class PgSQLTestModuleInterface(unittest.TestCase):
    # fetchReturnsList is a variable that controls what the fetch*() methods
    # returns.  If zero, fetch*() returns a list (per the DB-API 2.0 specs).
    # If non-zero, ti returns a PgResultSet (PgSQL extension to the specs).
    # The default should be 0.
    def CheckFetchReturnsList(self):
        self.assertEqual(PgSQL.fetchReturnsList, 0,
			 'fetchReturnsList is %d, should be 0' %
			 PgSQL.fetchReturnsList)

    def CheckBooleanConstructors(self):
        try:
            iTrue = PgSQL.PgBooleanFromInteger(1)
            iFalse = PgSQL.PgBooleanFromInteger(0)
            sTrue = PgSQL.PgBooleanFromString('t')
            sFalse = PgSQL.PgBooleanFromString('f')
	    sFalse2 = PgSQL.PgBooleanFromString('  faLSe    ')
        except StandardError, msg:
            self.assert_(0, msg)

        self.failUnless(iTrue,
                        'PgBooleanFromInteger failed to create a TRUE value.')
        self.failIf(iFalse,
                    'PgBooleanFromInteger failed to create a FALSE value (1).')
        self.failIf(iFalse,
                    'PgBooleanFromInteger failed to create a FALSE value (2).')
        self.failUnless(sTrue,
                        'PgBooleanFromString failed to create a TRUE value.')
        self.failIf(sFalse,
                    'PgBooleanFromString failed to create a FALSE value.')

    def CheckPgInt8(self):
        a = PgSQL.PgInt8(12345678)
        self.failUnless(a == 12345678, 'PgInt8 comparison to Int failed.')
        self.failUnless(a == 12345678.0,
                        'PgInt8 comparison to Float failed.')
        self.failUnless(a == 12345678L, 'PgInt8 comparison to Long failed.')
        self.failUnless(a != None, 'PgInt8 comparison to None failed')
        b = float(a)
        self.failUnless(type(b) == types.FloatType,
                        'PgInt8 conversion to Float failed.')
        self.failUnless(b == 12345678.0,
                        'PgInt8 conversion to Float failed.')
        b = int(a)
        self.failUnless(type(b) == types.IntType,
                        'PgInt8 conversion to Int failed.')
        self.failUnless(b == 12345678,
                        'PgInt8 conversion to Int failed.')
        b = long(a)
        self.failUnless(type(b) == types.LongType,
                        'PgInt8 conversion to Long failed.')
        self.failUnless(b == 12345678L,
                        'PgInt8 conversion to Long failed.')
        b = complex(a)
        self.failUnless(type(b) == types.ComplexType,
                        'PgInt8 conversion to Complex failed.')
        self.failUnless(b == (12345678+0j),
                        'PgInt8 conversion to Complex failed.')
        b = (((a ** 2) / a) + 2 - 10)
        self.failUnless(b == 12345670,
                        'PgInt8 match failed to produce correct result.')
        b = PgSQL.PgInt8(3037000649L)
        self.failUnlessRaises(OverflowError, pow, b, 2)

    def CheckPgInt2(self):
        a = PgSQL.PgInt2(181)
        self.failUnless(a == 181, 'PgInt2 comparison to Int failed.')
        self.failUnless(a == 181.0, 'PgInt2 comparison to Float failed.')
        self.failUnless(a == 181L, 'PgInt2 comparison to Long failed.')
        self.failUnless(a != None, 'PgInt2 comparison to None failed')
        b = float(a)
        self.failUnless(type(b) == types.FloatType,
                        'PgInt2 conversion to Float failed.')
        self.failUnless(b == 181.0, 'PgInt2 conversion to Float failed.')
        b = int(a)
        self.failUnless(type(b) == types.IntType,
                        'PgInt2 conversion to Int failed.')
        self.failUnless(b == 181, 'PgInt8 conversion to Int failed.')
        b = long(a)
        self.failUnless(type(b) == types.LongType,
                        'PgInt2 conversion to Long failed.')
        self.failUnless(b == 181L, 'PgInt2 conversion to Long failed.')
        b = complex(a)
        self.failUnless(type(b) == types.ComplexType,
                        'PgInt2 conversion to Complex failed.')
        self.failUnless(b == (181+0j), 'PgInt2 conversion to Complex failed.')
        b = (((a ** 2) / a) + 2 - 10)
        self.failUnless(b == 173,
                        'PgInt2 match failed to produce correct result.')
        b = PgSQL.PgInt2(182)
        self.failUnlessRaises(OverflowError, pow, b, PgSQL.PgInt2(2))

    def CheckPgMoney(self):
        a = PgSQL.PgMoney(4634.00)
        self.failUnless(a == 4634, 'PgMoney comparison to Int failed.')
        self.failUnless(a == 4634.0, 'PgMoney comparison to Float failed.')
        self.failUnless(a == 4634L, 'PgMoney comparison to Long failed.')
        self.failUnless(a != None, 'PgMoney comparison to None failed')
        a = PgSQL.PgMoney(4634.04)
        b = float(a)
        self.failUnless(type(b) == types.FloatType,
                        'PgMoney conversion to Float failed.')
        self.failUnless(b == 4634.04, 'PgMoney conversion to Float failed.')
        b = int(a)
        self.failUnless(type(b) == types.IntType,
                        'PgMoney conversion to Int failed.')
        self.failUnless(b == 4634, 'PgMoney conversion to Int failed.')
        b = long(a)
        self.failUnless(type(b) == types.LongType,
                        'PgMoney conversion to Long failed.')
        self.failUnless(b == 4634, 'PgMoney conversion to Long failed.')
        b = complex(a)
        self.failUnless(type(b) == types.ComplexType,
                        'PgMoney conversion to Complex failed.')
        self.failUnless(b == (4634.04+0j),
                        'PgMoney conversion to Complex failed.')
        b = (((a ** 2) / a) + 2.0 - 10)
        self.failUnless(isinstance(b, PgSQL.PgMoney),
                        'PgMoney math failed to return PgMoney.')
        self.failUnless(b == 4626.04,
                        'PgMoney match failed to produce correct result.')
        b = PgSQL.PgMoney(4634.1)
        self.failUnlessRaises(OverflowError, pow, b, 2)

	b = PgSQL.PgMoney(4634.1)
	self.failUnless(b==+b,
			'PgMoney __pos__ operation failed.')

	b = PgSQL.PgMoney(4634.1)
	self.failUnless(b * (-1) == -b,
			'PgMoney __neg__ operation failed.')

    def CheckPgNumeric(self):
	try:
            a = PgSQL.PgNumeric('1234.00')
            self.failUnless(a.getPrecision() == 6,
                            'PgNumeric from string failed')
            self.failUnless(a.getScale() == 2, 'PgNumeric from string failed')
            self.failUnless(a == 1234, 'PgNumeric comparison to string failed.')
            self.failUnless(a == 1234.0,
			    'PgNumeric comparison to Float failed.')
            self.failUnless(a == 1234L, 'PgNumeric comparison to Long failed.')
            self.failUnless(a != None, 'PgNumeric comparison to None failed')
	except StandardError, msg:
	    self.fail(msg)

	try:
            a = PgSQL.PgNumeric(2345.00)
            self.failUnless(a.getPrecision() == 5,
                            'PgNumeric from float failed')
            self.failUnless(a.getScale() == 1, 'PgNumeric from float failed')
            self.failUnless(a == 2345, 'PgNumeric from float failed.')
	except StandardError, msg:
	    self.fail(msg)

	try:
            a = PgSQL.PgNumeric(345600, 6, 2)
            self.failUnless(a == 3456, 'PgNumeric from int failed.')
            self.failUnless(a.getPrecision() == 6, 'PgNumeric from int failed')
            self.failUnless(a.getScale() == 2, 'PgNumeric from int failed')
	except StandardError, msg:
	    self.fail(msg)

	try:
            a = PgSQL.PgNumeric(456700L, 6, 2)
            self.failUnless(a == 4567, 'PgNumeric from long failed.')
            self.failUnless(a.getPrecision() == 6, 'PgNumeric from long failed')
            self.failUnless(a.getScale() == 2, 'PgNumeric from long failed')
	except StandardError, msg:
	    self.fail(msg)

        try: # Test the casting of PgNumeric -> PgNumeric to change prec/scale
            a = PgSQL.PgNumeric('123456.78')
            b = PgSQL.PgNumeric(a, 10, 4)
            c = PgSQL.PgNumeric(a, 7, 1)
            self.failUnless(b.getPrecision() == 10, 'PgNumeric cast failed')
            self.failUnless(b.getScale() == 4, 'PgNumeric cast failed')
            self.failUnless(c.getPrecision() == 7, 'PgNumeric cast failed')
            self.failUnless(c.getScale() == 1, 'PgNumeric cast failed')
            self.failUnless(a == b, 'PgNumeric cast failed')
            self.failUnless(a != c, 'PgNumeric cast failed')
            self.failUnless(str(c) == '123456.8', 'PgNumeric cast failed.')
	except StandardError, msg:
	    self.fail(msg)

        try: # Test the basic math functions.
            a = PgSQL.PgNumeric('1234.567')
            b = PgSQL.PgNumeric('12.3456789')
            c = a + b
            self.failUnless(c.getPrecision() == 11, 'PgNumeric addition failed')
            self.failUnless(c.getScale() == 7, 'PgNumeric addition failed')
            self.failUnless(str(c) == '1246.9126789',
                            'PgNumeric addition failed.')
            c = a - b
            self.failUnless(c.getPrecision() == 11,
                            'PgNumeric subtraction failed')
            self.failUnless(c.getScale() == 7, 'PgNumeric subtraction failed')
            self.failUnless(str(c) == '1222.2213211',
                            'PgNumeric subtraction failed.')
            c = a * b
            self.failUnless(c.getPrecision() == 16,
                            'PgNumeric multiplication failed')
            self.failUnless(c.getScale() == 10,
                            'PgNumeric mulitplication failed')
            self.failUnless(str(c) == '15241.5677625363',
                            'PgNumeric multiplication failed.')
            c = a / b
            self.failUnless(c.getPrecision() == 7, 'PgNumeric division failed')
            self.failUnless(c.getScale() == 3, 'PgNumeric divisioncast failed')
            self.failUnless(str(c) == '100.000',
                            'PgNumeric divisioncast failed.')
            c = PgSQL.PgNumeric(a)
            c += b
            self.failUnless(c.getPrecision() == 11, 'PgNumeric addition failed')
            self.failUnless(c.getScale() == 7, 'PgNumeric addition failed')
            self.failUnless(str(c) == '1246.9126789',
                            'PgNumeric addition failed.')
            c = PgSQL.PgNumeric(a)
            c -= b
            self.failUnless(c.getPrecision() == 11,
                            'PgNumeric subtraction failed')
            self.failUnless(c.getScale() == 7, 'PgNumeric subtraction failed')
            self.failUnless(str(c) == '1222.2213211',
                            'PgNumeric subtraction failed.')
            c = PgSQL.PgNumeric(a)
            c *= b
            self.failUnless(c.getPrecision() == 16,
                            'PgNumeric multiplication failed')
            self.failUnless(c.getScale() == 10,
                            'PgNumeric mulitplication failed')
            self.failUnless(str(c) == '15241.5677625363',
                            'PgNumeric multiplication failed.')
            c = PgSQL.PgNumeric(a)
            c /= b
            self.failUnless(c.getPrecision() == 7, 'PgNumeric division failed')
            self.failUnless(c.getScale() == 3, 'PgNumeric divisioncast failed')
            self.failUnless(str(c) == '100.000',
                            'PgNumeric divisioncast failed.')

	    # Check for correct precision after a carry in the high-order digit
	    a = PgSQL.PgNumeric('999999.99')
            c = a + b
            self.failUnless(c.getPrecision() == 14, 'PgNumeric addition failed')
            self.failUnless(c.getScale() == 7, 'PgNumeric addition failed')
            self.failUnless(str(c) == '1000012.3356789',
                            'PgNumeric addition failed.')

	except StandardError, msg:
	    self.fail(msg)

class PgSQLTestCases(unittest.TestCase):
    def setUp(self):
        self.cnx = PgSQL.connect(database='template1')
        self.cur = self.cnx.cursor()
	self.vstr = "%(major)d.%(minor)d" % self.cnx.version
        if self.cnx.version < '7.3':
            self.expd = [['datname',PgSQL.PG_NAME,32,32,None,None,None,0],
                         ['datdba',PgSQL.PG_INT4,4,4,None,None,None,0],
                         ['encoding',PgSQL.PG_INT4,4,4,None,None,None,0],
                         ['datpath',PgSQL.PG_TEXT,-5,-1,None,None,None,0]]
        else:
            self.expd = [['datname',PgSQL.PG_NAME,64,64,None,None,None,0],
                         ['datdba',PgSQL.PG_INT4,4,4,None,None,None,0],
                         ['encoding',PgSQL.PG_INT4,4,4,None,None,None,0],
                         ['datacl',PgSQL.PG_ACLITEM,-5,-1,None,None,None,1]]
        
    def tearDown(self):
        try:
            self.cnx.close()
        except AttributeError:
            pass
        except PgSQL.InterfaceError:
            pass
        
    def CheckConnectionObject(self):
        self.assert_(isinstance(self.cnx, PgSQL.Connection),
                     'PgSQL.connect did not return a Connection object')
        self.assert_(self.cnx.autocommit == 0,
                     'autocommit default is not zero (0)')

    def CheckConnectionClose(self):
        self.assert_(hasattr(self.cnx, 'close') and 
                     type(self.cnx.close) == types.MethodType,
                     'close is not a method of Connection')
        self.cnx.close()
        self.failUnlessRaises(PgSQL.InterfaceError, self.cnx.close)

    def CheckConnectionCommit(self):
        self.assert_(hasattr(self.cnx, "commit") and
                     type(self.cnx.commit) == types.MethodType,
                     'commit is not a method of Connection')
        self.cnx.close()
        self.failUnlessRaises(PgSQL.InterfaceError, self.cnx.commit)

    def CheckConnectionRollback(self):
        self.assert_(hasattr(self.cnx, "rollback") and
                     type(self.cnx.rollback) == types.MethodType,
                     'rollback is not a method of Connection')
        self.cnx.close()
        self.failUnlessRaises(PgSQL.InterfaceError, self.cnx.rollback)

    def CheckConnectionCursor(self):
        self.assert_(hasattr(self.cnx, "cursor") and
                     type(self.cnx.cursor) == types.MethodType,
                     'cursor is not a method of Connection')
        self.cnx.close()
        self.failUnlessRaises(PgSQL.InterfaceError, self.cnx.cursor)

    def CheckConnectionBinary(self):
	# Binary is a method of Connection and not a function of PgSQL
	# because of the requirements of the PostgreSQL Large Objects.
	self.assert_(hasattr(self.cnx, "binary") and
                     type(self.cnx.binary) == types.MethodType,
                     'binary is not a method of Connection')
        self.cnx.close()
        self.failUnlessRaises(PgSQL.InterfaceError, self.cnx.binary, '')

    def CheckCloseConnection(self):
        self.cnx.close()
        self.failUnlessRaises(PgSQL.InterfaceError, self.cur.close)
        
    if version >= 20100:
        # The follwoing tests are only for Python 2.1 or greater.
        def CheckWeakReference1(self):
            del self.cur
            self.assertEquals(len(self.cnx.cursors.data.keys()), 0,
                         'deleting cursor did not remove for connection cursor list')

        def CheckWeakReference2(self):
            del self.cnx
            self.failUnlessRaises(PgSQL.InterfaceError, self.cur.close)
    
    def CheckCursorObject(self):
        self.assert_(isinstance(self.cur, PgSQL.Cursor),
                     'cnx.cursor() did not return a Cursor object')

    def CheckCursorArraysize(self):
        self.assert_(self.cur.arraysize == 1,
                     'cur.arraysize is %d, it should be 1' %
                     self.cur.arraysize)

    def CheckCursorDescription(self):
        self.assert_(self.cur.description == None,
                     "cur.description should be None at this point, it isn't.")

    def CheckCursorRowcount(self):
        self.assert_(self.cur.rowcount == -1,
                     'cur.rowcount is %d, should be -1' % self.cur.rowcount)

    def CheckCursorCallproc(self):
        self.assert_(hasattr(self.cur, "callproc") and
                     type(self.cur.callproc) == types.MethodType,
                     'callproc is not a method of the Cursor object')
        self.cur.close()
        self.failUnlessRaises(PgSQL.InterfaceError,
                              self.cur.callproc, 'SELECT version()')

    def CheckCursorClose(self):
        self.assert_(hasattr(self.cur, "close") and
                     type(self.cur.close) == types.MethodType,
                     'close is not a method of the Cursor object')
        self.cur.close()
        self.failUnlessRaises(PgSQL.InterfaceError, self.cur.close)

    def CheckCursorExecute(self):
        self.assert_(hasattr(self.cur, "execute") and
                     type(self.cur.execute) == types.MethodType,
                     'execute is not a method of the Cursor object')
        self.cur.close()
        self.failUnlessRaises(PgSQL.InterfaceError,
                              self.cur.execute, 'SELECT version()')

    def CheckCursorExecutemany(self):
        self.assert_(hasattr(self.cur, "executemany") and
                     type(self.cur.executemany) == types.MethodType,
                     'executemany is not a method of the Cursor object')
        self.cur.close()
        self.failUnlessRaises(PgSQL.InterfaceError,
                              self.cur.executemany, 'SELECT version()', [1,2])

    def CheckCursorFetchone(self):
        self.assert_(hasattr(self.cur, "fetchone") and
                     type(self.cur.fetchone) == types.MethodType,
                     'fetchone is not a method of the Cursor object')
        self.cur.close()
        self.failUnlessRaises(PgSQL.InterfaceError, self.cur.fetchone)

    def CheckCursorFetchMany(self):
        self.failUnless(hasattr(self.cur, "fetchmany") and
                        type(self.cur.fetchmany) == types.MethodType,
                        'fetchmany is not a method of the Cursor object')
        self.cur.close()
        self.failUnlessRaises(PgSQL.InterfaceError,
                              self.cur.fetchmany, 10)

    def CheckCursorFetchall(self):
        self.failUnless(hasattr(self.cur, "fetchall") and
                        type(self.cur.fetchall) == types.MethodType,
                        'fetchall is not a method of the Cursor object')
        self.cur.close()
        self.failUnlessRaises(PgSQL.InterfaceError,
                              self.cur.fetchall)

    def CheckCursorSetoutputsize(self):
        self.failUnless(hasattr(self.cur, "setoutputsize") and
                        type(self.cur.setoutputsize) == types.MethodType,
                        'setoutputsize is not a method of the Cursor object')
        self.cur.close()
        self.failUnlessRaises(PgSQL.InterfaceError,
                              self.cur.setoutputsize, 1024)

    def CheckCursorSetinputsizes(self):
        self.failUnless(hasattr(self.cur, "setinputsizes") and
                        type(self.cur.setinputsizes) == types.MethodType,
                        'setinputsizes is not a method of the Cursor object')
        self.cur.close()
        self.failUnlessRaises(PgSQL.InterfaceError,
                              self.cur.setinputsizes, [1, 2, 3])

    def CheckCursorNextset(self):
        # The nextset method is operational.
        if hasattr(self.cur, "nextset"):
            self.assertEqual(type(self.cur.nextset), types.MethodType,
			     'nextset is not a method of the Cursor object')
            self.cur.close()
            self.failUnlessRaises(PgSQL.InterfaceError,
                                  self.cur.callproc, 'SELECT version()')

    def CheckPgVer(self):
        try:
            self.cur.callproc("version")
            v = string.split(self.cur.fetchone()[0])[1]
            self.cur.close()
	    vstr = "%(major)d.%(minor)d" % self.cnx.version
	    if self.cnx.version.level != 0:
		vstr = vstr + ".%(level)d" % self.cnx.version
	    self.failUnlessEqual(v, vstr,
			 'SELECT version() says %s, cnx.version says %s' %
				 (v, vstr))
        except StandardError, msg:
            self.fail(msg)
	
	try:
	    version = self.cnx.version
	    a, b = coerce(version, '7.3.2')
	    self.failUnless(type(a) == type(b) == type(version),
			'Coercion from string to PgVersion failed.')
	    a, b = coerce(version, 80205)
	    self.failUnless(type(a) == type(b) == type(version),
			'Coercion from int to PgVersion failed.')
	    a, b = coerce(version, 80206L)
	    self.failUnless(type(a) == type(b) == type(version),
			'Coercion from long to PgVersion failed.')
	    a, b = coerce(version, 80207.0)
	    self.failUnless(type(a) == type(b) == type(version),
			'Coercion from float to PgVersion failed.')
	    try:
		a, b = coerce(version, '7. 1.3')
		self.fail(
	    'Coercion from garbage string to PgVersion *should* have failed.'
		)
	    except ValueError, msg:
		pass
	except StandardError, msg:
	    self.fail(msg)
    
    def CheckExecuteWithSingleton(self):
        """Test execute() with a singleton string as the parameter."""

	if self.vstr.startswith("7.4"):
	    flen = 11
	elif self.vstr.startswith("7.3"):
	    flen = 11
	elif self.vstr.startswith("7.2"):
	    flen = 9
	elif self.vstr.startswith("7.1"):
	    flen = 7
	else:
	    flen = 4

        try:
            self.cur.execute("""
            select * from pg_database
            where datname = %s""", 'template1')
        except StandardError, msg:
            self.fail(msg)

        self.assertEqual(type(self.cur.description), types.ListType,
			 "cur.description should be a list, but isn't.")

        clen = len(self.cur.description)
        self.assertEqual(clen, flen,
			 "Length of cur.description is %d, it should be %d." %
			 (clen, flen))

        # The following test checks the length of one of the cur.description's
        # sequences.  It should be 7 per the DB-API 2.0 specification.  In the
        # PgSQL extension of the cur.description, it is 8.

        self.assertEqual(len(self.cur.description[0]), 8,
			 "Length of cur.description[0] is %d, it should be 8." %
			 len(self.cur.description[0]))

        self.failUnless(self.cur.description[0][0] == self.expd[0][0] and
                        self.cur.description[0][1] == self.expd[0][1] and
                        self.cur.description[0][3] == self.expd[0][3] and
                        self.cur.description[0][4] == self.expd[0][4] and
                        self.cur.description[0][5] == self.expd[0][5] and
                        self.cur.description[0][6] == self.expd[0][6] and
                        self.cur.description[0][7] == self.expd[0][7],
                        "cur.description[0] does not match the query.")
	self.cur.close()

    def CheckExecuteWithTuple(self):
        """Test execute() with a tuple as the parameter."""
        try:
            self.cur.execute("""
            select * from pg_database
            where datname = %s""", ('template1',))
        except StandardError, msg:
            self.fail(msg)

        self.failUnless(self.cur.description[0][0] == self.expd[0][0] and
                        self.cur.description[0][1] == self.expd[0][1] and
                        self.cur.description[0][3] == self.expd[0][3] and
                        self.cur.description[0][4] == self.expd[0][4] and
                        self.cur.description[0][5] == self.expd[0][5] and
                        self.cur.description[0][6] == self.expd[0][6] and
                        self.cur.description[0][7] == self.expd[0][7],
                        "cur.description[0] does not match the query.")
	self.cur.close()

    def CheckExecuteWithDictionary(self):
        """Test execute() with a dictionary as the parameter."""
        try:
            self.cur.execute("""
            select * from pg_database
            where datname = %(dbname)s""", {'dbname': 'template1'})
        except StandardError, msg:
            self.fail(msg)

        self.failUnless(self.cur.description[0][0] == self.expd[0][0] and
                        self.cur.description[0][1] == self.expd[0][1] and
                        self.cur.description[0][3] == self.expd[0][3] and
                        self.cur.description[0][4] == self.expd[0][4] and
                        self.cur.description[0][5] == self.expd[0][5] and
                        self.cur.description[0][6] == self.expd[0][6] and
                        self.cur.description[0][7] == self.expd[0][7],
                        "cur.description[0] does not match the query.")

        self.failUnless(self.cur.description[1][0] == self.expd[1][0] and
                        self.cur.description[1][1] == self.expd[1][1] and
                        self.cur.description[1][3] == self.expd[1][3] and
                        self.cur.description[1][4] == self.expd[1][4] and
                        self.cur.description[1][5] == self.expd[1][5] and
                        self.cur.description[1][6] == self.expd[1][6] and
                        self.cur.description[1][7] == self.expd[1][7],
                        "cur.description[1] does not match the query.")

        self.failUnless(self.cur.description[2][0] == self.expd[2][0] and
                        self.cur.description[2][1] == self.expd[2][1] and
                        self.cur.description[2][3] == self.expd[2][3] and
                        self.cur.description[2][4] == self.expd[2][4] and
                        self.cur.description[2][5] == self.expd[2][5] and
                        self.cur.description[2][6] == self.expd[2][6] and
                        self.cur.description[2][7] == self.expd[2][7],
                        "cur.description[2] does not match the query.")

        clen = len(self.cur.description) - 1
        self.failUnless(self.cur.description[clen][0] == self.expd[3][0] and
                        self.cur.description[clen][1] == self.expd[3][1] and
                        self.cur.description[clen][3] == self.expd[3][3] and
                        self.cur.description[clen][4] == self.expd[3][4] and
                        self.cur.description[clen][5] == self.expd[3][5] and
                        self.cur.description[clen][6] == self.expd[3][6] and
                        self.cur.description[clen][7] == self.expd[3][7],
                        "cur.description[3] does not match the query.")

    def CheckResultObject(self):
        try:
            self.cur.execute("""
				select * from pg_database
				where datname = 'template1'""")
            self.assertEqual(self.cur.rowcount, -1,
			     "cur.rowcount is %d, it should be -1." %
			     self.cur.rowcount)
            self.res = self.cur.fetchall()
        except StandardError, msg:
            self.fail(msg)

        self.assertEqual(self.cur.rowcount, 1,
			 'cur.rowcount is %d, it should be 1.' %
			 self.cur.rowcount)
        
        self.assertEqual(type(self.res), types.ListType,
			 'cur.fetchall() did not return a sequence.')

        self.assertEqual(len(self.res), 1,
			 'Length of the list of results is %d, it should be 1' %
			 len(self.res))

        self.failUnless(isinstance(self.res[0], PgSQL.PgResultSet),
			'cur.fetchall() did not return a list of PgResultSets.')
        
        self.failUnless(self.res[0].datname == 'template1'	and
			self.res[0][0] == 'template1',
			'The query did not return the correct result.')

    def CheckResultFetchone(self):
	# The following tests do not work when the version of PostgreSQL < 7.x
	if self.cnx.version < 70000:
	    try:
		self.cur.execute("""
				    select * from pg_database
				    where datname = 'template1'""")
		self.res = self.cur.fetchone()
		self.assertEqual(self.cur.rowcount, 1,
				 'cur.rowcount is %d, it should be 1.' %
				 self.cur.rowcount)
		self.cur.rewind()
		self.assertEqual(self.cur.rowcount, -1,
				 "cur.rowcount is %d, it should be -1" %
				 self.cur.rowcount)
		self.res = self.cur.fetchone()
	    except StandardError, msg:
		self.fail(msg)

	    self.assertEqual(self.cur.rowcount, 1,
			     "cur.rowcount is %d, it should be 1." %
			     self.cur.rowcount)

	    self.failUnless(isinstance(self.res, PgSQL.PgResultSet),
			    "cur.fetchone() does not return a PgResultSet.")
	    
	    self.failUnless(self.res[0] == 'template1'	and
			    self.res['encoding'] == 0	and
			    self.res.datname == 'template1',
			    "The query did not return the correct result.")

	    try:
		self.res = self.cur.fetchone()
		self.assertEqual(self.cur.rowcount, 0,
				 "cur.rowcount is %d, it should be 0" %
				 self.cur.rowcount)
	    except StandardError, msg:
		self.fail(msg)

	    self.assertEqual(self.res, None,
			     "res should be None at this point, but it isn't.")

    def CheckDoMoreResultObjectChecks(self):
        # Define the row counts for various version of PosrtgreSQL
	# Note: We only have to check for the minor version number in order
	#	to determine the needed row counts.
        rc = { '7.4':137, '7.3':124, '7.2':101, '7.1':80, '7.0':65, '6.5':47 }

	v = self.vstr

        try:
            self.cur.execute("""
            select * from pg_class
            where relname like 'pg_%%'""")
            self.assertEqual(self.cur.rowcount, -1,
			     "cur.rowcount is %d, it should be -1." %
			     self.cur.rowcount)
            self.res = self.cur.fetchall()
	    self.assertEqual(self.cur.rowcount, rc[v],
			     "cur.rowcount is %d, it should be %d." %
			     (self.cur.rowcount, rc[v]))
        except StandardError, msg:
            self.fail(msg)

    def CheckSelectOfNonPrintableString(self):
	try:
	    a = '\x01\x02\x03\x04'
	    self.cur.execute('select %s as a', a)
	    r = self.cur.fetchone()
	    self.assertEqual(len(r.a), len(a),
			     "Length of result is %d, it should be %d."  %
			     (len(r.a), len(a)))
	    self.failUnless(r.a == a,
			     "Result is '%s', it should be '%s'" % (r.a, a))
        except StandardError, msg:
            self.fail(msg)

    def CheckSelectOfLong(self):
	try:
	    self.cur.execute('select %s + %s as a', (5L, 6L))
	    r = self.cur.fetchone()
	    self.assertEqual(r.a, 11,
			     "Sum of 5 and 6 should have been 11, was %i"
                             % r.a)
        except StandardError, msg:
            self.fail(msg)

    def CheckSelectOfOidWithZeroRows(self):
	# Check for control flow of bug #589370
	try:
	    self.cur.execute('select oid from pg_type where false')
	except StandardError, msg:
	    self.fail(msg)

class PgResultSetTests(unittest.TestCase):
    def setUp(self):
	self.cnx = PgSQL.connect(database='template1')
	self.cur = self.cnx.cursor()

    def tearDown(self):
	try:
	    self.cnx.close()
	except AttributeError:
	    pass
	except PgSQL.InterfaceError:
	    pass

    def getResult(self):
	self.cur.execute("select 5 as id, 'Alice' as name, 29 as age")
	return self.cur.fetchone()
    
    def CheckAttributeAccess(self):
	res = self.getResult()
	if not hasattr(res, "id"):
	    self.fail("Resultset doesn't have attribute 'id'")
	if not hasattr(res, "ID"):
	    self.fail("Resultset doesn't have attribute 'ID'")

    def CheckAttributeValue(self):
	res = self.getResult()
	if res.id != 5:
	    self.fail("id should be 5, is %i" % res.id)
	if res.ID != 5:
	    self.fail("ID should be 5, is %i" % res.ID)

    def CheckKeyAccess(self):
	res = self.getResult()
	if not "id" in res:
	    self.fail("Resultset doesn't have item 'id'")
	if not "ID" in res:
	    self.fail("Resultset doesn't have item 'ID'")

    def CheckKeyValue(self):
	res = self.getResult()
	if res["id"] != 5:
	    self.fail("id should be 5, is %i" % res.id)
	if res["ID"] != 5:
	    self.fail("ID should be 5, is %i" % res.ID)

    def CheckIndexValue(self):
	res = self.getResult()
	if res[0] != 5:
	    self.fail("item 0 should be 5, is %i" % res.id)

    def CheckChangeIndex(self):
	res = self.getResult()
	res[0] = 6
	if res[0] != 6:
	    self.fail("item 0 should be 6, is %i" % res.id)

    def CheckChangeKey(self):
	res = self.getResult()
	res["id"] = 6
	if res["id"] != 6:
	    self.fail("id should be 6, is %i" % res.id)

	res = self.getResult()
	res["ID"] = 6
	if res["ID"] != 6:
	    self.fail("ID should be 6, is %i" % res.id)

    def CheckChangeAttr(self):
	res = self.getResult()
	res.id = 6
	if res.id != 6:
	    self.fail("id should be 6, is %i" % res.id)

	res = self.getResult()
	res.ID = 6
	if res.ID != 6:
	    self.fail("ID should be 6, is %i" % res.id)

    def Check_haskey(self):
	res = self.getResult()
	if not res.has_key("id"):
	    self.fail("resultset should have key 'id'")
	if not res.has_key("ID"):
	    self.fail("resultset should have key 'ID'")
	if not res.has_key("Id"):
	    self.fail("resultset should have key 'Id'")

    def Check_len(self):
	l = len(self.getResult())
	if l != 3:
	    self.fail("length of resultset should be 3, is %i", l)

    def Check_keys(self):
	res = self.getResult()
	if res.keys() != ["id", "name", "age"]:
	    self.fail("keys() should return %s, returns %s" %
			(["id", "name", "age"], res.keys()))

    def Check_values(self):
	val = self.getResult().values()
	if val != [5, 'Alice', 29]:
	    self.fail("Wrong values(): %s" % val)

    def Check_items(self):
	it = self.getResult().items()
	if it != [("id", 5), ("name", 'Alice'), ("age", 29)]:
	    self.fail("Wrong items(): %s" % it)

    def Check_get(self):
	res = self.getResult()
	v = res.get("id")
	if v != 5:
	    self.fail("Wrong result for get [1]")

	v = res.get("ID")
	if v != 5:
	    self.fail("Wrong result for get [2]")

	v = res.get("asdf")
	if v is not None:
	    self.fail("Wrong result for get [3]")

	v = res.get("asdf", 6)
	if v != 6:
	    self.fail("Wrong result for get [4]")

def suite():
    dbapi_tests = unittest.makeSuite(DBAPICompliance, "Check")
    moduleinterface_tests = unittest.makeSuite(PgSQLTestModuleInterface, "Check")
    pgsql_tests = unittest.makeSuite(PgSQLTestCases, "Check")
    pgresultset_tests = unittest.makeSuite(PgResultSetTests, "Check")

    test_suite = unittest.TestSuite((dbapi_tests, moduleinterface_tests,
					pgsql_tests, pgresultset_tests))
    return test_suite

def main():
    runner = unittest.TextTestRunner()
    runner.run(suite())
    
if __name__ == "__main__":
    main()
