File: test_aio.py

package info (click to toggle)
python-dugong 3.3%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 548 kB
  • ctags: 388
  • sloc: python: 2,205; makefile: 20; sh: 1
file content (66 lines) | stat: -rw-r--r-- 1,654 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
'''
test_aio.py - Unit tests for Dugong

Copyright (c) Nikolaus Rath <Nikolaus@rath.org>

This module may be distributed under the terms of the Python Software Foundation
License Version 2.  The complete license text may be retrieved from
http://hg.python.org/cpython/file/65f2c92ed079/LICENSE.
'''


if __name__ == '__main__':
    import sys
    import pytest
    sys.exit(pytest.main([__file__] + sys.argv[1:]))

import socket
from select import POLLIN
import logging
from dugong import PollNeeded

try:
    import asyncio
except ImportError:
    import pytest
    pytestmark = pytest.mark.skipif(True,
                                    reason='asyncio module not available')
else:
    from dugong import AioFuture

log = logging.getLogger(__name__)

def read(sock):
    for i in range(3):
        log.debug('yielding')
        yield PollNeeded(sock.fileno(), POLLIN)
        log.debug('trying to read')
        buf = sock.recv(100)
        assert buf.decode() == 'text-%03d' % i
        log.debug('got: %s', buf)

def write(sock):
    for i in range(3):
        log.debug('sleeping')
        yield from asyncio.sleep(1)
        buf = ('text-%03d' % i).encode()
        log.debug('writing %s', buf)
        sock.send(buf)

def test_aio_future():
    loop = asyncio.get_event_loop()
    try:
        (sock1, sock2) = socket.socketpair()

        asyncio.Task(write(sock2))
        read_fut = AioFuture(read(sock1))
        read_fut.add_done_callback(lambda fut: loop.stop())
        loop.call_later(6, loop.stop)
        loop.run_forever()

        assert read_fut.done()

        sock1.close()
        sock2.close()
    finally:
        loop.close()