Java/Threads/Thread Pool

Материал из Java эксперт
Версия от 18:01, 31 мая 2010; (обсуждение)
(разн.) ← Предыдущая | Текущая версия (разн.) | Следующая → (разн.)
Перейти к: навигация, поиск

Create a new thread for the thread pool. The create thread will be a daemon thread.

  

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
 * Simple implementation of a ThreadFactory that 
 * marks all of the threads as daemon threads.
 */
public class DaemonThreadFactory implements ThreadFactory {
    private final ThreadFactory factory = Executors.defaultThreadFactory();
    /**
     * Create a new thread for the thread pool.  The create
     * thread will be a daemon thread.
     * 
     * @param r      The runnable used by the thread pool.
     * 
     * @return The newly created thread. 
     */
    public Thread newThread(Runnable r) {
        Thread t = factory.newThread(r);
        if (!t.isDaemon()) {
            t.setDaemon(true);
        }
        return t;
    }
}





Defining a thread for a thread pool

   
import java.util.LinkedList;
class ThreadTask extends Thread {
  private ThreadPool pool;
  public ThreadTask(ThreadPool thePool) {
    pool = thePool;
  }
  public void run() {
    while (true) {
      // blocks until job
      Runnable job = pool.getNext();
      try {
        job.run();
      } catch (Exception e) {
        // Ignore exceptions thrown from jobs
        System.err.println("Job exception: " + e);
      }
    }
  }
}
public class ThreadPool {
  private LinkedList tasks = new LinkedList();
  public ThreadPool(int size) {
    for (int i = 0; i < size; i++) {
      Thread thread = new ThreadTask(this);
      thread.start();
    }
  }
  public void run(Runnable task) {
    synchronized (tasks) {
      tasks.addLast(task);
      tasks.notify();
    }
  }
  public Runnable getNext() {
    Runnable returnVal = null;
    synchronized (tasks) {
      while (tasks.isEmpty()) {
        try {
          tasks.wait();
        } catch (InterruptedException ex) {
          System.err.println("Interrupted");
        }
      }
      returnVal = (Runnable) tasks.removeFirst();
    }
    return returnVal;
  }
  public static void main(String args[]) {
    final String message[] = { "Java", "Source", "and", "Support" };
    ThreadPool pool = new ThreadPool(message.length / 2);
    for (int i = 0, n = message.length; i < n; i++) {
      final int innerI = i;
      Runnable runner = new Runnable() {
        public void run() {
          for (int j = 0; j < 25; j++) {
            System.out.println("j: " + j + ": " + message[innerI]);
          }
        }
      };
      pool.run(runner);
    }
  }
}





JDK1.5 provides a mechanism to create a pool a scheduled task

   
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Main{
  public static void main(String args[]) {
    ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(5);
    
    stpe.scheduleAtFixedRate(new Job1(), 0, 5, TimeUnit.SECONDS);
    stpe.scheduleAtFixedRate(new Job2(), 1, 2, TimeUnit.SECONDS);
  }
}
class Job1 implements Runnable {
  public void run() {
    System.out.println("Job 1");
  }
}
class Job2 implements Runnable {
  public void run() {
      for(int i=-99;i<99;i++){
        System.out.println(i);
      }
  }
}





Simple object pool. Based on ThreadPool and few other classes

 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.    
 */
/**
 * Simple object pool. Based on ThreadPool and few other classes
 * 
 * The pool will ignore overflow and return null if empty.
 * 
 * @author Gal Shachor
 * @author Costin
 * @author 
 * @version $Id: SimplePool.java 463298 2006-10-12 16:10:32Z henning $
 */
public final class SimplePool {
  /*
   * Where the objects are held.
   */
  private Object pool[];
  /**
   * max amount of objects to be managed set via CTOR
   */
  private int max;
  /**
   * index of previous to next free slot
   */
  private int current = -1;
  /**
   * @param max
   */
  public SimplePool(int max) {
    this.max = max;
    pool = new Object[max];
  }
  /**
   * Add the object to the pool, silent nothing if the pool is full
   * 
   * @param o
   */
  public void put(Object o) {
    int idx = -1;
    synchronized (this) {
      /*
       * if we aren"t full
       */
      if (current < max - 1) {
        /*
         * then increment the current index.
         */
        idx = ++current;
      }
      if (idx >= 0) {
        pool[idx] = o;
      }
    }
  }
  /**
   * Get an object from the pool, null if the pool is empty.
   * 
   * @return The object from the pool.
   */
  public Object get() {
    synchronized (this) {
      /*
       * if we have any in the pool
       */
      if (current >= 0) {
        /*
         * remove the current one
         */
        Object o = pool[current];
        pool[current] = null;
        current--;
        return o;
      }
    }
    return null;
  }
  /**
   * Return the size of the pool
   * 
   * @return The pool size.
   */
  public int getMax() {
    return max;
  }
  /**
   * for testing purposes, so we can examine the pool
   * 
   * @return Array of Objects in the pool.
   */
  Object[] getPool() {
    return pool;
  }
}





Simple pool of Threads

  
/*
 * Copyright (c) 1998 - 2005 Versant Corporation
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 * Versant Corporation - initial API and implementation
 */
import java.util.LinkedList;
import java.util.Iterator;
import java.util.HashSet;
/**
 * Simple pool of Threads.
 */
public class ThreadPool {
    private String name;
    private HashSet active = new HashSet();
    private LinkedList idle = new LinkedList();
    private int idleCount;
    private int maxActive = 10;
    private int maxIdle = 3;
    private int lastThreadId;
    private boolean closed;
    public ThreadPool(String name) {
        this.name = name;
    }
    public int getMaxActive() {
        return maxActive;
    }
    public void setMaxActive(int maxActive) {
        this.maxActive = maxActive;
    }
    public int getMaxIdle() {
        return maxIdle;
    }
    public void setMaxIdle(int maxIdle) {
        this.maxIdle = maxIdle;
    }
    public synchronized int getActiveCount() {
        return active.size();
    }
    public synchronized int getIdleCount() {
        return idleCount;
    }
    /**
     * Close the pool, stopping all threads. This does not wait for the
     * threads to actually stop before returning. This is a NOP if the
     * pool has already been closed.
     */
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        for (Iterator i = idle.iterator(); i.hasNext(); ) {
            Worker w = (Worker)i.next();
            w.terminate();
        }
        idle = null;
        idleCount = 0;
        for (Iterator i = active.iterator(); i.hasNext(); ) {
            Worker w = (Worker)i.next();
            w.terminate();
        }
        active = null;
    }
    /**
     * Executed runnable using a Thread from the pool. This will block for
     * timeoutMs and forever if this is 0. Returns true if the task is
     * being executed (i.e. a Thread was available) or false if not (i.e.
     * pool full).
     */
    public synchronized boolean execute(Runnable runnable, int timeoutMs) {
        if (closed) {
            throw new IllegalStateException("Pool has been closed");
        }
        Worker t;
        if (idleCount == 0) {
            for (; isFull(); ) {
                try {
                    wait(timeoutMs);
                    if (isFull()) {
                        return false;
                    }
                } catch (InterruptedException e) {
                    // ignore
                }
            }
            t = new Worker();
        } else {
            t = (Worker)idle.removeFirst();
            --idleCount;
        }
        active.add(t);
        t.execute(runnable);
        return true;
    }
    protected boolean isFull() {
        return active.size() >= maxActive;
    }
    private synchronized void finishedWork(Worker t) {
        if (!closed) {
            active.remove(t);
            if (idleCount >= maxIdle) {
                t.terminate();
            } else {
                idle.addLast(t);
                ++idleCount;
            }
        }
    }
    private class Worker extends Thread {
        private boolean stopFlag;
        private Runnable runnable;
        public Worker() {
            super(name + " " + ++lastThreadId);
            setDaemon(true);
        }
        /**
         * Executed runnable.
         */
        public void execute(Runnable runnable) {
            this.runnable = runnable;
            if (!isAlive()) {
                start();
            } else {
                synchronized (this) {
                    notify();
                }
            }
        }
        /**
         * Stop this thread as soon as possible.
         */
        public void terminate() {
            stopFlag = true;
            interrupt();
        }
        public void run() {
            for (; !stopFlag; ) {
                try {
                    runnable.run();
                } catch (Throwable e) {
                    if (e instanceof ThreadDeath) {
                        throw (ThreadDeath)e;
                    }
                }
                runnable = null;
                finishedWork(this);
                if (stopFlag) break;
                synchronized (this) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        // ignore
                    }
                }
            }
        }
    }
}





