Java/Design Pattern/Worker Thread Pattern
Worker Thread Pattern in Java
//[C] 2002 Sun Microsystems, Inc.---
import java.io.IOException;
import java.io.Serializable;
import java.rmi.Naming;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.Vector;
/**
 * Command-line driver for the Worker Thread pattern example.
 *
 * <p>The program runs the {@code rmic} utility on {@code ServerDataStoreImpl},
 * launches an {@code rmiregistry} child process, creates a
 * {@link ConcreteQueue} and a {@link ServerDataStoreImpl}, queues three
 * retrieval tasks, waits for the last one to finish and prints the retrieved
 * objects. The registry child process is destroyed before the program exits.
 */
public class RunWorkerThreadPattern {
    private static final String WORKER_SERVER_URL = "//localhost/workerThreadServer";

    /** Longest time, in milliseconds, that the driver waits for the last queued task. */
    private static final long MAX_WAIT_MILLIS = 60000L;

    /** Pause, in milliseconds, between two checks of the last queued task. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    /**
     * Runs the example end to end.
     *
     * <p>Exits with status 1 when {@code rmic} or {@code rmiregistry} cannot be
     * started, or when the queued tasks do not complete within
     * {@link #MAX_WAIT_MILLIS}; exits with status 0 otherwise.
     *
     * @param arguments command-line arguments (not used)
     */
    public static void main(String[] arguments) {
        System.out.println("Example for the WorkerThread pattern");
        System.out.println("In this example, a ConcreteQueue object which uses a");
        System.out.println(" worker thread, will retrieve a number of objects from");
        System.out.println(" the server.");
        System.out.println();

        System.out.println("Running the RMI compiler (rmic)");
        System.out.println();
        try {
            Process p1 = Runtime.getRuntime().exec("rmic ServerDataStoreImpl");
            p1.waitFor();
        } catch (IOException exc) {
            System.err.println("Unable to run rmic utility. Exiting application.");
            System.exit(1);
        } catch (InterruptedException exc) {
            System.err.println("Threading problems encountered while using the rmic utility.");
        }

        System.out.println("Starting the rmiregistry");
        System.out.println();
        Process rmiProcess = null;
        try {
            rmiProcess = Runtime.getRuntime().exec("rmiregistry");
            // Fixed pause before anything tries to bind to the new registry.
            Thread.sleep(15000);
        } catch (IOException exc) {
            System.err.println("Unable to start the rmiregistry. Exiting application.");
            System.exit(1);
        } catch (InterruptedException exc) {
            System.err.println("Threading problems encountered when starting the rmiregistry.");
        }

        boolean completed = false;
        try {
            completed = retrieveAndPrint();
        } finally {
            // The registry is a child process of this program; without an
            // explicit destroy() it would be left running after the demo.
            if (rmiProcess != null) {
                rmiProcess.destroy();
            }
        }

        // ConcreteQueue starts a non-daemon worker thread, which would
        // otherwise keep the JVM alive after the demo has finished.
        System.exit(completed ? 0 : 1);
    }

    /**
     * Creates the queue and the server object, queues the retrieval tasks,
     * waits for them and prints the results.
     *
     * @return {@code true} if the last queued task produced its address,
     *         {@code false} if the wait timed out or was interrupted
     */
    private static boolean retrieveAndPrint() {
        System.out.println("Creating the queue, which will be managed by the worker thread");
        System.out.println();
        ConcreteQueue workQueue = new ConcreteQueue();

        System.out.println("Creating the RMI server object, ServerDataStoreImpl");
        System.out.println();
        // The constructor exports the object and binds it in the registry.
        ServerDataStore server = new ServerDataStoreImpl();

        System.out.println("Creating AddressRetrievers and ContactRetreivers.");
        System.out.println(" These will placed in the queue, as tasks to be");
        System.out.println(" performed by the worker thread.");
        System.out.println();
        AddressRetriever firstAddr = new AddressRetriever(5280L, WORKER_SERVER_URL);
        AddressRetriever secondAddr = new AddressRetriever(2010L, WORKER_SERVER_URL);
        ContactRetriever firstContact = new ContactRetriever(5280L, WORKER_SERVER_URL);

        workQueue.put(firstAddr);
        workQueue.put(firstContact);
        workQueue.put(secondAddr);

        // secondAddr was queued last and the queue hands tasks out in
        // insertion order, so once it has an address the earlier tasks have
        // been executed as well. The wait is bounded because a failed
        // retrieval never makes the address available.
        long deadline = System.currentTimeMillis() + MAX_WAIT_MILLIS;
        while (!secondAddr.isAddressAvailable()) {
            if (System.currentTimeMillis() >= deadline) {
                break;
            }
            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException exc) {
                // Keep the interrupt visible to the caller and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }

        // No further tasks will be queued.
        workQueue.setShutdown(true);

        if (!secondAddr.isAddressAvailable()) {
            System.err.println("The worker thread did not finish retrieving the requested objects.");
            return false;
        }

        System.out.println("WorkerThread completed the processing of its Tasks");
        System.out.println("Printing out the retrieved objects now:");
        System.out.println();
        System.out.println(firstAddr.getAddress());
        System.out.println(firstContact.getContact());
        System.out.println(secondAddr.getAddress());
        return true;
    }
}
/**
 * Serializable address record exposing a type, a description, a street,
 * a city, a state and a zip code, each with a getter and a setter.
 *
 * <p>The constants are formatting helpers for implementations that render
 * an address as text.
 */
interface Address extends Serializable {
    /** Value of the {@code line.separator} system property. */
    String EOL_STRING = System.getProperty("line.separator");

