Java/Threads/Producer Consumer
Содержание
Fibonacci: producer and consumer
<source lang="java">
/* Java Threads, 3rd Edition By Scott Oaks, Henry Wong 3rd Edition September 2004 ISBN: 0-596-00782-5
*/
import java.util.*; import java.util.concurrent.*; public class FibonacciTest {
public static void main(String[] args) { ArrayBlockingQueue<Integer> queue; queue = new ArrayBlockingQueue<Integer>(10); new FibonacciProducer(queue); int nThreads = Integer.parseInt(args[0]); for (int i = 0; i < nThreads; i++) new FibonacciConsumer(queue); }
} class Fibonacci {
private ConcurrentMap<Integer, Integer> values = new ConcurrentHashMap<Integer, Integer>(); public int calculate(int x) { if (x < 0) throw new IllegalArgumentException("positive numbers only"); if (x <= 1) return x; return calculate(x-1) + calculate(x-2); } public int calculateWithCache(int x) { Integer key = new Integer(x); Integer result = values.get(key); if (result == null) { result = new Integer(calculate(x)); values.putIfAbsent(key, result); } return result.intValue(); } public int calculateOnlyWithCache(int x) { Integer v1 = values.get(new Integer(x-1)); Integer v2 = values.get(new Integer(x-2)); Integer key = new Integer(x); Integer result = values.get(key); if (result != null) return result.intValue(); if ((v1 == null) || (v2 == null)) throw new IllegalArgumentException("values not in cache"); result = new Integer(v1.intValue() + v2.intValue()); values.putIfAbsent(key, result); return result.intValue(); } public void calculateRangeInCache(int x, int y) { calculateWithCache(x++); calculateWithCache(x++); while (x <= y) { calculateOnlyWithCache(x++); } }
} class FibonacciProducer implements Runnable {
private Thread thr; private BlockingQueue<Integer> queue; public FibonacciProducer(BlockingQueue<Integer> q) { queue = q; thr = new Thread(this); thr.start(); } public void run() { try { for(int x=0;;x++) { Thread.sleep(1000); queue.put(new Integer(x)); System.out.println("Produced request " + x); } } catch (InterruptedException ex) { } }
} class FibonacciConsumer implements Runnable {
private Fibonacci fib = new Fibonacci(); private Thread thr; private BlockingQueue<Integer> queue; public FibonacciConsumer(BlockingQueue<Integer> q) { queue = q; thr = new Thread(this); thr.start(); } public void run() { int request, result; try { while (true) { request = queue.take().intValue(); result = fib.calculateWithCache(request); System.out.println("Calculated result of " + result + " from " + request); } } catch (InterruptedException ex) { } }
}
</source>
Producer and Consumer
<source lang="java">
/**
 * A one-element hand-off channel built on wait()/notifyAll().
 * A {@code null} slot means "empty", so {@code null} cannot be used as a
 * payload: putting it leaves the channel empty.
 */
public class ProducerComsumer extends Object {
    private Object slot;

    public ProducerComsumer() {
        slot = null; // null indicates empty
    }

