"""
Tests for the thread pool.

Pyro - Python Remote Objects.  Copyright by Irmen de Jong (irmen@razorvine.net).
"""
from __future__ import print_function
import time
import random
import unittest
from Pyro4 import socketutil, core
from Pyro4.socketserver.threadpool import Pool, PoolError, NoFreeWorkersError
from Pyro4.socketserver.threadpoolserver import SocketServer_Threadpool
from Pyro4.configuration import config
# Base job duration in seconds; the job classes and the scaling test in this
# module derive their sleep times from it.
JOB_TIME = 0.2
class Job(object):
    """Callable work item that blocks its caller for a little under JOB_TIME seconds."""

    def __init__(self, name="unnamed"):
        """Remember *name* as a label for this job."""
        self.name = name

    def __call__(self):
        """Sleep for JOB_TIME minus a random amount below 0.1 seconds."""
        jitter = random.random() / 10.0
        time.sleep(JOB_TIME - jitter)
class SlowJob(object):
    """Callable work item that blocks its caller for nearly five times JOB_TIME."""

    def __init__(self, name="unnamed"):
        """Remember *name* as a label for this job."""
        self.name = name

    def __call__(self):
        """Sleep for 5*JOB_TIME minus a random amount below 0.1 seconds."""
        base_duration = 5 * JOB_TIME
        jitter = random.random() / 10.0
        time.sleep(base_duration - jitter)
class PoolTests(unittest.TestCase):
    """Tests for Pool: creation, dispatching, saturation, closing and scaling."""

    def setUp(self):
        # Every test runs with a pool of at least 2 and at most 4 workers.
        config.THREADPOOL_SIZE_MIN = 2
        config.THREADPOOL_SIZE = 4

    def tearDown(self):
        # Undo the configuration changes made by setUp and by the tests.
        config.reset()

    def testCreate(self):
        # repr() of a live pool must work, and the pool must report itself
        # closed once the with-block has been left.
        with Pool() as pool:
            repr(pool)
        self.assertTrue(pool.closed)

    def testSingle(self):
        with Pool() as pool:
            single_job = Job()
            pool.process(single_job)
            time.sleep(0.02)  # let it pick up the job
            self.assertEqual(1, len(pool.busy))

    def testAllBusy(self):
        try:
            config.COMMTIMEOUT = 0.2
            with Pool() as pool:
                # occupy every worker with a long-running job
                for number in range(1, config.THREADPOOL_SIZE + 1):
                    pool.process(SlowJob(str(number)))
                # putting one more than the number of workers should raise an error:
                with self.assertRaises(NoFreeWorkersError):
                    pool.process(SlowJob("toomuch"))
        finally:
            config.COMMTIMEOUT = 0.0

    def testClose(self):
        with Pool() as pool:
            for number in range(1, config.THREADPOOL_SIZE + 1):
                pool.process(Job(str(number)))
        # The with-block has ended here, so the pool has been closed.
        with self.assertRaises(PoolError):
            pool.process(Job(1))  # must not allow new jobs after closing
        self.assertEqual(0, len(pool.busy))
        self.assertEqual(0, len(pool.idle))

    def testScaling(self):
        with Pool() as pool:
            # occupy all but one of the minimum number of workers
            for _ in range(config.THREADPOOL_SIZE_MIN - 1):
                pool.process(Job("x"))
            self.assertEqual(1, len(pool.idle))
            self.assertEqual(config.THREADPOOL_SIZE_MIN - 1, len(pool.busy))
            # one more job takes the last idle worker
            pool.process(Job("x"))
            self.assertEqual(0, len(pool.idle))
            self.assertEqual(config.THREADPOOL_SIZE_MIN, len(pool.busy))
            # grow until no more free workers
            saturated = False
            while not saturated:
                try:
                    pool.process(Job("x"))
                except NoFreeWorkersError:
                    saturated = True
            self.assertEqual(0, len(pool.idle))
            self.assertEqual(config.THREADPOOL_SIZE, len(pool.busy))
            # wait till jobs are done and check ending situation
            time.sleep(JOB_TIME * 1.5)
            self.assertEqual(0, len(pool.busy))
            self.assertEqual(config.THREADPOOL_SIZE_MIN, len(pool.idle))
class ServerCallback(core.Daemon):
    """Daemon stand-in that records the denied reason of every handshake."""

    def __init__(self):
        # NOTE(review): core.Daemon.__init__ is not called here - presumably on
        # purpose, so that no real daemon is set up; confirm before changing.
        self.received_denied_reasons = []

    def _handshake(self, connection, denied_reason=None):
        """Store *denied_reason* (None included) and always report success."""
        self.received_denied_reasons.append(denied_reason)  # store the denied reason
        return True

    def handleRequest(self, connection):
        """Ignore the request; just block the calling thread for 50 ms."""
        time.sleep(0.05)

    def _housekeeping(self):
        """Do nothing."""
class ThreadPoolServerTests(unittest.TestCase):
    """Tests for SocketServer_Threadpool when its worker pool is exhausted."""

    def setUp(self):
        # Exactly one worker, so a second simultaneous connection finds no
        # free worker.
        config.THREADPOOL_SIZE_MIN = 1
        config.THREADPOOL_SIZE = 1
        config.POLLTIMEOUT = 0.5
        config.COMMTIMEOUT = 0.5

    def tearDown(self):
        # Undo the configuration changes made by setUp.
        config.reset()

    def testServerPoolFull(self):
        # Two clients connect to a server with a single worker: the first
        # handshake must carry no denied reason, the second one must carry
        # the "no free workers" reason.
        port = socketutil.findProbablyUnusedPort()
        serv = SocketServer_Threadpool()
        daemon = ServerCallback()
        serv.init(daemon, "localhost", port)
        # From here on the server is initialised, so everything else happens
        # inside the try block: a failing connect must not leave the server
        # or an already-connected client socket open.
        client_socks = []
        try:
            serversock = serv.sock.getsockname()
            for _ in range(2):
                client_socks.append(socketutil.createSocket(connect=serversock))
            serv.events([serv.sock])
            time.sleep(0.2)
            self.assertEqual([None], daemon.received_denied_reasons)
            serv.events([serv.sock])
            time.sleep(0.2)
            self.assertEqual(2, len(daemon.received_denied_reasons))
            self.assertIn("no free workers, increase server threadpool size", daemon.received_denied_reasons)
        finally:
            # Close only the client sockets that were actually opened, then
            # shut the server down.
            for client_sock in client_socks:
                client_sock.close()
            serv.shutdown()
if __name__ == "__main__":
    # To run a single test, set the arguments before calling main(), e.g.:
    #   import sys; sys.argv = ['', 'Test.testName']
    unittest.main()
|