Java Tutorial/Thread/Producer and consumer

Материал из Java эксперт
Перейти к: навигация, поиск

A queue(LinkedList) is used to coordinate work between a producer and a set of worker threads.

import java.util.LinkedList;
public class Main {
  public static void main(String[] argv) {
    WorkQueue queue = new WorkQueue();
    int numWorkers = 2;
    Worker[] workers = new Worker[numWorkers];
    for (int i = 0; i < workers.length; i++) {
      workers[i] = new Worker(queue);
      workers[i].start();
    }
    for (int i = 0; i < 100; i++) {
      queue.addWork(i);
    }
  }
}
class WorkQueue {
  LinkedList<Object> queue = new LinkedList<Object>();
  public synchronized void addWork(Object o) {
    queue.addLast(o);
    notify();
  }
  public synchronized Object getWork() throws InterruptedException {
    while (queue.isEmpty()) {
      wait();
    }
    return queue.removeFirst();
  }
}
class Worker extends Thread {
  WorkQueue q;
  Worker(WorkQueue q) {
    this.q = q;
  }
  public void run() {
    try {
      while (true) {
        Object x = q.getWork();
        if (x == null) {
          break;
        }
        System.out.println(x);
      }
    } catch (InterruptedException e) {
    }
  }
}





Producer and comsumer with DataInputStream and DataOutputStream

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class MainClass {
  public static void main (String[] args) throws IOException {
    PipedOutputStream pout = new PipedOutputStream();
    PipedInputStream pin = new PipedInputStream(pout);
    NumberProducer fw = new NumberProducer(pout, 20);
    NumberConsumer fr = new NumberConsumer(pin);
    fw.start();
    fr.start();
  }
}
class NumberProducer extends Thread {
  private DataOutputStream theOutput;
  private int howMany;
  public NumberProducer(OutputStream out, int howMany) {
    theOutput = new DataOutputStream(out);
    this.howMany = howMany;
  }
  public void run() {
    try {
      for (int i = 0; i < howMany; i++) {
        theOutput.writeInt(i);
      }
    }
    catch (IOException ex) { System.err.println(ex); }
  }
}
 class NumberConsumer extends Thread {
  private DataInputStream theInput;
  public NumberConsumer(InputStream in) {
    theInput = new DataInputStream(in);
  }
  public void run() {
    try {
      while (true) {
        System.out.println(theInput.readInt());
      }
    }
    catch (IOException ex) {
      if (ex.getMessage().equals("Pipe broken")
        || ex.getMessage().equals("Write end dead")) {
        // normal termination
        return;
      }
      ex.printStackTrace();
    }
  }
}





Producer and consumer based on ReadableByteChannel and WritableByteChannel

import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
public class MainClass {
  public static void main(String[] args) throws IOException {
    Pipe pipe = Pipe.open();
    WritableByteChannel out = pipe.sink();
    ReadableByteChannel in = pipe.source();
    NumberProducer producer = new NumberProducer(out, 200);
    NumberConsumer consumer = new NumberConsumer(in);
    producer.start();
    consumer.start();
  }
}
class NumberConsumer extends Thread {
  private ReadableByteChannel in;
  public NumberConsumer(ReadableByteChannel in) {
    this.in = in;
  }
  public void run() {
    ByteBuffer sizeb = ByteBuffer.allocate(4);
    try {
      while (sizeb.hasRemaining())
        in.read(sizeb);
      sizeb.flip();
      int howMany = sizeb.getInt();
      sizeb.clear();
      for (int i = 0; i < howMany; i++) {
        while (sizeb.hasRemaining())
          in.read(sizeb);
        sizeb.flip();
        int length = sizeb.getInt();
        sizeb.clear();
        ByteBuffer data = ByteBuffer.allocate(length);
        while (data.hasRemaining())
          in.read(data);
        BigInteger result = new BigInteger(data.array());
        System.out.println(result);
      }
    } catch (IOException ex) {
      System.err.println(ex);
    } finally {
      try {
        in.close();
      } catch (Exception ex) {
        // We tried
      }
    }
  }
}
class NumberProducer extends Thread {
  private WritableByteChannel out;
  private int howMany;
  public NumberProducer(WritableByteChannel out, int howMany) {
    this.out = out;
    this.howMany = howMany;
  }
  public void run() {
    try {
      ByteBuffer buffer = ByteBuffer.allocate(4);
      buffer.putInt(this.howMany);
      buffer.flip();
      while (buffer.hasRemaining())
        out.write(buffer);
      for (int i = 0; i < howMany; i++) {
        byte[] data = new BigInteger(Integer.toString(i)).toByteArray();
        buffer = ByteBuffer.allocate(4 + data.length);
        buffer.putInt(data.length);
        buffer.put(data);
        buffer.flip();
        while (buffer.hasRemaining())
          out.write(buffer);
      }
      out.close();
      System.err.println("Closed");
    } catch (IOException ex) {
      System.err.println(ex);
    }
  }
}