Simple thread pool. A task is executed by obtaining a thread from the pool

  
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.    
 */
import java.util.ArrayList;
import java.util.List;

/** Simple thread pool. A task is executed by obtaining a thread from
 * the pool
 */
public class ThreadPool {
  /** The thread pool contains instances of {@link ThreadPool.Task}.
   */
  public interface Task {
    /** Performs the task.
     * @throws Throwable The task failed, and the worker thread won"t be used again.
     */
    void run() throws Throwable;
  }
    /** A task, which may be interrupted, if the pool is shutting down. 
     */
    public interface InterruptableTask extends Task {
        /** Interrupts the task.
         * @throws Throwable Shutting down the task failed.
         */
        void shutdown() throws Throwable;
    }
    private class Poolable {
        private boolean shuttingDown;
        private Task task;
        private Thread thread;
        Poolable(ThreadGroup pGroup, int pNum) {
            thread = new Thread(pGroup, pGroup.getName() + "-" + pNum){
                public void run() {
                    while (!isShuttingDown()) {
                        final Task t = getTask();
                        if (t == null) {
                            try {
                                synchronized (this) {
                                    if (!isShuttingDown()  &&  getTask() == null) {
                                        wait();
                                    }
                                }
                            } catch (InterruptedException e) {
                                // Do nothing
                            }
                        } else {
                            try {
                                t.run();
                                resetTask();
                                repool(Poolable.this);
                            } catch (Throwable e) {
                                discard(Poolable.this);
                                resetTask();
                            }
                        }
                    }
                }
            };
            thread.start();
        }
        synchronized void shutdown() {
            shuttingDown = true;
            final Task t = getTask();
            if (t != null  &&  t instanceof InterruptableTask) {
                try {
                    ((InterruptableTask) t).shutdown();
                } catch (Throwable th) {
                    // Ignore me
                }
            }
            task = null;
            synchronized (thread) {
                thread.notify();
            }
        }
        private synchronized boolean isShuttingDown() { return shuttingDown; }
        String getName() { return thread.getName(); }
        private synchronized Task getTask() {
            return task;
        }
        private synchronized void resetTask() {
            task = null;
        }
        synchronized void start(Task pTask) {
            task = pTask;
            synchronized (thread) {
                thread.notify();
            }
        }
    }
  private final ThreadGroup threadGroup;
  private final int maxSize;
  private final List waitingThreads = new ArrayList();
  private final List runningThreads = new ArrayList();
  private final List waitingTasks = new ArrayList();
  private int num;

  /** Creates a new instance.
   * @param pMaxSize Maximum number of concurrent threads.
   * @param pName Thread group name.
   */
  public ThreadPool(int pMaxSize, String pName) {
    maxSize = pMaxSize;
    threadGroup = new ThreadGroup(pName);
  }
  synchronized void discard(Poolable pPoolable) {
    pPoolable.shutdown();
        runningThreads.remove(pPoolable);
        waitingThreads.remove(pPoolable);
  }
  synchronized void repool(Poolable pPoolable) {
        if (runningThreads.remove(pPoolable)) {
            if (maxSize != 0  &&  runningThreads.size() + waitingThreads.size() >= maxSize) {
                discard(pPoolable);
            } else {
                waitingThreads.add(pPoolable);
                if (waitingTasks.size() > 0) {
                    Task task = (Task) waitingTasks.remove(waitingTasks.size() - 1);
                    startTask(task);
                }
            }
        } else {
            discard(pPoolable);
        }
  }
  /** Starts a task immediately.
   * @param pTask The task being started.
   * @return True, if the task could be started immediately. False, if
   * the maxmimum number of concurrent tasks was exceeded. If so, you
   * might consider to use the {@link #addTask(ThreadPool.Task)} method instead.
   */
  public synchronized boolean startTask(Task pTask) {
    if (maxSize != 0  &&  runningThreads.size() >= maxSize) {
      return false;
    }
        Poolable poolable;
    if (waitingThreads.size() > 0) {
        poolable = (Poolable) waitingThreads.remove(waitingThreads.size()-1);
    } else {
            poolable = new Poolable(threadGroup, num++);
    }
    runningThreads.add(poolable);
        poolable.start(pTask);
    return true;
  }
  /** Adds a task for immediate or deferred execution.
   * @param pTask The task being added.
   * @return True, if the task was started immediately. False, if
   * the task will be executed later.
   */
  public synchronized boolean addTask(Task pTask) {
    if (startTask(pTask)) {
      return true;
    }
    waitingTasks.add(pTask);
    return false;
  }
  /** Closes the pool.
   */
  public synchronized void shutdown() {
        while (!waitingThreads.isEmpty()) {
            Poolable poolable = (Poolable) waitingThreads.remove(waitingThreads.size()-1);
            poolable.shutdown();
        }
        while (!runningThreads.isEmpty()) {
            Poolable poolable = (Poolable) runningThreads.remove(runningThreads.size()-1);
            poolable.shutdown();
        }
  }
  /** Returns the maximum number of concurrent threads.
   * @return Maximum number of threads.
   */
  public int getMaxThreads() { return maxSize; }
  /** Returns the number of threads, which have actually been created,
     * as opposed to the number of currently running threads.
   */
    public synchronized int getNumThreads() { return num; }
}





Thread Cache

  
// ThreadCache.java
// $Id: ThreadCache.java,v 1.16 2000/08/16 21:37:58 ylafon Exp $
// (c) COPYRIGHT MIT and INRIA, 1996-1997.
// Please first read the full copyright statement in file COPYRIGHT.html

