1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
# -*- coding: utf-8 -*-
"""
Test queries.
"""
from datetime import datetime
import unittest
import pytest
from pymssql import _mssql
from .helpers import mssqlconn, drop_table
@pytest.mark.mssql_server_required
class QueryTests(unittest.TestCase):
    """Query tests executed against a live SQL Server through ``_mssql``.

    One connection and one scratch table named ``pymssql`` are shared by
    every test in the class.  The tests are order dependent:
    ``test02EmptySelect`` expects the table to be empty, while
    ``test03InsertSelect`` fills it, so the numeric prefixes in the test
    names must be kept.
    """

    # Class-level defaults so both attributes exist even when
    # createTestTable() finds the table already present (error 2714) and
    # therefore never reaches the code that runs after a successful CREATE.
    tableCreated = False
    # Number of columns declared in the CREATE TABLE statement below.
    testTableColCount = 16

    @classmethod
    def setup_class(cls):
        """Open the shared connection and create the scratch table."""
        cls.mssql = mssqlconn()
        try:
            cls.createTestTable()
        except Exception:
            # A failure here would otherwise leak the connection; close it
            # before re-raising, since no later cleanup is guaranteed.
            # NOTE(review): assumes the runner skips teardown_class when
            # setup_class fails -- confirm against the pytest xunit docs.
            cls.mssql.close()
            raise

    @classmethod
    def teardown_class(cls):
        """Drop the scratch table and always close the shared connection."""
        try:
            cls.dropTestTable()
        finally:
            # Close even if DROP TABLE fails so the connection is not leaked.
            cls.mssql.close()

    @classmethod
    def createTestTable(cls):
        """Create the ``pymssql`` scratch table.

        An "object already exists" error (SQL Server error number 2714) is
        tolerated so a table left behind by an aborted run does not break
        the suite; any other database error is re-raised.
        """
        try:
            cls.mssql.execute_non_query("""
                CREATE TABLE pymssql (
                    pk_id int IDENTITY (1, 1) NOT NULL,
                    real_no real,
                    float_no float,
                    money_no money,
                    stamp_datetime datetime,
                    data_bit bit,
                    comment_vch varchar(50),
                    comment_nvch nvarchar(50),
                    comment_text text,
                    comment_ntext ntext,
                    data_image image,
                    data_binary varbinary(40),
                    decimal_no decimal(38,2),
                    numeric_no numeric(38,8),
                    stamp_time timestamp,
                    bin_data varbinary(16)
                )""")
        except _mssql.MSSQLDatabaseException as e:
            if e.number != 2714:
                raise
        else:
            cls.tableCreated = True

    @classmethod
    def dropTestTable(cls):
        """Drop the ``pymssql`` scratch table and clear ``tableCreated``."""
        cls.mssql.execute_non_query('DROP TABLE pymssql')
        cls.tableCreated = False

    def insertSampleData(self):
        """Insert ten rows whose numeric columns hold the values 1..10.

        The values interpolated into the statement are integers produced
        by this method itself, never external input, so plain string
        formatting is used.  ``%#x`` renders the number as a ``0x...``
        literal for the ``bin_data`` column.
        """
        for x in range(10):
            y = x + 1
            query = """
                INSERT INTO pymssql (
                    real_no,
                    float_no,
                    money_no,
                    stamp_datetime,
                    data_bit,
                    comment_vch,
                    comment_ntext,
                    comment_text,
                    comment_nvch,
                    decimal_no,
                    numeric_no,
                    bin_data
                ) VALUES (
                    %d, %d, %d, getdate(), %d,
                    'comment %d',
                    'detail %d',
                    'hmm',
                    'bhmme',
                    234.99,
                    894123.09,
                    %#x
                );""" % (y, y, y, (y % 2), y, y, y)
            self.mssql.execute_non_query(query)

    def test01SimpleSelect(self):
        """``getdate()`` comes back as a ``datetime`` addressable by name."""
        query = 'SELECT getdate() as cur_date_info'
        self.mssql.execute_query(query)
        rows = tuple(self.mssql)
        self.assertIsInstance(rows[0]['cur_date_info'], datetime)

    def test02EmptySelect(self):
        """Selecting from the still-empty table yields no rows."""
        query = 'SELECT * FROM pymssql'
        self.mssql.execute_query(query)
        rows = tuple(self.mssql)
        self.assertEqual(rows, ())

    def test03InsertSelect(self):
        """Rows inserted by ``insertSampleData`` are all returned."""
        self.insertSampleData()
        self.mssql.execute_query('SELECT * FROM pymssql')

        # check row count
        rows = tuple(self.mssql)
        self.assertEqual(10, len(rows))

        # check col count: count only the integer keys of the row so that
        # each column is counted once (other tests in this class read rows
        # both by name and by position).
        cols = [k for k in rows[0] if isinstance(k, int)]
        self.assertEqual(self.testTableColCount, len(cols))

    def test19MultipleResults(self):
        """``nextresult()`` steps through each result set of a batch."""
        self.mssql.execute_query("SELECT 'ret1'; SELECT 'ret2'; SELECT 'ret3'")

        rows = tuple(self.mssql)
        self.assertEqual(rows[0][0], 'ret1')

        self.mssql.nextresult()
        rows = tuple(self.mssql)
        self.assertEqual(rows[0][0], 'ret2')

        self.mssql.nextresult()
        rows = tuple(self.mssql)
        self.assertEqual(rows[0][0], 'ret3')

    def test04BinaryTypeSqlInjection(self):
        """A ``0x``-prefixed string parameter must not inject SQL.

        If the parameter were spliced into the statement unquoted, the
        condition would read ``bin_data=0x OR 1=1;`` and match every row;
        the test therefore expects an empty result.
        """
        self.mssql.execute_query('SELECT * FROM pymssql WHERE bin_data=%s', ('0x OR 1=1;',))
        rows = tuple(self.mssql)
        self.assertEqual(len(rows), 0)
|