Java Tutorial/Thread/Producer and consumer
Содержание
- 1 A queue(LinkedList) is used to coordinate work between a producer and a set of worker threads.
- 2 Producer and consumer with DataInputStream and DataOutputStream
- 3 Producer and consumer based on ReadableByteChannel and WritableByteChannel
- 4 Producer, consumer and Queue
- 5 Synchronized Queue with Producer and Consumer
A queue(LinkedList) is used to coordinate work between a producer and a set of worker threads.
<source lang="java">
import java.util.LinkedList; public class Main {
/**
 * Starts a fixed set of workers, pushes 100 integers through the shared
 * queue, and then tells every worker to stop.
 *
 * @param argv command-line arguments (unused)
 */
public static void main(String[] argv) {
    WorkQueue queue = new WorkQueue();

    int numWorkers = 2;
    Worker[] workers = new Worker[numWorkers];
    for (int i = 0; i < workers.length; i++) {
        workers[i] = new Worker(queue);
        workers[i].start();
    }

    for (int i = 0; i < 100; i++) {
        queue.addWork(i);
    }

    // Worker.run() leaves its loop when it dequeues null. Queue one null per
    // worker; without these the workers block in getWork() forever and the
    // JVM never exits.
    for (int i = 0; i < workers.length; i++) {
        queue.addWork(null);
    }
}
} class WorkQueue {
/** Pending work items, oldest first. Guarded by this object's monitor. */
LinkedList<Object> queue = new LinkedList<Object>();

/**
 * Appends an item to the end of the queue and wakes one waiting thread.
 *
 * @param o the item to enqueue; null is stored like any other value
 */
public synchronized void addWork(Object o) {
    queue.add(o);
    this.notify();
}

/**
 * Removes and returns the oldest item, waiting while the queue is empty.
 *
 * @return the item at the head of the queue (may be null if null was added)
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public synchronized Object getWork() throws InterruptedException {
    // Re-check after every wake-up: another thread may have taken the item.
    while (queue.size() == 0) {
        this.wait();
    }
    Object oldest = queue.remove();
    return oldest;
}
} class Worker extends Thread {
/** The queue this worker pulls items from. */
WorkQueue q;

/**
 * @param q the shared queue to consume from
 */
Worker(WorkQueue q) {
    this.q = q;
}

/**
 * Prints every item taken from the queue until a null item is received
 * or the thread is interrupted while waiting.
 */
public void run() {
    try {
        for (Object item = q.getWork(); item != null; item = q.getWork()) {
            System.out.println(item);
        }
    } catch (InterruptedException e) {
        // Interrupted while waiting for work: fall through and let the thread end.
    }
}
}</source>
Producer and consumer with DataInputStream and DataOutputStream
<source lang="java">
import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; public class MainClass {
/**
 * Connects a piped output stream to a piped input stream and runs a producer
 * of 20 numbers against a consumer on separate threads.
 *
 * @param args command-line arguments (unused)
 * @throws IOException if the two pipe ends cannot be connected
 */
public static void main (String[] args) throws IOException {
    PipedOutputStream writeEnd = new PipedOutputStream();
    PipedInputStream readEnd = new PipedInputStream(writeEnd);

    NumberProducer producer = new NumberProducer(writeEnd, 20);
    NumberConsumer consumer = new NumberConsumer(readEnd);

    producer.start();
    consumer.start();
}
} class NumberProducer extends Thread {
/** Destination for the generated numbers. */
private DataOutputStream theOutput;
/** How many consecutive integers, starting at 0, to write. */
private int howMany;

/**
 * @param out     stream that receives the numbers
 * @param howMany count of integers to write
 */
public NumberProducer(OutputStream out, int howMany) {
    this.howMany = howMany;
    this.theOutput = new DataOutputStream(out);
}

/**
 * Writes 0, 1, ..., howMany - 1 with {@code writeInt}. An I/O failure is
 * reported on standard error and ends the run. The stream is not closed here.
 */
public void run() {
    int next = 0;
    try {
        while (next < howMany) {
            theOutput.writeInt(next);
            next++;
        }
    } catch (IOException ex) {
        System.err.println(ex);
    }
}
}
/**
 * Thread that reads ints from a stream and prints each one on its own line
 * until the stream ends.
 */