class CachedThread extends Thread {
    Runnable     runner     = null;
    boolean      alive      = true;
    ThreadCache  cache      = null;
    CachedThread next       = null;
    CachedThread prev       = null;
    boolean      terminated = false;
    boolean      started    = false;
    boolean      firstime   = true;
    synchronized boolean isTerminated() {
  boolean ret = terminated;
  terminated = true;
  return ret;
    }
    synchronized Runnable waitForRunner() {
  boolean to = false;
  while ( alive ) {
      // Is a runner available ?
      if ( runner != null ) {
    Runnable torun = runner;
    firstime = false;
    runner   = null;
    return torun;
      } else if ( firstime ) {
    // This thread will not be declared free until it runs once:
    try {
        wait();
    } catch (InterruptedException ex) {
    }
      } else if ( alive = cache.isFree(this, to) ) {
    // Notify the cache that we are free, and continue if allowed:
    try {
        int idleto = cache.getIdleTimeout();
        to = false;
        if ( idleto > 0 ) {
      wait(idleto);
      to = (runner == null);
        } else {
      wait();
        }
    } catch (InterruptedException ex) {
    }
      }
  }
  return null;
    }
    synchronized void kill() {
  alive = false;
  notify();
    }
    synchronized boolean wakeup(Runnable runnable) {
  if ( alive ) {
      runner = runnable;
      if ( ! started ) 
    this.start();
      notify();
      return true;
  } else {
      return false;
  }
    }
    public synchronized void start() {
  super.start();
  this.started = true;
    }
    public void run() {
  try {
      while ( true ) {
    // Wait for a runner:
    Runnable torun = waitForRunner();
    // If runner, run:
    if ( torun != null ) 
        torun.run();
    // If dead, stop
    if ( ! alive )
        break;
      }
  } finally {
      cache.isDead(this);
  }
    }
    CachedThread(ThreadCache cache, int id) {
  super(cache.getThreadGroup(), cache.getThreadGroup().getName()+":"+id);
  this.cache = cache;
  setPriority(cache.getThreadPriority());
  setDaemon(true);
    }
}
public class ThreadCache {
    private static final boolean debug = false;
    /**
     * Default number of cached threads.
     */
    private static final int DEFAULT_CACHESIZE = 5;
    /**
     * Has this thread cache been initialized ?
     */
    protected boolean inited = false;
    /**
     * The thread group for this thread cache.
     */
    protected ThreadGroup group = null;
    /**
     * Number of cached threads.
     */
    protected int cachesize = DEFAULT_CACHESIZE;
    /**
     * Number of created threads.
     */
    protected int threadcount = 0;
    /**
     * Uniq thread identifier within this ThreadCache instance.
     */
    protected int threadid = 0;
    /**
     * Number of idle threads to always maintain alive.
     */
    protected int idlethreads = 0;
    /**
     * Should we queue thread requests, rather then creating new threads.
     */
    protected boolean growasneeded = false;
    /**
     * Number of used threads
     */
    protected int usedthreads = 0;
    /**
     * List of free threads.
     */
    protected CachedThread freelist = null;
    protected CachedThread freetail = null;
    /**
     * The idle timeout, for a thread to wait before being killed.
     * Defaults to <strong>5000</strong> milliseconds.
     */
    protected int idletimeout = 5000;
    /**
     * Cached thread priority.
     */
    protected int threadpriority = 5;
    /**
     * Get the idle timeout value for this cache.
     * @return The idletimeout value, or negative if no timeout applies.
     */
    synchronized final int getIdleTimeout() {
  return (threadcount <= idlethreads) ? -1 : idletimeout;
    }
    /**
     * The given thread is about to be declared free.
     * @return A boolean, <strong>true</strong> if the thread is to continue
     * running, <strong>false</strong> if the thread should stop.
     */
    final synchronized boolean isFree(CachedThread t, boolean timedout) {
  if ( timedout && (threadcount > idlethreads) ) {
      if ( ! t.isTerminated() ) {
    threadcount--;
    usedthreads--;
    notifyAll();
      } 
      return false;
  } else if ( threadcount <= cachesize ) {
      t.prev   = freetail;
      if (freetail != null)
    freetail.next = t;
      freetail = t;
      if (freelist == null)
    freelist = t;
      usedthreads--;
      notifyAll();
      return true;
  } else {
      if ( ! t.isTerminated() ) {
    threadcount--;
    usedthreads--;
    notifyAll();
      }
      return false;
  }
    }
    /**
     * The given thread has terminated, cleanup any associated state.
     * @param dead The dead CachedThread instance.
     */
    final synchronized void isDead(CachedThread t) {
  if ( debug )
      System.out.println("** "+t+": is dead tc="+threadcount);
  if ( ! t.isTerminated() ) {
      threadcount--;
      notifyAll();
  }
    }
    /**
     * Create a new thread within this thread cache.
     * @return A new CachedThread instance.
     */
    private synchronized CachedThread createThread() {
  threadcount++;
  threadid++;
  return new CachedThread(this, threadid);
    }
    /**
     * Allocate a new thread, as requested.
     * @param waitp Should we wait until a thread is available ?
     * @return A launched CachedThread instance, or <strong>null</strong> if 
     * unable to allocate a new thread, and <code>waitp</code> is <strong>
     * false</strong>.
     */
    protected synchronized CachedThread allocateThread(boolean waitp) {
  CachedThread t = null;
  while ( true ) {
      if ( freelist != null ) {
    if ( debug )
        System.out.println("*** allocateThread: free thread");
    t        = freelist;
    freelist = freelist.next;
    if (freelist != null) {
        freelist.prev = null;
    } else {
        freetail = null;
    }
    t.next = null;
    break;
      } else if ((threadcount < cachesize) || growasneeded) {
    if ( debug )
        System.out.println("*** create new thread.");
    t = createThread();
    break;
      } else if ( waitp ) {
    if ( debug )
        System.out.println("*** wait for a thread.");
    // Wait for a thread to become available
    try {
        wait();
    } catch (InterruptedException ex) {
    }
      } else {
    return null;
      }
  }
  return t;
    }
    /**
     * Set the thread cache size.
     * This will also update the number of idle threads to maintain, if 
     * requested.
     * @param cachesize The new thread cache size.
     * @param update If <strong>true</strong> also update the number of
     * threads to maintain idle.
     */
    public synchronized void setCachesize(int cachesize, boolean update) {
  this.cachesize = cachesize;
  if ( update ) 
      this.idlethreads = (cachesize>>1);
    }
    /**
     * Set the thread cache size.
     * Updaet the number of idle threads to keep alive.
     * @param cachesize The new thread cache size.
     */
    public void setCachesize(int cachesize) {
  setCachesize(cachesize, true);
    }
    /**
     * Enable/disable the thread cache to grow as needed.
     * This flag should be turned on only if always getting a thread as fast
     * as possible is critical.
     * @param onoff The toggle.
     */
    public void setGrowAsNeeded(boolean onoff) {
  this.growasneeded = onoff;
    }
    /**
     * Set all the cached threads priority.
     * Changing the cached thread priority should be done before the thread
     * cache is initialized, it will <em>not</em> affect already created 
     * threads.
     * @param priority The new cachewd threads priority.
     */
    public void setThreadPriority(int priority) {
  threadpriority = priority;
    }
    /**
     * Get the cached thread normal priority.
     * @return Currently assigned cached thread priority.
     */
    public int getThreadPriority() {
  return threadpriority;
    }
    /**
     * Set the idle timeout. 
     * The idle timeout value is used to eliminate threads that have remain 
     * idle for too long (although the thread cache will ensure that a 
     * decent minimal number of threads stay around).
     * @param idletimeout The new idle timeout.
     */
    public synchronized void setIdleTimeout(int idletimeout) {
  this.idletimeout = idletimeout;
    }
    /**
     * Request a thread to run on the given object.
     * @param runnable The object to run with the allocated thread.
     * @param waitp If <strong>true</strong> wait until a free thread is 
     * available, otherwise, return <strong>false</strong>.
     * @return A boolean, <strong>true</strong> if a thread was successfully
     * allocated for the given object, <strong>false</strong> otherwise.
     */
    public boolean getThread(Runnable runnable, boolean waitp) {
  if ( debug )
      System.out.println("*** getting a thread for "+runnable);
  if ( ! inited )
      throw new RuntimeException("Uninitialized thread cache");
  // Allocate and launch the thread:
  while ( true ) {
      CachedThread t = allocateThread(waitp);
      if ( t != null ) {
    if ( t.wakeup(runnable) ) {
        synchronized (this) {
      usedthreads++;
        }
        return true;
    }
      } else {
    return false;
      }
  }
    }
    /**
     * Get the ThreadGroup managed by this ThreadCache instance.
     * @return A ThreadGroup instance.
     */
    public ThreadGroup getThreadGroup() {
  return group;
    }
    /**
     * Wait until all the threads have finished their duty
     */
    public synchronized void waitForCompletion() {
  while (usedthreads > 0) {
      if ( debug )
    System.out.println("*** Waiting for "+usedthreads+ " threads");
      try {
    wait();
      } catch (InterruptedException ex) {
      }
  }
    }
    /**
     * Initialize the given thread cache.
     * This two stage initialize method is done so that configuration
     * of the thread cache can be done before any thread get actually
     * created.
     */
    public synchronized void initialize() {
  CachedThread t = createThread();
  freelist = t;
  freetail = t;
  t.next = null;
  t.prev = null;
  t.start();
  for (int i = 1 ; i < idlethreads ; i++) {
      t = createThread();
      t.next = freelist;
      t.prev = null;
      freelist.prev = t;
      freelist = t;
      t.start();
  }
  inited = true;
    }
    /**
     * Create a thread cache, whose threads are to be children of the group.
     * @param group The thread group to which this thread cache is bound.
     * @param nstart Number of thread to create in advance.
     */
    public ThreadCache(ThreadGroup group) {
  this.group = group;
    }
    /**
     * Create a thread cache, after creating a new thread group.
     * @param name The name of the thread group to create.
     */
    public ThreadCache(String name) {
  this(new ThreadGroup(name));
    }
    /**
     * Create a thread cache, after creating a new thread group.
     * @param parent The parent of the thread group to create.
     * @param name The name of the thread group.
     */
    public ThreadCache(ThreadGroup parent, String name) {
  this(new ThreadGroup(parent, name));
    }
}





