1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
|
from datetime import datetime
import unittest
import _mssql
from .helpers import mssqlconn, drop_table
class QueryTests(unittest.TestCase):
@classmethod
def setup_class(cls):
cls.mssql = mssqlconn()
cls.createTestTable()
@classmethod
def teardown_class(cls):
cls.dropTestTable()
cls.mssql.close()
@classmethod
def createTestTable(cls):
try:
cls.mssql.execute_non_query("""
CREATE TABLE pymssql (
pk_id int IDENTITY (1, 1) NOT NULL,
real_no real,
float_no float,
money_no money,
stamp_datetime datetime,
data_bit bit,
comment_vch varchar(50),
comment_nvch nvarchar(50),
comment_text text,
comment_ntext ntext,
data_image image,
data_binary varbinary(40),
decimal_no decimal(38,2),
numeric_no numeric(38,8),
stamp_time timestamp,
bin_data varbinary(16)
)""")
cls.tableCreated = True
cls.testTableColCount = 16
except _mssql.MSSQLDatabaseException as e:
if e.number != 2714:
raise
@classmethod
def dropTestTable(cls):
cls.mssql.execute_non_query('DROP TABLE pymssql')
cls.tableCreated = False
def insertSampleData(self):
for x in range(10):
y = x + 1
query = """
INSERT INTO pymssql (
real_no,
float_no,
money_no,
stamp_datetime,
data_bit,
comment_vch,
comment_ntext,
comment_text,
comment_nvch,
decimal_no,
numeric_no,
bin_data
) VALUES (
%d, %d, %d, getdate(), %d,
'comment %d',
'detail %d',
'hmm',
'bhmme',
234.99,
894123.09,
%#x
);""" % (y, y, y, (y % 2), y, y, y)
self.mssql.execute_non_query(query)
def test01SimpleSelect(self):
query = 'SELECT getdate() as cur_date_info'
self.mssql.execute_query(query)
rows = tuple(self.mssql)
self.assertTrue(isinstance(rows[0]['cur_date_info'], datetime))
def test02EmptySelect(self):
query = 'SELECT * FROM pymssql'
self.mssql.execute_query(query)
rows = tuple(self.mssql)
self.assertEquals(rows, ())
def test03InsertSelect(self):
self.insertSampleData()
self.mssql.execute_query('SELECT * FROM pymssql')
# check row count
rows = tuple(self.mssql)
self.assertEquals(10, len(rows))
# check col count
cols = [k for k in rows[0] if type(k) is int]
self.assertEquals(self.testTableColCount, len(cols))
def test19MultipleResults(self):
self.mssql.execute_query("SELECT 'ret1'; SELECT 'ret2'; SELECT 'ret3'")
rows = tuple(self.mssql)
self.assertEquals(rows[0][0], 'ret1')
self.mssql.nextresult()
rows = tuple(self.mssql)
self.assertEquals(rows[0][0], 'ret2')
self.mssql.nextresult()
rows = tuple(self.mssql)
self.assertEquals(rows[0][0], 'ret3')
def test04BinaryTypeSqlInjection(self):
self.mssql.execute_query('SELECT * FROM pymssql WHERE bin_data=%s', ('0x OR 1=1;',))
rows = tuple(self.mssql)
self.assertEqual(len(rows), 0)
|