Java Tutorial/Thread/BlockingQueue
Communicate between threads using a Queue
<source lang="java">
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; class PrepareProduction implements Runnable {
/** Queue shared with the consumer; everything put here is handed to the consuming thread. */
private final BlockingQueue<String> queue;

/**
 * Creates a producer that publishes onto the given queue.
 *
 * @param q the queue to put items on
 */
PrepareProduction(BlockingQueue<String> q) {
    queue = q;
}

/**
 * Puts the item {@code "1"} on the queue, followed by the {@code "done"} marker
 * that the consumer uses as its signal to stop.
 */
@Override
public void run() {
    try {
        queue.put("1");
        queue.put("done");
    } catch (InterruptedException e) {
        // put() is the only call here that can throw a checked exception.
        // Restore the interrupt status so it is not silently lost.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
}
} class DoProduction implements Runnable {
/** Queue the values are taken from. */
private final BlockingQueue<String> queue;

/**
 * Creates a consumer that reads from the given queue.
 *
 * @param q the queue to take items from
 */
DoProduction(BlockingQueue<String> q) {
    queue = q;
}

/**
 * Prints every value taken from the queue until the {@code "done"} marker
 * arrives. The marker itself only ends the loop and is not printed.
 */
@Override
public void run() {
    try {
        String value = queue.take();
        while (!"done".equals(value)) {
            // Print the value already in hand BEFORE fetching the next one;
            // taking first would skip the first item and print the marker.
            System.out.println(value);
            value = queue.take();
        }
    } catch (InterruptedException e) {
        // Restore the interrupt status so it is not silently lost.
        Thread.currentThread().interrupt();
        System.out.println(Thread.currentThread().getName() + " " + e.getMessage());
    }
}
} public class Main {
/**
 * Connects one producer and one consumer through a shared queue, starts both
 * threads and waits for each of them to finish.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the main thread is interrupted while waiting
 */
public static void main(String[] args) throws Exception {
    final BlockingQueue<String> channel = new LinkedBlockingQueue<String>();

    // Producer first, consumer second: same creation, start and join order.
    final Thread[] pipeline = {
        new Thread(new PrepareProduction(channel)),
        new Thread(new DoProduction(channel))
    };

    for (Thread stage : pipeline) {
        stage.start();
    }
    for (Thread stage : pipeline) {
        stage.join();
    }
}
}</source>
Creating a Bounded Work Queue
<source lang="java">
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class Main {
/**
 * Starts a fixed number of workers on a bounded queue, feeds them the
 * integers 0..99, then shuts the workers down once the queue is drained.
 *
 * @param argv command-line arguments (unused)
 * @throws Exception if the main thread is interrupted while producing or waiting
 */
public static void main(String[] argv) throws Exception {
    int capacity = 10;
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);

    int numWorkers = 2;
    Worker[] workers = new Worker[numWorkers];
    for (int i = 0; i < workers.length; i++) {
        workers[i] = new Worker(queue);
        workers[i].start();
    }

    // put() blocks whenever the queue already holds `capacity` items.
    for (int i = 0; i < 100; i++) {
        queue.put(i);
    }

    // The workers loop forever on take(), so without an explicit stop signal
    // the program would never exit. Wait until all queued items have been
    // taken, then interrupt the workers; a worker that is still printing its
    // last item finishes that first and stops at its next take().
    while (!queue.isEmpty()) {
        Thread.sleep(10);
    }
    for (Worker worker : workers) {
        worker.interrupt();
    }
    for (Worker worker : workers) {
        worker.join();
    }
}
} class Worker extends Thread {
/** Queue this worker takes its items from. */
private final BlockingQueue<Integer> q;

/**
 * Creates a worker bound to the given queue.
 *
 * @param q the queue to take items from
 */
Worker(BlockingQueue<Integer> q) {
    this.q = q;
}

/**
 * Takes items from the queue and prints each one, until this thread is
 * interrupted.
 */
@Override
public void run() {
    try {
        while (true) {
            // take() blocks until an item is available and never returns
            // null, so no null check is needed; interruption is the only
            // way out of this loop.
            Integer x = q.take();
            System.out.println(x);
        }
    } catch (InterruptedException e) {
        // Interruption is this worker's stop signal. Restore the status
        // instead of swallowing it, then let run() return.
        Thread.currentThread().interrupt();
    }
}
}</source>
Implementing an Unbounded Work Queue with synchronized methods and wait/notify
<source lang="java">
import java.util.LinkedList; public class Main {
/**
 * Starts a fixed number of workers on an unbounded work queue, feeds them the
 * integers 0..99, then tells every worker to stop.
 *
 * @param argv command-line arguments (unused)
 */
public static void main(String[] argv) {
    WorkQueue queue = new WorkQueue();

    int numWorkers = 2;
    Worker[] workers = new Worker[numWorkers];
    for (int i = 0; i < workers.length; i++) {
        workers[i] = new Worker(queue);
        workers[i].start();
    }

    for (int i = 0; i < 100; i++) {
        queue.addWork(i);
    }

    // A worker leaves its loop when it receives a null item. Queue one null
    // per worker, after all real work, so every item is processed first and
    // the program can exit instead of leaving the workers waiting forever.
    for (int i = 0; i < workers.length; i++) {
        queue.addWork(null);
    }
}
} class WorkQueue {
/** Pending work items, oldest first. Null items are allowed. */
LinkedList<Object> queue = new LinkedList<Object>();

/**
 * Appends a work item to the end of the queue and wakes one waiting thread.
 *
 * @param o the item to enqueue
 */
public synchronized void addWork(Object o) {
    queue.add(o);
    notify();
}

/**
 * Removes and returns the oldest work item, waiting while the queue is empty.
 *
 * @return the item at the head of the queue
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public synchronized Object getWork() throws InterruptedException {
    for (;;) {
        // Re-check after every wake-up: another thread may have taken the item.
        if (!queue.isEmpty()) {
            return queue.remove();
        }
        wait();
    }
}
} class Worker extends Thread {
/** Work queue this worker pulls its items from. */
private final WorkQueue q;

/**
 * Creates a worker bound to the given work queue.
 *
 * @param q the queue to get work from
 */
Worker(WorkQueue q) {
    this.q = q;
}

/**
 * Gets items from the work queue and prints each one, until a null item is
 * received or this thread is interrupted.
 */
@Override
public void run() {
    try {
        while (true) {
            Object x = q.getWork();
            if (x == null) {
                // A null item is the signal to stop working.
                break;
            }
            System.out.println(x);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt status instead of swallowing it, then let
        // run() return.
        Thread.currentThread().interrupt();
    }
}
}</source>