Java Tutorial/Thread/Producer and consumer

Материал из Java эксперт
Перейти к: навигация, поиск

A queue(LinkedList) is used to coordinate work between a producer and a set of worker threads.

   <source lang="java">

import java.util.LinkedList; public class Main {

 public static void main(String[] argv) {
   WorkQueue queue = new WorkQueue();
   int numWorkers = 2;
   Worker[] workers = new Worker[numWorkers];
   for (int i = 0; i < workers.length; i++) {
     workers[i] = new Worker(queue);
     workers[i].start();
   }
   for (int i = 0; i < 100; i++) {
     queue.addWork(i);
   }
 }

} class WorkQueue {

 LinkedList<Object> queue = new LinkedList<Object>();
 public synchronized void addWork(Object o) {
   queue.addLast(o);
   notify();
 }
 public synchronized Object getWork() throws InterruptedException {
   while (queue.isEmpty()) {
     wait();
   }
   return queue.removeFirst();
 }

} class Worker extends Thread {

 WorkQueue q;
 Worker(WorkQueue q) {
   this.q = q;
 }
 public void run() {
   try {
     while (true) {
       Object x = q.getWork();
       if (x == null) {
         break;
       }
       System.out.println(x);
     }
   } catch (InterruptedException e) {
   }
 }

}</source>





Producer and comsumer with DataInputStream and DataOutputStream

   <source lang="java">

import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; public class MainClass {

 public static void main (String[] args) throws IOException {
   PipedOutputStream pout = new PipedOutputStream();
   PipedInputStream pin = new PipedInputStream(pout);
   NumberProducer fw = new NumberProducer(pout, 20);
   NumberConsumer fr = new NumberConsumer(pin);
   fw.start();
   fr.start();
 }

} class NumberProducer extends Thread {

 private DataOutputStream theOutput;
 private int howMany;
 public NumberProducer(OutputStream out, int howMany) {
   theOutput = new DataOutputStream(out);
   this.howMany = howMany;
 }
 public void run() {
   try {
     for (int i = 0; i < howMany; i++) {
       theOutput.writeInt(i);
     }
   }
   catch (IOException ex) { System.err.println(ex); }
 }

}

class NumberConsumer extends Thread {
 private DataInputStream theInput;
 public NumberConsumer(InputStream in) {
   theInput = new DataInputStream(in);
 }
 public void run() {
   try {
     while (true) {
       System.out.println(theInput.readInt());
     }
   }
   catch (IOException ex) {
     if (ex.getMessage().equals("Pipe broken")
       || ex.getMessage().equals("Write end dead")) {
       // normal termination
       return;
     }
     ex.printStackTrace();
   }
 }

}</source>





Producer and consumer based on ReadableByteChannel and WritableByteChannel

   <source lang="java">

import java.io.IOException; import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.channels.Pipe; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; public class MainClass {

 public static void main(String[] args) throws IOException {
   Pipe pipe = Pipe.open();
   WritableByteChannel out = pipe.sink();
   ReadableByteChannel in = pipe.source();
   NumberProducer producer = new NumberProducer(out, 200);
   NumberConsumer consumer = new NumberConsumer(in);
   producer.start();
   consumer.start();
 }

} class NumberConsumer extends Thread {

 private ReadableByteChannel in;
 public NumberConsumer(ReadableByteChannel in) {
   this.in = in;
 }
 public void run() {
   ByteBuffer sizeb = ByteBuffer.allocate(4);
   try {
     while (sizeb.hasRemaining())
       in.read(sizeb);
     sizeb.flip();
     int howMany = sizeb.getInt();
     sizeb.clear();
     for (int i = 0; i < howMany; i++) {
       while (sizeb.hasRemaining())
         in.read(sizeb);
       sizeb.flip();
       int length = sizeb.getInt();
       sizeb.clear();
       ByteBuffer data = ByteBuffer.allocate(length);
       while (data.hasRemaining())
         in.read(data);
       BigInteger result = new BigInteger(data.array());
       System.out.println(result);
     }
   } catch (IOException ex) {
     System.err.println(ex);
   } finally {
     try {
       in.close();
     } catch (Exception ex) {
       // We tried
     }
   }
 }

} class NumberProducer extends Thread {

 private WritableByteChannel out;
 private int howMany;
 public NumberProducer(WritableByteChannel out, int howMany) {
   this.out = out;
   this.howMany = howMany;
 }
 public void run() {
   try {
     ByteBuffer buffer = ByteBuffer.allocate(4);
     buffer.putInt(this.howMany);
     buffer.flip();
     while (buffer.hasRemaining())
       out.write(buffer);
     for (int i = 0; i < howMany; i++) {
       byte[] data = new BigInteger(Integer.toString(i)).toByteArray();
       buffer = ByteBuffer.allocate(4 + data.length);
       buffer.putInt(data.length);
       buffer.put(data);
       buffer.flip();
       while (buffer.hasRemaining())
         out.write(buffer);
     }
     out.close();
     System.err.println("Closed");
   } catch (IOException ex) {
     System.err.println(ex);
   }
 }

}</source>





