File: tpool_exception_leak.py

package info (click to toggle)
python-eventlet 0.26.1-7%2Bdeb11u1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 2,916 kB
  • sloc: python: 24,898; makefile: 98
file content (43 lines) | stat: -rw-r--r-- 1,115 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
__test__ = False

if __name__ == '__main__':
    import eventlet
    import eventlet.tpool
    import gc
    import pprint

    class RequiredException(Exception):
        pass

    class A(object):
        def ok(self):
            return 'ok'

        def err(self):
            raise RequiredException

    a = A()

    # case 1 no exception
    assert eventlet.tpool.Proxy(a).ok() == 'ok'
    # yield to tpool_trampoline(), otherwise e.send(rv) have a reference
    eventlet.sleep(0.1)
    gc.collect()
    refs = gc.get_referrers(a)
    assert len(refs) == 1, 'tpool.Proxy-ied object leaked: {}'.format(pprint.pformat(refs))

    # case 2 with exception
    def test_exception():
        try:
            eventlet.tpool.Proxy(a).err()
            assert False, 'expected exception'
        except RequiredException:
            pass
    test_exception()
    # yield to tpool_trampoline(), otherwise e.send(rv) have a reference
    eventlet.sleep(0.1)
    gc.collect()
    refs = gc.get_referrers(a)
    assert len(refs) == 1, 'tpool.Proxy-ied object leaked: {}'.format(pprint.pformat(refs))

    print('pass')