File: copen.py

package info (click to toggle)
python-biopython 1.42-2
  • links: PTS
  • area: main
  • in suites: etch, etch-m68k
  • size: 17,584 kB
  • ctags: 12,272
  • sloc: python: 80,461; xml: 13,834; ansic: 7,902; cpp: 1,855; sql: 1,144; makefile: 203
file content (321 lines) | stat: -rw-r--r-- 9,839 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
"""
This implements a set of classes that wrap a file object interface
around code that executes in another process.  This allows you to fork
many different commands and let them run concurrently.

Functions:
copen_sys     Open a file-like pipe to a system command.
copen_fn      Open a file-like pipe to a python function.

"""
import os
import sys
import time
import signal

def copen_sys(syscmd, *args):
    """copen_sys(syscmd, *args) -> file-like object

    Open a file-like object that returns the output from a system
    command.

    syscmd is run with os.execvp, so it is searched for on the PATH.
    args become the command's argv; if they do not already start with
    syscmd, syscmd is prepended as argv[0].  The command's standard
    output and standard error are each connected to a pipe, and the
    returned _ProcHandle reads from them.  If the command cannot be
    executed, the child writes "<syscmd> could not be executed" to
    sys.stderr (normally the error pipe) and ends with os._exit(-1).

    """
    # python requires first element to be the path
    if not args or args[0] != syscmd:
        args = [syscmd] + list(args)

    r, w = os.pipe()
    er, ew = os.pipe()

    try:
        pid = os.fork()
    except OSError:
        # No child was created; don't leak the pipes.
        for fd in (r, w, er, ew):
            os.close(fd)
        raise
    if pid == 0: # child process
        # Whatever happens, the child must end here and never return into
        # the caller's code; the finally clause guarantees that.
        try:
            os.close(r)
            os.close(er)
            # The new program writes to descriptors 1 and 2, so attach the
            # pipes there directly; sys.stdout/sys.stderr may have been
            # replaced by objects without a usable fileno().
            os.dup2(w, 1)
            os.dup2(ew, 2)
            # The original pipe descriptors are now redundant; don't let
            # the new program inherit them.
            for fd in (w, ew):
                if fd > 2:
                    os.close(fd)
            try:
                os.execvp(syscmd, args)  # execute it!  Only returns by raising.
            except:
                # Deliberately broad: any failure is reported to the parent.
                sys.stderr.write("%s could not be executed\n" % syscmd)
                sys.stderr.flush()
        finally:
            os._exit(-1)

    # parent
    os.close(w)
    os.close(ew)
    return _ProcHandle(pid, os.fdopen(r, 'r'), os.fdopen(er, 'r'))

def copen_fn(func, *args, **keywords):
    """copen_fn(func, *args, **keywords) -> file-like object

    Open a file-like object that returns the output from function
    call.  The object's 'read' method returns the return value from
    the function.  The function is executed as a separate process so
    any variables modified by the function does not affect the ones in
    the parent process.  The return value of the function must be
    pickle-able.

    If func raises, or its result cannot be pickled, the child writes
    the traceback to the error pipe and ends with os._exit(-1).
    Waiting on or reading the returned handle then raises
    AssertionError carrying that traceback.

    """
    try:
        import cPickle as pickle
    except ImportError:
        import pickle
    r, w = os.pipe()
    er, ew = os.pipe()

    try:
        pid = os.fork()
    except OSError:
        # No child was created; don't leak the pipes.
        for fd in (r, w, er, ew):
            os.close(fd)
        raise
    if pid == 0: # child process
        exit_code = -1
        # Whatever happens, the child must end here and never return into
        # the caller's code; the finally clause guarantees that.
        try:
            # The read ends belong to the parent.
            os.close(r)
            os.close(er)
            cwrite, errwrite = os.fdopen(w, 'w'), os.fdopen(ew, 'w')
            try:
                output = func(*args, **keywords)
                # Pickle may fail here if the object is not pickleable.
                s = pickle.dumps(output, 1)
            except:
                # Deliberately broad: any failure of func is reported to
                # the parent instead of propagating in the child.
                import traceback
                traceback.print_exc(file=errwrite)
                errwrite.flush()
            else:
                try:
                    cwrite.write(s)
                    cwrite.flush()
                except IOError:
                    # There can be an IOError if the parent is no longer
                    # listening.  Ignore it.
                    pass
                exit_code = 0
        finally:
            os._exit(exit_code)

    # parent
    os.close(w)
    os.close(ew)
    return _PickleHandle(pid, os.fdopen(r, 'r'), os.fdopen(er, 'r'))


