File: Configurator.java

package info (click to toggle)
libjaba-client-java 2.2.0-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, forky, sid, trixie
  • size: 2,052 kB
  • sloc: java: 17,308; makefile: 12
file content (275 lines) | stat: -rw-r--r-- 11,463 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
/* Copyright (c) 2009 Peter Troshin
 *  
 *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0     
 * 
 *  This library is free software; you can redistribute it and/or modify it under the terms of the
 *  Apache License version 2 as published by the Apache Software Foundation
 * 
 *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
 *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache 
 *  License for more details.
 * 
 *  A copy of the license is in apache_license.txt. It is also available here:
 * @see: http://www.apache.org/licenses/LICENSE-2.0.txt
 * 
 * Any republication or derived work distributed in source code form
 * must include this copyright and license notice.
 */

package compbio.engine;

import java.io.File;
import java.security.InvalidParameterException;
import java.util.List;

import org.apache.log4j.Logger;

import compbio.data.sequence.FastaSequence;
import compbio.engine.client.ConfExecutable;
import compbio.engine.client.ConfiguredExecutable;
import compbio.engine.client.Executable;
import compbio.engine.client.PathValidator;
import compbio.engine.client.EngineUtil;
import compbio.engine.conf.DirectoryManager;
import compbio.engine.conf.PropertyHelperManager;
import compbio.engine.local.AsyncLocalRunner;
import compbio.engine.local.LocalRunner;
import compbio.metadata.JobSubmissionException;
import compbio.util.PropertyHelper;
import compbio.util.SysPrefs;
import compbio.util.Util;

public class Configurator {

	private static final Logger log = Logger.getLogger(Configurator.class);
	private static final PropertyHelper ph = PropertyHelperManager.getPropertyHelper();

	// NOTE: declaration order matters. Static fields are initialized top to
	// bottom; initClusterWorkDirectory() reads IS_CLUSTER_ENGINE_ENABLED and
	// LOCAL_WORK_DIRECTORY, so both must be declared before
	// CLUSTER_WORK_DIRECTORY.
	public static final boolean IS_LOCAL_ENGINE_ENABLED = initBooleanValue("engine.local.enable");
	public static final boolean IS_CLUSTER_ENGINE_ENABLED = initBooleanValue("engine.cluster.enable");
	public final static String LOCAL_WORK_DIRECTORY = initLocalDirectory();
	public final static String CLUSTER_WORK_DIRECTORY = initClusterWorkDirectory();

	/**
	 * Reads a boolean engine property.
	 * 
	 * @param key
	 *            the property name, must not be null
	 * @return true only if the property value is "true" (case-insensitive,
	 *         surrounding whitespace ignored); false if the property is
	 *         missing, empty or has any other value
	 */
	private static boolean initBooleanValue(String key) {
		assert key != null;
		String status = ph.getProperty(key);
		log.debug("Loading property: " + key + " with value: " + status);
		if (Util.isEmpty(status)) {
			return false;
		}
		// Same semantics as the deprecated new Boolean(String)
		return Boolean.parseBoolean(status.trim());
	}

	/**
	 * Reads the cluster engine work directory from "cluster.tmp.directory".
	 * 
	 * @return the trimmed directory, or null if the cluster engine is disabled
	 * @throws RuntimeException
	 *             if the cluster engine is enabled but no directory is set
	 * @throws InvalidParameterException
	 *             if the directory equals the local engine work directory
	 */
	private static String initClusterWorkDirectory() {
		if (!IS_CLUSTER_ENGINE_ENABLED) {
			return null;
		}
		String tmpDir = ph.getProperty("cluster.tmp.directory");
		if (tmpDir != null) {
			// Trim before the emptiness check so a whitespace-only value is
			// treated as missing
			tmpDir = tmpDir.trim();
		}
		if (Util.isEmpty(tmpDir)) {
			throw new RuntimeException("Cluster work directory must be provided! ");
		}
		// Compare with the value being computed: CLUSTER_WORK_DIRECTORY itself
		// is still null at this point of class initialization
		if (tmpDir.equals(LOCAL_WORK_DIRECTORY)) {
			throw new InvalidParameterException("Cluster engine output directory must be different of that for local engine!");
		}
		return tmpDir;
	}

