Java/Threads/Producer Consumer

Материал из Java эксперт
Перейти к: навигация, поиск

Fibonacci: producer and consumer

   <source lang="java">

/* Java Threads, 3rd Edition By Scott Oaks, Henry Wong 3rd Edition September 2004 ISBN: 0-596-00782-5

  • /

import java.util.*; import java.util.concurrent.*; public class FibonacciTest {

   public static void main(String[] args) {
       ArrayBlockingQueue<Integer> queue;
       queue = new ArrayBlockingQueue<Integer>(10);
       new FibonacciProducer(queue);
       int nThreads = Integer.parseInt(args[0]);
       for (int i = 0; i < nThreads; i++)
           new FibonacciConsumer(queue);
   }

} class Fibonacci {

   private ConcurrentMap<Integer, Integer> values =
             new ConcurrentHashMap<Integer, Integer>();
   public int calculate(int x) {
       if (x < 0)
           throw new IllegalArgumentException("positive numbers only");
       if (x <= 1)
           return x;
       return calculate(x-1) + calculate(x-2);
   }
   public int calculateWithCache(int x) {
       Integer key = new Integer(x);
       Integer result = values.get(key);
       if (result == null) {
           result = new Integer(calculate(x));
           values.putIfAbsent(key, result);
       }
       return result.intValue();                
   }
   public int calculateOnlyWithCache(int x) {
       Integer v1 = values.get(new Integer(x-1));
       Integer v2 = values.get(new Integer(x-2));
       Integer key = new Integer(x);
       Integer result = values.get(key);
       
       if (result != null)
           return result.intValue();
       if ((v1 == null) || (v2 == null))
           throw new IllegalArgumentException("values not in cache");
       result = new Integer(v1.intValue() + v2.intValue());
       values.putIfAbsent(key, result);
       return result.intValue();
   }              
   public void calculateRangeInCache(int x, int y) {
       calculateWithCache(x++);
       calculateWithCache(x++);
       while (x <= y) {
           calculateOnlyWithCache(x++);
       }
   }

} class FibonacciProducer implements Runnable {

   private Thread thr;
   private BlockingQueue<Integer> queue;
   public FibonacciProducer(BlockingQueue<Integer> q) {
       queue = q;
       thr = new Thread(this);
       thr.start();
   }
   public void run() {
       try {
           for(int x=0;;x++) {
               Thread.sleep(1000);
               queue.put(new Integer(x));
               System.out.println("Produced request " + x);
           }
       } catch (InterruptedException ex) {
       }
   }

} class FibonacciConsumer implements Runnable {

   private Fibonacci fib = new Fibonacci();
   private Thread thr;
   private BlockingQueue<Integer> queue;
   public FibonacciConsumer(BlockingQueue<Integer> q) {
       queue = q;
       thr = new Thread(this);
       thr.start();
   }
   public void run() {
       int request, result;
       try {
           while (true) {
               request = queue.take().intValue();
               result = fib.calculateWithCache(request);
               System.out.println("Calculated result of " + result + " from " + request);
           }
       } catch (InterruptedException ex) {
       }
   }

}


      </source>
   
  
 
  



Producer and Comsumer

   <source lang="java">

public class ProducerComsumer extends Object {

 private Object slot;
 public ProducerComsumer() {
   slot = null; // null indicates empty
 }
 public synchronized void putIn(Object obj) 
           throws InterruptedException {
   while ( slot != null ) {
     wait(); 
   }
   slot = obj;  // put object into slot
   notifyAll(); // signal that slot has been filled
 }
 public synchronized Object takeOut() 
           throws InterruptedException {
   while ( slot == null ) {
     wait(); // wait while slot is empty
   }
   Object obj = slot;
   slot = null; // mark slot as empty
   notifyAll(); // signal that slot is empty
   return obj;
 }
 public static void main(String[] args) {
   final ProducerComsumer ch = new ProducerComsumer();
   Runnable runA = new Runnable() {
       public void run() {
         try {
           String str;
           Thread.sleep(500);
           str = "multithreaded";
           ch.putIn(str);
           str = "programming";
           ch.putIn(str);
           str = "with Java";
           ch.putIn(str);
         } catch ( InterruptedException x ) {
           x.printStackTrace();
         }
       }
     };
   Runnable runB = new Runnable() {
       public void run() {
         try {
           Object obj;
           obj = ch.takeOut();
           System.out.println("in run() - just took out: "" + 
               obj + """);
           Thread.sleep(500);
           obj = ch.takeOut();
           System.out.println("in run() - just took out: "" + 
               obj + """);
           obj = ch.takeOut();
           System.out.println("in run() - just took out: "" + 
               obj + """);
         } catch ( InterruptedException x ) {
           x.printStackTrace();
         }
       }
     };
   Thread threadA = new Thread(runA, "threadA");
   threadA.start();
   Thread threadB = new Thread(runB, "threadB");
   threadB.start();
 }
 

}


      </source>
   
  
 
  



Producer consumer for J2SE 1.5 using concurrent

   <source lang="java">

/*

* Copyright (c) Ian F. Darwin, http://www.darwinsys.ru/, 1996-2002.
* All rights reserved. Software written by Ian F. Darwin and others.
* $Id: LICENSE,v 1.8 2004/02/09 03:33:38 ian Exp $
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
*    notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
*    notice, this list of conditions and the following disclaimer in the
*    documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS""
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
* 
* Java, the Duke mascot, and all variants of Sun"s Java "steaming coffee
* cup" logo are trademarks of Sun Microsystems. Sun"s, and James Gosling"s,
* pioneering role in inventing and promulgating (and standardizing) the Java 
* language and environment is gratefully acknowledged.
* 
* The pioneering role of Dennis Ritchie and Bjarne Stroustrup, of AT&T, for
* inventing predecessor languages C and C++ is also gratefully acknowledged.
*/

