Java/Threads/Thread Pool

Материал из Java эксперт
Перейти к: навигация, поиск

Create a new thread for the thread pool. The created thread will be a daemon thread.

   <source lang="java">
 

import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; /**

* Simple implementation of a ThreadFactory that 
* marks all of the threads as daemon threads.
*/

public class DaemonThreadFactory implements ThreadFactory {

    /** Delegate that actually builds the threads; this class only adjusts the daemon flag. */
    private final ThreadFactory factory = Executors.defaultThreadFactory();

    /**
     * Creates a new thread for the thread pool and makes sure it is a
     * daemon thread before handing it back.
     *
     * @param r      The runnable used by the thread pool.
     *
     * @return The newly created (not yet started) daemon thread.
     */
    public Thread newThread(Runnable r) {
        final Thread created = factory.newThread(r);
        if (created.isDaemon()) {
            // Already a daemon: nothing to adjust.
            return created;
        }
        created.setDaemon(true);
        return created;
    }

}


 </source>
   
  
 
  



Defining a thread for a thread pool

   <source lang="java">
  

import java.util.LinkedList;

/**
 * Worker thread for {@link ThreadPool}: loops forever, taking the next queued
 * job from its pool and running it on this thread.
 */
class ThreadTask extends Thread {

  /** Pool whose job queue this worker drains. */
  private final ThreadPool pool;

  /**
   * @param thePool the pool this worker pulls jobs from
   */
  public ThreadTask(ThreadPool thePool) {
    pool = thePool;
  }

  /**
   * Runs queued jobs one after another. The loop has no exit condition, so
   * the worker only ends when the JVM does.
   */
  @Override
  public void run() {
    while (true) {
      // blocks until a job is available
      Runnable job = pool.getNext();
      try {
        job.run();
      } catch (Exception e) {
        // A failing job must not take the worker down; report it and carry on.
        System.err.println("Job exception: " + e);
      }
    }
  }

}

/**
 * Minimal fixed-size thread pool: a FIFO queue of {@link Runnable} jobs that
 * is drained by {@code size} {@link ThreadTask} workers started in the
 * constructor. The queue object itself is the monitor used for wait/notify.
 */
public class ThreadPool {

  /** Pending jobs, oldest first; also the lock guarding the queue. */
  private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();

  /**
   * Creates the pool and immediately starts its workers.
   *
   * @param size number of worker threads to start
   */
  public ThreadPool(int size) {
    for (int i = 0; i < size; i++) {
      Thread thread = new ThreadTask(this);
      thread.start();
    }
  }

  /**
   * Queues a job and wakes one waiting worker.
   *
   * @param task the job to execute on a pool thread
   */
  public void run(Runnable task) {
    synchronized (tasks) {
      tasks.addLast(task);
      tasks.notify();
    }
  }

  /**
   * Removes and returns the oldest queued job, blocking while the queue is
   * empty. An interrupt while waiting is reported on stderr and the wait
   * resumes.
   *
   * @return the next job to run
   */
  public Runnable getNext() {
    synchronized (tasks) {
      // Loop (not "if") so a wake-up with an empty queue simply waits again.
      while (tasks.isEmpty()) {
        try {
          tasks.wait();
        } catch (InterruptedException ex) {
          System.err.println("Interrupted");
        }
      }
      return tasks.removeFirst();
    }
  }

  /**
   * Demo: four jobs, each printing its word 25 times, run on a pool with
   * half as many threads as jobs.
   */
  public static void main(String args[]) {
    final String message[] = { "Java", "Source", "and", "Support" };
    ThreadPool pool = new ThreadPool(message.length / 2);
    for (int i = 0, n = message.length; i < n; i++) {
      final int innerI = i;
      Runnable runner = new Runnable() {
        public void run() {
          for (int j = 0; j < 25; j++) {
            System.out.println("j: " + j + ": " + message[innerI]);
          }
        }
      };
      pool.run(runner);
    }
  }

}



 </source>
   
  
 
  



JDK 1.5 provides a mechanism to create a pool of scheduled tasks

   <source lang="java">
  

import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit;

/**
 * Demo of {@link ScheduledThreadPoolExecutor}: two periodic jobs share a pool
 * of five threads. The executor is never shut down.
 */
public class Main {

  /**
   * Schedules {@link Job1} every 5 seconds (no initial delay) and
   * {@link Job2} every 2 seconds (after a 1 second delay).
   */
  public static void main(String args[]) {
    final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(5);

    final long firstPeriod = 5;
    scheduler.scheduleAtFixedRate(new Job1(), 0, firstPeriod, TimeUnit.SECONDS);
    final long secondPeriod = 2;
    scheduler.scheduleAtFixedRate(new Job2(), 1, secondPeriod, TimeUnit.SECONDS);
  }

}

/** Periodic job that prints a fixed label. */
class Job1 implements Runnable {

  public void run() {
    final String label = "Job 1";
    System.out.println(label);
  }

}

/** Periodic job that prints the integers from -99 up to and including 98, one per line. */
class Job2 implements Runnable {

  public void run() {
    int value = -99;
    while (value < 99) {
      System.out.println(value);
      value++;
    }
  }

}


 </source>
   
  
 
  



Simple object pool. Based on ThreadPool and a few other classes

   <source lang="java">

/*

* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.    
*/

/**

* Simple object pool. Based on ThreadPool and few other classes
* 
* The pool will ignore overflow and return null if empty.
* 
* @author Gal Shachor
* @author Costin
* @author 
* @version $Id: SimplePool.java 463298 2006-10-12 16:10:32Z henning $
*/

public final class SimplePool {

  /** Backing array; slots 0..current hold pooled objects, the rest are null. */
  private Object pool[];

  /** Capacity of the pool, fixed by the constructor. */
  private int max;

  /** Index of the most recently stored object, or -1 when the pool is empty. */
  private int current = -1;

  /**
   * Creates an empty pool.
   *
   * @param max maximum number of objects the pool will hold
   */
  public SimplePool(int max) {
    this.max = max;
    pool = new Object[max];
  }

  /**
   * Adds the object to the pool; silently does nothing if the pool is full.
   *
   * @param o the object to store
   */
  public synchronized void put(Object o) {
    final boolean full = current >= max - 1;
    if (full) {
      return;
    }
    current++;
    pool[current] = o;
  }

  /**
   * Takes the most recently added object out of the pool.
   *
   * @return The object from the pool, or null if the pool is empty.
   */
  public synchronized Object get() {
    if (current < 0) {
      return null;
    }
    final int top = current;
    final Object taken = pool[top];
    // Clear the slot so the pool does not keep the object reachable.
    pool[top] = null;
    current = top - 1;
    return taken;
  }

  /**
   * Returns the capacity of the pool.
   *
   * @return The pool size.
   */
  public int getMax() {
    return max;
  }

  /**
   * For testing purposes, so the pool contents can be examined. Returns the
   * live backing array, not a copy.
   *
   * @return Array of Objects in the pool.
   */
  Object[] getPool() {
    return pool;
  }

}

 </source>
   
  
 
  



Simple pool of Threads

   <source lang="java">
 

/*

* Copyright (c) 1998 - 2005 Versant Corporation
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Versant Corporation - initial API and implementation
*/

import java.util.LinkedList; import java.util.Iterator; import java.util.HashSet; /**

* Simple pool of Threads.
*/

public class ThreadPool {

    /** Prefix for worker thread names. */
    private final String name;
    /** Workers currently running a task. Guarded by the pool monitor. */
    private final HashSet<Worker> active = new HashSet<Worker>();
    /** Workers parked and ready for reuse, oldest first. Guarded by the pool monitor. */
    private final LinkedList<Worker> idle = new LinkedList<Worker>();
    /** Number of entries in {@link #idle}. */
    private int idleCount;
    private int maxActive = 10;
    private int maxIdle = 3;
    /** Counter used to number worker threads. */
    private int lastThreadId;
    private boolean closed;

    /**
     * @param name prefix used for the names of the worker threads
     */
    public ThreadPool(String name) {
        this.name = name;
    }

    public synchronized int getMaxActive() {
        return maxActive;
    }

    /**
     * Sets the maximum number of concurrently active workers. Callers
     * blocked in {@link #execute} are woken so they can re-check the limit.
     */
    public synchronized void setMaxActive(int maxActive) {
        this.maxActive = maxActive;
        notifyAll();
    }

    public synchronized int getMaxIdle() {
        return maxIdle;
    }

    public synchronized void setMaxIdle(int maxIdle) {
        this.maxIdle = maxIdle;
    }

    public synchronized int getActiveCount() {
        return active.size();
    }

    public synchronized int getIdleCount() {
        return idleCount;
    }

