File: test.py

package info (click to toggle)
xmodem 0.4.6+dfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 192 kB
  • sloc: python: 1,066; makefile: 9
file content (75 lines) | stat: -rw-r--r-- 1,989 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import threading
import time
import StringIO
import Queue
import sys
from xmodem import XMODEM

class FakeIO(object):
    """In-memory stand-in for a two-way serial link.

    The link is modelled by two queues, one per direction.  Endpoint ``q``
    (0 or 1) reads from ``streams[q]`` and writes to ``streams[1 - q]``, so
    whatever one endpoint puts is what the other endpoint gets.  Every byte
    moved is traced on stdout as ``p<q>(0x..)`` (put) or ``r<q>(0x..)``
    (read).

    Subclasses that define their own ``__init__`` and still use the
    inherited ``putc``/``getc`` must call ``FakeIO.__init__`` so that
    ``streams`` exists.
    """

    # Not referenced anywhere in this file; kept so existing attribute
    # lookups keep working.
    stdin = []
    stdot = []
    delay = 0.01 # simulate modem delays

    def __init__(self):
        # One pair of queues per instance.  As a class attribute the list
        # was shared, so two FakeIO objects silently talked over the same
        # link.
        self.streams = [Queue.Queue(), Queue.Queue()]

    @staticmethod
    def _trace(tag, q, char):
        """Write one trace token for *char* to stdout and flush it."""
        # A single write keeps the token and its separator together when
        # both endpoint threads are tracing at the same time.
        sys.stdout.write('%s%d(0x%x) ' % (tag, q, ord(char)))
        sys.stdout.flush()

    def putc(self, data, q=0):
        """Send *data* from endpoint *q* to the opposite endpoint.

        Each character is queued individually.  Returns ``len(data)``.
        """
        outbound = self.streams[1 - q]
        for char in data:
            outbound.put(char)
            self._trace('p', q, char)
        return len(data)

    def getc(self, size, q=0, timeout=0.5):
        """Read *size* characters addressed to endpoint *q*.

        Waits at most *timeout* seconds for each character.  Returns the
        characters joined into one string, or ``None`` as soon as one wait
        times out; characters already taken off the queue during that call
        are discarded.  A *size* of 0 returns an empty string.
        """
        inbound = self.streams[q]
        data = []
        while size:
            try:
                char = inbound.get(timeout=timeout)
            except Queue.Empty:
                return None
            self._trace('r', q, char)
            data.append(char)
            size -= 1
        return ''.join(data)

class Client(threading.Thread):
    """Sending peer: a thread that pushes a file through XMODEM.

    io       -- link object providing ``getc(size, q)`` and
                ``putc(data, q)``; this peer always uses endpoint 0.
    server   -- stored on the instance; not used by this class itself.
    filename -- path of the file to send.  It is opened for binary reading
                immediately, so a bad path fails in the constructor rather
                than inside the thread.
    """

    def __init__(self, io, server, filename):
        threading.Thread.__init__(self)
        self.io     = io
        self.server = server
        self.stream = open(filename, 'rb')

    def getc(self, data, timeout=0):
        """Read callback handed to XMODEM.

        Despite its name, *data* is forwarded to ``io.getc`` as the number
        of characters to read.  *timeout* is accepted but ignored.
        """
        return self.io.getc(data, 0)

    def putc(self, data, timeout=0):
        """Write callback handed to XMODEM; *timeout* is ignored."""
        return self.io.putc(data, 0)

    def run(self):
        """Send the file, close it, then report the result of ``send``."""
        try:
            self.xmodem = XMODEM(self.getc, self.putc)
            result = self.xmodem.send(self.stream)
        finally:
            # The file was opened in __init__; release it whether or not
            # the transfer succeeded.
            self.stream.close()
        # Written as one line after the transfer.  The old
        # ``print 'c.send', <call>`` emitted the label before the call
        # ran, so label and result were separated by the whole byte trace.
        sys.stdout.write('c.send %s\n' % (result,))

class Server(FakeIO, threading.Thread):
    """Receiving peer: a thread that collects an XMODEM transfer in memory.

    FakeIO is listed as a base class, but both I/O methods are overridden
    to delegate to the ``io`` object passed in, always as endpoint 1.

    io -- link object providing ``getc(size, q)`` and ``putc(data, q)``.
    """

    def __init__(self, io):
        threading.Thread.__init__(self)
        self.io     = io
        self.stream = StringIO.StringIO()

    def getc(self, data, timeout=0):
        """Read callback handed to XMODEM.

        Despite its name, *data* is forwarded to ``io.getc`` as the number
        of characters to read.  *timeout* is accepted but ignored.
        """
        return self.io.getc(data, 1)

    def putc(self, data, timeout=0):
        """Write callback handed to XMODEM; *timeout* is ignored."""
        return self.io.putc(data, 1)

    def run(self):
        """Receive into the in-memory buffer, then print result and data."""
        self.xmodem = XMODEM(self.getc, self.putc)
        result = self.xmodem.recv(self.stream)
        # One write after the transfer.  The old
        # ``print 's.recv', <call>`` emitted the label before the call
        # ran, so label and result were separated by the whole byte trace.
        sys.stdout.write(
            's.recv %s\ngot\n%s\n' % (result, self.stream.getvalue()))

if __name__ == '__main__':
    # The file to transfer is the only command-line argument.  Without it
    # the script used to die with a bare IndexError traceback.
    if len(sys.argv) < 2:
        sys.stderr.write('usage: %s FILE\n' % sys.argv[0])
        sys.exit(2)

    # Both peers share one link object; the server thread is started
    # before the client thread.
    i = FakeIO()
    s = Server(i)
    c = Client(i, s, sys.argv[1])
    s.start()
    c.start()