Java/Threads/Lock Synchronize
Содержание
- 1 A reader-writer lock from "Java Threads" by Scott Oak and Henry Wong.
- 2 Boolean lock
- 3 Byte FIFO
- 4 Coordinates threads for multi-threaded operations
- 5 Daemon Lock
- 6 Determining If the Current Thread Is Holding a Synchronized Lock
- 7 Handle concurrent read/write: use synchronized to lock the data
- 8 Interruptible Synchronized Block
- 9 Invoke a series of runnables as closely to synchronously as possible
- 10 Lock for read and write
- 11 No synchronize
- 12 Object FIFO
- 13 Operations that may seem safe are not, when threads are present
- 14 Read Write Lock
- 15 Signaling
- 16 Simple Object FIFO
- 17 Static synchronize
- 18 Static synchronized block
- 19 Synchronized Block demo
- 20 Synchronize method
- 21 Synchronizing blocks instead of entire methods
- 22 Synchronizing on another object
- 23 Thread deadlock
- 24 Thread: Dining Philosophers
- 25 Thread notify
- 26 Threads join
- 27 Thread synchronization
A reader-writer lock from "Java Threads" by Scott Oak and Henry Wong.
<source lang="java">
/*
* JCommon : a free general purpose class library for the Java(tm) platform * * * (C) Copyright 2000-2005, by Object Refinery Limited and Contributors. * * Project Info: http://www.jfree.org/jcommon/index.html * * This library is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation; either version 2.1 of the License, or * (at your option) any later version. * * This library is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public * License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, * USA. * * [Java is a trademark or registered trademark of Sun Microsystems, Inc. * in the United States and other countries.] * * --------------------- * ReaderWriterLock.java * --------------------- * * $Id: ReaderWriterLock.java,v 1.3 2005/10/18 13:18:34 mungady Exp $ * * Changes * ------- * 29-Jan-2003 : Added standard header (DG); * */
import java.util.ArrayList; import java.util.Iterator; /**
* A reader-writer lock from "Java Threads" by Scott Oak and Henry Wong. * * @author Scott Oak and Henry Wong */
public class ReaderWriterLock {
/** * A node for the waiting list. * * @author Scott Oak and Henry Wong */ private static class ReaderWriterNode { /** A reader. */ protected static final int READER = 0; /** A writer. */ protected static final int WRITER = 1; /** The thread. */ protected Thread t; /** The state. */ protected int state; /** The number of acquires. */ protected int nAcquires; /** * Creates a new node. * * @param t * the thread. * @param state * the state. */ private ReaderWriterNode(final Thread t, final int state) { this.t = t; this.state = state; this.nAcquires = 0; } } /** The waiting threads. */ private ArrayList waiters; /** * Default constructor. */ public ReaderWriterLock() { this.waiters = new ArrayList(); } /** * Grab the read lock. */ public synchronized void lockRead() { final ReaderWriterNode node; final Thread me = Thread.currentThread(); final int index = getIndex(me); if (index == -1) { node = new ReaderWriterNode(me, ReaderWriterNode.READER); this.waiters.add(node); } else { node = (ReaderWriterNode) this.waiters.get(index); } while (getIndex(me) > firstWriter()) { try { wait(); } catch (Exception e) { System.err.println("ReaderWriterLock.lockRead(): exception."); System.err.print(e.getMessage()); } } node.nAcquires++; } /** * Grab the write lock. */ public synchronized void lockWrite() { final ReaderWriterNode node; final Thread me = Thread.currentThread(); final int index = getIndex(me); if (index == -1) { node = new ReaderWriterNode(me, ReaderWriterNode.WRITER); this.waiters.add(node); } else { node = (ReaderWriterNode) this.waiters.get(index); if (node.state == ReaderWriterNode.READER) { throw new IllegalArgumentException("Upgrade lock"); } node.state = ReaderWriterNode.WRITER; } while (getIndex(me) != 0) { try { wait(); } catch (Exception e) { System.err.println("ReaderWriterLock.lockWrite(): exception."); System.err.print(e.getMessage()); } } node.nAcquires++; } /** * Unlock. */ public synchronized void unlock() { final ReaderWriterNode node; final Thread me = Thread.currentThread(); final int index = getIndex(me); if (index > firstWriter()) { throw new IllegalArgumentException("Lock not held"); } node = (ReaderWriterNode) this.waiters.get(index); node.nAcquires--; if (node.nAcquires == 0) { this.waiters.remove(index); } notifyAll(); } /** * Returns the index of the first waiting writer. * * @return The index. */ private int firstWriter() { final Iterator e = this.waiters.iterator(); int index = 0; while (e.hasNext()) { final ReaderWriterNode node = (ReaderWriterNode) e.next(); if (node.state == ReaderWriterNode.WRITER) { return index; } index += 1; } return Integer.MAX_VALUE; } /** * Returns the index of a thread. * * @param t * the thread. * * @return The index. */ private int getIndex(final Thread t) { final Iterator e = this.waiters.iterator(); int index = 0; while (e.hasNext()) { final ReaderWriterNode node = (ReaderWriterNode) e.next(); if (node.t == t) { return index; } index += 1; } return -1; }
}
</source>
Boolean lock
<source lang="java">
public class BestReplacement extends Object {
private Thread internalThread; private volatile boolean stopRequested; private BooleanLock suspendRequested; private BooleanLock internalThreadSuspended; public BestReplacement() { stopRequested = false; suspendRequested = new BooleanLock(false); internalThreadSuspended = new BooleanLock(false); Runnable r = new Runnable() { public void run() { try { runWork(); } catch (Exception x) { x.printStackTrace(); } } }; internalThread = new Thread(r); internalThread.start(); } private void runWork() { int count = 0; while (!stopRequested) { try { waitWhileSuspended(); } catch (InterruptedException x) { Thread.currentThread().interrupt(); continue; } System.out.println("Part I - count=" + count); try { Thread.sleep(1000); } catch (InterruptedException x) { Thread.currentThread().interrupt(); } System.out.println("Part II - count=" + count); try { Thread.sleep(1000); } catch (InterruptedException x) { Thread.currentThread().interrupt(); } System.out.println("Part III - count=" + count); count++; } } private void waitWhileSuspended() throws InterruptedException { synchronized (suspendRequested) { if (suspendRequested.isTrue()) { try { internalThreadSuspended.setValue(true); suspendRequested.waitUntilFalse(0); } finally { internalThreadSuspended.setValue(false); } } } } public void suspendRequest() { suspendRequested.setValue(true); } public void resumeRequest() { suspendRequested.setValue(false); } public boolean waitForActualSuspension(long msTimeout) throws InterruptedException { return internalThreadSuspended.waitUntilTrue(msTimeout); } public void stopRequest() { stopRequested = true; internalThread.interrupt(); } public boolean isAlive() { return internalThread.isAlive(); } public static void main(String[] args) { try { BestReplacement br = new BestReplacement(); System.out .println(" just created, br.isAlive()=" + br.isAlive()); Thread.sleep(4200); long startTime = System.currentTimeMillis(); br.suspendRequest(); System.out.println(" just submitted a suspendRequest"); boolean suspensionTookEffect = br.waitForActualSuspension(10000); long stopTime = System.currentTimeMillis(); if (suspensionTookEffect) { System.out.println(" the internal thread took " + (stopTime - startTime) + " ms to notice " + "\n the suspend request and is now " + "suspended."); } else { System.out.println(" the internal thread did not notice " + "the suspend request " + "\n within 10 seconds."); } Thread.sleep(5000); br.resumeRequest(); System.out.println("Submitted a resumeRequest"); Thread.sleep(2200); br.stopRequest(); System.out.println("Submitted a stopRequest"); } catch (InterruptedException x) { // ignore } }
} class BooleanLock extends Object {
private boolean value; public BooleanLock(boolean initialValue) { value = initialValue; } public BooleanLock() { this(false); } public synchronized void setValue(boolean newValue) { if ( newValue != value ) { value = newValue; notifyAll(); } } public synchronized boolean waitToSetTrue(long msTimeout) throws InterruptedException { boolean success = waitUntilFalse(msTimeout); if ( success ) { setValue(true); } return success; } public synchronized boolean waitToSetFalse(long msTimeout) throws InterruptedException { boolean success = waitUntilTrue(msTimeout); if ( success ) { setValue(false); } return success; } public synchronized boolean isTrue() { return value; } public synchronized boolean isFalse() { return !value; } public synchronized boolean waitUntilTrue(long msTimeout) throws InterruptedException { return waitUntilStateIs(true, msTimeout); } public synchronized boolean waitUntilFalse(long msTimeout) throws InterruptedException { return waitUntilStateIs(false, msTimeout); } public synchronized boolean waitUntilStateIs( boolean state, long msTimeout ) throws InterruptedException { if ( msTimeout == 0L ) { while ( value != state ) { wait(); } return true; } long endTime = System.currentTimeMillis() + msTimeout; long msRemaining = msTimeout; while ( ( value != state ) && ( msRemaining > 0L ) ) { wait(msRemaining); msRemaining = endTime - System.currentTimeMillis(); } return ( value == state ); }
}
</source>
Byte FIFO
<source lang="java">
import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; public class ByteFIFOTest extends Object {
private ByteFIFO fifo; private byte[] srcData; public ByteFIFOTest() throws IOException { fifo = new ByteFIFO(20); makeSrcData(); System.out.println("srcData.length=" + srcData.length); Runnable srcRunnable = new Runnable() { public void run() { src(); } }; Thread srcThread = new Thread(srcRunnable); srcThread.start(); Runnable dstRunnable = new Runnable() { public void run() { dst(); } }; Thread dstThread = new Thread(dstRunnable); dstThread.start(); } private void makeSrcData() throws IOException { String[] list = { "The first string is right here", "The second string is a bit longer and also right here", "The third string", "ABCDEFGHIJKLMNOPQRSTUVWXYZ", "0123456789", "The last string in the list" }; ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); oos.writeObject(list); oos.flush(); oos.close(); srcData = baos.toByteArray(); } private void src() { try { boolean justAddOne = true; int count = 0; while ( count < srcData.length ) { if ( !justAddOne ) { int writeSize = (int) ( 40.0 * Math.random() ); writeSize = Math.min(writeSize, srcData.length - count); byte[] buf = new byte[writeSize]; System.arraycopy(srcData, count, buf, 0, writeSize); fifo.add(buf); count += writeSize; System.out.println("just added " + writeSize + " bytes"); } else { fifo.add(srcData[count]); count++; System.out.println("just added exactly 1 byte"); } justAddOne = !justAddOne; } } catch ( InterruptedException x ) { x.printStackTrace(); } } private void dst() { try { boolean justAddOne = true; int count = 0; byte[] dstData = new byte[srcData.length]; while ( count < dstData.length ) { if ( !justAddOne ) { byte[] buf = fifo.removeAll(); if ( buf.length > 0 ) { System.arraycopy(buf, 0, dstData, count, buf.length); count += buf.length; } System.out.println( "just removed " + buf.length + " bytes"); } else { byte b = fifo.remove(); dstData[count] = b; count++; System.out.println( "just removed exactly 1 byte"); } justAddOne = !justAddOne; } System.out.println("received all data, count=" + count); ObjectInputStream ois = new ObjectInputStream( new ByteArrayInputStream(dstData)); String[] line = (String[]) ois.readObject(); for ( int i = 0; i < line.length; i++ ) { System.out.println("line[" + i + "]=" + line[i]); } } catch ( ClassNotFoundException x1 ) { x1.printStackTrace(); } catch ( IOException iox ) { iox.printStackTrace(); } catch ( InterruptedException x ) { x.printStackTrace(); } } public static void main(String[] args) { try { new ByteFIFOTest(); } catch ( IOException iox ) { iox.printStackTrace(); } }
} class ByteFIFO extends Object {
private byte[] queue; private int capacity; private int size; private int head; private int tail; public ByteFIFO(int cap) { capacity = ( cap > 0 ) ? cap : 1; // at least 1 queue = new byte[capacity]; head = 0; tail = 0; size = 0; } public int getCapacity() { return capacity; } public synchronized int getSize() { return size; } public synchronized boolean isEmpty() { return ( size == 0 ); } public synchronized boolean isFull() { return ( size == capacity ); } public synchronized void add(byte b) throws InterruptedException { waitWhileFull(); queue[head] = b; head = ( head + 1 ) % capacity; size++; notifyAll(); // let any waiting threads know about change } public synchronized void add(byte[] list) throws InterruptedException { // For efficiency, the bytes are copied in blocks // instead of one at a time. As space becomes available, // more bytes are copied until all of them have been // added. int ptr = 0; while ( ptr < list.length ) { // If full, the lock will be released to allow // another thread to come in and remove bytes. waitWhileFull(); int space = capacity - size; int distToEnd = capacity - head; int blockLen = Math.min(space, distToEnd); int bytesRemaining = list.length - ptr; int copyLen = Math.min(blockLen, bytesRemaining); System.arraycopy(list, ptr, queue, head, copyLen); head = ( head + copyLen ) % capacity; size += copyLen; ptr += copyLen; // Keep the lock, but let any waiting threads // know that something has changed. notifyAll(); } } public synchronized byte remove() throws InterruptedException { waitWhileEmpty(); byte b = queue[tail]; tail = ( tail + 1 ) % capacity; size--; notifyAll(); // let any waiting threads know about change return b; } public synchronized byte[] removeAll() { // For efficiency, the bytes are copied in blocks // instead of one at a time. if ( isEmpty() ) { // Nothing to remove, return a zero-length // array and do not bother with notification // since nothing was removed. return new byte[0]; } // based on the current size byte[] list = new byte[size]; // copy in the block from tail to the end int distToEnd = capacity - tail; int copyLen = Math.min(size, distToEnd); System.arraycopy(queue, tail, list, 0, copyLen); // If data wraps around, copy the remaining data // from the front of the array. if ( size > copyLen ) { System.arraycopy( queue, 0, list, copyLen, size - copyLen); } tail = ( tail + size ) % capacity; size = 0; // everything has been removed // Signal any and all waiting threads that // something has changed. notifyAll(); return list; } public synchronized byte[] removeAtLeastOne() throws InterruptedException { waitWhileEmpty(); // wait for a least one to be in FIFO return removeAll(); } public synchronized boolean waitUntilEmpty(long msTimeout) throws InterruptedException { if ( msTimeout == 0L ) { waitUntilEmpty(); // use other method return true; } // wait only for the specified amount of time long endTime = System.currentTimeMillis() + msTimeout; long msRemaining = msTimeout; while ( !isEmpty() && ( msRemaining > 0L ) ) { wait(msRemaining); msRemaining = endTime - System.currentTimeMillis(); } // May have timed out, or may have met condition, // calc return value. return isEmpty(); } public synchronized void waitUntilEmpty() throws InterruptedException { while ( !isEmpty() ) { wait(); } } public synchronized void waitWhileEmpty() throws InterruptedException { while ( isEmpty() ) { wait(); } } public synchronized void waitUntilFull() throws InterruptedException { while ( !isFull() ) { wait(); } } public synchronized void waitWhileFull() throws InterruptedException { while ( isFull() ) { wait(); } }
}
</source>
Coordinates threads for multi-threaded operations
<source lang="java">
/* Copyright (C) 2005-2008 by Peter Eastman
This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. */
import java.util.concurrent.atomic.*; import java.util.*; /**
* This class coordinates threads for multi-threaded operations. The execution model * provided by this class is a single "task" (e.g. tracing a ray through a single pixel) * which must be executed many times. The task is parameterized by a single index * (e.g. the column containing the pixel).*
* To use this class, pass it an object which implements the Task interface. It * automatically creates an appropriate number of threads based on the number of * available processors. When you call run(), the task is repeatedly executed by * the worker threads, with the index running * over the desired range. You may invoke run() any number of times (e.g. once * for each row of the image). Finally, call finish() to clean up the worker threads. */ public class ThreadManager { private int numIndices; private AtomicInteger nextIndex; private Thread thread[]; private HashSet<Thread> waitingThreads; private Task task; private Object controller; private boolean controllerWaiting; /** * Create a new uninitialized ThreadManager. You must invoke setNumIndices() and setTask() * to initialize it before calling run(). */ public ThreadManager() { this(0, null); } /** * Create a new ThreadManager. * * @param numIndices the number of values the index should take on (from 0 to numIndices-1) * @param task the task to perform */ public ThreadManager(int numIndices, Task task) { this.numIndices = numIndices; this.task = task; nextIndex = new AtomicInteger(numIndices); controller = new Object(); controllerWaiting = false; waitingThreads = new HashSet<Thread>(); } /** * Create and start the worker threads. This is invoked the first time run() is called. */ private void createThreads() { // Create a worker thread for each processor. thread = new Thread [Runtime.getRuntime().availableProcessors()]; for (int i = 0; i < thread.length; i++) { thread[i] = new Thread("Worker thread "+(i+1)) { public void run() { // Repeatedly perform the task until we are finished. while (true) { try { int index = nextIndex(); task.execute(index); } catch (InterruptedException ex) { task.cleanup(); return; } catch (Exception ex) { cancel(); ex.printStackTrace(); } } } }; thread[i].start(); } } /** * Set the number of values the index should take on. This must be invoked from the same * thread that instantiated the ThreadManager and that calls run(). */ public void setNumIndices(int numIndices) { this.numIndices = numIndices; nextIndex.set(numIndices); } /** * Set the Task to be executed by the worker threads. If another Task has already been set, * that one is discarded immediately and cleanup() will never be invoked on in. This method * must be invoked from the same thread that instantiated the ThreadManager and that calls run(). */ public void setTask(Task task) { this.task = task; } /** * Perform the task the specified number of times. This method blocks until all * occurrences of the task are completed. If the current thread is interrupted * while this method is in progress, all of the worker threads will be interrupted * and disposed of. */ public void run() { controllerWaiting = false; nextIndex.set(0); waitingThreads.clear(); if (thread == null) createThreads(); // Notify all the worker threads, then wait for them to finish. synchronized (this) { notifyAll(); } synchronized (controller) { try { controllerWaiting = true; controller.wait(); } catch (InterruptedException ex) { finish(); } } } /** * Cancel a run which is in progress. Calling this method does not interrupt any tasks that * are currently executing, but it prevents any more from being started until the next time * run() is called. */ public void cancel() { nextIndex.set(numIndices); } /** * Dispose of all the worker threads. Once this has been called, do not call run() again. */ public void finish() { if (thread != null) for (int i = 0; i < thread.length; i++) thread[i].interrupt(); } private int nextIndex() throws InterruptedException { int index; while ((index = nextIndex.getAndIncrement()) >= numIndices) { // Wait until run() is called again. synchronized (this) { waitingThreads.add(Thread.currentThread()); if (waitingThreads.size() == thread.length) { while (!controllerWaiting) wait(1); synchronized (controller) { controller.notify(); } } wait(); } } return index; } /** * This interface defines a task to be performed by the worker threads. */ public static interface Task { /** * Execute the task for the specified index. */ public void execute(int index); /** * This is called once from each worker thread when finish() is called. It gives a chance * to do any necessary cleanup. */ public void cleanup(); } } </source>
Daemon Lock
<source lang="java">
/*
* * Copyright (c) 1997-1999 Scott Oaks and Henry Wong. All Rights Reserved. * * Permission to use, copy, modify, and distribute this software * and its documentation for NON-COMMERCIAL purposes and * without fee is hereby granted. * * This sample source code is provided for example only, * on an unsupported, as-is basis. * * AUTHOR MAKES NO REPRESENTATIONS OR WARRANTIES ABOUT THE SUITABILITY OF * THE SOFTWARE, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED * TO THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * PARTICULAR PURPOSE, OR NON-INFRINGEMENT. AUTHOR SHALL NOT BE LIABLE FOR * ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING OR * DISTRIBUTING THIS SOFTWARE OR ITS DERIVATIVES. * * THIS SOFTWARE IS NOT DESIGNED OR INTENDED FOR USE OR RESALE AS ON-LINE * CONTROL EQUIPMENT IN HAZARDOUS ENVIRONMENTS REQUIRING FAIL-SAFE * PERFORMANCE, SUCH AS IN THE OPERATION OF NUCLEAR FACILITIES, AIRCRAFT * NAVIGATION OR COMMUNICATION SYSTEMS, AIR TRAFFIC CONTROL, DIRECT LIFE * SUPPORT MACHINES, OR WEAPONS SYSTEMS, IN WHICH THE FAILURE OF THE * SOFTWARE COULD LEAD DIRECTLY TO DEATH, PERSONAL INJURY, OR SEVERE * PHYSICAL OR ENVIRONMENTAL DAMAGE ("HIGH RISK ACTIVITIES"). AUTHOR * SPECIFICALLY DISCLAIMS ANY EXPRESS OR IMPLIED WARRANTY OF FITNESS FOR * HIGH RISK ACTIVITIES. */
public class DaemonLock implements Runnable {
private int lockCount = 0; public synchronized void acquire() { if (lockCount++ == 0) { Thread t = new Thread(this); t.setDaemon(false); t.start(); } } public synchronized void release() { if (--lockCount == 0) { notify(); } } public synchronized void run() { while (lockCount != 0) { try { wait(); } catch (InterruptedException ex) {}; } }
}
</source>
Determining If the Current Thread Is Holding a Synchronized Lock
<source lang="java">
public class Main {
public static void main(String[] argv) throws Exception { } public synchronized void myMethod() { Object o = new Object(); System.out.println(Thread.holdsLock(o)); synchronized (o) { System.out.println(Thread.holdsLock(o)); } System.out.println(Thread.holdsLock(this)); }
}
</source>
Handle concurrent read/write: use synchronized to lock the data
<source lang="java">
import java.util.Iterator; import java.util.Vector; public class Main {
Vector data = new Vector(); public Main() { new Producer().start(); new Consumer().start(); } public static void main(String[] args) throws Exception { new Main(); } class Consumer extends Thread { Consumer() { super("Consumer"); } public void run() { while (true) { synchronized (data) { Iterator it = data.iterator(); while (it.hasNext()) it.next(); } } } } class Producer extends Thread { Producer() { super("Producer"); } public void run() { while (true) { data.addElement(new Object()); if (data.size() > 1000) data.removeAllElements(); } } }
}
</source>
Interruptible Synchronized Block
<source lang="java">
public class InterruptibleSyncBlock extends Object {
private Object longLock; private BooleanLock busyLock; public InterruptibleSyncBlock() { longLock = new Object(); busyLock = new BooleanLock(false); } public void doStuff() throws InterruptedException { print("about to try to get exclusive access " + "to busyLock"); busyLock.waitToSetTrue(0); try { print("about to try to get exclusive access " + "to longLock"); synchronized (longLock) { print("got exclusive access to longLock"); try { Thread.sleep(10000); } catch (InterruptedException x) { // ignore } print("about to relinquish exclusive access " + "to longLock"); } } finally { print("about to free up busyLock"); busyLock.setValue(false); } } private static void print(String msg) { String name = Thread.currentThread().getName(); System.err.println(name + ": " + msg); } private static Thread launch(final InterruptibleSyncBlock sb, String name) { Runnable r = new Runnable() { public void run() { print("in run()"); try { sb.doStuff(); } catch (InterruptedException x) { print("InterruptedException thrown " + "from doStuff()"); } } }; Thread t = new Thread(r, name); t.start(); return t; } public static void main(String[] args) { try { InterruptibleSyncBlock sb = new InterruptibleSyncBlock(); Thread t1 = launch(sb, "T1"); Thread.sleep(500); Thread t2 = launch(sb, "T2"); Thread t3 = launch(sb, "T3"); Thread.sleep(1000); print("about to interrupt T2"); t2.interrupt(); print("just interrupted T2"); } catch (InterruptedException x) { x.printStackTrace(); } }
} class BooleanLock extends Object {
private boolean value; public BooleanLock(boolean initialValue) { value = initialValue; } public BooleanLock() { this(false); } public synchronized void setValue(boolean newValue) { if ( newValue != value ) { value = newValue; notifyAll(); } } public synchronized boolean waitToSetTrue(long msTimeout) throws InterruptedException { boolean success = waitUntilFalse(msTimeout); if ( success ) { setValue(true); } return success; } public synchronized boolean waitToSetFalse(long msTimeout) throws InterruptedException { boolean success = waitUntilTrue(msTimeout); if ( success ) { setValue(false); } return success; } public synchronized boolean isTrue() { return value; } public synchronized boolean isFalse() { return !value; } public synchronized boolean waitUntilTrue(long msTimeout) throws InterruptedException { return waitUntilStateIs(true, msTimeout); } public synchronized boolean waitUntilFalse(long msTimeout) throws InterruptedException { return waitUntilStateIs(false, msTimeout); } public synchronized boolean waitUntilStateIs( boolean state, long msTimeout ) throws InterruptedException { if ( msTimeout == 0L ) { while ( value != state ) { wait(); // wait indefinitely until notified } // condition has finally been met return true; } // only wait for the specified amount of time long endTime = System.currentTimeMillis() + msTimeout; long msRemaining = msTimeout; while ( ( value != state ) && ( msRemaining > 0L ) ) { wait(msRemaining); msRemaining = endTime - System.currentTimeMillis(); } // May have timed out, or may have met value, // calculate return value. return ( value == state ); }
}
</source>
Invoke a series of runnables as closely to synchronously as possible
<source lang="java">
/*
* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */
import java.util.ArrayList; import java.util.List; import java.util.Iterator; import java.util.Collections; import java.util.Random; /**
* The purpose of this class is to invoke a series of runnables as * closely to synchronously as possible. It does this by starting * a thread for each one, getting the threads into there run method, * then quickly running through (in random order) and notifying each * thread. */
public class ThreadPounder {
List runnables; Object [] threads; Object lock = new Object(); public ThreadPounder(List runnables) throws InterruptedException { this(runnables, new Random(1234)); } public ThreadPounder(List runnables, Random rand) throws InterruptedException { this.runnables = new ArrayList(runnables); Collections.shuffle(this.runnables, rand); threads = new Object[this.runnables.size()]; int i=0; Iterator iter= this.runnables.iterator(); synchronized (lock) { while (iter.hasNext()) { Thread t = new SyncThread((Runnable)iter.next()); t.start(); lock.wait(); threads[i] = t; i++; } } } public void start() { synchronized(this) { this.notifyAll(); } } class SyncThread extends Thread { Runnable toRun; public long runTime; public SyncThread(Runnable toRun) { this.toRun = toRun; } public void run() { try { synchronized (ThreadPounder.this) { synchronized (lock) { // Let pounder know I"m ready to go lock.notify(); } // Wait for pounder to wake me up. ThreadPounder.this.wait(); } toRun.run(); } catch (InterruptedException ie) { } } } public static void main(String [] str) { List l = new ArrayList(20); for (int i=0; i<20; i++) { final int x = i; l.add(new Runnable() { public void run() { System.out.println("Thread " + x); } }); } try { ThreadPounder tp = new ThreadPounder(l); System.out.println("Starting:" ); tp.start(); System.out.println("All Started:" ); } catch (InterruptedException ie) { ie.printStackTrace(); } }
}
</source>
Lock for read and write
<source lang="java">
/*
* * Copyright (c) 1997-1999 Scott Oaks and Henry Wong. All Rights Reserved. * * Permission to use, copy, modify, and distribute this software * and its documentation for NON-COMMERCIAL purposes and * without fee is hereby granted. * * This sample source code is provided for example only, * on an unsupported, as-is basis. * * AUTHOR MAKES NO REPRESENTATIONS OR WARRANTIES ABOUT THE SUITABILITY OF * THE SOFTWARE, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED * TO THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * PARTICULAR PURPOSE, OR NON-INFRINGEMENT. AUTHOR SHALL NOT BE LIABLE FOR * ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING OR * DISTRIBUTING THIS SOFTWARE OR ITS DERIVATIVES. * * THIS SOFTWARE IS NOT DESIGNED OR INTENDED FOR USE OR RESALE AS ON-LINE * CONTROL EQUIPMENT IN HAZARDOUS ENVIRONMENTS REQUIRING FAIL-SAFE * PERFORMANCE, SUCH AS IN THE OPERATION OF NUCLEAR FACILITIES, AIRCRAFT * NAVIGATION OR COMMUNICATION SYSTEMS, AIR TRAFFIC CONTROL, DIRECT LIFE * SUPPORT MACHINES, OR WEAPONS SYSTEMS, IN WHICH THE FAILURE OF THE * SOFTWARE COULD LEAD DIRECTLY TO DEATH, PERSONAL INJURY, OR SEVERE * PHYSICAL OR ENVIRONMENTAL DAMAGE ("HIGH RISK ACTIVITIES"). AUTHOR * SPECIFICALLY DISCLAIMS ANY EXPRESS OR IMPLIED WARRANTY OF FITNESS FOR * HIGH RISK ACTIVITIES. */
import java.util.*; class RWNode {
static final int READER = 0; static final int WRITER = 1; Thread t; int state; int nAcquires; RWNode(Thread t, int state) { this.t = t; this.state = state; nAcquires = 0; }
} public class RWLock {
private Vector waiters; private int firstWriter() { Enumeration e; int index; for (index = 0, e = waiters.elements(); e.hasMoreElements(); index++) { RWNode node = (RWNode) e.nextElement(); if (node.state == RWNode.WRITER) return index; } return Integer.MAX_VALUE; } private int getIndex(Thread t) { Enumeration e; int index; for (index = 0, e = waiters.elements(); e.hasMoreElements(); index++) { RWNode node = (RWNode) e.nextElement(); if (node.t == t) return index; } return -1; } public RWLock() { waiters = new Vector(); } public synchronized void lockRead() { RWNode node; Thread me = Thread.currentThread(); int index = getIndex(me); if (index == -1) { node = new RWNode(me, RWNode.READER); waiters.addElement(node); } else node = (RWNode) waiters.elementAt(index); while (getIndex(me) > firstWriter()) { try { wait(); } catch (Exception e) {} } node.nAcquires++; } public synchronized void lockWrite() { RWNode node; Thread me = Thread.currentThread(); int index = getIndex(me); if (index == -1) { node = new RWNode(me, RWNode.WRITER); waiters.addElement(node); } else { node = (RWNode) waiters.elementAt(index); if (node.state == RWNode.READER) throw new IllegalArgumentException("Upgrade lock"); node.state = RWNode.WRITER; } while (getIndex(me) != 0) { try { wait(); } catch (Exception e) {} } node.nAcquires++; } public synchronized void unlock() { RWNode node; Thread me = Thread.currentThread(); int index; index = getIndex(me); if (index > firstWriter()) throw new IllegalArgumentException("Lock not held"); node = (RWNode) waiters.elementAt(index); node.nAcquires--; if (node.nAcquires == 0) { waiters.removeElementAt(index); notifyAll(); } }
}
</source>
No synchronize
<source lang="java">
public class WithoutSync extends Object {
private static int nextSerialNum = 10001; public static int getNextSerialNum() { int sn = nextSerialNum; try { Thread.sleep(1000); } catch ( InterruptedException x ) { } nextSerialNum++; return sn; } private static void print(String msg) { String threadName = Thread.currentThread().getName(); System.out.println(threadName + ": " + msg); } public static void main(String[] args) { try { Runnable r = new Runnable() { public void run() { print("getNextSerialNum()=" + getNextSerialNum()); } }; Thread threadA = new Thread(r, "threadA"); threadA.start(); Thread.sleep(1500); Thread threadB = new Thread(r, "threadB"); threadB.start(); Thread.sleep(500); Thread threadC = new Thread(r, "threadC"); threadC.start(); Thread.sleep(2500); Thread threadD = new Thread(r, "threadD"); threadD.start(); } catch ( InterruptedException x ) { // ignore } }
}
</source>
Object FIFO
<source lang="java">
public class ObjectFIFOTest extends Object {
private static void fullCheck(ObjectFIFO fifo) { try { // Sync"d to allow messages to print while // condition is still true. synchronized (fifo) { while (true) { fifo.waitUntilFull(); print("FULL"); fifo.waitWhileFull(); print("NO LONGER FULL"); } } } catch (InterruptedException ix) { return; } } private static void emptyCheck(ObjectFIFO fifo) { try { // Sync"d to allow messages to print while // condition is still true. synchronized (fifo) { while (true) { fifo.waitUntilEmpty(); print("EMPTY"); fifo.waitWhileEmpty(); print("NO LONGER EMPTY"); } } } catch (InterruptedException ix) { return; } } private static void consumer(ObjectFIFO fifo) { try { print("just entered consumer()"); for (int i = 0; i < 3; i++) { synchronized (fifo) { Object obj = fifo.remove(); print("DATA-OUT - did remove(), obj=" + obj); } Thread.sleep(3000); } synchronized (fifo) { boolean resultOfWait = fifo.waitUntilEmpty(500); print("did waitUntilEmpty(500), resultOfWait=" + resultOfWait + ", getSize()=" + fifo.getSize()); } for (int i = 0; i < 3; i++) { synchronized (fifo) { Object[] list = fifo.removeAll(); print("did removeAll(), list.length=" + list.length); for (int j = 0; j < list.length; j++) { print("DATA-OUT - list[" + j + "]=" + list[j]); } } Thread.sleep(100); } for (int i = 0; i < 3; i++) { synchronized (fifo) { Object[] list = fifo.removeAtLeastOne(); print("did removeAtLeastOne(), list.length=" + list.length); for (int j = 0; j < list.length; j++) { print("DATA-OUT - list[" + j + "]=" + list[j]); } } Thread.sleep(1000); } while (!fifo.isEmpty()) { synchronized (fifo) { Object obj = fifo.remove(); print("DATA-OUT - did remove(), obj=" + obj); } Thread.sleep(1000); } print("leaving consumer()"); } catch (InterruptedException ix) { return; } } private static void producer(ObjectFIFO fifo) { try { print("just entered producer()"); int count = 0; Object obj0 = new Integer(count); count++; synchronized (fifo) { fifo.add(obj0); print("DATA-IN - did add(), obj0=" + obj0); boolean resultOfWait = fifo.waitUntilEmpty(500); print("did waitUntilEmpty(500), resultOfWait=" + resultOfWait + ", getSize()=" + fifo.getSize()); } for (int i = 0; i < 10; i++) { Object obj = new Integer(count); count++; synchronized (fifo) { fifo.add(obj); print("DATA-IN - did add(), obj=" + obj); } Thread.sleep(1000); } Thread.sleep(2000); Object obj = new Integer(count); count++; synchronized (fifo) { fifo.add(obj); print("DATA-IN - did add(), obj=" + obj); } Thread.sleep(500); Integer[] list1 = new Integer[3]; for (int i = 0; i < list1.length; i++) { list1[i] = new Integer(count); count++; } synchronized (fifo) { fifo.addEach(list1); print("did addEach(), list1.length=" + list1.length); } Integer[] list2 = new Integer[8]; for (int i = 0; i < list2.length; i++) { list2[i] = new Integer(count); count++; } synchronized (fifo) { fifo.addEach(list2); print("did addEach(), list2.length=" + list2.length); } synchronized (fifo) { fifo.waitUntilEmpty(); print("fifo.isEmpty()=" + fifo.isEmpty()); } print("leaving producer()"); } catch (InterruptedException ix) { return; } } private static synchronized void print(String msg) { System.out.println(Thread.currentThread().getName() + ": " + msg); } public static void main(String[] args) { final ObjectFIFO fifo = new ObjectFIFO(5); Runnable fullCheckRunnable = new Runnable() { public void run() { fullCheck(fifo); } }; Thread fullCheckThread = new Thread(fullCheckRunnable, "fchk"); fullCheckThread.setPriority(9); fullCheckThread.setDaemon(true); // die automatically fullCheckThread.start(); Runnable emptyCheckRunnable = new Runnable() { public void run() { emptyCheck(fifo); } }; Thread emptyCheckThread = new Thread(emptyCheckRunnable, "echk"); emptyCheckThread.setPriority(8); emptyCheckThread.setDaemon(true); // die automatically emptyCheckThread.start(); Runnable consumerRunnable = new Runnable() { public void run() { consumer(fifo); } }; Thread consumerThread = new Thread(consumerRunnable, "cons"); consumerThread.setPriority(7); consumerThread.start(); Runnable producerRunnable = new Runnable() { public void run() { producer(fifo); } }; Thread producerThread = new Thread(producerRunnable, "prod"); producerThread.setPriority(6); producerThread.start(); }
} class ObjectFIFO extends Object {
private Object[] queue; private int capacity; private int size; private int head; private int tail; public ObjectFIFO(int cap) { capacity = (cap > 0) ? cap : 1; // at least 1 queue = new Object[capacity]; head = 0; tail = 0; size = 0; } public int getCapacity() { return capacity; } public synchronized int getSize() { return size; } public synchronized boolean isEmpty() { return (size == 0); } public synchronized boolean isFull() { return (size == capacity); } public synchronized void add(Object obj) throws InterruptedException { waitWhileFull(); queue[head] = obj; head = (head + 1) % capacity; size++; notifyAll(); // let any waiting threads know about change } public synchronized void addEach(Object[] list) throws InterruptedException { // // You might want to code a more efficient // implementation here ... (see ByteFIFO.java) // for (int i = 0; i < list.length; i++) { add(list[i]); } } public synchronized Object remove() throws InterruptedException { waitWhileEmpty(); Object obj = queue[tail]; // don"t block GC by keeping unnecessary reference queue[tail] = null; tail = (tail + 1) % capacity; size--; notifyAll(); // let any waiting threads know about change return obj; } public synchronized Object[] removeAll() throws InterruptedException { // // You might want to code a more efficient // implementation here ... (see ByteFIFO.java) // Object[] list = new Object[size]; // use the current size for (int i = 0; i < list.length; i++) { list[i] = remove(); } // if FIFO was empty, a zero-length array is returned return list; } public synchronized Object[] removeAtLeastOne() throws InterruptedException { waitWhileEmpty(); // wait for a least one to be in FIFO return removeAll(); } public synchronized boolean waitUntilEmpty(long msTimeout) throws InterruptedException { if (msTimeout == 0L) { waitUntilEmpty(); // use other method return true; } // wait only for the specified amount of time long endTime = System.currentTimeMillis() + msTimeout; long msRemaining = msTimeout; while (!isEmpty() && (msRemaining > 0L)) { wait(msRemaining); msRemaining = endTime - System.currentTimeMillis(); } // May have timed out, or may have met condition, // calc return value. return isEmpty(); } public synchronized void waitUntilEmpty() throws InterruptedException { while (!isEmpty()) { wait(); } } public synchronized void waitWhileEmpty() throws InterruptedException { while (isEmpty()) { wait(); } } public synchronized void waitUntilFull() throws InterruptedException { while (!isFull()) { wait(); } } public synchronized void waitWhileFull() throws InterruptedException { while (isFull()) { wait(); } }
}
</source>
Operations that may seem safe are not, when threads are present
<source lang="java">
// : c13:SerialNumberChecker.java // Operations that may seem safe are not, // when threads are present. // From "Thinking in Java, 3rd ed." (c) Bruce Eckel 2002 // www.BruceEckel.ru. See copyright notice in CopyRight.txt. // Reuses storage so we don"t run out of memory:
class CircularSet {
private int[] array; private int len; private int index = 0; public CircularSet(int size) { array = new int[size]; len = size; // Initialize to a value not produced // by the SerialNumberGenerator: for (int i = 0; i < size; i++) array[i] = -1; } public synchronized void add(int i) { array[index] = i; // Wrap index and write over old elements: index = ++index % len; } public synchronized boolean contains(int val) { for (int i = 0; i < len; i++) if (array[i] == val) return true; return false; }
} public class SerialNumberChecker {
private static CircularSet serials = new CircularSet(1000); static class SerialChecker extends Thread { SerialChecker() { start(); } public void run() { while (true) { int serial = SerialNumberGenerator.nextSerialNumber(); if (serials.contains(serial)) { System.out.println("Duplicate: " + serial); System.exit(0); } serials.add(serial); } } } public static void main(String[] args) { for (int i = 0; i < 10; i++) new SerialChecker(); // Stop after 4 seconds: new Timeout(4000, "No duplicates detected"); }
} ///:~
class SerialNumberGenerator { private static volatile int serialNumber = 0; public static int nextSerialNumber() { return serialNumber++; }
} ///:~ class Timeout extends java.util.Timer {
public Timeout(int delay, final String msg) { super(true); // Daemon thread schedule(new TimerTask() { public void run() { System.out.println(msg); System.exit(0); } }, delay); }
} ///:~
</source>
Read Write Lock
<source lang="java">
/*
* Copyright 2002 (C) TJDO. * All rights reserved. * * This software is distributed under the terms of the TJDO License version 1.0. * See the terms of the TJDO License in the documentation provided with this software. * * $Id: ReadWriteLock.java,v 1.5 2006/08/02 22:41:25 jackknifebarber Exp $ */
/**
* A simple read-write lock implementation. Multiple threads may lock using * readLock(), only one can lock using writeLock(). The caller is responsible * for coding a try-finally that ensures unlock() is called for every readLock() * and writeLock() call. * * <p>A ReadWriteLock is recursive; with one exception, a thread can re-lock an * object it already has locked. Multiple read locks can be acquired by the * same thread, as can multiple write locks. The exception however is that a * write lock cannot be acquired when a read lock is already held (to allow * this would cause deadlocks). * * <p>Successive lock calls from the same thread must be matched by an * equal number of unlock() calls. * * @author * @version $Revision: 1.5 $ */
public class ReadWriteLock {
private static final int WAIT_LOG_INTERVAL = 5000; /** A count for each thread indicating the number of read locks it holds. */ private ThreadLocal readLocksByThread; /** The number of read locks held across all threads. */ private int readLocks; /** The number of write locks held (by writeLockedBy). */ private int writeLocks; /** The thread holding the write lock(s), if any. */ private Thread writeLockedBy;
/** * An object holding a per-thread read-lock count. */ private static class Count { public int value = 0; }
/** * Constructs read-write lock. */ public ReadWriteLock() { readLocksByThread = new ThreadLocal() { public Object initialValue() { return new Count(); } }; readLocks = 0; writeLocks = 0; writeLockedBy = null; }
/** * Acquire a read lock. The calling thread will be suspended until no other * thread holds a write lock. * * <p>If the calling thread already owns a write lock for the object a read * lock is immediately acquired. * * @exception InterruptedException * If the thread is interrupted while attempting to acquire the lock. */ public synchronized void readLock() throws InterruptedException { Thread me = Thread.currentThread(); Count myReadLocks = (Count)readLocksByThread.get(); if (writeLockedBy != me) { while (writeLocks > 0) { wait(WAIT_LOG_INTERVAL); if (writeLocks > 0) System.out.println("Still waiting for read lock on "); } } ++readLocks; ++myReadLocks.value; }
/**
* Acquire a write lock. The calling thread will be suspended until no
* other thread holds a read or write lock.
*
* <p>This method cannot be called if the thread already owns a read lock on
* the same ReadWriteLock object, otherwise an
* IllegalStateException
is thrown.
*
* @exception IllegalStateException
* If the thread already holds a read lock on the same object.
* @exception InterruptedException
* If the thread is interrupted while attempting to acquire the lock.
*/
public synchronized void writeLock() throws InterruptedException
{
Thread me = Thread.currentThread();
Count myReadLocks = (Count)readLocksByThread.get();
if (myReadLocks.value > 0)
throw new IllegalStateException("Thread already holds a read lock");
if (writeLockedBy != me)
{
while (writeLocks > 0 || readLocks > 0)
{
wait(WAIT_LOG_INTERVAL);
if (writeLocks > 0 || readLocks > 0)
System.out.println("Still waiting for write lock on ");
}
writeLockedBy = me;
}
++writeLocks;
}
/** * Release a read or write lock. Must be called in a finally block after * acquiring a lock. */ public synchronized void unlock() { Thread me = Thread.currentThread(); Count myReadLocks = (Count)readLocksByThread.get(); if (myReadLocks.value > 0) { --myReadLocks.value; --readLocks; } else if (writeLockedBy == me) { if (writeLocks > 0) { if (--writeLocks == 0) writeLockedBy = null; } } notifyAll(); }
public String toString() { StringBuffer s = new StringBuffer(super.toString()); s.append(": readLocks = ").append(readLocks) .append(", writeLocks = ").append(writeLocks) .append(", writeLockedBy = ").append(writeLockedBy); return s.toString(); }
}
</source>
Signaling
<source lang="java">
public class Signaling extends Object {
private BooleanLock readyLock; public Signaling(BooleanLock readyLock) { this.readyLock = readyLock; Runnable r = new Runnable() { public void run() { try { runWork(); } catch (Exception x) { // in case ANY exception slips through x.printStackTrace(); } } }; Thread internalThread = new Thread(r, "internal"); internalThread.start(); } private void runWork() { try { print("about to wait for readyLock to be true"); readyLock.waitUntilTrue(0); // 0 - wait forever print("readyLock is now true"); } catch (InterruptedException x) { print("interrupted while waiting for readyLock " + "to become true"); } } private static void print(String msg) { String name = Thread.currentThread().getName(); System.err.println(name + ": " + msg); } public static void main(String[] args) { try { print("creating BooleanLock instance"); BooleanLock ready = new BooleanLock(false); print("creating Signaling instance"); new Signaling(ready); print("about to sleep for 3 seconds"); Thread.sleep(3000); print("about to setValue to true"); ready.setValue(true); print("ready.isTrue()=" + ready.isTrue()); } catch (InterruptedException x) { x.printStackTrace(); } }
} class BooleanLock extends Object {
private boolean value; public BooleanLock(boolean initialValue) { value = initialValue; } public BooleanLock() { this(false); } public synchronized void setValue(boolean newValue) { if (newValue != value) { value = newValue; notifyAll(); } } public synchronized boolean waitToSetTrue(long msTimeout) throws InterruptedException { boolean success = waitUntilFalse(msTimeout); if (success) { setValue(true); } return success; } public synchronized boolean waitToSetFalse(long msTimeout) throws InterruptedException { boolean success = waitUntilTrue(msTimeout); if (success) { setValue(false); } return success; } public synchronized boolean isTrue() { return value; } public synchronized boolean isFalse() { return !value; } public synchronized boolean waitUntilTrue(long msTimeout) throws InterruptedException { return waitUntilStateIs(true, msTimeout); } public synchronized boolean waitUntilFalse(long msTimeout) throws InterruptedException { return waitUntilStateIs(false, msTimeout); } public synchronized boolean waitUntilStateIs(boolean state, long msTimeout) throws InterruptedException { if (msTimeout == 0L) { while (value != state) { wait(); // wait indefinitely until notified } // condition has finally been met return true; } // only wait for the specified amount of time long endTime = System.currentTimeMillis() + msTimeout; long msRemaining = msTimeout; while ((value != state) && (msRemaining > 0L)) { wait(msRemaining); msRemaining = endTime - System.currentTimeMillis(); } // May have timed out, or may have met value, // calculate return value. return (value == state); }
}
</source>
Simple Object FIFO
<source lang="java">
public class SimpleObjectFIFO extends Object {
private Object[] queue; private int capacity; private int size; private int head; private int tail; public SimpleObjectFIFO(int cap) { capacity = (cap > 0) ? cap : 1; // at least 1 queue = new Object[capacity]; head = 0; tail = 0; size = 0; } public synchronized int getSize() { return size; } public synchronized boolean isFull() { return (size == capacity); } public synchronized void add(Object obj) throws InterruptedException { while (isFull()) { wait(); } queue[head] = obj; head = (head + 1) % capacity; size++; notifyAll(); // let any waiting threads know about change } public synchronized Object remove() throws InterruptedException { while (size == 0) { wait(); } Object obj = queue[tail]; queue[tail] = null; // don"t block GC by keeping unnecessary reference tail = (tail + 1) % capacity; size--; notifyAll(); // let any waiting threads know about change return obj; } public synchronized void printState() { StringBuffer sb = new StringBuffer(); sb.append("SimpleObjectFIFO:\n"); sb.append(" capacity=" + capacity + "\n"); sb.append(" size=" + size); if (isFull()) { sb.append(" - FULL"); } else if (size == 0) { sb.append(" - EMPTY"); } sb.append("\n"); sb.append(" head=" + head + "\n"); sb.append(" tail=" + tail + "\n"); for (int i = 0; i < queue.length; i++) { sb.append(" queue[" + i + "]=" + queue[i] + "\n"); } System.out.print(sb); } public static void main(String[] args) { try { SimpleObjectFIFO fifo = new SimpleObjectFIFO(5); fifo.printState(); fifo.add("S01"); fifo.printState(); fifo.add("S02"); fifo.printState(); fifo.add("S03"); fifo.printState(); Object obj = fifo.remove(); System.out.println("just removed obj=" + obj); fifo.printState(); fifo.add("S04"); fifo.printState(); fifo.add("S05"); fifo.printState(); fifo.add("S06"); fifo.printState(); } catch (InterruptedException x) { x.printStackTrace(); } }
}
</source>
Static synchronize
<source lang="java">
public class StaticSync extends Object {
private static int nextSerialNum = 10001; public static synchronized int getNextSerialNum() { int sn = nextSerialNum; try { Thread.sleep(1000); } catch ( InterruptedException x ) { } nextSerialNum++; return sn; } private static void print(String msg) { String threadName = Thread.currentThread().getName(); System.out.println(threadName + ": " + msg); } public static void main(String[] args) { try { Runnable r = new Runnable() { public void run() { print("getNextSerialNum()=" + getNextSerialNum()); } }; Thread threadA = new Thread(r, "threadA"); threadA.start(); Thread.sleep(1500); Thread threadB = new Thread(r, "threadB"); threadB.start(); Thread.sleep(500); Thread threadC = new Thread(r, "threadC"); threadC.start(); Thread.sleep(2500); Thread threadD = new Thread(r, "threadD"); threadD.start(); } catch ( InterruptedException x ) { // ignore } }
}
</source>
Static synchronized block
<source lang="java">
public class StaticBlock extends Object {
public static synchronized void staticA() { System.out.println("entering staticA()"); try { Thread.sleep(5000); } catch (InterruptedException x) { } System.out.println("leaving staticA()"); } public static void staticB() { synchronized (StaticBlock.class) { System.out.println("in staticB() : inside sync block"); try { Thread.sleep(2000); } catch (InterruptedException x) { } } } public static void main(String[] args) { Runnable runA = new Runnable() { public void run() { StaticBlock.staticA(); } }; Thread threadA = new Thread(runA, "A"); threadA.start(); try { Thread.sleep(200); } catch (InterruptedException x) { } Runnable runB = new Runnable() { public void run() { StaticBlock.staticB(); } }; Thread threadB = new Thread(runB, "B"); threadB.start(); }
}
</source>
Synchronized Block demo
<source lang="java">
public class SyncBlock extends Object {
private Object longLock; public SyncBlock() { longLock = new Object(); } public void doStuff() { print("about to try to get exclusive access " + "to longLock"); synchronized (longLock) { print("got exclusive access to longLock"); try { Thread.sleep(10000); } catch (InterruptedException x) { } print("about to relinquish exclusive access to " + "longLock"); } } private static void print(String msg) { String name = Thread.currentThread().getName(); System.err.println(name + ": " + msg); } private static Thread launch(final SyncBlock sb, String name) { Runnable r = new Runnable() { public void run() { print("in run()"); sb.doStuff(); } }; Thread t = new Thread(r, name); t.start(); return t; } public static void main(String[] args) { try { SyncBlock sb = new SyncBlock(); Thread t1 = launch(sb, "T1"); Thread.sleep(500); Thread t2 = launch(sb, "T2"); Thread t3 = launch(sb, "T3"); Thread.sleep(1000); print("about to interrupt T2"); t2.interrupt(); print("just interrupted T2"); } catch (InterruptedException x) { x.printStackTrace(); } }
}
</source>
Synchronize method
<source lang="java">
public class OnlyOneInMethod extends Object {
private String objID; public OnlyOneInMethod(String objID) { this.objID = objID; } public synchronized void doStuff(int val) { print("entering doStuff()"); int num = val * 2 + objID.length(); print("local variable num=" + num); try { Thread.sleep(2000); } catch (InterruptedException x) { } print("leaving doStuff()"); } public void print(String msg) { threadPrint("objID=" + objID + " - " + msg); } public static void threadPrint(String msg) { String threadName = Thread.currentThread().getName(); System.out.println(threadName + ": " + msg); } public static void main(String[] args) { final OnlyOneInMethod ooim = new OnlyOneInMethod("obj1"); Runnable runA = new Runnable() { public void run() { ooim.doStuff(3); } }; Thread threadA = new Thread(runA, "threadA"); threadA.start(); try { Thread.sleep(200); } catch (InterruptedException x) { } Runnable runB = new Runnable() { public void run() { ooim.doStuff(7); } }; Thread threadB = new Thread(runB, "threadB"); threadB.start(); }
}
</source>
Synchronizing blocks instead of entire methods
<source lang="java">
// : c13:CriticalSection.java // Synchronizing blocks instead of entire methods. Also // demonstrates protection of a non-thread-safe class // with a thread-safe one. // From "Thinking in Java, 3rd ed." (c) Bruce Eckel 2002 // www.BruceEckel.ru. See copyright notice in CopyRight.txt. import java.util.ArrayList; import java.util.List; import java.util.Timer; import java.util.TimerTask; public class CriticalSection {
public static void main(String[] args) { // Test the two different approaches: final PairManipulator pm1 = new PairManipulator(new PairManager1()), pm2 = new PairManipulator( new PairManager2()); new Timer(true).schedule(new TimerTask() { public void run() { System.out.println("pm1: " + pm1); System.out.println("pm2: " + pm2); System.exit(0); } }, 500); // run() after 500 milliseconds }
} ///:~
class Pair { // Not thread-safe
private int x, y; public Pair(int x, int y) { this.x = x; this.y = y; } public Pair() { this(0, 0); } public int getX() { return x; } public int getY() { return y; } public void incrementX() { x++; } public void incrementY() { y++; } public String toString() { return "x: " + x + ", y: " + y; } public class PairValuesNotEqualException extends RuntimeException { public PairValuesNotEqualException() { super("Pair values not equal: " + Pair.this); } } // Arbitrary invariant -- both variables must be equal: public void checkState() { if (x != y) throw new PairValuesNotEqualException(); }
} // Protect a Pair inside a thread-safe class: abstract class PairManager {
protected Pair p = new Pair(); private List storage = new ArrayList(); public synchronized Pair getPair() { // Make a copy to keep the original safe: return new Pair(p.getX(), p.getY()); } protected void store() { storage.add(getPair()); } // A "template method": public abstract void doTask();
} // Synchronize the entire method: class PairManager1 extends PairManager {
public synchronized void doTask() { p.incrementX(); p.incrementY(); store(); }
} // Use a critical section: class PairManager2 extends PairManager {
public void doTask() { synchronized (this) { p.incrementX(); p.incrementY(); } store(); }
} class PairManipulator extends Thread {
private PairManager pm; private int checkCounter = 0; private class PairChecker extends Thread { PairChecker() { start(); } public void run() { while (true) { checkCounter++; pm.getPair().checkState(); } } } public PairManipulator(PairManager pm) { this.pm = pm; start(); new PairChecker(); } public void run() { while (true) { pm.doTask(); } } public String toString() { return "Pair: " + pm.getPair() + " checkCounter = " + checkCounter; }
}
</source>
Synchronizing on another object
<source lang="java">
// : c13:SyncObject.java // Synchronizing on another object // From "Thinking in Java, 3rd ed." (c) Bruce Eckel 2002 // www.BruceEckel.ru. See copyright notice in CopyRight.txt. class DualSynch {
private Object syncObject = new Object(); public synchronized void f() { System.out.println("Inside f()"); // Doesn"t release lock: try { Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("Leaving f()"); } public void g() { synchronized (syncObject) { System.out.println("Inside g()"); try { Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("Leaving g()"); } }
} public class SyncObject {
public static void main(String[] args) { final DualSynch ds = new DualSynch(); new Thread() { public void run() { ds.f(); } }.start(); ds.g(); }
} ///:~
</source>
Thread deadlock
<source lang="java">
public class Deadlock extends Object {
private String objID; public Deadlock(String id) { objID = id; } public synchronized void checkOther(Deadlock other) { print("entering checkOther()"); try { Thread.sleep(2000); } catch (InterruptedException x) { } print("invoke "other.action()""); other.action(); print("leaving checkOther()"); } public synchronized void action() { print("entering action()"); // simulate some work here try { Thread.sleep(500); } catch (InterruptedException x) { } print("leaving action()"); } public void print(String msg) { threadPrint("objID=" + objID + " - " + msg); } public static void threadPrint(String msg) { String threadName = Thread.currentThread().getName(); System.out.println(threadName + ": " + msg); } public static void main(String[] args) { final Deadlock obj1 = new Deadlock("Thread 1"); final Deadlock obj2 = new Deadlock("Thread 2"); Runnable runA = new Runnable() { public void run() { obj1.checkOther(obj2); } }; Thread thread = new Thread(runA, "A"); thread.start(); try { Thread.sleep(200); } catch (InterruptedException x) { } Runnable runB = new Runnable() { public void run() { obj2.checkOther(obj1); } }; Thread threadB = new Thread(runB, "B"); threadB.start(); try { Thread.sleep(5000); } catch (InterruptedException x) { } threadPrint("finished sleeping"); threadPrint("about to interrupt() threadA"); thread.interrupt(); try { Thread.sleep(1000); } catch (InterruptedException x) { } threadPrint("about to interrupt() threadB"); threadB.interrupt(); try { Thread.sleep(1000); } catch (InterruptedException x) { } threadPrint("did that break the deadlock?"); }
}
</source>
Thread: Dining Philosophers
<source lang="java">
/* From http://java.sun.ru/docs/books/tutorial/index.html */ /*
* Copyright (c) 2006 Sun Microsystems, Inc. All Rights Reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * -Redistribution of source code must retain the above copyright notice, this * list of conditions and the following disclaimer. * * -Redistribution in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * Neither the name of Sun Microsystems, Inc. or the names of contributors may * be used to endorse or promote products derived from this software without * specific prior written permission. * * This software is provided "AS IS," without a warranty of any kind. ALL * EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING * ANY IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE * OR NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN MIDROSYSTEMS, INC. ("SUN") * AND ITS LICENSORS SHALL NOT BE LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE * AS A RESULT OF USING, MODIFYING OR DISTRIBUTING THIS SOFTWARE OR ITS * DERIVATIVES. IN NO EVENT WILL SUN OR ITS LICENSORS BE LIABLE FOR ANY LOST * REVENUE, PROFIT OR DATA, OR FOR DIRECT, INDIRECT, SPECIAL, CONSEQUENTIAL, * INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER CAUSED AND REGARDLESS OF THE THEORY * OF LIABILITY, ARISING OUT OF THE USE OF OR INABILITY TO USE THIS SOFTWARE, * EVEN IF SUN HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. * * You acknowledge that this software is not designed, licensed or intended * for use in the design, construction, operation or maintenance of any * nuclear facility. */
import java.awt.Color; import java.awt.Dimension; import java.awt.GridBagConstraints; import java.awt.GridBagLayout; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import java.net.URL; import javax.swing.BorderFactory; import javax.swing.ImageIcon; import javax.swing.JButton; import javax.swing.JLabel; import javax.swing.JPanel; import javax.swing.JSlider; import javax.swing.event.ChangeEvent; import javax.swing.event.ChangeListener; public class DiningPhilosophers extends javax.swing.JApplet implements
ActionListener, ChangeListener { private JButton stopStartButton = new JButton("start"); // delays can go from 0 to 10,000 milliseconds, initial value is 500 int grabDelay = 500; private JSlider grabDelaySlider = new JSlider(JSlider.HORIZONTAL, 0, 100, 5); private JLabel label = new JLabel(" 500 milliseconds"); private JPanel philosopherArea; public ImageIcon[] imgs = new ImageIcon[3]; Chopstick[] chopsticks = new Chopstick[NUMPHILS]; String[] names = { "Arisduktle", "Dukrates", "Pythagoduke", "Duko", "Dukimedes" }; static final int NUMPHILS = 5; static final int HUNGRYDUKE = 0; static final int RIGHTSPOONDUKE = 1; static final int BOTHSPOONSDUKE = 2; private int width = 0; private int height = 0; private double spacing; private static final double MARGIN = 10.0f; private Philosopher[] philosophers = new Philosopher[NUMPHILS]; public void init() { imgs[HUNGRYDUKE] = new ImageIcon(getURL("images/hungryduke.gif")); imgs[RIGHTSPOONDUKE] = new ImageIcon( getURL("images/rightspoonduke.gif")); imgs[BOTHSPOONSDUKE] = new ImageIcon( getURL("images/bothspoonsduke.gif")); width = imgs[HUNGRYDUKE].getIconWidth() + (int) (MARGIN * 2.0); height = imgs[HUNGRYDUKE].getIconHeight() + (int) (MARGIN * 2.0); spacing = width + MARGIN; GridBagLayout gridBag = new GridBagLayout(); GridBagConstraints c = new GridBagConstraints(); JPanel contentPane = new JPanel(); contentPane.setLayout(gridBag); philosopherArea = new JPanel(null); philosopherArea.setBackground(Color.white); Dimension preferredSize = createPhilosophersAndChopsticks(); philosopherArea.setBorder(BorderFactory.createCompoundBorder( BorderFactory.createLoweredBevelBorder(), BorderFactory .createEmptyBorder(5, 5, 5, 5))); philosopherArea.setPreferredSize(preferredSize); c.fill = GridBagConstraints.BOTH; c.weighty = 1.0; c.gridwidth = GridBagConstraints.REMAINDER; //end row gridBag.setConstraints(philosopherArea, c); contentPane.add(philosopherArea); c.fill = GridBagConstraints.HORIZONTAL; c.weightx = 1.0; c.weighty = 0.0; gridBag.setConstraints(stopStartButton, c); contentPane.add(stopStartButton); c.gridwidth = GridBagConstraints.RELATIVE; //don"t end row c.weightx = 1.0; c.weighty = 0.0; gridBag.setConstraints(grabDelaySlider, c); contentPane.add(grabDelaySlider); c.weightx = 0.0; c.gridwidth = GridBagConstraints.REMAINDER; //end row gridBag.setConstraints(label, c); contentPane.add(label); contentPane.setBorder(BorderFactory.createEmptyBorder(5, 5, 5, 5)); setContentPane(contentPane); stopStartButton.addActionListener(this); grabDelaySlider.addChangeListener(this); } public void actionPerformed(ActionEvent e) { if (stopStartButton.getText().equals("stop/reset")) { stopPhilosophers(); stopStartButton.setText("start"); } else if (stopStartButton.getText().equals("start")) { startPhilosophers(); stopStartButton.setText("stop/reset"); } } public void stateChanged(ChangeEvent e) { JSlider source = (JSlider) e.getSource(); grabDelay = source.getValue() * 100; label.setText(String.valueOf(grabDelay + " milliseconds")); } public void startPhilosophers() { for (int i = 0; i < NUMPHILS; i++) philosophers[i].philThread.start(); } public void stopPhilosophers() { for (int i = 0; i < NUMPHILS; i++) philosophers[i].philThread.interrupt(); } public Dimension createPhilosophersAndChopsticks() { double x, y; double radius = 80.0; double centerAdj = 85.0; double radians; Dimension preferredSize = new Dimension(0, 0); /* * for a straight line y = MARGIN; */ for (int i = 0; i < NUMPHILS; i++) chopsticks[i] = new Chopstick(); for (int i = 0; i < NUMPHILS; i++) { /* * for a straight line x = i * spacing; */ radians = i * (2.0 * Math.PI / (double) NUMPHILS); x = Math.sin(radians) * radius + centerAdj; y = Math.cos(radians) * radius + centerAdj; philosophers[i] = new Philosopher(this, i, imgs[HUNGRYDUKE]); philosophers[i].setBounds((int) x, (int) y, width, height); philosopherArea.add(philosophers[i]); if ((int) x > preferredSize.width) preferredSize.width = (int) x; if ((int) y > preferredSize.height) preferredSize.height = (int) y; } preferredSize.width += width; preferredSize.height += height; return preferredSize; } protected URL getURL(String filename) { URL codeBase = getCodeBase(); URL url = null; try { url = new URL(codeBase, filename); } catch (java.net.MalformedURLException e) { System.out.println("Couldn"t create image: " + "badly specified URL"); return null; } return url; }
} /*
* This class requires no changes from the 1.0 version. It"s kept here so the * rest of the example can compile. */
class Philosopher extends JLabel implements Runnable {
private Chopstick leftStick, rightStick; private boolean sated; private DiningPhilosophers parent; private int position; Thread philThread = null; public Philosopher(DiningPhilosophers parent, int position, ImageIcon img) { super(parent.names[position], img, JLabel.CENTER); this.parent = parent; this.position = position; setVerticalTextPosition(JLabel.BOTTOM); setHorizontalTextPosition(JLabel.CENTER); // identify the chopsticks to my right and left this.rightStick = parent.chopsticks[position]; if (position == 0) { this.leftStick = parent.chopsticks[parent.NUMPHILS - 1]; } else { this.leftStick = parent.chopsticks[position - 1]; } // I"m hungry this.sated = false; philThread = new Thread(this); } public void run() { try { while (true) { Thread.sleep((int) (Math.random() * parent.grabDelay)); setText(" "); rightStick.grab(); setIcon(parent.imgs[parent.RIGHTSPOONDUKE]); Thread.sleep((int) (Math.random() * parent.grabDelay)); leftStick.grab(); setIcon(parent.imgs[parent.BOTHSPOONSDUKE]); Thread.sleep((int) (Math.random() * parent.grabDelay)); rightStick.release(); leftStick.release(); setIcon(parent.imgs[parent.HUNGRYDUKE]); setText("Mmmm!"); sated = true; Thread.sleep((int) (Math.random() * parent.grabDelay * 4)); sated = false; } } catch (java.lang.InterruptedException e) { } leftStick.releaseIfMine(); rightStick.releaseIfMine(); setIcon(parent.imgs[parent.HUNGRYDUKE]); setText(parent.names[position]); sated = false; philThread = new Thread(this); }
} class Chopstick {
Thread holder = null; public synchronized void grab() throws InterruptedException { while (holder != null) wait(); holder = Thread.currentThread(); } public synchronized void release() { holder = null; notify(); } public synchronized void releaseIfMine() { if (holder == Thread.currentThread()) holder = null; notify(); }
}
</source>
Thread notify
<source lang="java">
import java.util.Collections; import java.util.LinkedList; import java.util.List; public class EarlyNotify extends Object {
private List list; public EarlyNotify() { list = Collections.synchronizedList(new LinkedList()); } public String removeItem() throws InterruptedException { synchronized (list) { while (list.isEmpty()) { print("wait()"); list.wait(); print("done with wait()"); } String item = (String) list.remove(0); return item; } } public void addItem(String item) { print("entering"); synchronized (list) { list.add(item); print("added: "" + item + """); list.notifyAll(); print("notified"); } print("leaving"); } private static void print(String msg) { String name = Thread.currentThread().getName(); System.out.println(name + ": " + msg); } public static void main(String[] args) { final EarlyNotify enf = new EarlyNotify(); Runnable runA = new Runnable() { public void run() { try { String item = enf.removeItem(); print("returned: "" + item + """); } catch (InterruptedException ix) { print("interrupted!"); } catch (Exception x) { print("threw an Exception!!!\n" + x); } } }; Runnable runB = new Runnable() { public void run() { enf.addItem("Hello!"); } }; try { Thread threadA1 = new Thread(runA, "A"); threadA1.start(); Thread.sleep(500); Thread threadA2 = new Thread(runA, "B"); threadA2.start(); Thread.sleep(500); Thread threadB = new Thread(runB, "C"); threadB.start(); Thread.sleep(1000); threadA1.interrupt(); threadA2.interrupt(); } catch (InterruptedException x) { } }
}
</source>
Threads join
<source lang="java">
public class JoinDemo extends Object {
public static Thread createThread(String name, long napTime) { final long sleepTime = napTime; Runnable r = new Runnable() { public void run() { try { print("in run() - entering"); Thread.sleep(sleepTime); } catch ( InterruptedException x ) { print("interrupted!"); } finally { print("in run() - leaving"); } } }; Thread t = new Thread(r, name); t.start(); return t; } private static void print(String msg) { String name = Thread.currentThread().getName(); System.out.println(name + ": " + msg); } public static void main(String[] args) { Thread[] t = new Thread[3]; t[0] = createThread("thread A", 2000); t[1] = createThread("thread B", 1000); t[2] = createThread("thread C", 3000); for ( int i = 0; i < t.length; i++ ) { try { String idxStr = "thread[" + i + "]"; String name = "[" + t[i].getName() + "]"; print(idxStr + ".isAlive()=" + t[i].isAlive() + " " + name); print("about to do: " + idxStr + ".join() " + name); long start = System.currentTimeMillis(); t[i].join(); // wait for the thread to die long stop = System.currentTimeMillis(); print(idxStr + ".join() - took " + ( stop - start ) + " ms " + name); } catch ( InterruptedException x ) { print("interrupted waiting on #" + i); } } }
}
</source>
Thread synchronization
<source lang="java">
public class TwoObjects extends Object {
private String objID; public TwoObjects(String objID) { this.objID = objID; } public synchronized void synchronizedMethod(int val) { print("entering synchronized method()"); int num = val * 2 + objID.length(); print("in doStuff() - local variable num=" + num); try { Thread.sleep(2000); } catch (InterruptedException x) { } print("leaving synchronized method"); } public void print(String msg) { threadPrint("objID=" + objID + " - " + msg); } public static void threadPrint(String msg) { String threadName = Thread.currentThread().getName(); System.out.println(threadName + ": " + msg); } public static void main(String[] args) { final TwoObjects obj1 = new TwoObjects("obj1"); final TwoObjects obj2 = new TwoObjects("obj2"); Runnable runA = new Runnable() { public void run() { obj1.synchronizedMethod(3); } }; Thread threadA = new Thread(runA, "threadA"); threadA.start(); try { Thread.sleep(200); } catch (InterruptedException x) { } Runnable runB = new Runnable() { public void run() { obj2.synchronizedMethod(7); } }; Thread threadB = new Thread(runB, "threadB"); threadB.start(); }
}
</source>