Thread pool

   
/*
Java Threads, 3rd Edition
By Scott Oaks, Henry Wong
3rd Edition September 2004 
ISBN: 0-596-00782-5
*/
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class CreateTest extends Thread {
  static AtomicInteger nCalls;
  static int target = 0;
  static boolean done = false;
  static Object lock = new Object();
  public static void main(String[] args) {
    target = 10000;
    doTestCreate();
    doTestPool(8);
    doTestLoop();
    target = Integer.parseInt(args[0]);
    cleanGC();
    Timestamp createTS = new Timestamp(TimeUnit.MICROSECONDS);
    doTestCreate();
    createTS.stop();
    System.out.println("Create thread test took " + createTS);
    cleanGC();
    Timestamp pool8TS = new Timestamp(TimeUnit.MICROSECONDS);
    doTestPool(8);
    pool8TS.stop();
    System.out.println("Pool test (8 threads) took " + pool8TS);
    cleanGC();
    Timestamp poolTS = new Timestamp(TimeUnit.MICROSECONDS);
    doTestPool(1);
    poolTS.stop();
    System.out.println("Pool test (1 thread) took " + poolTS);
    cleanGC();
    Timestamp loopTS = new Timestamp(TimeUnit.MICROSECONDS);
    doTestLoop();
    loopTS.stop();
    System.out.println("Loop test took " + loopTS);
    double d = ((double) (createTS.elapsedTime() - loopTS.elapsedTime()))
        / target;
    System.out.println("Creating a thread took " + d + " " + loopTS.units()
        + " per thread");
    d = ((double) (createTS.elapsedTime() - poolTS.elapsedTime())) / target;
    System.out.println("Using a thread pool (1 thread) saved  " + d + " "
        + loopTS.units() + " per task");
    d = ((double) (createTS.elapsedTime() - pool8TS.elapsedTime()))
        / target;
    System.out.println("Using a thread pool (8 threads) saved  " + d + " "
        + loopTS.units() + " per task");
    d = ((double) (poolTS.elapsedTime() - loopTS.elapsedTime())) / target;
    System.out.println("Thread pool overhead (1 thread) is " + d + " "
        + loopTS.units() + " per task");
    d = ((double) (pool8TS.elapsedTime() - loopTS.elapsedTime())) / target;
    System.out.println("Thread pool overhead (8 threads) is " + d + " "
        + loopTS.units() + " per task");
  }
  static void doTestLoop() {
    nCalls = new AtomicInteger(0);
    done = false;
    for (int i = 0; i < target; i++)
      work();
    synchronized (lock) {
      while (!done)
        try {
          lock.wait();
        } catch (Exception e) {
        }
    }
  }
  static void doTestCreate() {
    done = false;
    nCalls = new AtomicInteger(0);
    for (int i = 0; i < target; i++) {
      Thread t = new CreateTest();
      t.start();
    }
    synchronized (lock) {
      while (!done)
        try {
          lock.wait();
        } catch (Exception e) {
        }
    }
  }
static void doTestPool(int nThreads) {
  done = false;
  nCalls = new AtomicInteger(0);
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(nThreads, nThreads,
          50000L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<Runnable>());
        Runnable r = new CreateTest();
        for (int i = 0; i < target; i++) {
            tpe.execute(r);
        }
        tpe.shutdown();
        try {
            tpe.awaitTermination(10000000L, TimeUnit.SECONDS);
        } catch (Exception e) {}
    }  public void run() {
    work();
  }
  public static void work() {
    int n = nCalls.incrementAndGet();
    if (n == target) {
      synchronized (lock) {
        done = true;
        lock.notify();
      }
    }
  }
  public static void cleanGC() {
    System.gc();
    System.runFinalization();
    System.gc();
  }
}





Thread Pool 2

   
/*
 *
 * Copyright (c) 1997-1999 Scott Oaks and Henry Wong. All Rights Reserved.
 *
 * Permission to use, copy, modify, and distribute this software
 * and its documentation for NON-COMMERCIAL purposes and
 * without fee is hereby granted.
 *
 * This sample source code is provided for example only,
 * on an unsupported, as-is basis. 
 *
 * AUTHOR MAKES NO REPRESENTATIONS OR WARRANTIES ABOUT THE SUITABILITY OF
 * THE SOFTWARE, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
 * TO THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
 * PARTICULAR PURPOSE, OR NON-INFRINGEMENT. AUTHOR SHALL NOT BE LIABLE FOR
 * ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING OR
 * DISTRIBUTING THIS SOFTWARE OR ITS DERIVATIVES.
 *
 * THIS SOFTWARE IS NOT DESIGNED OR INTENDED FOR USE OR RESALE AS ON-LINE
 * CONTROL EQUIPMENT IN HAZARDOUS ENVIRONMENTS REQUIRING FAIL-SAFE
 * PERFORMANCE, SUCH AS IN THE OPERATION OF NUCLEAR FACILITIES, AIRCRAFT
 * NAVIGATION OR COMMUNICATION SYSTEMS, AIR TRAFFIC CONTROL, DIRECT LIFE
 * SUPPORT MACHINES, OR WEAPONS SYSTEMS, IN WHICH THE FAILURE OF THE
 * SOFTWARE COULD LEAD DIRECTLY TO DEATH, PERSONAL INJURY, OR SEVERE
 * PHYSICAL OR ENVIRONMENTAL DAMAGE ("HIGH RISK ACTIVITIES").  AUTHOR
 * SPECIFICALLY DISCLAIMS ANY EXPRESS OR IMPLIED WARRANTY OF FITNESS FOR
 * HIGH RISK ACTIVITIES.
 */
