1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
Description: disable expensive unittests (i.e. need MySQL)
Author: Martin <debacle@debian.org>
Origin: vendor
Last-Update: 2025-04-04
---
This patch header follows DEP-3: http://dep.debian.net/deps/dep3/
--- a/persistqueue/tests/test_mysqlqueue.py
+++ b/persistqueue/tests/test_mysqlqueue.py
@@ -49,6 +49,7 @@
"drop table if exists %s" % self.mysql_queue._table_name)
tmp_conn.commit()
+ @unittest.skip("disable unittest")
def test_raise_empty(self):
q = self.queue
@@ -64,6 +65,7 @@
self.assertRaises(ValueError, q.get, block=True, timeout=-1.0)
del q
+ @unittest.skip("disable unittest")
def test_empty(self):
q = self.queue
self.assertEqual(q.empty(), True)
@@ -74,6 +76,7 @@
q.get()
self.assertEqual(q.empty(), True)
+ @unittest.skip("disable unittest")
def test_full(self):
# SQL queue `full()` always returns `False` !!
q = self.queue
@@ -85,6 +88,7 @@
q.get()
self.assertEqual(q.full(), False)
+ @unittest.skip("disable unittest")
def test_open_close_single(self):
"""Write 1 item, close, reopen checking if same item is there"""
@@ -96,6 +100,7 @@
self.assertEqual(1, q.qsize())
self.assertEqual(b'var1', q.get())
+ @unittest.skip("disable unittest")
def test_open_close_1000(self):
"""Write 1000 items, close, reopen checking if all items are there"""
@@ -115,6 +120,7 @@
data = q.get()
self.assertEqual('foobar', data)
+ @unittest.skip("disable unittest")
def test_random_read_write(self):
"""Test random read/write"""
@@ -131,6 +137,7 @@
q.put('var%d' % random.getrandbits(16))
n += 1
+ @unittest.skip("disable unittest")
def test_multi_threaded_parallel(self):
"""Create consumer and producer threads, check parallelism"""
m_queue = self.queue
@@ -154,6 +161,7 @@
self.assertEqual(0, len(m_queue))
self.assertRaises(Empty, m_queue.get, block=False)
+ @unittest.skip("disable unittest")
def test_multi_threaded_multi_producer(self):
"""Test mysqlqueue can be used by multiple producers."""
@@ -181,6 +189,7 @@
c.join()
+ @unittest.skip("disable unittest")
def test_multiple_consumers(self):
"""Test mysqlqueue can be used by multiple consumers."""
queue = self.queue
@@ -219,6 +228,7 @@
self.assertEqual(len(set(counter)), len(counter))
+ @unittest.skip("disable unittest")
def test_task_done_with_restart(self):
"""Test that items are not deleted before task_done."""
@@ -260,16 +270,19 @@
self.assertEqual(5, q.get())
self.assertEqual(5, q.qsize())
+ @unittest.skip("disable unittest")
def test_protocol_1(self):
q = self.queue
self.assertEqual(q._serializer.protocol,
2 if sys.version_info[0] == 2 else 4)
+ @unittest.skip("disable unittest")
def test_protocol_2(self):
q = self.queue
self.assertEqual(q._serializer.protocol,
2 if sys.version_info[0] == 2 else 4)
+ @unittest.skip("disable unittest")
def test_json_serializer(self):
q = self.queue
x = dict(
@@ -282,12 +295,14 @@
q.put(x)
self.assertEqual(q.get(), x)
+ @unittest.skip("disable unittest")
def test_put_0(self):
q = self.queue
q.put(0)
d = q.get(block=False)
self.assertIsNotNone(d)
+ @unittest.skip("disable unittest")
def test_get_id(self):
q = self.queue
q.put("val1")
@@ -299,6 +314,7 @@
# item should get val2
self.assertEqual(item, 'val2')
+ @unittest.skip("disable unittest")
def test_get_raw(self):
q = self.queue
q.put("val1")
@@ -307,6 +323,7 @@
self.assertEqual(True, "pqid" in item)
self.assertEqual(item.get("data"), 'val1')
+ @unittest.skip("disable unittest")
def test_queue(self):
q = self.queue
q.put("val1")
@@ -317,6 +334,7 @@
self.assertEqual(len(d), 3)
self.assertEqual(d[1].get("data"), "val2")
+ @unittest.skip("disable unittest")
def test_update(self):
q = self.queue
qid = q.put("val1")
|