File: test_asyncio.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (96 lines) | stat: -rw-r--r-- 3,342 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import sys

import encodings.idna
import asyncio

def test_async_gil_issue():
    async def f():
        reader, writer = await asyncio.open_connection('example.com', 80)
        writer.close()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(f())
    
def test_async_for():
    # tests if async for receives all stores values in the right order
    # and if the correct methods __aiter__ and __anext__ get called
    # and if the end results of run_until_complete are None (in a tuple)
    import asyncio

    class AsyncIter:
        def __init__(self):
            self._data = list(range(5))
            self._index = 0
        
        def __aiter__(self):
            return self
        
        async def __anext__(self):
            while self._index < 5:
                await asyncio.sleep(1)
                self._index += 1
                return self._data[self._index-1]
            raise StopAsyncIteration

    class Corotest(object):
        def __init__(self):
            self.res = "-"
        
        async def do_loop(self):
            async for x in AsyncIter():
                self.res += str(x)
                self.res += "-"

    cor = Corotest()
    loop = asyncio.get_event_loop()
    futures = [asyncio.ensure_future(cor.do_loop()), asyncio.ensure_future(cor.do_loop())]
    taskres = loop.run_until_complete(asyncio.wait(futures))
    assert cor.res.count('0') == 2
    assert cor.res.count('1') == 2
    assert cor.res.count('2') == 2
    assert cor.res.count('3') == 2
    assert cor.res.count('4') == 2
    assert cor.res.find("0") < cor.res.find("1")
    assert cor.res.find("1") < cor.res.find("2")
    assert cor.res.find("2") < cor.res.find("3")
    assert cor.res.find("3") < cor.res.find("4")
    assert isinstance(taskres, tuple)
    assert len(taskres) == 2
    assert "result=None" in repr(taskres[0].pop())
    assert "result=None" in repr(taskres[0].pop())
    
def test_asynchronous_context_managers():
    # it is important that "releasing lock A" happens before "holding lock B"
    # or the other way around, but it is not allowed that both coroutines
    # hold the lock at the same time
    import encodings.idna
    import asyncio

    class Corotest(object):
        def __init__(self):
            self.res = "-"
        
        async def coro(self, name, lock):
            self.res += ' coro {}: waiting for lock -'.format(name)
            async with lock:
                self.res += ' coro {}: holding the lock -'.format(name)
                await asyncio.sleep(1)
                self.res += ' coro {}: releasing the lock -'.format(name)

    cor = Corotest()
    loop = asyncio.get_event_loop()
    lock = asyncio.Lock()
    coros = asyncio.gather(cor.coro(1, lock), cor.coro(2, lock))
    try:
        loop.run_until_complete(coros)
    finally:
        loop.close()

    assert "coro 1: waiting for lock" in cor.res
    assert "coro 1: holding the lock" in cor.res
    assert "coro 1: releasing the lock" in cor.res
    assert "coro 2: waiting for lock" in cor.res
    assert "coro 2: holding the lock" in cor.res
    assert "coro 2: releasing the lock" in cor.res
    assert cor.res.find("coro 1: releasing the lock") < cor.res.find("coro 2: holding the lock") or \
    cor.res.find("coro 2: releasing the lock") < cor.res.find("coro 1: holding the lock")