import java.util.*;
public class ThreadPool {
  class ThreadPoolRequest {
    Runnable target;
    Object lock;
    ThreadPoolRequest(Runnable t, Object l) {
      target = t;
      lock = l;
    }
  }
  class ThreadPoolThread extends Thread {
    ThreadPool parent;
    boolean shouldRun = true;
    ThreadPoolThread(ThreadPool parent, int i) {
      super("ThreadPoolThread " + i);
      this.parent = parent;
    }
    public void run() {
      ThreadPoolRequest obj = null;
      while (shouldRun) {
        try {
          parent.cvFlag.getBusyFlag();
          while (obj == null && shouldRun) {
            try {
              obj = (ThreadPoolRequest)
                  parent.objects.elementAt(0);
              parent.objects.removeElementAt(0);
            } catch (ArrayIndexOutOfBoundsException aiobe) {
              obj = null;
            } catch (ClassCastException cce) {
              System.err.println("Unexpected data");
              obj = null;
            }
            if (obj == null) {
              try {
                parent.cvAvailable.cvWait();
              } catch (InterruptedException ie) {
                return;
              }
            }
          }
        } finally {
          parent.cvFlag.freeBusyFlag();
        }
        if (!shouldRun)
          return;
        obj.target.run();
        try {
          parent.cvFlag.getBusyFlag();
          nObjects--;
          if (nObjects == 0)
            parent.cvEmpty.cvSignal();
        } finally {
          parent.cvFlag.freeBusyFlag();
        }
        if (obj.lock != null) {
          synchronized(obj.lock) {
            obj.lock.notify();
          }
        }
        obj = null;
      }
    }
  }
  Vector objects;
  int  nObjects = 0;
  CondVar cvAvailable, cvEmpty;
  BusyFlag cvFlag;
  ThreadPoolThread poolThreads[];
  boolean terminated = false;
  public ThreadPool(int n) {
    cvFlag = new BusyFlag();
    cvAvailable = new CondVar(cvFlag);
    cvEmpty = new CondVar(cvFlag);
    objects = new Vector();
    poolThreads = new ThreadPoolThread[n];
    for (int i = 0; i < n; i++) {
      poolThreads[i] = new ThreadPoolThread(this, i);
      poolThreads[i].start();
    }
  }
  private void add(Runnable target, Object lock) {
    try {
      cvFlag.getBusyFlag();
      if (terminated)
        throw new IllegalStateException("Thread pool has shutdown");
      objects.addElement(new ThreadPoolRequest(target, lock));
      nObjects++;
      cvAvailable.cvSignal();
    } finally {
      cvFlag.freeBusyFlag();
    }
  }
  public void addRequest(Runnable target) {
    add(target, null);
  }
  public void addRequestAndWait(Runnable target)
              throws InterruptedException {
    Object lock = new Object();
    synchronized(lock) {
      add(target, lock);
      lock.wait();
    }
  }
  public void waitForAll(boolean terminate) throws InterruptedException {
    try {
      cvFlag.getBusyFlag();
      while (nObjects != 0)
        cvEmpty.cvWait();
      if (terminate) {
        for (int i = 0; i < poolThreads.length; i++)
          poolThreads[i].shouldRun = false;
        cvAvailable.cvBroadcast();
        terminated = true;
      }
    } finally {
      cvFlag.freeBusyFlag();
    }
  }
  public void waitForAll() throws InterruptedException {
    waitForAll(false);
  }
}





Thread pool demo

   
public class ThreadPoolMain extends Object {
  public static Runnable makeRunnable(final String name, final long firstDelay) {
    return new Runnable() {
      public void run() {
        try {
          System.out.println(name + ": starting up");
          Thread.sleep(firstDelay);
          System.out.println(name + ": doing some stuff");
          Thread.sleep(2000);
          System.out.println(name + ": leaving");
        } catch (InterruptedException ix) {
          System.out.println(name + ": got interrupted!");
          return;
        } catch (Exception x) {
          x.printStackTrace();
        }
      }
      public String toString() {
        return name;
      }
    };
  }
  public static void main(String[] args) {
    try {
      ThreadPool pool = new ThreadPool(3);
      Runnable ra = makeRunnable("RA", 3000);
      pool.execute(ra);
      Runnable rb = makeRunnable("RB", 1000);
      pool.execute(rb);
      Runnable rc = makeRunnable("RC", 2000);
      pool.execute(rc);
      Runnable rd = makeRunnable("RD", 60000);
      pool.execute(rd);
      Runnable re = makeRunnable("RE", 1000);
      pool.execute(re);
      pool.stopRequestIdleWorkers();
      Thread.sleep(2000);
      pool.stopRequestIdleWorkers();
      Thread.sleep(5000);
      pool.stopRequestAllWorkers();
    } catch (InterruptedException ix) {
      ix.printStackTrace();
    }
  }
}
class ThreadPool extends Object {
  private ObjectFIFO idleWorkers;
  private ThreadPoolWorker[] workerList;
  public ThreadPool(int numberOfThreads) {
    // make sure that it"s at least one
    numberOfThreads = Math.max(1, numberOfThreads);
    idleWorkers = new ObjectFIFO(numberOfThreads);
    workerList = new ThreadPoolWorker[numberOfThreads];
    for (int i = 0; i < workerList.length; i++) {
      workerList[i] = new ThreadPoolWorker(idleWorkers);
    }
  }
  public void execute(Runnable target) throws InterruptedException {
    // block (forever) until a worker is available
    ThreadPoolWorker worker = (ThreadPoolWorker) idleWorkers.remove();
    worker.process(target);
  }
  public void stopRequestIdleWorkers() {
    try {
      Object[] idle = idleWorkers.removeAll();
      for (int i = 0; i < idle.length; i++) {
        ((ThreadPoolWorker) idle[i]).stopRequest();
      }
    } catch (InterruptedException x) {
      Thread.currentThread().interrupt(); // re-assert
    }
  }
  public void stopRequestAllWorkers() {
    stopRequestIdleWorkers();
    try {
      Thread.sleep(250);
    } catch (InterruptedException x) {
    }
    for (int i = 0; i < workerList.length; i++) {
      if (workerList[i].isAlive()) {
        workerList[i].stopRequest();
      }
    }
  }
}
class ThreadPoolWorker extends Object {
  private static int nextWorkerID = 0;
  private ObjectFIFO idleWorkers;
  private int workerID;
  private ObjectFIFO handoffBox;
  private Thread internalThread;
  private volatile boolean noStopRequested;
  public ThreadPoolWorker(ObjectFIFO idleWorkers) {
    this.idleWorkers = idleWorkers;
    workerID = getNextWorkerID();
    handoffBox = new ObjectFIFO(1); // only one slot
    // just before returning, the thread should be created and started.
    noStopRequested = true;
    Runnable r = new Runnable() {
      public void run() {
        try {
          runWork();
        } catch (Exception x) {
          // in case ANY exception slips through
          x.printStackTrace();
        }
      }
    };
    internalThread = new Thread(r);
    internalThread.start();
  }
  public static synchronized int getNextWorkerID() {
    // notice: synchronized at the class level to ensure uniqueness
    int id = nextWorkerID;
    nextWorkerID++;
    return id;
  }
  public void process(Runnable target) throws InterruptedException {
    handoffBox.add(target);
  }
  private void runWork() {
    while (noStopRequested) {
      try {
        System.out.println("workerID=" + workerID + ", ready for work");
        idleWorkers.add(this);
        Runnable r = (Runnable) handoffBox.remove();
        System.out.println("workerID=" + workerID
            + ", starting execution of new Runnable: " + r);
        runIt(r);
      } catch (InterruptedException x) {
        Thread.currentThread().interrupt(); // re-assert
      }
    }
  }
  private void runIt(Runnable r) {
    try {
      r.run();
    } catch (Exception runex) {
      System.err.println("Uncaught exception fell through from run()");
      runex.printStackTrace();
    } finally {
      Thread.interrupted();
    }
  }
  public void stopRequest() {
    System.out
        .println("workerID=" + workerID + ", stopRequest() received.");
    noStopRequested = false;
    internalThread.interrupt();
  }
  public boolean isAlive() {
    return internalThread.isAlive();
  }
}
class ObjectFIFO extends Object {
  private Object[] queue;
  private int capacity;
  private int size;
  private int head;
  private int tail;
  public ObjectFIFO(int cap) {
    capacity = (cap > 0) ? cap : 1; // at least 1
    queue = new Object[capacity];
    head = 0;
    tail = 0;
    size = 0;
  }
  public int getCapacity() {
    return capacity;
  }
  public synchronized int getSize() {
    return size;
  }
  public synchronized boolean isEmpty() {
    return (size == 0);
  }
  public synchronized boolean isFull() {
    return (size == capacity);
  }
  public synchronized void add(Object obj) throws InterruptedException {
    waitWhileFull();
    queue[head] = obj;
    head = (head + 1) % capacity;
    size++;
    notifyAll();
  }
  public synchronized void addEach(Object[] list) throws InterruptedException {
    for (int i = 0; i < list.length; i++) {
      add(list[i]);
    }
  }
  public synchronized Object remove() throws InterruptedException {
    waitWhileEmpty();
    Object obj = queue[tail];
    queue[tail] = null;
    tail = (tail + 1) % capacity;
    size--;
    notifyAll(); 
    return obj;
  }
  public synchronized Object[] removeAll() throws InterruptedException {
     Object[] list = new Object[size]; 
    for (int i = 0; i < list.length; i++) {
      list[i] = remove();
    }
    return list;
  }
  public synchronized Object[] removeAtLeastOne() throws InterruptedException {
    waitWhileEmpty(); 
    return removeAll();
  }
  public synchronized boolean waitUntilEmpty(long msTimeout)
      throws InterruptedException {
    if (msTimeout == 0L) {
      waitUntilEmpty(); 
      return true;
    }
    long endTime = System.currentTimeMillis() + msTimeout;
    long msRemaining = msTimeout;
    while (!isEmpty() && (msRemaining > 0L)) {
      wait(msRemaining);
      msRemaining = endTime - System.currentTimeMillis();
    }
    return isEmpty();
  }
  public synchronized void waitUntilEmpty() throws InterruptedException {
    while (!isEmpty()) {
      wait();
    }
  }
  public synchronized void waitWhileEmpty() throws InterruptedException {
    while (isEmpty()) {
      wait();
    }
  }
  public synchronized void waitUntilFull() throws InterruptedException {
    while (!isFull()) {
      wait();
    }
  }
  public synchronized void waitWhileFull() throws InterruptedException {
    while (isFull()) {
      wait();
    }
  }
}





