File: LocalExecutorService.java

package info (click to toggle)
libjaba-client-java 2.2.0-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, forky, sid, trixie
  • size: 2,052 kB
  • sloc: java: 17,308; makefile: 12
file content (165 lines) | stat: -rw-r--r-- 5,434 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/* Copyright (c) 2009 Peter Troshin
 *  
 *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0     
 * 
 *  This library is free software; you can redistribute it and/or modify it under the terms of the
 *  Apache License version 2 as published by the Apache Software Foundation
 * 
 *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
 *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache 
 *  License for more details.
 * 
 *  A copy of the license is in apache_license.txt. It is also available here:
 * @see: http://www.apache.org/licenses/LICENSE-2.0.txt
 * 
 * Any republication or derived work distributed in source code form
 * must include this copyright and license notice.
 */

package compbio.engine.local;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

import compbio.engine.conf.PropertyHelperManager;
import compbio.util.PropertyHelper;
import compbio.util.Util;

/**
 * Singleton fixed-size thread pool used for local task execution.
 * 
 * The pool size is derived from the number of available processors and may be
 * overridden by the <code>engine.local.thread.number</code> property. The
 * executor also records how many tasks it has run and the total time they
 * took, and logs the average task time when the pool terminates.
 * 
 * Access to the shared instance is guarded by the class lock, see
 * {@link #getExecutor()} and {@link #shutDown()}.
 */
public final class LocalExecutorService extends ThreadPoolExecutor {

	private final static Logger log = Logger.getLogger(LocalExecutorService.class);
	private final static String threadNumPropName = "engine.local.thread.number";

	// Lazily created singleton; read and written only under the class lock
	private static LocalExecutorService INSTANCE = null;
	// Start timestamp (System.nanoTime()) of the task run by the current
	// worker thread; set in beforeExecute, consumed in afterExecute
	private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
	// Number of tasks that have completed (normally or exceptionally)
	private final AtomicLong numTasks = new AtomicLong();
	// Sum of the execution time of all completed tasks, in nanoseconds
	private final AtomicLong totalTime = new AtomicLong();

	private LocalExecutorService(int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
	}

	/**
	 * Returns the single instance of LocalExecutorService, creating and
	 * caching it on the first call. Every call logs the current number of
	 * active threads.
	 * 
	 * @return the shared executor instance
	 */
	public synchronized static LocalExecutorService getExecutor() {
		if (INSTANCE == null) {
			INSTANCE = init();
		}
		log.info("Current Active Threads Count: " + INSTANCE.getActiveCount());
		return INSTANCE;
	}

	/**
	 * Builds the executor. The default pool size is the number of available
	 * processors (at least 1), minus one when more than 4 processors are
	 * available. A value of the <code>engine.local.thread.number</code>
	 * property replaces the default only if it is a whole number greater than
	 * 1 and less than twice the default; otherwise the default is kept and
	 * the problem is logged.
	 * 
	 * @return a new executor with a JVM shutdown hook registered for it
	 */
	private static LocalExecutorService init() {
		int procNum = Runtime.getRuntime().availableProcessors();
		// Add safety net if this function is unavailable
		if (procNum < 1) {
			procNum = 1;
		}
		if (procNum > 4) {
			// leave one processor for overhead management
			procNum = procNum - 1;
		}
		PropertyHelper ph = PropertyHelperManager.getPropertyHelper();
		String threadNum = ph.getProperty(threadNumPropName);
		log.debug("Thread number for local execution from conf file is "
				+ threadNum);
		if (!Util.isEmpty(threadNum)) {
			try {
				// Tolerate stray whitespace around the configured number
				int threads = Integer.parseInt(threadNum.trim());
				// NOTE(review): a value of exactly 1 is rejected by this
				// range check - confirm whether that is intended
				if (threads > 1 && threads < procNum * 2) {
					procNum = threads;
				} else {
					log.warn("Ignoring " + threadNumPropName + " value "
							+ threads
							+ " as it is outside of the accepted range. Using "
							+ procNum + " thread(s) instead");
				}
			} catch (NumberFormatException e) {
				log.error("Cannot understand " + threadNumPropName
						+ " property. Expecting whole number, but given "
						+ threadNum);
			}
		}

		log.debug("Constructing thread pool for executor with " + procNum
				+ " thread(s)");
		final LocalExecutorService executor = new LocalExecutorService(
				procNum, procNum, 0L, TimeUnit.MILLISECONDS,
				new LinkedBlockingQueue<Runnable>());
		// Make sure that the executor is going to be properly closed
		Runtime.getRuntime().addShutdownHook(new Thread() {

			@Override
			public void run() {
				// Shut down the captured executor directly, so that the hook
				// does not have to acquire the class lock during JVM shutdown
				executor.shutdownNow();
			}
		});
		return executor;
	}

	/**
	 * This stops all executing processes via interruption. Thus it is vital
	 * that all processes that use this service respond to interruption.
	 * 
	 * Stops internal executor service which captures streams of native
	 * executables. This method is intended for stopping the service if
	 * deployed in the web application context. There is NO NEED of using this
	 * method otherwise as the executor service is taken care of internally.
	 * 
	 * Does nothing if the executor has not been created yet.
	 */
	public static synchronized void shutDown() {
		// Synchronized on the same lock as getExecutor() so that an instance
		// created by another thread is guaranteed to be visible here
		if (INSTANCE != null) {
			INSTANCE.shutdownNow();
		}
	}

	/**
	 * Tells whether this executor has idle capacity, i.e. whether the number
	 * of actively executing threads is below the maximum pool size.
	 * 
	 * @return true if not all threads are busy, false otherwise
	 */
	public boolean canAcceptMoreWork() {
		// alternative to use: getQueue().isEmpty(); - but this will
		// inevitably put the last task to the queue
		return getMaximumPoolSize() > getActiveCount();
	}

	/**
	 * Logs the start of the task and records its start time for the current
	 * worker thread.
	 */
	@Override
	protected void beforeExecute(Thread t, Runnable r) {
		super.beforeExecute(t, r);
		// class of r is java.util.concurrent.FutureTask
		log.info(String.format("Thread %s: start %s", t, r));
		startTime.set(System.nanoTime());
	}

	/**
	 * Updates the task counter and the accumulated execution time, and logs
	 * the time taken by the task that has just finished.
	 */
	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		try {
			long endTime = System.nanoTime();
			Long started = startTime.get();
			// Do not keep a stale timestamp attached to the pooled thread
			startTime.remove();
			// Defensive: no start time recorded means no measurable duration
			long taskTime = (started == null) ? 0L : endTime
					- started.longValue();
			numTasks.incrementAndGet();
			totalTime.addAndGet(taskTime);
			log.info(String.format("Throwable %s: end %s, time=%dns", t, r,
					taskTime));
		} finally {
			super.afterExecute(r, t);
		}
	}

	/**
	 * Logs the average task execution time, provided at least one task has
	 * been executed.
	 */
	@Override
	protected void terminated() {
		try {
			long tasks = numTasks.get();
			if (tasks != 0) {
				log.info(String.format("Terminated : avg time=%dns",
						totalTime.get() / tasks));
			}
		} finally {
			super.terminated();
		}
	}
}