class NumberConsumer extends Thread {
    /** Source of the numbers to print. */
    private DataInputStream theInput;

    /**
     * @param in stream to read the numbers from
     */
    public NumberConsumer(InputStream in) {
        theInput = new DataInputStream(in);
    }

    /**
     * Prints every int read from the stream. End of stream and the piped-stream
     * "writer has gone away" errors are treated as normal termination; any other
     * I/O error has its stack trace printed.
     */
    public void run() {
        try {
            while (true) {
                System.out.println(theInput.readInt());
            }
        } catch (java.io.EOFException ex) {
            // The writer closed the stream: normal termination. EOFException
            // usually has a null message, so it must not reach the checks below.
        } catch (IOException ex) {
            // Constant-first equals() so a null message cannot cause a
            // NullPointerException inside the handler.
            String message = ex.getMessage();
            if ("Pipe broken".equals(message) || "Write end dead".equals(message)) {
                // normal termination
                return;
            }
            ex.printStackTrace();
        }
    }
}</source>
Producer and consumer based on ReadableByteChannel and WritableByteChannel
<source lang="java">
import java.io.IOException; import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.channels.Pipe; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; public class MainClass {
/**
 * Opens an NIO pipe and runs a producer of 200 numbers on its sink against a
 * consumer on its source, each on its own thread.
 *
 * @param args command-line arguments (unused)
 * @throws IOException if the pipe cannot be opened
 */
public static void main(String[] args) throws IOException {
    Pipe pipe = Pipe.open();

    Thread writer = new NumberProducer(pipe.sink(), 200);
    Thread reader = new NumberConsumer(pipe.source());

    writer.start();
    reader.start();
}
} class NumberConsumer extends Thread {
/** Channel the length-prefixed numbers are read from. */
private ReadableByteChannel in;

/**
 * @param in channel to read from; it is closed when {@link #run()} finishes
 */
public NumberConsumer(ReadableByteChannel in) {
    this.in = in;
}

/**
 * Reads a 4-byte count, then that many records of the form
 * (4-byte length, length bytes), and prints each record's bytes as a
 * {@link BigInteger}. I/O errors, including a stream that ends early,
 * are reported on standard error. The channel is always closed at the end.
 */
public void run() {
    ByteBuffer sizeb = ByteBuffer.allocate(4);
    try {
        int howMany = readInt(sizeb);
        for (int i = 0; i < howMany; i++) {
            int length = readInt(sizeb);
            ByteBuffer data = ByteBuffer.allocate(length);
            readFully(data);
            BigInteger result = new BigInteger(data.array());
            System.out.println(result);
        }
    } catch (IOException ex) {
        System.err.println(ex);
    } finally {
        try {
            in.close();
        } catch (Exception ex) {
            // We tried
        }
    }
}

/** Refills the 4-byte scratch buffer from the channel and returns it as an int. */
private int readInt(ByteBuffer scratch) throws IOException {
    scratch.clear();
    readFully(scratch);
    scratch.flip();
    return scratch.getInt();
}

/**
 * Reads until the buffer is full. A negative return from read() means the
 * channel reached end-of-stream; looping on it would spin forever, so it is
 * turned into an EOFException instead.
 */
private void readFully(ByteBuffer target) throws IOException {
    while (target.hasRemaining()) {
        if (in.read(target) < 0) {
            throw new java.io.EOFException(
                    "channel ended with " + target.remaining() + " byte(s) still expected");
        }
    }
}
} class NumberProducer extends Thread {
/** Channel the length-prefixed numbers are written to. */
private WritableByteChannel out;
/** How many consecutive integers, starting at 0, to send. */
private int howMany;

/**
 * @param out     channel to write to; it is closed when {@link #run()} finishes
 * @param howMany count of numbers to send
 */
public NumberProducer(WritableByteChannel out, int howMany) {
    this.out = out;
    this.howMany = howMany;
}

/**
 * Writes the 4-byte count, then for each i in [0, howMany) a 4-byte length
 * followed by the two's-complement bytes of i. I/O errors are reported on
 * standard error. The channel is closed whether or not writing succeeded.
 */
public void run() {
    try {
        ByteBuffer buffer = ByteBuffer.allocate(4);
        buffer.putInt(this.howMany);
        buffer.flip();
        while (buffer.hasRemaining()) {
            out.write(buffer);
        }

        for (int i = 0; i < howMany; i++) {
            byte[] data = new BigInteger(Integer.toString(i)).toByteArray();
            buffer = ByteBuffer.allocate(4 + data.length);
            buffer.putInt(data.length);
            buffer.put(data);
            buffer.flip();
            while (buffer.hasRemaining()) {
                out.write(buffer);
            }
        }
    } catch (IOException ex) {
        System.err.println(ex);
    } finally {
        // Close on every path: a failed write must not leave the channel open.
        try {
            out.close();
            System.err.println("Closed");
        } catch (IOException ex) {
            System.err.println(ex);
        }
    }
}
}</source>
Producer, consumer and Queue
<source lang="java">
import java.util.Vector; class Producer extends Thread {
/** Queue that receives the generated numbers. */
Queue queue;

/**
 * @param queue the shared queue to fill
 */
Producer(Queue queue) {
    this.queue = queue;
}

/** Adds 0, 1, 2, ... to the queue without end. */
public void run() {
    for (int next = 0; ; next++) {
        queue.add(next);
    }
}
} class Consumer extends Thread {
/** Label printed in front of every value this consumer takes. */
String str;
/** Queue the values are taken from. */
Queue queue;

/**
 * @param str   label identifying this consumer in the output
 * @param queue the shared queue to drain
 */
Consumer(String str, Queue queue) {
    this.str = str;
    this.queue = queue;
}

/** Takes values from the queue without end and prints each as "label: value". */
public void run() {
    for (;;) {
        int value = queue.remove();
        System.out.println(str + ": " + value);
    }
}
} class Queue {
/** Maximum number of values held at once; add() waits while this is reached. */
private final static int SIZE = 5;
/** Stored values, oldest first. */
private Vector<Integer> queue = new Vector<Integer>();
/** Number of values currently stored. */
private int count = 0;

/**
 * Appends a value, waiting while the queue already holds SIZE values.
 * If the wait is interrupted the stack trace is printed and the JVM exits.
 *
 * @param i the value to append
 */
synchronized void add(int i) {
    while (count == SIZE) {
        try {
            wait();
        } catch (InterruptedException ie) {
            ie.printStackTrace();
            System.exit(0);
        }
    }
    queue.addElement(Integer.valueOf(i));
    ++count;
    // Wake everyone: both producers and consumers wait on this monitor.
    notifyAll();
}

/**
 * Removes and returns the oldest value, waiting while the queue is empty.
 * If the wait is interrupted the stack trace is printed and the JVM exits.
 *
 * @return the value at the head of the queue
 */
synchronized int remove() {
    while (count == 0) {
        try {
            wait();
        } catch (InterruptedException ie) {
            ie.printStackTrace();
            System.exit(0);
        }
    }
    // Remove by index: the head is always slot 0, so no equality search is needed.
    Integer iobj = queue.remove(0);
    --count;
    notifyAll();
    return iobj.intValue();
}
} class ProducerConsumers {
/**
 * Starts one producer and three labelled consumers that all share one queue.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String args[]) {
    Queue queue = new Queue();

    Producer producer = new Producer(queue);
    producer.start();

    String[] consumerNames = { "ConsumerA", "ConsumerB", "ConsumerC" };
    for (int k = 0; k < consumerNames.length; k++) {
        new Consumer(consumerNames[k], queue).start();
    }
}
}</source>
Synchronized Queue with Producer and Consumer
<source lang="java">
public class ThreadTester {
/**
 * Runs two producers, each adding 100 greetings, and one consumer that takes
 * all 200, on three threads sharing a queue of capacity 10.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    final int GREETING_COUNT = 100;
    SynchronizedQueue<String> queue = new SynchronizedQueue<String>(10);

    Thread[] threads = {
        new Thread(new Producer("Hello, World!", queue, GREETING_COUNT)),
        new Thread(new Producer("Goodbye, World!", queue, GREETING_COUNT)),
        new Thread(new Consumer(queue, 2 * GREETING_COUNT))
    };

    for (Thread t : threads) {
        t.start();
    }
}
} class Producer implements Runnable {
/** Text appended after the sequence number in every message. */
private String greeting;
/** Queue the messages are added to. */
private SynchronizedQueue<String> queue;
/** Number of messages to add. */
private int greetingCount;

/**
 * @param aGreeting text of each message
 * @param aQueue    queue to add messages to
 * @param count     number of messages to add
 */
public Producer(String aGreeting, SynchronizedQueue<String> aQueue, int count) {
    greeting = aGreeting;
    queue = aQueue;
    greetingCount = count;
}

/**
 * Adds "1: greeting" through "count: greeting" to the queue, sleeping two
 * seconds after each one. Stops early if the thread is interrupted.
 */
public void run() {
    try {
        for (int i = 1; i <= greetingCount; i++) {
            queue.add(i + ": " + greeting);
            Thread.sleep(2000);
        }
    } catch (InterruptedException exception) {
        // Stop producing, but restore the flag so whoever runs this Runnable
        // can still observe the interruption instead of it being lost.
        Thread.currentThread().interrupt();
    }
}
} class Consumer implements Runnable {
/** Queue the messages are taken from. */
private SynchronizedQueue<String> queue;
/** Number of messages to take before finishing. */
private int greetingCount;

/**
 * @param aQueue queue to take messages from
 * @param count  number of messages to take
 */
public Consumer(SynchronizedQueue<String> aQueue, int count) {
    queue = aQueue;
    greetingCount = count;
}

/**
 * Takes {@code count} messages from the queue and prints each, sleeping three
 * seconds after each one. Stops early if the thread is interrupted.
 */
public void run() {
    try {
        for (int i = 1; i <= greetingCount; i++) {
            String greeting = queue.remove();
            System.out.println(greeting);
            Thread.sleep(3000);
        }
    } catch (InterruptedException exception) {
        // Stop consuming, but restore the flag so whoever runs this Runnable
        // can still observe the interruption instead of it being lost.
        Thread.currentThread().interrupt();
    }
}
} class SynchronizedQueue<V> {
/** Circular buffer holding the queued values. */
private Object[] elements;
/** Index of the oldest value. */
private int head;
/** Index where the next value will be stored. */
private int tail;
/** Number of values currently stored. */
private int size;

/**
 * Creates an empty queue.
 *
 * @param capacity maximum number of values the queue can hold at once
 */
public SynchronizedQueue(int capacity) {
    elements = new Object[capacity];
    head = 0;
    tail = 0;
    size = 0;
}

/**
 * Removes and returns the oldest value, waiting while the queue is empty.
 *
 * @return the value at the head of the queue
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public synchronized V remove() throws InterruptedException {
    while (size == 0) {
        wait();
    }
    // Safe: add(V) is the only method that stores into the array.
    @SuppressWarnings("unchecked")
    V r = (V) elements[head];
    // Clear the slot so the queue does not keep the removed value reachable.
    elements[head] = null;
    head++;
    size--;
    if (head == elements.length) {
        head = 0;
    }
    notifyAll();
    return r;
}

/**
 * Appends a value, waiting while the queue is full.
 *
 * @param newValue the value to append
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public synchronized void add(V newValue) throws InterruptedException {
    while (size == elements.length) {
        wait();
    }
    elements[tail] = newValue;
    tail++;
    size++;
    if (tail == elements.length) {
        tail = 0;
    }
    notifyAll();
}
}</source>