Java Tutorial/Thread/BlockingQueue
Communicate between threads using a Queue
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Producer half of the example: publishes one work item on the shared queue
 * followed by the {@code "done"} sentinel.
 */
class PrepareProduction implements Runnable {
    /** Queue shared with the consumer; assigned once in the constructor. */
    private final BlockingQueue<String> queue;

    /**
     * @param q queue the items are put on
     */
    PrepareProduction(BlockingQueue<String> q) {
        queue = q;
    }

    /**
     * Puts {@code "1"} and then {@code "done"} on the queue. If the thread is
     * interrupted while putting, the stack trace is printed and the interrupt
     * status is restored so callers can still observe it.
     */
    @Override
    public void run() {
        try {
            queue.put("1");
            queue.put("done");
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
/**
 * Consumer half of the example: takes strings from the shared queue and
 * prints each one until the {@code "done"} sentinel arrives.
 */
class DoProduction implements Runnable {
    private final BlockingQueue<String> queue;

    /**
     * @param q queue the items are taken from
     */
    DoProduction(BlockingQueue<String> q) {
        queue = q;
    }

    /**
     * Prints every value taken from the queue, in order, and stops when
     * {@code "done"} is taken. The sentinel itself is not printed.
     *
     * <p>If the thread is interrupted while waiting, the thread name and the
     * exception message are printed and the interrupt status is restored.
     */
    @Override
    public void run() {
        try {
            // Check each value against the sentinel BEFORE printing it, so the
            // first item is not skipped and "done" is never echoed.
            for (String value = queue.take(); !"done".equals(value); value = queue.take()) {
                System.out.println(value);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println(Thread.currentThread().getName() + " "
                    + e.getMessage());
        }
    }
}
/**
 * Wires a {@link PrepareProduction} producer and a {@link DoProduction}
 * consumer to one shared {@link LinkedBlockingQueue} and runs each on its
 * own thread.
 */
public class Main {
    /**
     * Starts the producer thread, then the consumer thread, and waits for
     * both (in that order) before returning.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws Exception {
        BlockingQueue<String> channel = new LinkedBlockingQueue<String>();

        Thread producer = new Thread(new PrepareProduction(channel));
        Thread consumer = new Thread(new DoProduction(channel));
        Thread[] threads = {producer, consumer};

        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
    }
}
Creating a Bounded Work Queue
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * Bounded work-queue demo: worker threads consume integers from an
 * {@link ArrayBlockingQueue} that the main thread fills.
 */
public class Main {
    /**
     * Creates a queue of capacity 10, starts two {@link Worker} threads on
     * it, then puts the integers 0 through 99 on the queue. This method does
     * not stop or join the workers.
     *
     * @param argv unused
     * @throws Exception if the main thread is interrupted while putting
     */
    public static void main(String[] argv) throws Exception {
        final int capacity = 10;
        final int numWorkers = 2;

        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
        Worker[] workers = new Worker[numWorkers];

        int slot = 0;
        while (slot < workers.length) {
            Worker w = new Worker(queue);
            workers[slot] = w;
            w.start();
            slot++;
        }

        int item = 0;
        while (item < 100) {
            queue.put(Integer.valueOf(item));
            item++;
        }
    }
}
/**
 * Worker thread that prints every integer it takes from a shared
 * {@link BlockingQueue} until it is interrupted.
 */
class Worker extends Thread {
    private final BlockingQueue<Integer> q;

    /**
     * @param q queue the work items are taken from
     */
    Worker(BlockingQueue<Integer> q) {
        this.q = q;
    }

    /**
     * Takes and prints items in a loop. The loop ends only when the thread is
     * interrupted while taking; the interrupt status is then restored rather
     * than silently discarded.
     */
    @Override
    public void run() {
        try {
            while (true) {
                // take() blocks until an item is available and never returns
                // null, so no null sentinel check is needed here.
                Integer x = q.take();
                System.out.println(x);
            }
        } catch (InterruptedException e) {
            // Interruption is the shutdown signal; keep the flag visible.
            Thread.currentThread().interrupt();
        }
    }
}
Implementing an Unbounded Work Queue with synchronized block
import java.util.LinkedList;
/**
 * Unbounded work-queue demo: worker threads consume items from a
 * {@link WorkQueue} that the main thread fills.
 */
public class Main {
    /**
     * Starts two {@link Worker} threads on a fresh {@link WorkQueue}, then
     * adds the integers 0 through 99 as work. This method does not stop or
     * join the workers.
     *
     * @param argv unused
     */
    public static void main(String[] argv) {
        final int numWorkers = 2;
        WorkQueue queue = new WorkQueue();
        Worker[] workers = new Worker[numWorkers];

        int slot = 0;
        while (slot < workers.length) {
            Worker w = new Worker(queue);
            workers[slot] = w;
            w.start();
            slot++;
        }

        int item = 0;
        while (item < 100) {
            queue.addWork(Integer.valueOf(item));
            item++;
        }
    }
}
/**
 * FIFO work queue with no capacity limit, guarded by this object's monitor.
 */
class WorkQueue {
    /** Pending work items, oldest first. */
    LinkedList<Object> queue = new LinkedList<Object>();

    /**
     * Appends an item and wakes one thread waiting in {@link #getWork()}.
     *
     * @param o the work item to enqueue
     */
    public void addWork(Object o) {
        synchronized (this) {
            queue.add(o);
            notify();
        }
    }

    /**
     * Removes and returns the oldest item, waiting while the queue is empty.
     *
     * @return the item at the head of the queue
     * @throws InterruptedException if interrupted while waiting
     */
    public Object getWork() throws InterruptedException {
        synchronized (this) {
            // Re-check after every wake-up: another consumer may have taken
            // the item first.
            while (queue.size() == 0) {
                wait();
            }
            return queue.remove();
        }
    }
}
/**
 * Worker thread that prints every item it gets from a shared
 * {@link WorkQueue}, stopping on a {@code null} item or on interruption.
 */
class Worker extends Thread {
    WorkQueue q;

    /**
     * @param q queue the work items are fetched from
     */
    Worker(WorkQueue q) {
        this.q = q;
    }

    /**
     * Fetches and prints items in a loop. A {@code null} item ends the loop.
     * If the thread is interrupted while waiting for work, the loop ends and
     * the interrupt status is restored rather than silently discarded.
     */
    @Override
    public void run() {
        try {
            while (true) {
                Object x = q.getWork();
                if (x == null) {
                    // A null work item acts as the stop signal.
                    break;
                }
                System.out.println(x);
            }
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again.
            Thread.currentThread().interrupt();
        }
    }
}