    /**
     * Stores {@code obj} in the slot, waiting while the slot is occupied.
     *
     * @param obj the object to hand over
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void putIn(Object obj) throws InterruptedException {
        while (slot != null) {
            wait(); // wait while slot is occupied
        }
        slot = obj;  // put object into slot
        notifyAll(); // signal that slot has been filled
    }

    /**
     * Removes and returns the object in the slot, waiting while the slot is empty.
     *
     * @return the object that was in the slot
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized Object takeOut() throws InterruptedException {
        while (slot == null) {
            wait(); // wait while slot is empty
        }
        Object obj = slot;
        slot = null; // mark slot as empty
        notifyAll(); // signal that slot is empty
        return obj;
    }

    /**
     * Demo: threadA puts three strings into the channel, threadB takes them
     * out and prints them.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final ProducerComsumer ch = new ProducerComsumer();

        Runnable runA = new Runnable() {
            public void run() {
                try {
                    String str;
                    Thread.sleep(500);

                    str = "multithreaded";
                    ch.putIn(str);
                    str = "programming";
                    ch.putIn(str);
                    str = "with Java";
                    ch.putIn(str);
                } catch (InterruptedException x) {
                    x.printStackTrace();
                    Thread.currentThread().interrupt(); // keep the interrupt visible
                }
            }
        };

        Runnable runB = new Runnable() {
            public void run() {
                try {
                    Object obj;

                    obj = ch.takeOut();
                    System.out.println("in run() - just took out: '" + obj + "'");

                    Thread.sleep(500);

                    obj = ch.takeOut();
                    System.out.println("in run() - just took out: '" + obj + "'");

                    obj = ch.takeOut();
                    System.out.println("in run() - just took out: '" + obj + "'");
                } catch (InterruptedException x) {
                    x.printStackTrace();
                    Thread.currentThread().interrupt(); // keep the interrupt visible
                }
            }
        };

        Thread threadA = new Thread(runA, "threadA");
        threadA.start();

        Thread threadB = new Thread(runB, "threadB");
        threadB.start();
    }
}
</source>
Producer consumer for J2SE 1.5 using concurrent
<source lang="java">
/*
* Copyright (c) Ian F. Darwin, http://www.darwinsys.ru/, 1996-2002. * All rights reserved. Software written by Ian F. Darwin and others. * $Id: LICENSE,v 1.8 2004/02/09 03:33:38 ian Exp $ * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS"" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * * Java, the Duke mascot, and all variants of Sun"s Java "steaming coffee * cup" logo are trademarks of Sun Microsystems. Sun"s, and James Gosling"s, * pioneering role in inventing and promulgating (and standardizing) the Java * language and environment is gratefully acknowledged. * * The pioneering role of Dennis Ritchie and Bjarne Stroustrup, of AT&T, for * inventing predecessor languages C and C++ is also gratefully acknowledged. */
import java.util.*;
import java.io.*;
import java.util.concurrent.*;

/**
 * Producer-Consumer in Java, for J2SE 1.5 using java.util.concurrent.
 * Producers and consumers share one {@link BlockingQueue}; setting
 * {@link #done} asks every worker to finish its current iteration and exit.
 */
public class ProdCons15 {

    /** How long a consumer waits for work before re-checking {@link #done}. */
    private static final long POLL_TIMEOUT_MILLIS = 100;

    /**
     * Shutdown flag, written by main() and read by the worker threads.
     * Declared volatile so the write is visible to those threads.
     */
    protected volatile boolean done = false;

    /** Inner class representing the Producer side. */
    class Producer implements Runnable {
        protected BlockingQueue<Object> queue;

        Producer(BlockingQueue<Object> theQueue) {
            this.queue = theQueue;
        }

