Java/Design Pattern/Worker Thread Pattern
Worker Thread Pattern in Java
<source lang="java">
//[C] 2002 Sun Microsystems, Inc.---
import java.io.IOException;
import java.io.Serializable;
import java.rmi.Naming;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.Vector;

/**
 * Demo driver for the Worker Thread pattern.
 *
 * <p>The program runs <code>rmic</code>, starts an <code>rmiregistry</code>
 * child process, registers a {@link ServerDataStoreImpl}, and then hands three
 * retrieval tasks to a {@link ConcreteQueue}, whose worker thread executes them
 * one after another. The main thread polls until the last task has finished
 * and prints the retrieved objects.
 */
public class RunWorkerThreadPattern {
    private static final String WORKER_SERVER_URL = "//localhost/workerThreadServer";

    /** Fixed pause that gives the freshly started rmiregistry time to come up. */
    private static final long REGISTRY_STARTUP_MILLIS = 15000L;

    /** Interval between two checks for completion of the queued tasks. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    /**
     * Runs the demo.
     *
     * @param arguments command line arguments; not used
     */
    public static void main(String[] arguments) {
        System.out.println("Example for the WorkerThread pattern");
        System.out.println("In this example, a ConcreteQueue object which uses a");
        System.out.println(" worker thread, will retrieve a number of objects from");
        System.out.println(" the server.");
        System.out.println();

        runRmiCompiler();
        Process rmiProcess = startRegistry();

        ConcreteQueue workQueue = null;
        ServerDataStore server = null;
        try {
            System.out.println("Creating the queue, which will be managed by the worker thread");
            System.out.println();
            workQueue = new ConcreteQueue();

            System.out.println("Creating the RMI server object, ServerDataStoreImpl");
            System.out.println();
            server = new ServerDataStoreImpl();

            System.out.println("Creating AddressRetrievers and ContactRetreivers.");
            System.out.println(" These will placed in the queue, as tasks to be");
            System.out.println(" performed by the worker thread.");
            System.out.println();
            AddressRetriever firstAddr = new AddressRetriever(5280L, WORKER_SERVER_URL);
            AddressRetriever secondAddr = new AddressRetriever(2010L, WORKER_SERVER_URL);
            ContactRetriever firstContact = new ContactRetriever(5280L, WORKER_SERVER_URL);

            workQueue.put(firstAddr);
            workQueue.put(firstContact);
            workQueue.put(secondAddr);

            // The single worker processes tasks in FIFO order, so once the task
            // that was queued last is done the earlier ones are done as well.
            awaitCompletion(secondAddr);

            System.out.println("WorkerThread completed the processing of its Tasks");
            System.out.println("Printing out the retrieved objects now:");
            System.out.println();

            reportFailure("first address", firstAddr.getFailure());
            reportFailure("first contact", firstContact.getFailure());
            reportFailure("second address", secondAddr.getFailure());

            System.out.println(firstAddr.getAddress());
            System.out.println(firstContact.getContact());
            System.out.println(secondAddr.getAddress());
        } finally {
            // Release everything the demo started: the worker thread, the
            // exported remote object and the rmiregistry child process.
            if (workQueue != null) {
                workQueue.setShutdown(true);
            }
            releaseServer(server);
            if (rmiProcess != null) {
                rmiProcess.destroy();
            }
        }
    }

