File: SpawnedProcess.java

package info (click to toggle)
derby 10.14.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 78,740 kB
  • sloc: java: 691,931; sql: 42,686; xml: 20,511; sh: 3,373; sed: 96; makefile: 46
file content (623 lines) | stat: -rw-r--r-- 22,958 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
/*
 *
 * Derby - Class org.apache.derbyTesting.junit.SpawnedProcess
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, 
 * software distributed under the License is distributed on an 
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 
 * either express or implied. See the License for the specific 
 * language governing permissions and limitations under the License.
 */
package org.apache.derbyTesting.junit;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.lang.reflect.Field;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;

import java.util.Timer;
import java.util.TimerTask;
import static junit.framework.Assert.assertTrue;
import static org.apache.derbyTesting.junit.BaseTestCase.execJavaCmd;
import static org.apache.derbyTesting.junit.BaseTestCase.getJavaExecutableName;
import static org.apache.derbyTesting.junit.BaseTestCase.isIBMJVM;
import static org.apache.derbyTesting.junit.BaseTestCase.isWindowsPlatform;

/**
 * Utility code that wraps a spawned process (Java Process object).
 * <p>
 * There are three main aspects handled by this class:
 * <ul> <li>Draining the output streams of the process.<br/>
 *          Happens automatically, the output gathered can be accessed with
 *          {@linkplain #getFailMessage}, {@linkplain #getFullServerError},
 *          {@linkplain #getFullServerOutput}, and
 *          {@linkplain #getNextServerOutput}</li>
 *      <li>Waiting for process completion, followed by cleanup (see
 *          {@linkplain #complete()} and {@linkplain #complete(long)})</li>
 *      <li>Forcibly destroying a process that lives too long, for instance
 *          if inter-process communication hangs. This happens automatically
 *          if a threshold value is exceeded.</li>
 * </ul>
 * <p>
 * <em>Implementation notes</em>: Active waiting is employed when waiting for
 * the process to complete. This is considered acceptable since the expected
 * usage pattern is to spawn the process, execute a set of tests, and then
 * finally ask the process to shut down. Waiting for the process to
 * complete is the last step, and a process typically lives only for a short
 * period of time anyway (often only for seconds, seldom more than a few
 * minutes).
 * <br/>
 * Forcibly destroying processes that live too long makes the test run
 * continue even when facing inter-process communication hangs. The prime
 * example is when both the client and the server are waiting for the other
 * party to send data. Since the timeout is very high this feature is intended
 * to keep automated test runs from hanging indefinitely, for instance due to
 * environmental issues affecting the process.
 */
//@NotThreadSafe
public final class SpawnedProcess {

    /** Prefix put in front of the diagnostic messages printed by this class. */
    private static final String TAG = "DEBUG: {SpawnedProcess} ";
    /** Timer running the kill tasks; created lazily by {@code scheduleKill}. */
    private static Timer KILL_TIMER;

    /**
     * Property allowing the kill threshold to be overridden.
     * <p>
     * Interprets the numeric value as milliseconds. The value is ignored (and
     * the default is used) if it is non-numeric or negative; a negative delay
     * would otherwise make {@code Timer.schedule} throw for every process
     * that gets wrapped.
     * Overriding this value may be required if the test machine is extremely
     * slow, or you want to kill hung processes earlier for some reason.
     */
    private static final String KILL_THRESHOLD_PROPERTY =
            "derby.tests.process.killThreshold";
    /** Kill threshold used when the property is absent or invalid. */
    private static final long KILL_THRESHOLD_DEFAULT =
            45L * 60L * 1000L; // 45 minutes
    /** The maximum allowed time for a process to live. */
    private static final long KILL_THRESHOLD;
    static {
        long tmpThreshold = KILL_THRESHOLD_DEFAULT;
        String tmp = BaseTestCase.getSystemProperty(KILL_THRESHOLD_PROPERTY);
        if (tmp != null) {
            try {
                final long parsed = Long.parseLong(tmp);
                if (parsed >= 0) {
                    tmpThreshold = parsed;
                } else {
                    // A negative delay cannot be scheduled; keep the default.
                    System.err.println(TAG + "Invalid kill threshold: " + tmp);
                }
            } catch (NumberFormatException nfe) {
                // Ignore, use the default set previously.
                System.err.println(TAG + "Invalid kill threshold: " + tmp);
            }
        }
        KILL_THRESHOLD = tmpThreshold;
    }