    /** A single blank. */
    String SPACE = " ";

    /** A single comma. */
    String COMMA = ",";

    /** @return the address type */
    String getType();

    /** @param newType the address type to store */
    void setType(String newType);

    /** @return the free-text description */
    String getDescription();

    /** @param newDescription the description to store */
    void setDescription(String newDescription);

    /** @return the street */
    String getStreet();

    /** @param newStreet the street to store */
    void setStreet(String newStreet);

    /** @return the city */
    String getCity();

    /** @param newCity the city to store */
    void setCity(String newCity);

    /** @return the state */
    String getState();

    /** @param newState the state to store */
    void setState(String newState);

    /** @return the zip code */
    String getZipCode();

    /** @param newZip the zip code to store */
    void setZipCode(String newZip);
}
/**
 * Plain mutable implementation of {@link Address} that keeps every
 * attribute in a private field.
 */
class AddressImpl implements Address {
    private String type;
    private String description;
    private String street;
    private String city;
    private String state;
    private String zipCode;

    /** Creates an address whose attributes are all {@code null}. */
    public AddressImpl() {
    }

    /**
     * Creates an address with everything but the type filled in; the type
     * stays {@code null} until {@link #setType(String)} is called.
     *
     * @param newDescription the description
     * @param newStreet the street
     * @param newCity the city
     * @param newState the state
     * @param newZipCode the zip code
     */
    public AddressImpl(String newDescription, String newStreet, String newCity,
            String newState, String newZipCode) {
        this.description = newDescription;
        this.street = newStreet;
        this.city = newCity;
        this.state = newState;
        this.zipCode = newZipCode;
    }

    /** @return the address type */
    public String getType() {
        return this.type;
    }

    /** @param newType the address type to store */
    public void setType(String newType) {
        this.type = newType;
    }

    /** @return the description */
    public String getDescription() {
        return this.description;
    }

    /** @param newDescription the description to store */
    public void setDescription(String newDescription) {
        this.description = newDescription;
    }

    /** @return the street */
    public String getStreet() {
        return this.street;
    }

    /** @param newStreet the street to store */
    public void setStreet(String newStreet) {
        this.street = newStreet;
    }

    /** @return the city */
    public String getCity() {
        return this.city;
    }

    /** @param newCity the city to store */
    public void setCity(String newCity) {
        this.city = newCity;
    }

    /** @return the state */
    public String getState() {
        return this.state;
    }

    /** @param newState the state to store */
    public void setState(String newState) {
        this.state = newState;
    }

    /** @return the zip code */
    public String getZipCode() {
        return this.zipCode;
    }

    /** @param newZip the zip code to store */
    public void setZipCode(String newZip) {
        this.zipCode = newZip;
    }

    /**
     * Renders the address on two lines: the street, then
     * "city, state zip", each followed by the line separator.
     *
     * @return the two-line text form of this address
     */
    public String toString() {
        String streetLine = this.street + EOL_STRING;
        String cityLine = this.city + COMMA + SPACE + this.state + SPACE
                + this.zipCode + EOL_STRING;
        return streetLine + cityLine;
    }
}
/**
 * Queueable task that looks up the {@link ServerDataStore} bound at a given
 * RMI URL and asks it for the address with a given ID.
 *
 * <p>In this file {@link #execute()} is invoked by the queue's worker thread
 * while the driver polls {@link #isAddressAvailable()} from the main thread,
 * so the result field is {@code volatile} to make the worker's write visible
 * to the polling thread.
 */
class AddressRetriever implements RunnableTask {
    /** Retrieved address; {@code null} until a retrieval has succeeded. */
    private volatile Address address;
    private final long addressID;
    private final String url;

    /**
     * @param newAddressID ID of the address to retrieve
     * @param newUrl RMI URL under which the data store is bound
     */
    public AddressRetriever(long newAddressID, String newUrl) {
        addressID = newAddressID;
        url = newUrl;
    }