    /** Runs rmic on ServerDataStoreImpl and waits for it; exits the JVM if rmic cannot be started. */
    private static void runRmiCompiler() {
        System.out.println("Running the RMI compiler (rmic)");
        System.out.println();
        try {
            Process p1 = Runtime.getRuntime().exec(new String[] { "rmic", "ServerDataStoreImpl" });
            int exitCode = p1.waitFor();
            if (exitCode != 0) {
                System.err.println("rmic finished with exit code " + exitCode);
            }
        } catch (IOException exc) {
            System.err.println("Unable to run rmic utility. Exiting application.");
            System.exit(1);
        } catch (InterruptedException exc) {
            System.err.println("Threading problems encountered while using the rmic utility.");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Starts the rmiregistry child process and pauses so it can come up;
     * exits the JVM if the process cannot be started.
     *
     * @return the started registry process
     */
    private static Process startRegistry() {
        System.out.println("Starting the rmiregistry");
        System.out.println();
        Process registry = null;
        try {
            registry = Runtime.getRuntime().exec(new String[] { "rmiregistry" });
            Thread.sleep(REGISTRY_STARTUP_MILLIS);
        } catch (IOException exc) {
            System.err.println("Unable to start the rmiregistry. Exiting application.");
            System.exit(1);
        } catch (InterruptedException exc) {
            System.err.println("Threading problems encountered when starting the rmiregistry.");
            Thread.currentThread().interrupt();
        }
        return registry;
    }

    /** Polls until the given task reports completion, or until the calling thread is interrupted. */
    private static void awaitCompletion(AddressRetriever lastTask) {
        while (!lastTask.isDone()) {
            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException exc) {
                // Keep the interrupt visible to callers and stop waiting.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Prints a diagnostic for a task that recorded a failure; does nothing when failure is null. */
    private static void reportFailure(String label, Exception failure) {
        if (failure != null) {
            System.err.println("Unable to retrieve " + label + ": " + failure);
        }
    }

    /** Withdraws the server object from the RMI runtime; does nothing when server is null. */
    private static void releaseServer(ServerDataStore server) {
        if (server == null) {
            return;
        }
        try {
            UnicastRemoteObject.unexportObject(server, true);
        } catch (RemoteException ignored) {
            // Thrown when the object is not currently exported (for example
            // because registration failed earlier); nothing to clean up then.
        }
    }
}

/** Postal address value object that can be transferred over RMI. */
interface Address extends Serializable {
    public static final String EOL_STRING = System.getProperty("line.separator");

    public static final String SPACE = " ";

    public static final String COMMA = ",";

    public String getType();

    public String getDescription();

    public String getStreet();

    public String getCity();

    public String getState();

    public String getZipCode();

    public void setType(String newType);

    public void setDescription(String newDescription);

    public void setStreet(String newStreet);

    public void setCity(String newCity);

    public void setState(String newState);

    public void setZipCode(String newZip);
}

/** Plain mutable implementation of {@link Address}. */
class AddressImpl implements Address {
    private String type;

    private String description;

    private String street;

    private String city;

    private String state;

    private String zipCode;

    /** Creates an address whose fields are all null. */
    public AddressImpl() {
    }

    /** Creates an address with every field except the type set. */
    public AddressImpl(String newDescription, String newStreet, String newCity, String newState,
            String newZipCode) {
        description = newDescription;
        street = newStreet;
        city = newCity;
        state = newState;
        zipCode = newZipCode;
    }

    public String getType() {
        return type;
    }

    public String getDescription() {
        return description;
    }

    public String getStreet() {
        return street;
    }

    public String getCity() {
        return city;
    }

    public String getState() {
        return state;
    }

    public String getZipCode() {
        return zipCode;
    }

    public void setType(String newType) {
        type = newType;
    }

    public void setDescription(String newDescription) {
        description = newDescription;
    }

    public void setStreet(String newStreet) {
        street = newStreet;
    }

    public void setCity(String newCity) {
        city = newCity;
    }

    public void setState(String newState) {
        state = newState;
    }

    public void setZipCode(String newZip) {
        zipCode = newZip;
    }

    /** Returns the street on one line and "city, state zip" on the next, each followed by a line separator. */
    public String toString() {
        return street + EOL_STRING + city + COMMA + SPACE + state + SPACE + zipCode + EOL_STRING;
    }
}

/**
 * Task that looks up the {@link ServerDataStore} under a URL and fetches one
 * address from it.
 *
 * <p>The task is executed on one thread and inspected from another, so the
 * result fields are volatile.
 */
class AddressRetriever implements RunnableTask {
    private volatile Address address;

    private volatile Exception failure;

    private volatile boolean done;

    private long addressID;

    private String url;

    /**
     * @param newAddressID identifier passed to the data store
     * @param newUrl RMI URL under which the data store is looked up
     */
    public AddressRetriever(long newAddressID, String newUrl) {
        addressID = newAddressID;
        url = newUrl;
    }

    /**
     * Performs the lookup and retrieval. Any exception is recorded and made
     * available through {@link #getFailure()} instead of being discarded.
     */
    public void execute() {
        try {
            ServerDataStore dataStore = (ServerDataStore) Naming.lookup(url);
            address = dataStore.retrieveAddress(addressID);
        } catch (Exception exc) {
            failure = exc;
        } finally {
            // Written last so that a reader who sees done == true also sees
            // the final address / failure values.
            done = true;
        }
    }

    /** @return the retrieved address, or null if none has been retrieved */
    public Address getAddress() {
        return address;
    }

    /** @return true once an address has been retrieved */
    public boolean isAddressAvailable() {
        return address != null;
    }

    /** @return true once {@link #execute()} has finished, successfully or not */
    public boolean isDone() {
        return done;
    }

    /** @return the exception that made the retrieval fail, or null */
    public Exception getFailure() {
        return failure;
    }
}

/**
 * Task queue served by a single worker thread that is started by the
 * constructor.
 *
 * <p>All mutable state is guarded by this object's monitor.
 */
class ConcreteQueue implements Queue {
    private final Vector tasks = new Vector();

    private boolean shutdown;

    /** Creates an empty queue and starts its worker thread. */
    public ConcreteQueue() {
        new Thread(new Worker()).start();
    }

    /**
     * Sets the shutdown flag and wakes up any thread blocked in
     * {@link #take()}. Once the worker thread has observed a true value it
     * terminates and is not restarted by a later call with false.
     *
     * @param isShutdown the new value of the shutdown flag
     */
    public synchronized void setShutdown(boolean isShutdown) {
        shutdown = isShutdown;
        notifyAll();
    }

    /**
     * Appends a task to the queue and wakes up waiting consumers.
     *
     * @param r the task to enqueue
     * @throws IllegalArgumentException if r is null
     */
    public synchronized void put(RunnableTask r) {
        if (r == null) {
            // null is what take() returns to signal "no more work".
            throw new IllegalArgumentException("task must not be null");
        }
        tasks.add(r);
        notifyAll();
    }

    /**
     * Removes and returns the oldest task, blocking while the queue is empty.
     *
     * @return the next task, or null if the queue is empty and either the
     *         shutdown flag is set or the calling thread was interrupted while
     *         waiting (the interrupt status is restored in that case)
     */
    public synchronized RunnableTask take() {
        // The condition is re-checked in a loop because a waiting thread may
        // wake up without a task having been added.
        while (tasks.isEmpty() && !shutdown) {
            try {
                wait();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        if (tasks.isEmpty()) {
            return null;
        }
        return (RunnableTask) tasks.remove(0);
    }

    private synchronized boolean isShutdown() {
        return shutdown;
    }

    /** Loop run by the worker thread: take a task, execute it, repeat until shutdown. */
    private class Worker implements Runnable {
        public void run() {
            while (!isShutdown()) {
                RunnableTask r = take();
                if (r == null) {
                    // Shutdown requested or the worker was interrupted.
                    break;
                }
                try {
                    r.execute();
                } catch (RuntimeException exc) {
                    // One failing task must not stop the remaining ones.
                    System.err.println("Task failed: " + exc);
                }
            }
        }
    }
}

/** Contact value object that can be transferred over RMI. */
interface Contact extends Serializable {
    public static final String EOL_STRING = System.getProperty("line.separator");

    public static final String SPACE = " ";

    public String getFirstName();

    public String getLastName();

    public String getTitle();

    public String getOrganization();

    public void setFirstName(String newFirstName);

    public void setLastName(String newLastName);

    public void setTitle(String newTitle);

    public void setOrganization(String newOrganization);
}

/** Plain mutable implementation of {@link Contact}. */
class ContactImpl implements Contact {
    private String firstName;

    private String lastName;

    private String title;

    private String organization;

    /** Creates a contact whose fields are all null. */
    public ContactImpl() {
    }

    /** Creates a fully populated contact. */
    public ContactImpl(String newFirstName, String newLastName, String newTitle,
            String newOrganization) {
        firstName = newFirstName;
        lastName = newLastName;
        title = newTitle;
        organization = newOrganization;
    }

    public String getFirstName() {
        return firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public String getTitle() {
        return title;
    }

    public String getOrganization() {
        return organization;
    }

    public void setFirstName(String newFirstName) {
        firstName = newFirstName;
    }

    public void setLastName(String newLastName) {
        lastName = newLastName;
    }

    public void setTitle(String newTitle) {
        title = newTitle;
    }

    public void setOrganization(String newOrganization) {
        organization = newOrganization;
    }

    /** Returns "firstName lastName" followed by a line separator. */
    public String toString() {
        return firstName + SPACE + lastName + EOL_STRING;
    }
}

/**
 * Task that looks up the {@link ServerDataStore} under a URL and fetches one
 * contact from it.
 *
 * <p>The task is executed on one thread and inspected from another, so the
 * result fields are volatile.
 */
class ContactRetriever implements RunnableTask {
    private volatile Contact contact;

    private volatile Exception failure;

    private volatile boolean done;

    private long contactID;

    private String url;

    /**
     * @param newContactID identifier passed to the data store
     * @param newUrl RMI URL under which the data store is looked up
     */
    public ContactRetriever(long newContactID, String newUrl) {
        contactID = newContactID;
        url = newUrl;
    }

    /**
     * Performs the lookup and retrieval. Any exception is recorded and made
     * available through {@link #getFailure()} instead of being discarded.
     */
    public void execute() {
        try {
            ServerDataStore dataStore = (ServerDataStore) Naming.lookup(url);
            contact = dataStore.retrieveContact(contactID);
        } catch (Exception exc) {
            failure = exc;
        } finally {
            // Written last so that a reader who sees done == true also sees
            // the final contact / failure values.
            done = true;
        }
    }

    /** @return the retrieved contact, or null if none has been retrieved */
    public Contact getContact() {
        return contact;
    }

    /** @return true once a contact has been retrieved */
    public boolean isContactAvailable() {
        return contact != null;
    }

    /** @return true once {@link #execute()} has finished, successfully or not */
    public boolean isDone() {
        return done;
    }

    /** @return the exception that made the retrieval fail, or null */
    public Exception getFailure() {
        return failure;
    }
}

/** Minimal producer/consumer contract for {@link RunnableTask} objects. */
interface Queue {
    void put(RunnableTask r);

    RunnableTask take();
}

/** A unit of work that can be placed in a {@link Queue}. */
interface RunnableTask {
    public void execute();
}

/** Remote interface of the data store queried by the retriever tasks. */
interface ServerDataStore extends Remote {
    public Address retrieveAddress(long addressID) throws RemoteException;

    public Contact retrieveContact(long contactID) throws RemoteException;
}

/**
 * In-memory {@link ServerDataStore} that exports and registers itself with the
 * RMI naming service when it is constructed.
 */
class ServerDataStoreImpl implements ServerDataStore {
    private static final String WORKER_SERVER_SERVICE_NAME = "workerThreadServer";

    /**
     * Exports this object and binds it under the service name. A failure is
     * reported on System.err; the instance is still usable for local calls.
     */
    public ServerDataStoreImpl() {
        try {
            // NOTE(review): the one-argument exportObject relies on the stub
            // class produced by the rmic step of the demo; keep the two in sync.
            UnicastRemoteObject.exportObject(this);
            Naming.rebind(WORKER_SERVER_SERVICE_NAME, this);
        } catch (Exception exc) {
            System.err.println("Error using RMI to register the ServerDataStoreImpl " + exc);
        }
    }

    /**
     * @param addressID identifier of the wanted address
     * @return the sample address for ids 5280 and 2010, otherwise an empty address
     */
    public Address retrieveAddress(long addressID) {
        if (addressID == 5280L) {
            return new AddressImpl("Fine Dining", "416 Chartres St.", "New Orleans", "LA", "51720");
        } else if (addressID == 2010L) {
            return new AddressImpl("Mystic Yacht Club", "19 Imaginary Lane", "Mystic", "CT",
                    "46802");
        } else {
            return new AddressImpl();
        }
    }

    /**
     * @param contactID identifier of the wanted contact
     * @return the sample contact for id 5280, otherwise an empty contact
     */
    public Contact retrieveContact(long contactID) {
        if (contactID == 5280L) {
            return new ContactImpl("Dwayne", "Dibley", "Accountant", "Virtucon");
        } else {
            return new ContactImpl();
        }
    }
}
</source>