1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
# This is a copy of the popen2.py module of Python 1.5.2.
# It has been modified to pass stdin as file descriptor 2 instead of 0.
# This means the child process initially uses the existing stdin. This is
# required for "su" to work on Linux, where it must be a tty.
# After passing "su" the executed command is expected to connect stdin to file
# descriptor 2.
#
# Some things that are not obvious:
# - There are two things:
# 1. file descriptors that Unix uses, an integer number
# 2. Python file objects
# - The returned file objects must be closed, otherwise a stale handle causes
# trouble for the next call. It's not clear why.
# - One descriptor is used for two directions. Closing it for one directions
# also closes it for the other direction. But there are two file objects for
# it that also must be closed. Need to catch and ignore an error when
# closing the second one.
# - On some systems a bidirectional pipe is not possible. stderr will then not
# be connected.
import os
import sys
MAXFD = 256 # Max number of file descriptors (os.getdtablesize()???)
_active = []
def _cleanup():
for inst in _active[:]:
inst.poll()
class Popen3:
def __init__(self, cmd, capturestderr=0, bufsize=-1):
n = capturestderr # using stderr is disabled, avoid a warning
if type(cmd) == type(''):
cmd = ['/bin/sh', '-c', cmd]
p2cread, p2cwrite = os.pipe()
c2pread, c2pwrite = os.pipe()
self.pid = os.fork()
if self.pid == 0:
# Child
os.close(2)
n = os.dup(p2cread)
if n <> 2:
sys.stderr.write('popen2: bad read dup: %d\n' % n)
os.close(1)
n = os.dup(c2pwrite)
if n <> 1:
sys.stderr.write('popen2: bad write dup: %d\n' % n)
for i in range(3, MAXFD):
try:
os.close(i)
except: pass
try:
os.execvp(cmd[0], cmd)
finally:
os._exit(1)
# Shouldn't come here, I guess
os._exit(1)
os.close(p2cread)
self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
os.close(c2pwrite)
self.fromchild = os.fdopen(c2pread, 'r', bufsize)
try:
self.childerr = os.fdopen(p2cwrite, 'r', bufsize)
except:
# On some systems sockets are not bidirectional
self.childerr = None
self.sts = -1 # Child not completed yet
_active.append(self)
def poll(self):
if self.sts < 0:
try:
pid, sts = os.waitpid(self.pid, os.WNOHANG)
if pid == self.pid:
self.sts = sts
_active.remove(self)
except os.error:
pass
return self.sts
def wait(self):
pid, sts = os.waitpid(self.pid, 0)
if pid == self.pid:
self.sts = sts
_active.remove(self)
return self.sts
def popen3(cmd, bufsize=-1):
_cleanup()
inst = Popen3(cmd, 1, bufsize)
return inst.fromchild, inst.tochild, inst.childerr
def _test():
teststr = "abc\n"
print "testing popen3: reading from stdout..."
r, w, e = popen3('cat <&2')
w.write(teststr)
w.close()
assert r.read() == teststr
r.close()
try:
e.close()
except:
pass
print "testing popen3: reading from stderr..."
r, w, e = popen3('cat /etc/group >&2')
assert e.read() == open("/etc/group").read()
w.close()
r.close()
try:
e.close()
except:
pass
print "testing popen3: reading an error message..."
r, w, e = popen3('cat -abcdefghi')
err = e.read()
print "Between the ----- lines should be an error message from 'cat':"
print "-----\n%s-----" % err
assert err != ''
w.close()
r.close()
try:
e.close()
except:
pass
_cleanup()
assert not _active
print "All OK"
if __name__ == '__main__':
_test()
|