File: test_aio.py

package info (click to toggle)
python-dugong 3.8.1%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 548 kB
  • sloc: python: 2,714; makefile: 21; sh: 7
file content (66 lines) | stat: -rw-r--r-- 1,654 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
'''
test_aio.py - Unit tests for Dugong

Copyright © 2014 Nikolaus Rath <Nikolaus.org>

This module may be distributed under the terms of the Python Software Foundation
License Version 2.  The complete license text may be retrieved from
http://hg.python.org/cpython/file/65f2c92ed079/LICENSE.
'''


if __name__ == '__main__':
    # Run as a script: hand this file to pytest, forward any extra
    # command-line arguments unchanged, and exit with pytest's return value.
    import sys
    import pytest
    pytest_args = [__file__]
    pytest_args.extend(sys.argv[1:])
    sys.exit(pytest.main(pytest_args))

import socket
import logging
from dugong import PollNeeded, POLLIN

# asyncio is treated as an optional dependency.  If it cannot be imported, a
# module-level ``pytestmark`` with an unconditional skipif is set so that
# pytest skips the tests in this module and reports the reason given below.
# NOTE(review): module-level code that references ``asyncio`` directly would
# still raise NameError in the ImportError case - confirm that is acceptable.
try:
    import asyncio
except ImportError:
    import pytest
    pytestmark = pytest.mark.skipif(True,
                                    reason='asyncio module not available')
else:
    # Only imported when asyncio itself is importable (presumably because
    # AioFuture is built on top of asyncio - verify against dugong).
    from dugong import AioFuture

# Module-level logger used by the read() and write() helpers.
log = logging.getLogger(__name__)

def read(sock):
    '''Generator that receives and checks three messages from *sock*.

    For each of the expected payloads ``text-000``, ``text-001`` and
    ``text-002`` (in that order) the generator first yields a
    `PollNeeded` object built from the socket's file descriptor and
    `POLLIN`, and, once resumed, reads up to 100 bytes from the socket and
    asserts that the decoded data equals the expected payload.

    The caller is presumably expected to resume the generator only when the
    socket is readable - verify against whatever drives it.
    '''
    expected_payloads = ['text-%03d' % seq_no for seq_no in range(3)]

    for expected in expected_payloads:
        log.debug('yielding')
        yield PollNeeded(sock.fileno(), POLLIN)

        log.debug('trying to read')
        data = sock.recv(100)
        assert data.decode() == expected
        log.debug('got: %s', data)

async def write(sock):
    '''Send three messages over *sock*, sleeping one second before each.

    The payloads are ``text-000``, ``text-001`` and ``text-002`` (UTF-8
    encoded), sent in that order with `socket.send`.

    This is a native coroutine (``async def``) rather than a generator
    decorated with ``asyncio.coroutine``: that decorator was deprecated in
    Python 3.8 and removed in Python 3.11, so using it makes the whole module
    fail at import time on current interpreters.  A native coroutine also
    does not touch the ``asyncio`` name at definition time.
    '''
    for i in range(3):
        log.debug('sleeping')
        await asyncio.sleep(1)
        buf = ('text-%03d' % i).encode()
        log.debug('writing %s', buf)
        sock.send(buf)

def test_aio_future():
    '''Check that an `AioFuture` drives the `read` generator to completion.

    A writer task sends three messages over one end of a socket pair while
    an `AioFuture` wrapping `read` consumes them from the other end.  The
    event loop is stopped as soon as the future completes, or after six
    seconds at the latest, and the test then requires the future to be done.
    '''
    (sock1, sock2) = socket.socketpair()

    # The sockets are context managers, so both ends get closed even when an
    # assertion fails or the loop raises.
    with sock1, sock2:
        # Use a private loop instead of closing the process-wide default
        # loop; it is installed as the current loop so that objects created
        # without an explicit loop argument pick it up.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            # Keep a reference to the task: the event loop itself only holds
            # weak references to scheduled tasks.
            write_task = loop.create_task(write(sock2))
            read_fut = AioFuture(read(sock1))
            read_fut.add_done_callback(lambda fut: loop.stop())

            # Safety net so that a hanging reader cannot block the test run.
            loop.call_later(6, loop.stop)
            loop.run_forever()

            # Surface a writer failure directly instead of only seeing the
            # reader time out.
            if write_task.done():
                write_task.result()

            assert read_fut.done()

            # Re-raise any exception stored on the future (e.g. a failed
            # assertion inside read()).  Assumes AioFuture follows the
            # asyncio.Future interface - it already provides done() and
            # add_done_callback().
            read_fut.result()
        finally:
            loop.close()
            asyncio.set_event_loop(None)