Java Tutorial/Thread/Producer and consumer
Версия от 17:44, 31 мая 2010; (обсуждение)
Содержание
- 1 A queue(LinkedList) is used to coordinate work between a producer and a set of worker threads.
- 2 Producer and comsumer with DataInputStream and DataOutputStream
- 3 Producer and consumer based on ReadableByteChannel and WritableByteChannel
- 4 Producer, consumer and Queue
- 5 Synchronized Queue with Producer and Consumer
A queue(LinkedList) is used to coordinate work between a producer and a set of worker threads.
import java.util.LinkedList;
public class Main {
public static void main(String[] argv) {
WorkQueue queue = new WorkQueue();
int numWorkers = 2;
Worker[] workers = new Worker[numWorkers];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker(queue);
workers[i].start();
}
for (int i = 0; i < 100; i++) {
queue.addWork(i);
}
}
}
class WorkQueue {
LinkedList<Object> queue = new LinkedList<Object>();
public synchronized void addWork(Object o) {
queue.addLast(o);
notify();
}
public synchronized Object getWork() throws InterruptedException {
while (queue.isEmpty()) {
wait();
}
return queue.removeFirst();
}
}
class Worker extends Thread {
WorkQueue q;
Worker(WorkQueue q) {
this.q = q;
}
public void run() {
try {
while (true) {
Object x = q.getWork();
if (x == null) {
break;
}
System.out.println(x);
}
} catch (InterruptedException e) {
}
}
}
Producer and comsumer with DataInputStream and DataOutputStream
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class MainClass {
public static void main (String[] args) throws IOException {
PipedOutputStream pout = new PipedOutputStream();
PipedInputStream pin = new PipedInputStream(pout);
NumberProducer fw = new NumberProducer(pout, 20);
NumberConsumer fr = new NumberConsumer(pin);
fw.start();
fr.start();
}
}
class NumberProducer extends Thread {
private DataOutputStream theOutput;
private int howMany;
public NumberProducer(OutputStream out, int howMany) {
theOutput = new DataOutputStream(out);
this.howMany = howMany;
}
public void run() {
try {
for (int i = 0; i < howMany; i++) {
theOutput.writeInt(i);
}
}
catch (IOException ex) { System.err.println(ex); }
}
}
class NumberConsumer extends Thread {
private DataInputStream theInput;
public NumberConsumer(InputStream in) {
theInput = new DataInputStream(in);
}
public void run() {
try {
while (true) {
System.out.println(theInput.readInt());
}
}
catch (IOException ex) {
if (ex.getMessage().equals("Pipe broken")
|| ex.getMessage().equals("Write end dead")) {
// normal termination
return;
}
ex.printStackTrace();
}
}
}
Producer and consumer based on ReadableByteChannel and WritableByteChannel
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
public class MainClass {
public static void main(String[] args) throws IOException {
Pipe pipe = Pipe.open();
WritableByteChannel out = pipe.sink();
ReadableByteChannel in = pipe.source();
NumberProducer producer = new NumberProducer(out, 200);
NumberConsumer consumer = new NumberConsumer(in);
producer.start();
consumer.start();
}
}
class NumberConsumer extends Thread {
private ReadableByteChannel in;
public NumberConsumer(ReadableByteChannel in) {
this.in = in;
}
public void run() {
ByteBuffer sizeb = ByteBuffer.allocate(4);
try {
while (sizeb.hasRemaining())
in.read(sizeb);
sizeb.flip();
int howMany = sizeb.getInt();
sizeb.clear();
for (int i = 0; i < howMany; i++) {
while (sizeb.hasRemaining())
in.read(sizeb);
sizeb.flip();
int length = sizeb.getInt();
sizeb.clear();
ByteBuffer data = ByteBuffer.allocate(length);
while (data.hasRemaining())
in.read(data);
BigInteger result = new BigInteger(data.array());
System.out.println(result);
}
} catch (IOException ex) {
System.err.println(ex);
} finally {
try {
in.close();
} catch (Exception ex) {
// We tried
}
}
}
}
class NumberProducer extends Thread {
private WritableByteChannel out;
private int howMany;
public NumberProducer(WritableByteChannel out, int howMany) {
this.out = out;
this.howMany = howMany;
}
public void run() {
try {
ByteBuffer buffer = ByteBuffer.allocate(4);
buffer.putInt(this.howMany);
buffer.flip();
while (buffer.hasRemaining())
out.write(buffer);
for (int i = 0; i < howMany; i++) {
byte[] data = new BigInteger(Integer.toString(i)).toByteArray();
buffer = ByteBuffer.allocate(4 + data.length);
buffer.putInt(data.length);
buffer.put(data);
buffer.flip();
while (buffer.hasRemaining())
out.write(buffer);
}
out.close();
System.err.println("Closed");
} catch (IOException ex) {
System.err.println(ex);
}
}
}
Producer, consumer and Queue
import java.util.Vector;
class Producer extends Thread {
Queue queue;
Producer(Queue queue) {
this.queue = queue;
}
public void run() {
int i = 0;
while(true) {
queue.add(i++);
}
}
}
class Consumer extends Thread {
String str;
Queue queue;
Consumer(String str, Queue queue) {
this.str = str;
this.queue = queue;
}
public void run() {
while(true) {
System.out.println(str + ": " + queue.remove());
}
}
}
class Queue {
private final static int SIZE = 5;
private Vector queue = new Vector();
private int count = 0;
synchronized void add(int i) {
while(count == SIZE) {
try {
wait();
}
catch(InterruptedException ie) {
ie.printStackTrace();
System.exit(0);
}
}
queue.addElement(new Integer(i));
++count;
notifyAll();
}
synchronized int remove() {
while(count == 0) {
try {
wait();
}
catch(InterruptedException ie) {
ie.printStackTrace();
System.exit(0);
}
}
Integer iobj = (Integer)queue.firstElement();
queue.removeElement(iobj);
--count;
notifyAll();
return iobj.intValue();
}
}
class ProducerConsumers {
public static void main(String args[]) {
Queue queue = new Queue();
new Producer(queue).start();
new Consumer("ConsumerA", queue).start();
new Consumer("ConsumerB", queue).start();
new Consumer("ConsumerC", queue).start();
}
}
Synchronized Queue with Producer and Consumer
public class ThreadTester {
public static void main(String[] args) {
SynchronizedQueue<String> queue = new SynchronizedQueue<String>(10);
final int GREETING_COUNT = 100;
Runnable run1 = new Producer("Hello, World!", queue, GREETING_COUNT);
Runnable run2 = new Producer("Goodbye, World!", queue, GREETING_COUNT);
Runnable run3 = new Consumer(queue, 2 * GREETING_COUNT);
Thread thread1 = new Thread(run1);
Thread thread2 = new Thread(run2);
Thread thread3 = new Thread(run3);
thread1.start();
thread2.start();
thread3.start();
}
}
class Producer implements Runnable {
private String greeting;
private SynchronizedQueue<String> queue;
private int greetingCount;
public Producer(String aGreeting, SynchronizedQueue<String> aQueue, int count) {
greeting = aGreeting;
queue = aQueue;
greetingCount = count;
}
public void run() {
try {
int i = 1;
while (i <= greetingCount) {
queue.add(i + ": " + greeting);
i++;
Thread.sleep(2000);
}
} catch (InterruptedException exception) {
}
}
}
class Consumer implements Runnable {
private SynchronizedQueue<String> queue;
private int greetingCount;
public Consumer(SynchronizedQueue<String> aQueue, int count) {
queue = aQueue;
greetingCount = count;
}
public void run() {
try {
int i = 1;
while (i <= greetingCount) {
String greeting = queue.remove();
System.out.println(greeting);
i++;
Thread.sleep(3000);
}
} catch (InterruptedException exception) {
}
}
}
class SynchronizedQueue<V> {
private Object[] elements;
private int head;
private int tail;
private int size;
public SynchronizedQueue(int capacity) {
elements = new Object[capacity];
head = 0;
tail = 0;
size = 0;
}
public synchronized V remove() throws InterruptedException {
while (size == 0)
wait();
V r = (V) elements[head];
head++;
size--;
if (head == elements.length)
head = 0;
notifyAll();
return r;
}
public synchronized void add(V newValue) throws InterruptedException {
while (size == elements.length)
wait();
elements[tail] = newValue;
tail++;
size++;
if (tail == elements.length)
tail = 0;
notifyAll();
}
}