class _ProcHandle:
    """This object provides a file-like interface to a running
    process.

    Members:
    pid         what is the PID of the subprocess?
    killsig     what signal killed the child process?
    status      what was the status of the command?

    Methods:
    close       Close this process, killing it if necessary.
    fileno      Return the fileno used to read from the process.
    wait        Wait for the process to finish.
    poll        Is the process finished?
    elapsed     How much time has this process taken?
    read        Return all remaining output as one string.
    readline    Return the next line of output ('' when exhausted).
    readlines   Return all remaining output as a list of lines.

    Output is not streamed: read, readline and readlines first wait for
    the child to finish, then serve lines from an in-memory buffer.
    Every handle registers itself in the module-level _active list until
    it finishes or is closed.

    """
    def __init__(self, pid, cread, errread=None):
        """Create a wrapper around a running process.  pid is the
        process ID.  cread is the file object used to read from the
        child.  errread is an optional file object used to read errors
        from the child.

        """
        _active.append(self)

        self.pid = pid
        self.status = None
        self.killsig = None

        self._start, self._end = time.time(), None
        self._cread, self._errread = cread, errread
        self._output = []
        self._done = 0
        self._closed = 0

    def __del__(self):
        self.close()  # kill the process

    def _kill(self):
        """Kill the child if it is still running and return its killsig.

        Returns the number of the signal that terminated the child (0 if
        it exited on its own), signal.SIGKILL if it had to be
        force-killed, or None if an OSError occurred (e.g. the child was
        already reaped elsewhere).  On every non-error path the child is
        reaped, so no zombie is left behind.

        """
        try:
            pid, ind = os.waitpid(self.pid, os.WNOHANG)
            if pid == self.pid:   # already died
                # Low 7 bits of the wait status hold the signal number;
                # 0x80 is the core-dump flag, not part of it.
                return ind & 0x7f
            # First, try to kill it with a SIGTERM.
            os.kill(self.pid, signal.SIGTERM)
            # Wait .5 seconds for it to die.
            end = time.time() + 0.5
            while time.time() < end:
                pid, ind = os.waitpid(self.pid, os.WNOHANG)
                if pid == self.pid:
                    return ind & 0x7f
                time.sleep(0.1)
            # It didn't die, so kill with a SIGKILL and reap it.
            os.kill(self.pid, signal.SIGKILL)
            os.waitpid(self.pid, 0)
            return signal.SIGKILL
        except OSError:
            return None

    def close(self):
        """Close the process, killing it if it is still running.

        Unread output is discarded and the pipes from the child are
        closed.  Calling close more than once is harmless.

        """
        # If this gets called in the middle of object initialization,
        # the _closed attribute will not exist.
        if not hasattr(self, '_closed') or self._closed:
            return
        # on cleanup, _active may not be defined!
        if _active and self in _active:
            _active.remove(self)
        if not self._done:
            # Keep the signal that _kill actually observed.
            self.killsig = self._kill()
            self._end = time.time()
            self.status = None
            self._done = 1
        # Release the read ends of the pipes (closing an already closed
        # file, e.g. after _cleanup_child, is a no-op).
        for stream in (self._cread, self._errread):
            if stream is not None:
                stream.close()
        self._output = []
        self._closed = 1

    def fileno(self):
        """Return the file descriptor used to read from the process."""
        return self._cread.fileno()

    def readline(self):
        """Return the next line or '' if finished."""
        self.wait()
        if not self._output:
            return ''
        return self._output.pop(0)

    def readlines(self):
        """Return the output of the process as a list of strings."""
        self.wait()
        output = self._output
        self._output = []
        return output

    def read(self):
        """Return the output as a string."""
        self.wait()
        output = self._output
        self._output = []
        return "".join(output)

    def wait(self):
        """Wait for the process to finish.

        Raises AssertionError if the child wrote to its error stream.

        """
        import select
        if self._done:
            return
        # wait until stuff's ready to be read
        select.select([self], [], [])
        self._cleanup_child()

    def poll(self):
        """Return a boolean.  Is the process finished running?"""
        import select
        if self._done:
            return 1
        # NOTE(review): select only reports that some output is readable,
        # not that the child has exited; _cleanup_child then blocks until
        # the child closes its output, so this call can block.
        if select.select([self], [], [], 0)[0]:
            self._cleanup_child()
        return self._done

    def elapsed(self):
        """Return the number of seconds elapsed since the process began."""
        if self._end:  # if I've finished, return the total time
            return self._end - self._start
        return time.time() - self._start

    def _cleanup_child(self):
        """Collect the child's output and exit status once it finishes.

        Reads the output pipe to EOF, then the error pipe, reaps the
        child and records status/killsig.  If the child wrote anything
        to the error pipe, AssertionError is raised, but only after this
        bookkeeping is done, so the handle is left in a finished state.

        """
        if self._done:
            return

        # read the output
        # NOTE(review): the output pipe is drained completely before the
        # error pipe is read; a child that fills the error pipe's buffer
        # before closing its output would block both processes here.
        self._output = self._cread.readlines()
        self._cread.close()
        error = None
        if self._errread:
            error = self._errread.read()
            self._errread.close()
        # Remove myself from the active list.
        if _active and self in _active:
            _active.remove(self)

        pid, ind = os.waitpid(self.pid, 0)
        self.status, self.killsig = ind >> 8, ind & 0x7f
        self._end = time.time()
        self._done = 1

        if error:
            # It would be nice to be able to save the exception
            # and traceback somehow, and raise it in the parent.
            raise AssertionError("Error in child process:\n\n%s" % error)

