File: test_syncio.py

package info (click to toggle)
gensio 3.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,364 kB
  • sloc: ansic: 90,328; python: 5,220; sh: 4,935; cpp: 3,258; makefile: 1,418
file content (115 lines) | stat: -rw-r--r-- 3,280 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#
#  gensio - A library for abstracting stream I/O
#  Copyright (C) 2018  Corey Minyard <minyard@acm.org>
#
#  SPDX-License-Identifier: GPL-2.0-only
#

import utils
import gensio
import gensios_enabled

class Logger:
    """Minimal log handler that prints every record it is given."""

    def gensio_log(self, level, log):
        """Print one record to stdout as ``***<level> log: <log>``."""
        record = "***%s log: %s" % (level, log)
        print(record)

def test_sync_gensio(o):
    """Exercise synchronous read/write on an ``echo(readbuf=10)`` gensio.

    The sequence is: write/read round trip, a read that must time out,
    a write that must be cut off at 10 bytes, draining those 10 bytes,
    and a final read that must time out again.

    o is passed straight through as the first argument of
    gensio.gensio() (presumably the gensio OS-functions object; confirm
    against utils).  The second value returned by write_s()/read_s() is
    treated as the time left out of the timeout passed in (presumably
    milliseconds; confirm against the gensio docs).

    Raises Exception when any step returns unexpected data or timing.
    """
    print("Testing basic sync I/O")

    gensios_enabled.check_iostr_gensios("echo")
    echo = gensio.gensio(o, "echo(readbuf=10)", None)
    echo.set_sync()
    echo.open_s()

    def expect_read_timeout():
        # With nothing queued, the read must hand back no data and
        # report that the whole timeout was used up.
        data, left = echo.read_s(10, 250)
        text = data.decode(encoding='utf8')
        if text != "" or left != 0:
            raise Exception("Invalid read timeout return: '%s' %d\n"
                            % (text, left))

    # Simple round trip: write five bytes, then read them back.
    written, left = echo.write_s("Hello", 1000)
    if written != 5 or left < 500:
        raise Exception("Invalid write return: %d %d\n" % (written, left))
    data, left = echo.read_s(10, 1000)
    text = data.decode(encoding='utf8')
    if text != "Hello" or left < 500:
        raise Exception("Invalid read return: '%s' %d\n" % (text, left))

    # Nothing is pending now, so this read has to time out.
    expect_read_timeout()

    # Fifteen bytes do not fit in the 10 byte readbuf requested above,
    # so the write must stop at 10 bytes and run out of time.
    written, left = echo.write_s("HelloHelloHello", 250)
    if written != 10 or left != 0:
        raise Exception("Invalid write timeout return: %d %d\n"
                        % (written, left))

    # Drain the 10 bytes that were accepted, possibly over several
    # reads, feeding the remaining time back in on each pass.
    left = 1000
    text = ""
    while len(text) < 10 and left >= 500:
        chunk, left = echo.read_s(10, left)
        text += chunk.decode(encoding='utf8')

    if text != "HelloHello" or left < 500:
        raise Exception("Invalid read return(2): '%s' %d\n" % (text, left))

    # The echo is empty again, so this read has to time out as well.
    expect_read_timeout()

    echo.close_s()

class SyncEvent:
    """Event handler object used by the sync accept test.

    It is passed both as the handler for accepted connections and as
    the handler/open-done object of the client gensio.  The only state
    it keeps is whether open_done() has reported a successful open.
    """

    def __init__(self):
        # Set to True by open_done() once an open finishes without error.
        self.opened = False

    def read_callback(self, io, err, data, auxdata):
        """Discard incoming data by returning its full length.

        Returning len(data) presumably tells gensio that everything was
        consumed (confirm against the gensio Python binding docs).
        """
        return len(data)

    def write_callback(self, io):
        """Turn the write-ready callback off again; nothing is written."""
        # Must be the Python constant False; the lowercase spelling is
        # an undefined name and raises NameError when the callback runs.
        io.write_callback_enable(False)

    def open_done(self, io, err):
        """Record a completed open.

        Raises Exception if err is truthy; otherwise sets self.opened.
        """
        if err:
            # str() keeps the message buildable even if err is not a str.
            raise Exception("accept_s_timeout open error: " + str(err))
        self.opened = True

def test_sync_gensio_accepter(o):
    """Exercise accept_s_timeout() on a synchronous TCP accepter.

    First checks that an accept with no pending connection times out,
    then opens a client gensio to the accepter's port and checks that
    the next accept returns a connection with time to spare.

    o is passed straight through to the gensio constructors and to
    accept_s_timeout() (presumably the gensio OS-functions object;
    confirm against utils).

    Raises Exception when either accept behaves unexpectedly.
    """
    print("Testing sync accept")

    gensios_enabled.check_iostr_gensios("tcp")
    a = gensio.gensio_accepter(o, "tcp,localhost,0", None)
    a.set_sync()
    a.startup()
    # The accepter was created on port 0, so ask it which port it got.
    port = a.control(gensio.GENSIO_CONTROL_DEPTH_FIRST,
                     gensio.GENSIO_CONTROL_GET,
                     gensio.GENSIO_ACC_CONTROL_LPORT, "0")

    # No client exists yet, so this short accept must time out.
    sa = SyncEvent()
    (io, time) = a.accept_s_timeout(o, sa, 1)
    if io is not None or time != 0:
        raise Exception("accept_s_timeout didn't time out")

    # Start a client connection to the accepter's port.
    sg = SyncEvent()
    gensios_enabled.check_iostr_gensios("tcp")
    g = gensio.gensio(o, "tcp,localhost," + port, sg)
    g.open(sg)

    # With a connection pending, the accept must now succeed in time.
    (io, time) = a.accept_s_timeout(o, sa, 1000)
    if io is None or time == 0:
        raise Exception("accept_s_timeout timed out")

    io.close_s()
    g.close_s()
    a.shutdown_s()

import utils

# Script entry: run both sync tests against the shared utils.o object,
# then call utils.test_shutdown() last (presumably final cleanup and
# leak checking; confirm in utils).
test_sync_gensio(utils.o)
test_sync_gensio_accepter(utils.o)
utils.test_shutdown()