1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
|
from eventlet import patcher
from eventlet.green import asyncore
from eventlet.green import select
from eventlet.green import socket
from eventlet.green import threading
from eventlet.green import time
patcher.inject("test.test_asyncore", globals())
def new_closeall_check(self, usedefault):
# Check that close_all() closes everything in a given map
l = []
testmap = {}
for i in range(10):
c = dummychannel()
l.append(c)
self.assertEqual(c.socket.closed, False)
testmap[i] = c
if usedefault:
# the only change we make is to not assign to asyncore.socket_map
# because doing so fails to assign to the real asyncore's socket_map
# and thus the test fails
socketmap = asyncore.socket_map.copy()
try:
asyncore.socket_map.clear()
asyncore.socket_map.update(testmap)
asyncore.close_all()
finally:
testmap = asyncore.socket_map.copy()
asyncore.socket_map.clear()
asyncore.socket_map.update(socketmap)
else:
asyncore.close_all(testmap)
self.assertEqual(len(testmap), 0)
for c in l:
self.assertEqual(c.socket.closed, True)
HelperFunctionTests.closeall_check = new_closeall_check
try:
# Eventlet's select() emulation doesn't support the POLLPRI flag,
# which this test relies on. Therefore, nuke it!
BaseTestAPI.test_handle_expt = lambda *a, **kw: None
except NameError:
pass
try:
# temporarily disabling these tests in the python2.7/pyevent configuration
from tests import using_pyevent
import sys
if using_pyevent(None) and sys.version_info >= (2, 7):
TestAPI_UseSelect.test_handle_accept = lambda *a, **kw: None
TestAPI_UseSelect.test_handle_close = lambda *a, **kw: None
TestAPI_UseSelect.test_handle_read = lambda *a, **kw: None
except NameError:
pass
if __name__ == "__main__":
test_main()
|