	/**
	 * Reads the local engine work directory from "local.tmp.directory".
	 * Falls back to the system temporary directory when the property is not
	 * defined. Relative paths are converted to absolute ones.
	 * 
	 * @return the trimmed, absolute local work directory
	 */
	private static String initLocalDirectory() {
		String tmpDir = ph.getProperty("local.tmp.directory");
		if (tmpDir != null) {
			// Trim first so that padding does not affect the emptiness or the
			// absolute-path checks below
			tmpDir = tmpDir.trim();
		}
		// Use system temporary directory if local.tmp.directory is not defined
		if (Util.isEmpty(tmpDir)) {
			tmpDir = SysPrefs.getSystemTmpDir();
			log.debug("local.tmp.directory is not defined using system tmp: " + tmpDir);
		}
		if (!PathValidator.isAbsolutePath(tmpDir)) {
			log.debug("local.tmp.directory path is relative! " + tmpDir);
			tmpDir = EngineUtil.convertToAbsolute(tmpDir);
			log.debug("local.tmp.directory path changed to absolute: " + tmpDir);
		}
		return tmpDir.trim();
	}

	/**
	 * Chooses the engine an executable should run on. The choice depends on
	 * the runtimes the executable supports
	 * ({@code executable.getSupportedRuntimes()}) and on which engines are
	 * enabled in the properties (engine.cluster.enable and
	 * engine.local.enable).
	 * <p>
	 * If the executable supports only one runtime, that runtime is returned,
	 * provided its engine is enabled. If it supports any runtime:
	 * <ol>
	 * <li>on Windows the local engine is returned, because cluster submission
	 * is not possible from Windows;</li>
	 * <li>if both engines are enabled, {@link LoadBalancer} decides;</li>
	 * <li>otherwise the single enabled engine is returned.</li>
	 * </ol>
	 * 
	 * @param executable
	 *            the executable to find an engine for
	 * @param dataSet
	 *            the input sequences, passed to the load balancer; may be null,
	 *            in which case the load balancer is asked without a dataset
	 * @return the execution provider to use
	 * @throws JobSubmissionException
	 *             if the executable supports a single runtime whose engine is
	 *             disabled
	 * @throws RuntimeException
	 *             if both engines are disabled
	 */
	static Executable.ExecProvider getExecProvider(ConfiguredExecutable<?> executable, List<FastaSequence> dataSet)
			throws JobSubmissionException {
		// Look where executable claims to be executed
		Executable.ExecProvider provider = executable.getSupportedRuntimes();
		if (!IS_CLUSTER_ENGINE_ENABLED && !IS_LOCAL_ENGINE_ENABLED) {
			// Both engines disabled!
			throw new RuntimeException("Both engines are disabled! "
					+ "Check conf/Engine.cluster.properties and conf/Engine.local.properties. At least one engine must be enabled!");
		}
		if (provider == Executable.ExecProvider.Local) {
			if (IS_LOCAL_ENGINE_ENABLED) {
				return Executable.ExecProvider.Local;
			}
			throw new JobSubmissionException("Executable can be executed only on locally, but local engine is disabled!");
		}
		if (provider == Executable.ExecProvider.Cluster) {
			if (IS_CLUSTER_ENGINE_ENABLED) {
				return Executable.ExecProvider.Cluster;
			}
			throw new JobSubmissionException("Executable can be executed only on the cluster, but cluster engine is disabled!");
		}
		// From here on the executable supports both Cluster and Local engines
		// (provider = Any).
		if (SysPrefs.isWindows) {
			// No matter what the settings are, we cannot send tasks to the
			// cluster from a Windows environment
			return Executable.ExecProvider.Local;
		}
		// If both engines are enabled then load balance between them
		if (IS_CLUSTER_ENGINE_ENABLED && IS_LOCAL_ENGINE_ENABLED) {
			if (dataSet == null) {
				// Without a dataset the decision is based on the engine load
				// only
				return LoadBalancer.getEngine(executable);
			}
			// If the dataset is provided, consider it. This should be the main
			// route for any load balancing configuration
			return LoadBalancer.getEngine(executable, dataSet);
		}
		if (IS_CLUSTER_ENGINE_ENABLED) {
			return Executable.ExecProvider.Cluster;
		}
		// Only the local engine is enabled (both-disabled was rejected above)
		return Executable.ExecProvider.Local;
	}