import java.util.*; import java.io.*; import java.util.concurrent.*; /** Producer-Consumer in Java, for J2SE 1.5 using concurrent.

*/

public class ProdCons15 {

 protected boolean done = false;
 /** Inner class representing the Producer side */
 class Producer implements Runnable {
   protected BlockingQueue queue;
   Producer(BlockingQueue theQueue) { this.queue = theQueue; }
   public void run() {
     try {
       while (true) {
         Object justProduced = getRequestFromNetwork();
         queue.put(justProduced);
         System.out.println("Produced 1 object; List size now " + queue.size());
         if (done) {
           return;
         }
       }
     } catch (InterruptedException ex) {
       System.out.println("Producer INTERRUPTED");
     }
   }
   Object getRequestFromNetwork() {  // Simulation of reading from client
     try {
         Thread.sleep(10); // simulate time passing during read
     } catch (InterruptedException ex) {
        System.out.println("Producer Read INTERRUPTED");
     }
     return(new Object());
   }
 }
 /** Inner class representing the Consumer side */
 class Consumer implements Runnable {
   protected BlockingQueue queue;
   Consumer(BlockingQueue theQueue) { this.queue = theQueue; }
   public void run() {
     try {
       while (true) {
         Object obj = queue.take();
         int len = queue.size();
         System.out.println("List size now " + len);
         process(obj);
         if (done) {
           return;
         }
       }
     } catch (InterruptedException ex) {
         System.out.println("CONSUMER INTERRUPTED");
     }
   }
   void process(Object obj) {
     // Thread.sleep(xxx) // Simulate time passing
     System.out.println("Consuming object " + obj);
   }
 }
 ProdCons15(int nP, int nC) {
   BlockingQueue myQueue = new LinkedBlockingQueue();
   for (int i=0; i<nP; i++)
     new Thread(new Producer(myQueue)).start();
   for (int i=0; i<nC; i++)
     new Thread(new Consumer(myQueue)).start();
 }
 public static void main(String[] args)
 throws IOException, InterruptedException {
   // Start producers and consumers
   int numProducers = 4;
   int numConsumers = 3;
   ProdCons15 pc = new ProdCons15(numProducers, numConsumers);
   // Let the simulation run for, say, 10 seconds
   Thread.sleep(10*1000); 
   // End of simulation - shut down gracefully
   pc.done = true;
 }

}

      </source>
   
  
 
  



Producer consumer in Java 1

   <source lang="java">

/*

* Copyright (c) Ian F. Darwin, http://www.darwinsys.ru/, 1996-2002.
* All rights reserved. Software written by Ian F. Darwin and others.
* $Id: LICENSE,v 1.8 2004/02/09 03:33:38 ian Exp $
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
*    notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
*    notice, this list of conditions and the following disclaimer in the
*    documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS""
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
* 
* Java, the Duke mascot, and all variants of Sun"s Java "steaming coffee
* cup" logo are trademarks of Sun Microsystems. Sun"s, and James Gosling"s,
* pioneering role in inventing and promulgating (and standardizing) the Java 
* language and environment is gratefully acknowledged.
* 
* The pioneering role of Dennis Ritchie and Bjarne Stroustrup, of AT&T, for
* inventing predecessor languages C and C++ is also gratefully acknowledged.
*/

import java.util.*; import java.io.*; /** Producer-Consumer in Java. Version 1.

*/

public class ProdCons1 {

