1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
|
/* Copyright (c) 2009 Peter Troshin
*
* JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0
*
* This library is free software; you can redistribute it and/or modify it under the terms of the
* Apache License version 2 as published by the Apache Software Foundation
*
* This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
* even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache
* License for more details.
*
* A copy of the license is in apache_license.txt. It is also available here:
* @see: http://www.apache.org/licenses/LICENSE-2.0.txt
*
* Any republication or derived work distributed in source code form
* must include this copyright and license notice.
*/
package compbio.engine.local;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.log4j.Logger;
import compbio.engine.AsyncExecutor;
import compbio.engine.Configurator;
import compbio.engine.SubmissionManager;
import compbio.engine.client.ConfiguredExecutable;
import compbio.engine.client.EngineUtil;
import compbio.metadata.JobStatus;
import compbio.metadata.JobSubmissionException;
import compbio.metadata.ResultNotAvailableException;
public final class AsyncLocalRunner implements AsyncExecutor {
private static final Logger log = Logger.getLogger(AsyncLocalRunner.class);
@Override
public String getWorkDirectory(String jobId) {
return Configurator.getWorkDirectory(jobId);
}
@Override
public boolean cancelJob(String jobId) {
Future<ConfiguredExecutable<?>> future = SubmissionManager.getTask(jobId);
// The job has already finished or cancelled.
if (future == null) {
log.debug("Did not find future for local job "
+ jobId
+ " will not cancel it. Perhaps it has finished or cancelled already.");
return false;
}
LocalEngineUtil.cancelJob(future, getWorkDirectory(jobId));
return future.cancel(true);
}
@Override
public JobStatus getJobStatus(String jobId) {
Future<ConfiguredExecutable<?>> future = SubmissionManager.getTask(jobId);
if (future == null) {
return LocalEngineUtil.getRecordedJobStatus(jobId);
}
return LocalEngineUtil.getJobStatus(future);
}
@Override
public String submitJob(ConfiguredExecutable<?> executable)
throws JobSubmissionException {
if (executable == null) {
throw new NullPointerException("Executable expected!");
}
LocalRunner lrunner = new LocalRunner(executable);
lrunner.executeJob();
Future<ConfiguredExecutable<?>> future = lrunner.getFuture();
if (future == null) {
throw new RuntimeException("Future is NULL for executable " + executable);
}
SubmissionManager.addTask(executable, future);
return executable.getTaskId();
}
/**
*
* @param jobId
* @return true if all files were removed, false otherwise
*/
@Override
public boolean cleanup(String jobId) {
Future<ConfiguredExecutable<?>> future = SubmissionManager
.getTask(jobId);
ConfiguredExecutable<?> cexec = null;
try {
cexec = future.get();
} catch (InterruptedException e) {
log.error("Cannot clean up as calculation was not completed!" + e.getLocalizedMessage());
} catch (ExecutionException e) {
log.error("Cannot clean up due to ExecutionException " + e.getLocalizedMessage());
}
if (cexec == null) {
return false;
}
return LocalEngineUtil.cleanup(cexec);
}
@Override
public ConfiguredExecutable<?> getResults(String taskId)
throws ResultNotAvailableException {
if (!EngineUtil.isValidJobId(taskId)) {
// TODO should I be throwing something else?
throw new IllegalArgumentException(taskId);
}
Future<ConfiguredExecutable<?>> futureExec = SubmissionManager
.getTask(taskId);
if (futureExec == null) {
// If task was not find in the list of jobs, than it must have been
// collected already
// Resurrect the job to find out there the output is
ConfiguredExecutable<?> exec = EngineUtil.loadExecutable(taskId);
return exec;
}
return LocalEngineUtil.getResults(futureExec, taskId);
}
}
|