	/**
	 * Wraps an executable into a {@link ConfExecutable}, chooses the engine
	 * for it with {@link #getExecProvider} (without a dataset) and creates its
	 * working directory.
	 * 
	 * @param executable
	 *            the executable to configure, must not be null
	 * @return the configured executable
	 * @throws JobSubmissionException
	 *             if no enabled engine can run the executable
	 * @throws InvalidParameterException
	 *             if executable is null
	 */
	public static <T> ConfiguredExecutable<T> configureExecutable(Executable<T> executable) throws JobSubmissionException {
		// The cast selects the dataset overload; a bare null would be ambiguous
		return configureExecutable(executable, (List<FastaSequence>) null);
	}

	/**
	 * Wraps an executable into a {@link ConfExecutable}, chooses the engine
	 * for it with {@link #getExecProvider}, taking the dataset into account,
	 * and creates its working directory.
	 * 
	 * @param executable
	 *            the executable to configure, must not be null
	 * @param dataSet
	 *            the input sequences used for load balancing; may be null
	 * @return the configured executable
	 * @throws JobSubmissionException
	 *             if no enabled engine can run the executable
	 * @throws InvalidParameterException
	 *             if executable is null
	 */
	public static <T> ConfiguredExecutable<T> configureExecutable(Executable<T> executable, List<FastaSequence> dataSet)
			throws JobSubmissionException {
		if (executable == null) {
			throw new InvalidParameterException("Executable must be provided!");
		}
		ConfExecutable<T> confExec = new ConfExecutable<T>(executable, DirectoryManager.getTaskDirectory(executable.getClass()));
		Executable.ExecProvider provider = getExecProvider(confExec, dataSet);
		confExec.setExecProvider(provider);
		setupWorkDirectory(confExec, provider);
		return confExec;
	}

	/**
	 * Creates the task working directory under the work directory of the
	 * given engine and registers it with the executable.
	 * 
	 * @param confExec
	 *            the executable; its task id names the directory
	 * @param provider
	 *            Local or Cluster (not Any, not null)
	 */
	static <T> void setupWorkDirectory(ConfExecutable<T> confExec, Executable.ExecProvider provider) {
		assert provider != null && provider != Executable.ExecProvider.Any;
		String baseDir = (provider == Executable.ExecProvider.Local) ? Configurator.LOCAL_WORK_DIRECTORY
				: Configurator.CLUSTER_WORK_DIRECTORY;
		String workDir = baseDir + File.separator + confExec.getTaskId();
		// Create working directory for the task
		File wdir = new File(workDir);
		boolean created = wdir.mkdir();
		log.info("Creating working directory for the task in: " + wdir.getAbsolutePath());
		if (!created && !wdir.isDirectory()) {
			// mkdir() does not create missing parents and reports failure only
			// through its return value
			log.warn("Could not create working directory: " + wdir.getAbsolutePath());
		}
		// Tell the executable where to get the results
		confExec.setWorkDirectory(workDir);
	}

	/**
	 * Configures an executable to run on an explicitly chosen engine.
	 * Passing {@link Executable.ExecProvider#Any} lets the configurator choose
	 * the engine, as {@link #configureExecutable(Executable)} does.
	 * 
	 * @param executable
	 *            the executable to configure, must not be null
	 * @param provider
	 *            the engine to use, must not be null
	 * @return the configured executable
	 * @throws JobSubmissionException
	 *             if the requested engine is disabled
	 * @throws InvalidParameterException
	 *             if executable or provider is null
	 */
	public static <T> ConfiguredExecutable<T> configureExecutable(Executable<T> executable, Executable.ExecProvider provider)
			throws JobSubmissionException {
		if (executable == null) {
			throw new InvalidParameterException("Executable must be provided!");
		}
		if (provider == null) {
			throw new InvalidParameterException("ExecProvider must be provided!");
		}
		if (provider == Executable.ExecProvider.Cluster && !IS_CLUSTER_ENGINE_ENABLED) {
			throw new JobSubmissionException("Cluster engine is disabled or not configured!");
		}
		if (provider == Executable.ExecProvider.Local && !IS_LOCAL_ENGINE_ENABLED) {
			throw new JobSubmissionException("Local engine is disabled or not configured!");
		}
		// Validate before creating the task so no task id is wasted on failure
		ConfExecutable<T> confExec = new ConfExecutable<T>(executable, DirectoryManager.getTaskDirectory(executable.getClass()));
		Executable.ExecProvider resolved = provider;
		if (resolved == Executable.ExecProvider.Any) {
			// setupWorkDirectory needs a concrete engine
			resolved = getExecProvider(confExec, null);
		}
		confExec.setExecProvider(resolved);
		setupWorkDirectory(confExec, resolved);
		return confExec;
	}