Producer, consumer and Queue

   <source lang="java">

import java.util.Vector; class Producer extends Thread {

 Queue queue;
 Producer(Queue queue) {
   this.queue = queue;
 }
 public void run() {
   int i = 0;
   while(true) {
     queue.add(i++);
   }
 }

} class Consumer extends Thread {

 String str;
 Queue queue;
 Consumer(String str, Queue queue) {
   this.str = str;
   this.queue = queue;
 }
 public void run() {
   while(true) {
     System.out.println(str + ": " + queue.remove());
   }
 }

} class Queue {

 private final static int SIZE = 5;
 private Vector queue = new Vector();
 private int count = 0;
 
 synchronized void add(int i) {
   while(count == SIZE) {
     try {
       wait();
     }
     catch(InterruptedException ie) {
       ie.printStackTrace();
       System.exit(0);
     }
   }
   queue.addElement(new Integer(i));
   ++count;
   notifyAll();
 }
 synchronized int remove() {
   while(count == 0) {
     try {
       wait();
     }
     catch(InterruptedException ie) {
       ie.printStackTrace();
       System.exit(0);
     }
   }
   Integer iobj = (Integer)queue.firstElement();
   queue.removeElement(iobj);
   --count;
   notifyAll();
   return iobj.intValue();
 }

} class ProducerConsumers {

 public static void main(String args[]) {
   Queue queue = new Queue();
   new Producer(queue).start();
   new Consumer("ConsumerA", queue).start();
   new Consumer("ConsumerB", queue).start();
   new Consumer("ConsumerC", queue).start();
 }

}</source>





Synchronized Queue with Producer and Consumer

   <source lang="java">

public class ThreadTester {

 public static void main(String[] args) {
   SynchronizedQueue<String> queue = new SynchronizedQueue<String>(10);
   final int GREETING_COUNT = 100;
   Runnable run1 = new Producer("Hello, World!", queue, GREETING_COUNT);
   Runnable run2 = new Producer("Goodbye, World!", queue, GREETING_COUNT);
   Runnable run3 = new Consumer(queue, 2 * GREETING_COUNT);
   Thread thread1 = new Thread(run1);
   Thread thread2 = new Thread(run2);
   Thread thread3 = new Thread(run3);
   thread1.start();
   thread2.start();
   thread3.start();
 }

} class Producer implements Runnable {

 private String greeting;
 private SynchronizedQueue<String> queue;
 private int greetingCount;
 public Producer(String aGreeting, SynchronizedQueue<String> aQueue, int count) {
   greeting = aGreeting;
   queue = aQueue;
   greetingCount = count;
 }
 public void run() {
   try {
     int i = 1;
     while (i <= greetingCount) {
       queue.add(i + ": " + greeting);
       i++;
       Thread.sleep(2000);
     }
   } catch (InterruptedException exception) {
   }
 }

} class Consumer implements Runnable {

 private SynchronizedQueue<String> queue;
 private int greetingCount;
 public Consumer(SynchronizedQueue<String> aQueue, int count) {
   queue = aQueue;
   greetingCount = count;
 }
 public void run() {
   try {
     int i = 1;
     while (i <= greetingCount) {
       String greeting = queue.remove();
       System.out.println(greeting);
       i++;
       Thread.sleep(3000);
     }
   } catch (InterruptedException exception) {
   }
 }

} class SynchronizedQueue<V> {

 private Object[] elements;
 private int head;
 private int tail;
 private int size;
 public SynchronizedQueue(int capacity) {
   elements = new Object[capacity];
   head = 0;
   tail = 0;
   size = 0;
 }
 public synchronized V remove() throws InterruptedException {
   while (size == 0)
     wait();
   V r = (V) elements[head];
   head++;
   size--;
   if (head == elements.length)
     head = 0;
   notifyAll();
   return r;
 }
 public synchronized void add(V newValue) throws InterruptedException {
   while (size == elements.length)
     wait();
   elements[tail] = newValue;
   tail++;
   size++;
   if (tail == elements.length)
     tail = 0;
   notifyAll();
 }

}</source>