Java Tutorial/Thread/Thread Pool — различия между версиями
Admin (обсуждение | вклад) м (1 версия) |
|
(нет различий)
|
Текущая версия на 18:18, 31 мая 2010
A pool of work threads
<source lang="java">
/*
* WorkThreadPool.java - Background thread pool that does stuff * :tabSize=8:indentSize=8:noTabs=false: * :folding=explicit:collapseFolds=1: * * Copyright (C) 2000 Slava Pestov * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version 2 * of the License, or any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */
//{{{ Imports import java.util.EventListener; import javax.swing.SwingUtilities; import javax.swing.event.EventListenerList; //}}} /**
* A pool of work threads. * * @author Slava Pestov * @version $Id: WorkThreadPool.java 12504 2008-04-22 23:12:43Z ezust $ * @see org.gjt.sp.util.WorkThread * @since jEdit 2.6pre1 */
public class WorkThreadPool {
// {{{ WorkThreadPool constructor /** * Creates a new work thread pool with the specified number of work threads. * * @param name * The thread name prefix * @param count * The number of work threads */ public WorkThreadPool(String name, int count) { listenerList = new EventListenerList(); if (count != 0) { threadGroup = new ThreadGroup(name); threads = new WorkThread[count]; for (int i = 0; i < threads.length; i++) { threads[i] = new WorkThread(this, threadGroup, name + " #" + (i + 1)); } } } // }}} // {{{ start() method /** * Starts all the threads in this thread pool. */ public void start() { /* not really needed since threads don"t start until after */ synchronized (lock) { started = true; if (awtRequestCount != 0 && requestCount == 0) queueAWTRunner(); } if (threads != null) { for (int i = 0; i < threads.length; i++) { threads[i].start(); } } } // }}} // {{{ addWorkRequest() method /** * Adds a work request to the queue. * * @param run * The runnable * @param inAWT * If true, will be executed in AWT thread. Otherwise, will be * executed in work thread */ public void addWorkRequest(Runnable run, boolean inAWT) { if (threads == null) { run.run(); return; } synchronized (lock) { // {{{ if there are no requests, execute AWT requests immediately if (started && inAWT && requestCount == 0 && awtRequestCount == 0) { // Log.log(Log.DEBUG,this,"AWT immediate: " + run); if (SwingUtilities.isEventDispatchThread()) run.run(); else SwingUtilities.invokeLater(run); return; } // }}} Request request = new Request(run); // {{{ Add to AWT queue... if (inAWT) { if (firstAWTRequest == null && lastAWTRequest == null) firstAWTRequest = lastAWTRequest = request; else { lastAWTRequest.next = request; lastAWTRequest = request; } awtRequestCount++; // if no requests are running, requestDone() // will not be called, so we must queue the // AWT runner ourselves. if (started && requestCount == 0) queueAWTRunner(); } // }}} // {{{ Add to work thread queue... else { if (firstRequest == null && lastRequest == null) firstRequest = lastRequest = request; else { lastRequest.next = request; lastRequest = request; } requestCount++; } // }}} lock.notifyAll(); } } // }}} // {{{ waitForRequests() method /** * Waits until all requests are complete. */ public void waitForRequests() { if (threads == null) return; synchronized (waitForAllLock) { while (requestCount != 0) { try { waitForAllLock.wait(); } catch (InterruptedException ie) { } } } if (SwingUtilities.isEventDispatchThread()) { // do any queued AWT runnables doAWTRequests(); } else { try { SwingUtilities.invokeAndWait(new RunRequestsInAWTThread()); } catch (Exception e) { } } } // }}} // {{{ getRequestCount() method /** * Returns the number of pending requests. * * @return the pending request count */ public int getRequestCount() { return requestCount; } // }}} // {{{ getThreadCount() method /** * Returns the number of threads in this pool. * * @return the thread count */ public int getThreadCount() { if (threads == null) return 0; else return threads.length; } // }}} // {{{ getThread() method /** * Returns the specified thread. * * @param index * The index of the thread * @return a WorkThread */ public WorkThread getThread(int index) { return threads[index]; } // }}} // {{{ addProgressListener() method /** * Adds a progress listener to this thread pool. * * @param listener * The listener */ public void addProgressListener(WorkThreadProgressListener listener) { listenerList.add(WorkThreadProgressListener.class, listener); } // }}} // {{{ removeProgressListener() method /** * Removes a progress listener from this thread pool. * * @param listener * The listener */ public void removeProgressListener(WorkThreadProgressListener listener) { listenerList.remove(WorkThreadProgressListener.class, listener); } // }}} // {{{ Package-private members final Object lock = new Object(); final Object waitForAllLock = new Object(); // {{{ fireStatusChanged() method void fireStatusChanged(WorkThread thread) { final Object[] listeners = listenerList.getListenerList(); if (listeners.length != 0) { int index = 0; for (int i = 0; i < threads.length; i++) { if (threads[i] == thread) { index = i; break; } } for (int i = listeners.length - 2; i >= 0; i--) { if (listeners[i] == WorkThreadProgressListener.class) { ((WorkThreadProgressListener) listeners[i + 1]).statusUpdate(WorkThreadPool.this, index); } } } } // }}} // {{{ fireProgressChanged() method void fireProgressChanged(WorkThread thread) { final Object[] listeners = listenerList.getListenerList(); if (listeners.length != 0) { int index = 0; for (int i = 0; i < threads.length; i++) { if (threads[i] == thread) { index = i; break; } } for (int i = listeners.length - 2; i >= 0; i--) { if (listeners[i] == WorkThreadProgressListener.class) { ((WorkThreadProgressListener) listeners[i + 1]) .progressUpdate(WorkThreadPool.this, index); } } } } // }}} // {{{ requestDone() method void requestDone() { synchronized (lock) { requestCount--; if (requestCount == 0 && firstAWTRequest != null) queueAWTRunner(); } } // }}} // {{{ getNextRequest() method Request getNextRequest() { synchronized (lock) { Request request = firstRequest; if (request == null) return null; firstRequest = firstRequest.next; if (firstRequest == null) lastRequest = null; if (request.alreadyRun) throw new InternalError("AIEE!!! Request run twice!!! " + request.run); request.alreadyRun = true; /* * StringBuffer buf = new StringBuffer("request queue is now: "); Request * _request = request.next; while(_request != null) { * buf.append(_request.id); if(_request.next != null) buf.append(","); * _request = _request.next; } Log.log(Log.DEBUG,this,buf.toString()); */ return request; } } // }}} // }}} // {{{ Private members // {{{ Instance variables private boolean started; private ThreadGroup threadGroup; private WorkThread[] threads; // Request queue private Request firstRequest; private Request lastRequest; private int requestCount; // AWT thread magic private boolean awtRunnerQueued; private Request firstAWTRequest; private Request lastAWTRequest; private int awtRequestCount; private EventListenerList listenerList; // }}} // {{{ doAWTRequests() method /** Must always be called with the lock held. */ private void doAWTRequests() { while (requestCount == 0 && firstAWTRequest != null) { doAWTRequest(getNextAWTRequest()); } } // }}} // {{{ doAWTRequest() method /** * Must always be called with the lock held. * * @param request * the request to run */ private void doAWTRequest(Request request) { // Log.log(Log.DEBUG,this,"Running in AWT thread: " + request); try { request.run.run(); } catch (Throwable t) { } awtRequestCount--; } // }}} // {{{ queueAWTRunner() method /** Must always be called with the lock held. */ private void queueAWTRunner() { if (!awtRunnerQueued) { awtRunnerQueued = true; SwingUtilities.invokeLater(new RunRequestsInAWTThread()); // Log.log(Log.DEBUG,this,"AWT runner queued"); } } // }}} // {{{ getNextAWTRequest() method private Request getNextAWTRequest() { Request request = firstAWTRequest; firstAWTRequest = firstAWTRequest.next; if (firstAWTRequest == null) lastAWTRequest = null; if (request.alreadyRun) throw new InternalError("AIEE!!! Request run twice!!! " + request.run); request.alreadyRun = true; /* * StringBuffer buf = new StringBuffer("AWT request queue is now: "); * Request _request = request.next; while(_request != null) { * buf.append(_request.id); if(_request.next != null) buf.append(","); * _request = _request.next; } Log.log(Log.DEBUG,this,buf.toString()); */ return request; } // }}} // }}} static int ID; // {{{ Request class static class Request { int id = ++ID; Runnable run; boolean alreadyRun; Request next; Request(Runnable run) { this.run = run; } public String toString() { return "[id=" + id + ",run=" + run + "]"; } } // }}} // {{{ RunRequestsInAWTThread class class RunRequestsInAWTThread implements Runnable { public void run() { synchronized (lock) { awtRunnerQueued = false; if (requestCount == 0) doAWTRequests(); } } } // }}}
} /*
* WorkThread.java - Background thread that does stuff Copyright (C) 2000 Slava * Pestov * * This program is free software; you can redistribute it and/or modify it under * the terms of the GNU General Public License as published by the Free Software * Foundation; either version 2 of the License, or any later version. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License along with * this program; if not, write to the Free Software Foundation, Inc., 59 Temple * Place - Suite 330, Boston, MA 02111-1307, USA. */
/**
* Services work requests in the background. * * @author Slava Pestov * @version $Id: WorkThread.java 12504 2008-04-22 23:12:43Z ezust $ */
class WorkThread extends Thread implements ThreadAbortMonitor {
public WorkThread(WorkThreadPool pool, ThreadGroup group, String name) { super(group, name); // so that jEdit doesn"t exit with no views open automatically // setDaemon(true); setPriority(Thread.MIN_PRIORITY); this.pool = pool; } /** * Sets if the current request can be aborted. If set to true and already * aborted, the thread will be stopped * * @param abortable * true if the WorkThread is abortable * @since jEdit 2.6pre1 */ public void setAbortable(boolean abortable) { synchronized (abortLock) { this.abortable = abortable; if (aborted) stop(new Abort()); } } /** * Returns if the work thread is currently running a request. * * @return true if a request is currently running */ public boolean isRequestRunning() { return requestRunning; } public boolean isAborted() { synchronized (abortLock) { return aborted; } } /** * Returns the status text. * * @return the status label */ public String getStatus() { return status; } /** * Sets the status text. * * @param status * the new status of the thread * @since jEdit 2.6pre1 */ public void setStatus(String status) { this.status = status; pool.fireProgressChanged(this); } /** * Returns the progress value. * * @return the progress value */ public int getProgressValue() { return progressValue; } /** * Sets the progress value. * * @param progressValue * the new progress value * @since jEdit 2.6pre1 */ public void setProgressValue(int progressValue) { this.progressValue = progressValue; pool.fireProgressChanged(this); } /** * Returns the progress maximum. * * @return the maximum value of the progression */ public int getProgressMaximum() { return progressMaximum; } /** * Sets the maximum progress value. * * @param progressMaximum * the maximum value of the progression * @since jEdit 2.6pre1 */ public void setProgressMaximum(int progressMaximum) { this.progressMaximum = progressMaximum; pool.fireProgressChanged(this); } /** * Aborts the currently running request, if allowed. * * @since jEdit 2.6pre1 */ public void abortCurrentRequest() { synchronized (abortLock) { if (abortable && !aborted) stop(new Abort()); aborted = true; } } public void run() { for (;;) { doRequests(); } } // private members private WorkThreadPool pool; private final Object abortLock = new Object(); private boolean requestRunning; private boolean abortable; private boolean aborted; private String status; private int progressValue; private int progressMaximum; private void doRequests() { WorkThreadPool.Request request; for (;;) { request = pool.getNextRequest(); if (request == null) break; else { requestRunning = true; pool.fireStatusChanged(this); doRequest(request); requestRunning = false; } } pool.fireStatusChanged(this); synchronized (pool.waitForAllLock) { // notify a running waitForRequests() method pool.waitForAllLock.notifyAll(); } synchronized (pool.lock) { // wait for more requests try { pool.lock.wait(); } catch (InterruptedException ie) { } } } private void doRequest(WorkThreadPool.Request request) { try { request.run.run(); } catch (Throwable t) { } finally { synchronized (abortLock) { aborted = abortable = false; } status = null; progressValue = progressMaximum = 0; pool.requestDone(); pool.fireStatusChanged(this); } } public static class Abort extends Error { public Abort() { super("Work request aborted"); } }
} /*
* ThreadAbortMonitor.java - Thread Abort Monitor * :tabSize=8:indentSize=8:noTabs=false: :folding=explicit:collapseFolds=1: * * Copyright (C) 2006 Matthieu Casanova * * This program is free software; you can redistribute it and/or modify it under * the terms of the GNU General Public License as published by the Free Software * Foundation; either version 2 of the License, or any later version. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License along with * this program; if not, write to the Free Software Foundation, Inc., 59 Temple * Place - Suite 330, Boston, MA 02111-1307, USA. */
/**
* @author Matthieu Casanova * @author $Id: VFS.java 7129 2006-09-25 20:05:57Z kpouer $ */
interface ThreadAbortMonitor {
boolean isAborted();
} /*
* WorkThreadProgressListener.java - Progress listener Copyright (C) 2000 Slava * Pestov * * This program is free software; you can redistribute it and/or modify it under * the terms of the GNU General Public License as published by the Free Software * Foundation; either version 2 of the License, or any later version. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License along with * this program; if not, write to the Free Software Foundation, Inc., 59 Temple * Place - Suite 330, Boston, MA 02111-1307, USA. */
/**
* A work thread execution progress listener. * * @since jEdit 2.6pre1 */
interface WorkThreadProgressListener extends EventListener {
// status message changed, operation started, operation ends, ... void statusUpdate(WorkThreadPool threadPool, int threadIndex); // progress bar value change void progressUpdate(WorkThreadPool threadPool, int threadIndex);
}</source>
Simple thread pool. A task is executed by obtaining a thread from the pool
<source lang="java">
/*
* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */
import java.util.ArrayList; import java.util.List;
/** Simple thread pool. A task is executed by obtaining a thread from
* the pool */
public class ThreadPool {
/** The thread pool contains instances of {@link ThreadPool.Task}. */ public interface Task { /** Performs the task. * @throws Throwable The task failed, and the worker thread won"t be used again. */ void run() throws Throwable; } /** A task, which may be interrupted, if the pool is shutting down. */ public interface InterruptableTask extends Task { /** Interrupts the task. * @throws Throwable Shutting down the task failed. */ void shutdown() throws Throwable; } private class Poolable { private boolean shuttingDown; private Task task; private Thread thread; Poolable(ThreadGroup pGroup, int pNum) { thread = new Thread(pGroup, pGroup.getName() + "-" + pNum){ public void run() { while (!isShuttingDown()) { final Task t = getTask(); if (t == null) { try { synchronized (this) { if (!isShuttingDown() && getTask() == null) { wait(); } } } catch (InterruptedException e) { // Do nothing } } else { try { t.run(); resetTask(); repool(Poolable.this); } catch (Throwable e) { discard(Poolable.this); resetTask(); } } } } }; thread.start(); } synchronized void shutdown() { shuttingDown = true; final Task t = getTask(); if (t != null && t instanceof InterruptableTask) { try { ((InterruptableTask) t).shutdown(); } catch (Throwable th) { // Ignore me } } task = null; synchronized (thread) { thread.notify(); } } private synchronized boolean isShuttingDown() { return shuttingDown; } String getName() { return thread.getName(); } private synchronized Task getTask() { return task; } private synchronized void resetTask() { task = null; } synchronized void start(Task pTask) { task = pTask; synchronized (thread) { thread.notify(); } } } private final ThreadGroup threadGroup; private final int maxSize; private final List waitingThreads = new ArrayList(); private final List runningThreads = new ArrayList(); private final List waitingTasks = new ArrayList(); private int num;
/** Creates a new instance. * @param pMaxSize Maximum number of concurrent threads. * @param pName Thread group name. */ public ThreadPool(int pMaxSize, String pName) { maxSize = pMaxSize; threadGroup = new ThreadGroup(pName); } synchronized void discard(Poolable pPoolable) { pPoolable.shutdown(); runningThreads.remove(pPoolable); waitingThreads.remove(pPoolable); } synchronized void repool(Poolable pPoolable) { if (runningThreads.remove(pPoolable)) { if (maxSize != 0 && runningThreads.size() + waitingThreads.size() >= maxSize) { discard(pPoolable); } else { waitingThreads.add(pPoolable); if (waitingTasks.size() > 0) { Task task = (Task) waitingTasks.remove(waitingTasks.size() - 1); startTask(task); } } } else { discard(pPoolable); } } /** Starts a task immediately. * @param pTask The task being started. * @return True, if the task could be started immediately. False, if * the maxmimum number of concurrent tasks was exceeded. If so, you * might consider to use the {@link #addTask(ThreadPool.Task)} method instead. */ public synchronized boolean startTask(Task pTask) { if (maxSize != 0 && runningThreads.size() >= maxSize) { return false; } Poolable poolable; if (waitingThreads.size() > 0) { poolable = (Poolable) waitingThreads.remove(waitingThreads.size()-1); } else { poolable = new Poolable(threadGroup, num++); } runningThreads.add(poolable); poolable.start(pTask); return true; } /** Adds a task for immediate or deferred execution. * @param pTask The task being added. * @return True, if the task was started immediately. False, if * the task will be executed later. */ public synchronized boolean addTask(Task pTask) { if (startTask(pTask)) { return true; } waitingTasks.add(pTask); return false; } /** Closes the pool. */ public synchronized void shutdown() { while (!waitingThreads.isEmpty()) { Poolable poolable = (Poolable) waitingThreads.remove(waitingThreads.size()-1); poolable.shutdown(); } while (!runningThreads.isEmpty()) { Poolable poolable = (Poolable) runningThreads.remove(runningThreads.size()-1); poolable.shutdown(); } } /** Returns the maximum number of concurrent threads. * @return Maximum number of threads. */ public int getMaxThreads() { return maxSize; } /** Returns the number of threads, which have actually been created, * as opposed to the number of currently running threads. */ public synchronized int getNumThreads() { return num; }
}</source>