	/**
	 * Returns an asynchronous engine. This implementation always returns a new
	 * {@link AsyncLocalRunner}; the arguments are not used beyond an assertion
	 * on provider.
	 */
	public static AsyncExecutor getAsyncEngine(ConfiguredExecutable<?> executable, Executable.ExecProvider provider) {
		assert provider != Executable.ExecProvider.Any && provider != null;
		return new AsyncLocalRunner();
	}

	/**
	 * Returns a synchronous engine. This implementation always returns a new
	 * {@link LocalRunner} for the executable; provider is only checked by an
	 * assertion.
	 */
	public static SyncExecutor getSyncEngine(ConfiguredExecutable<?> executable, Executable.ExecProvider provider)
			throws JobSubmissionException {
		assert provider != Executable.ExecProvider.Any && provider != null;
		return new LocalRunner(executable);
	}

	/**
	 * Returns an asynchronous engine; always a new {@link AsyncLocalRunner}.
	 */
	public static AsyncExecutor getAsyncEngine(ConfiguredExecutable<?> executable) {
		return new AsyncLocalRunner();
	}

	/**
	 * Returns an asynchronous engine for a task id; always a new
	 * {@link AsyncLocalRunner}.
	 */
	public static AsyncExecutor getAsyncEngine(String taskId) {
		return new AsyncLocalRunner();
	}

	/**
	 * Returns a synchronous engine; always a new {@link LocalRunner} for the
	 * executable.
	 */
	public static SyncExecutor getSyncEngine(ConfiguredExecutable<?> executable) throws JobSubmissionException {
		return new LocalRunner(executable);
	}

	/**
	 * Tells whether the executable is targeted at the local engine, judging
	 * by its task id (see {@link #isLocal(String)}).
	 */
	static boolean isTargetedForLocalExecution(ConfiguredExecutable<?> executable) {
		return isLocal(executable.getTaskId());
	}

	/**
	 * Tells whether a task id belongs to the local engine, i.e. it does not
	 * start with {@link ConfExecutable#CLUSTER_TASK_ID_PREFIX}.
	 * 
	 * @throws NullPointerException
	 *             if taskId is null or empty
	 * @throws InvalidParameterException
	 *             if taskId is not a valid job id
	 */
	static boolean isLocal(String taskId) {
		if (Util.isEmpty(taskId)) {
			throw new NullPointerException("TaskId must be provided!");
		}
		if (!EngineUtil.isValidJobId(taskId)) {
			throw new InvalidParameterException("TaskId is not valid!");
		}
		return !taskId.startsWith(ConfExecutable.CLUSTER_TASK_ID_PREFIX);
	}

	/**
	 * Returns the working directory of a task. Task ids starting with
	 * {@link ConfExecutable#CLUSTER_TASK_ID_PREFIX} resolve under
	 * {@link #CLUSTER_WORK_DIRECTORY}, all others under
	 * {@link #LOCAL_WORK_DIRECTORY}. Note that CLUSTER_WORK_DIRECTORY is null
	 * when the cluster engine is disabled.
	 */
	public static String getWorkDirectory(String taskId) {
		assert !compbio.util.Util.isEmpty(taskId);
		assert EngineUtil.isValidJobId(taskId);
		log.info("Getting workdirectory for TaskID: " + taskId);
		if (taskId.startsWith(ConfExecutable.CLUSTER_TASK_ID_PREFIX)) {
			return CLUSTER_WORK_DIRECTORY + File.separator + taskId;
		}
		return LOCAL_WORK_DIRECTORY + File.separator + taskId;
	}
}