File: NamingThreadPoolExecutor.java

package info (click to toggle)
libsis-base-java 18.09~pre1%2Bgit20180928.45fbd31%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye
  • size: 1,292 kB
  • sloc: java: 9,037; ansic: 813; xml: 160; sh: 139; makefile: 37
file content (329 lines) | stat: -rw-r--r-- 13,295 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
/*
 * Copyright 2007 - 2018 ETH Zuerich, CISD and SIS.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package ch.systemsx.cisd.base.namedthread;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * A {@link ThreadPoolExecutor} that allows to attach names to the threads it manages. These names
 * can come either from {@link IRunnableNameProvider}s or {@link ICallableNameProvider}s, or, if
 * their standard counterparts are submitted, a default name is used.
 * <p>
 * The thread factory of this executor is always a {@link NamingThreadFactory}: all constructors
 * require one (or create one from a pool name) and {@link #setThreadFactory(ThreadFactory)}
 * rejects any other type.
 * 
 * @author Bernd Rinn
 */
public class NamingThreadPoolExecutor extends ThreadPoolExecutor
{

    /**
     * The default time (in milli-seconds) to keep threads alive that are above the core pool size.
     */
    public static final long DEFAULT_KEEP_ALIVE_TIME_MILLIS = 10000L;

    /**
     * Creates a new (caching) {@code NamingThreadPoolExecutor} without a work queue. This executor
     * will create new threads as needed.
     * <p>
     * Equivalent to {@link #NamingThreadPoolExecutor(String, int)} with a work queue size of 0.
     * 
     * @param poolName the default name for new threads
     */
    public NamingThreadPoolExecutor(String poolName)
    {
        this(poolName, 0);
    }

    /**
     * Creates a new (caching) {@code NamingThreadPoolExecutor} with the given initial parameters.
     * This executor will create new threads as needed.
     * <p>
     * The executor is set up with a core pool size of 1, a maximum pool size of
     * {@link Integer#MAX_VALUE} and a keep-alive time of {@link #DEFAULT_KEEP_ALIVE_TIME_MILLIS}.
     * 
     * @param poolName The default name for new threads.
     * @param workQueueSize The size of the work queue (0 for no queue, in which case tasks are
     *            handed over directly via a {@link SynchronousQueue}; otherwise a
     *            {@link LinkedBlockingQueue} of that capacity is used).
     * @throws IllegalArgumentException if <var>workQueueSize</var> is negative.
     */
    public NamingThreadPoolExecutor(String poolName, int workQueueSize)
    {
        super(1, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME_MILLIS, TimeUnit.MILLISECONDS,
                workQueueSize == 0 ? new SynchronousQueue<Runnable>()
                        : new LinkedBlockingQueue<Runnable>(workQueueSize),
                new NamingThreadFactory(poolName));
    }