Thread Pools 1

   
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MainClass {
  public static void main(String[] args) {
    int nTasks = 5;
    long n = 1000L;
    int tpSize = 10;
    ThreadPoolExecutor tpe = new ThreadPoolExecutor(tpSize, tpSize, 50000L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
    Task[] tasks = new Task[nTasks];
    for (int i = 0; i < nTasks; i++) {
      tasks[i] = new Task(n, "Task " + i);
      tpe.execute(tasks[i]);
    }
    tpe.shutdown();
  }
}
class SingleThreadAccess {
  private ThreadPoolExecutor tpe;
  public SingleThreadAccess() {
    tpe = new ThreadPoolExecutor(1, 1, 50000L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>());
  }
  public void invokeLater(Runnable r) {
    tpe.execute(r);
  }
  public void invokeAneWait(Runnable r) throws InterruptedException, ExecutionException {
    FutureTask task = new FutureTask(r, null);
    tpe.execute(task);
    task.get();
  }
  public void shutdown() {
    tpe.shutdown();
  }
}
class Task implements Runnable {
  long n;
  String id;
  private long fib(long n) {
    if (n == 0)
      return 0L;
    if (n == 1)
      return 1L;
    return fib(n - 1) + fib(n - 2);
  }
  public Task(long n, String id) {
    this.n = n;
    this.id = id;
  }
  public void run() {
    Date d = new Date();
    DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS");
    long startTime = System.currentTimeMillis();
    d.setTime(startTime);
    System.out.println("Starting task " + id + " at " + df.format(d));
    fib(n);
    long endTime = System.currentTimeMillis();
    d.setTime(endTime);
    System.out.println("Ending task " + id + " at " + df.format(d) + " after "
        + (endTime - startTime) + " milliseconds");
  }
}





Thread Pools 2

   
/*
Java Threads, 3rd Edition
By Scott Oaks, Henry Wong
3rd Edition September 2004 
ISBN: 0-596-00782-5
*/
import java.util.concurrent.*;
import java.util.*;
import java.text.*;
import java.io.*;
public class SingleThreadTest {
  public static void main(String[] args) {
    int nTasks = 5;
    int fib = 4;
    SingleThreadAccess sta = new SingleThreadAccess();
    for (int i = 0; i < nTasks; i++)
      sta.invokeLater(new Task(fib, "Task " + i));
    sta.shutdown();
  }
}
class SingleThreadAccess {
  private ThreadPoolExecutor tpe;
  public SingleThreadAccess() {
        tpe = new ThreadPoolExecutor(
          1, 1, 50000L, TimeUnit.SECONDS,
          new LinkedBlockingQueue<Runnable>());
    }  public void invokeLater(Runnable r) {
    tpe.execute(r);
  }
  public void invokeAneWait(Runnable r) throws InterruptedException,
      ExecutionException {
    FutureTask task = new FutureTask(r, null);
    tpe.execute(task);
    task.get();
  }
  public void shutdown() {
    tpe.shutdown();
  }
}
class Task implements Runnable {
  long n;
  String id;
  private long fib(long n) {
    if (n == 0)
      return 0L;
    if (n == 1)
      return 1L;
    return fib(n - 1) + fib(n - 2);
  }
  public Task(long n, String id) {
    this.n = n;
    this.id = id;
  }
  public void run() {
    Date d = new Date();
    DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS");
    long startTime = System.currentTimeMillis();
    d.setTime(startTime);
    System.out.println("Starting task " + id + " at " + df.format(d));
    fib(n);
    long endTime = System.currentTimeMillis();
    d.setTime(endTime);
    System.out.println("Ending task " + id + " at " + df.format(d)
        + " after " + (endTime - startTime) + " milliseconds");
  }
}





Thread Pool Test

   
       /*
DEVELOPING GAME IN JAVA 
Caracteristiques
Editeur : NEW RIDERS 
Auteur : BRACKEEN 
Parution : 09 2003 
Pages : 972 
Isbn : 1-59273-005-1 
Reliure : Paperback 
Disponibilite : Disponible a la librairie 
*/
import java.util.LinkedList;
public class ThreadPoolTest {
  public static void main(String[] args) {
    if (args.length != 2) {
      System.out.println("Tests the ThreadPool task.");
      System.out
          .println("Usage: java ThreadPoolTest numTasks numThreads");
      System.out.println("  numTasks - integer: number of task to run.");
      System.out.println("  numThreads - integer: number of threads "
          + "in the thread pool.");
      return;
    }
    int numTasks = Integer.parseInt(args[0]);
    int numThreads = Integer.parseInt(args[1]);
    // create the thread pool
    ThreadPool threadPool = new ThreadPool(numThreads);
    // run example tasks
    for (int i = 0; i < numTasks; i++) {
      threadPool.runTask(createTask(i));
    }
    // close the pool and wait for all tasks to finish.
    threadPool.join();
  }
  /**
   * Creates a simple Runnable that prints an ID, waits 500 milliseconds, then
   * prints the ID again.
   */
  private static Runnable createTask(final int taskID) {
    return new Runnable() {
      public void run() {
        System.out.println("Task " + taskID + ": start");
        // simulate a long-running task
        try {
          Thread.sleep(500);
        } catch (InterruptedException ex) {
        }
        System.out.println("Task " + taskID + ": end");
      }
    };
  }
}
/**
 * A thread pool is a group of a limited number of threads that are used to
 * execute tasks.
 */
class ThreadPool extends ThreadGroup {
  private boolean isAlive;
  private LinkedList taskQueue;
  private int threadID;
  private static int threadPoolID;
  /**
   * Creates a new ThreadPool.
   * 
   * @param numThreads
   *            The number of threads in the pool.
   */
  public ThreadPool(int numThreads) {
    super("ThreadPool-" + (threadPoolID++));
    setDaemon(true);
    isAlive = true;
    taskQueue = new LinkedList();
    for (int i = 0; i < numThreads; i++) {
      new PooledThread().start();
    }
  }
  /**
   * Requests a new task to run. This method returns immediately, and the task
   * executes on the next available idle thread in this ThreadPool.
   * <p>
   * Tasks start execution in the order they are received.
   * 
   * @param task
   *            The task to run. If null, no action is taken.
   * @throws IllegalStateException
   *             if this ThreadPool is already closed.
   */
  public synchronized void runTask(Runnable task) {
    if (!isAlive) {
      throw new IllegalStateException();
    }
    if (task != null) {
      taskQueue.add(task);
      notify();
    }
  }
  protected synchronized Runnable getTask() throws InterruptedException {
    while (taskQueue.size() == 0) {
      if (!isAlive) {
        return null;
      }
      wait();
    }
    return (Runnable) taskQueue.removeFirst();
  }
  /**
   * Closes this ThreadPool and returns immediately. All threads are stopped,
   * and any waiting tasks are not executed. Once a ThreadPool is closed, no
   * more tasks can be run on this ThreadPool.
   */
  public synchronized void close() {
    if (isAlive) {
      isAlive = false;
      taskQueue.clear();
      interrupt();
    }
  }
  /**
   * Closes this ThreadPool and waits for all running threads to finish. Any
   * waiting tasks are executed.
   */
  public void join() {
    // notify all waiting threads that this ThreadPool is no
    // longer alive
    synchronized (this) {
      isAlive = false;
      notifyAll();
    }
    // wait for all threads to finish
    Thread[] threads = new Thread[activeCount()];
    int count = enumerate(threads);
    for (int i = 0; i < count; i++) {
      try {
        threads[i].join();
      } catch (InterruptedException ex) {
      }
    }
  }
  /**
   * A PooledThread is a Thread in a ThreadPool group, designed to run tasks
   * (Runnables).
   */
  private class PooledThread extends Thread {
    public PooledThread() {
      super(ThreadPool.this, "PooledThread-" + (threadID++));
    }
    public void run() {
      while (!isInterrupted()) {
        // get a task to run
        Runnable task = null;
        try {
          task = getTask();
        } catch (InterruptedException ex) {
        }
        // if getTask() returned null or was interrupted,
        // close this thread by returning.
        if (task == null) {
          return;
        }
        // run the task, and eat any exceptions it throws
        try {
          task.run();
        } catch (Throwable t) {
          uncaughtException(this, t);
        }
      }
    }
  }
}





Very basic implementation of a thread pool

  
/*
 * Copyright (c) 2004, Rob Gordon.
 */
// revised from oddjob
import java.util.LinkedList;

/**
 * Very basic implementation of a thread pool.
 * 
 * @author Rob Gordon.
 */
public class ThreadPool {
    private BlockingQueue queue = new BlockingQueue();
    private boolean closed = true;
    private int poolSize = 3;
    
    
    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }
    
    public int getPoolSize() {
        return poolSize;
    }
    
    synchronized public void start() {
        if (!closed) {
            throw new IllegalStateException("Pool already started.");
        }
        closed = false;
        for (int i = 0; i < poolSize; ++i) {
            new PooledThread().start();
        }
    }
    synchronized public void execute(Runnable job) {
        if (closed) {
            throw new PoolClosedException();
        }
        queue.enqueue(job);
    }
    
    private class PooledThread extends Thread {
        
        public void run() {
            while (true) {
                Runnable job = (Runnable) queue.dequeue();
                if (job == null) {
                    break;
                }
                try {
                    job.run();
                } catch (Throwable t) {
                    // ignore
                }
            }
        }
    }
    public void close() {
        closed = true;
        queue.close();
    }
    
    private static class PoolClosedException extends RuntimeException {
        PoolClosedException() { 
            super ("Pool closed.");
        }
    }
}
/*
 * Copyright � 2004, Rob Gordon.
 */
