Java/Threads/Thread Pool
Содержание
- 1 Create a new thread for the thread pool. The create thread will be a daemon thread.
- 2 Defining a thread for a thread pool
- 3 JDK1.5 provides a mechanism to create a pool a scheduled task
- 4 Simple object pool. Based on ThreadPool and few other classes
- 5 Simple pool of Threads
- 6 Simple thread pool. A task is executed by obtaining a thread from the pool
- 7 Thread Cache
- 8 Thread pool
- 9 Thread Pool 2
- 10 Thread pool demo
- 11 Thread Pools 1
- 12 Thread Pools 2
- 13 Thread Pool Test
- 14 Very basic implementation of a thread pool
- 15 Worker thread pool
Create a new thread for the thread pool. The create thread will be a daemon thread.
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
* Simple implementation of a ThreadFactory that
* marks all of the threads as daemon threads.
*/
public class DaemonThreadFactory implements ThreadFactory {
private final ThreadFactory factory = Executors.defaultThreadFactory();
/**
* Create a new thread for the thread pool. The create
* thread will be a daemon thread.
*
* @param r The runnable used by the thread pool.
*
* @return The newly created thread.
*/
public Thread newThread(Runnable r) {
Thread t = factory.newThread(r);
if (!t.isDaemon()) {
t.setDaemon(true);
}
return t;
}
}
Defining a thread for a thread pool
import java.util.LinkedList;
class ThreadTask extends Thread {
private ThreadPool pool;
public ThreadTask(ThreadPool thePool) {
pool = thePool;
}
public void run() {
while (true) {
// blocks until job
Runnable job = pool.getNext();
try {
job.run();
} catch (Exception e) {
// Ignore exceptions thrown from jobs
System.err.println("Job exception: " + e);
}
}
}
}
public class ThreadPool {
private LinkedList tasks = new LinkedList();
public ThreadPool(int size) {
for (int i = 0; i < size; i++) {
Thread thread = new ThreadTask(this);
thread.start();
}
}
public void run(Runnable task) {
synchronized (tasks) {
tasks.addLast(task);
tasks.notify();
}
}
public Runnable getNext() {
Runnable returnVal = null;
synchronized (tasks) {
while (tasks.isEmpty()) {
try {
tasks.wait();
} catch (InterruptedException ex) {
System.err.println("Interrupted");
}
}
returnVal = (Runnable) tasks.removeFirst();
}
return returnVal;
}
public static void main(String args[]) {
final String message[] = { "Java", "Source", "and", "Support" };
ThreadPool pool = new ThreadPool(message.length / 2);
for (int i = 0, n = message.length; i < n; i++) {
final int innerI = i;
Runnable runner = new Runnable() {
public void run() {
for (int j = 0; j < 25; j++) {
System.out.println("j: " + j + ": " + message[innerI]);
}
}
};
pool.run(runner);
}
}
}
JDK1.5 provides a mechanism to create a pool a scheduled task
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Main{
public static void main(String args[]) {
ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(5);
stpe.scheduleAtFixedRate(new Job1(), 0, 5, TimeUnit.SECONDS);
stpe.scheduleAtFixedRate(new Job2(), 1, 2, TimeUnit.SECONDS);
}
}
class Job1 implements Runnable {
public void run() {
System.out.println("Job 1");
}
}
class Job2 implements Runnable {
public void run() {
for(int i=-99;i<99;i++){
System.out.println(i);
}
}
}
Simple object pool. Based on ThreadPool and few other classes
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Simple object pool. Based on ThreadPool and few other classes
*
* The pool will ignore overflow and return null if empty.
*
* @author Gal Shachor
* @author Costin
* @author
* @version $Id: SimplePool.java 463298 2006-10-12 16:10:32Z henning $
*/
public final class SimplePool {
/*
* Where the objects are held.
*/
private Object pool[];
/**
* max amount of objects to be managed set via CTOR
*/
private int max;
/**
* index of previous to next free slot
*/
private int current = -1;
/**
* @param max
*/
public SimplePool(int max) {
this.max = max;
pool = new Object[max];
}
/**
* Add the object to the pool, silent nothing if the pool is full
*
* @param o
*/
public void put(Object o) {
int idx = -1;
synchronized (this) {
/*
* if we aren"t full
*/
if (current < max - 1) {
/*
* then increment the current index.
*/
idx = ++current;
}
if (idx >= 0) {
pool[idx] = o;
}
}
}
/**
* Get an object from the pool, null if the pool is empty.
*
* @return The object from the pool.
*/
public Object get() {
synchronized (this) {
/*
* if we have any in the pool
*/
if (current >= 0) {
/*
* remove the current one
*/
Object o = pool[current];
pool[current] = null;
current--;
return o;
}
}
return null;
}
/**
* Return the size of the pool
*
* @return The pool size.
*/
public int getMax() {
return max;
}
/**
* for testing purposes, so we can examine the pool
*
* @return Array of Objects in the pool.
*/
Object[] getPool() {
return pool;
}
}
Simple pool of Threads
/*
* Copyright (c) 1998 - 2005 Versant Corporation
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Versant Corporation - initial API and implementation
*/
import java.util.LinkedList;
import java.util.Iterator;
import java.util.HashSet;
/**
* Simple pool of Threads.
*/
public class ThreadPool {
private String name;
private HashSet active = new HashSet();
private LinkedList idle = new LinkedList();
private int idleCount;
private int maxActive = 10;
private int maxIdle = 3;
private int lastThreadId;
private boolean closed;
public ThreadPool(String name) {
this.name = name;
}
public int getMaxActive() {
return maxActive;
}
public void setMaxActive(int maxActive) {
this.maxActive = maxActive;
}
public int getMaxIdle() {
return maxIdle;
}
public void setMaxIdle(int maxIdle) {
this.maxIdle = maxIdle;
}
public synchronized int getActiveCount() {
return active.size();
}
public synchronized int getIdleCount() {
return idleCount;
}
/**
* Close the pool, stopping all threads. This does not wait for the
* threads to actually stop before returning. This is a NOP if the
* pool has already been closed.
*/
public synchronized void close() {
if (closed) {
return;
}
closed = true;
for (Iterator i = idle.iterator(); i.hasNext(); ) {
Worker w = (Worker)i.next();
w.terminate();
}
idle = null;
idleCount = 0;
for (Iterator i = active.iterator(); i.hasNext(); ) {
Worker w = (Worker)i.next();
w.terminate();
}
active = null;
}
/**
* Executed runnable using a Thread from the pool. This will block for
* timeoutMs and forever if this is 0. Returns true if the task is
* being executed (i.e. a Thread was available) or false if not (i.e.
* pool full).
*/
public synchronized boolean execute(Runnable runnable, int timeoutMs) {
if (closed) {
throw new IllegalStateException("Pool has been closed");
}
Worker t;
if (idleCount == 0) {
for (; isFull(); ) {
try {
wait(timeoutMs);
if (isFull()) {
return false;
}
} catch (InterruptedException e) {
// ignore
}
}
t = new Worker();
} else {
t = (Worker)idle.removeFirst();
--idleCount;
}
active.add(t);
t.execute(runnable);
return true;
}
protected boolean isFull() {
return active.size() >= maxActive;
}
private synchronized void finishedWork(Worker t) {
if (!closed) {
active.remove(t);
if (idleCount >= maxIdle) {
t.terminate();
} else {
idle.addLast(t);
++idleCount;
}
}
}
private class Worker extends Thread {
private boolean stopFlag;
private Runnable runnable;
public Worker() {
super(name + " " + ++lastThreadId);
setDaemon(true);
}
/**
* Executed runnable.
*/
public void execute(Runnable runnable) {
this.runnable = runnable;
if (!isAlive()) {
start();
} else {
synchronized (this) {
notify();
}
}
}
/**
* Stop this thread as soon as possible.
*/
public void terminate() {
stopFlag = true;
interrupt();
}
public void run() {
for (; !stopFlag; ) {
try {
runnable.run();
} catch (Throwable e) {
if (e instanceof ThreadDeath) {
throw (ThreadDeath)e;
}
}
runnable = null;
finishedWork(this);
if (stopFlag) break;
synchronized (this) {
try {
wait();
} catch (InterruptedException e) {
// ignore
}
}
}
}
}
}
Simple thread pool. A task is executed by obtaining a thread from the pool
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.List;
/** Simple thread pool. A task is executed by obtaining a thread from
* the pool
*/
public class ThreadPool {
/** The thread pool contains instances of {@link ThreadPool.Task}.
*/
public interface Task {
/** Performs the task.
* @throws Throwable The task failed, and the worker thread won"t be used again.
*/
void run() throws Throwable;
}
/** A task, which may be interrupted, if the pool is shutting down.
*/
public interface InterruptableTask extends Task {
/** Interrupts the task.
* @throws Throwable Shutting down the task failed.
*/
void shutdown() throws Throwable;
}
private class Poolable {
private boolean shuttingDown;
private Task task;
private Thread thread;
Poolable(ThreadGroup pGroup, int pNum) {
thread = new Thread(pGroup, pGroup.getName() + "-" + pNum){
public void run() {
while (!isShuttingDown()) {
final Task t = getTask();
if (t == null) {
try {
synchronized (this) {
if (!isShuttingDown() && getTask() == null) {
wait();
}
}
} catch (InterruptedException e) {
// Do nothing
}
} else {
try {
t.run();
resetTask();
repool(Poolable.this);
} catch (Throwable e) {
discard(Poolable.this);
resetTask();
}
}
}
}
};
thread.start();
}
synchronized void shutdown() {
shuttingDown = true;
final Task t = getTask();
if (t != null && t instanceof InterruptableTask) {
try {
((InterruptableTask) t).shutdown();
} catch (Throwable th) {
// Ignore me
}
}
task = null;
synchronized (thread) {
thread.notify();
}
}
private synchronized boolean isShuttingDown() { return shuttingDown; }
String getName() { return thread.getName(); }
private synchronized Task getTask() {
return task;
}
private synchronized void resetTask() {
task = null;
}
synchronized void start(Task pTask) {
task = pTask;
synchronized (thread) {
thread.notify();
}
}
}
private final ThreadGroup threadGroup;
private final int maxSize;
private final List waitingThreads = new ArrayList();
private final List runningThreads = new ArrayList();
private final List waitingTasks = new ArrayList();
private int num;
/** Creates a new instance.
* @param pMaxSize Maximum number of concurrent threads.
* @param pName Thread group name.
*/
public ThreadPool(int pMaxSize, String pName) {
maxSize = pMaxSize;
threadGroup = new ThreadGroup(pName);
}
synchronized void discard(Poolable pPoolable) {
pPoolable.shutdown();
runningThreads.remove(pPoolable);
waitingThreads.remove(pPoolable);
}
synchronized void repool(Poolable pPoolable) {
if (runningThreads.remove(pPoolable)) {
if (maxSize != 0 && runningThreads.size() + waitingThreads.size() >= maxSize) {
discard(pPoolable);
} else {
waitingThreads.add(pPoolable);
if (waitingTasks.size() > 0) {
Task task = (Task) waitingTasks.remove(waitingTasks.size() - 1);
startTask(task);
}
}
} else {
discard(pPoolable);
}
}
/** Starts a task immediately.
* @param pTask The task being started.
* @return True, if the task could be started immediately. False, if
* the maxmimum number of concurrent tasks was exceeded. If so, you
* might consider to use the {@link #addTask(ThreadPool.Task)} method instead.
*/
public synchronized boolean startTask(Task pTask) {
if (maxSize != 0 && runningThreads.size() >= maxSize) {
return false;
}
Poolable poolable;
if (waitingThreads.size() > 0) {
poolable = (Poolable) waitingThreads.remove(waitingThreads.size()-1);
} else {
poolable = new Poolable(threadGroup, num++);
}
runningThreads.add(poolable);
poolable.start(pTask);
return true;
}
/** Adds a task for immediate or deferred execution.
* @param pTask The task being added.
* @return True, if the task was started immediately. False, if
* the task will be executed later.
*/
public synchronized boolean addTask(Task pTask) {
if (startTask(pTask)) {
return true;
}
waitingTasks.add(pTask);
return false;
}
/** Closes the pool.
*/
public synchronized void shutdown() {
while (!waitingThreads.isEmpty()) {
Poolable poolable = (Poolable) waitingThreads.remove(waitingThreads.size()-1);
poolable.shutdown();
}
while (!runningThreads.isEmpty()) {
Poolable poolable = (Poolable) runningThreads.remove(runningThreads.size()-1);
poolable.shutdown();
}
}
/** Returns the maximum number of concurrent threads.
* @return Maximum number of threads.
*/
public int getMaxThreads() { return maxSize; }
/** Returns the number of threads, which have actually been created,
* as opposed to the number of currently running threads.
*/
public synchronized int getNumThreads() { return num; }
}
Thread Cache
// ThreadCache.java
// $Id: ThreadCache.java,v 1.16 2000/08/16 21:37:58 ylafon Exp $
// (c) COPYRIGHT MIT and INRIA, 1996-1997.
// Please first read the full copyright statement in file COPYRIGHT.html
class CachedThread extends Thread {
Runnable runner = null;
boolean alive = true;
ThreadCache cache = null;
CachedThread next = null;
CachedThread prev = null;
boolean terminated = false;
boolean started = false;
boolean firstime = true;
synchronized boolean isTerminated() {
boolean ret = terminated;
terminated = true;
return ret;
}
synchronized Runnable waitForRunner() {
boolean to = false;
while ( alive ) {
// Is a runner available ?
if ( runner != null ) {
Runnable torun = runner;
firstime = false;
runner = null;
return torun;
} else if ( firstime ) {
// This thread will not be declared free until it runs once:
try {
wait();
} catch (InterruptedException ex) {
}
} else if ( alive = cache.isFree(this, to) ) {
// Notify the cache that we are free, and continue if allowed:
try {
int idleto = cache.getIdleTimeout();
to = false;
if ( idleto > 0 ) {
wait(idleto);
to = (runner == null);
} else {
wait();
}
} catch (InterruptedException ex) {
}
}
}
return null;
}
synchronized void kill() {
alive = false;
notify();
}
synchronized boolean wakeup(Runnable runnable) {
if ( alive ) {
runner = runnable;
if ( ! started )
this.start();
notify();
return true;
} else {
return false;
}
}
public synchronized void start() {
super.start();
this.started = true;
}
public void run() {
try {
while ( true ) {
// Wait for a runner:
Runnable torun = waitForRunner();
// If runner, run:
if ( torun != null )
torun.run();
// If dead, stop
if ( ! alive )
break;
}
} finally {
cache.isDead(this);
}
}
CachedThread(ThreadCache cache, int id) {
super(cache.getThreadGroup(), cache.getThreadGroup().getName()+":"+id);
this.cache = cache;
setPriority(cache.getThreadPriority());
setDaemon(true);
}
}
public class ThreadCache {
private static final boolean debug = false;
/**
* Default number of cached threads.
*/
private static final int DEFAULT_CACHESIZE = 5;
/**
* Has this thread cache been initialized ?
*/
protected boolean inited = false;
/**
* The thread group for this thread cache.
*/
protected ThreadGroup group = null;
/**
* Number of cached threads.
*/
protected int cachesize = DEFAULT_CACHESIZE;
/**
* Number of created threads.
*/
protected int threadcount = 0;
/**
* Uniq thread identifier within this ThreadCache instance.
*/
protected int threadid = 0;
/**
* Number of idle threads to always maintain alive.
*/
protected int idlethreads = 0;
/**
* Should we queue thread requests, rather then creating new threads.
*/
protected boolean growasneeded = false;
/**
* Number of used threads
*/
protected int usedthreads = 0;
/**
* List of free threads.
*/
protected CachedThread freelist = null;
protected CachedThread freetail = null;
/**
* The idle timeout, for a thread to wait before being killed.
* Defaults to <strong>5000</strong> milliseconds.
*/
protected int idletimeout = 5000;
/**
* Cached thread priority.
*/
protected int threadpriority = 5;
/**
* Get the idle timeout value for this cache.
* @return The idletimeout value, or negative if no timeout applies.
*/
synchronized final int getIdleTimeout() {
return (threadcount <= idlethreads) ? -1 : idletimeout;
}
/**
* The given thread is about to be declared free.
* @return A boolean, <strong>true</strong> if the thread is to continue
* running, <strong>false</strong> if the thread should stop.
*/
final synchronized boolean isFree(CachedThread t, boolean timedout) {
if ( timedout && (threadcount > idlethreads) ) {
if ( ! t.isTerminated() ) {
threadcount--;
usedthreads--;
notifyAll();
}
return false;
} else if ( threadcount <= cachesize ) {
t.prev = freetail;
if (freetail != null)
freetail.next = t;
freetail = t;
if (freelist == null)
freelist = t;
usedthreads--;
notifyAll();
return true;
} else {
if ( ! t.isTerminated() ) {
threadcount--;
usedthreads--;
notifyAll();
}
return false;
}
}
/**
* The given thread has terminated, cleanup any associated state.
* @param dead The dead CachedThread instance.
*/
final synchronized void isDead(CachedThread t) {
if ( debug )
System.out.println("** "+t+": is dead tc="+threadcount);
if ( ! t.isTerminated() ) {
threadcount--;
notifyAll();
}
}
/**
* Create a new thread within this thread cache.
* @return A new CachedThread instance.
*/
private synchronized CachedThread createThread() {
threadcount++;
threadid++;
return new CachedThread(this, threadid);
}
/**
* Allocate a new thread, as requested.
* @param waitp Should we wait until a thread is available ?
* @return A launched CachedThread instance, or <strong>null</strong> if
* unable to allocate a new thread, and <code>waitp</code> is <strong>
* false</strong>.
*/
protected synchronized CachedThread allocateThread(boolean waitp) {
CachedThread t = null;
while ( true ) {
if ( freelist != null ) {
if ( debug )
System.out.println("*** allocateThread: free thread");
t = freelist;
freelist = freelist.next;
if (freelist != null) {
freelist.prev = null;
} else {
freetail = null;
}
t.next = null;
break;
} else if ((threadcount < cachesize) || growasneeded) {
if ( debug )
System.out.println("*** create new thread.");
t = createThread();
break;
} else if ( waitp ) {
if ( debug )
System.out.println("*** wait for a thread.");
// Wait for a thread to become available
try {
wait();
} catch (InterruptedException ex) {
}
} else {
return null;
}
}
return t;
}
/**
* Set the thread cache size.
* This will also update the number of idle threads to maintain, if
* requested.
* @param cachesize The new thread cache size.
* @param update If <strong>true</strong> also update the number of
* threads to maintain idle.
*/
public synchronized void setCachesize(int cachesize, boolean update) {
this.cachesize = cachesize;
if ( update )
this.idlethreads = (cachesize>>1);
}
/**
* Set the thread cache size.
* Updaet the number of idle threads to keep alive.
* @param cachesize The new thread cache size.
*/
public void setCachesize(int cachesize) {
setCachesize(cachesize, true);
}
/**
* Enable/disable the thread cache to grow as needed.
* This flag should be turned on only if always getting a thread as fast
* as possible is critical.
* @param onoff The toggle.
*/
public void setGrowAsNeeded(boolean onoff) {
this.growasneeded = onoff;
}
/**
* Set all the cached threads priority.
* Changing the cached thread priority should be done before the thread
* cache is initialized, it will <em>not</em> affect already created
* threads.
* @param priority The new cachewd threads priority.
*/
public void setThreadPriority(int priority) {
threadpriority = priority;
}
/**
* Get the cached thread normal priority.
* @return Currently assigned cached thread priority.
*/
public int getThreadPriority() {
return threadpriority;
}
/**
* Set the idle timeout.
* The idle timeout value is used to eliminate threads that have remain
* idle for too long (although the thread cache will ensure that a
* decent minimal number of threads stay around).
* @param idletimeout The new idle timeout.
*/
public synchronized void setIdleTimeout(int idletimeout) {
this.idletimeout = idletimeout;
}
/**
* Request a thread to run on the given object.
* @param runnable The object to run with the allocated thread.
* @param waitp If <strong>true</strong> wait until a free thread is
* available, otherwise, return <strong>false</strong>.
* @return A boolean, <strong>true</strong> if a thread was successfully
* allocated for the given object, <strong>false</strong> otherwise.
*/
public boolean getThread(Runnable runnable, boolean waitp) {
if ( debug )
System.out.println("*** getting a thread for "+runnable);
if ( ! inited )
throw new RuntimeException("Uninitialized thread cache");
// Allocate and launch the thread:
while ( true ) {
CachedThread t = allocateThread(waitp);
if ( t != null ) {
if ( t.wakeup(runnable) ) {
synchronized (this) {
usedthreads++;
}
return true;
}
} else {
return false;
}
}
}
/**
* Get the ThreadGroup managed by this ThreadCache instance.
* @return A ThreadGroup instance.
*/
public ThreadGroup getThreadGroup() {
return group;
}
/**
* Wait until all the threads have finished their duty
*/
public synchronized void waitForCompletion() {
while (usedthreads > 0) {
if ( debug )
System.out.println("*** Waiting for "+usedthreads+ " threads");
try {
wait();
} catch (InterruptedException ex) {
}
}
}
/**
* Initialize the given thread cache.
* This two stage initialize method is done so that configuration
* of the thread cache can be done before any thread get actually
* created.
*/
public synchronized void initialize() {
CachedThread t = createThread();
freelist = t;
freetail = t;
t.next = null;
t.prev = null;
t.start();
for (int i = 1 ; i < idlethreads ; i++) {
t = createThread();
t.next = freelist;
t.prev = null;
freelist.prev = t;
freelist = t;
t.start();
}
inited = true;
}
/**
* Create a thread cache, whose threads are to be children of the group.
* @param group The thread group to which this thread cache is bound.
* @param nstart Number of thread to create in advance.
*/
public ThreadCache(ThreadGroup group) {
this.group = group;
}
/**
* Create a thread cache, after creating a new thread group.
* @param name The name of the thread group to create.
*/
public ThreadCache(String name) {
this(new ThreadGroup(name));
}
/**
* Create a thread cache, after creating a new thread group.
* @param parent The parent of the thread group to create.
* @param name The name of the thread group.
*/
public ThreadCache(ThreadGroup parent, String name) {
this(new ThreadGroup(parent, name));
}
}
Thread pool
/*
Java Threads, 3rd Edition
By Scott Oaks, Henry Wong
3rd Edition September 2004
ISBN: 0-596-00782-5
*/
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class CreateTest extends Thread {
static AtomicInteger nCalls;
static int target = 0;
static boolean done = false;
static Object lock = new Object();
public static void main(String[] args) {
target = 10000;
doTestCreate();
doTestPool(8);
doTestLoop();
target = Integer.parseInt(args[0]);
cleanGC();
Timestamp createTS = new Timestamp(TimeUnit.MICROSECONDS);
doTestCreate();
createTS.stop();
System.out.println("Create thread test took " + createTS);
cleanGC();
Timestamp pool8TS = new Timestamp(TimeUnit.MICROSECONDS);
doTestPool(8);
pool8TS.stop();
System.out.println("Pool test (8 threads) took " + pool8TS);
cleanGC();
Timestamp poolTS = new Timestamp(TimeUnit.MICROSECONDS);
doTestPool(1);
poolTS.stop();
System.out.println("Pool test (1 thread) took " + poolTS);
cleanGC();
Timestamp loopTS = new Timestamp(TimeUnit.MICROSECONDS);
doTestLoop();
loopTS.stop();
System.out.println("Loop test took " + loopTS);
double d = ((double) (createTS.elapsedTime() - loopTS.elapsedTime()))
/ target;
System.out.println("Creating a thread took " + d + " " + loopTS.units()
+ " per thread");
d = ((double) (createTS.elapsedTime() - poolTS.elapsedTime())) / target;
System.out.println("Using a thread pool (1 thread) saved " + d + " "
+ loopTS.units() + " per task");
d = ((double) (createTS.elapsedTime() - pool8TS.elapsedTime()))
/ target;
System.out.println("Using a thread pool (8 threads) saved " + d + " "
+ loopTS.units() + " per task");
d = ((double) (poolTS.elapsedTime() - loopTS.elapsedTime())) / target;
System.out.println("Thread pool overhead (1 thread) is " + d + " "
+ loopTS.units() + " per task");
d = ((double) (pool8TS.elapsedTime() - loopTS.elapsedTime())) / target;
System.out.println("Thread pool overhead (8 threads) is " + d + " "
+ loopTS.units() + " per task");
}
static void doTestLoop() {
nCalls = new AtomicInteger(0);
done = false;
for (int i = 0; i < target; i++)
work();
synchronized (lock) {
while (!done)
try {
lock.wait();
} catch (Exception e) {
}
}
}
static void doTestCreate() {
done = false;
nCalls = new AtomicInteger(0);
for (int i = 0; i < target; i++) {
Thread t = new CreateTest();
t.start();
}
synchronized (lock) {
while (!done)
try {
lock.wait();
} catch (Exception e) {
}
}
}
static void doTestPool(int nThreads) {
done = false;
nCalls = new AtomicInteger(0);
ThreadPoolExecutor tpe = new ThreadPoolExecutor(nThreads, nThreads,
50000L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
Runnable r = new CreateTest();
for (int i = 0; i < target; i++) {
tpe.execute(r);
}
tpe.shutdown();
try {
tpe.awaitTermination(10000000L, TimeUnit.SECONDS);
} catch (Exception e) {}
} public void run() {
work();
}
public static void work() {
int n = nCalls.incrementAndGet();
if (n == target) {
synchronized (lock) {
done = true;
lock.notify();
}
}
}
public static void cleanGC() {
System.gc();
System.runFinalization();
System.gc();
}
}
Thread Pool 2
/*
*
* Copyright (c) 1997-1999 Scott Oaks and Henry Wong. All Rights Reserved.
*
* Permission to use, copy, modify, and distribute this software
* and its documentation for NON-COMMERCIAL purposes and
* without fee is hereby granted.
*
* This sample source code is provided for example only,
* on an unsupported, as-is basis.
*
* AUTHOR MAKES NO REPRESENTATIONS OR WARRANTIES ABOUT THE SUITABILITY OF
* THE SOFTWARE, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
* TO THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
* PARTICULAR PURPOSE, OR NON-INFRINGEMENT. AUTHOR SHALL NOT BE LIABLE FOR
* ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING OR
* DISTRIBUTING THIS SOFTWARE OR ITS DERIVATIVES.
*
* THIS SOFTWARE IS NOT DESIGNED OR INTENDED FOR USE OR RESALE AS ON-LINE
* CONTROL EQUIPMENT IN HAZARDOUS ENVIRONMENTS REQUIRING FAIL-SAFE
* PERFORMANCE, SUCH AS IN THE OPERATION OF NUCLEAR FACILITIES, AIRCRAFT
* NAVIGATION OR COMMUNICATION SYSTEMS, AIR TRAFFIC CONTROL, DIRECT LIFE
* SUPPORT MACHINES, OR WEAPONS SYSTEMS, IN WHICH THE FAILURE OF THE
* SOFTWARE COULD LEAD DIRECTLY TO DEATH, PERSONAL INJURY, OR SEVERE
* PHYSICAL OR ENVIRONMENTAL DAMAGE ("HIGH RISK ACTIVITIES"). AUTHOR
* SPECIFICALLY DISCLAIMS ANY EXPRESS OR IMPLIED WARRANTY OF FITNESS FOR
* HIGH RISK ACTIVITIES.
*/
import java.util.*;
public class ThreadPool {
class ThreadPoolRequest {
Runnable target;
Object lock;
ThreadPoolRequest(Runnable t, Object l) {
target = t;
lock = l;
}
}
class ThreadPoolThread extends Thread {
ThreadPool parent;
boolean shouldRun = true;
ThreadPoolThread(ThreadPool parent, int i) {
super("ThreadPoolThread " + i);
this.parent = parent;
}
public void run() {
ThreadPoolRequest obj = null;
while (shouldRun) {
try {
parent.cvFlag.getBusyFlag();
while (obj == null && shouldRun) {
try {
obj = (ThreadPoolRequest)
parent.objects.elementAt(0);
parent.objects.removeElementAt(0);
} catch (ArrayIndexOutOfBoundsException aiobe) {
obj = null;
} catch (ClassCastException cce) {
System.err.println("Unexpected data");
obj = null;
}
if (obj == null) {
try {
parent.cvAvailable.cvWait();
} catch (InterruptedException ie) {
return;
}
}
}
} finally {
parent.cvFlag.freeBusyFlag();
}
if (!shouldRun)
return;
obj.target.run();
try {
parent.cvFlag.getBusyFlag();
nObjects--;
if (nObjects == 0)
parent.cvEmpty.cvSignal();
} finally {
parent.cvFlag.freeBusyFlag();
}
if (obj.lock != null) {
synchronized(obj.lock) {
obj.lock.notify();
}
}
obj = null;
}
}
}
Vector objects;
int nObjects = 0;
CondVar cvAvailable, cvEmpty;
BusyFlag cvFlag;
ThreadPoolThread poolThreads[];
boolean terminated = false;
public ThreadPool(int n) {
cvFlag = new BusyFlag();
cvAvailable = new CondVar(cvFlag);
cvEmpty = new CondVar(cvFlag);
objects = new Vector();
poolThreads = new ThreadPoolThread[n];
for (int i = 0; i < n; i++) {
poolThreads[i] = new ThreadPoolThread(this, i);
poolThreads[i].start();
}
}
private void add(Runnable target, Object lock) {
try {
cvFlag.getBusyFlag();
if (terminated)
throw new IllegalStateException("Thread pool has shutdown");
objects.addElement(new ThreadPoolRequest(target, lock));
nObjects++;
cvAvailable.cvSignal();
} finally {
cvFlag.freeBusyFlag();
}
}
public void addRequest(Runnable target) {
add(target, null);
}
public void addRequestAndWait(Runnable target)
throws InterruptedException {
Object lock = new Object();
synchronized(lock) {
add(target, lock);
lock.wait();
}
}
public void waitForAll(boolean terminate) throws InterruptedException {
try {
cvFlag.getBusyFlag();
while (nObjects != 0)
cvEmpty.cvWait();
if (terminate) {
for (int i = 0; i < poolThreads.length; i++)
poolThreads[i].shouldRun = false;
cvAvailable.cvBroadcast();
terminated = true;
}
} finally {
cvFlag.freeBusyFlag();
}
}
public void waitForAll() throws InterruptedException {
waitForAll(false);
}
}
Thread pool demo
public class ThreadPoolMain extends Object {
public static Runnable makeRunnable(final String name, final long firstDelay) {
return new Runnable() {
public void run() {
try {
System.out.println(name + ": starting up");
Thread.sleep(firstDelay);
System.out.println(name + ": doing some stuff");
Thread.sleep(2000);
System.out.println(name + ": leaving");
} catch (InterruptedException ix) {
System.out.println(name + ": got interrupted!");
return;
} catch (Exception x) {
x.printStackTrace();
}
}
public String toString() {
return name;
}
};
}
public static void main(String[] args) {
try {
ThreadPool pool = new ThreadPool(3);
Runnable ra = makeRunnable("RA", 3000);
pool.execute(ra);
Runnable rb = makeRunnable("RB", 1000);
pool.execute(rb);
Runnable rc = makeRunnable("RC", 2000);
pool.execute(rc);
Runnable rd = makeRunnable("RD", 60000);
pool.execute(rd);
Runnable re = makeRunnable("RE", 1000);
pool.execute(re);
pool.stopRequestIdleWorkers();
Thread.sleep(2000);
pool.stopRequestIdleWorkers();
Thread.sleep(5000);
pool.stopRequestAllWorkers();
} catch (InterruptedException ix) {
ix.printStackTrace();
}
}
}
class ThreadPool extends Object {
private ObjectFIFO idleWorkers;
private ThreadPoolWorker[] workerList;
public ThreadPool(int numberOfThreads) {
// make sure that it"s at least one
numberOfThreads = Math.max(1, numberOfThreads);
idleWorkers = new ObjectFIFO(numberOfThreads);
workerList = new ThreadPoolWorker[numberOfThreads];
for (int i = 0; i < workerList.length; i++) {
workerList[i] = new ThreadPoolWorker(idleWorkers);
}
}
public void execute(Runnable target) throws InterruptedException {
// block (forever) until a worker is available
ThreadPoolWorker worker = (ThreadPoolWorker) idleWorkers.remove();
worker.process(target);
}
public void stopRequestIdleWorkers() {
try {
Object[] idle = idleWorkers.removeAll();
for (int i = 0; i < idle.length; i++) {
((ThreadPoolWorker) idle[i]).stopRequest();
}
} catch (InterruptedException x) {
Thread.currentThread().interrupt(); // re-assert
}
}
public void stopRequestAllWorkers() {
stopRequestIdleWorkers();
try {
Thread.sleep(250);
} catch (InterruptedException x) {
}
for (int i = 0; i < workerList.length; i++) {
if (workerList[i].isAlive()) {
workerList[i].stopRequest();
}
}
}
}
class ThreadPoolWorker extends Object {
private static int nextWorkerID = 0;
private ObjectFIFO idleWorkers;
private int workerID;
private ObjectFIFO handoffBox;
private Thread internalThread;
private volatile boolean noStopRequested;
public ThreadPoolWorker(ObjectFIFO idleWorkers) {
this.idleWorkers = idleWorkers;
workerID = getNextWorkerID();
handoffBox = new ObjectFIFO(1); // only one slot
// just before returning, the thread should be created and started.
noStopRequested = true;
Runnable r = new Runnable() {
public void run() {
try {
runWork();
} catch (Exception x) {
// in case ANY exception slips through
x.printStackTrace();
}
}
};
internalThread = new Thread(r);
internalThread.start();
}
public static synchronized int getNextWorkerID() {
// notice: synchronized at the class level to ensure uniqueness
int id = nextWorkerID;
nextWorkerID++;
return id;
}
public void process(Runnable target) throws InterruptedException {
handoffBox.add(target);
}
private void runWork() {
while (noStopRequested) {
try {
System.out.println("workerID=" + workerID + ", ready for work");
idleWorkers.add(this);
Runnable r = (Runnable) handoffBox.remove();
System.out.println("workerID=" + workerID
+ ", starting execution of new Runnable: " + r);
runIt(r);
} catch (InterruptedException x) {
Thread.currentThread().interrupt(); // re-assert
}
}
}
private void runIt(Runnable r) {
try {
r.run();
} catch (Exception runex) {
System.err.println("Uncaught exception fell through from run()");
runex.printStackTrace();
} finally {
Thread.interrupted();
}
}
public void stopRequest() {
System.out
.println("workerID=" + workerID + ", stopRequest() received.");
noStopRequested = false;
internalThread.interrupt();
}
public boolean isAlive() {
return internalThread.isAlive();
}
}
class ObjectFIFO extends Object {
private Object[] queue;
private int capacity;
private int size;
private int head;
private int tail;
public ObjectFIFO(int cap) {
capacity = (cap > 0) ? cap : 1; // at least 1
queue = new Object[capacity];
head = 0;
tail = 0;
size = 0;
}
public int getCapacity() {
return capacity;
}
public synchronized int getSize() {
return size;
}
public synchronized boolean isEmpty() {
return (size == 0);
}
public synchronized boolean isFull() {
return (size == capacity);
}
public synchronized void add(Object obj) throws InterruptedException {
waitWhileFull();
queue[head] = obj;
head = (head + 1) % capacity;
size++;
notifyAll();
}
public synchronized void addEach(Object[] list) throws InterruptedException {
for (int i = 0; i < list.length; i++) {
add(list[i]);
}
}
public synchronized Object remove() throws InterruptedException {
waitWhileEmpty();
Object obj = queue[tail];
queue[tail] = null;
tail = (tail + 1) % capacity;
size--;
notifyAll();
return obj;
}
public synchronized Object[] removeAll() throws InterruptedException {
Object[] list = new Object[size];
for (int i = 0; i < list.length; i++) {
list[i] = remove();
}
return list;
}
public synchronized Object[] removeAtLeastOne() throws InterruptedException {
waitWhileEmpty();
return removeAll();
}
public synchronized boolean waitUntilEmpty(long msTimeout)
throws InterruptedException {
if (msTimeout == 0L) {
waitUntilEmpty();
return true;
}
long endTime = System.currentTimeMillis() + msTimeout;
long msRemaining = msTimeout;
while (!isEmpty() && (msRemaining > 0L)) {
wait(msRemaining);
msRemaining = endTime - System.currentTimeMillis();
}
return isEmpty();
}
public synchronized void waitUntilEmpty() throws InterruptedException {
while (!isEmpty()) {
wait();
}
}
public synchronized void waitWhileEmpty() throws InterruptedException {
while (isEmpty()) {
wait();
}
}
public synchronized void waitUntilFull() throws InterruptedException {
while (!isFull()) {
wait();
}
}
public synchronized void waitWhileFull() throws InterruptedException {
while (isFull()) {
wait();
}
}
}
Thread Pools 1
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MainClass {
public static void main(String[] args) {
int nTasks = 5;
long n = 1000L;
int tpSize = 10;
ThreadPoolExecutor tpe = new ThreadPoolExecutor(tpSize, tpSize, 50000L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
Task[] tasks = new Task[nTasks];
for (int i = 0; i < nTasks; i++) {
tasks[i] = new Task(n, "Task " + i);
tpe.execute(tasks[i]);
}
tpe.shutdown();
}
}
class SingleThreadAccess {
private ThreadPoolExecutor tpe;
public SingleThreadAccess() {
tpe = new ThreadPoolExecutor(1, 1, 50000L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
}
public void invokeLater(Runnable r) {
tpe.execute(r);
}
public void invokeAneWait(Runnable r) throws InterruptedException, ExecutionException {
FutureTask task = new FutureTask(r, null);
tpe.execute(task);
task.get();
}
public void shutdown() {
tpe.shutdown();
}
}
class Task implements Runnable {
long n;
String id;
private long fib(long n) {
if (n == 0)
return 0L;
if (n == 1)
return 1L;
return fib(n - 1) + fib(n - 2);
}
public Task(long n, String id) {
this.n = n;
this.id = id;
}
public void run() {
Date d = new Date();
DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS");
long startTime = System.currentTimeMillis();
d.setTime(startTime);
System.out.println("Starting task " + id + " at " + df.format(d));
fib(n);
long endTime = System.currentTimeMillis();
d.setTime(endTime);
System.out.println("Ending task " + id + " at " + df.format(d) + " after "
+ (endTime - startTime) + " milliseconds");
}
}
Thread Pools 2
/*
Java Threads, 3rd Edition
By Scott Oaks, Henry Wong
3rd Edition September 2004
ISBN: 0-596-00782-5
*/
import java.util.concurrent.*;
import java.util.*;
import java.text.*;
import java.io.*;
public class SingleThreadTest {
public static void main(String[] args) {
int nTasks = 5;
int fib = 4;
SingleThreadAccess sta = new SingleThreadAccess();
for (int i = 0; i < nTasks; i++)
sta.invokeLater(new Task(fib, "Task " + i));
sta.shutdown();
}
}
class SingleThreadAccess {
private ThreadPoolExecutor tpe;
public SingleThreadAccess() {
tpe = new ThreadPoolExecutor(
1, 1, 50000L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
} public void invokeLater(Runnable r) {
tpe.execute(r);
}
public void invokeAneWait(Runnable r) throws InterruptedException,
ExecutionException {
FutureTask task = new FutureTask(r, null);
tpe.execute(task);
task.get();
}
public void shutdown() {
tpe.shutdown();
}
}
class Task implements Runnable {
long n;
String id;
private long fib(long n) {
if (n == 0)
return 0L;
if (n == 1)
return 1L;
return fib(n - 1) + fib(n - 2);
}
public Task(long n, String id) {
this.n = n;
this.id = id;
}
public void run() {
Date d = new Date();
DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS");
long startTime = System.currentTimeMillis();
d.setTime(startTime);
System.out.println("Starting task " + id + " at " + df.format(d));
fib(n);
long endTime = System.currentTimeMillis();
d.setTime(endTime);
System.out.println("Ending task " + id + " at " + df.format(d)
+ " after " + (endTime - startTime) + " milliseconds");
}
}
Thread Pool Test
/*
DEVELOPING GAME IN JAVA
Caracteristiques
Editeur : NEW RIDERS
Auteur : BRACKEEN
Parution : 09 2003
Pages : 972
Isbn : 1-59273-005-1
Reliure : Paperback
Disponibilite : Disponible a la librairie
*/
import java.util.LinkedList;
public class ThreadPoolTest {
public static void main(String[] args) {
if (args.length != 2) {
System.out.println("Tests the ThreadPool task.");
System.out
.println("Usage: java ThreadPoolTest numTasks numThreads");
System.out.println(" numTasks - integer: number of task to run.");
System.out.println(" numThreads - integer: number of threads "
+ "in the thread pool.");
return;
}
int numTasks = Integer.parseInt(args[0]);
int numThreads = Integer.parseInt(args[1]);
// create the thread pool
ThreadPool threadPool = new ThreadPool(numThreads);
// run example tasks
for (int i = 0; i < numTasks; i++) {
threadPool.runTask(createTask(i));
}
// close the pool and wait for all tasks to finish.
threadPool.join();
}
/**
* Creates a simple Runnable that prints an ID, waits 500 milliseconds, then
* prints the ID again.
*/
private static Runnable createTask(final int taskID) {
return new Runnable() {
public void run() {
System.out.println("Task " + taskID + ": start");
// simulate a long-running task
try {
Thread.sleep(500);
} catch (InterruptedException ex) {
}
System.out.println("Task " + taskID + ": end");
}
};
}
}
/**
* A thread pool is a group of a limited number of threads that are used to
* execute tasks.
*/
class ThreadPool extends ThreadGroup {
private boolean isAlive;
private LinkedList taskQueue;
private int threadID;
private static int threadPoolID;
/**
* Creates a new ThreadPool.
*
* @param numThreads
* The number of threads in the pool.
*/
public ThreadPool(int numThreads) {
super("ThreadPool-" + (threadPoolID++));
setDaemon(true);
isAlive = true;
taskQueue = new LinkedList();
for (int i = 0; i < numThreads; i++) {
new PooledThread().start();
}
}
/**
* Requests a new task to run. This method returns immediately, and the task
* executes on the next available idle thread in this ThreadPool.
* <p>
* Tasks start execution in the order they are received.
*
* @param task
* The task to run. If null, no action is taken.
* @throws IllegalStateException
* if this ThreadPool is already closed.
*/
public synchronized void runTask(Runnable task) {
if (!isAlive) {
throw new IllegalStateException();
}
if (task != null) {
taskQueue.add(task);
notify();
}
}
protected synchronized Runnable getTask() throws InterruptedException {
while (taskQueue.size() == 0) {
if (!isAlive) {
return null;
}
wait();
}
return (Runnable) taskQueue.removeFirst();
}
/**
* Closes this ThreadPool and returns immediately. All threads are stopped,
* and any waiting tasks are not executed. Once a ThreadPool is closed, no
* more tasks can be run on this ThreadPool.
*/
public synchronized void close() {
if (isAlive) {
isAlive = false;
taskQueue.clear();
interrupt();
}
}
/**
* Closes this ThreadPool and waits for all running threads to finish. Any
* waiting tasks are executed.
*/
public void join() {
// notify all waiting threads that this ThreadPool is no
// longer alive
synchronized (this) {
isAlive = false;
notifyAll();
}
// wait for all threads to finish
Thread[] threads = new Thread[activeCount()];
int count = enumerate(threads);
for (int i = 0; i < count; i++) {
try {
threads[i].join();
} catch (InterruptedException ex) {
}
}
}
/**
* A PooledThread is a Thread in a ThreadPool group, designed to run tasks
* (Runnables).
*/
private class PooledThread extends Thread {
public PooledThread() {
super(ThreadPool.this, "PooledThread-" + (threadID++));
}
public void run() {
while (!isInterrupted()) {
// get a task to run
Runnable task = null;
try {
task = getTask();
} catch (InterruptedException ex) {
}
// if getTask() returned null or was interrupted,
// close this thread by returning.
if (task == null) {
return;
}
// run the task, and eat any exceptions it throws
try {
task.run();
} catch (Throwable t) {
uncaughtException(this, t);
}
}
}
}
}
Very basic implementation of a thread pool
/*
* Copyright (c) 2004, Rob Gordon.
*/
// revised from oddjob
import java.util.LinkedList;
/**
* Very basic implementation of a thread pool.
*
* @author Rob Gordon.
*/
public class ThreadPool {
private BlockingQueue queue = new BlockingQueue();
private boolean closed = true;
private int poolSize = 3;
public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
}
public int getPoolSize() {
return poolSize;
}
synchronized public void start() {
if (!closed) {
throw new IllegalStateException("Pool already started.");
}
closed = false;
for (int i = 0; i < poolSize; ++i) {
new PooledThread().start();
}
}
synchronized public void execute(Runnable job) {
if (closed) {
throw new PoolClosedException();
}
queue.enqueue(job);
}
private class PooledThread extends Thread {
public void run() {
while (true) {
Runnable job = (Runnable) queue.dequeue();
if (job == null) {
break;
}
try {
job.run();
} catch (Throwable t) {
// ignore
}
}
}
}
public void close() {
closed = true;
queue.close();
}
private static class PoolClosedException extends RuntimeException {
PoolClosedException() {
super ("Pool closed.");
}
}
}
/*
* Copyright � 2004, Rob Gordon.
*/
/**
*
* @author Rob Gordon.
*/
class BlockingQueue {
private final LinkedList list = new LinkedList();
private boolean closed = false;
private boolean wait = false;
synchronized public void enqueue(Object o) {
if (closed) {
throw new ClosedException();
}
list.add(o);
notify();
}
synchronized public Object dequeue() {
while (!closed && list.size() == 0) {
try {
wait();
}
catch (InterruptedException e) {
// ignore
}
}
if (list.size() == 0) {
return null;
}
return list.removeFirst();
}
synchronized public int size() {
return list.size();
}
synchronized public void close() {
closed = true;
notifyAll();
}
synchronized public void open() {
closed = false;
}
public static class ClosedException extends RuntimeException {
ClosedException() {
super("Queue closed.");
}
}
}
Worker thread pool
/**
*
* JFreeReport : a free Java reporting library
*
*
* Project Info: http://reporting.pentaho.org/
*
* (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
*
* This library is free software; you can redistribute it and/or modify it under the terms
* of the GNU Lesser General Public License as published by the Free Software Foundation;
* either version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this
* library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,
* Boston, MA 02111-1307, USA.
*
* [Java is a trademark or registered trademark of Sun Microsystems, Inc.
* in the United States and other countries.]
*
* ------------
* WorkerPool.java
* ------------
* (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
*/
/**
* A simple static workpool. Worker threads are created when necessary.
*
* @author Thomas Morgner
*
*/
public class WorkerPool
{
/**
* The worker array.
*/
private Worker[] workers;
/**
* A flag indicating whether idle workers are available.
*/
private boolean workersAvailable;
/**
* the name prefix for all workers of this pool.
*/
private String namePrefix;
/**
* Creates a new worker pool with the default size of 10 workers and the default name.
*/
public WorkerPool ()
{
this(10);
}
/**
* Creates a new workerpool with the given number of workers and the default name.
*
* @param size the maximum number of workers available.
*/
public WorkerPool (final int size)
{
this(size, "WorkerPool-worker");
}
/**
* Creates a new worker pool for the given number of workers and with the given name
* prefix.
*
* @param size the size of the worker pool.
* @param namePrefix the name prefix for all created workers.
*/
public WorkerPool (final int size, final String namePrefix)
{
if (size <= 0)
{
throw new IllegalArgumentException("Size must be > 0");
}
workers = new Worker[size];
workersAvailable = true;
this.namePrefix = namePrefix;
}
/**
* Checks whether workers are available.
*
* @return true, if at least one worker is idle, false otherwise.
*/
public synchronized boolean isWorkerAvailable ()
{
return workersAvailable;
}
/**
* Updates the workersAvailable flag after a worker was assigned.
*/
private void updateWorkersAvailable ()
{
for (int i = 0; i < workers.length; i++)
{
if (workers[i] == null)
{
workersAvailable = true;
return;
}
if (workers[i].isAvailable() == true)
{
workersAvailable = true;
return;
}
}
workersAvailable = false;
}
/**
* Waits until a worker will be available.
*/
private synchronized void waitForWorkerAvailable ()
{
while (isWorkerAvailable() == false)
{
try
{
// remove lock
this.wait(5000);
}
catch (InterruptedException ie)
{
// ignored
}
}
}
/**
* Returns a workerhandle for the given workload. This method will wait until an idle
* worker is found.
*
* @param r the workload for the worker
* @return a handle to the worker.
*/
public synchronized WorkerHandle getWorkerForWorkload (final Runnable r)
{
waitForWorkerAvailable();
int emptySlot = -1;
for (int i = 0; i < workers.length; i++)
{
if (workers[i] == null)
{
// in the first run, try to avoid to create new threads...
// reuse the already available threads
if (emptySlot == -1)
{
emptySlot = i;
}
continue;
}
if (workers[i].isAvailable() == true)
{
workers[i].setWorkload(r);
updateWorkersAvailable();
return new WorkerHandle(workers[i]);
}
}
if (emptySlot != -1)
{
workers[emptySlot] = new Worker();
workers[emptySlot].setName(namePrefix + "-" + emptySlot);
workers[emptySlot].setWorkerPool(this);
workers[emptySlot].setWorkload(r);
updateWorkersAvailable();
return new WorkerHandle(workers[emptySlot]);
}
throw new IllegalStateException
("At this point, a worker should already have been assigned.");
}
/**
* Marks the given worker as finished. The worker will be removed from the list of the
* available workers.
*
* @param worker the worker which was finished.
*/
public void workerFinished (final Worker worker)
{
if (worker.isFinish() == false)
{
throw new IllegalArgumentException("This worker is not in the finish state.");
}
for (int i = 0; i < workers.length; i++)
{
if (workers[i] == worker)
{
synchronized (this)
{
workers[i] = null;
workersAvailable = true;
this.notifyAll();
}
return;
}
}
}
/**
* Marks the given worker as available.
*
* @param worker the worker which was available.
*/
public synchronized void workerAvailable (final Worker worker)
{
for (int i = 0; i < workers.length; i++)
{
if (workers[i] == worker)
{
synchronized (this)
{
workersAvailable = true;
this.notifyAll();
}
return;
}
}
}
/**
* Finishes all worker of this pool.
*/
public void finishAll ()
{
for (int i = 0; i < workers.length; i++)
{
if (workers[i] != null)
{
workers[i].finish();
}
}
}
}
/**
*
* JFreeReport : a free Java reporting library
*
*
* Project Info: http://reporting.pentaho.org/
*
* (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
*
* This library is free software; you can redistribute it and/or modify it under the terms
* of the GNU Lesser General Public License as published by the Free Software Foundation;
* either version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this
* library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,
* Boston, MA 02111-1307, USA.
*
* [Java is a trademark or registered trademark of Sun Microsystems, Inc.
* in the United States and other countries.]
*
* ------------
* Worker.java
* ------------
* (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
*/
/**
* A simple worker implementation. The worker executes a assigned workload and then sleeps
* until another workload is set or the worker is killed.
*
* @author Thomas Morgner
*/
final class Worker extends Thread
{
/**
* the worker"s task.
*/
private Runnable workload;
/**
* a flag whether the worker should exit after the processing.
*/
private volatile boolean finish;
/**
* the time in milliseconds beween 2 checks for exit or work requests.
*/
private final int sleeptime;
/**
* The worker pool, to which this worker is assigned. May be null.
*/
private WorkerPool workerPool;
/**
* Creates a new worker.
*
* @param sleeptime the time this worker sleeps until he checks for new work.
*/
public Worker (final int sleeptime)
{
this.sleeptime = sleeptime;
this.setDaemon(true);
start();
}
/**
* Creates a new worker with an default idle timeout of 2 minutes.
*/
public Worker ()
{
this(120000);
}
/**
* Set the next workload for this worker.
*
* @param r the next workload for the worker.
* @throws IllegalStateException if the worker is not idle.
*/
public void setWorkload (final Runnable r)
{
if (workload != null)
{
throw new IllegalStateException("This worker is not idle.");
}
//Log.debug("Workload set...");
synchronized (this)
{
workload = r;
//Log.debug("Workload assigned: Notified " + getName());
this.notifyAll();
}
}
/**
* Returns the workload object.
*
* @return the runnable executed by this worker thread.
*/
public synchronized Runnable getWorkload()
{
return workload;
}
/**
* Kills the worker after he completed his work. Awakens the worker if he"s sleeping, so
* that the worker dies without delay.
*/
public void finish ()
{
finish = true;
// we are evil ..
try
{
this.interrupt();
}
catch (SecurityException se)
{
// ignored
}
if (workerPool != null)
{
workerPool.workerFinished(this);
}
}
/**
* Checks whether this worker has some work to do.
*
* @return true, if this worker has no more work and is currently sleeping.
*/
public boolean isAvailable ()
{
return (workload == null);
}
/**
* If a workload is set, process it. After the workload is processed, this worker starts
* to sleep until a new workload is set for the worker or the worker got the finish()
* request.
*/
public void run ()
{
while (!finish)
{
if (workload != null)
{
try
{
workload.run();
}
catch (Exception e)
{
System.out.println("Worker caught exception on run: "+ e);
}
workload = null;
if (workerPool != null)
{
workerPool.workerAvailable(this);
}
}
if (!finish)
{
synchronized (this)
{
try
{
// remove lock
this.wait(sleeptime);
}
catch (InterruptedException ie)
{
// ignored
}
}
}
}
synchronized (this)
{
this.notifyAll();
}
}
/**
* Checks whether this worker has received the signal to finish and die.
*
* @return true, if the worker should finish the work and end the thread.
*/
public boolean isFinish ()
{
return finish;
}
/**
* Returns the worker"s assigned pool.
*
* @return the worker pool (or null, if the worker is not assigned to a pool).
*/
public WorkerPool getWorkerPool ()
{
return workerPool;
}
/**
* Defines the worker"s assigned pool.
*
* @param workerPool the worker pool (or null, if the worker is not assigned to a
* pool).
*/
public void setWorkerPool (final WorkerPool workerPool)
{
this.workerPool = workerPool;
}
}
/**
*
* JFreeReport : a free Java reporting library
*
*
* Project Info: http://reporting.pentaho.org/
*
* (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
*
* This library is free software; you can redistribute it and/or modify it under the terms
* of the GNU Lesser General Public License as published by the Free Software Foundation;
* either version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this
* library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,
* Boston, MA 02111-1307, USA.
*
* [Java is a trademark or registered trademark of Sun Microsystems, Inc.
* in the United States and other countries.]
*
* ------------
* WorkerHandle.java
* ------------
* (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
*/
/**
* The worker handle is a control structure which allows control over the worker without
* exposing the thread object.
*
* @author Thomas Morgner
* @deprecated This class is used by the WorkerPool, which is not used anywhere anymore.
*/
class WorkerHandle
{
/**
* The worker for this handle.
*/
private final Worker worker;
/**
* Creates a new handle for the given worker.
*
* @param worker the worker.
*/
public WorkerHandle (final Worker worker)
{
this.worker = worker;
}
/**
* Finishes the worker.
*/
public void finish ()
{
worker.finish();
}
}