File: popenerr.py

package info (click to toggle)
aap 1.072-1.1
  • links: PTS
  • area: main
  • in suites: etch, etch-m68k, lenny
  • size: 4,976 kB
  • ctags: 2,160
  • sloc: python: 15,113; makefile: 62; sh: 13
file content (137 lines) | stat: -rw-r--r-- 4,080 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# This is a copy of the popen2.py module of Python 1.5.2.
# It has been modified to pass stdin as file descriptor 2 instead of 0.
# This means the child process initially uses the existing stdin.  This is
# required for "su" to work on Linux, where it must be a tty.
# After passing "su" the executed command is expected to connect stdin to file
# descriptor 2.
#
# Some things that are not obvious:
# - Two different kinds of handle are involved:
#   1. file descriptors, the integer numbers that Unix uses
#   2. Python file objects, which wrap a file descriptor
# - The returned file objects must be closed, otherwise a stale handle causes
#   trouble for the next call.  It's not clear why.
# - One descriptor is used for two directions.  Closing it for one direction
#   also closes it for the other direction.  But there are two file objects for
#   it, and both must be closed.  An error raised when closing the second one
#   must be caught and ignored.
# - On some systems a bidirectional pipe is not possible.  stderr will then not
#   be connected.

import os
import sys

# Exclusive upper bound of the descriptor numbers that the child process
# closes (starting at 3) before it executes the command.  This is a fixed
# guess, not the real size of the descriptor table.
MAXFD = 256     # Max number of file descriptors (os.getdtablesize()???)

# Popen3 instances whose child process has not been reaped yet.  An instance
# is appended when it is created and removed once poll() or wait() has
# collected the exit status of its child.
_active = []

def _cleanup():
    """Poll every tracked child so that finished ones get reaped."""
    # Walk a snapshot of the list: poll() removes an instance from _active
    # once its child has exited, which must not disturb the iteration.
    snapshot = list(_active)
    for child in snapshot:
        child.poll()

class Popen3:
    """Run a command in a child process that is connected through two pipes.

    Unlike the standard popen2 module, the child's descriptor 0 (stdin) is
    left untouched.  The pipe from the parent to the child is installed as
    the child's descriptor 2 and the pipe from the child to the parent as
    its descriptor 1.

    Attributes:
        pid:       process id of the child.
        tochild:   file object for writing to the child (its descriptor 2).
        fromchild: file object for reading what the child writes to its
                   descriptor 1.
        childerr:  second file object, opened for reading, on the same
                   descriptor as tochild; None when it cannot be created.
        sts:       exit status as returned by os.waitpid(), or -1 while the
                   child has not been reaped.
    """

    def __init__(self, cmd, capturestderr=0, bufsize=-1):
        """Fork and execute cmd.

        cmd is either an argument list or a string; a string is run with
        "/bin/sh -c".  capturestderr is accepted only for compatibility
        with popen2 and is ignored.  bufsize is passed to os.fdopen() for
        each of the file objects.

        Raises os.error when a pipe or the child process cannot be created.
        """
        if isinstance(cmd, type('')):
            cmd = ['/bin/sh', '-c', cmd]

        fds = []
        try:
            fds.extend(os.pipe())
            fds.extend(os.pipe())
            self.pid = os.fork()
        except os.error:
            # Don't leak the pipe ends when no child could be created.
            for fd in fds:
                os.close(fd)
            raise
        p2cread, p2cwrite, c2pread, c2pwrite = fds

        if self.pid == 0:
            self._run_child(cmd, p2cread, c2pwrite)

        # Parent: close the ends that belong to the child.
        os.close(p2cread)
        self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
        os.close(c2pwrite)
        self.fromchild = os.fdopen(c2pread, 'r', bufsize)
        try:
            # Second file object on the descriptor that tochild already uses.
            self.childerr = os.fdopen(p2cwrite, 'r', bufsize)
        except Exception:
            # On some systems sockets are not bidirectional
            self.childerr = None

        self.sts = -1 # Child not completed yet
        _active.append(self)

    def _run_child(self, cmd, p2cread, c2pwrite):
        """Child side of the fork: rewire descriptors 2 and 1, execute cmd.

        Never returns.  Whatever goes wrong, the child exits with status 1
        instead of continuing to run the caller's code in a second process.
        """
        try:
            os.close(2)
            n = os.dup(p2cread)
            if n != 2:
                sys.stderr.write('popen2: bad read dup: %d\n' % n)
            os.close(1)
            n = os.dup(c2pwrite)
            if n != 1:
                sys.stderr.write('popen2: bad write dup: %d\n' % n)
            # Close everything except descriptors 0, 1 and 2; most of these
            # numbers are not open, which is not an error here.
            for i in range(3, MAXFD):
                try:
                    os.close(i)
                except os.error:
                    pass
            os.execvp(cmd[0], cmd)
        finally:
            os._exit(1)

    def _finish(self, sts):
        """Record the exit status and stop tracking this instance."""
        self.sts = sts
        if self in _active:
            _active.remove(self)

    def poll(self):
        """Return the exit status if the child has finished, otherwise -1.

        Does not block.  An os.error from os.waitpid() is ignored and the
        currently stored status is returned.
        """
        if self.sts < 0:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
            except os.error:
                return self.sts
            if pid == self.pid:
                self._finish(sts)
        return self.sts

    def wait(self):
        """Wait for the child to finish and return its exit status.

        When the child was already reaped by an earlier poll() or wait(),
        the stored status is returned without calling os.waitpid() again.
        """
        if self.sts < 0:
            pid, sts = os.waitpid(self.pid, 0)
            if pid == self.pid:
                self._finish(sts)
        return self.sts

def popen3(cmd, bufsize=-1):
    """Start cmd in a child process and return its three file objects.

    The result is the tuple (fromchild, tochild, childerr) of a new Popen3
    instance.  Children that finished earlier are reaped first.
    """
    _cleanup()
    child = Popen3(cmd, capturestderr=1, bufsize=bufsize)
    return (child.fromchild, child.tochild, child.childerr)

def _close_quietly(fileobj):
    """Close fileobj, ignoring None and the error from a closed descriptor.

    The error file object shares its descriptor with the file object used
    for writing to the child, so closing it after that one fails.
    """
    if fileobj is None:
        return
    try:
        fileobj.close()
    except (IOError, OSError):
        pass


def _test():
    """Self-test: run "cat" through popen3() in three different ways.

    Raises AssertionError when a check fails.  The checks that read from
    the error file object are skipped when popen3() returned None for it.
    """
    teststr = "abc\n"

    print("testing popen3: reading from stdout...")
    r, w, e = popen3('cat <&2')
    w.write(teststr)
    w.close()
    assert r.read() == teststr
    r.close()
    _close_quietly(e)

    print("testing popen3: reading from stderr...")
    r, w, e = popen3('cat /etc/group >&2')
    if e is None:
        print("skipped: stderr is not connected on this system")
    else:
        groupfile = open("/etc/group")
        try:
            expected = groupfile.read()
        finally:
            groupfile.close()
        assert e.read() == expected
    w.close()
    r.close()
    _close_quietly(e)

    print("testing popen3: reading an error message...")
    r, w, e = popen3('cat -abcdefghi')
    if e is None:
        print("skipped: stderr is not connected on this system")
    else:
        err = e.read()
        print("Between the ----- lines should be an error message from 'cat':")
        print("-----\n%s-----" % err)
        assert err != ''
    w.close()
    r.close()
    _close_quietly(e)

    _cleanup()
    assert not _active
    print("All OK")

# Run the self-test when this file is executed as a script.
if __name__ == '__main__':
    _test()