File: test_asyncio.py

package info (click to toggle)
pyserial-asyncio-fast 0.16-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 192 kB
  • sloc: python: 452; makefile: 71
file content (116 lines) | stat: -rw-r--r-- 3,518 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python
#
# This file is part of pySerial-asyncio - Cross platform serial port support for Python
# (C) 2016 pySerial-team
#
# SPDX-License-Identifier:    BSD-3-Clause
"""\
Test asyncio related functionality.

To run from the command line against a specific port with a loop-back
device connected, use:

  $ cd pyserial-asyncio
  $ python -m test.test_asyncio SERIALDEVICE

"""

import os
import unittest
import asyncio
from typing import Optional

import serial_asyncio_fast

# Address and TCP port of the local loop-back endpoint.
HOST = "127.0.0.1"
_PORT = 8888

# Default port the tests run against: a pySerial "socket://" URL built from
# the endpoint above.
PORT = f"socket://{HOST}:{_PORT}"


@unittest.skipIf(os.name != "posix", "asyncio not supported on platform")
class Test_asyncio(unittest.TestCase):
    """Test asyncio related functionality"""

    def setUp(self):
        """Create a fresh event loop and install it as the current loop."""
        # tearDown() closes the loop, so each test needs its own one.
        # asyncio.get_event_loop() would hand back the same (closed) loop on
        # a second test and is deprecated/raising when no loop is set.
        self.loop = asyncio.new_event_loop()
        # Installed as current loop so that asyncio primitives created
        # outside of a running loop (Event, empty gather) bind to it.
        asyncio.set_event_loop(self.loop)

    def tearDown(self):
        """Close the per-test event loop and uninstall it."""
        self.loop.close()
        # Do not leave a closed loop behind as the thread's current loop.
        asyncio.set_event_loop(None)

    def test_asyncio(self):
        # NOTE: deliberately a comment, not a docstring, so that the test id
        # shown by ``unittest -v`` stays unchanged.
        #
        # Round-trip test: ``Output`` writes COUNT copies of TEXT plus a
        # newline as soon as it is connected, the peer sends everything back,
        # and ``Output`` closes its transport once the newline arrives.
        # The echoed data must equal what was sent and no flow-control
        # (pause/resume) callback may have fired.
        TEXT = b"Hello, World!"
        COUNT = 1024
        COMPLETE_MESSAGE = TEXT * COUNT + b"\n"
        received = []  # chunks handed to Output.data_received
        actions = []  # names of the protocol callbacks seen by Output
        done = asyncio.Event()  # set from Output.connection_lost

        class Input(asyncio.Protocol):
            """Server-side protocol: write every received chunk straight back."""

            def __init__(self):
                super().__init__()
                self._transport: Optional[asyncio.BaseTransport] = None

            def connection_made(self, transport: serial_asyncio_fast.SerialTransport):
                self._transport = transport

            def data_received(self, data):
                self._transport.write(data)

        class Output(asyncio.Protocol):
            """Client-side protocol: send the message and record what happens."""

            def __init__(self):
                super().__init__()
                self._transport: Optional[serial_asyncio_fast.SerialTransport] = None

            def connection_made(self, transport: serial_asyncio_fast.SerialTransport):
                self._transport = transport
                actions.append("open")
                for _ in range(COUNT):
                    transport.write(TEXT)
                transport.write(b"\n")

            def data_received(self, data):
                received.append(data)
                # The newline is the last byte of the message.
                if b"\n" in data:
                    self._transport.close()

            def connection_lost(self, exc):
                actions.append("close")
                done.set()

            def pause_writing(self):
                actions.append("pause")
                print(self._transport.get_write_buffer_size())

            def resume_writing(self):
                actions.append("resume")
                print(self._transport.get_write_buffer_size())

        # For socket:// URLs the echo peer is a local TCP server; for any
        # other port no server is started here.
        server = None
        if PORT.startswith("socket://"):
            server = self.loop.run_until_complete(
                self.loop.create_server(Input, HOST, _PORT)
            )

        try:
            client = serial_asyncio_fast.create_serial_connection(
                self.loop, Output, PORT
            )
            self.loop.run_until_complete(client)
            self.loop.run_until_complete(done.wait())
            pending = asyncio.all_tasks(self.loop)
            if pending:
                self.loop.run_until_complete(asyncio.gather(*pending))
        finally:
            # Always release the listening socket, even if the test failed.
            if server is not None:
                server.close()

        # Spin the loop so already scheduled callbacks get a chance to run.
        for _ in range(1024):
            self.loop.run_until_complete(asyncio.sleep(0))

        all_data = b"".join(received)
        self.assertEqual(all_data, COMPLETE_MESSAGE)
        self.assertEqual(actions, ["open", "close"])


if __name__ == "__main__":
    import sys

    # An optional first command-line argument replaces the default PORT.
    if sys.argv[1:]:
        PORT = sys.argv[1]

    # Show the usage text followed by the port actually being tested.
    sys.stdout.write(__doc__)
    sys.stdout.write("Testing port: %r\n" % PORT)

    # Swap our own arguments for "-v" before handing over to unittest,
    # which then runs all tests of this module.
    sys.argv[1:] = ["-v"]
    unittest.main()