    /**
     * Performs the lookup and the retrieval. A failure is reported on
     * {@code System.err} and leaves the address unavailable; no exception
     * is propagated to the caller.
     */
    public void execute() {
        try {
            ServerDataStore dataStore = (ServerDataStore) Naming.lookup(url);
            address = dataStore.retrieveAddress(addressID);
        } catch (Exception exc) {
            // Previously swallowed silently, which made a failed retrieval
            // indistinguishable from one that had not run yet.
            System.err.println("Unable to retrieve address " + addressID
                    + " from " + url + ": " + exc);
        }
    }

    /** @return the retrieved address, or {@code null} if none is available */
    public Address getAddress() {
        return address;
    }

    /** @return {@code true} once an address has been retrieved */
    public boolean isAddressAvailable() {
        return address != null;
    }
}
/**
 * {@link Queue} implementation whose tasks are executed, in insertion
 * order, by a single worker thread started from the constructor.
 *
 * <p>All state is guarded by this object's monitor. The emptiness check and
 * the wait happen under the same lock that {@link #put(RunnableTask)} holds
 * while adding and notifying, so a task queued between the check and the
 * wait can no longer be missed.
 */
class ConcreteQueue implements Queue {
    /** Pending tasks, oldest first. */
    private final Vector tasks = new Vector();

    /** Set when the worker should stop taking tasks. */
    private boolean shutdown;

    /** Creates an empty queue and starts its worker thread. */
    public ConcreteQueue() {
        new Thread(new Worker()).start();
    }

    /**
     * Sets the shutdown flag and wakes any thread blocked in
     * {@link #take()} so that it can observe the new value.
     *
     * @param isShutdown {@code true} to make the worker stop
     */
    public synchronized void setShutdown(boolean isShutdown) {
        shutdown = isShutdown;
        notifyAll();
    }

    /**
     * Appends a task and wakes any thread blocked in {@link #take()}.
     *
     * @param r the task to queue
     */
    public synchronized void put(RunnableTask r) {
        tasks.add(r);
        notifyAll();
    }