    /**
     * Close the pool, stopping all threads. This does not wait for the
     * threads to actually stop before returning. This is a NOP if the
     * pool has already been closed. Callers blocked in {@link #execute}
     * are woken and fail with IllegalStateException.
     */
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        for (Worker w : idle) {
            w.terminate();
        }
        // Empty the collections instead of nulling them so the accessors
        // keep working after close().
        idle.clear();
        idleCount = 0;
        for (Worker w : active) {
            w.terminate();
        }
        active.clear();
        notifyAll();
    }

    /**
     * Executes runnable using a Thread from the pool. If no idle worker
     * exists and the pool is full this blocks for up to timeoutMs
     * milliseconds, or until a worker becomes available if timeoutMs is 0.
     *
     * @param runnable  the task to run
     * @param timeoutMs maximum time to wait for a worker; 0 waits indefinitely
     * @return true if the task was handed to a worker, false if the pool
     *         was still full when the timeout expired
     * @throws IllegalStateException if the pool is closed, or gets closed
     *         while waiting
     */
    public synchronized boolean execute(Runnable runnable, int timeoutMs) {
        if (closed) {
            throw new IllegalStateException("Pool has been closed");
        }
        final long deadline = System.currentTimeMillis() + timeoutMs;
        boolean interrupted = false;
        try {
            // Guarded wait: finishedWork() notifies whenever a worker frees up.
            while (idleCount == 0 && isFull()) {
                long waitMs = timeoutMs;
                if (timeoutMs > 0) {
                    waitMs = deadline - System.currentTimeMillis();
                    if (waitMs <= 0) {
                        return false;
                    }
                }
                try {
                    wait(waitMs);
                } catch (InterruptedException e) {
                    // Keep waiting as before, but remember the interrupt so
                    // it can be restored for the caller.
                    interrupted = true;
                }
                if (closed) {
                    throw new IllegalStateException("Pool has been closed");
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        Worker t;
        if (idleCount == 0) {
            t = new Worker();
        } else {
            // Prefer a parked worker, including one freed while we waited.
            t = idle.removeFirst();
            --idleCount;
        }
        active.add(t);
        t.execute(runnable);
        return true;
    }

    /**
     * @return true if the number of active workers has reached maxActive
     */
    protected boolean isFull() {
        return active.size() >= maxActive;
    }

    /**
     * Called by a worker after each task: moves it from active to idle (or
     * stops it if enough idle workers exist) and wakes blocked callers of
     * {@link #execute}.
     */
    private synchronized void finishedWork(Worker t) {
        if (closed) {
            return;
        }
        active.remove(t);
        if (idleCount >= maxIdle) {
            t.terminate();
        } else {
            idle.addLast(t);
            ++idleCount;
        }
        notifyAll();
    }

    /**
     * Daemon thread that runs one task at a time. Its fields are guarded by
     * the worker's own monitor. The pool monitor is always taken before the
     * worker monitor, never the other way round.
     */
    private class Worker extends Thread {

        private boolean stopFlag;
        /** True from execute() until run() has picked the task up. */
        private boolean pending;
        private Runnable runnable;

        public Worker() {
            super(name + " " + ++lastThreadId);
            setDaemon(true);
        }

        /**
         * Hands runnable to this worker, starting the thread on first use
         * and waking it otherwise.
         */
        public synchronized void execute(Runnable runnable) {
            this.runnable = runnable;
            pending = true;
            if (!isAlive()) {
                start();
            } else {
                notifyAll();
            }
        }

        /**
         * Stop this thread as soon as possible.
         */
        public void terminate() {
            synchronized (this) {
                stopFlag = true;
                notifyAll();
            }
            interrupt();
        }

        public void run() {
            for (;;) {
                Runnable task;
                synchronized (this) {
                    // Wait on a condition, not a bare wait(): a hand-over
                    // that happens before we park is not lost, and a
                    // spurious wake-up does not run a stale task.
                    while (!pending && !stopFlag) {
                        try {
                            wait();
                        } catch (InterruptedException e) {
                            // the loop condition decides what happens next
                        }
                    }
                    if (stopFlag) {
                        return;
                    }
                    task = runnable;
                    runnable = null;
                    pending = false;
                }
                try {
                    task.run();
                } catch (Throwable e) {
                    // Task failures (including a null task) are ignored so
                    // the worker stays usable; only ThreadDeath propagates.
                    if (e instanceof ThreadDeath) {
                        throw (ThreadDeath)e;
                    }
                }
                finishedWork(this);
            }
        }
    }

}


 </source>
   
  
 
  



Simple thread pool. A task is executed by obtaining a thread from the pool

   <source lang="java">
 

/*

* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.    
*/

import java.util.ArrayList; import java.util.List;

/** Simple thread pool. A task is executed by obtaining a thread from

* the pool
*/

public class ThreadPool {

  /** The thread pool contains instances of {@link ThreadPool.Task}.
   */
  public interface Task {
    /** Performs the task.
     * @throws Throwable The task failed, and the worker thread won't be used again.
     */
    void run() throws Throwable;
  }

  /** A task, which may be interrupted, if the pool is shutting down.
   */
  public interface InterruptableTask extends Task {
    /** Interrupts the task.
     * @throws Throwable Shutting down the task failed.
     */
    void shutdown() throws Throwable;
  }

  /**
   * One worker thread plus the task currently assigned to it.
   *
   * Locking: {@code shuttingDown} and {@code task} are guarded by the
   * Poolable monitor; the thread monitor is used only for wait/notify. The
   * worker loop takes the thread monitor first and the Poolable monitor
   * second, so no other code may take the thread monitor while holding the
   * Poolable monitor.
   */
  private class Poolable {
    private boolean shuttingDown;
    private Task task;
    private Thread thread;

    /** Creates the worker thread in pGroup, named "group-pNum", and starts it. */
    Poolable(ThreadGroup pGroup, int pNum) {
      thread = new Thread(pGroup, pGroup.getName() + "-" + pNum) {
        public void run() {
          while (!isShuttingDown()) {
            final Task t = getTask();
            if (t == null) {
              try {
                synchronized (this) {
                  // Re-check under the thread monitor so a notify sent
                  // between the checks above and wait() is not missed.
                  if (!isShuttingDown()  &&  getTask() == null) {
                    wait();
                  }
                }
              } catch (InterruptedException e) {
                // Do nothing
              }
            } else {
              try {
                t.run();
                resetTask();
                repool(Poolable.this);
              } catch (Throwable e) {
                // A failed task retires this worker.
                discard(Poolable.this);
                resetTask();
              }
            }
          }
        }
      };
      thread.start();
    }

    /**
     * Marks this worker as shutting down, asks an {@link InterruptableTask}
     * to stop, drops the task and wakes the worker thread.
     */
    void shutdown() {
      synchronized (this) {
        shuttingDown = true;
        final Task t = task;
        if (t instanceof InterruptableTask) {
          try {
            ((InterruptableTask) t).shutdown();
          } catch (Throwable th) {
            // Ignore me
          }
        }
        task = null;
      }
      // Notify only after the Poolable monitor is released (lock order).
      synchronized (thread) {
        thread.notify();
      }
    }

    private synchronized boolean isShuttingDown() { return shuttingDown; }

    String getName() { return thread.getName(); }

    private synchronized Task getTask() {
      return task;
    }

    private synchronized void resetTask() {
      task = null;
    }

    /** Assigns pTask to this worker and wakes its thread. */
    void start(Task pTask) {
      synchronized (this) {
        task = pTask;
      }
      // Notify only after the Poolable monitor is released (lock order).
      synchronized (thread) {
        thread.notify();
      }
    }
  }

  private final ThreadGroup threadGroup;
  private final int maxSize;
  /** Idle workers. */
  private final List<Poolable> waitingThreads = new ArrayList<Poolable>();
  /** Workers that currently have a task assigned. */
  private final List<Poolable> runningThreads = new ArrayList<Poolable>();
  /** Tasks deferred by {@link #addTask(ThreadPool.Task)}. */
  private final List<Task> waitingTasks = new ArrayList<Task>();
  /** Number of workers created so far. */
  private int num;

  /** Creates a new instance.
   * @param pMaxSize Maximum number of concurrent threads; 0 disables the limit.
   * @param pName Thread group name.
   */
  public ThreadPool(int pMaxSize, String pName) {
    maxSize = pMaxSize;
    threadGroup = new ThreadGroup(pName);
  }

  /** Shuts the worker down and removes it from both worker lists. */
  synchronized void discard(Poolable pPoolable) {
    pPoolable.shutdown();
    runningThreads.remove(pPoolable);
    waitingThreads.remove(pPoolable);
  }

  /**
   * Returns a worker to the idle list after its task finished and, if tasks
   * are queued, starts the most recently queued one. Workers that are no
   * longer registered as running, or that would exceed the size limit, are
   * discarded.
   */
  synchronized void repool(Poolable pPoolable) {
    if (runningThreads.remove(pPoolable)) {
      if (maxSize != 0  &&  runningThreads.size() + waitingThreads.size() >= maxSize) {
        discard(pPoolable);
      } else {
        waitingThreads.add(pPoolable);
        if (waitingTasks.size() > 0) {
          Task task = waitingTasks.remove(waitingTasks.size() - 1);
          startTask(task);
        }
      }
    } else {
      discard(pPoolable);
    }
  }

  /** Starts a task immediately.
   * @param pTask The task being started.
   * @return True, if the task could be started immediately. False, if
   * the maximum number of concurrent tasks was exceeded. If so, you
   * might consider to use the {@link #addTask(ThreadPool.Task)} method instead.
   */
  public synchronized boolean startTask(Task pTask) {
    if (maxSize != 0  &&  runningThreads.size() >= maxSize) {
      return false;
    }
    Poolable poolable;
    if (waitingThreads.size() > 0) {
      poolable = waitingThreads.remove(waitingThreads.size() - 1);
    } else {
      poolable = new Poolable(threadGroup, num++);
    }
    runningThreads.add(poolable);
    poolable.start(pTask);
    return true;
  }

  /** Adds a task for immediate or deferred execution.
   * @param pTask The task being added.
   * @return True, if the task was started immediately. False, if
   * the task will be executed later.
   */
  public synchronized boolean addTask(Task pTask) {
    if (startTask(pTask)) {
      return true;
    }
    waitingTasks.add(pTask);
    return false;
  }

  /** Closes the pool.
   */
  public synchronized void shutdown() {
    while (!waitingThreads.isEmpty()) {
      Poolable poolable = waitingThreads.remove(waitingThreads.size() - 1);
      poolable.shutdown();
    }
    while (!runningThreads.isEmpty()) {
      Poolable poolable = runningThreads.remove(runningThreads.size() - 1);
      poolable.shutdown();
    }
  }

  /** Returns the maximum number of concurrent threads.
   * @return Maximum number of threads.
   */
  public int getMaxThreads() { return maxSize; }

  /** Returns the number of threads, which have actually been created,
   * as opposed to the number of currently running threads.
   */
  public synchronized int getNumThreads() { return num; }

}


 </source>
   
  
 
  



Thread Cache

   <source lang="java">
 

// ThreadCache.java // $Id: ThreadCache.java,v 1.16 2000/08/16 21:37:58 ylafon Exp $ // (c) COPYRIGHT MIT and INRIA, 1996-1997. // Please first read the full copyright statement in file COPYRIGHT.html

/**
 * A daemon worker thread owned by a {@link ThreadCache}. It waits for a
 * Runnable handed over by {@link #wakeup}, runs it, and then asks the cache
 * whether it may stay around for another job.
 */
class CachedThread extends Thread {

   // Job handed over by wakeup(); cleared when waitForRunner() returns it.
   Runnable     runner     = null;
   // False once kill() was called or the cache told the thread to stop.
   boolean      alive      = true;
   // The owning cache.
   ThreadCache  cache      = null;
   // Links of the cache's doubly linked free list.
   CachedThread next       = null;
   CachedThread prev       = null;
   // Set by the first isTerminated() call, so the cache counts the thread out once.
   boolean      terminated = false;
   // True once start() has been called.
   boolean      started    = false;
   // True until the thread has picked up its first job.
   boolean      firstime   = true;
   /**
    * Test-and-set: returns the previous value of the terminated flag and
    * sets the flag to true.
    */
   synchronized boolean isTerminated() {
 boolean ret = terminated;
 terminated = true;
 return ret;
   }
   /**
    * Blocks until a job is available or the thread is told to stop.
    * @return The job to run, or null if the thread is no longer alive.
    */
   synchronized Runnable waitForRunner() {
 // "to" records whether the previous timed wait ended without a job.
 boolean to = false;
 while ( alive ) {
     // Is a runner available ?
     if ( runner != null ) {
   Runnable torun = runner;
   firstime = false;
   runner   = null;
   return torun;
     } else if ( firstime ) {
   // This thread will not be declared free until it runs once:
   try {
       wait();
   } catch (InterruptedException ex) {
   }
     // NOTE(review): this branch is taken again after every wake-up that
     // leaves runner null, so cache.isFree() can be called more than once
     // per idle period; isFree() decrements usedthreads and may append this
     // thread to the free list each time - confirm this is intended.
     } else if ( alive = cache.isFree(this, to) ) {
   // Notify the cache that we are free, and continue if allowed:
   try {
       int idleto = cache.getIdleTimeout();
       to = false;
       if ( idleto > 0 ) {
     wait(idleto);
     // No job arrived during the wait: report it to isFree() next round.
     to = (runner == null);
       } else {
     wait();
       }
   } catch (InterruptedException ex) {
   }
     }
 }
 return null;
   }
   /**
    * Marks the thread as no longer alive and wakes it if it is waiting.
    */
   synchronized void kill() {
 alive = false;
 notify();
   }
   /**
    * Hands a job to this thread, starting the thread if necessary.
    * @return true if the job was accepted, false if the thread is not alive.
    */
   synchronized boolean wakeup(Runnable runnable) {
 if ( alive ) {
     runner = runnable;
     if ( ! started ) 
   this.start();
     notify();
     return true;
 } else {
     return false;
 }
   }
   /**
    * Starts the thread and records that it has been started.
    */
   public synchronized void start() {
 super.start();
 this.started = true;
   }
   /**
    * Main loop: wait for a job, run it, repeat until no longer alive. The
    * cache is always informed (isDead) when the loop ends, even if a job throws.
    */
   public void run() {
 try {
     while ( true ) {
   // Wait for a runner:
   Runnable torun = waitForRunner();
   // If runner, run:
   if ( torun != null ) 
       torun.run();
   // If dead, stop
   if ( ! alive )
       break;
     }
 } finally {
     cache.isDead(this);
 }
   }
   /**
    * Creates a daemon thread in the cache's thread group, named
    * "groupname:id", with the cache's configured priority.
    */
   CachedThread(ThreadCache cache, int id) {
 super(cache.getThreadGroup(), cache.getThreadGroup().getName()+":"+id);
 this.cache = cache;
 setPriority(cache.getThreadPriority());
 setDaemon(true);
   }

}
/**
 * A cache of reusable {@link CachedThread} workers. Configure it with the
 * setters, call {@link #initialize()}, then request threads with
 * {@link #getThread(Runnable, boolean)}.
 */
public class ThreadCache {

   private static final boolean debug = false;
   /**
    * Default number of cached threads.
    */
   private static final int DEFAULT_CACHESIZE = 5;
   /**
    * Has this thread cache been initialized ?
    */
   protected boolean inited = false;
   /**
    * The thread group for this thread cache.
    */
   protected ThreadGroup group = null;
   /**
    * Number of cached threads.
    */
   protected int cachesize = DEFAULT_CACHESIZE;
   /**
    * Number of created threads.
    */
   protected int threadcount = 0;
   /**
    * Unique thread identifier within this ThreadCache instance.
    */
   protected int threadid = 0;
   /**
    * Number of idle threads to always maintain alive.
    */
   protected int idlethreads = 0;
   /**
    * Should the cache create new threads beyond the cache size, rather
    * than making requests wait (see allocateThread).
    */
   protected boolean growasneeded = false;
   /**
    * Number of used threads
    */
   protected int usedthreads = 0;
   /**
    * List of free threads (head and tail of a doubly linked list).
    */
   protected CachedThread freelist = null;
   protected CachedThread freetail = null;
   /**
    * The idle timeout, for a thread to wait before being killed.
    * Defaults to 5000 milliseconds.
    */
   protected int idletimeout = 5000;
   /**
    * Cached thread priority.
    */
   protected int threadpriority = 5;
   /**
    * Get the idle timeout value for this cache.
    * @return The idletimeout value, or negative if no timeout applies.
    */
   synchronized final int getIdleTimeout() {
 return (threadcount <= idlethreads) ? -1 : idletimeout;
   }
   /**
    * The given thread is about to be declared free.
    * @param t The thread declaring itself free.
    * @param timedout Whether the thread's last idle wait timed out.
    * @return A boolean, true if the thread is to continue
    * running, false if the thread should stop.
    */
   final synchronized boolean isFree(CachedThread t, boolean timedout) {
 if ( timedout && (threadcount > idlethreads) ) {
     // Idle for too long and more threads exist than must be kept: stop it.
     // NOTE(review): the thread is not unlinked from the free list here;
     // getThread() appears to cope by retrying when wakeup() returns false.
     if ( ! t.isTerminated() ) {
   threadcount--;
   usedthreads--;
   notifyAll();
     } 
     return false;
 } else if ( threadcount <= cachesize ) {
     // Keep the thread: append it to the tail of the free list.
     t.prev   = freetail;
     if (freetail != null)
   freetail.next = t;
     freetail = t;
     if (freelist == null)
   freelist = t;
     usedthreads--;
     notifyAll();
     return true;
 } else {
     // More threads exist than the cache may hold: stop this one.
     if ( ! t.isTerminated() ) {
   threadcount--;
   usedthreads--;
   notifyAll();
     }
     return false;
 }
   }
   /**
    * The given thread has terminated, cleanup any associated state.
    * @param t The dead CachedThread instance.
    */
   final synchronized void isDead(CachedThread t) {
 if ( debug )
     System.out.println("** "+t+": is dead tc="+threadcount);
 // Only count the thread out if isFree() has not already done so.
 if ( ! t.isTerminated() ) {
     threadcount--;
     notifyAll();
 }
   }
   /**
    * Create a new thread within this thread cache.
    * @return A new CachedThread instance (not started).
    */
   private synchronized CachedThread createThread() {
 threadcount++;
 threadid++;
 return new CachedThread(this, threadid);
   }
   /**
    * Allocate a new thread, as requested.
    * @param waitp Should we wait until a thread is available ?
    * @return A CachedThread instance, or null if 
    * unable to allocate a new thread, and waitp is 
    * false.
    */
   protected synchronized CachedThread allocateThread(boolean waitp) {
 CachedThread t = null;
 while ( true ) {
     if ( freelist != null ) {
   if ( debug )
       System.out.println("*** allocateThread: free thread");
   // Unlink the head of the free list.
   t        = freelist;
   freelist = freelist.next;
   if (freelist != null) {
       freelist.prev = null;
   } else {
       freetail = null;
   }
   t.next = null;
   break;
     } else if ((threadcount < cachesize) || growasneeded) {
   if ( debug )
       System.out.println("*** create new thread.");
   t = createThread();
   break;
     } else if ( waitp ) {
   if ( debug )
       System.out.println("*** wait for a thread.");
   // Wait for a thread to become available
   try {
       wait();
   } catch (InterruptedException ex) {
   }
     } else {
   return null;
     }
 }
 return t;
   }
   /**
    * Set the thread cache size.
    * This will also update the number of idle threads to maintain, if 
    * requested.
    * @param cachesize The new thread cache size.
    * @param update If true also update the number of
    * threads to maintain idle (set to half the cache size).
    */
   public synchronized void setCachesize(int cachesize, boolean update) {
 this.cachesize = cachesize;
 if ( update ) 
     this.idlethreads = (cachesize>>1);
   }
   /**
    * Set the thread cache size.
    * Update the number of idle threads to keep alive.
    * @param cachesize The new thread cache size.
    */
   public void setCachesize(int cachesize) {
 setCachesize(cachesize, true);
   }
   /**
    * Enable/disable the thread cache to grow as needed.
    * This flag should be turned on only if always getting a thread as fast
    * as possible is critical.
    * @param onoff The toggle.
    */
   public void setGrowAsNeeded(boolean onoff) {
 this.growasneeded = onoff;
   }
   /**
    * Set all the cached threads priority.
    * Changing the cached thread priority should be done before the thread
    * cache is initialized, it will not affect already created 
    * threads.
    * @param priority The new cached threads priority.
    */
   public void setThreadPriority(int priority) {
 threadpriority = priority;
   }
   /**
    * Get the cached thread normal priority.
    * @return Currently assigned cached thread priority.
    */
   public int getThreadPriority() {
 return threadpriority;
   }
   /**
    * Set the idle timeout. 
    * The idle timeout value is used to eliminate threads that have remained 
    * idle for too long (although the thread cache will ensure that a 
    * decent minimal number of threads stay around).
    * @param idletimeout The new idle timeout.
    */
   public synchronized void setIdleTimeout(int idletimeout) {
 this.idletimeout = idletimeout;
   }
   /**
    * Request a thread to run on the given object.
    * @param runnable The object to run with the allocated thread.
    * @param waitp If true wait until a free thread is 
    * available, otherwise, return false.
    * @return A boolean, true if a thread was successfully
    * allocated for the given object, false otherwise.
    * @throws RuntimeException if initialize() has not been called.
    */
   public boolean getThread(Runnable runnable, boolean waitp) {
 if ( debug )
     System.out.println("*** getting a thread for "+runnable);
 if ( ! inited )
     throw new RuntimeException("Uninitialized thread cache");
 // Allocate and launch the thread:
 while ( true ) {
     CachedThread t = allocateThread(waitp);
     if ( t != null ) {
   // wakeup() fails if the allocated thread is no longer alive; retry then.
   if ( t.wakeup(runnable) ) {
       synchronized (this) {
     usedthreads++;
       }
       return true;
   }
     } else {
   return false;
     }
 }
   }
   /**
    * Get the ThreadGroup managed by this ThreadCache instance.
    * @return A ThreadGroup instance.
    */
   public ThreadGroup getThreadGroup() {
 return group;
   }
   /**
    * Wait until all the threads have finished their duty
    * (i.e. until usedthreads is no longer positive).
    */
   public synchronized void waitForCompletion() {
 while (usedthreads > 0) {
     if ( debug )
   System.out.println("*** Waiting for "+usedthreads+ " threads");
     try {
   wait();
     } catch (InterruptedException ex) {
     }
 }
   }
   /**
    * Initialize the given thread cache.
    * This two stage initialize method is done so that configuration
    * of the thread cache can be done before any thread get actually
    * created. One thread is always created and started; further threads
    * are created until idlethreads threads exist. All of them are put on
    * the free list.
    */
   public synchronized void initialize() {
 CachedThread t = createThread();
 freelist = t;
 freetail = t;
 t.next = null;
 t.prev = null;
 t.start();
 for (int i = 1 ; i < idlethreads ; i++) {
     // Push each additional thread onto the head of the free list.
     t = createThread();
     t.next = freelist;
     t.prev = null;
     freelist.prev = t;
     freelist = t;
     t.start();
 }
 inited = true;
   }
   /**
    * Create a thread cache, whose threads are to be children of the group.
    * @param group The thread group to which this thread cache is bound.
    */
   public ThreadCache(ThreadGroup group) {
 this.group = group;
   }
   /**
    * Create a thread cache, after creating a new thread group.
    * @param name The name of the thread group to create.
    */
   public ThreadCache(String name) {
 this(new ThreadGroup(name));
   }
   /**
    * Create a thread cache, after creating a new thread group.
    * @param parent The parent of the thread group to create.
    * @param name The name of the thread group.
    */
   public ThreadCache(ThreadGroup parent, String name) {
 this(new ThreadGroup(parent, name));
   }

}


 </source>
   
  
 
  



Thread pool

   <source lang="java">
  

/* Java Threads, 3rd Edition By Scott Oaks, Henry Wong 3rd Edition September 2004 ISBN: 0-596-00782-5

*/

import java.util.concurrent.*; import java.util.concurrent.atomic.*;

/**
 * Timing harness that performs {@code target} calls of {@link #work()} in
 * three ways - one new thread per call, a ThreadPoolExecutor, and a plain
 * loop - and prints the measured times and per-call differences.
 * Relies on a {@code Timestamp} helper class that is not part of this listing.
 */
public class CreateTest extends Thread {

  /** Number of work() calls made so far in the current run. */
  static AtomicInteger nCalls;
  /** Number of work() calls each run performs. */
  static int target = 0;
  /** Set under {@link #lock} once the target-th call has happened. */
  static boolean done = false;
  static Object lock = new Object();

  /**
   * Runs every test once untimed with 10000 calls, then runs the timed
   * tests with the call count given as the first command line argument.
   */
  public static void main(String[] args) {
    target = 10000;
    doTestCreate();
    doTestPool(8);
    doTestLoop();

    target = Integer.parseInt(args[0]);

    cleanGC();
    Timestamp createTimer = new Timestamp(TimeUnit.MICROSECONDS);
    doTestCreate();
    createTimer.stop();
    System.out.println("Create thread test took " + createTimer);

    cleanGC();
    Timestamp poolOf8Timer = new Timestamp(TimeUnit.MICROSECONDS);
    doTestPool(8);
    poolOf8Timer.stop();
    System.out.println("Pool test (8 threads) took " + poolOf8Timer);

    cleanGC();
    Timestamp poolOf1Timer = new Timestamp(TimeUnit.MICROSECONDS);
    doTestPool(1);
    poolOf1Timer.stop();
    System.out.println("Pool test (1 thread) took " + poolOf1Timer);

    cleanGC();
    Timestamp loopTimer = new Timestamp(TimeUnit.MICROSECONDS);
    doTestLoop();
    loopTimer.stop();
    System.out.println("Loop test took " + loopTimer);

    // Per-call differences between the measurements.
    double perCall =
        ((double) (createTimer.elapsedTime() - loopTimer.elapsedTime())) / target;
    System.out.println("Creating a thread took " + perCall + " " + loopTimer.units()
        + " per thread");

    perCall = ((double) (createTimer.elapsedTime() - poolOf1Timer.elapsedTime())) / target;
    System.out.println("Using a thread pool (1 thread) saved  " + perCall + " "
        + loopTimer.units() + " per task");

    perCall = ((double) (createTimer.elapsedTime() - poolOf8Timer.elapsedTime())) / target;
    System.out.println("Using a thread pool (8 threads) saved  " + perCall + " "
        + loopTimer.units() + " per task");

    perCall = ((double) (poolOf1Timer.elapsedTime() - loopTimer.elapsedTime())) / target;
    System.out.println("Thread pool overhead (1 thread) is " + perCall + " "
        + loopTimer.units() + " per task");

    perCall = ((double) (poolOf8Timer.elapsedTime() - loopTimer.elapsedTime())) / target;
    System.out.println("Thread pool overhead (8 threads) is " + perCall + " "
        + loopTimer.units() + " per task");
  }

  /** Blocks until {@link #done} has been set by {@link #work()}. */
  private static void awaitDone() {
    synchronized (lock) {
      while (!done) {
        try {
          lock.wait();
        } catch (Exception e) {
        }
      }
    }
  }

  /** Calls work() target times on the calling thread. */
  static void doTestLoop() {
    nCalls = new AtomicInteger(0);
    done = false;
    int remaining = target;
    while (remaining > 0) {
      work();
      remaining--;
    }
    awaitDone();
  }

  /** Starts one new thread per work() call and waits for the last call. */
  static void doTestCreate() {
    done = false;
    nCalls = new AtomicInteger(0);
    int started = 0;
    while (started < target) {
      Thread worker = new CreateTest();
      worker.start();
      started++;
    }
    awaitDone();
  }

  /**
   * Submits target work() calls to a fixed pool of nThreads threads, then
   * shuts the pool down and waits for it to terminate.
   */
  static void doTestPool(int nThreads) {
    done = false;
    nCalls = new AtomicInteger(0);
    ThreadPoolExecutor executor = new ThreadPoolExecutor(nThreads, nThreads,
        50000L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
    Runnable task = new CreateTest();
    int submitted = 0;
    while (submitted < target) {
      executor.execute(task);
      submitted++;
    }
    executor.shutdown();
    try {
      executor.awaitTermination(10000000L, TimeUnit.SECONDS);
    } catch (Exception e) {
    }
  }

  /** Thread body: performs a single unit of work. */
  public void run() {
    work();
  }

  /**
   * Counts one call; the call that reaches target sets done and notifies
   * one waiter on the lock.
   */
  public static void work() {
    final int callNumber = nCalls.incrementAndGet();
    if (callNumber != target) {
      return;
    }
    synchronized (lock) {
      done = true;
      lock.notify();
    }
  }

  /** Requests garbage collection and finalization before a timed run. */
  public static void cleanGC() {
    System.gc();
    System.runFinalization();
    System.gc();
  }

}



 </source>
   
  
 
  



Thread Pool 2

   <source lang="java">
  

/*

*
* Copyright (c) 1997-1999 Scott Oaks and Henry Wong. All Rights Reserved.
*
* Permission to use, copy, modify, and distribute this software
* and its documentation for NON-COMMERCIAL purposes and
* without fee is hereby granted.
*
* This sample source code is provided for example only,
* on an unsupported, as-is basis. 
*
* AUTHOR MAKES NO REPRESENTATIONS OR WARRANTIES ABOUT THE SUITABILITY OF
* THE SOFTWARE, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
* TO THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
* PARTICULAR PURPOSE, OR NON-INFRINGEMENT. AUTHOR SHALL NOT BE LIABLE FOR
* ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING OR
* DISTRIBUTING THIS SOFTWARE OR ITS DERIVATIVES.
*
* THIS SOFTWARE IS NOT DESIGNED OR INTENDED FOR USE OR RESALE AS ON-LINE
* CONTROL EQUIPMENT IN HAZARDOUS ENVIRONMENTS REQUIRING FAIL-SAFE
* PERFORMANCE, SUCH AS IN THE OPERATION OF NUCLEAR FACILITIES, AIRCRAFT
* NAVIGATION OR COMMUNICATION SYSTEMS, AIR TRAFFIC CONTROL, DIRECT LIFE
* SUPPORT MACHINES, OR WEAPONS SYSTEMS, IN WHICH THE FAILURE OF THE
* SOFTWARE COULD LEAD DIRECTLY TO DEATH, PERSONAL INJURY, OR SEVERE
* PHYSICAL OR ENVIRONMENTAL DAMAGE ("HIGH RISK ACTIVITIES").  AUTHOR
* SPECIFICALLY DISCLAIMS ANY EXPRESS OR IMPLIED WARRANTY OF FITNESS FOR
* HIGH RISK ACTIVITIES.
*/

import java.util.*; public class ThreadPool {

 // One queued job: the Runnable to execute plus an optional monitor object
 // that the worker notifies after the Runnable returns (null when nobody waits).
 class ThreadPoolRequest {
   Runnable target;
   Object lock;
   ThreadPoolRequest(Runnable t, Object l) {
     target = t;
     lock = l;
   }
 }
 // Worker thread: takes the first request from the parent's queue and runs it,
 // repeating until shouldRun is cleared or it is interrupted while waiting.
 class ThreadPoolThread extends Thread {
   ThreadPool parent;
   boolean shouldRun = true;
   ThreadPoolThread(ThreadPool parent, int i) {
     super("ThreadPoolThread " + i);
     this.parent = parent;
   }
   public void run() {
     ThreadPoolRequest obj = null;
     while (shouldRun) {
       try {
         // presumably acquires the pool-wide lock guarding objects/nObjects;
         // BusyFlag is defined elsewhere -- TODO confirm
         parent.cvFlag.getBusyFlag();
         while (obj == null && shouldRun) {
           try {
             obj = (ThreadPoolRequest)
                 parent.objects.elementAt(0);
             parent.objects.removeElementAt(0);
           } catch (ArrayIndexOutOfBoundsException aiobe) {
             // the queue was empty, so there is nothing to run yet
             obj = null;
           } catch (ClassCastException cce) {
             System.err.println("Unexpected data");
             obj = null;
           }
           if (obj == null) {
             try {
               // wait until add() signals cvAvailable (or waitForAll broadcasts it)
               parent.cvAvailable.cvWait();
             } catch (InterruptedException ie) {
               // an interrupt while idle ends this worker; the finally below still runs
               return;
             }
           }
         }
       } finally {
         parent.cvFlag.freeBusyFlag();
       }
       if (!shouldRun)
         return;
       // NOTE(review): nothing catches exceptions from target.run(); if it throws,
       // this thread ends and nObjects is never decremented -- confirm intended
       obj.target.run();
       try {
         parent.cvFlag.getBusyFlag();
         nObjects--;
         // the last outstanding request finished: signal cvEmpty for waitForAll()
         if (nObjects == 0)
           parent.cvEmpty.cvSignal();
       } finally {
         parent.cvFlag.freeBusyFlag();
       }
       // wake the caller blocked in addRequestAndWait(), if there is one
       if (obj.lock != null) {
         synchronized(obj.lock) {
           obj.lock.notify();
         }
       }
       obj = null;
     }
   }
 }
 // Pending requests, oldest first (holds ThreadPoolRequest instances).
 Vector objects;
 // Requests added but not yet completed (queued plus currently running).
 int  nObjects = 0;
 // cvAvailable is signalled when a request is added; cvEmpty when nObjects reaches zero.
 CondVar cvAvailable, cvEmpty;
 // Shared flag both condition variables are built on.
 BusyFlag cvFlag;
 ThreadPoolThread poolThreads[];
 // Set by waitForAll(true); add() rejects requests afterwards.
 boolean terminated = false;
 /**
  * Creates the pool and immediately starts n worker threads.
  *
  * @param n number of worker threads
  */
 public ThreadPool(int n) {
   cvFlag = new BusyFlag();
   cvAvailable = new CondVar(cvFlag);
   cvEmpty = new CondVar(cvFlag);
   objects = new Vector();
   poolThreads = new ThreadPoolThread[n];
   for (int i = 0; i < n; i++) {
     poolThreads[i] = new ThreadPoolThread(this, i);
     poolThreads[i].start();
   }
 }
 // Queues a request and signals cvAvailable.
 // Throws IllegalStateException once the pool has been terminated.
 private void add(Runnable target, Object lock) {
   try {
     cvFlag.getBusyFlag();
     if (terminated)
       throw new IllegalStateException("Thread pool has shutdown");
     objects.addElement(new ThreadPoolRequest(target, lock));
     nObjects++;
     cvAvailable.cvSignal();
   } finally {
     cvFlag.freeBusyFlag();
   }
 }
 /**
  * Queues target for execution and returns without waiting for it.
  *
  * @param target the job to run
  * @throws IllegalStateException if the pool has been terminated
  */
 public void addRequest(Runnable target) {
   add(target, null);
 }
 /**
  * Queues target and blocks until a worker notifies the per-request lock.
  *
  * @param target the job to run
  * @throws InterruptedException if the calling thread is interrupted while waiting
  * @throws IllegalStateException if the pool has been terminated
  */
 public void addRequestAndWait(Runnable target)
             throws InterruptedException {
   Object lock = new Object();
   // The lock is held before the request is queued, so the worker's notify()
   // cannot happen before wait() has released the monitor.
   synchronized(lock) {
     add(target, lock);
     // NOTE(review): single wait() with no completion flag; a spurious wakeup
     // would return before the job has finished -- confirm
     lock.wait();
   }
 }
 /**
  * Blocks until no requests are outstanding (nObjects == 0).
  *
  * @param terminate when true, also clears every worker's shouldRun flag,
  *                  broadcasts cvAvailable and marks the pool terminated
  * @throws InterruptedException if interrupted while waiting
  */
 public void waitForAll(boolean terminate) throws InterruptedException {
   try {
     cvFlag.getBusyFlag();
     while (nObjects != 0)
       cvEmpty.cvWait();
     if (terminate) {
       for (int i = 0; i < poolThreads.length; i++)
         poolThreads[i].shouldRun = false;
       cvAvailable.cvBroadcast();
       terminated = true;
     }
   } finally {
     cvFlag.freeBusyFlag();
   }
 }
 /**
  * Blocks until no requests are outstanding; the pool stays usable afterwards.
  *
  * @throws InterruptedException if interrupted while waiting
  */
 public void waitForAll() throws InterruptedException {
   waitForAll(false);
 }

}



 </source>
   
  
 
  



Thread pool demo

   <source lang="java">
  

public class ThreadPoolMain extends Object {

 /**
  * Builds a demo job that prints progress messages, sleeping first for
  * firstDelay milliseconds and then for two more seconds.
  *
  * @param name       label used in every message and returned by toString()
  * @param firstDelay length of the first sleep, in milliseconds
  * @return the Runnable performing that sequence
  */
 public static Runnable makeRunnable(final String name, final long firstDelay) {
   Runnable job = new Runnable() {
     public String toString() {
       return name;
     }
     public void run() {
       try {
         System.out.println(name + ": starting up");
         Thread.sleep(firstDelay);
         System.out.println(name + ": doing some stuff");
         Thread.sleep(2000);
         System.out.println(name + ": leaving");
       } catch (InterruptedException ix) {
         // an interrupt during either sleep ends the job early
         System.out.println(name + ": got interrupted!");
       } catch (Exception x) {
         x.printStackTrace();
       }
     }
   };
   return job;
 }
 /**
  * Submits five demo jobs to a three-worker pool, then issues the stop
  * requests with pauses in between.
  */
 public static void main(String[] args) {
   final String[] names = { "RA", "RB", "RC", "RD", "RE" };
   final long[] firstDelays = { 3000, 1000, 2000, 60000, 1000 };
   try {
     ThreadPool pool = new ThreadPool(3);
     // each job is created and handed over in turn, in the order listed above
     for (int i = 0; i < names.length; i++) {
       Runnable job = makeRunnable(names[i], firstDelays[i]);
       pool.execute(job);
     }
     pool.stopRequestIdleWorkers();
     Thread.sleep(2000);
     pool.stopRequestIdleWorkers();
     Thread.sleep(5000);
     pool.stopRequestAllWorkers();
   } catch (InterruptedException ix) {
     ix.printStackTrace();
   }
 }

} class ThreadPool extends Object {

 private ObjectFIFO idleWorkers;
 private ThreadPoolWorker[] workerList;
 public ThreadPool(int numberOfThreads) {
   // make sure that it"s at least one
   numberOfThreads = Math.max(1, numberOfThreads);
   idleWorkers = new ObjectFIFO(numberOfThreads);
   workerList = new ThreadPoolWorker[numberOfThreads];
   for (int i = 0; i < workerList.length; i++) {
     workerList[i] = new ThreadPoolWorker(idleWorkers);
   }
 }
 /**
  * Hands target to the next idle worker, blocking until one is available.
  *
  * @param target the job to run
  * @throws InterruptedException if interrupted while waiting for a worker
  */
 public void execute(Runnable target) throws InterruptedException {
   // remove() blocks until some worker has put itself into the idle queue
   Object nextIdle = idleWorkers.remove();
   ((ThreadPoolWorker) nextIdle).process(target);
 }
 /**
  * Takes every worker currently in the idle queue and asks it to stop.
  */
 public void stopRequestIdleWorkers() {
   try {
     for (Object idleWorker : idleWorkers.removeAll()) {
       ((ThreadPoolWorker) idleWorker).stopRequest();
     }
   } catch (InterruptedException x) {
     Thread.currentThread().interrupt(); // re-assert
   }
 }
 /**
  * Stops the idle workers first, pauses briefly, then asks every worker
  * whose thread is still alive to stop.
  */
 public void stopRequestAllWorkers() {
   stopRequestIdleWorkers();
   try {
     Thread.sleep(250);
   } catch (InterruptedException x) {
     // deliberately ignored: the remaining workers are stopped regardless
   }
   for (ThreadPoolWorker worker : workerList) {
     if (!worker.isAlive()) {
       continue;
     }
     worker.stopRequest();
   }
 }

} class ThreadPoolWorker extends Object {

 private static int nextWorkerID = 0;
 private ObjectFIFO idleWorkers;
 private int workerID;
 private ObjectFIFO handoffBox;
 private Thread internalThread;
 private volatile boolean noStopRequested;
 /**
  * Creates the worker and starts its internal thread before returning.
  *
  * @param idleWorkers queue this worker adds itself to whenever it is ready for work
  */
 public ThreadPoolWorker(ObjectFIFO idleWorkers) {
   this.idleWorkers = idleWorkers;
   this.workerID = getNextWorkerID();
   this.handoffBox = new ObjectFIFO(1); // only one slot
   // set before the thread starts, because runWork() loops on this flag
   this.noStopRequested = true;
   this.internalThread = new Thread(new Runnable() {
     public void run() {
       try {
         runWork();
       } catch (Exception x) {
         // in case ANY exception slips through
         x.printStackTrace();
       }
     }
   });
   this.internalThread.start();
 }
 /**
  * Returns the next worker ID and advances the counter.
  *
  * @return an ID not handed out before
  */
 public static synchronized int getNextWorkerID() {
   // synchronized at the class level so that two callers never see the same value
   return nextWorkerID++;
 }
 /**
  * Places target into this worker's single-slot hand-off box.
  *
  * @param target the job to run
  * @throws InterruptedException if interrupted while the box is full
  */
 public void process(Runnable target) throws InterruptedException {
   this.handoffBox.add(target);
 }
 // Worker loop: announce readiness, join the idle queue, wait for a job in
 // the hand-off box and run it; repeats until a stop has been requested.
 private void runWork() {
   while (noStopRequested) {
     try {
       System.out.println("workerID=" + workerID + ", ready for work");
       idleWorkers.add(this);
       final Object handedOff = handoffBox.remove();
       final Runnable job = (Runnable) handedOff;
       System.out.println("workerID=" + workerID
           + ", starting execution of new Runnable: " + job);
       runIt(job);
     } catch (InterruptedException x) {
       Thread.currentThread().interrupt(); // re-assert
     }
   }
 }
 // Runs one job, reporting any Exception it throws instead of letting it
 // end the worker loop.
 private void runIt(Runnable r) {
   try {
     r.run();
   } catch (Exception jobFailure) {
     System.err.println("Uncaught exception fell through from run()");
     jobFailure.printStackTrace();
   } finally {
     // clear the interrupt flag whether or not the job left it set
     Thread.interrupted();
   }
 }
 /**
  * Asks this worker to stop: clears the run flag, then interrupts the
  * internal thread.
  */
 public void stopRequest() {
   final String notice = "workerID=" + workerID + ", stopRequest() received.";
   System.out.println(notice);
   noStopRequested = false;
   internalThread.interrupt();
 }
 /**
  * @return true while the worker's internal thread is alive
  */
 public boolean isAlive() {
   final boolean alive = internalThread.isAlive();
   return alive;
 }

} class ObjectFIFO extends Object {

 private Object[] queue;
 private int capacity;
 private int size;
 private int head;
 private int tail;
 /**
  * Creates an empty queue.
  *
  * @param cap requested capacity; values below one are raised to one
  */
 public ObjectFIFO(int cap) {
   capacity = Math.max(cap, 1); // at least 1
   queue = new Object[capacity];
   size = 0;
   head = 0;
   tail = 0;
 }
 /**
  * @return the capacity chosen in the constructor
  */
 public int getCapacity() {
   return this.capacity;
 }
 /**
  * @return the number of elements currently held
  */
 public synchronized int getSize() {
   return this.size;
 }
 /**
  * @return true when the queue holds no elements
  */
 public synchronized boolean isEmpty() {
   return 0 == size;
 }
 /**
  * @return true when the queue holds as many elements as its capacity
  */
 public synchronized boolean isFull() {
   return capacity == size;
 }
 public synchronized void add(Object obj) throws InterruptedException {
   waitWhileFull();
   queue[head] = obj;
   head = (head + 1) % capacity;
   size++;
   notifyAll();
 }
 /**
  * Adds every element of list in array order, blocking whenever the queue is full.
  *
  * @param list elements to add
  * @throws InterruptedException if interrupted while waiting for space
  */
 public synchronized void addEach(Object[] list) throws InterruptedException {
   for (Object element : list) {
     add(element);
   }
 }
 public synchronized Object remove() throws InterruptedException {
   waitWhileEmpty();
   Object obj = queue[tail];
   queue[tail] = null;
   tail = (tail + 1) % capacity;
   size--;
   notifyAll(); 
   return obj;
 }
 /**
  * Removes every element currently stored, oldest first. Does not wait:
  * an empty queue yields an empty array.
  *
  * @return the removed elements
  * @throws InterruptedException declared because remove() declares it
  */
 public synchronized Object[] removeAll() throws InterruptedException {
   final int pending = size;
   Object[] drained = new Object[pending];
   int i = 0;
   while (i < pending) {
     drained[i] = remove();
     i++;
   }
   return drained;
 }
 /**
  * Waits until the queue is non-empty, then removes everything in it.
  *
  * @return the removed elements, at least one
  * @throws InterruptedException if interrupted while waiting
  */
 public synchronized Object[] removeAtLeastOne() throws InterruptedException {
   waitWhileEmpty();
   final Object[] drained = removeAll();
   return drained;
 }
 /**
  * Waits until the queue is empty or the timeout elapses.
  *
  * @param msTimeout maximum wait in milliseconds; 0 means wait without limit
  * @return true if the queue is empty on return
  * @throws InterruptedException if interrupted while waiting
  */
 public synchronized boolean waitUntilEmpty(long msTimeout)
     throws InterruptedException {
   if (msTimeout == 0L) {
     waitUntilEmpty();
     return true;
   }
   final long deadline = System.currentTimeMillis() + msTimeout;
   // recompute the remaining time after every wake-up so the total never exceeds the timeout
   for (long remaining = msTimeout;
        !isEmpty() && (remaining > 0L);
        remaining = deadline - System.currentTimeMillis()) {
     wait(remaining);
   }
   return isEmpty();
 }
 /**
  * Blocks until the queue is empty.
  *
  * @throws InterruptedException if interrupted while waiting
  */
 public synchronized void waitUntilEmpty() throws InterruptedException {
   for (;;) {
     if (isEmpty()) {
       return;
     }
     wait();
   }
 }
 /**
  * Blocks until the queue holds at least one element.
  *
  * @throws InterruptedException if interrupted while waiting
  */
 public synchronized void waitWhileEmpty() throws InterruptedException {
   for (;;) {
     if (!isEmpty()) {
       return;
     }
     wait();
   }
 }
 /**
  * Blocks until the queue is full.
  *
  * @throws InterruptedException if interrupted while waiting
  */
 public synchronized void waitUntilFull() throws InterruptedException {
   for (;;) {
     if (isFull()) {
       return;
     }
     wait();
   }
 }
 /**
  * Blocks until the queue has room for at least one more element.
  *
  * @throws InterruptedException if interrupted while waiting
  */
 public synchronized void waitWhileFull() throws InterruptedException {
   for (;;) {
     if (!isFull()) {
       return;
     }
     wait();
   }
 }

}



 </source>
   
  
 
  



Thread Pools 1

   <source lang="java">
  

import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class MainClass {

 /**
  * Runs several Fibonacci tasks on a fixed-size thread pool.
  *
  * Optional arguments, in order: number of tasks (default 5), the
  * Fibonacci index each task computes (default 30) and the pool size
  * (default 10).
  *
  * @param args optional overrides as described above
  * @throws NumberFormatException if a supplied argument is not a number
  */
 public static void main(String[] args) {
   int nTasks = args.length > 0 ? Integer.parseInt(args[0]) : 5;
   // Task.fib() is the naive exponential recursion, so the former hard-coded
   // value of 1000 could never complete; 30 finishes quickly.
   long n = args.length > 1 ? Long.parseLong(args[1]) : 30L;
   int tpSize = args.length > 2 ? Integer.parseInt(args[2]) : 10;
   ThreadPoolExecutor tpe = new ThreadPoolExecutor(tpSize, tpSize, 50000L, TimeUnit.MILLISECONDS,
       new LinkedBlockingQueue<Runnable>());
   Task[] tasks = new Task[nTasks];
   for (int i = 0; i < nTasks; i++) {
     tasks[i] = new Task(n, "Task " + i);
     tpe.execute(tasks[i]);
   }
   // shutdown() lets the queued tasks finish and then releases the pool threads
   tpe.shutdown();
 }

} class SingleThreadAccess {

 private ThreadPoolExecutor tpe;
 /**
  * Creates the executor with a core and maximum pool size of one, backed
  * by a LinkedBlockingQueue.
  */
 public SingleThreadAccess() {
   LinkedBlockingQueue<Runnable> pending = new LinkedBlockingQueue<Runnable>();
   tpe = new ThreadPoolExecutor(1, 1, 50000L, TimeUnit.SECONDS, pending);
 }
 /**
  * Submits r to the executor and returns without waiting for it.
  *
  * @param r the job to run
  */
 public void invokeLater(Runnable r) {
   this.tpe.execute(r);
 }
 /**
  * Submits r to the executor and blocks until it has completed.
  *
  * @param r the job to run
  * @throws InterruptedException if the caller is interrupted while waiting
  * @throws ExecutionException   if r threw an exception
  */
 public void invokeAneWait(Runnable r) throws InterruptedException, ExecutionException {
   // parameterized instead of the raw type; the null result is never used
   FutureTask<Object> task = new FutureTask<Object>(r, null);
   tpe.execute(task);
   task.get();
 }
 /**
  * Shuts the underlying executor down.
  */
 public void shutdown() {
   this.tpe.shutdown();
 }

} class Task implements Runnable {

 long n;
 String id;
 // Naive doubly-recursive Fibonacci; the repeated work is what makes the task take time.
 private long fib(long n) {
   if (n == 0 || n == 1) {
     return n;
   }
   final long previous = fib(n - 1);
   final long beforePrevious = fib(n - 2);
   return previous + beforePrevious;
 }
 /**
  * @param n  Fibonacci index this task computes
  * @param id label printed with the start and end messages
  */
 public Task(long n, String id) {
   this.id = id;
   this.n = n;
 }
 /**
  * Prints a start line, computes fib(n), then prints an end line with the
  * elapsed time in milliseconds.
  */
 public void run() {
   final DateFormat clock = new SimpleDateFormat("HH:mm:ss:SSS");
   final long startTime = System.currentTimeMillis();
   System.out.println("Starting task " + id + " at " + clock.format(new Date(startTime)));
   fib(n);
   final long endTime = System.currentTimeMillis();
   final long elapsed = endTime - startTime;
   System.out.println("Ending task " + id + " at " + clock.format(new Date(endTime)) + " after "
       + elapsed + " milliseconds");
 }

}



 </source>
   
  
 
  



Thread Pools 2

   <source lang="java">
  

/* Java Threads, 3rd Edition By Scott Oaks, Henry Wong 3rd Edition September 2004 ISBN: 0-596-00782-5

*/

import java.util.concurrent.*; import java.util.*; import java.text.*; import java.io.*; public class SingleThreadTest {

 /**
  * Queues five small Fibonacci tasks on the single-thread executor, then
  * shuts it down.
  */
 public static void main(String[] args) {
   final int nTasks = 5;
   final int fib = 4;
   final SingleThreadAccess sta = new SingleThreadAccess();
   int i = 0;
   while (i < nTasks) {
     sta.invokeLater(new Task(fib, "Task " + i));
     i++;
   }
   sta.shutdown();
 }

} class SingleThreadAccess {

 private ThreadPoolExecutor tpe;
 /**
  * Creates the executor with a core and maximum pool size of one, backed
  * by a LinkedBlockingQueue.
  */
 public SingleThreadAccess() {
   LinkedBlockingQueue<Runnable> pending = new LinkedBlockingQueue<Runnable>();
   tpe = new ThreadPoolExecutor(1, 1, 50000L, TimeUnit.SECONDS, pending);
 }
 /**
  * Submits r to the executor and returns without waiting for it.
  *
  * @param r the job to run
  */
 public void invokeLater(Runnable r) {
   this.tpe.execute(r);
 }
 /**
  * Submits r to the executor and blocks until it has completed.
  *
  * @param r the job to run
  * @throws InterruptedException if the caller is interrupted while waiting
  * @throws ExecutionException   if r threw an exception
  */
 public void invokeAneWait(Runnable r) throws InterruptedException,
     ExecutionException {
   // parameterized instead of the raw type; the null result is never used
   FutureTask<Object> task = new FutureTask<Object>(r, null);
   tpe.execute(task);
   task.get();
 }
 /**
  * Shuts the underlying executor down.
  */
 public void shutdown() {
   this.tpe.shutdown();
 }

} class Task implements Runnable {

 long n;
 String id;
 // Naive doubly-recursive Fibonacci; the repeated work is what makes the task take time.
 private long fib(long n) {
   if (n == 0 || n == 1) {
     return n;
   }
   final long previous = fib(n - 1);
   final long beforePrevious = fib(n - 2);
   return previous + beforePrevious;
 }
 /**
  * @param n  Fibonacci index this task computes
  * @param id label printed with the start and end messages
  */
 public Task(long n, String id) {
   this.id = id;
   this.n = n;
 }
 /**
  * Prints a start line, computes fib(n), then prints an end line with the
  * elapsed time in milliseconds.
  */
 public void run() {
   final DateFormat clock = new SimpleDateFormat("HH:mm:ss:SSS");
   final long startTime = System.currentTimeMillis();
   System.out.println("Starting task " + id + " at " + clock.format(new Date(startTime)));
   fib(n);
   final long endTime = System.currentTimeMillis();
   final long elapsed = endTime - startTime;
   System.out.println("Ending task " + id + " at " + clock.format(new Date(endTime))
       + " after " + elapsed + " milliseconds");
 }

}



 </source>
   
  
 
  



Thread Pool Test

   <source lang="java">
  
      /*

DEVELOPING GAME IN JAVA Caracteristiques Editeur : NEW RIDERS Auteur : BRACKEEN Parution : 09 2003 Pages : 972 Isbn : 1-59273-005-1 Reliure : Paperback Disponibilite : Disponible a la librairie

*/

import java.util.LinkedList; public class ThreadPoolTest {

 public static void main(String[] args) {
   if (args.length != 2) {
     System.out.println("Tests the ThreadPool task.");
     System.out
         .println("Usage: java ThreadPoolTest numTasks numThreads");
     System.out.println("  numTasks - integer: number of task to run.");
     System.out.println("  numThreads - integer: number of threads "
         + "in the thread pool.");
     return;
   }
   int numTasks = Integer.parseInt(args[0]);
   int numThreads = Integer.parseInt(args[1]);
   // create the thread pool
   ThreadPool threadPool = new ThreadPool(numThreads);
   // run example tasks
   for (int i = 0; i < numTasks; i++) {
     threadPool.runTask(createTask(i));
   }
   // close the pool and wait for all tasks to finish.
   threadPool.join();
 }
 /**
  * Builds a Runnable that prints a start line with the given ID, sleeps
  * for 500 milliseconds, then prints an end line with the same ID.
  *
  * @param taskID number shown in both messages
  * @return the example task
  */
 private static Runnable createTask(final int taskID) {
   final String label = "Task " + taskID;
   Runnable task = new Runnable() {
     public void run() {
       System.out.println(label + ": start");
       // simulate a long-running task
       try {
         Thread.sleep(500);
       } catch (InterruptedException ex) {
         // an interrupt only shortens the simulated work
       }
       System.out.println(label + ": end");
     }
   };
   return task;
 }

} /**

* A thread pool is a group of a limited number of threads that are used to
* execute tasks.
*/

class ThreadPool extends ThreadGroup {

 private boolean isAlive;
 private LinkedList taskQueue;
 private int threadID;
 private static int threadPoolID;
 /**
  * Creates a new ThreadPool.
  * 
  * @param numThreads
  *            The number of threads in the pool.
  */
 public ThreadPool(int numThreads) {
   super("ThreadPool-" + (threadPoolID++));
   setDaemon(true);
   isAlive = true;
   taskQueue = new LinkedList();
   for (int i = 0; i < numThreads; i++) {
     new PooledThread().start();
   }
 }
 /**
  * Requests a new task to run. This method returns immediately, and the task
  * executes on the next available idle thread in this ThreadPool.
*

* Tasks start execution in the order they are received. * * @param task * The task to run. If null, no action is taken. * @throws IllegalStateException * if this ThreadPool is already closed. */ public synchronized void runTask(Runnable task) { if (!isAlive) { throw new IllegalStateException(); } if (task != null) { taskQueue.add(task); notify(); } } protected synchronized Runnable getTask() throws InterruptedException { while (taskQueue.size() == 0) { if (!isAlive) { return null; } wait(); } return (Runnable) taskQueue.removeFirst(); } /** * Closes this ThreadPool and returns immediately. All threads are stopped, * and any waiting tasks are not executed. Once a ThreadPool is closed, no * more tasks can be run on this ThreadPool. */ public synchronized void close() { if (isAlive) { isAlive = false; taskQueue.clear(); interrupt(); } } /** * Closes this ThreadPool and waits for all running threads to finish. Any * waiting tasks are executed. */ public void join() { // notify all waiting threads that this ThreadPool is no // longer alive synchronized (this) { isAlive = false; notifyAll(); } // wait for all threads to finish Thread[] threads = new Thread[activeCount()]; int count = enumerate(threads); for (int i = 0; i < count; i++) { try { threads[i].join(); } catch (InterruptedException ex) { } } } /** * A PooledThread is a Thread in a ThreadPool group, designed to run tasks * (Runnables). */ private class PooledThread extends Thread { public PooledThread() { super(ThreadPool.this, "PooledThread-" + (threadID++)); } public void run() { while (!isInterrupted()) { // get a task to run Runnable task = null; try { task = getTask(); } catch (InterruptedException ex) { } // if getTask() returned null or was interrupted, // close this thread by returning. if (task == null) { return; } // run the task, and eat any exceptions it throws try { task.run(); } catch (Throwable t) { uncaughtException(this, t); } } } } } </source>

Very basic implementation of a thread pool

   <source lang="java">
 

/*

* Copyright (c) 2004, Rob Gordon.
*/

// revised from oddjob
import java.util.LinkedList;

/**

* Very basic implementation of a thread pool.
* 
* @author Rob Gordon.
*/

public class ThreadPool {

   private BlockingQueue queue = new BlockingQueue();
   private boolean closed = true;
   private int poolSize = 3;
   
   
   /**
    * Stores the number of threads that start() will create.
    *
    * @param poolSize the thread count to use
    */
   public void setPoolSize(final int poolSize) {
       this.poolSize = poolSize;
   }
   
   /**
    * @return the configured number of threads
    */
   public int getPoolSize() {
       return this.poolSize;
   }
   
   /**
    * Opens the pool and starts poolSize threads.
    *
    * @throws IllegalStateException if the pool has already been started
    */
   synchronized public void start() {
       if (!closed) {
           throw new IllegalStateException("Pool already started.");
       }
       closed = false;
       int started = 0;
       while (started < poolSize) {
           new PooledThread().start();
           started++;
       }
   }
   /**
    * Queues job for execution by one of the pool threads.
    *
    * @param job the work to run
    * @throws RuntimeException (PoolClosedException) if the pool is closed
    */
   synchronized public void execute(Runnable job) {
       if (!closed) {
           queue.enqueue(job);
           return;
       }
       throw new PoolClosedException();
   }
   
   /**
    * Pool thread: runs jobs taken from the queue until dequeue() returns null.
    */
   private class PooledThread extends Thread {

       public void run() {
           Runnable job;
           // a null job ends this thread
           while ((job = (Runnable) queue.dequeue()) != null) {
               try {
                   job.run();
               } catch (Throwable t) {
                   // ignore
               }
           }
       }
   }
   /**
    * Closes the pool: further execute() calls are rejected and the queue is
    * closed, so pool threads end once the queue is empty.
    * Synchronized like start() and execute(), because all three access the
    * closed flag.
    */
   synchronized public void close() {
       closed = true;
       queue.close();
   }
   
   /**
    * Thrown by execute() when the pool is closed.
    */
   private static class PoolClosedException extends RuntimeException {
       PoolClosedException() { 
           super ("Pool closed.");
       }
   }

} /*

* Copyright (c) 2004, Rob Gordon.
*/

/**

*
* @author Rob Gordon.
*/
/**
 * A minimal unbounded FIFO queue whose dequeue() blocks while the queue is
 * open and empty. All access is serialized on the queue's own monitor.
 */
class BlockingQueue {
 private final LinkedList<Object> list = new LinkedList<Object>();
 private boolean closed = false;
 
 /**
  * Appends o and wakes one waiting consumer.
  *
  * @param o the element to add
  * @throws ClosedException if the queue has been closed
  */
 synchronized public void enqueue(Object o) {
   if (closed) {
     throw new ClosedException();
   }
   list.add(o);
   notify();
 }
 /**
  * Removes the oldest element, waiting while the queue is open and empty.
  * An interrupt does not end the wait, but it is no longer lost: the
  * thread's interrupt status is restored before returning.
  *
  * @return the oldest element, or null if the queue is closed and empty
  */
 synchronized public Object dequeue() {
   boolean interrupted = false;
   try {
     while (!closed && list.isEmpty()) {
       try {
         wait();
       }
       catch (InterruptedException e) {
         // keep waiting, but remember the interrupt for the caller
         interrupted = true;
       }
     }
     if (list.isEmpty()) {
       return null;
     }
     return list.removeFirst();
   }
   finally {
     if (interrupted) {
       Thread.currentThread().interrupt();
     }
   }
 }
 
 /**
  * @return the number of queued elements
  */
 synchronized public int size() {
     return list.size();
 }
 
 /**
  * Closes the queue and wakes every waiting consumer. Elements already
  * queued can still be dequeued.
  */
 synchronized public void close() {
   closed = true;
   notifyAll();
 }
 
 /**
  * Re-opens a closed queue so that enqueue() is accepted again.
  */
 synchronized public void open() {
     closed = false;
 }
 
 /**
  * Thrown by enqueue() when the queue is closed.
  */
 public static class ClosedException extends RuntimeException {
     ClosedException() {
         super("Queue closed.");
     }
 }

}


 </source>
   
  
 
  



Worker thread pool

   <source lang="java">
 

/**

* 
* JFreeReport : a free Java reporting library
* 
*
* Project Info:  http://reporting.pentaho.org/
*
* (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
*
* This library is free software; you can redistribute it and/or modify it under the terms
* of the GNU Lesser General Public License as published by the Free Software Foundation;
* either version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this
* library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,
* Boston, MA 02111-1307, USA.
*
* [Java is a trademark or registered trademark of Sun Microsystems, Inc.
* in the United States and other countries.]
*
* ------------
* WorkerPool.java
* ------------
* (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
*/

/**

* A simple static workpool. Worker threads are created when necessary.
*
* @author Thomas Morgner
* 
*/

public class WorkerPool {

 /**
  * The worker array.
  */
 private Worker[] workers;
 /**
  * A flag indicating whether idle workers are available.
  */
 private boolean workersAvailable;
 /**
  * the name prefix for all workers of this pool.
  */
 private String namePrefix;
 /**
  * Creates a new worker pool with the default size of 10 workers and the default name.
  * Delegates to the single-argument constructor with a size of 10.
  */
 public WorkerPool ()
 {
   this(10);
 }
 /**
  * Creates a new worker pool with the given number of workers and the default name
  * prefix "WorkerPool-worker".
  *
  * @param size the maximum number of workers available.
  */
 public WorkerPool (final int size)
 {
   this(size, "WorkerPool-worker");
 }
 /**
  * Creates a new worker pool for the given number of workers and with the given name
  * prefix. No worker is created here; every slot starts out empty.
  *
  * @param size       the size of the worker pool.
  * @param namePrefix the name prefix for all created workers.
  * @throws IllegalArgumentException if size is not positive.
  */
 public WorkerPool (final int size, final String namePrefix)
 {
   if (size < 1)
   {
     throw new IllegalArgumentException("Size must be > 0");
   }
   this.namePrefix = namePrefix;
   this.workers = new Worker[size];
   this.workersAvailable = true;
 }
 /**
  * Reports the cached availability flag.
  *
  * @return true, if at least one worker is idle or a slot is free, false otherwise.
  */
 public synchronized boolean isWorkerAvailable ()
 {
   return this.workersAvailable;
 }
 /**
  * Recomputes the workersAvailable flag after a worker was assigned: it is true when
  * some slot is empty or holds an idle worker.
  */
 private void updateWorkersAvailable ()
 {
   boolean anyUsable = false;
   for (int i = 0; i < workers.length && !anyUsable; i++)
   {
     final Worker candidate = workers[i];
     // an empty slot counts as available, since a worker can be created in it
     anyUsable = (candidate == null) || candidate.isAvailable();
   }
   workersAvailable = anyUsable;
 }
 /**
  * Blocks until the availability flag is set, re-checking at least every five seconds.
  */
 private synchronized void waitForWorkerAvailable ()
 {
   while (!isWorkerAvailable())
   {
     try
     {
       // wait() releases the pool monitor so that workers can report back
       this.wait(5000);
     }
     catch (InterruptedException ie)
     {
       // ignored
     }
   }
 }
 /**
  * Returns a workerhandle for the given workload. This method will wait until an idle
  * worker is found.
  *
  * @param r the workload for the worker
  * @return a handle to the worker.
  */
 public synchronized WorkerHandle getWorkerForWorkload (final Runnable r)
 {
   waitForWorkerAvailable();
   int emptySlot = -1;
   for (int i = 0; i < workers.length; i++)
   {
     if (workers[i] == null)
     {
       // in the first run, try to avoid to create new threads...
       // reuse the already available threads
       if (emptySlot == -1)
       {
         emptySlot = i;
       }
       continue;
     }
     if (workers[i].isAvailable() == true)
     {
       workers[i].setWorkload(r);
       updateWorkersAvailable();
       return new WorkerHandle(workers[i]);
     }
   }
   if (emptySlot != -1)
   {
     workers[emptySlot] = new Worker();
     workers[emptySlot].setName(namePrefix + "-" + emptySlot);
     workers[emptySlot].setWorkerPool(this);
     workers[emptySlot].setWorkload(r);
     updateWorkersAvailable();
     return new WorkerHandle(workers[emptySlot]);
   }
   throw new IllegalStateException
           ("At this point, a worker should already have been assigned.");
 }
 /**
  * Marks the given worker as finished. The worker is removed from its slot, the pool is
  * flagged as having capacity again and all waiting threads are woken.
  * The whole scan runs under the pool monitor, because the worker array is modified by
  * other synchronized methods.
  *
  * @param worker the worker which was finished.
  * @throws IllegalArgumentException if the worker is not in the finish state.
  */
 public void workerFinished (final Worker worker)
 {
   if (worker.isFinish() == false)
   {
     throw new IllegalArgumentException("This worker is not in the finish state.");
   }
   synchronized (this)
   {
     for (int i = 0; i < workers.length; i++)
     {
       if (workers[i] == worker)
       {
         workers[i] = null;
         workersAvailable = true;
         this.notifyAll();
         return;
       }
     }
   }
 }
 /**
  * Marks the given worker as available again and wakes all threads waiting for a
  * worker. Workers that do not belong to this pool are ignored.
  *
  * @param worker the worker which became available.
  */
 public synchronized void workerAvailable (final Worker worker)
 {
   boolean known = false;
   for (int i = 0; i < workers.length && !known; i++)
   {
     known = (workers[i] == worker);
   }
   if (known)
   {
     // the method already holds the pool monitor, so no nested lock is needed
     workersAvailable = true;
     this.notifyAll();
   }
 }
 /**
  * Finishes all worker of this pool.
  */
 public void finishAll ()
 {
   for (int i = 0; i < workers.length; i++)
   {
     if (workers[i] != null)
     {
       workers[i].finish();
     }
   }
 }

} /**

* 
* JFreeReport : a free Java reporting library
* 
*
* Project Info:  http://reporting.pentaho.org/
*
* (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
*
* This library is free software; you can redistribute it and/or modify it under the terms
* of the GNU Lesser General Public License as published by the Free Software Foundation;
* either version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this
* library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,
* Boston, MA 02111-1307, USA.
*
* [Java is a trademark or registered trademark of Sun Microsystems, Inc.
* in the United States and other countries.]
*
* ------------
* Worker.java
* ------------
* (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
*/

/**

* A simple worker implementation. The worker executes a assigned workload and then sleeps
* until another workload is set or the worker is killed.
*
* @author Thomas Morgner
*/

final class Worker extends Thread {

 /**
  * the worker"s task.
  */
 private Runnable workload;
 /**
  * a flag whether the worker should exit after the processing.
  */
 private volatile boolean finish;
 /**
  * the time in milliseconds beween 2 checks for exit or work requests.
  */
 private final int sleeptime;
 /**
  * The worker pool, to which this worker is assigned. May be null.
  */
 private WorkerPool workerPool;
 /**
  * Creates a new worker.
  *
  * @param sleeptime the time this worker sleeps until he checks for new work.
  */
 public Worker (final int sleeptime)
 {
   this.sleeptime = sleeptime;
   this.setDaemon(true);
   start();
 }
 /**
  * Creates a new worker with an default idle timeout of 2 minutes.
  */
 public Worker ()
 {
   this(120000);
 }
 /**
  * Set the next workload for this worker.
  *
  * @param r the next workload for the worker.
  * @throws IllegalStateException if the worker is not idle.
  */
 public void setWorkload (final Runnable r)
 {
   if (workload != null)
   {
     throw new IllegalStateException("This worker is not idle.");
   }
   //Log.debug("Workload set...");
   synchronized (this)
   {
     workload = r;
     //Log.debug("Workload assigned: Notified " + getName());
     this.notifyAll();
   }
 }
 /**
  * Returns the workload object.
  *
  * @return the runnable executed by this worker thread.
  */
 public synchronized Runnable getWorkload()
 {
   return workload;
 }
 /**
  * Kills the worker after he completed his work. Awakens the worker if he"s sleeping, so
  * that the worker dies without delay.
  */
 public void finish ()
 {
   finish = true;
   // we are evil ..
   try
   {
     this.interrupt();
   }
   catch (SecurityException se)
   {
     // ignored
   }
   if (workerPool != null)
   {
     workerPool.workerFinished(this);
   }
 }
 /**
  * Checks whether this worker has some work to do.
  *
  * @return true, if this worker has no more work and is currently sleeping.
  */
 public boolean isAvailable ()
 {
   return (workload == null);
 }
 /**
  * If a workload is set, process it. After the workload is processed, this worker starts
  * to sleep until a new workload is set for the worker or the worker got the finish()
  * request.
  */
 public void run ()
 {
   while (!finish)
   {
     if (workload != null)
     {
       try
       {
         workload.run();
       }
       catch (Exception e)
       {
         System.out.println("Worker caught exception on run: "+ e);
       }
       workload = null;
       if (workerPool != null)
       {
         workerPool.workerAvailable(this);
       }
     }
     if (!finish)
     {
       synchronized (this)
       {
         try
         {
           // remove lock
           this.wait(sleeptime);
         }
         catch (InterruptedException ie)
         {
           // ignored
         }
       }
     }
   }
   synchronized (this)
   {
     this.notifyAll();
   }
 }
 /**
  * Checks whether this worker has received the signal to finish and die.
  *
  * @return true, if the worker should finish the work and end the thread.
  */
 public boolean isFinish ()
 {
   return finish;
 }
 /**
  * Returns the worker"s assigned pool.
  *
  * @return the worker pool (or null, if the worker is not assigned to a pool).
  */
 public WorkerPool getWorkerPool ()
 {
   return workerPool;
 }
 /**
  * Defines the worker"s assigned pool.
  *
  * @param workerPool the worker pool (or null, if the worker is not assigned to a
  *                   pool).
  */
 public void setWorkerPool (final WorkerPool workerPool)
 {
   this.workerPool = workerPool;
 }

}

/**

* 
* JFreeReport : a free Java reporting library
* 
*
* Project Info:  http://reporting.pentaho.org/
*
* (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
*
* This library is free software; you can redistribute it and/or modify it under the terms
* of the GNU Lesser General Public License as published by the Free Software Foundation;
* either version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this
* library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,
* Boston, MA 02111-1307, USA.
*
* [Java is a trademark or registered trademark of Sun Microsystems, Inc.
* in the United States and other countries.]
*
* ------------
* WorkerHandle.java
* ------------
* (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
*/

/**

* The worker handle is a control structure which allows control over the worker without
* exposing the thread object.
*
* @author Thomas Morgner
* @deprecated This class is used by the WorkerPool, which is not used anywhere anymore.
*/
class WorkerHandle

{

 /**
  * The wrapped worker; never exposed to callers.
  */
 private final Worker worker;
 /**
  * Wraps the given worker.
  *
  * @param worker the worker to control through this handle.
  */
 public WorkerHandle (final Worker worker)
 {
   this.worker = worker;
 }
 /**
  * Forwards the finish request to the wrapped worker.
  */
 public void finish ()
 {
   this.worker.finish();
 }

}


 </source>