    /**
     * Creates a new {@code NamingThreadPoolExecutor} with the given initial parameters. The thread
     * factory is a new {@link NamingThreadFactory} created from <var>poolName</var>.
     * 
     * @param poolName the default name for new threads
     * @param corePoolSize the number of threads to keep in the pool, even if they are idle.
     * @param maximumPoolSize the maximum number of threads to allow in the pool.
     * @param keepAliveTime when the number of threads is greater than the core, this is the maximum
     *            time that excess idle threads will wait for new tasks before terminating.
     * @param unit the time unit for the keepAliveTime argument.
     * @param workQueue the queue to use for holding tasks before they are executed. This queue will
     *            hold only the {@code Runnable} tasks submitted by the {@code execute} method.
     * @param handler the handler to use when execution is blocked because the thread bounds and
     *            queue capacities are reached.
     * @throws IllegalArgumentException if corePoolSize, or keepAliveTime less than zero, or if
     *             maximumPoolSize less than or equal to zero, or if corePoolSize greater than
     *             maximumPoolSize.
     * @throws NullPointerException if {@code workQueue} or {@code handler} are null.
     */
    public NamingThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
            RejectedExecutionHandler handler)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                new NamingThreadFactory(poolName), handler);
    }

    /**
     * Creates a new {@code NamingThreadPoolExecutor} with the given initial parameters. The thread
     * factory is a new {@link NamingThreadFactory} created from <var>poolName</var>.
     * 
     * @param poolName the default name for new threads
     * @param corePoolSize the number of threads to keep in the pool, even if they are idle.
     * @param maximumPoolSize the maximum number of threads to allow in the pool.
     * @param keepAliveTime when the number of threads is greater than the core, this is the maximum
     *            time that excess idle threads will wait for new tasks before terminating.
     * @param unit the time unit for the keepAliveTime argument.
     * @param workQueue the queue to use for holding tasks before they are executed. This queue will
     *            hold only the {@code Runnable} tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if corePoolSize, or keepAliveTime less than zero, or if
     *             maximumPoolSize less than or equal to zero, or if corePoolSize greater than
     *             maximumPoolSize.
     * @throws NullPointerException if {@code workQueue} is null.
     */
    public NamingThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                new NamingThreadFactory(poolName));
    }

    /**
     * Creates a new {@code NamingThreadPoolExecutor} with the given initial parameters.
     * 
     * @param corePoolSize the number of threads to keep in the pool, even if they are idle.
     * @param maximumPoolSize the maximum number of threads to allow in the pool.
     * @param keepAliveTime when the number of threads is greater than the core, this is the maximum
     *            time that excess idle threads will wait for new tasks before terminating.
     * @param unit the time unit for the keepAliveTime argument.
     * @param workQueue the queue to use for holding tasks before they are executed. This queue will
     *            hold only the {@code Runnable} tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor creates a new thread.
     * @param handler the handler to use when execution is blocked because the thread bounds and
     *            queue capacities are reached.
     * @throws IllegalArgumentException if corePoolSize, or keepAliveTime less than zero, or if
     *             maximumPoolSize less than or equal to zero, or if corePoolSize greater than
     *             maximumPoolSize.
     * @throws NullPointerException if {@code workQueue} or {@code threadFactory} or
     *             {@code handler} are null.
     */
    public NamingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, NamingThreadFactory threadFactory,
            RejectedExecutionHandler handler)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * Creates a new {@code NamingThreadPoolExecutor} with the given initial parameters.
     * 
     * @param corePoolSize the number of threads to keep in the pool, even if they are idle.
     * @param maximumPoolSize the maximum number of threads to allow in the pool.
     * @param keepAliveTime when the number of threads is greater than the core, this is the maximum
     *            time that excess idle threads will wait for new tasks before terminating.
     * @param unit the time unit for the keepAliveTime argument.
     * @param workQueue the queue to use for holding tasks before they are executed. This queue will
     *            hold only the {@code Runnable} tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor creates a new thread.
     * @throws IllegalArgumentException if corePoolSize, or keepAliveTime less than zero, or if
     *             maximumPoolSize less than or equal to zero, or if corePoolSize greater than
     *             maximumPoolSize.
     * @throws NullPointerException if {@code workQueue} or {@code threadFactory} are null.
     */
    public NamingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, NamingThreadFactory threadFactory)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    /**
     * Sets the thread factory of this pool executor to daemon creation mode.
     * <p>
     * This method is supposed to be used in chaining mode, i.e.
     * 
     * <pre>
     * final ExecutorService executor = new NamingThreadPoolExecutor(&quot;name&quot;).daemonize();
     * </pre>
     * 
     * @return This executor itself.
     */
    public NamingThreadPoolExecutor daemonize()
    {
        getThreadFactory().setCreateDaemonThreads(true);
        return this;
    }

    /**
     * Same as {@link #setCorePoolSize(int)}, but returns the object itself for chaining.
     * 
     * @param corePoolSize the new core pool size.
     * @return This executor itself.
     */
    public NamingThreadPoolExecutor corePoolSize(int corePoolSize)
    {
        setCorePoolSize(corePoolSize);
        return this;
    }

    /**
     * Same as {@link #setMaximumPoolSize(int)}, but returns the object itself for chaining.
     * 
     * @param maximumPoolSize the new maximum pool size.
     * @return This executor itself.
     */
    public NamingThreadPoolExecutor maximumPoolSize(int maximumPoolSize)
    {
        setMaximumPoolSize(maximumPoolSize);
        return this;
    }

    /**
     * Same as {@link #setKeepAliveTime(long, TimeUnit)}, but uses always
     * {@link TimeUnit#MILLISECONDS} and returns the object itself for chaining.
     * 
     * @param keepAliveTimeMillis the new keep-alive time in milli-seconds.
     * @return This executor itself.
     */
    public NamingThreadPoolExecutor keepAliveTime(long keepAliveTimeMillis)
    {
        setKeepAliveTime(keepAliveTimeMillis, TimeUnit.MILLISECONDS);
        return this;
    }

    /**
     * If <var>addPoolName</var> is <code>true</code>, the threads will contain the pool name as the
     * first part of the thread names.
     * 
     * @param addPoolName the flag that is forwarded to the thread factory.
     * @return This executor itself.
     */
    public NamingThreadPoolExecutor addPoolName(boolean addPoolName)
    {
        getThreadFactory().setAddPoolName(addPoolName);
        return this;
    }

    /**
     * Returns the thread factory of this pool executor.
     * <p>
     * The cast is safe because the constructors and both <code>setThreadFactory</code> variants
     * only ever install a {@link NamingThreadFactory}.
     */
    @Override
    public NamingThreadFactory getThreadFactory()
    {
        return (NamingThreadFactory) super.getThreadFactory();
    }

    /**
     * Sets the thread factory of this pool executor.
     * 
     * @param threadFactory the new thread factory.
     * @throws NullPointerException if <var>threadFactory</var> is <code>null</code>.
     */
    public void setThreadFactory(NamingThreadFactory threadFactory)
    {
        super.setThreadFactory(threadFactory);
    }

    /**
     * Sets the thread factory of this pool executor, which has to be a
     * {@link NamingThreadFactory}.
     * 
     * @throws NullPointerException if <var>threadFactory</var> is <code>null</code>.
     * @throws IllegalArgumentException if <var>threadFactory</var> is not a
     *             {@link NamingThreadFactory}.
     * @deprecated Use {@link #setThreadFactory(NamingThreadFactory)} instead!
     */
    @Override
    @Deprecated
    public void setThreadFactory(ThreadFactory threadFactory)
    {
        // Fail explicitly on null rather than by accident while building the error message below.
        if (threadFactory == null)
        {
            throw new NullPointerException("threadFactory must not be null");
        }
        if (threadFactory instanceof NamingThreadFactory == false)
        {
            // getName() instead of getCanonicalName(): the latter is null for anonymous and local
            // classes, which are a common way of implementing a ThreadFactory.
            throw new IllegalArgumentException("thread factory is of type '"
                    + threadFactory.getClass().getName() + "', but needs to be of type '"
                    + NamingThreadFactory.class.getCanonicalName() + "'");
        }
        super.setThreadFactory(threadFactory);
    }

    /**
     * If <var>r</var> is an {@link IRunnableNameProvider} that provides a non-null name, applies
     * that name to the thread <var>t</var> before the task is run. A {@link NamedFutureTask} is
     * additionally told which thread it runs on, so that
     * {@link #afterExecute(Runnable, Throwable)} can ask it to restore the thread name.
     * <p>
     * The super class hook is always invoked at the end, whether or not a name has been applied.
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r)
    {
        if (r instanceof IRunnableNameProvider)
        {
            final String runnableName = ((IRunnableNameProvider) r).getRunnableName();
            if (runnableName != null)
            {
                // Register the thread with the task before the thread is renamed.
                if (r instanceof NamedFutureTask<?>)
                {
                    ((NamedFutureTask<?>) r).setThread(t);
                }
                if (t instanceof PoolNameThread)
                {
                    ((PoolNameThread) t).setRunnableName(runnableName);
                } else
                {
                    t.setName(runnableName);
                }
            }
        }
        // Always chain to the super class hook, as recommended by ThreadPoolExecutor.
        super.beforeExecute(t, r);
    }

    /**
     * Asks a {@link NamedFutureTask} to restore the thread name once the task has been run and
     * then invokes the super class hook.
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t)
    {
        if (r instanceof NamedFutureTask<?>)
        {
            ((NamedFutureTask<?>) r).restoreThreadName();
        }
        super.afterExecute(r, t);
    }

    /**
     * Wraps <var>task</var> into a {@link NamedFutureTask} with a <code>null</code> result and
     * hands it over to {@link #execute(Runnable)}.
     * 
     * @return The future representing the pending completion of the task.
     * @throws NullPointerException if <var>task</var> is <code>null</code>.
     */
    @Override
    public Future<?> submit(Runnable task)
    {
        if (task == null)
        {
            throw new NullPointerException();
        }

        final FutureTask<Object> ftask = new NamedFutureTask<Object>(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps <var>task</var> and <var>result</var> into a {@link NamedFutureTask} and hands it over
     * to {@link #execute(Runnable)}.
     * 
     * @return The future representing the pending completion of the task.
     * @throws NullPointerException if <var>task</var> is <code>null</code>.
     */
    @Override
    public <T> Future<T> submit(Runnable task, T result)
    {
        if (task == null)
        {
            throw new NullPointerException();
        }

        final FutureTask<T> ftask = new NamedFutureTask<T>(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps <var>task</var> into a {@link NamedFutureTask} and hands it over to
     * {@link #execute(Runnable)}.
     * 
     * @return The future representing the pending completion of the task.
     * @throws NullPointerException if <var>task</var> is <code>null</code>.
     */
    @Override
    public <T> Future<T> submit(Callable<T> task)
    {
        if (task == null)
        {
            throw new NullPointerException();
        }
        final FutureTask<T> ftask = new NamedFutureTask<T>(task);
        execute(ftask);
        return ftask;
    }

}