Java/Threads/Producer Consumer
Содержание
Fibonacci: producer and consumer
/*
Java Threads, 3rd Edition
By Scott Oaks, Henry Wong
3rd Edition September 2004
ISBN: 0-596-00782-5
*/
import java.util.*;
import java.util.concurrent.*;
public class FibonacciTest {
public static void main(String[] args) {
ArrayBlockingQueue<Integer> queue;
queue = new ArrayBlockingQueue<Integer>(10);
new FibonacciProducer(queue);
int nThreads = Integer.parseInt(args[0]);
for (int i = 0; i < nThreads; i++)
new FibonacciConsumer(queue);
}
}
class Fibonacci {
private ConcurrentMap<Integer, Integer> values =
new ConcurrentHashMap<Integer, Integer>();
public int calculate(int x) {
if (x < 0)
throw new IllegalArgumentException("positive numbers only");
if (x <= 1)
return x;
return calculate(x-1) + calculate(x-2);
}
public int calculateWithCache(int x) {
Integer key = new Integer(x);
Integer result = values.get(key);
if (result == null) {
result = new Integer(calculate(x));
values.putIfAbsent(key, result);
}
return result.intValue();
}
public int calculateOnlyWithCache(int x) {
Integer v1 = values.get(new Integer(x-1));
Integer v2 = values.get(new Integer(x-2));
Integer key = new Integer(x);
Integer result = values.get(key);
if (result != null)
return result.intValue();
if ((v1 == null) || (v2 == null))
throw new IllegalArgumentException("values not in cache");
result = new Integer(v1.intValue() + v2.intValue());
values.putIfAbsent(key, result);
return result.intValue();
}
public void calculateRangeInCache(int x, int y) {
calculateWithCache(x++);
calculateWithCache(x++);
while (x <= y) {
calculateOnlyWithCache(x++);
}
}
}
class FibonacciProducer implements Runnable {
private Thread thr;
private BlockingQueue<Integer> queue;
public FibonacciProducer(BlockingQueue<Integer> q) {
queue = q;
thr = new Thread(this);
thr.start();
}
public void run() {
try {
for(int x=0;;x++) {
Thread.sleep(1000);
queue.put(new Integer(x));
System.out.println("Produced request " + x);
}
} catch (InterruptedException ex) {
}
}
}
class FibonacciConsumer implements Runnable {
private Fibonacci fib = new Fibonacci();
private Thread thr;
private BlockingQueue<Integer> queue;
public FibonacciConsumer(BlockingQueue<Integer> q) {
queue = q;
thr = new Thread(this);
thr.start();
}
public void run() {
int request, result;
try {
while (true) {
request = queue.take().intValue();
result = fib.calculateWithCache(request);
System.out.println("Calculated result of " + result + " from " + request);
}
} catch (InterruptedException ex) {
}
}
}
Producer and Comsumer
public class ProducerComsumer extends Object {
private Object slot;
public ProducerComsumer() {
slot = null; // null indicates empty
}
public synchronized void putIn(Object obj)
throws InterruptedException {
while ( slot != null ) {
wait();
}
slot = obj; // put object into slot
notifyAll(); // signal that slot has been filled
}
public synchronized Object takeOut()
throws InterruptedException {
while ( slot == null ) {
wait(); // wait while slot is empty
}
Object obj = slot;
slot = null; // mark slot as empty
notifyAll(); // signal that slot is empty
return obj;
}
public static void main(String[] args) {
final ProducerComsumer ch = new ProducerComsumer();
Runnable runA = new Runnable() {
public void run() {
try {
String str;
Thread.sleep(500);
str = "multithreaded";
ch.putIn(str);
str = "programming";
ch.putIn(str);
str = "with Java";
ch.putIn(str);
} catch ( InterruptedException x ) {
x.printStackTrace();
}
}
};
Runnable runB = new Runnable() {
public void run() {
try {
Object obj;
obj = ch.takeOut();
System.out.println("in run() - just took out: "" +
obj + """);
Thread.sleep(500);
obj = ch.takeOut();
System.out.println("in run() - just took out: "" +
obj + """);
obj = ch.takeOut();
System.out.println("in run() - just took out: "" +
obj + """);
} catch ( InterruptedException x ) {
x.printStackTrace();
}
}
};
Thread threadA = new Thread(runA, "threadA");
threadA.start();
Thread threadB = new Thread(runB, "threadB");
threadB.start();
}
}
Producer consumer for J2SE 1.5 using concurrent
/*
* Copyright (c) Ian F. Darwin, http://www.darwinsys.ru/, 1996-2002.
* All rights reserved. Software written by Ian F. Darwin and others.
* $Id: LICENSE,v 1.8 2004/02/09 03:33:38 ian Exp $
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS""
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* Java, the Duke mascot, and all variants of Sun"s Java "steaming coffee
* cup" logo are trademarks of Sun Microsystems. Sun"s, and James Gosling"s,
* pioneering role in inventing and promulgating (and standardizing) the Java
* language and environment is gratefully acknowledged.
*
* The pioneering role of Dennis Ritchie and Bjarne Stroustrup, of AT&T, for
* inventing predecessor languages C and C++ is also gratefully acknowledged.
*/
import java.util.*;
import java.io.*;
import java.util.concurrent.*;
/** Producer-Consumer in Java, for J2SE 1.5 using concurrent.
*/
public class ProdCons15 {
protected boolean done = false;
/** Inner class representing the Producer side */
class Producer implements Runnable {
protected BlockingQueue queue;
Producer(BlockingQueue theQueue) { this.queue = theQueue; }
public void run() {
try {
while (true) {
Object justProduced = getRequestFromNetwork();
queue.put(justProduced);
System.out.println("Produced 1 object; List size now " + queue.size());
if (done) {
return;
}
}
} catch (InterruptedException ex) {
System.out.println("Producer INTERRUPTED");
}
}
Object getRequestFromNetwork() { // Simulation of reading from client
try {
Thread.sleep(10); // simulate time passing during read
} catch (InterruptedException ex) {
System.out.println("Producer Read INTERRUPTED");
}
return(new Object());
}
}
/** Inner class representing the Consumer side */
class Consumer implements Runnable {
protected BlockingQueue queue;
Consumer(BlockingQueue theQueue) { this.queue = theQueue; }
public void run() {
try {
while (true) {
Object obj = queue.take();
int len = queue.size();
System.out.println("List size now " + len);
process(obj);
if (done) {
return;
}
}
} catch (InterruptedException ex) {
System.out.println("CONSUMER INTERRUPTED");
}
}
void process(Object obj) {
// Thread.sleep(xxx) // Simulate time passing
System.out.println("Consuming object " + obj);
}
}
ProdCons15(int nP, int nC) {
BlockingQueue myQueue = new LinkedBlockingQueue();
for (int i=0; i<nP; i++)
new Thread(new Producer(myQueue)).start();
for (int i=0; i<nC; i++)
new Thread(new Consumer(myQueue)).start();
}
public static void main(String[] args)
throws IOException, InterruptedException {
// Start producers and consumers
int numProducers = 4;
int numConsumers = 3;
ProdCons15 pc = new ProdCons15(numProducers, numConsumers);
// Let the simulation run for, say, 10 seconds
Thread.sleep(10*1000);
// End of simulation - shut down gracefully
pc.done = true;
}
}
Producer consumer in Java 1
/*
* Copyright (c) Ian F. Darwin, http://www.darwinsys.ru/, 1996-2002.
* All rights reserved. Software written by Ian F. Darwin and others.
* $Id: LICENSE,v 1.8 2004/02/09 03:33:38 ian Exp $
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS""
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* Java, the Duke mascot, and all variants of Sun"s Java "steaming coffee
* cup" logo are trademarks of Sun Microsystems. Sun"s, and James Gosling"s,
* pioneering role in inventing and promulgating (and standardizing) the Java
* language and environment is gratefully acknowledged.
*
* The pioneering role of Dennis Ritchie and Bjarne Stroustrup, of AT&T, for
* inventing predecessor languages C and C++ is also gratefully acknowledged.
*/
import java.util.*;
import java.io.*;
/** Producer-Consumer in Java. Version 1.
*/
public class ProdCons1 {
protected LinkedList list = new LinkedList();
protected void produce() {
int len = 0;
synchronized(list) {
Object justProduced = new Object();
list.addFirst(justProduced);
len = list.size();
list.notifyAll();
}
System.out.println("List size now " + len);
}
protected void consume() {
Object obj = null;
int len = 0;
synchronized(list) {
while (list.size() == 0) {
try {
list.wait();
} catch (InterruptedException ex) {
return;
}
}
obj = list.removeLast();
len = list.size();
}
System.out.println("Consuming object " + obj);
System.out.println("List size now " + len);
}
public static void main(String[] args) throws IOException {
ProdCons1 pc = new ProdCons1();
System.out.println("Ready (p to produce, c to consume):");
int i;
while ((i = System.in.read()) != -1) {
char ch = (char)i;
switch(ch) {
case "p": pc.produce(); break;
case "c": pc.consume(); break;
}
}
}
}
Producer consumer in Java 2
/*
* Copyright (c) Ian F. Darwin, http://www.darwinsys.ru/, 1996-2002.
* All rights reserved. Software written by Ian F. Darwin and others.
* $Id: LICENSE,v 1.8 2004/02/09 03:33:38 ian Exp $
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS""
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* Java, the Duke mascot, and all variants of Sun"s Java "steaming coffee
* cup" logo are trademarks of Sun Microsystems. Sun"s, and James Gosling"s,
* pioneering role in inventing and promulgating (and standardizing) the Java
* language and environment is gratefully acknowledged.
*
* The pioneering role of Dennis Ritchie and Bjarne Stroustrup, of AT&T, for
* inventing predecessor languages C and C++ is also gratefully acknowledged.
*/
import java.util.*;
import java.io.*;
/** Producer-consumer in Java, Take II.
*/
public class ProdCons2 {
/** Throughout the code, this is the object we synchronize on so this
* is also the object we wait() and notifyAll() on.
*/
protected LinkedList list = new LinkedList();
protected int MAX = 10;
protected boolean done = false; // Also protected by lock on list.
/** Inner class representing the Producer side */
class Producer extends Thread {
public void run() {
while (true) {
Object justProduced = getRequestFromNetwork();
// Get request from the network - outside the synch section.
// We"re simulating this actually reading from a client, and it
// might have to wait for hours if the client is having coffee.
synchronized(list) {
while (list.size() == MAX) // queue "full"
try {
System.out.println("Producer WAITING");
list.wait(); // Limit the size
} catch (InterruptedException ex) {
System.out.println("Producer INTERRUPTED");
}
list.addFirst(justProduced);
list.notifyAll(); // must own the lock
System.out.println("Produced 1; List size now " + list.size());
if (done)
break;
// yield(); // Useful for green threads & demo programs.
}
}
}
Object getRequestFromNetwork() { // Simulation of reading from client
// try {
// Thread.sleep(10); // simulate time passing during read
// } catch (InterruptedException ex) {
// System.out.println("Producer Read INTERRUPTED");
// }
return(new Object());
}
}
/** Inner class representing the Consumer side */
class Consumer extends Thread {
public void run() {
while (true) {
Object obj = null;
synchronized(list) {
while (list.size() == 0) {
try {
System.out.println("CONSUMER WAITING");
list.wait(); // must own the lock
} catch (InterruptedException ex) {
System.out.println("CONSUMER INTERRUPTED");
}
}
obj = list.removeLast();
list.notifyAll();
int len = list.size();
System.out.println("List size now " + len);
if (done)
break;
}
process(obj); // Outside synch section (could take time)
//yield(); DITTO
}
}
void process(Object obj) {
// Thread.sleep(xxx) // Simulate time passing
System.out.println("Consuming object " + obj);
}
}
ProdCons2(int nP, int nC) {
for (int i=0; i<nP; i++)
new Producer().start();
for (int i=0; i<nC; i++)
new Consumer().start();
}
public static void main(String[] args)
throws IOException, InterruptedException {
// Start producers and consumers
int numProducers = 4;
int numConsumers = 3;
ProdCons2 pc = new ProdCons2(numProducers, numConsumers);
// Let it run for, say, 10 seconds
Thread.sleep(10*1000);
// End of simulation - shut down gracefully
synchronized(pc.list) {
pc.done = true;
pc.list.notifyAll();
}
}
}
Producer Consumer Test
/* From http://java.sun.ru/docs/books/tutorial/index.html */
/*
* Copyright (c) 2006 Sun Microsystems, Inc. All Rights Reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* -Redistribution of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* -Redistribution in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* Neither the name of Sun Microsystems, Inc. or the names of contributors may
* be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING
* ANY IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE
* OR NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN MIDROSYSTEMS, INC. ("SUN")
* AND ITS LICENSORS SHALL NOT BE LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE
* AS A RESULT OF USING, MODIFYING OR DISTRIBUTING THIS SOFTWARE OR ITS
* DERIVATIVES. IN NO EVENT WILL SUN OR ITS LICENSORS BE LIABLE FOR ANY LOST
* REVENUE, PROFIT OR DATA, OR FOR DIRECT, INDIRECT, SPECIAL, CONSEQUENTIAL,
* INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER CAUSED AND REGARDLESS OF THE THEORY
* OF LIABILITY, ARISING OUT OF THE USE OF OR INABILITY TO USE THIS SOFTWARE,
* EVEN IF SUN HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
*
* You acknowledge that this software is not designed, licensed or intended
* for use in the design, construction, operation or maintenance of any
* nuclear facility.
*/
public class ProducerConsumerTest {
public static void main(String[] args) {
CubbyHole c = new CubbyHole();
Producer p1 = new Producer(c, 1);
Consumer c1 = new Consumer(c, 1);
p1.start();
c1.start();
}
}
class CubbyHole {
private int contents;
private boolean available = false;
public synchronized int get() {
while (available == false) {
try {
wait();
} catch (InterruptedException e) { }
}
available = false;
notifyAll();
return contents;
}
public synchronized void put(int value) {
while (available == true) {
try {
wait();
} catch (InterruptedException e) { }
}
contents = value;
available = true;
notifyAll();
}
}
class Consumer extends Thread {
private CubbyHole cubbyhole;
private int number;
public Consumer(CubbyHole c, int number) {
cubbyhole = c;
this.number = number;
}
public void run() {
int value = 0;
for (int i = 0; i < 10; i++) {
value = cubbyhole.get();
System.out.println("Consumer #" + this.number
+ " got: " + value);
}
}
}
class Producer extends Thread {
private CubbyHole cubbyhole;
private int number;
public Producer(CubbyHole c, int number) {
cubbyhole = c;
this.number = number;
}
public void run() {
for (int i = 0; i < 10; i++) {
cubbyhole.put(i);
System.out.println("Producer #" + this.number
+ " put: " + i);
try {
sleep((int)(Math.random() * 100));
} catch (InterruptedException e) { }
}
}
}
The producer-consumer approach to thread cooperation
// : c13:Restaurant.java
// The producer-consumer approach to thread cooperation.
// From "Thinking in Java, 3rd ed." (c) Bruce Eckel 2002
// www.BruceEckel.ru. See copyright notice in CopyRight.txt.
class Order {
private static int i = 0;
private int count = i++;
public Order() {
if (count == 10) {
System.out.println("Out of food, closing");
System.exit(0);
}
}
public String toString() {
return "Order " + count;
}
}
class WaitPerson extends Thread {
private Restaurant restaurant;
public WaitPerson(Restaurant r) {
restaurant = r;
start();
}
public void run() {
while (true) {
while (restaurant.order == null)
synchronized (this) {
try {
wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("Waitperson got " + restaurant.order);
restaurant.order = null;
}
}
}
class Chef extends Thread {
private Restaurant restaurant;
private WaitPerson waitPerson;
public Chef(Restaurant r, WaitPerson w) {
restaurant = r;
waitPerson = w;
start();
}
public void run() {
while (true) {
if (restaurant.order == null) {
restaurant.order = new Order();
System.out.print("Order up! ");
synchronized (waitPerson) {
waitPerson.notify();
}
}
try {
sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
public class Restaurant {
Order order; // Package access
public static void main(String[] args) {
Restaurant restaurant = new Restaurant();
WaitPerson waitPerson = new WaitPerson(restaurant);
Chef chef = new Chef(restaurant, waitPerson);
}
} ///:~