File: test_green.py

package info (click to toggle)
pymssql 2.3.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 972 kB
  • sloc: python: 3,801; sh: 152; makefile: 151; ansic: 1
file content (150 lines) | stat: -rw-r--r-- 4,710 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# -*- coding: utf-8 -*-
"""
Some async tests with gevent.
"""

import datetime
import unittest

import pytest

import pymssql
from .helpers import mssqlconn, pymssqlconn

# Skip this whole module when gevent cannot be imported; otherwise bind the
# imported module to the name ``gevent``.
gevent = pytest.importorskip("gevent")
try:
    # Import the socket submodule explicitly: the tests below call
    # gevent.socket.wait_read and reference gevent.socket.timeout.
    import gevent.socket
except ImportError:
    # Module-level skip, so none of the tests in this file run without
    # gevent.socket.
    pytest.skip('gevent is not available', allow_module_level=True)


@pytest.mark.mssql_server_required
class GreenletTests(unittest.TestCase):
    """Exercise pymssql's wait callback together with gevent greenlets.

    Each worker method runs a statement that makes the server wait five
    seconds (``WAITFOR DELAY '00:00:05'``) and then reads the result.  The
    concurrency tests spawn several such workers as greenlets with a wait
    callback that calls ``gevent.socket.wait_read`` and assert that the
    whole run finishes within ``MAX_ELAPSED``.
    """

    # Stored procedure created by the callproc test and called by its workers.
    PROC_NAME = 'my_proc'

    # Number of worker greenlets spawned per concurrency test.
    NUM_GREENLETS = 5

    # Upper bound on the wall-clock duration of one concurrent run.
    # NOTE(review): presumably chosen so that a fully serialized run
    # (NUM_GREENLETS x 5 s of WAITFOR plus the start-up sleeps) exceeds it.
    MAX_ELAPSED = datetime.timedelta(seconds=20)

    def _install_wait_callback(self, timeout=None):
        """Install a gevent-based pymssql wait callback for this test only.

        The callback forwards the file descriptor it receives to
        ``gevent.socket.wait_read``.  ``timeout`` is passed through to
        ``wait_read``; ``None`` is that function's default.

        ``pymssql.set_wait_callback`` changes module-global state, so a
        cleanup is registered that resets the callback to ``None`` once the
        test finishes, whether it passed or failed.
        """
        def wait_callback(read_fileno):
            gevent.socket.wait_read(read_fileno, timeout=timeout)

        self.addCleanup(pymssql.set_wait_callback, None)
        pymssql.set_wait_callback(wait_callback)

    def _assert_ran_concurrently(self, elapsed_time):
        """Fail unless ``elapsed_time`` is below ``MAX_ELAPSED``."""
        self.assertLess(elapsed_time, self.MAX_ELAPSED)

    def greenlet_run_pymssql_execute(self, num):
        """Run the 5-second WAITFOR query through a pymssql cursor.

        ``num`` only identifies the worker in the printed output.
        """
        with pymssqlconn() as conn:
            cur = conn.cursor()
            cur.execute("""
            WAITFOR DELAY '00:00:05'  -- sleep for 5 seconds
            SELECT CURRENT_TIMESTAMP
            """)
            row = cur.fetchone()
            print("greenlet_run_pymssql_execute: num = %r; row = %r" % (num, row))

    def greenlet_run_pymssql_callproc(self, num):
        """Call the ``PROC_NAME`` stored procedure through a pymssql cursor.

        The procedure is not created here; the callproc test creates it
        once before spawning the workers.  ``num`` only identifies the
        worker in the printed output.
        """
        with pymssqlconn() as conn:
            cur = conn.cursor()
            cur.callproc(self.PROC_NAME, ())
            # Advance to the procedure's result set before fetching from it.
            cur.nextset()
            row = cur.fetchone()
            print("greenlet_run_pymssql_callproc: num = %r; row = %r" % (num, row))
            cur.close()

    def greenlet_run_mssql_execute(self, num):
        """Run the 5-second WAITFOR query through an ``mssqlconn()`` connection.

        The connection is closed even when the query or the row iteration
        raises (for example on a wait-callback timeout).  ``num`` only
        identifies the worker in the printed output.
        """
        query = """
        WAITFOR DELAY '00:00:05'  -- sleep for 5 seconds
        SELECT CURRENT_TIMESTAMP
        """
        conn = mssqlconn()
        try:
            conn.execute_query(query)
            for row in conn:
                print("greenlet_run_mssql_execute: num = %r; row = %r" % (num, row))
        finally:
            conn.close()

    def _run_all_greenlets(self, greenlet_task):
        """Spawn ``NUM_GREENLETS`` workers and return the total elapsed time.

        ``greenlet_task`` is called as ``greenlet_task(i)`` with the worker
        index.  A one-second ``gevent.sleep`` precedes each spawn, so the
        workers start staggered.  Returns a ``datetime.timedelta`` covering
        the spawning phase and the wait for all workers.

        An exception raised inside a worker is re-raised here
        (``raise_error=True``); otherwise a run in which every query failed
        immediately would look like a fast, successful one.
        """
        greenlets = []

        started_at = datetime.datetime.now()

        for i in range(self.NUM_GREENLETS):
            gevent.sleep(1)
            greenlets.append(gevent.spawn(greenlet_task, i))

        gevent.joinall(greenlets, raise_error=True)

        return datetime.datetime.now() - started_at

    @pytest.mark.slow
    def test_gevent_socket_pymssql_execute_wait_read_concurrency(self):
        """pymssql ``cursor.execute`` workers finish within ``MAX_ELAPSED``."""
        self._install_wait_callback()

        elapsed_time = self._run_all_greenlets(
            self.greenlet_run_pymssql_execute)

        self._assert_ran_concurrently(elapsed_time)

    @pytest.mark.slow
    def test_gevent_socket_pymssql_callproc_wait_read_concurrency(self):
        """pymssql ``cursor.callproc`` workers finish within ``MAX_ELAPSED``."""
        self._install_wait_callback()

        # (Re)create the stored procedure once, up front, so the workers
        # only have to call it.
        with pymssqlconn() as conn:
            cur = conn.cursor()
            proc_name = self.PROC_NAME
            cur.execute("IF OBJECT_ID('%s', 'P') IS NOT NULL DROP PROCEDURE %s" % (proc_name, proc_name))
            cur.execute("""
            CREATE PROCEDURE %s AS
            BEGIN
                SET NOCOUNT ON
                WAITFOR DELAY '00:00:05'  -- sleep for 5 seconds
                SELECT CURRENT_TIMESTAMP
            END
            """ % (proc_name,))
            conn.commit()

        elapsed_time = self._run_all_greenlets(
            self.greenlet_run_pymssql_callproc)

        self._assert_ran_concurrently(elapsed_time)

    @pytest.mark.slow
    def test_gevent_socket_mssql_execute_wait_read_concurrency(self):
        """``mssqlconn()`` ``execute_query`` workers finish within ``MAX_ELAPSED``."""
        self._install_wait_callback()

        elapsed_time = self._run_all_greenlets(
            self.greenlet_run_mssql_execute)

        self._assert_ran_concurrently(elapsed_time)

    def test_timeout(self):
        """A 3-second ``wait_read`` timeout interrupts the 5-second query.

        The wait callback's timeout is shorter than the ``WAITFOR`` delay,
        so running the query is expected to raise ``gevent.socket.timeout``.
        """
        self._install_wait_callback(timeout=3)

        with self.assertRaises(gevent.socket.timeout):
            self.greenlet_run_mssql_execute(1)