/**
 *
 * @author Rob Gordon.
 */
 class BlockingQueue {
  private final LinkedList list = new LinkedList();
  private boolean closed = false;
  private boolean wait = false;
  
  synchronized public void enqueue(Object o) {
    if (closed) {
      throw new ClosedException();
    }
    list.add(o);
    notify();
  }
  synchronized public Object dequeue() {
    while (!closed && list.size() == 0) {
      try {
        wait();
      }
      catch (InterruptedException e) {
        // ignore
      }
    }
    if (list.size() == 0) {
      return null;
    }
    return list.removeFirst();
  }
  
  synchronized public int size() {
      return list.size();
  }
  
  synchronized public void close() {
    closed = true;
    notifyAll();
  }
  
  synchronized public void open() {
      closed = false;
  }
  
  public static class ClosedException extends RuntimeException {
      ClosedException() {
          super("Queue closed.");
      }
  }
}





Worker thread pool

  
/**
 * 
 * JFreeReport : a free Java reporting library
 * 
 *
 * Project Info:  http://reporting.pentaho.org/
 *
 * (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
 *
 * This library is free software; you can redistribute it and/or modify it under the terms
 * of the GNU Lesser General Public License as published by the Free Software Foundation;
 * either version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License along with this
 * library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 * [Java is a trademark or registered trademark of Sun Microsystems, Inc.
 * in the United States and other countries.]
 *
 * ------------
 * WorkerPool.java
 * ------------
 * (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
 */

/**
 * A simple static workpool. Worker threads are created when necessary.
 *
 * @author Thomas Morgner
 * 
 */
