Java/Threads/Thread Pool
Содержание
- 1 Create a new thread for the thread pool. The create thread will be a daemon thread.
- 2 Defining a thread for a thread pool
- 3 JDK1.5 provides a mechanism to create a pool a scheduled task
- 4 Simple object pool. Based on ThreadPool and few other classes
- 5 Simple pool of Threads
- 6 Simple thread pool. A task is executed by obtaining a thread from the pool
- 7 Thread Cache
- 8 Thread pool
- 9 Thread Pool 2
- 10 Thread pool demo
- 11 Thread Pools 1
- 12 Thread Pools 2
- 13 Thread Pool Test
- 14 Very basic implementation of a thread pool
- 15 Worker thread pool
Create a new thread for the thread pool. The create thread will be a daemon thread.
<source lang="java">
import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; /**
* Simple implementation of a ThreadFactory that * marks all of the threads as daemon threads. */
public class DaemonThreadFactory implements ThreadFactory {
private final ThreadFactory factory = Executors.defaultThreadFactory(); /** * Create a new thread for the thread pool. The create * thread will be a daemon thread. * * @param r The runnable used by the thread pool. * * @return The newly created thread. */ public Thread newThread(Runnable r) { Thread t = factory.newThread(r); if (!t.isDaemon()) { t.setDaemon(true); } return t; }
}
</source>
Defining a thread for a thread pool
<source lang="java">
import java.util.LinkedList; class ThreadTask extends Thread {
private ThreadPool pool; public ThreadTask(ThreadPool thePool) { pool = thePool; } public void run() { while (true) { // blocks until job Runnable job = pool.getNext(); try { job.run(); } catch (Exception e) { // Ignore exceptions thrown from jobs System.err.println("Job exception: " + e); } } }
} public class ThreadPool {
private LinkedList tasks = new LinkedList(); public ThreadPool(int size) { for (int i = 0; i < size; i++) { Thread thread = new ThreadTask(this); thread.start(); } } public void run(Runnable task) { synchronized (tasks) { tasks.addLast(task); tasks.notify(); } } public Runnable getNext() { Runnable returnVal = null; synchronized (tasks) { while (tasks.isEmpty()) { try { tasks.wait(); } catch (InterruptedException ex) { System.err.println("Interrupted"); } } returnVal = (Runnable) tasks.removeFirst(); } return returnVal; } public static void main(String args[]) { final String message[] = { "Java", "Source", "and", "Support" }; ThreadPool pool = new ThreadPool(message.length / 2); for (int i = 0, n = message.length; i < n; i++) { final int innerI = i; Runnable runner = new Runnable() { public void run() { for (int j = 0; j < 25; j++) { System.out.println("j: " + j + ": " + message[innerI]); } } }; pool.run(runner); } }
}
</source>
JDK1.5 provides a mechanism to create a pool a scheduled task
<source lang="java">
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Main{
public static void main(String args[]) { ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(5); stpe.scheduleAtFixedRate(new Job1(), 0, 5, TimeUnit.SECONDS); stpe.scheduleAtFixedRate(new Job2(), 1, 2, TimeUnit.SECONDS); }
} class Job1 implements Runnable {
public void run() { System.out.println("Job 1"); }
} class Job2 implements Runnable {
public void run() { for(int i=-99;i<99;i++){ System.out.println(i); } }
}
</source>
Simple object pool. Based on ThreadPool and few other classes
<source lang="java">
/*
* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */
/**
* Simple object pool. Based on ThreadPool and few other classes * * The pool will ignore overflow and return null if empty. * * @author Gal Shachor * @author Costin * @author * @version $Id: SimplePool.java 463298 2006-10-12 16:10:32Z henning $ */
public final class SimplePool {
/* * Where the objects are held. */ private Object pool[]; /** * max amount of objects to be managed set via CTOR */ private int max; /** * index of previous to next free slot */ private int current = -1; /** * @param max */ public SimplePool(int max) { this.max = max; pool = new Object[max]; } /** * Add the object to the pool, silent nothing if the pool is full * * @param o */ public void put(Object o) { int idx = -1; synchronized (this) { /* * if we aren"t full */ if (current < max - 1) { /* * then increment the current index. */ idx = ++current; } if (idx >= 0) { pool[idx] = o; } } } /** * Get an object from the pool, null if the pool is empty. * * @return The object from the pool. */ public Object get() { synchronized (this) { /* * if we have any in the pool */ if (current >= 0) { /* * remove the current one */ Object o = pool[current]; pool[current] = null; current--; return o; } } return null; } /** * Return the size of the pool * * @return The pool size. */ public int getMax() { return max; } /** * for testing purposes, so we can examine the pool * * @return Array of Objects in the pool. */ Object[] getPool() { return pool; }
}
</source>
Simple pool of Threads
<source lang="java">
/*
* Copyright (c) 1998 - 2005 Versant Corporation * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * Versant Corporation - initial API and implementation */
import java.util.LinkedList; import java.util.Iterator; import java.util.HashSet; /**
* Simple pool of Threads. */
public class ThreadPool {
private String name; private HashSet active = new HashSet(); private LinkedList idle = new LinkedList(); private int idleCount; private int maxActive = 10; private int maxIdle = 3; private int lastThreadId; private boolean closed; public ThreadPool(String name) { this.name = name; } public int getMaxActive() { return maxActive; } public void setMaxActive(int maxActive) { this.maxActive = maxActive; } public int getMaxIdle() { return maxIdle; } public void setMaxIdle(int maxIdle) { this.maxIdle = maxIdle; } public synchronized int getActiveCount() { return active.size(); } public synchronized int getIdleCount() { return idleCount; } /** * Close the pool, stopping all threads. This does not wait for the * threads to actually stop before returning. This is a NOP if the * pool has already been closed. */ public synchronized void close() { if (closed) { return; } closed = true; for (Iterator i = idle.iterator(); i.hasNext(); ) { Worker w = (Worker)i.next(); w.terminate(); } idle = null; idleCount = 0; for (Iterator i = active.iterator(); i.hasNext(); ) { Worker w = (Worker)i.next(); w.terminate(); } active = null; } /** * Executed runnable using a Thread from the pool. This will block for * timeoutMs and forever if this is 0. Returns true if the task is * being executed (i.e. a Thread was available) or false if not (i.e. * pool full). */ public synchronized boolean execute(Runnable runnable, int timeoutMs) { if (closed) { throw new IllegalStateException("Pool has been closed"); } Worker t; if (idleCount == 0) { for (; isFull(); ) { try { wait(timeoutMs); if (isFull()) { return false; } } catch (InterruptedException e) { // ignore } } t = new Worker(); } else { t = (Worker)idle.removeFirst(); --idleCount; } active.add(t); t.execute(runnable); return true; } protected boolean isFull() { return active.size() >= maxActive; } private synchronized void finishedWork(Worker t) { if (!closed) { active.remove(t); if (idleCount >= maxIdle) { t.terminate(); } else { idle.addLast(t); ++idleCount; } } } private class Worker extends Thread { private boolean stopFlag; private Runnable runnable; public Worker() { super(name + " " + ++lastThreadId); setDaemon(true); } /** * Executed runnable. */ public void execute(Runnable runnable) { this.runnable = runnable; if (!isAlive()) { start(); } else { synchronized (this) { notify(); } } } /** * Stop this thread as soon as possible. */ public void terminate() { stopFlag = true; interrupt(); } public void run() { for (; !stopFlag; ) { try { runnable.run(); } catch (Throwable e) { if (e instanceof ThreadDeath) { throw (ThreadDeath)e; } } runnable = null; finishedWork(this); if (stopFlag) break; synchronized (this) { try { wait(); } catch (InterruptedException e) { // ignore } } } } }
}
</source>
Simple thread pool. A task is executed by obtaining a thread from the pool
<source lang="java">
/*
* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */
import java.util.ArrayList; import java.util.List;
/** Simple thread pool. A task is executed by obtaining a thread from
* the pool */
public class ThreadPool {
/** The thread pool contains instances of {@link ThreadPool.Task}. */ public interface Task { /** Performs the task. * @throws Throwable The task failed, and the worker thread won"t be used again. */ void run() throws Throwable; } /** A task, which may be interrupted, if the pool is shutting down. */ public interface InterruptableTask extends Task { /** Interrupts the task. * @throws Throwable Shutting down the task failed. */ void shutdown() throws Throwable; } private class Poolable { private boolean shuttingDown; private Task task; private Thread thread; Poolable(ThreadGroup pGroup, int pNum) { thread = new Thread(pGroup, pGroup.getName() + "-" + pNum){ public void run() { while (!isShuttingDown()) { final Task t = getTask(); if (t == null) { try { synchronized (this) { if (!isShuttingDown() && getTask() == null) { wait(); } } } catch (InterruptedException e) { // Do nothing } } else { try { t.run(); resetTask(); repool(Poolable.this); } catch (Throwable e) { discard(Poolable.this); resetTask(); } } } } }; thread.start(); } synchronized void shutdown() { shuttingDown = true; final Task t = getTask(); if (t != null && t instanceof InterruptableTask) { try { ((InterruptableTask) t).shutdown(); } catch (Throwable th) { // Ignore me } } task = null; synchronized (thread) { thread.notify(); } } private synchronized boolean isShuttingDown() { return shuttingDown; } String getName() { return thread.getName(); } private synchronized Task getTask() { return task; } private synchronized void resetTask() { task = null; } synchronized void start(Task pTask) { task = pTask; synchronized (thread) { thread.notify(); } } } private final ThreadGroup threadGroup; private final int maxSize; private final List waitingThreads = new ArrayList(); private final List runningThreads = new ArrayList(); private final List waitingTasks = new ArrayList(); private int num;
/** Creates a new instance. * @param pMaxSize Maximum number of concurrent threads. * @param pName Thread group name. */ public ThreadPool(int pMaxSize, String pName) { maxSize = pMaxSize; threadGroup = new ThreadGroup(pName); } synchronized void discard(Poolable pPoolable) { pPoolable.shutdown(); runningThreads.remove(pPoolable); waitingThreads.remove(pPoolable); } synchronized void repool(Poolable pPoolable) { if (runningThreads.remove(pPoolable)) { if (maxSize != 0 && runningThreads.size() + waitingThreads.size() >= maxSize) { discard(pPoolable); } else { waitingThreads.add(pPoolable); if (waitingTasks.size() > 0) { Task task = (Task) waitingTasks.remove(waitingTasks.size() - 1); startTask(task); } } } else { discard(pPoolable); } } /** Starts a task immediately. * @param pTask The task being started. * @return True, if the task could be started immediately. False, if * the maxmimum number of concurrent tasks was exceeded. If so, you * might consider to use the {@link #addTask(ThreadPool.Task)} method instead. */ public synchronized boolean startTask(Task pTask) { if (maxSize != 0 && runningThreads.size() >= maxSize) { return false; } Poolable poolable; if (waitingThreads.size() > 0) { poolable = (Poolable) waitingThreads.remove(waitingThreads.size()-1); } else { poolable = new Poolable(threadGroup, num++); } runningThreads.add(poolable); poolable.start(pTask); return true; } /** Adds a task for immediate or deferred execution. * @param pTask The task being added. * @return True, if the task was started immediately. False, if * the task will be executed later. */ public synchronized boolean addTask(Task pTask) { if (startTask(pTask)) { return true; } waitingTasks.add(pTask); return false; } /** Closes the pool. */ public synchronized void shutdown() { while (!waitingThreads.isEmpty()) { Poolable poolable = (Poolable) waitingThreads.remove(waitingThreads.size()-1); poolable.shutdown(); } while (!runningThreads.isEmpty()) { Poolable poolable = (Poolable) runningThreads.remove(runningThreads.size()-1); poolable.shutdown(); } } /** Returns the maximum number of concurrent threads. * @return Maximum number of threads. */ public int getMaxThreads() { return maxSize; } /** Returns the number of threads, which have actually been created, * as opposed to the number of currently running threads. */ public synchronized int getNumThreads() { return num; }
}
</source>
Thread Cache
<source lang="java">
// ThreadCache.java // $Id: ThreadCache.java,v 1.16 2000/08/16 21:37:58 ylafon Exp $ // (c) COPYRIGHT MIT and INRIA, 1996-1997. // Please first read the full copyright statement in file COPYRIGHT.html
class CachedThread extends Thread {
Runnable runner = null; boolean alive = true; ThreadCache cache = null; CachedThread next = null; CachedThread prev = null; boolean terminated = false; boolean started = false; boolean firstime = true; synchronized boolean isTerminated() { boolean ret = terminated; terminated = true; return ret; } synchronized Runnable waitForRunner() { boolean to = false; while ( alive ) { // Is a runner available ? if ( runner != null ) { Runnable torun = runner; firstime = false; runner = null; return torun; } else if ( firstime ) { // This thread will not be declared free until it runs once: try { wait(); } catch (InterruptedException ex) { } } else if ( alive = cache.isFree(this, to) ) { // Notify the cache that we are free, and continue if allowed: try { int idleto = cache.getIdleTimeout(); to = false; if ( idleto > 0 ) { wait(idleto); to = (runner == null); } else { wait(); } } catch (InterruptedException ex) { } } } return null; } synchronized void kill() { alive = false; notify(); } synchronized boolean wakeup(Runnable runnable) { if ( alive ) { runner = runnable; if ( ! started ) this.start(); notify(); return true; } else { return false; } } public synchronized void start() { super.start(); this.started = true; } public void run() { try { while ( true ) { // Wait for a runner: Runnable torun = waitForRunner(); // If runner, run: if ( torun != null ) torun.run(); // If dead, stop if ( ! alive ) break; } } finally { cache.isDead(this); } } CachedThread(ThreadCache cache, int id) { super(cache.getThreadGroup(), cache.getThreadGroup().getName()+":"+id); this.cache = cache; setPriority(cache.getThreadPriority()); setDaemon(true); }
} public class ThreadCache {
private static final boolean debug = false;
/**
* Default number of cached threads.
*/
private static final int DEFAULT_CACHESIZE = 5;
/**
* Has this thread cache been initialized ?
*/
protected boolean inited = false;
/**
* The thread group for this thread cache.
*/
protected ThreadGroup group = null;
/**
* Number of cached threads.
*/
protected int cachesize = DEFAULT_CACHESIZE;
/**
* Number of created threads.
*/
protected int threadcount = 0;
/**
* Uniq thread identifier within this ThreadCache instance.
*/
protected int threadid = 0;
/**
* Number of idle threads to always maintain alive.
*/
protected int idlethreads = 0;
/**
* Should we queue thread requests, rather then creating new threads.
*/
protected boolean growasneeded = false;
/**
* Number of used threads
*/
protected int usedthreads = 0;
/**
* List of free threads.
*/
protected CachedThread freelist = null;
protected CachedThread freetail = null;
/**
* The idle timeout, for a thread to wait before being killed.
* Defaults to 5000 milliseconds.
*/
protected int idletimeout = 5000;
/**
* Cached thread priority.
*/
protected int threadpriority = 5;
/**
* Get the idle timeout value for this cache.
* @return The idletimeout value, or negative if no timeout applies.
*/
synchronized final int getIdleTimeout() {
return (threadcount <= idlethreads) ? -1 : idletimeout;
}
/**
* The given thread is about to be declared free.
* @return A boolean, true if the thread is to continue
* running, false if the thread should stop.
*/
final synchronized boolean isFree(CachedThread t, boolean timedout) {
if ( timedout && (threadcount > idlethreads) ) {
if ( ! t.isTerminated() ) {
threadcount--;
usedthreads--;
notifyAll();
}
return false;
} else if ( threadcount <= cachesize ) {
t.prev = freetail;
if (freetail != null)
freetail.next = t;
freetail = t;
if (freelist == null)
freelist = t;
usedthreads--;
notifyAll();
return true;
} else {
if ( ! t.isTerminated() ) {
threadcount--;
usedthreads--;
notifyAll();
}
return false;
}
}
/**
* The given thread has terminated, cleanup any associated state.
* @param dead The dead CachedThread instance.
*/
final synchronized void isDead(CachedThread t) {
if ( debug )
System.out.println("** "+t+": is dead tc="+threadcount);
if ( ! t.isTerminated() ) {
threadcount--;
notifyAll();
}
}
/**
* Create a new thread within this thread cache.
* @return A new CachedThread instance.
*/
private synchronized CachedThread createThread() {
threadcount++;
threadid++;
return new CachedThread(this, threadid);
}
/**
* Allocate a new thread, as requested.
* @param waitp Should we wait until a thread is available ?
* @return A launched CachedThread instance, or null if
* unable to allocate a new thread, and waitp
is
* false.
*/
protected synchronized CachedThread allocateThread(boolean waitp) {
CachedThread t = null;
while ( true ) {
if ( freelist != null ) {
if ( debug )
System.out.println("*** allocateThread: free thread");
t = freelist;
freelist = freelist.next;
if (freelist != null) {
freelist.prev = null;
} else {
freetail = null;
}
t.next = null;
break;
} else if ((threadcount < cachesize) || growasneeded) {
if ( debug )
System.out.println("*** create new thread.");
t = createThread();
break;
} else if ( waitp ) {
if ( debug )
System.out.println("*** wait for a thread.");
// Wait for a thread to become available
try {
wait();
} catch (InterruptedException ex) {
}
} else {
return null;
}
}
return t;
}
/**
* Set the thread cache size.
* This will also update the number of idle threads to maintain, if
* requested.
* @param cachesize The new thread cache size.
* @param update If true also update the number of
* threads to maintain idle.
*/
public synchronized void setCachesize(int cachesize, boolean update) {
this.cachesize = cachesize;
if ( update )
this.idlethreads = (cachesize>>1);
}
/**
* Set the thread cache size.
* Updaet the number of idle threads to keep alive.
* @param cachesize The new thread cache size.
*/
public void setCachesize(int cachesize) {
setCachesize(cachesize, true);
}
/**
* Enable/disable the thread cache to grow as needed.
* This flag should be turned on only if always getting a thread as fast
* as possible is critical.
* @param onoff The toggle.
*/
public void setGrowAsNeeded(boolean onoff) {
this.growasneeded = onoff;
}
/**
* Set all the cached threads priority.
* Changing the cached thread priority should be done before the thread
* cache is initialized, it will not affect already created
* threads.
* @param priority The new cachewd threads priority.
*/
public void setThreadPriority(int priority) {
threadpriority = priority;
}
/**
* Get the cached thread normal priority.
* @return Currently assigned cached thread priority.
*/
public int getThreadPriority() {
return threadpriority;
}
/**
* Set the idle timeout.
* The idle timeout value is used to eliminate threads that have remain
* idle for too long (although the thread cache will ensure that a
* decent minimal number of threads stay around).
* @param idletimeout The new idle timeout.
*/
public synchronized void setIdleTimeout(int idletimeout) {
this.idletimeout = idletimeout;
}
/**
* Request a thread to run on the given object.
* @param runnable The object to run with the allocated thread.
* @param waitp If true wait until a free thread is
* available, otherwise, return false.
* @return A boolean, true if a thread was successfully
* allocated for the given object, false otherwise.
*/
public boolean getThread(Runnable runnable, boolean waitp) {
if ( debug )
System.out.println("*** getting a thread for "+runnable);
if ( ! inited )
throw new RuntimeException("Uninitialized thread cache");
// Allocate and launch the thread:
while ( true ) {
CachedThread t = allocateThread(waitp);
if ( t != null ) {
if ( t.wakeup(runnable) ) {
synchronized (this) {
usedthreads++;
}
return true;
}
} else {
return false;
}
}
}
/**
* Get the ThreadGroup managed by this ThreadCache instance.
* @return A ThreadGroup instance.
*/
public ThreadGroup getThreadGroup() {
return group;
}
/**
* Wait until all the threads have finished their duty
*/
public synchronized void waitForCompletion() {
while (usedthreads > 0) {
if ( debug )
System.out.println("*** Waiting for "+usedthreads+ " threads");
try {
wait();
} catch (InterruptedException ex) {
}
}
}
/**
* Initialize the given thread cache.
* This two stage initialize method is done so that configuration
* of the thread cache can be done before any thread get actually
* created.
*/
public synchronized void initialize() {
CachedThread t = createThread();
freelist = t;
freetail = t;
t.next = null;
t.prev = null;
t.start();
for (int i = 1 ; i < idlethreads ; i++) {
t = createThread();
t.next = freelist;
t.prev = null;
freelist.prev = t;
freelist = t;
t.start();
}
inited = true;
}
/**
* Create a thread cache, whose threads are to be children of the group.
* @param group The thread group to which this thread cache is bound.
* @param nstart Number of thread to create in advance.
*/
public ThreadCache(ThreadGroup group) {
this.group = group;
}
/**
* Create a thread cache, after creating a new thread group.
* @param name The name of the thread group to create.
*/
public ThreadCache(String name) {
this(new ThreadGroup(name));
}
/**
* Create a thread cache, after creating a new thread group.
* @param parent The parent of the thread group to create.
* @param name The name of the thread group.
*/
public ThreadCache(ThreadGroup parent, String name) {
this(new ThreadGroup(parent, name));
}
}
</source>
Thread pool
<source lang="java">
/* Java Threads, 3rd Edition By Scott Oaks, Henry Wong 3rd Edition September 2004 ISBN: 0-596-00782-5
- /
import java.util.concurrent.*; import java.util.concurrent.atomic.*; public class CreateTest extends Thread {
static AtomicInteger nCalls; static int target = 0; static boolean done = false; static Object lock = new Object(); public static void main(String[] args) { target = 10000; doTestCreate(); doTestPool(8); doTestLoop(); target = Integer.parseInt(args[0]); cleanGC(); Timestamp createTS = new Timestamp(TimeUnit.MICROSECONDS); doTestCreate(); createTS.stop(); System.out.println("Create thread test took " + createTS); cleanGC(); Timestamp pool8TS = new Timestamp(TimeUnit.MICROSECONDS); doTestPool(8); pool8TS.stop(); System.out.println("Pool test (8 threads) took " + pool8TS); cleanGC(); Timestamp poolTS = new Timestamp(TimeUnit.MICROSECONDS); doTestPool(1); poolTS.stop(); System.out.println("Pool test (1 thread) took " + poolTS); cleanGC(); Timestamp loopTS = new Timestamp(TimeUnit.MICROSECONDS); doTestLoop(); loopTS.stop(); System.out.println("Loop test took " + loopTS); double d = ((double) (createTS.elapsedTime() - loopTS.elapsedTime())) / target; System.out.println("Creating a thread took " + d + " " + loopTS.units() + " per thread"); d = ((double) (createTS.elapsedTime() - poolTS.elapsedTime())) / target; System.out.println("Using a thread pool (1 thread) saved " + d + " " + loopTS.units() + " per task"); d = ((double) (createTS.elapsedTime() - pool8TS.elapsedTime())) / target; System.out.println("Using a thread pool (8 threads) saved " + d + " " + loopTS.units() + " per task"); d = ((double) (poolTS.elapsedTime() - loopTS.elapsedTime())) / target; System.out.println("Thread pool overhead (1 thread) is " + d + " " + loopTS.units() + " per task"); d = ((double) (pool8TS.elapsedTime() - loopTS.elapsedTime())) / target; System.out.println("Thread pool overhead (8 threads) is " + d + " " + loopTS.units() + " per task"); } static void doTestLoop() { nCalls = new AtomicInteger(0); done = false; for (int i = 0; i < target; i++) work(); synchronized (lock) { while (!done) try { lock.wait(); } catch (Exception e) { } } } static void doTestCreate() { done = false; nCalls = new AtomicInteger(0); for (int i = 0; i < target; i++) { Thread t = new CreateTest(); t.start(); } synchronized (lock) { while (!done) try { lock.wait(); } catch (Exception e) { } } }
static void doTestPool(int nThreads) {
done = false; nCalls = new AtomicInteger(0); ThreadPoolExecutor tpe = new ThreadPoolExecutor(nThreads, nThreads, 50000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); Runnable r = new CreateTest(); for (int i = 0; i < target; i++) { tpe.execute(r); } tpe.shutdown(); try { tpe.awaitTermination(10000000L, TimeUnit.SECONDS); } catch (Exception e) {} } public void run() { work(); } public static void work() { int n = nCalls.incrementAndGet(); if (n == target) { synchronized (lock) { done = true; lock.notify(); } } } public static void cleanGC() { System.gc(); System.runFinalization(); System.gc(); }
}
</source>
Thread Pool 2
<source lang="java">
/*
* * Copyright (c) 1997-1999 Scott Oaks and Henry Wong. All Rights Reserved. * * Permission to use, copy, modify, and distribute this software * and its documentation for NON-COMMERCIAL purposes and * without fee is hereby granted. * * This sample source code is provided for example only, * on an unsupported, as-is basis. * * AUTHOR MAKES NO REPRESENTATIONS OR WARRANTIES ABOUT THE SUITABILITY OF * THE SOFTWARE, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED * TO THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * PARTICULAR PURPOSE, OR NON-INFRINGEMENT. AUTHOR SHALL NOT BE LIABLE FOR * ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING OR * DISTRIBUTING THIS SOFTWARE OR ITS DERIVATIVES. * * THIS SOFTWARE IS NOT DESIGNED OR INTENDED FOR USE OR RESALE AS ON-LINE * CONTROL EQUIPMENT IN HAZARDOUS ENVIRONMENTS REQUIRING FAIL-SAFE * PERFORMANCE, SUCH AS IN THE OPERATION OF NUCLEAR FACILITIES, AIRCRAFT * NAVIGATION OR COMMUNICATION SYSTEMS, AIR TRAFFIC CONTROL, DIRECT LIFE * SUPPORT MACHINES, OR WEAPONS SYSTEMS, IN WHICH THE FAILURE OF THE * SOFTWARE COULD LEAD DIRECTLY TO DEATH, PERSONAL INJURY, OR SEVERE * PHYSICAL OR ENVIRONMENTAL DAMAGE ("HIGH RISK ACTIVITIES"). AUTHOR * SPECIFICALLY DISCLAIMS ANY EXPRESS OR IMPLIED WARRANTY OF FITNESS FOR * HIGH RISK ACTIVITIES. */
import java.util.*; public class ThreadPool {
class ThreadPoolRequest { Runnable target; Object lock; ThreadPoolRequest(Runnable t, Object l) { target = t; lock = l; } } class ThreadPoolThread extends Thread { ThreadPool parent; boolean shouldRun = true; ThreadPoolThread(ThreadPool parent, int i) { super("ThreadPoolThread " + i); this.parent = parent; } public void run() { ThreadPoolRequest obj = null; while (shouldRun) { try { parent.cvFlag.getBusyFlag(); while (obj == null && shouldRun) { try { obj = (ThreadPoolRequest) parent.objects.elementAt(0); parent.objects.removeElementAt(0); } catch (ArrayIndexOutOfBoundsException aiobe) { obj = null; } catch (ClassCastException cce) { System.err.println("Unexpected data"); obj = null; } if (obj == null) { try { parent.cvAvailable.cvWait(); } catch (InterruptedException ie) { return; } } } } finally { parent.cvFlag.freeBusyFlag(); } if (!shouldRun) return; obj.target.run(); try { parent.cvFlag.getBusyFlag(); nObjects--; if (nObjects == 0) parent.cvEmpty.cvSignal(); } finally { parent.cvFlag.freeBusyFlag(); } if (obj.lock != null) { synchronized(obj.lock) { obj.lock.notify(); } } obj = null; } } } Vector objects; int nObjects = 0; CondVar cvAvailable, cvEmpty; BusyFlag cvFlag; ThreadPoolThread poolThreads[]; boolean terminated = false; public ThreadPool(int n) { cvFlag = new BusyFlag(); cvAvailable = new CondVar(cvFlag); cvEmpty = new CondVar(cvFlag); objects = new Vector(); poolThreads = new ThreadPoolThread[n]; for (int i = 0; i < n; i++) { poolThreads[i] = new ThreadPoolThread(this, i); poolThreads[i].start(); } } private void add(Runnable target, Object lock) { try { cvFlag.getBusyFlag(); if (terminated) throw new IllegalStateException("Thread pool has shutdown"); objects.addElement(new ThreadPoolRequest(target, lock)); nObjects++; cvAvailable.cvSignal(); } finally { cvFlag.freeBusyFlag(); } } public void addRequest(Runnable target) { add(target, null); } public void addRequestAndWait(Runnable target) throws InterruptedException { Object lock = new Object(); synchronized(lock) { add(target, lock); lock.wait(); } } public void waitForAll(boolean terminate) throws InterruptedException { try { cvFlag.getBusyFlag(); while (nObjects != 0) cvEmpty.cvWait(); if (terminate) { for (int i = 0; i < poolThreads.length; i++) poolThreads[i].shouldRun = false; cvAvailable.cvBroadcast(); terminated = true; } } finally { cvFlag.freeBusyFlag(); } } public void waitForAll() throws InterruptedException { waitForAll(false); }
}
</source>
Thread pool demo
<source lang="java">
public class ThreadPoolMain extends Object {
public static Runnable makeRunnable(final String name, final long firstDelay) { return new Runnable() { public void run() { try { System.out.println(name + ": starting up"); Thread.sleep(firstDelay); System.out.println(name + ": doing some stuff"); Thread.sleep(2000); System.out.println(name + ": leaving"); } catch (InterruptedException ix) { System.out.println(name + ": got interrupted!"); return; } catch (Exception x) { x.printStackTrace(); } } public String toString() { return name; } }; } public static void main(String[] args) { try { ThreadPool pool = new ThreadPool(3); Runnable ra = makeRunnable("RA", 3000); pool.execute(ra); Runnable rb = makeRunnable("RB", 1000); pool.execute(rb); Runnable rc = makeRunnable("RC", 2000); pool.execute(rc); Runnable rd = makeRunnable("RD", 60000); pool.execute(rd); Runnable re = makeRunnable("RE", 1000); pool.execute(re); pool.stopRequestIdleWorkers(); Thread.sleep(2000); pool.stopRequestIdleWorkers(); Thread.sleep(5000); pool.stopRequestAllWorkers(); } catch (InterruptedException ix) { ix.printStackTrace(); } }
} class ThreadPool extends Object {
private ObjectFIFO idleWorkers; private ThreadPoolWorker[] workerList; public ThreadPool(int numberOfThreads) { // make sure that it"s at least one numberOfThreads = Math.max(1, numberOfThreads); idleWorkers = new ObjectFIFO(numberOfThreads); workerList = new ThreadPoolWorker[numberOfThreads]; for (int i = 0; i < workerList.length; i++) { workerList[i] = new ThreadPoolWorker(idleWorkers); } } public void execute(Runnable target) throws InterruptedException { // block (forever) until a worker is available ThreadPoolWorker worker = (ThreadPoolWorker) idleWorkers.remove(); worker.process(target); } public void stopRequestIdleWorkers() { try { Object[] idle = idleWorkers.removeAll(); for (int i = 0; i < idle.length; i++) { ((ThreadPoolWorker) idle[i]).stopRequest(); } } catch (InterruptedException x) { Thread.currentThread().interrupt(); // re-assert } } public void stopRequestAllWorkers() { stopRequestIdleWorkers(); try { Thread.sleep(250); } catch (InterruptedException x) { } for (int i = 0; i < workerList.length; i++) { if (workerList[i].isAlive()) { workerList[i].stopRequest(); } } }
} class ThreadPoolWorker extends Object {
private static int nextWorkerID = 0; private ObjectFIFO idleWorkers; private int workerID; private ObjectFIFO handoffBox; private Thread internalThread; private volatile boolean noStopRequested; public ThreadPoolWorker(ObjectFIFO idleWorkers) { this.idleWorkers = idleWorkers; workerID = getNextWorkerID(); handoffBox = new ObjectFIFO(1); // only one slot // just before returning, the thread should be created and started. noStopRequested = true; Runnable r = new Runnable() { public void run() { try { runWork(); } catch (Exception x) { // in case ANY exception slips through x.printStackTrace(); } } }; internalThread = new Thread(r); internalThread.start(); } public static synchronized int getNextWorkerID() { // notice: synchronized at the class level to ensure uniqueness int id = nextWorkerID; nextWorkerID++; return id; } public void process(Runnable target) throws InterruptedException { handoffBox.add(target); } private void runWork() { while (noStopRequested) { try { System.out.println("workerID=" + workerID + ", ready for work"); idleWorkers.add(this); Runnable r = (Runnable) handoffBox.remove(); System.out.println("workerID=" + workerID + ", starting execution of new Runnable: " + r); runIt(r); } catch (InterruptedException x) { Thread.currentThread().interrupt(); // re-assert } } } private void runIt(Runnable r) { try { r.run(); } catch (Exception runex) { System.err.println("Uncaught exception fell through from run()"); runex.printStackTrace(); } finally { Thread.interrupted(); } } public void stopRequest() { System.out .println("workerID=" + workerID + ", stopRequest() received."); noStopRequested = false; internalThread.interrupt(); } public boolean isAlive() { return internalThread.isAlive(); }
} class ObjectFIFO extends Object {
private Object[] queue; private int capacity; private int size; private int head; private int tail; public ObjectFIFO(int cap) { capacity = (cap > 0) ? cap : 1; // at least 1 queue = new Object[capacity]; head = 0; tail = 0; size = 0; } public int getCapacity() { return capacity; } public synchronized int getSize() { return size; } public synchronized boolean isEmpty() { return (size == 0); } public synchronized boolean isFull() { return (size == capacity); } public synchronized void add(Object obj) throws InterruptedException { waitWhileFull(); queue[head] = obj; head = (head + 1) % capacity; size++; notifyAll(); } public synchronized void addEach(Object[] list) throws InterruptedException { for (int i = 0; i < list.length; i++) { add(list[i]); } } public synchronized Object remove() throws InterruptedException { waitWhileEmpty(); Object obj = queue[tail]; queue[tail] = null; tail = (tail + 1) % capacity; size--; notifyAll(); return obj; } public synchronized Object[] removeAll() throws InterruptedException { Object[] list = new Object[size]; for (int i = 0; i < list.length; i++) { list[i] = remove(); } return list; } public synchronized Object[] removeAtLeastOne() throws InterruptedException { waitWhileEmpty(); return removeAll(); } public synchronized boolean waitUntilEmpty(long msTimeout) throws InterruptedException { if (msTimeout == 0L) { waitUntilEmpty(); return true; } long endTime = System.currentTimeMillis() + msTimeout; long msRemaining = msTimeout; while (!isEmpty() && (msRemaining > 0L)) { wait(msRemaining); msRemaining = endTime - System.currentTimeMillis(); } return isEmpty(); } public synchronized void waitUntilEmpty() throws InterruptedException { while (!isEmpty()) { wait(); } } public synchronized void waitWhileEmpty() throws InterruptedException { while (isEmpty()) { wait(); } } public synchronized void waitUntilFull() throws InterruptedException { while (!isFull()) { wait(); } } public synchronized void waitWhileFull() throws InterruptedException { while (isFull()) { wait(); } }
}
</source>
Thread Pools 1
<source lang="java">
import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class MainClass {
public static void main(String[] args) { int nTasks = 5; long n = 1000L; int tpSize = 10; ThreadPoolExecutor tpe = new ThreadPoolExecutor(tpSize, tpSize, 50000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); Task[] tasks = new Task[nTasks]; for (int i = 0; i < nTasks; i++) { tasks[i] = new Task(n, "Task " + i); tpe.execute(tasks[i]); } tpe.shutdown(); }
} class SingleThreadAccess {
private ThreadPoolExecutor tpe; public SingleThreadAccess() { tpe = new ThreadPoolExecutor(1, 1, 50000L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); } public void invokeLater(Runnable r) { tpe.execute(r); } public void invokeAneWait(Runnable r) throws InterruptedException, ExecutionException { FutureTask task = new FutureTask(r, null); tpe.execute(task); task.get(); } public void shutdown() { tpe.shutdown(); }
} class Task implements Runnable {
long n; String id; private long fib(long n) { if (n == 0) return 0L; if (n == 1) return 1L; return fib(n - 1) + fib(n - 2); } public Task(long n, String id) { this.n = n; this.id = id; } public void run() { Date d = new Date(); DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS"); long startTime = System.currentTimeMillis(); d.setTime(startTime); System.out.println("Starting task " + id + " at " + df.format(d)); fib(n); long endTime = System.currentTimeMillis(); d.setTime(endTime); System.out.println("Ending task " + id + " at " + df.format(d) + " after " + (endTime - startTime) + " milliseconds"); }
}
</source>
Thread Pools 2
<source lang="java">
/* Java Threads, 3rd Edition By Scott Oaks, Henry Wong 3rd Edition September 2004 ISBN: 0-596-00782-5
- /
import java.util.concurrent.*; import java.util.*; import java.text.*; import java.io.*; public class SingleThreadTest {
public static void main(String[] args) { int nTasks = 5; int fib = 4; SingleThreadAccess sta = new SingleThreadAccess(); for (int i = 0; i < nTasks; i++) sta.invokeLater(new Task(fib, "Task " + i)); sta.shutdown(); }
} class SingleThreadAccess {
private ThreadPoolExecutor tpe; public SingleThreadAccess() { tpe = new ThreadPoolExecutor( 1, 1, 50000L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); } public void invokeLater(Runnable r) { tpe.execute(r); } public void invokeAneWait(Runnable r) throws InterruptedException, ExecutionException { FutureTask task = new FutureTask(r, null); tpe.execute(task); task.get(); } public void shutdown() { tpe.shutdown(); }
} class Task implements Runnable {
long n; String id; private long fib(long n) { if (n == 0) return 0L; if (n == 1) return 1L; return fib(n - 1) + fib(n - 2); } public Task(long n, String id) { this.n = n; this.id = id; } public void run() { Date d = new Date(); DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS"); long startTime = System.currentTimeMillis(); d.setTime(startTime); System.out.println("Starting task " + id + " at " + df.format(d)); fib(n); long endTime = System.currentTimeMillis(); d.setTime(endTime); System.out.println("Ending task " + id + " at " + df.format(d) + " after " + (endTime - startTime) + " milliseconds"); }
}
</source>
Thread Pool Test
<source lang="java"> /*
DEVELOPING GAME IN JAVA Caracteristiques Editeur : NEW RIDERS Auteur : BRACKEEN Parution : 09 2003 Pages : 972 Isbn : 1-59273-005-1 Reliure : Paperback Disponibilite : Disponible a la librairie
- /
import java.util.LinkedList; public class ThreadPoolTest {
public static void main(String[] args) { if (args.length != 2) { System.out.println("Tests the ThreadPool task."); System.out .println("Usage: java ThreadPoolTest numTasks numThreads"); System.out.println(" numTasks - integer: number of task to run."); System.out.println(" numThreads - integer: number of threads " + "in the thread pool."); return; } int numTasks = Integer.parseInt(args[0]); int numThreads = Integer.parseInt(args[1]); // create the thread pool ThreadPool threadPool = new ThreadPool(numThreads); // run example tasks for (int i = 0; i < numTasks; i++) { threadPool.runTask(createTask(i)); } // close the pool and wait for all tasks to finish. threadPool.join(); } /** * Creates a simple Runnable that prints an ID, waits 500 milliseconds, then * prints the ID again. */ private static Runnable createTask(final int taskID) { return new Runnable() { public void run() { System.out.println("Task " + taskID + ": start"); // simulate a long-running task try { Thread.sleep(500); } catch (InterruptedException ex) { } System.out.println("Task " + taskID + ": end"); } }; }
} /**
* A thread pool is a group of a limited number of threads that are used to * execute tasks. */
class ThreadPool extends ThreadGroup {
private boolean isAlive; private LinkedList taskQueue; private int threadID; private static int threadPoolID; /** * Creates a new ThreadPool. * * @param numThreads * The number of threads in the pool. */ public ThreadPool(int numThreads) { super("ThreadPool-" + (threadPoolID++)); setDaemon(true); isAlive = true; taskQueue = new LinkedList(); for (int i = 0; i < numThreads; i++) { new PooledThread().start(); } } /** * Requests a new task to run. This method returns immediately, and the task * executes on the next available idle thread in this ThreadPool.*
* Tasks start execution in the order they are received. * * @param task * The task to run. If null, no action is taken. * @throws IllegalStateException * if this ThreadPool is already closed. */ public synchronized void runTask(Runnable task) { if (!isAlive) { throw new IllegalStateException(); } if (task != null) { taskQueue.add(task); notify(); } } protected synchronized Runnable getTask() throws InterruptedException { while (taskQueue.size() == 0) { if (!isAlive) { return null; } wait(); } return (Runnable) taskQueue.removeFirst(); } /** * Closes this ThreadPool and returns immediately. All threads are stopped, * and any waiting tasks are not executed. Once a ThreadPool is closed, no * more tasks can be run on this ThreadPool. */ public synchronized void close() { if (isAlive) { isAlive = false; taskQueue.clear(); interrupt(); } } /** * Closes this ThreadPool and waits for all running threads to finish. Any * waiting tasks are executed. */ public void join() { // notify all waiting threads that this ThreadPool is no // longer alive synchronized (this) { isAlive = false; notifyAll(); } // wait for all threads to finish Thread[] threads = new Thread[activeCount()]; int count = enumerate(threads); for (int i = 0; i < count; i++) { try { threads[i].join(); } catch (InterruptedException ex) { } } } /** * A PooledThread is a Thread in a ThreadPool group, designed to run tasks * (Runnables). */ private class PooledThread extends Thread { public PooledThread() { super(ThreadPool.this, "PooledThread-" + (threadID++)); } public void run() { while (!isInterrupted()) { // get a task to run Runnable task = null; try { task = getTask(); } catch (InterruptedException ex) { } // if getTask() returned null or was interrupted, // close this thread by returning. if (task == null) { return; } // run the task, and eat any exceptions it throws try { task.run(); } catch (Throwable t) { uncaughtException(this, t); } } } } } </source>
Very basic implementation of a thread pool
<source lang="java">
/*
* Copyright (c) 2004, Rob Gordon. */
// revised from oddjob import java.util.LinkedList;
/**
* Very basic implementation of a thread pool. * * @author Rob Gordon. */
public class ThreadPool {
private BlockingQueue queue = new BlockingQueue(); private boolean closed = true; private int poolSize = 3; public void setPoolSize(int poolSize) { this.poolSize = poolSize; } public int getPoolSize() { return poolSize; } synchronized public void start() { if (!closed) { throw new IllegalStateException("Pool already started."); } closed = false; for (int i = 0; i < poolSize; ++i) { new PooledThread().start(); } } synchronized public void execute(Runnable job) { if (closed) { throw new PoolClosedException(); } queue.enqueue(job); } private class PooledThread extends Thread { public void run() { while (true) { Runnable job = (Runnable) queue.dequeue(); if (job == null) { break; } try { job.run(); } catch (Throwable t) { // ignore } } } } public void close() { closed = true; queue.close(); } private static class PoolClosedException extends RuntimeException { PoolClosedException() { super ("Pool closed."); } }
} /*
* Copyright � 2004, Rob Gordon. */
/**
* * @author Rob Gordon. */ class BlockingQueue { private final LinkedList list = new LinkedList(); private boolean closed = false; private boolean wait = false; synchronized public void enqueue(Object o) { if (closed) { throw new ClosedException(); } list.add(o); notify(); } synchronized public Object dequeue() { while (!closed && list.size() == 0) { try { wait(); } catch (InterruptedException e) { // ignore } } if (list.size() == 0) { return null; } return list.removeFirst(); } synchronized public int size() { return list.size(); } synchronized public void close() { closed = true; notifyAll(); } synchronized public void open() { closed = false; } public static class ClosedException extends RuntimeException { ClosedException() { super("Queue closed."); } }
}
</source>
Worker thread pool
<source lang="java">
/**
* * JFreeReport : a free Java reporting library * * * Project Info: http://reporting.pentaho.org/ * * (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors. * * This library is free software; you can redistribute it and/or modify it under the terms * of the GNU Lesser General Public License as published by the Free Software Foundation; * either version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License along with this * library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, * Boston, MA 02111-1307, USA. * * [Java is a trademark or registered trademark of Sun Microsystems, Inc. * in the United States and other countries.] * * ------------ * WorkerPool.java * ------------ * (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors. */
/**
* A simple static workpool. Worker threads are created when necessary. * * @author Thomas Morgner * */
public class WorkerPool {
/** * The worker array. */ private Worker[] workers; /** * A flag indicating whether idle workers are available. */ private boolean workersAvailable; /** * the name prefix for all workers of this pool. */ private String namePrefix; /** * Creates a new worker pool with the default size of 10 workers and the default name. */ public WorkerPool () { this(10); } /** * Creates a new workerpool with the given number of workers and the default name. * * @param size the maximum number of workers available. */ public WorkerPool (final int size) { this(size, "WorkerPool-worker"); } /** * Creates a new worker pool for the given number of workers and with the given name * prefix. * * @param size the size of the worker pool. * @param namePrefix the name prefix for all created workers. */ public WorkerPool (final int size, final String namePrefix) { if (size <= 0) { throw new IllegalArgumentException("Size must be > 0"); } workers = new Worker[size]; workersAvailable = true; this.namePrefix = namePrefix; } /** * Checks whether workers are available. * * @return true, if at least one worker is idle, false otherwise. */ public synchronized boolean isWorkerAvailable () { return workersAvailable; } /** * Updates the workersAvailable flag after a worker was assigned. */ private void updateWorkersAvailable () { for (int i = 0; i < workers.length; i++) { if (workers[i] == null) { workersAvailable = true; return; } if (workers[i].isAvailable() == true) { workersAvailable = true; return; } } workersAvailable = false; } /** * Waits until a worker will be available. */ private synchronized void waitForWorkerAvailable () { while (isWorkerAvailable() == false) { try { // remove lock this.wait(5000); } catch (InterruptedException ie) { // ignored } } } /** * Returns a workerhandle for the given workload. This method will wait until an idle * worker is found. * * @param r the workload for the worker * @return a handle to the worker. */ public synchronized WorkerHandle getWorkerForWorkload (final Runnable r) { waitForWorkerAvailable(); int emptySlot = -1; for (int i = 0; i < workers.length; i++) { if (workers[i] == null) { // in the first run, try to avoid to create new threads... // reuse the already available threads if (emptySlot == -1) { emptySlot = i; } continue; } if (workers[i].isAvailable() == true) { workers[i].setWorkload(r); updateWorkersAvailable(); return new WorkerHandle(workers[i]); } } if (emptySlot != -1) { workers[emptySlot] = new Worker(); workers[emptySlot].setName(namePrefix + "-" + emptySlot); workers[emptySlot].setWorkerPool(this); workers[emptySlot].setWorkload(r); updateWorkersAvailable(); return new WorkerHandle(workers[emptySlot]); } throw new IllegalStateException ("At this point, a worker should already have been assigned."); } /** * Marks the given worker as finished. The worker will be removed from the list of the * available workers. * * @param worker the worker which was finished. */ public void workerFinished (final Worker worker) { if (worker.isFinish() == false) { throw new IllegalArgumentException("This worker is not in the finish state."); } for (int i = 0; i < workers.length; i++) { if (workers[i] == worker) { synchronized (this) { workers[i] = null; workersAvailable = true; this.notifyAll(); } return; } } } /** * Marks the given worker as available. * * @param worker the worker which was available. */ public synchronized void workerAvailable (final Worker worker) { for (int i = 0; i < workers.length; i++) { if (workers[i] == worker) { synchronized (this) { workersAvailable = true; this.notifyAll(); } return; } } } /** * Finishes all worker of this pool. */ public void finishAll () { for (int i = 0; i < workers.length; i++) { if (workers[i] != null) { workers[i].finish(); } } }
} /**
* * JFreeReport : a free Java reporting library * * * Project Info: http://reporting.pentaho.org/ * * (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors. * * This library is free software; you can redistribute it and/or modify it under the terms * of the GNU Lesser General Public License as published by the Free Software Foundation; * either version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License along with this * library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, * Boston, MA 02111-1307, USA. * * [Java is a trademark or registered trademark of Sun Microsystems, Inc. * in the United States and other countries.] * * ------------ * Worker.java * ------------ * (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors. */
/**
* A simple worker implementation. The worker executes a assigned workload and then sleeps * until another workload is set or the worker is killed. * * @author Thomas Morgner */
final class Worker extends Thread {
/** * the worker"s task. */ private Runnable workload; /** * a flag whether the worker should exit after the processing. */ private volatile boolean finish; /** * the time in milliseconds beween 2 checks for exit or work requests. */ private final int sleeptime; /** * The worker pool, to which this worker is assigned. May be null. */ private WorkerPool workerPool; /** * Creates a new worker. * * @param sleeptime the time this worker sleeps until he checks for new work. */ public Worker (final int sleeptime) { this.sleeptime = sleeptime; this.setDaemon(true); start(); } /** * Creates a new worker with an default idle timeout of 2 minutes. */ public Worker () { this(120000); } /** * Set the next workload for this worker. * * @param r the next workload for the worker. * @throws IllegalStateException if the worker is not idle. */ public void setWorkload (final Runnable r) { if (workload != null) { throw new IllegalStateException("This worker is not idle."); } //Log.debug("Workload set..."); synchronized (this) { workload = r; //Log.debug("Workload assigned: Notified " + getName()); this.notifyAll(); } } /** * Returns the workload object. * * @return the runnable executed by this worker thread. */ public synchronized Runnable getWorkload() { return workload; } /** * Kills the worker after he completed his work. Awakens the worker if he"s sleeping, so * that the worker dies without delay. */ public void finish () { finish = true; // we are evil .. try { this.interrupt(); } catch (SecurityException se) { // ignored } if (workerPool != null) { workerPool.workerFinished(this); } } /** * Checks whether this worker has some work to do. * * @return true, if this worker has no more work and is currently sleeping. */ public boolean isAvailable () { return (workload == null); } /** * If a workload is set, process it. After the workload is processed, this worker starts * to sleep until a new workload is set for the worker or the worker got the finish() * request. */ public void run () { while (!finish) { if (workload != null) { try { workload.run(); } catch (Exception e) { System.out.println("Worker caught exception on run: "+ e); } workload = null; if (workerPool != null) { workerPool.workerAvailable(this); } } if (!finish) { synchronized (this) { try { // remove lock this.wait(sleeptime); } catch (InterruptedException ie) { // ignored } } } } synchronized (this) { this.notifyAll(); } } /** * Checks whether this worker has received the signal to finish and die. * * @return true, if the worker should finish the work and end the thread. */ public boolean isFinish () { return finish; } /** * Returns the worker"s assigned pool. * * @return the worker pool (or null, if the worker is not assigned to a pool). */ public WorkerPool getWorkerPool () { return workerPool; } /** * Defines the worker"s assigned pool. * * @param workerPool the worker pool (or null, if the worker is not assigned to a * pool). */ public void setWorkerPool (final WorkerPool workerPool) { this.workerPool = workerPool; }
}
/**
* * JFreeReport : a free Java reporting library * * * Project Info: http://reporting.pentaho.org/ * * (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors. * * This library is free software; you can redistribute it and/or modify it under the terms * of the GNU Lesser General Public License as published by the Free Software Foundation; * either version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License along with this * library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, * Boston, MA 02111-1307, USA. * * [Java is a trademark or registered trademark of Sun Microsystems, Inc. * in the United States and other countries.] * * ------------ * WorkerHandle.java * ------------ * (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors. */
/**
* The worker handle is a control structure which allows control over the worker without * exposing the thread object. * * @author Thomas Morgner * @deprecated This class is used by the WorkerPool, which is not used anywhere anymore. */ class WorkerHandle
{
/** * The worker for this handle. */ private final Worker worker; /** * Creates a new handle for the given worker. * * @param worker the worker. */ public WorkerHandle (final Worker worker) { this.worker = worker; } /** * Finishes the worker. */ public void finish () { worker.finish(); }
}
</source>