 protected LinkedList list = new LinkedList();
 protected void produce() {
   int len = 0;
   synchronized(list) {
     Object justProduced = new Object();
     list.addFirst(justProduced);
     len = list.size();
     list.notifyAll();
   }
   System.out.println("List size now " + len);
 }
 protected void consume() {
   Object obj = null;
   int len = 0;
   synchronized(list) {
     while (list.size() == 0) {
       try {
         list.wait();
       } catch (InterruptedException ex) {
         return;
       }
     }
     obj = list.removeLast();
     len = list.size();
   }
   System.out.println("Consuming object " + obj);
   System.out.println("List size now " + len);
 }
 public static void main(String[] args) throws IOException {
   ProdCons1 pc = new ProdCons1();
   System.out.println("Ready (p to produce, c to consume):");
   int i;
   while ((i = System.in.read()) != -1) {
     char ch = (char)i;
     switch(ch) {
       case "p":  pc.produce(); break;
       case "c":  pc.consume(); break;
     }
   }
 }

}


      </source>
   
  
 
  



Producer consumer in Java 2

   <source lang="java">

/*

* Copyright (c) Ian F. Darwin, http://www.darwinsys.ru/, 1996-2002.
* All rights reserved. Software written by Ian F. Darwin and others.
* $Id: LICENSE,v 1.8 2004/02/09 03:33:38 ian Exp $
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
*    notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
*    notice, this list of conditions and the following disclaimer in the
*    documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS""
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
* 
* Java, the Duke mascot, and all variants of Sun"s Java "steaming coffee
* cup" logo are trademarks of Sun Microsystems. Sun"s, and James Gosling"s,
* pioneering role in inventing and promulgating (and standardizing) the Java 
* language and environment is gratefully acknowledged.
* 
* The pioneering role of Dennis Ritchie and Bjarne Stroustrup, of AT&T, for
* inventing predecessor languages C and C++ is also gratefully acknowledged.
*/

import java.util.*; import java.io.*; /** Producer-consumer in Java, Take II.

*/

public class ProdCons2 {

 /** Throughout the code, this is the object we synchronize on so this
  * is also the object we wait() and notifyAll() on.
  */
 protected LinkedList list = new LinkedList();
 protected int MAX = 10;
 protected boolean done = false; // Also protected by lock on list.
 /** Inner class representing the Producer side */
 class Producer extends Thread {
   public void run() {
     while (true) {
       Object justProduced = getRequestFromNetwork();
       // Get request from the network - outside the synch section.
       // We"re simulating this actually reading from a client, and it
       // might have to wait for hours if the client is having coffee.
       synchronized(list) {
           while (list.size() == MAX) // queue "full"
           try {
             System.out.println("Producer WAITING");
             list.wait();   // Limit the size
           } catch (InterruptedException ex) {
             System.out.println("Producer INTERRUPTED");
           }
         list.addFirst(justProduced);
         list.notifyAll();  // must own the lock
         System.out.println("Produced 1; List size now " + list.size());
         if (done)
           break;
         // yield();  // Useful for green threads & demo programs.
       }
     }
   }
   Object getRequestFromNetwork() {  // Simulation of reading from client
     // try {
     //   Thread.sleep(10); // simulate time passing during read
     // } catch (InterruptedException ex) {
     //   System.out.println("Producer Read INTERRUPTED");
     // }
     return(new Object());
   }
 }
 /** Inner class representing the Consumer side */
 class Consumer extends Thread {
   public void run() {
     while (true) {
       Object obj = null;
       synchronized(list) {
         while (list.size() == 0) {
           try {
             System.out.println("CONSUMER WAITING");
             list.wait();  // must own the lock
           } catch (InterruptedException ex) {
             System.out.println("CONSUMER INTERRUPTED");
           }
         }
         obj = list.removeLast();
         list.notifyAll();
         int len = list.size();
         System.out.println("List size now " + len);
         if (done)
           break;
       }
       process(obj);  // Outside synch section (could take time)
       //yield(); DITTO
     }
   }
   void process(Object obj) {
     // Thread.sleep(xxx) // Simulate time passing
     System.out.println("Consuming object " + obj);
   }
 }
 ProdCons2(int nP, int nC) {
   for (int i=0; i<nP; i++)
     new Producer().start();
   for (int i=0; i<nC; i++)
     new Consumer().start();
 }
 public static void main(String[] args)
 throws IOException, InterruptedException {
   // Start producers and consumers
   int numProducers = 4;
   int numConsumers = 3;
   ProdCons2 pc = new ProdCons2(numProducers, numConsumers);
   // Let it run for, say, 10 seconds
   Thread.sleep(10*1000); 
   // End of simulation - shut down gracefully
   synchronized(pc.list) {
     pc.done = true;
     pc.list.notifyAll();
   }
 }

}


      </source>
   
  
 
  



Producer Consumer Test

   <source lang="java">