class _PickleHandle:
    """File-like handle on a child process that returns one Python object.

    Wraps a _ProcHandle whose child is expected to write a single pickled
    object (as copen_fn does).  Everything except reading is delegated to
    that _ProcHandle; readline and readlines are deliberately unavailable.

    Members:
    pid         what is the PID of the subprocess?
    killsig     what signal killed the child process?
    status      what was the status of the command?

    Methods:
    close       Close this process, killing it if necessary.
    fileno      Return the fileno used to read from the process.
    wait        Wait for the process to finish.
    poll        Is the process finished?
    elapsed     How much time has this process taken?
    read        Return a Python object.

    """
    def __init__(self, pid, cread, errread=None):
        """Create a wrapper around a running process.  pid is the
        process ID.  cread is the file object used to read from the
        child.  errread is an optional file object used to read errors
        from the child.

        """
        self._phandle = _ProcHandle(pid, cread, errread)

    def __getattr__(self, attr):
        """Delegate attributes not found here to the wrapped _ProcHandle."""
        # __getattr__ only runs for missing attributes.  If _phandle itself
        # is missing (an instance created without running __init__),
        # looking it up below would recurse forever; fail cleanly instead.
        if attr == '_phandle':
            raise AttributeError(attr)
        # This object does not support 'readline' or 'readlines'
        if attr.startswith('readline'):
            raise AttributeError(attr)
        return getattr(self._phandle, attr)

    def read(self):
        """Return the unpickled Python object, or '' if there was no output."""
        try:
            import cPickle as pickle
        except ImportError:
            import pickle
        r = self._phandle.read()
        if not r:
            return r
        # NOTE: unpickling can execute arbitrary code; this is only safe
        # when the data comes from a child forked by this process
        # (copen_fn), never from an untrusted source.
        return pickle.loads(r)


# The SIGTERM handler itself is defined and installed below.

# Registry of the _ProcHandle objects that have not yet finished or been
# closed.  If this process is forcibly killed, e.g. by a SIGTERM, every
# handle in it is closed so that the child processes die too.
_active = list()

# Set once _handle_sigterm has started, so its cleanup runs only once.
_HANDLING = 0
def _handle_sigterm(signum, stackframe):
    """Handle a SIGTERM: close every active child, then re-deliver it.

    Only the first call does any work (guarded by _HANDLING).  The
    handler that was in place before this module installed its own
    (_PREV_SIGTERM) is restored and the signal is sent to this process
    again so that handler can act on it.

    """
    global _HANDLING
    if _HANDLING:
        return
    _HANDLING = 1
    _cleanup()
    # call the previous handler.  signal.getsignal reports None when the
    # previous handler was not installed from Python; it cannot be
    # reinstated, so fall back to the default action.  Leaving this
    # handler in place would make the re-sent signal hit the _HANDLING
    # guard above and be silently dropped.
    previous = _PREV_SIGTERM
    if previous is None:
        previous = signal.SIG_DFL
    signal.signal(signal.SIGTERM, previous)
    os.kill(os.getpid(), signum)

def _cleanup():
    """Close every handle currently registered in _active."""
    # Work from a snapshot: each close() removes its handle from _active.
    snapshot = list(_active)
    for handle in snapshot:
        handle.close()

# Install the SIGTERM handler.  signal.signal hands back the handler it
# replaces, which is remembered so _handle_sigterm can chain to it.
_PREV_SIGTERM = signal.signal(signal.SIGTERM, _handle_sigterm)