1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
|
import logging
import os
import sys
import tempfile
import threading
import time
import testlib
import mitogen.core
import mitogen.parent
class ConnectionTest(testlib.RouterMixin, testlib.TestCase):
def test_broker_shutdown_while_connect_in_progress(self):
# if Broker.shutdown() is called while a connection attempt is in
# progress, the connection should be torn down.
path = tempfile.mktemp(prefix='broker_shutdown_sem_')
open(path, 'wb').close()
os.environ['BROKER_SHUTDOWN_SEMAPHORE'] = path
result = []
def thread():
python_path = testlib.data_path('broker_shutdown_test_python.py')
try:
result.append(self.router.local(python_path=python_path))
except Exception:
result.append(sys.exc_info()[1])
th = threading.Thread(target=thread)
th.start()
while os.path.exists(path):
time.sleep(0.05)
self.broker.shutdown()
th.join()
exc, = result
self.assertIsInstance(exc, mitogen.parent.CancelledError)
self.assertEqual(mitogen.parent.BROKER_SHUTDOWN_MSG, exc.args[0])
@mitogen.core.takes_econtext
def do_detach(econtext):
econtext.detach()
while 1:
time.sleep(1)
logging.getLogger('mitogen').error('hi')
class DetachReapTest(testlib.RouterMixin, testlib.TestCase):
def test_subprocess_preserved_on_shutdown(self):
c1 = self.router.local()
c1_stream = self.router.stream_by_id(c1.context_id)
pid = c1.call(os.getpid)
self.assertEqual(pid, c1_stream.conn.proc.pid)
l = mitogen.core.Latch()
mitogen.core.listen(c1, 'disconnect', l.put)
c1.call_no_reply(do_detach)
l.get()
self.broker.shutdown()
self.broker.join()
self.assertIsNone(os.kill(pid, 0)) # succeeds if process still alive
# now clean up
c1_stream.conn.proc.terminate()
c1_stream.conn.proc.proc.wait()
|