1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
"""Misc threading module tests
Made for Jython.
"""
from __future__ import with_statement
import random
import subprocess
import sys
import threading
import time
import unittest
from subprocess import PIPE, Popen
from test import test_support
from threading import Condition, Lock, Thread
from java.lang import Thread as JThread, InterruptedException
class ThreadingTestCase(unittest.TestCase):
def test_str_name(self):
t = Thread(name=1)
self.assertEqual(t.getName(), '1')
t.setName(2)
self.assertEqual(t.getName(), '2')
# make sure activeCount() gets decremented (see issue 1348)
def test_activeCount(self):
activeBefore = threading.activeCount()
activeCount = 10
for i in range(activeCount):
t = Thread(target=self._sleep, args=(i,))
t.setDaemon(0)
t.start()
polls = activeCount
while activeCount > activeBefore and polls > 0:
time.sleep(1)
activeCount = threading.activeCount()
polls -= 1
self.assertTrue(activeCount <= activeBefore, 'activeCount should to be <= %s, instead of %s' % (activeBefore, activeCount))
def _sleep(self, n):
time.sleep(random.random())
def test_issue1988(self):
cond = threading.Condition(threading.Lock())
locked = False
try:
locked = cond.acquire(False)
finally:
if locked:
cond.release()
class TwistedTestCase(unittest.TestCase):
def test_needs_underscored_versions(self):
self.assertEqual(threading.Lock, threading._Lock)
self.assertEqual(threading.RLock, threading._RLock)
class JavaIntegrationTestCase(unittest.TestCase):
"""Verifies that Thread.__tojava__ correctly gets the underlying Java thread"""
def test_interruptible(self):
def wait_until_interrupted(cv):
name = threading.currentThread().getName()
with cv:
while not JThread.currentThread().isInterrupted():
try:
cv.wait()
except InterruptedException, e:
break
num_threads = 5
unfair_condition = Condition()
threads = [
Thread(
name="thread #%d" % i,
target=wait_until_interrupted,
args=(unfair_condition,))
for i in xrange(num_threads)]
for thread in threads:
thread.start()
time.sleep(0.1)
for thread in threads:
JThread.interrupt(thread)
joined_threads = 0
for thread in threads:
thread.join(1.) # timeout just in case so we don't stall regrtest
joined_threads += 1
self.assertEqual(joined_threads, num_threads)
class ReprTestCase(unittest.TestCase):
def test_condition(self):
l = Lock()
c = Condition(l)
self.assertEqual(repr(c), "<_threading.Condition(<_threading.Lock owner=None locked=False>), 0")
l.acquire()
self.assertEqual(repr(c), "<_threading.Condition(<_threading.Lock owner='MainThread' locked=True>), 0")
def test_lock(self):
l = Lock()
self.assertEqual(repr(l), "<_threading.Lock owner=None locked=False>")
r.acquire()
self.assertEqual(repr(r), "<_threading.RLock owner='MainThread' locked=True>")
r.release()
self.assertEqual(repr(r), "<_threading.RLock owner=None locked=False>")
def test_rlock(self):
r = RLock()
self.assertEqual(repr(r), "<_threading.RLock owner=None count=0>")
r.acquire()
self.assertEqual(repr(r), "<_threading.RLock owner='MainThread' count=1>")
r.acquire()
self.assertEqual(repr(r), "<_threading.RLock owner='MainThread' count=2>")
r.release(); r.release()
self.assertEqual(repr(r), "<_threading.RLock owner=None count=0>")
def test_main():
test_support.run_unittest(
JavaIntegrationTestCase,
ThreadingTestCase,
TwistedTestCase)
if __name__ == "__main__":
test_main()
|