Java Tutorial/Thread/BlockingQueue

Материал из Java эксперт
Версия от 15:18, 31 мая 2010; Admin (обсуждение | вклад) (1 версия)
(разн.) ← Предыдущая | Текущая версия (разн.) | Следующая → (разн.)
Перейти к: навигация, поиск

Communicate between threads using a Queue

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Producer side of the example: publishes one item followed by the
 * {@code "done"} marker on the shared queue.
 */
class PrepareProduction implements Runnable {
  BlockingQueue<String> queue;

  /**
   * @param q queue the produced strings are put on
   */
  PrepareProduction(BlockingQueue<String> q) {
    queue = q;
  }

  /**
   * Puts {@code "1"} and then {@code "done"} on the queue. If the thread is
   * interrupted while putting, production stops and the interrupt status is
   * restored so the owner of the thread can still observe it.
   */
  @Override
  public void run() {
    try {
      queue.put("1");
      queue.put("done");
    } catch (InterruptedException e) {
      // put() clears the interrupt status when it throws; set it again
      // instead of hiding the interruption behind a stack trace.
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Consumer side of the example: prints every string taken from the queue
 * until the {@code "done"} marker arrives.
 */
class DoProduction implements Runnable {
  private final BlockingQueue<String> queue;

  /**
   * @param q queue the strings are taken from
   */
  DoProduction(BlockingQueue<String> q) {
    queue = q;
  }

  /**
   * Takes items one at a time and prints each of them; stops, without
   * printing, as soon as {@code "done"} is taken. On interruption the thread
   * name and exception message are printed and the interrupt status is
   * restored.
   */
  @Override
  public void run() {
    try {
      String value = queue.take();
      while (!"done".equals(value)) {
        // Print before fetching the next item so the first item is not
        // dropped and the end marker itself is never printed.
        System.out.println(value);
        value = queue.take();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      System.out.println(Thread.currentThread().getName() + " "
          + e.getMessage());
    }
  }
}
/**
 * Wires a producer and a consumer together through one shared queue and
 * waits for both threads to finish.
 */
public class Main {
  public static void main(String[] args) throws Exception {
    BlockingQueue<String> channel = new LinkedBlockingQueue<String>();
    // Producer first, consumer second: same start and join order as before.
    Thread[] stages = {
      new Thread(new PrepareProduction(channel)),
      new Thread(new DoProduction(channel))
    };
    for (Thread stage : stages) {
      stage.start();
    }
    for (Thread stage : stages) {
      stage.join();
    }
  }
}





Creating a Bounded Work Queue

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * Feeds 100 integers through a bounded queue to a fixed set of worker
 * threads and shuts the workers down once the queue has been drained.
 */
public class Main {
  public static void main(String[] argv) throws Exception {
    int capacity = 10;
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
    int numWorkers = 2;
    Worker[] workers = new Worker[numWorkers];
    for (int i = 0; i < workers.length; i++) {
      workers[i] = new Worker(queue);
      workers[i].start();
    }
    // put() blocks while the queue is full, which is what bounds the backlog.
    for (int i = 0; i < 100; i++) {
      queue.put(i);
    }
    // Without an explicit shutdown the workers stay blocked in take() and the
    // JVM never exits. ArrayBlockingQueue rejects null, so a null sentinel is
    // not an option; wait until everything has been taken, then interrupt.
    while (!queue.isEmpty()) {
      Thread.sleep(10);
    }
    for (Worker worker : workers) {
      // An item already taken is still printed: the interrupt only takes
      // effect at the worker's next blocking take().
      worker.interrupt();
    }
    for (Worker worker : workers) {
      worker.join();
    }
  }
}
/**
 * Worker thread that prints every integer it takes from the queue until it
 * is interrupted.
 */
class Worker extends Thread {
  BlockingQueue<Integer> q;

  /**
   * @param q queue the work items are taken from
   */
  Worker(BlockingQueue<Integer> q) {
    this.q = q;
  }

  /**
   * Takes and prints items in a loop. The loop ends only when the thread is
   * interrupted; the interrupt status is restored before returning.
   */
  @Override
  public void run() {
    try {
      while (true) {
        // take() blocks until an item is available and never returns null,
        // so no null check is needed here.
        System.out.println(q.take());
      }
    } catch (InterruptedException e) {
      // Interruption is the shutdown signal; keep the flag visible to
      // whoever inspects this thread's state afterwards.
      Thread.currentThread().interrupt();
    }
  }
}





Implementing an Unbounded Work Queue using synchronized and wait/notify

import java.util.LinkedList;
/**
 * Starts a fixed set of workers on a shared {@code WorkQueue}, submits 100
 * integers, and then tells every worker to stop.
 */
public class Main {
  public static void main(String[] argv) {
    WorkQueue queue = new WorkQueue();
    int numWorkers = 2;
    Worker[] workers = new Worker[numWorkers];
    for (int i = 0; i < workers.length; i++) {
      workers[i] = new Worker(queue);
      workers[i].start();
    }
    for (int i = 0; i < 100; i++) {
      queue.addWork(i);
    }
    // A worker leaves its loop when it receives null. Queue one null per
    // worker after the real work so each of them terminates; otherwise they
    // wait in getWork() forever and the JVM never exits.
    for (int i = 0; i < workers.length; i++) {
      queue.addWork(null);
    }
  }
}
/**
 * FIFO hand-off between threads, guarded by this object's monitor.
 */
class WorkQueue {
  LinkedList<Object> queue = new LinkedList<Object>();

  /**
   * Appends an item at the tail and wakes one thread waiting in
   * {@link #getWork()}.
   *
   * @param o item to enqueue
   */
  public synchronized void addWork(Object o) {
    queue.add(o);
    notify();
  }

  /**
   * Removes and returns the item at the head, waiting for as long as the
   * queue is empty.
   *
   * @return the oldest queued item
   * @throws InterruptedException if the thread is interrupted while waiting
   */
  public synchronized Object getWork() throws InterruptedException {
    // Re-check emptiness after every wake-up before removing.
    for (;;) {
      if (!queue.isEmpty()) {
        return queue.remove();
      }
      wait();
    }
  }
}
/**
 * Worker thread that prints every item it gets from the work queue until it
 * receives {@code null} or is interrupted.
 */
class Worker extends Thread {
  WorkQueue q;

  /**
   * @param q queue the work items are fetched from
   */
  Worker(WorkQueue q) {
    this.q = q;
  }

  /**
   * Fetches and prints items in a loop. A {@code null} item ends the loop.
   * Interruption also ends it, and the interrupt status is restored before
   * returning.
   */
  @Override
  public void run() {
    try {
      while (true) {
        Object x = q.getWork();
        if (x == null) {
          // null is the stop signal.
          break;
        }
        System.out.println(x);
      }
    } catch (InterruptedException e) {
      // Do not swallow the interruption silently; keep the flag set for
      // anyone inspecting this thread afterwards.
      Thread.currentThread().interrupt();
    }
  }
}