    /**
     * Puts the calling thread to sleep, swallowing any interrupt.
     *
     * @param ms the number of milliseconds to sleep
     */
    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ignored) {
            // Deliberately swallowed: callers must not return before the
            // process has terminated, and the interrupt flag is not restored
            // because a set flag causes Derby to shut down. This is a test
            // requirement only and would be wrong in production code.
            // A notice is printed to stdout instead.
            System.out.println(TAG + "Interrupted while sleeping (ignored)");
        }
    }

    /** Name associated with the process; used in diagnostics and thread names. */
    private final String name;

    /** The wrapped process. */
    private final Process javaProcess;

    /** Buffer and reader thread collecting the process' stderr. */
    private final StreamSaver errSaver;

    /** Buffer and reader thread collecting the process' stdout. */
    private final StreamSaver outSaver;

    /** When {@code true}, {@code printDiagnostics} prints nothing. */
    private boolean suppressOutput;

    /** Watchdog task scheduled at construction; cancelled by {@code complete}. */
    private final TimerTask killTask;

    /**
     * Creates a new wrapper to handle the given process.
     *
     * @param javaProcess a (running) process
     * @param name name to associate with the process
     */
    public SpawnedProcess(Process javaProcess, String name) {
        this.javaProcess = javaProcess;
        this.name = name;

        errSaver = startStreamSaver(javaProcess.getErrorStream(), name
                .concat(":System.err"));
        outSaver = startStreamSaver(javaProcess.getInputStream(), name
                .concat(":System.out"));
        killTask = scheduleKill(javaProcess, name);
    }

    /**
     * Schedules a task to kill/terminate the process after a predefined
     * timeout ({@code KILL_THRESHOLD} milliseconds).
     * <p>
     * The shared daemon timer is created on the first call.
     *
     * @param process the process
     * @param name name of the process
     * @return The task object.
     */
    private TimerTask scheduleKill(Process process, String name) {
        final Timer timer;
        // Lock on the class object, not on a String constant: string literals
        // are interned, so a literal's monitor may be shared with any other
        // code that happens to synchronize on an equal literal.
        synchronized (SpawnedProcess.class) {
            if (KILL_TIMER == null) {
                KILL_TIMER = new Timer("SpawnedProcess-killer", true);
            }
            timer = KILL_TIMER;
        }
        TimerTask killer = new ProcessKillerTask(process, name);
        timer.schedule(killer, KILL_THRESHOLD);
        return killer;
    }

    /**
     * Requests that the {@code complete}-methods print none of the output
     * gathered from the process.
     * <p>
     * The output is still collected and remains available through
     * {@link #getFullServerOutput()} (stdout) and
     * {@link #getFullServerError()} (stderr).
     */
    public void suppressOutputOnComplete() {
        this.suppressOutput = true;
    }

    /**
     * Returns the wrapped Java {@code Process} object.
     *
     * @return the process handed to the constructor
     */
    public Process getProcess() {
        return this.javaProcess;
    }
    
    /**
     * Returns everything the process wrote to stdout, decoded with the
     * default encoding (assumed to be the encoding it was written in).
     * <p>
     * Blocks until the stdout reader thread has finished. This method should
     * only be called after the process has completed, that is, after
     * {@link #complete()} or {@link #complete(long)}.
     *
     * @return the complete stdout of the process
     * @throws InterruptedException if interrupted while waiting for the
     *      reader thread
     */
    public String getFullServerOutput() throws InterruptedException {
        final StreamSaver saver = outSaver;

        // Wait for the reader so that nothing is missing from the buffer.
        saver.thread.join();

        synchronized (this) {
            return saver.stream.toString();
        }
    }
    
    /**
     * Returns everything the process wrote to stderr, decoded with the
     * default encoding (assumed to be the encoding it was written in).
     * <p>
     * Blocks until the stderr reader thread has finished. This method should
     * only be called after the process has completed, that is, after
     * {@link #complete()} or {@link #complete(long)}.
     *
     * @return the complete stderr of the process
     * @throws InterruptedException if interrupted while waiting for the
     *      reader thread
     */
    public String getFullServerError() throws InterruptedException {
        final StreamSaver saver = errSaver;

        // Wait for the reader so that nothing is missing from the buffer.
        saver.thread.join();

        synchronized (this) {
            return saver.stream.toString();
        }
    }

    /**
     * Number of stdout bytes already handed out by
     * {@code getNextServerOutput()}.
     */
    int stdOutReadOffset;
    /**
     * Returns the stdout output gathered since the previous call, decoded
     * with the default encoding (assumed to be the encoding it was written
     * in). The read offset is not synchronized, so only a single caller
     * should use this method.
     *
     * @return the new output, possibly an empty string
     */
    public String getNextServerOutput() {
        final byte[] snapshot;
        synchronized (this) {
            snapshot = outSaver.stream.toByteArray();
        }

        // Decode only the bytes past the point reached by the last call.
        final int alreadyReturned = stdOutReadOffset;
        final int freshBytes = snapshot.length - alreadyReturned;
        final String fresh = new String(snapshot, alreadyReturned, freshBytes);
        stdOutReadOffset = snapshot.length;
        return fresh;
    }
    /**
     * Builds a failure message consisting of the given reason, the process
     * name, its exit code (or "running" if it has not exited), and whatever
     * the process has written to stderr and stdout so far. Having the output
     * in the message makes debugging easier when the cause of the failure is
     * printed there.
     * <p>
     * Sleeps for half a second before collecting the information.
     *
     * @param reason text to start the message with
     * @return the assembled message
     */
    public String getFailMessage(String reason) {
        sleep(500);

        final StringBuilder message = new StringBuilder();
        message.append(reason)
               .append(":Spawned ")
               .append(name)
               .append(" exitCode=");
        try {
            message.append(javaProcess.exitValue());
        } catch (IllegalThreadStateException stillRunning) {
            // exitValue() refuses to answer while the process is alive.
            message.append("running");
        }

        final ByteArrayOutputStream stderr = errSaver.stream;
        final ByteArrayOutputStream stdout = outSaver.stream;

        // Same lock as the reader threads use when appending to the buffers.
        synchronized (this) {
            if (stderr.size() > 0) {
                message.append("\nSTDERR:\n").append(stderr.toString());
            }
            if (stdout.size() > 0) {
                message.append("\nSTDOUT:\n").append(stdout.toString());
            }
        }
        return message.toString();
    }

    /**
     * Waits for the process to terminate, without a caller-imposed timeout.
     * <p>
     * This call will block until one of the following conditions is met:
     * <ul> <li>the process terminates on its own</li>
     *      <li>the hung-process watchdog mechanism forcibly terminates the
     *          process (see {@linkplain #scheduleKill})</li>
     * </ul>
     *
     * @return The process exit code.
     * @throws IOException if printing diagnostics fails
     */
    public int complete()
            throws IOException {
        final long noTimeout = Long.MAX_VALUE;
        return complete(noTimeout);
    }

    /**
     * Waits for the process to terminate, forcibly terminating it if it
     * takes longer than the specified timeout.
     * <p>
     * This call will block until one of the following conditions is met:
     * <ul> <li>the process terminates on its own</li>
     *      <li>the timeout is exceeded, at which point the process is
     *          forcibly destroyed</li>
     *      <li>the hung-process watchdog mechanism forcibly terminates the
     *          process (see {@linkplain #scheduleKill})</li>
     * </ul>
     * Afterwards the watchdog task is cancelled, the stream reader threads
     * are joined, the process streams are closed and diagnostics are printed.
     *
     * @param timeout the number of milliseconds to wait for the process
     *                to terminate normally before destroying it
     * @return The process exit code.
     * @throws IOException if printing diagnostics fails
     */
    public int complete(long timeout)
            throws IOException {
        final long startedAt = System.currentTimeMillis();
        int exitCode;
        // Poll twice a second until the process reports an exit code.
        while (true) {
            try {
                exitCode = javaProcess.exitValue();
                break;
            } catch (IllegalThreadStateException stillRunning) {
                // The process is still running.
                final long elapsed = System.currentTimeMillis() - startedAt;
                if (elapsed > timeout) {
                    javaProcess.destroy();
                }
                sleep(500);
            }
        }

        // Clean up
        killTask.cancel();
        joinWith(errSaver.thread);
        joinWith(outSaver.thread);
        cleanupProcess();
        printDiagnostics(exitCode);
        return exitCode;
    }
    
    /**
     * Releases the resources of the process: closes its stdin, stderr and
     * stdout streams (in that order) and then destroys it.
     */
    private void cleanupProcess() {
        final Process finished = javaProcess;
        // Closing the streams explicitly is considered best practice.
        closeStream(finished.getOutputStream());
        closeStream(finished.getErrorStream());
        closeStream(finished.getInputStream());
        finished.destroy();
    }

    /**
     * Prints the gathered process output to stderr/stdout, unless output has
     * been suppressed.
     *
     * @param exitCode the exit code of the spawned process
     * @throws IOException if writing to an output stream fails
     * @see #suppressOutput
     */
    private synchronized void printDiagnostics(int exitCode)
            throws IOException {
        if (suppressOutput) {
            return;
        }

        // Anything written to stderr is always reported.
        final ByteArrayOutputStream stderr = errSaver.stream;
        if (stderr.size() > 0) {
            System.err.println("START-SPAWNED:" + name + " ERROR OUTPUT:");
            stderr.writeTo(System.err);
            System.err.println("END-SPAWNED  :" + name + " ERROR OUTPUT:");
        }

        // stdout is only reported when the exit code indicates a failure.
        final ByteArrayOutputStream stdout = outSaver.stream;
        if (exitCode != 0 && stdout.size() > 0) {
            System.out.println("START-SPAWNED:" + name
                    + " STANDARD OUTPUT: exit code=" + exitCode);
            stdout.writeTo(System.out);
            System.out.println("END-SPAWNED  :" + name
                    + " STANDARD OUTPUT:");
        }
    }

    /**
     * Waits for the specified thread to die, swallowing any interrupt.
     *
     * @param t the thread to wait for
     */
    private void joinWith(Thread t) {
        try {
            t.join();
        } catch (InterruptedException ignored) {
            // Deliberately swallowed: callers must not return before the
            // process has terminated, and the interrupt flag is not restored
            // because a set flag causes Derby to shut down. This is a test
            // requirement only and would be wrong in production code.
            // A notice is printed to stdout instead.
            final String notice =
                    TAG + "Interrupted while joining with thread '" + t + "'";
            System.out.println(notice);
        }
    }

    /**
     * Closes the given input or output stream, ignoring any exception thrown
     * by the close. Objects of any other type, and {@code null}, are left
     * alone.
     *
     * @param stream stream to close (may be {@code null})
     */
    private void closeStream(Object stream) {
        try {
            if (stream instanceof InputStream) {
                ((InputStream) stream).close();
            } else if (stream instanceof OutputStream) {
                ((OutputStream) stream).close();
            }
        } catch (IOException ignored) {
            // Ignore exception on close
        }
    }

    /**
     * Pairs the buffer that receives the output of a process with the thread
     * that reads the process output and copies it into that buffer.
     */
    private static class StreamSaver {
        /** Buffer holding the output read so far. */
        final ByteArrayOutputStream stream;
        /** Thread filling the buffer. */
        final Thread thread;

        StreamSaver(ByteArrayOutputStream buffer, Thread reader) {
            stream = buffer;
            thread = reader;
        }
    }

    /**
     * Creates and starts a stream saver that reads the specified input stream
     * in a separate (daemon) thread and stores what it reads in memory.
     * <p>
     * Every write to the buffer, including the stack trace recorded if
     * reading fails, is done while holding the lock on this
     * {@code SpawnedProcess}, the same lock the accessor methods take when
     * reading the buffer.
     *
     * @param in input stream to read from
     * @param name name of the thread
     * @return A {@code StreamSaver} object.
     */
    private StreamSaver startStreamSaver(final InputStream in,
            final String name) {

        final ByteArrayOutputStream out = new ByteArrayOutputStream() {
            // Tripwire (presumably a debugging aid): nothing in this class
            // resets the buffer, so report to stdout who did if it happens.
            @Override
            public void reset() {
                super.reset();
                new Throwable("WWW").printStackTrace(System.out);
            }

        };

        Thread streamReader = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    byte[] buffer = new byte[1024];
                    int read;
                    // Copy until end-of-stream.
                    while ((read = in.read(buffer)) != -1) {
                        synchronized (SpawnedProcess.this) {
                            out.write(buffer, 0, read);
                        }
                    }

                } catch (IOException ioe) {
                    // Record the failure in the captured output, under the
                    // same lock as all other accesses to the buffer.
                    synchronized (SpawnedProcess.this) {
                        ioe.printStackTrace(new PrintStream(out, true));
                    }
                }
            }

        }, name);
        streamReader.setDaemon(true);
        streamReader.start();

        return new StreamSaver(out, streamReader);
    }

    /**
     * Timer task that destroys a process and then waits (for up to ten
     * one-second intervals) for it to report an exit code.
     *
     * @see #scheduleKill(java.lang.Process, java.lang.String) 
     */
    private static class ProcessKillerTask
        extends TimerTask {

        /** Name of the process, used in the messages printed. */
        private final String name;
        /** The process to destroy; {@code null} once cancelled or run. */
        private Process process;

        public ProcessKillerTask(Process process, String name) {
            this.process = process;
            this.name = name;
        }

        /**
         * Cancels the task and drops the reference to the process.
         */
        @Override
        public synchronized boolean cancel() {
            // The task usually stays in the timer queue for a long time, so
            // let go of the process right away to free resources.
            process = null;
            return super.cancel();
        }

        /**
         * Destroys the process, unless the task has been cancelled, and
         * reports the outcome on stderr.
         */
        @Override
        public synchronized void run() {
            final Process victim = process;
            if (victim == null) {
                // We may have just been cancelled
                return;
            }

            System.err.println("DEBUG: Destroying process '" + name + "'");
            victim.destroy();

            boolean exited = false;
            for (int attempt = 0; attempt < 10 && !exited; attempt++) {
                try {
                    int exitCode = victim.exitValue();
                    System.err.println("DEBUG: Destroyed process '" + name +
                            "', exit code is " + exitCode);
                    exited = true;
                } catch (IllegalThreadStateException stillAlive) {
                    // Sleep for a second and retry.
                    sleep(1000);
                }
            }
            if (!exited) {
                System.err.println(
                        "DEBUG: Failed to destroy process '" + name + "'");
            }
            process = null;
        }
    }

    /**
     * Return {@code true} if the subprocess has exited within {@code
     * patience} milliseconds. Sleep {@code sleepInterval} between each check.
     * Note: you still need to call one of the {@link #complete} overloads even
     * if using this method (which is optional). It can be used before trying
     * a {@link #jstack} call.
     * <p>
     * The process is always checked at least once, even if {@code patience}
     * is zero or negative, and it is checked one final time when the patience
     * runs out, so an exit during the last sleep is not missed. The total
     * time slept never exceeds {@code patience}.
     *
     * @param patience the maximum milliseconds we want to wait for
     * @param sleepInterval sleep for this amount of milliseconds before
     *                      testing again if not already exited. Values below
     *                      one are treated as one millisecond, so that a zero
     *                      interval cannot turn the wait into a busy loop.
     * @return true if the process exited before our patience is up.
     * @throws java.lang.InterruptedException if interrupted while sleeping
     */
    @SuppressWarnings("SleepWhileInLoop")
    public boolean waitForExit(long patience, long sleepInterval)
            throws InterruptedException {
        final long step = Math.max(1L, sleepInterval);
        long remaining = patience;
        while (true) {
            try {
                javaProcess.exitValue();
                return true;
            } catch (IllegalThreadStateException stillRunning) {
                // Not exited yet; sleep and try again if patience remains.
            }
            if (remaining <= 0) {
                return false;
            }
            // Never sleep past the point where the patience runs out.
            final long nap = Math.min(step, remaining);
            Thread.sleep(nap);
            remaining -= nap;
        }
    }


    /**
     * Return the jstack(1) dump of the process if possible.
     * It will only work if we are running with a full JDK, not a simple JRE.
     * On Windows and on IBM JVMs no attempt is made and an empty string is
     * returned. If launching or completing jstack fails with an
     * {@code IOException}, a text describing the failure is returned instead
     * of the dump.
     *
     * @return jstack dump if possible
     * @throws PrivilegedActionException if the pid cannot be obtained
     * @throws InterruptedException if interrupted while waiting for the
     *      output of jstack
     */
    public String jstack()
            throws PrivilegedActionException, InterruptedException{

        if (isWindowsPlatform() || isIBMJVM()) {
            return "";
        }

        // The pid is obtained using reflection. Dirty; for Unix there is a
        // private field pid in the implementing class.
        final int pid = getPid();

        // Derive the jstack executable from the java executable by swapping
        // the trailing jre/bin/java for bin/jstack.
        final String javaExec = getJavaExecutableName();
        final String javaTail =
                "jre" + File.separator + "bin" + File.separator + "java";
        final String jstackTail = "bin" + File.separator + "jstack";
        final String execName = javaExec.replace(javaTail, jstackTail);
        final String[] arguments = { Integer.toString(pid) };

        try {
            final Process p2 =
                    execJavaCmd(execName, null, arguments, null, false);
            final SpawnedProcess spawn2 = new SpawnedProcess(p2, "jstack");
            spawn2.suppressOutputOnComplete();
            // Close stdin of the process so that it stops
            // any waiting for it and exits (shouldn't matter for this test)
            p2.getOutputStream().close();
            final int exitCode2 = spawn2.complete(30000); // 30 seconds
            assertTrue(spawn2.getFailMessage("jstack failed: "),
                    exitCode2 == 0);
            return spawn2.getFullServerOutput();
        } catch (IOException e) {
            return "Tried to catch jstack of hanging subprocess but it "
                    + "failed (using JDK or JRE?): " + e;
        }
    }

    /**
     * Return the pid of the process, or -1 on Windows and on IBM JVMs, where
     * it is not looked up.
     * <p>
     * The pid is read reflectively from the private field named {@code pid}
     * of the class implementing the process.
     *
     * @return pid, or -1
     * @throws PrivilegedActionException if the field is missing or cannot
     *      be read
     */
    public int getPid() throws PrivilegedActionException {
        if (isWindowsPlatform() || isIBMJVM()) {
            return -1;
        }

        final PrivilegedExceptionAction<Integer> readPidField =
            new PrivilegedExceptionAction<Integer>() {
                @Override
                public Integer run() throws IllegalAccessException,
                        NoSuchFieldException {
                    final Field pidField =
                            javaProcess.getClass().getDeclaredField("pid");
                    pidField.setAccessible(true);
                    return pidField.getInt(javaProcess);
                }
            };
        return AccessController.doPrivileged(readPidField);
    }

}