File: test_asyncore.py

package info (click to toggle)
python-eventlet 0.26.1-7%2Bdeb11u1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 2,916 kB
  • sloc: python: 24,898; makefile: 98
file content (64 lines) | stat: -rw-r--r-- 1,962 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from eventlet import patcher
from eventlet.green import asyncore
from eventlet.green import select
from eventlet.green import socket
from eventlet.green import threading
from eventlet.green import time

# NOTE(review): inject() is handed this module's globals(); presumably it
# loads the stdlib test module "test.test_asyncore" into them, using
# eventlet's green modules -- confirm against eventlet.patcher.  The names
# used below that this file never defines (dummychannel, HelperFunctionTests,
# BaseTestAPI, TestAPI_UseSelect, test_main) are expected to come from here.
patcher.inject("test.test_asyncore", globals())


def new_closeall_check(self, usedefault):
    """Check that asyncore.close_all() closes every channel in a map.

    Ten dummy channels are created and registered in a fresh map.  After
    close_all() has run, the map must be empty and every channel's socket
    must report itself as closed.

    usedefault -- when true, close_all() is called without arguments and
    therefore works on asyncore.socket_map; otherwise the test map is
    passed to close_all() explicitly.
    """
    channels = []
    testmap = {}
    for key in range(10):
        channel = dummychannel()
        channels.append(channel)
        # A freshly built channel must start out open.
        self.assertEqual(channel.socket.closed, False)
        testmap[key] = channel

    if not usedefault:
        asyncore.close_all(testmap)
    else:
        # asyncore.socket_map is deliberately mutated in place rather than
        # rebound: assigning a new dict to the attribute would not reach
        # the real asyncore's socket_map, and the test would fail.
        saved_map = asyncore.socket_map.copy()
        try:
            asyncore.socket_map.clear()
            asyncore.socket_map.update(testmap)
            asyncore.close_all()
        finally:
            # Capture what close_all() left behind, then put the original
            # entries back so the global map is restored.
            testmap = asyncore.socket_map.copy()
            asyncore.socket_map.clear()
            asyncore.socket_map.update(saved_map)

    self.assertEqual(len(testmap), 0)

    for channel in channels:
        self.assertEqual(channel.socket.closed, True)

# Install new_closeall_check as HelperFunctionTests.closeall_check, replacing
# any attribute of that name the class already has.
HelperFunctionTests.closeall_check = new_closeall_check

try:
    # Eventlet's select() emulation doesn't support the POLLPRI flag,
    # which this test relies on.  Therefore, nuke it!
    # The replacement accepts any arguments and returns None, so the test
    # becomes a no-op.
    BaseTestAPI.test_handle_expt = lambda *a, **kw: None
except NameError:
    # BaseTestAPI is not defined in this file; if the name does not exist
    # when this runs there is nothing to patch.
    pass

try:
    # These select()-based tests are switched off for now when running on
    # python2.7 (or later) with the pyevent hub: each one is replaced by a
    # callable that takes any arguments and does nothing.
    from tests import using_pyevent
    import sys
    if using_pyevent(None) and sys.version_info >= (2, 7):
        for _disabled in ("test_handle_accept",
                          "test_handle_close",
                          "test_handle_read"):
            setattr(TestAPI_UseSelect, _disabled, lambda *a, **kw: None)
except NameError:
    # TestAPI_UseSelect is not defined in this file; if the name does not
    # exist there is nothing to disable.
    pass

if __name__ == "__main__":
    # Run the tests when this file is executed as a script.  test_main is
    # not defined in this file -- presumably it is one of the names brought
    # in from test.test_asyncore (confirm).
    test_main()