File: thread_channel.py

package info (click to toggle)
python-medusa 0.5.4%2Bclean-1
  • links: PTS
  • area: main
  • in suites: etch, etch-m68k
  • size: 540 kB
  • ctags: 1,009
  • sloc: python: 5,489; makefile: 11
file content (125 lines) | stat: -rw-r--r-- 3,683 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# -*- Mode: Python -*-

VERSION_STRING = "$Id: thread_channel.py,v 1.3 2002/03/19 22:49:40 amk Exp $"

# This will probably only work on Unix.

# The disadvantage to this technique is that it wastes file
# descriptors (especially when compared to select_trigger.py)

# May be possible to do it on Win32, using TCP localhost sockets.
# [does winsock support 'socketpair'?]

import asyncore
import asynchat

import fcntl
import FCNTL
import os
import socket
import string
import thread

# this channel slaves off of another one.  it starts a thread which
# pumps its output through the 'write' side of the pipe.  The 'read'
# side of the pipe will then notify us when data is ready.  We push
# this data on the owning data channel's output queue.

class thread_channel (asyncore.file_dispatcher):

    buffer_size = 8192

    def __init__ (self, channel, function, *args):
        self.parent = channel
        self.function = function
        self.args = args
        self.pipe = rfd, wfd = os.pipe()
        asyncore.file_dispatcher.__init__ (self, rfd)

    def start (self):
        rfd, wfd = self.pipe

        # The read side of the pipe is set to non-blocking I/O; it is
        # 'owned' by medusa.

        flags = fcntl.fcntl (rfd, FCNTL.F_GETFL, 0)
        fcntl.fcntl (rfd, FCNTL.F_SETFL, flags | FCNTL.O_NDELAY)

        # The write side of the pipe is left in blocking mode; it is
        # 'owned' by the thread.  However, we wrap it up as a file object.
        # [who wants to 'write()' to a number?]

        of = os.fdopen (wfd, 'w')

        thread.start_new_thread (
                self.function,
                # put the output file in front of the other arguments
                (of,) + self.args
                )

    def writable (self):
        return 0

    def readable (self):
        return 1

    def handle_read (self):
        data = self.recv (self.buffer_size)
        self.parent.push (data)

    def handle_close (self):
        # Depending on your intentions, you may want to close
        # the parent channel here.
        self.close()

# Yeah, it's bad when the test code is bigger than the library code.

if __name__ == '__main__':

    import time

    def thread_function (output_file, i, n):
        print 'entering thread_function'
        while n:
            time.sleep (5)
            output_file.write ('%2d.%2d %s\r\n' % (i, n, output_file))
            output_file.flush()
            n = n - 1
        output_file.close()
        print 'exiting thread_function'

    class thread_parent (asynchat.async_chat):

        def __init__ (self, conn, addr):
            self.addr = addr
            asynchat.async_chat.__init__ (self, conn)
            self.set_terminator ('\r\n')
            self.buffer = ''
            self.count = 0

        def collect_incoming_data (self, data):
            self.buffer = self.buffer + data

        def found_terminator (self):
            data, self.buffer = self.buffer, ''
            n = string.atoi (string.split (data)[0])
            tc = thread_channel (self, thread_function, self.count, n)
            self.count = self.count + 1
            tc.start()

    class thread_server (asyncore.dispatcher):

        def __init__ (self, family=socket.AF_INET, address=('127.0.0.1', 9003)):
            asyncore.dispatcher.__init__ (self)
            self.create_socket (family, socket.SOCK_STREAM)
            self.set_reuse_addr()
            self.bind (address)
            self.listen (5)

        def handle_accept (self):
            conn, addr = self.accept()
            tp = thread_parent (conn, addr)

    thread_server()
    #asyncore.loop(1.0, use_poll=1)
    asyncore.loop ()