        /** Produces objects until {@code done} is set or the thread is interrupted. */
        public void run() {
            try {
                while (true) {
                    Object justProduced = getRequestFromNetwork();
                    queue.put(justProduced);
                    System.out.println("Produced 1 object; List size now " + queue.size());
                    if (done) {
                        return;
                    }
                }
            } catch (InterruptedException ex) {
                System.out.println("Producer INTERRUPTED");
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Simulation of reading a request from a client.
         *
         * @return the simulated request
         */
        Object getRequestFromNetwork() {
            try {
                Thread.sleep(10); // simulate time passing during read
            } catch (InterruptedException ex) {
                System.out.println("Producer Read INTERRUPTED");
                // Re-assert the interrupt so the following put() in run()
                // throws and the producer terminates instead of ignoring it.
                Thread.currentThread().interrupt();
            }
            return new Object();
        }
    }

    /** Inner class representing the Consumer side. */
    class Consumer implements Runnable {
        protected BlockingQueue<Object> queue;

        Consumer(BlockingQueue<Object> theQueue) {
            this.queue = theQueue;
        }

        /** Consumes objects until {@code done} is set or the thread is interrupted. */
        public void run() {
            try {
                while (true) {
                    // Timed poll instead of take(): a consumer blocked on an
                    // empty queue must still get a chance to notice `done`.
                    Object obj = queue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                    if (obj != null) {
                        int len = queue.size();
                        System.out.println("List size now " + len);
                        process(obj);
                    }
                    if (done) {
                        return;
                    }
                }
            } catch (InterruptedException ex) {
                System.out.println("CONSUMER INTERRUPTED");
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Handles one consumed object (here: just prints it).
         *
         * @param obj the object taken from the queue
         */
        void process(Object obj) {
            // Thread.sleep(xxx) // Simulate time passing
            System.out.println("Consuming object " + obj);
        }
    }

    /**
     * Creates the shared queue and starts the worker threads.
     *
     * @param nP number of producer threads to start
     * @param nC number of consumer threads to start
     */
    ProdCons15(int nP, int nC) {
        BlockingQueue<Object> myQueue = new LinkedBlockingQueue<Object>();
        for (int i = 0; i < nP; i++) {
            new Thread(new Producer(myQueue)).start();
        }
        for (int i = 0; i < nC; i++) {
            new Thread(new Consumer(myQueue)).start();
        }
    }

    /**
     * Runs the simulation with 4 producers and 3 consumers for 10 seconds.
     *
     * @param args unused
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        // Start producers and consumers
        int numProducers = 4;
        int numConsumers = 3;
        ProdCons15 pc = new ProdCons15(numProducers, numConsumers);

        // Let the simulation run for, say, 10 seconds
        Thread.sleep(10 * 1000);

        // End of simulation - shut down gracefully
        pc.done = true;
    }
}
</source>
Producer consumer in Java 1
<source lang="java">
/*
* Copyright (c) Ian F. Darwin, http://www.darwinsys.ru/, 1996-2002. * All rights reserved. Software written by Ian F. Darwin and others. * $Id: LICENSE,v 1.8 2004/02/09 03:33:38 ian Exp $ * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS"" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * * Java, the Duke mascot, and all variants of Sun"s Java "steaming coffee * cup" logo are trademarks of Sun Microsystems. Sun"s, and James Gosling"s, * pioneering role in inventing and promulgating (and standardizing) the Java * language and environment is gratefully acknowledged. * * The pioneering role of Dennis Ritchie and Bjarne Stroustrup, of AT&T, for * inventing predecessor languages C and C++ is also gratefully acknowledged. */
import java.util.*;
import java.io.*;

/**
 * Producer-Consumer in Java. Version 1.
 * A single list serves as the queue and as the monitor; main() drives
 * produce() and consume() from keyboard input.
 */
public class ProdCons1 {

    /** The queue; also the object that is locked, waited on and notified. */
    protected LinkedList<Object> list = new LinkedList<Object>();

    /** Adds a new object at the head of the list and wakes any waiting consumer. */
    protected void produce() {
        int len = 0;
        synchronized (list) {
            Object justProduced = new Object();
            list.addFirst(justProduced);
            len = list.size();
            list.notifyAll();
        }
        System.out.println("List size now " + len);
    }

    /**
     * Removes the object at the tail of the list, waiting while the list is
     * empty. Returns without consuming if the calling thread is interrupted;
     * the interrupt status is left set for the caller.
     */
    protected void consume() {
        Object obj = null;
        int len = 0;
        synchronized (list) {
            while (list.size() == 0) {
                try {
                    list.wait();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            obj = list.removeLast();
            len = list.size();
        }
        System.out.println("Consuming object " + obj);
        System.out.println("List size now " + len);
    }

    /**
     * Reads characters from standard input: 'p' produces, 'c' consumes,
     * anything else is ignored.
     *
     * @param args unused
     * @throws IOException if reading standard input fails
     */
    public static void main(String[] args) throws IOException {
        ProdCons1 pc = new ProdCons1();
        System.out.println("Ready (p to produce, c to consume):");
        int i;
        while ((i = System.in.read()) != -1) {
            char ch = (char) i;
            switch (ch) {
                case 'p':
                    pc.produce();
                    break;
                case 'c':
                    pc.consume();
                    break;
                default:
                    // Other input (e.g. line terminators) is ignored.
                    break;
            }
        }
    }
}
</source>
Producer consumer in Java 2
<source lang="java">
/*
* Copyright (c) Ian F. Darwin, http://www.darwinsys.ru/, 1996-2002. * All rights reserved. Software written by Ian F. Darwin and others. * $Id: LICENSE,v 1.8 2004/02/09 03:33:38 ian Exp $ * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS"" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * * Java, the Duke mascot, and all variants of Sun"s Java "steaming coffee * cup" logo are trademarks of Sun Microsystems. Sun"s, and James Gosling"s, * pioneering role in inventing and promulgating (and standardizing) the Java * language and environment is gratefully acknowledged. * * The pioneering role of Dennis Ritchie and Bjarne Stroustrup, of AT&T, for * inventing predecessor languages C and C++ is also gratefully acknowledged. */
import java.util.*;
import java.io.*;

/**
 * Producer-consumer in Java, Take II.
 * Producer and Consumer threads share one bounded list, coordinated with
 * wait()/notifyAll() on that list.
 */
public class ProdCons2 {

    /**
     * Throughout the code, this is the object we synchronize on so this
     * is also the object we wait() and notifyAll() on.
     */
    protected LinkedList<Object> list = new LinkedList<Object>();

    /** Size at which producers stop adding and wait. */
    protected int MAX = 10;

    /** Shutdown flag. Also protected by the lock on list. */
    protected boolean done = false;

    /** Inner class representing the Producer side. */
    class Producer extends Thread {

        /** Produces until shutdown is requested or the thread is interrupted. */
        public void run() {
            while (true) {
                // Get request from the network - outside the synch section.
                // We're simulating this actually reading from a client, and it
                // might have to wait for hours if the client is having coffee.
                Object justProduced = getRequestFromNetwork();

                synchronized (list) {
                    // Also leave the wait loop on shutdown; otherwise a producer
                    // parked on a full queue would never terminate.
                    while (list.size() >= MAX && !done) { // queue "full"
                        try {
                            System.out.println("Producer WAITING");
                            list.wait(); // Limit the size
                        } catch (InterruptedException ex) {
                            System.out.println("Producer INTERRUPTED");
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    if (done && list.size() >= MAX) {
                        return; // shutting down and no room: do not overfill
                    }
                    list.addFirst(justProduced);
                    list.notifyAll(); // must own the lock
                    System.out.println("Produced 1; List size now " + list.size());
                    if (done) {
                        break;
                    }
                    // yield(); // Useful for green threads & demo programs.
                }
            }
        }

        /**
         * Simulation of reading a request from a client.
         *
         * @return the simulated request
         */
        Object getRequestFromNetwork() {
            // try {
            //     Thread.sleep(10); // simulate time passing during read
            // } catch (InterruptedException ex) {
            //     System.out.println("Producer Read INTERRUPTED");
            // }
            return new Object();
        }
    }

    /** Inner class representing the Consumer side. */
    class Consumer extends Thread {

        /** Consumes until shutdown is requested or the thread is interrupted. */
        public void run() {
            while (true) {
                Object obj = null;
                boolean stopAfterThis;
                synchronized (list) {
                    // Also leave the wait loop on shutdown; otherwise a consumer
                    // parked on an empty queue would never terminate.
                    while (list.isEmpty() && !done) {
                        try {
                            System.out.println("CONSUMER WAITING");
                            list.wait(); // must own the lock
                        } catch (InterruptedException ex) {
                            System.out.println("CONSUMER INTERRUPTED");
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    if (list.isEmpty()) {
                        return; // only reachable when done: nothing left to consume
                    }
                    obj = list.removeLast();
                    list.notifyAll();
                    int len = list.size();
                    System.out.println("List size now " + len);
                    stopAfterThis = done;
                }
                // Outside synch section (could take time). The object is
                // processed even on shutdown so a dequeued item is never dropped.
                process(obj);
                if (stopAfterThis) {
                    return;
                }
                // yield(); DITTO
            }
        }

        /**
         * Handles one consumed object (here: just prints it).
         *
         * @param obj the object taken from the list
         */
        void process(Object obj) {
            // Thread.sleep(xxx) // Simulate time passing
            System.out.println("Consuming object " + obj);
        }
    }

    /**
     * Starts the worker threads.
     *
     * @param nP number of producer threads to start
     * @param nC number of consumer threads to start
     */
    ProdCons2(int nP, int nC) {
        for (int i = 0; i < nP; i++) {
            new Producer().start();
        }
        for (int i = 0; i < nC; i++) {
            new Consumer().start();
        }
    }

    /**
     * Runs the simulation with 4 producers and 3 consumers for 10 seconds.
     *
     * @param args unused
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        // Start producers and consumers
        int numProducers = 4;
        int numConsumers = 3;
        ProdCons2 pc = new ProdCons2(numProducers, numConsumers);

        // Let it run for, say, 10 seconds
        Thread.sleep(10 * 1000);

        // End of simulation - shut down gracefully
        synchronized (pc.list) {
            pc.done = true;
            pc.list.notifyAll();
        }
    }
}
</source>
Producer Consumer Test
<source lang="java">
/* From http://java.sun.ru/docs/books/tutorial/index.html */ /*
* Copyright (c) 2006 Sun Microsystems, Inc. All Rights Reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * -Redistribution of source code must retain the above copyright notice, this * list of conditions and the following disclaimer. * * -Redistribution in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * Neither the name of Sun Microsystems, Inc. or the names of contributors may * be used to endorse or promote products derived from this software without * specific prior written permission. * * This software is provided "AS IS," without a warranty of any kind. ALL * EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING * ANY IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE * OR NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN MIDROSYSTEMS, INC. ("SUN") * AND ITS LICENSORS SHALL NOT BE LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE * AS A RESULT OF USING, MODIFYING OR DISTRIBUTING THIS SOFTWARE OR ITS * DERIVATIVES. IN NO EVENT WILL SUN OR ITS LICENSORS BE LIABLE FOR ANY LOST * REVENUE, PROFIT OR DATA, OR FOR DIRECT, INDIRECT, SPECIAL, CONSEQUENTIAL, * INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER CAUSED AND REGARDLESS OF THE THEORY * OF LIABILITY, ARISING OUT OF THE USE OF OR INABILITY TO USE THIS SOFTWARE, * EVEN IF SUN HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. * * You acknowledge that this software is not designed, licensed or intended * for use in the design, construction, operation or maintenance of any * nuclear facility. */
/**
 * Demo entry point: one Producer and one Consumer exchange integers through
 * a shared {@link CubbyHole}.
 */
public class ProducerConsumerTest {
    /**
     * Creates the shared cubbyhole and starts producer #1 and consumer #1.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CubbyHole c = new CubbyHole();
        Producer p1 = new Producer(c, 1);
        Consumer c1 = new Consumer(c, 1);
        p1.start();
        c1.start();
    }
}

/**
 * A single-value hand-off cell. {@code put} waits until the previous value
 * has been taken, {@code get} waits until a value is available.
 */
class CubbyHole {
    private int contents;
    private boolean available = false;

    /**
     * Waits until a value is available, marks the cell empty and returns the value.
     * An interrupt received while waiting does not abort the wait; it is
     * remembered and the thread's interrupt status is restored before returning.
     *
     * @return the value most recently stored by {@link #put(int)}
     */
    public synchronized int get() {
        boolean interrupted = false;
        while (!available) {
            try {
                wait();
            } catch (InterruptedException e) {
                // Keep waiting, but remember the interrupt; re-setting the flag
                // here would make the next wait() throw again immediately.
                interrupted = true;
            }
        }
        available = false;
        notifyAll();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return contents;
    }

    /**
     * Waits until the cell is empty, then stores {@code value} and marks it available.
     * An interrupt received while waiting does not abort the wait; it is
     * remembered and the thread's interrupt status is restored before returning.
     *
     * @param value the value to hand over
     */
    public synchronized void put(int value) {
        boolean interrupted = false;
        while (available) {
            try {
                wait();
            } catch (InterruptedException e) {
                // Keep waiting, but remember the interrupt (see get()).
                interrupted = true;
            }
        }
        contents = value;
        available = true;
        notifyAll();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Thread that takes ten values out of a CubbyHole and prints each one.
 */
class Consumer extends Thread {
    private CubbyHole cubbyhole;
    private int number;

    /**
     * @param c      the cubbyhole to read from
     * @param number identifier printed with every message
     */
    public Consumer(CubbyHole c, int number) {
        this.cubbyhole = c;
        this.number = number;
    }

    /** Fetches and reports ten values, one get() call per value. */
    public void run() {
        int remaining = 10;
        while (remaining-- > 0) {
            int received = cubbyhole.get();
            System.out.println("Consumer #" + number + " got: " + received);
        }
    }
}
/**
 * Thread that puts the values 0 through 9 into a CubbyHole, pausing for a
 * random time of under 100 ms after each one.
 */
class Producer extends Thread {
    private CubbyHole cubbyhole;
    private int number;

    /**
     * @param c      the cubbyhole to write to
     * @param number identifier printed with every message
     */
    public Producer(CubbyHole c, int number) {
        this.cubbyhole = c;
        this.number = number;
    }

    /** Stores and reports ten consecutive values, sleeping briefly after each. */
    public void run() {
        int next = 0;
        while (next < 10) {
            cubbyhole.put(next);
            System.out.println("Producer #" + number + " put: " + next);
            next++;
            try {
                sleep((int) (Math.random() * 100));
            } catch (InterruptedException ignored) {
                // An interrupted pause is simply cut short; production continues.
            }
        }
    }
}
</source>
The producer-consumer approach to thread cooperation
<source lang="java">
// : c13:Restaurant.java
// The producer-consumer approach to thread cooperation.
// From "Thinking in Java, 3rd ed." (c) Bruce Eckel 2002
// www.BruceEckel.ru. See copyright notice in CopyRight.txt.

/**
 * A numbered order. Constructing the order whose number is 10 prints a
 * message and terminates the JVM.
 */
class Order {
    private static int i = 0;
    private int count = i++;

    public Order() {
        if (count == 10) {
            System.out.println("Out of food, closing");
            System.exit(0);
        }
    }

    public String toString() {
        return "Order " + count;
    }
}

/**
 * Consumer side: waits on its own monitor until the restaurant has an order,
 * prints it and clears it. The thread starts itself in the constructor.
 */
class WaitPerson extends Thread {
    private Restaurant restaurant;

    /**
     * @param r the restaurant whose {@code order} field is watched
     */
    public WaitPerson(Restaurant r) {
        restaurant = r;
        start();
    }

    public void run() {
        while (true) {
            // The check, the wait and the clearing of the order all happen while
            // holding this monitor, so a notify() sent by the chef between the
            // check and the wait can no longer be missed.
            synchronized (this) {
                while (restaurant.order == null) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                System.out.println("Waitperson got " + restaurant.order);
                restaurant.order = null;
            }
        }
    }
}

/**
 * Producer side: every 100 ms, if no order is pending, creates one and
 * notifies the wait person. The thread starts itself in the constructor.
 */
class Chef extends Thread {
    private Restaurant restaurant;
    private WaitPerson waitPerson;

    /**
     * @param r the restaurant that receives the orders
     * @param w the wait person to notify when an order is up
     */
    public Chef(Restaurant r, WaitPerson w) {
        restaurant = r;
        waitPerson = w;
        start();
    }

    public void run() {
        while (true) {
            // Read and write the shared order under the wait person's monitor:
            // the same lock the wait person holds while checking and clearing it.
            synchronized (waitPerson) {
                if (restaurant.order == null) {
                    restaurant.order = new Order();
                    System.out.print("Order up! ");
                    waitPerson.notify();
                }
            }
            try {
                sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

/**
 * Holds the single pending order and wires up one wait person and one chef.
 */
public class Restaurant {
    Order order; // Package access

    /**
     * Starts the wait person and the chef; the program ends when the chef
     * creates order number 10.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Restaurant restaurant = new Restaurant();
        WaitPerson waitPerson = new WaitPerson(restaurant);
        Chef chef = new Chef(restaurant, waitPerson);
    }
}
///:~
</source>