    /**
     * Removes and returns the oldest queued task, blocking while the queue
     * is empty.
     *
     * @return the oldest task, or {@code null} if the queue is empty and
     *         either shutdown has been requested or the calling thread was
     *         interrupted (its interrupt status is restored in that case)
     */
    public synchronized RunnableTask take() {
        // Re-check in a loop: wait() may return without a task being there.
        while (tasks.isEmpty()) {
            if (shutdown) {
                return null;
            }
            try {
                wait();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return (RunnableTask) tasks.remove(0);
    }

    /** Reads the shutdown flag under the lock that guards it. */
    private synchronized boolean isShutdown() {
        return shutdown;
    }

    /** Loop run by the worker thread: take a task, execute it, repeat. */
    private class Worker implements Runnable {
        public void run() {
            while (!isShutdown()) {
                RunnableTask r = take();
                if (r == null) {
                    // Shutdown requested or thread interrupted: stop working.
                    return;
                }
                r.execute();
            }
        }
    }
}
/**
 * Serializable contact record exposing a first name, a last name, a title
 * and an organization, each with a getter and a setter.
 *
 * <p>The constants are formatting helpers for implementations that render
 * a contact as text.
 */
interface Contact extends Serializable {
    /** Value of the {@code line.separator} system property. */
    String EOL_STRING = System.getProperty("line.separator");

    /** A single blank. */
    String SPACE = " ";

    /** @return the first name */
    String getFirstName();

    /** @param newFirstName the first name to store */
    void setFirstName(String newFirstName);

    /** @return the last name */
    String getLastName();

    /** @param newLastName the last name to store */
    void setLastName(String newLastName);

    /** @return the title */
    String getTitle();

    /** @param newTitle the title to store */
    void setTitle(String newTitle);

    /** @return the organization */
    String getOrganization();

    /** @param newOrganization the organization to store */
    void setOrganization(String newOrganization);
}
/**
 * Plain mutable implementation of {@link Contact} that keeps every
 * attribute in a private field.
 */
class ContactImpl implements Contact {
    private String firstName;
    private String lastName;
    private String title;
    private String organization;

    /** Creates a contact whose attributes are all {@code null}. */
    public ContactImpl() {
    }

    /**
     * Creates a fully populated contact.
     *
     * @param newFirstName the first name
     * @param newLastName the last name
     * @param newTitle the title
     * @param newOrganization the organization
     */
    public ContactImpl(String newFirstName, String newLastName,
            String newTitle, String newOrganization) {
        this.firstName = newFirstName;
        this.lastName = newLastName;
        this.title = newTitle;
        this.organization = newOrganization;
    }

    /** @return the first name */
    public String getFirstName() {
        return this.firstName;
    }

    /** @param newFirstName the first name to store */
    public void setFirstName(String newFirstName) {
        this.firstName = newFirstName;
    }

    /** @return the last name */
    public String getLastName() {
        return this.lastName;
    }

    /** @param newLastName the last name to store */
    public void setLastName(String newLastName) {
        this.lastName = newLastName;
    }

    /** @return the title */
    public String getTitle() {
        return this.title;
    }

    /** @param newTitle the title to store */
    public void setTitle(String newTitle) {
        this.title = newTitle;
    }

    /** @return the organization */
    public String getOrganization() {
        return this.organization;
    }

    /** @param newOrganization the organization to store */
    public void setOrganization(String newOrganization) {
        this.organization = newOrganization;
    }

    /**
     * Renders the contact as "firstName lastName" followed by the line
     * separator.
     *
     * @return the one-line text form of this contact
     */
    public String toString() {
        String fullName = this.firstName + SPACE + this.lastName;
        return fullName + EOL_STRING;
    }
}
/**
 * Queueable task that looks up the {@link ServerDataStore} bound at a given
 * RMI URL and asks it for the contact with a given ID.
 *
 * <p>In this file {@link #execute()} is invoked by the queue's worker thread
 * while the result is read from the main thread, so the result field is
 * {@code volatile} to make the worker's write visible to the reader.
 */
class ContactRetriever implements RunnableTask {
    /** Retrieved contact; {@code null} until a retrieval has succeeded. */
    private volatile Contact contact;
    private final long contactID;
    private final String url;

    /**
     * @param newContactID ID of the contact to retrieve
     * @param newUrl RMI URL under which the data store is bound
     */
    public ContactRetriever(long newContactID, String newUrl) {
        contactID = newContactID;
        url = newUrl;
    }

    /**
     * Performs the lookup and the retrieval. A failure is reported on
     * {@code System.err} and leaves the contact unavailable; no exception
     * is propagated to the caller.
     */
    public void execute() {
        try {
            ServerDataStore dataStore = (ServerDataStore) Naming.lookup(url);
            contact = dataStore.retrieveContact(contactID);
        } catch (Exception exc) {
            // Previously swallowed silently, which made a failed retrieval
            // indistinguishable from one that had not run yet.
            System.err.println("Unable to retrieve contact " + contactID
                    + " from " + url + ": " + exc);
        }
    }

    /** @return the retrieved contact, or {@code null} if none is available */
    public Contact getContact() {
        return contact;
    }

    /** @return {@code true} once a contact has been retrieved */
    public boolean isContactAvailable() {
        return contact != null;
    }
}
/**
 * Hand-over point for {@link RunnableTask} objects: one side removes tasks
 * with {@link #take()}, the other side adds them with
 * {@link #put(RunnableTask)}.
 */
interface Queue {
    /**
     * Removes a task from the queue.
     *
     * @return the removed task
     */
    RunnableTask take();

    /**
     * Adds a task to the queue.
     *
     * @param r the task to add
     */
    void put(RunnableTask r);
}
/**
 * Unit of work that can be placed in a {@code Queue}.
 */
interface RunnableTask {
    /** Carries out the work represented by this task. */
    void execute();
}
/**
 * Remote interface of the data store: returns {@link Address} and
 * {@link Contact} objects by numeric ID.
 */
interface ServerDataStore extends Remote {
    /**
     * @param addressID ID of the requested address
     * @return the address for that ID
     * @throws RemoteException if the remote call fails
     */
    Address retrieveAddress(long addressID) throws RemoteException;

    /**
     * @param contactID ID of the requested contact
     * @return the contact for that ID
     * @throws RemoteException if the remote call fails
     */
    Contact retrieveContact(long contactID) throws RemoteException;
}
class ServerDataStoreImpl implements ServerDataStore {
private static final String WORKER_SERVER_SERVICE_NAME = "workerThreadServer";
public ServerDataStoreImpl() {
try {
UnicastRemoteObject.exportObject(this);
Naming.rebind(WORKER_SERVER_SERVICE_NAME, this);
} catch (Exception exc) {
System.err
.println("Error using RMI to register the ServerDataStoreImpl "
+ exc);
}
}
public Address retrieveAddress(long addressID) {
if (addressID == 5280L) {
return new AddressImpl("Fine Dining", "416 Chartres St.",
"New Orleans", "LA", "51720");
} else if (addressID == 2010L) {
return new AddressImpl("Mystic Yacht Club", "19 Imaginary Lane",
"Mystic", "CT", "46802");
} else {
return new AddressImpl();
}
}
public Contact retrieveContact(long contactID) {
if (contactID == 5280L) {
return new ContactImpl("Dwayne", "Dibley", "Accountant", "Virtucon");
} else {
return new ContactImpl();
}
}
}