Producer, consumer and Queue

import java.util.Vector;
class Producer extends Thread {
  Queue queue;
  Producer(Queue queue) {
    this.queue = queue;
  }
  public void run() {
    int i = 0;
    while(true) {
      queue.add(i++);
    }
  }
}
class Consumer extends Thread {
  String str;
  Queue queue;
  Consumer(String str, Queue queue) {
    this.str = str;
    this.queue = queue;
  }
  public void run() {
    while(true) {
      System.out.println(str + ": " + queue.remove());
    }
  }
}
class Queue {
  private final static int SIZE = 5;
  private Vector queue = new Vector();
  private int count = 0;
  
  synchronized void add(int i) {
    while(count == SIZE) {
      try {
        wait();
      }
      catch(InterruptedException ie) {
        ie.printStackTrace();
        System.exit(0);
      }
    }
    queue.addElement(new Integer(i));
    ++count;
    notifyAll();
  }
  synchronized int remove() {
    while(count == 0) {
      try {
        wait();
      }
      catch(InterruptedException ie) {
        ie.printStackTrace();
        System.exit(0);
      }
    }
    Integer iobj = (Integer)queue.firstElement();
    queue.removeElement(iobj);
    --count;
    notifyAll();
    return iobj.intValue();
  }
}
class ProducerConsumers {
  public static void main(String args[]) {
    Queue queue = new Queue();
    new Producer(queue).start();
    new Consumer("ConsumerA", queue).start();
    new Consumer("ConsumerB", queue).start();
    new Consumer("ConsumerC", queue).start();
  }
}





Synchronized Queue with Producer and Consumer

public class ThreadTester {
  public static void main(String[] args) {
    SynchronizedQueue<String> queue = new SynchronizedQueue<String>(10);
    final int GREETING_COUNT = 100;
    Runnable run1 = new Producer("Hello, World!", queue, GREETING_COUNT);
    Runnable run2 = new Producer("Goodbye, World!", queue, GREETING_COUNT);
    Runnable run3 = new Consumer(queue, 2 * GREETING_COUNT);
    Thread thread1 = new Thread(run1);
    Thread thread2 = new Thread(run2);
    Thread thread3 = new Thread(run3);
    thread1.start();
    thread2.start();
    thread3.start();
  }
}
class Producer implements Runnable {
  private String greeting;
  private SynchronizedQueue<String> queue;
  private int greetingCount;
  public Producer(String aGreeting, SynchronizedQueue<String> aQueue, int count) {
    greeting = aGreeting;
    queue = aQueue;
    greetingCount = count;
  }
  public void run() {
    try {
      int i = 1;
      while (i <= greetingCount) {
        queue.add(i + ": " + greeting);
        i++;
        Thread.sleep(2000);
      }
    } catch (InterruptedException exception) {
    }
  }
}
class Consumer implements Runnable {
  private SynchronizedQueue<String> queue;
  private int greetingCount;
  public Consumer(SynchronizedQueue<String> aQueue, int count) {
    queue = aQueue;
    greetingCount = count;
  }
  public void run() {
    try {
      int i = 1;
      while (i <= greetingCount) {
        String greeting = queue.remove();
        System.out.println(greeting);
        i++;
        Thread.sleep(3000);
      }
    } catch (InterruptedException exception) {
    }
  }
}
class SynchronizedQueue<V> {
  private Object[] elements;
  private int head;
  private int tail;
  private int size;
  public SynchronizedQueue(int capacity) {
    elements = new Object[capacity];
    head = 0;
    tail = 0;
    size = 0;
  }
  public synchronized V remove() throws InterruptedException {
    while (size == 0)
      wait();
    V r = (V) elements[head];
    head++;
    size--;
    if (head == elements.length)
      head = 0;
    notifyAll();
    return r;
  }
  public synchronized void add(V newValue) throws InterruptedException {
    while (size == elements.length)
      wait();
    elements[tail] = newValue;
    tail++;
    size++;
    if (tail == elements.length)
      tail = 0;
    notifyAll();
  }
}