/* From http://java.sun.ru/docs/books/tutorial/index.html */ /*

* Copyright (c) 2006 Sun Microsystems, Inc. All Rights Reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* -Redistribution of source code must retain the above copyright notice, this
*  list of conditions and the following disclaimer.
*
* -Redistribution in binary form must reproduce the above copyright notice,
*  this list of conditions and the following disclaimer in the documentation
*  and/or other materials provided with the distribution.
*
* Neither the name of Sun Microsystems, Inc. or the names of contributors may
* be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING
* ANY IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE
* OR NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN MIDROSYSTEMS, INC. ("SUN")
* AND ITS LICENSORS SHALL NOT BE LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE
* AS A RESULT OF USING, MODIFYING OR DISTRIBUTING THIS SOFTWARE OR ITS
* DERIVATIVES. IN NO EVENT WILL SUN OR ITS LICENSORS BE LIABLE FOR ANY LOST
* REVENUE, PROFIT OR DATA, OR FOR DIRECT, INDIRECT, SPECIAL, CONSEQUENTIAL,
* INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER CAUSED AND REGARDLESS OF THE THEORY
* OF LIABILITY, ARISING OUT OF THE USE OF OR INABILITY TO USE THIS SOFTWARE,
* EVEN IF SUN HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
*
* You acknowledge that this software is not designed, licensed or intended
* for use in the design, construction, operation or maintenance of any
* nuclear facility.
*/

public class ProducerConsumerTest {

   public static void main(String[] args) {
       CubbyHole c = new CubbyHole();
       Producer p1 = new Producer(c, 1);
       Consumer c1 = new Consumer(c, 1);
       p1.start();
       c1.start();
   }

} class CubbyHole {

   private int contents;
   private boolean available = false;
   public synchronized int get() {
       while (available == false) {
           try {
               wait();
           } catch (InterruptedException e) { }
       }
       available = false;
       notifyAll();
       return contents;
   }
   public synchronized void put(int value) {
       while (available == true) {
           try {
               wait();
           } catch (InterruptedException e) { }
       }
       contents = value;
       available = true;
       notifyAll();
   }

}

class Consumer extends Thread {

   private CubbyHole cubbyhole;
   private int number;
   public Consumer(CubbyHole c, int number) {
       cubbyhole = c;
       this.number = number;
   }
   public void run() {
       int value = 0;
       for (int i = 0; i < 10; i++) {
           value = cubbyhole.get();
           System.out.println("Consumer #" + this.number
                              + " got: " + value);
       }
   }

}

class Producer extends Thread {

   private CubbyHole cubbyhole;
   private int number;
   public Producer(CubbyHole c, int number) {
       cubbyhole = c;
       this.number = number;
   }
   public void run() {
       for (int i = 0; i < 10; i++) {
           cubbyhole.put(i);
           System.out.println("Producer #" + this.number
                              + " put: " + i);
           try {
               sleep((int)(Math.random() * 100));
           } catch (InterruptedException e) { }
       }
   }

}

      </source>
   
  
 
  



The producer-consumer approach to thread cooperation

   <source lang="java">

// : c13:Restaurant.java // The producer-consumer approach to thread cooperation. // From "Thinking in Java, 3rd ed." (c) Bruce Eckel 2002 // www.BruceEckel.ru. See copyright notice in CopyRight.txt. class Order {

 private static int i = 0;
 private int count = i++;
 public Order() {
   if (count == 10) {
     System.out.println("Out of food, closing");
     System.exit(0);
   }
 }
 public String toString() {
   return "Order " + count;
 }

} class WaitPerson extends Thread {

 private Restaurant restaurant;
 public WaitPerson(Restaurant r) {
   restaurant = r;
   start();
 }
 public void run() {
   while (true) {
     while (restaurant.order == null)
       synchronized (this) {
         try {
           wait();
         } catch (InterruptedException e) {
           throw new RuntimeException(e);
         }
       }
     System.out.println("Waitperson got " + restaurant.order);
     restaurant.order = null;
   }
 }

} class Chef extends Thread {

 private Restaurant restaurant;
 private WaitPerson waitPerson;
 public Chef(Restaurant r, WaitPerson w) {
   restaurant = r;
   waitPerson = w;
   start();
 }
 public void run() {
   while (true) {
     if (restaurant.order == null) {
       restaurant.order = new Order();
       System.out.print("Order up! ");
       synchronized (waitPerson) {
         waitPerson.notify();
       }
     }
     try {
       sleep(100);
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
   }
 }

} public class Restaurant {

 Order order; // Package access
 public static void main(String[] args) {
   Restaurant restaurant = new Restaurant();
   WaitPerson waitPerson = new WaitPerson(restaurant);
   Chef chef = new Chef(restaurant, waitPerson);
 }

} ///:~


      </source>