File: fifo_connect.py.in

package info (click to toggle)
pydb 1.19-1
  • links: PTS
  • area: main
  • in suites: etch, etch-m68k
  • size: 1,952 kB
  • ctags: 1,412
  • sloc: python: 3,065; perl: 2,479; sh: 586; makefile: 555; lisp: 265; ansic: 16
file content (58 lines) | stat: -rw-r--r-- 1,912 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#!@PYTHON@ -t
# $Id: fifo_connect.py.in,v 1.4 2006/09/21 08:56:54 rockyb Exp $ -*- Python -*-

# This unit test doesn't use any of the debugger code. It is meant solely
# to test the connection classes.

import os, sys, thread, time, unittest

# Put the in-tree 'pydb' directories at the front of sys.path so that the
# `connection` module under test is picked up from this checkout.  The
# "@...@" values are placeholders, presumably substituted when this .in
# template is processed by the build system.
#
# A trailing separator is appended only to a non-empty path: indexing an
# empty string would raise IndexError, and turning '' into os.path.sep
# would silently point at the filesystem root instead of the current
# directory.
top_builddir = "@top_builddir@"
if top_builddir and top_builddir[-1] != os.path.sep:
    top_builddir += os.path.sep
sys.path.insert(0, os.path.join(top_builddir, 'pydb'))

# Inserted second, so the source-tree directory ends up ahead of the
# build-tree directory on sys.path.
top_srcdir = "@top_srcdir@"
if top_srcdir and top_srcdir[-1] != os.path.sep:
    top_srcdir += os.path.sep
sys.path.insert(0, os.path.join(top_srcdir, 'pydb'))

import connection

class TestFIFOConnections(unittest.TestCase):
    """Exercise connection.ConnectionFIFO as a client/server pair."""

    def _wait_until(self, ready, attempts=10, delay=0.05):
        """Poll ready() until it is true or the attempts run out.

        ready    -- zero-argument callable returning a truth value.
        attempts -- maximum number of polls that are followed by a sleep.
        delay    -- seconds to sleep after each unsuccessful poll.

        Returns True if ready() became true, False otherwise.  Polling
        stops as soon as the condition holds.
        """
        for _ in range(attempts):
            if ready():
                return True
            time.sleep(delay)
        return bool(ready())

    def test_client_server(self):
        """Test that we can read and write between a FIFO client and
        server"""
        fname = 'fifotest'
        # The test expects a file named fname + '.out' to appear once the
        # client has connected and written.
        fifo_out = fname + '.out'

        server = connection.ConnectionFIFO(is_server=True)
        client = connection.ConnectionFIFO(is_server=False)

        # The server side connects in a separate thread.  Give it a short,
        # best-effort grace period to set `outp` before the client
        # connects; the test proceeds either way.
        thread.start_new_thread(server.connect, (fname,))
        self._wait_until(lambda: getattr(server, 'outp', None) is not None)

        client.connect(fname)
        line = 'this is a test\n'
        client.write(line)

        self.assertEqual(
            True, self._wait_until(lambda: os.path.exists(fifo_out)))

        # Best-effort wait for the server's input side before reading.
        self._wait_until(lambda: getattr(server, 'inp', None) is not None)
        l2 = server.readline()
        self.assertEqual(l2, line, "client to server")

        line = 'Another test\n'
        server.write(line)
        l2 = client.readline()
        self.assertEqual(l2, line, "server to client")

        client.disconnect()
        server.disconnect()
        # And again just to see what will happen
        server.disconnect()

# Run the test case(s) defined above when this file is executed directly
# as a script rather than imported.
if __name__ == '__main__':
    unittest.main()