public class WorkerPool
{
  /**
   * The worker array.
   */
  private Worker[] workers;
  /**
   * A flag indicating whether idle workers are available.
   */
  private boolean workersAvailable;
  /**
   * the name prefix for all workers of this pool.
   */
  private String namePrefix;
  /**
   * Creates a new worker pool with the default size of 10 workers and the default name.
   */
  public WorkerPool ()
  {
    this(10);
  }
  /**
   * Creates a new workerpool with the given number of workers and the default name.
   *
   * @param size the maximum number of workers available.
   */
  public WorkerPool (final int size)
  {
    this(size, "WorkerPool-worker");
  }
  /**
   * Creates a new worker pool for the given number of workers and with the given name
   * prefix.
   *
   * @param size       the size of the worker pool.
   * @param namePrefix the name prefix for all created workers.
   */
  public WorkerPool (final int size, final String namePrefix)
  {
    if (size <= 0)
    {
      throw new IllegalArgumentException("Size must be > 0");
    }
    workers = new Worker[size];
    workersAvailable = true;
    this.namePrefix = namePrefix;
  }
  /**
   * Checks whether workers are available.
   *
   * @return true, if at least one worker is idle, false otherwise.
   */
  public synchronized boolean isWorkerAvailable ()
  {
    return workersAvailable;
  }
  /**
   * Updates the workersAvailable flag after a worker was assigned.
   */
  private void updateWorkersAvailable ()
  {
    for (int i = 0; i < workers.length; i++)
    {
      if (workers[i] == null)
      {
        workersAvailable = true;
        return;
      }
      if (workers[i].isAvailable() == true)
      {
        workersAvailable = true;
        return;
      }
    }
    workersAvailable = false;
  }
  /**
   * Waits until a worker will be available.
   */
  private synchronized void waitForWorkerAvailable ()
  {
    while (isWorkerAvailable() == false)
    {
      try
      {
        // remove lock
        this.wait(5000);
      }
      catch (InterruptedException ie)
      {
        // ignored
      }
    }
  }
  /**
   * Returns a workerhandle for the given workload. This method will wait until an idle
   * worker is found.
   *
   * @param r the workload for the worker
   * @return a handle to the worker.
   */
  public synchronized WorkerHandle getWorkerForWorkload (final Runnable r)
  {
    waitForWorkerAvailable();
    int emptySlot = -1;
    for (int i = 0; i < workers.length; i++)
    {
      if (workers[i] == null)
      {
        // in the first run, try to avoid to create new threads...
        // reuse the already available threads
        if (emptySlot == -1)
        {
          emptySlot = i;
        }
        continue;
      }
      if (workers[i].isAvailable() == true)
      {
        workers[i].setWorkload(r);
        updateWorkersAvailable();
        return new WorkerHandle(workers[i]);
      }
    }
    if (emptySlot != -1)
    {
      workers[emptySlot] = new Worker();
      workers[emptySlot].setName(namePrefix + "-" + emptySlot);
      workers[emptySlot].setWorkerPool(this);
      workers[emptySlot].setWorkload(r);
      updateWorkersAvailable();
      return new WorkerHandle(workers[emptySlot]);
    }
    throw new IllegalStateException
            ("At this point, a worker should already have been assigned.");
  }
  /**
   * Marks the given worker as finished. The worker will be removed from the list of the
   * available workers.
   *
   * @param worker the worker which was finished.
   */
  public void workerFinished (final Worker worker)
  {
    if (worker.isFinish() == false)
    {
      throw new IllegalArgumentException("This worker is not in the finish state.");
    }
    for (int i = 0; i < workers.length; i++)
    {
      if (workers[i] == worker)
      {
        synchronized (this)
        {
          workers[i] = null;
          workersAvailable = true;
          this.notifyAll();
        }
        return;
      }
    }
  }
  /**
   * Marks the given worker as available.
   *
   * @param worker the worker which was available.
   */
  public synchronized void workerAvailable (final Worker worker)
  {
    for (int i = 0; i < workers.length; i++)
    {
      if (workers[i] == worker)
      {
        synchronized (this)
        {
          workersAvailable = true;
          this.notifyAll();
        }
        return;
      }
    }
  }
  /**
   * Finishes all worker of this pool.
   */
  public void finishAll ()
  {
    for (int i = 0; i < workers.length; i++)
    {
      if (workers[i] != null)
      {
        workers[i].finish();
      }
    }
  }
}
/**
 * 
 * JFreeReport : a free Java reporting library
 * 
 *
 * Project Info:  http://reporting.pentaho.org/
 *
 * (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
 *
 * This library is free software; you can redistribute it and/or modify it under the terms
 * of the GNU Lesser General Public License as published by the Free Software Foundation;
 * either version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License along with this
 * library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 * [Java is a trademark or registered trademark of Sun Microsystems, Inc.
 * in the United States and other countries.]
 *
 * ------------
 * Worker.java
 * ------------
 * (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
 */

/**
 * A simple worker implementation. The worker executes a assigned workload and then sleeps
 * until another workload is set or the worker is killed.
 *
 * @author Thomas Morgner
 */
final class Worker extends Thread
{
  /**
   * the worker"s task.
   */
  private Runnable workload;
  /**
   * a flag whether the worker should exit after the processing.
   */
  private volatile boolean finish;
  /**
   * the time in milliseconds beween 2 checks for exit or work requests.
   */
  private final int sleeptime;
  /**
   * The worker pool, to which this worker is assigned. May be null.
   */
  private WorkerPool workerPool;
  /**
   * Creates a new worker.
   *
   * @param sleeptime the time this worker sleeps until he checks for new work.
   */
  public Worker (final int sleeptime)
  {
    this.sleeptime = sleeptime;
    this.setDaemon(true);
    start();
  }
  /**
   * Creates a new worker with an default idle timeout of 2 minutes.
   */
  public Worker ()
  {
    this(120000);
  }
  /**
   * Set the next workload for this worker.
   *
   * @param r the next workload for the worker.
   * @throws IllegalStateException if the worker is not idle.
   */
  public void setWorkload (final Runnable r)
  {
    if (workload != null)
    {
      throw new IllegalStateException("This worker is not idle.");
    }
    //Log.debug("Workload set...");
    synchronized (this)
    {
      workload = r;
      //Log.debug("Workload assigned: Notified " + getName());
      this.notifyAll();
    }
  }
  /**
   * Returns the workload object.
   *
   * @return the runnable executed by this worker thread.
   */
  public synchronized Runnable getWorkload()
  {
    return workload;
  }
  /**
   * Kills the worker after he completed his work. Awakens the worker if he"s sleeping, so
   * that the worker dies without delay.
   */
  public void finish ()
  {
    finish = true;
    // we are evil ..
    try
    {
      this.interrupt();
    }
    catch (SecurityException se)
    {
      // ignored
    }
    if (workerPool != null)
    {
      workerPool.workerFinished(this);
    }
  }
  /**
   * Checks whether this worker has some work to do.
   *
   * @return true, if this worker has no more work and is currently sleeping.
   */
  public boolean isAvailable ()
  {
    return (workload == null);
  }
  /**
   * If a workload is set, process it. After the workload is processed, this worker starts
   * to sleep until a new workload is set for the worker or the worker got the finish()
   * request.
   */
  public void run ()
  {
    while (!finish)
    {
      if (workload != null)
      {
        try
        {
          workload.run();
        }
        catch (Exception e)
        {
          System.out.println("Worker caught exception on run: "+ e);
        }
        workload = null;
        if (workerPool != null)
        {
          workerPool.workerAvailable(this);
        }
      }
      if (!finish)
      {
        synchronized (this)
        {
          try
          {
            // remove lock
            this.wait(sleeptime);
          }
          catch (InterruptedException ie)
          {
            // ignored
          }
        }
      }
    }
    synchronized (this)
    {
      this.notifyAll();
    }
  }
  /**
   * Checks whether this worker has received the signal to finish and die.
   *
   * @return true, if the worker should finish the work and end the thread.
   */
  public boolean isFinish ()
  {
    return finish;
  }
  /**
   * Returns the worker"s assigned pool.
   *
   * @return the worker pool (or null, if the worker is not assigned to a pool).
   */
  public WorkerPool getWorkerPool ()
  {
    return workerPool;
  }
  /**
   * Defines the worker"s assigned pool.
   *
   * @param workerPool the worker pool (or null, if the worker is not assigned to a
   *                   pool).
   */
  public void setWorkerPool (final WorkerPool workerPool)
  {
    this.workerPool = workerPool;
  }
}

/**
 * 
 * JFreeReport : a free Java reporting library
 * 
 *
 * Project Info:  http://reporting.pentaho.org/
 *
 * (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
 *
 * This library is free software; you can redistribute it and/or modify it under the terms
 * of the GNU Lesser General Public License as published by the Free Software Foundation;
 * either version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License along with this
 * library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 * [Java is a trademark or registered trademark of Sun Microsystems, Inc.
 * in the United States and other countries.]
 *
 * ------------
 * WorkerHandle.java
 * ------------
 * (C) Copyright 2001-2007, by Object Refinery Ltd, Pentaho Corporation and Contributors.
 */

/**
 * The worker handle is a control structure which allows control over the worker without
 * exposing the thread object.
 *
 * @author Thomas Morgner
 * @deprecated This class is used by the WorkerPool, which is not used anywhere anymore.
 */
 class WorkerHandle
{
  /**
   * The worker for this handle.
   */
  private final Worker worker;
  /**
   * Creates a new handle for the given worker.
   *
   * @param worker the worker.
   */
  public WorkerHandle (final Worker worker)
  {
    this.worker = worker;
  }
  /**
   * Finishes the worker.
   */
  public void finish ()
  {
    worker.finish();
  }
}