Java/Threads/Producer Consumer

Материал из Java эксперт
Перейти к: навигация, поиск

Fibonacci: producer and consumer

/*
Java Threads, 3rd Edition
By Scott Oaks, Henry Wong
3rd Edition September 2004 
ISBN: 0-596-00782-5
*/
import java.util.*;
import java.util.concurrent.*;
public class FibonacciTest {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue;
        queue = new ArrayBlockingQueue<Integer>(10);
        new FibonacciProducer(queue);
        int nThreads = Integer.parseInt(args[0]);
        for (int i = 0; i < nThreads; i++)
            new FibonacciConsumer(queue);
    }
}
class Fibonacci {
    private ConcurrentMap<Integer, Integer> values =
              new ConcurrentHashMap<Integer, Integer>();
    public int calculate(int x) {
        if (x < 0)
            throw new IllegalArgumentException("positive numbers only");
        if (x <= 1)
            return x;
        return calculate(x-1) + calculate(x-2);
    }
    public int calculateWithCache(int x) {
        Integer key = new Integer(x);
        Integer result = values.get(key);
        if (result == null) {
            result = new Integer(calculate(x));
            values.putIfAbsent(key, result);
        }
        return result.intValue();                
    }
    public int calculateOnlyWithCache(int x) {
        Integer v1 = values.get(new Integer(x-1));
        Integer v2 = values.get(new Integer(x-2));
        Integer key = new Integer(x);
        Integer result = values.get(key);
        
        if (result != null)
            return result.intValue();
        if ((v1 == null) || (v2 == null))
            throw new IllegalArgumentException("values not in cache");
        result = new Integer(v1.intValue() + v2.intValue());
        values.putIfAbsent(key, result);
        return result.intValue();
    }              
    public void calculateRangeInCache(int x, int y) {
        calculateWithCache(x++);
        calculateWithCache(x++);
        while (x <= y) {
            calculateOnlyWithCache(x++);
        }
    }
}
class FibonacciProducer implements Runnable {
    private Thread thr;
    private BlockingQueue<Integer> queue;
    public FibonacciProducer(BlockingQueue<Integer> q) {
        queue = q;
        thr = new Thread(this);
        thr.start();
    }
    public void run() {
        try {
            for(int x=0;;x++) {
                Thread.sleep(1000);
                queue.put(new Integer(x));
                System.out.println("Produced request " + x);
            }
        } catch (InterruptedException ex) {
        }
    }
}
class FibonacciConsumer implements Runnable {
    private Fibonacci fib = new Fibonacci();
    private Thread thr;
    private BlockingQueue<Integer> queue;
    public FibonacciConsumer(BlockingQueue<Integer> q) {
        queue = q;
        thr = new Thread(this);
        thr.start();
    }
    public void run() {
        int request, result;
        try {
            while (true) {
                request = queue.take().intValue();
                result = fib.calculateWithCache(request);
                System.out.println("Calculated result of " + result + " from " + request);
            }
        } catch (InterruptedException ex) {
        }
    }
}





Producer and Comsumer

public class ProducerComsumer extends Object {
  private Object slot;
  public ProducerComsumer() {
    slot = null; // null indicates empty
  }
  public synchronized void putIn(Object obj) 
            throws InterruptedException {
    while ( slot != null ) {
      wait(); 
    }
    slot = obj;  // put object into slot
    notifyAll(); // signal that slot has been filled
  }
  public synchronized Object takeOut() 
            throws InterruptedException {
    while ( slot == null ) {
      wait(); // wait while slot is empty
    }
    Object obj = slot;
    slot = null; // mark slot as empty
    notifyAll(); // signal that slot is empty
    return obj;
  }
  public static void main(String[] args) {
    final ProducerComsumer ch = new ProducerComsumer();
    Runnable runA = new Runnable() {
        public void run() {
          try {
            String str;
            Thread.sleep(500);
            str = "multithreaded";
            ch.putIn(str);
            str = "programming";
            ch.putIn(str);
            str = "with Java";
            ch.putIn(str);
          } catch ( InterruptedException x ) {
            x.printStackTrace();
          }
        }
      };
    Runnable runB = new Runnable() {
        public void run() {
          try {
            Object obj;
            obj = ch.takeOut();
            System.out.println("in run() - just took out: "" + 
                obj + """);
            Thread.sleep(500);
            obj = ch.takeOut();
            System.out.println("in run() - just took out: "" + 
                obj + """);
            obj = ch.takeOut();
            System.out.println("in run() - just took out: "" + 
                obj + """);
          } catch ( InterruptedException x ) {
            x.printStackTrace();
          }
        }
      };
    Thread threadA = new Thread(runA, "threadA");
    threadA.start();
    Thread threadB = new Thread(runB, "threadB");
    threadB.start();
  }
  
}





Producer consumer for J2SE 1.5 using concurrent

/*
 * Copyright (c) Ian F. Darwin, http://www.darwinsys.ru/, 1996-2002.
 * All rights reserved. Software written by Ian F. Darwin and others.
 * $Id: LICENSE,v 1.8 2004/02/09 03:33:38 ian Exp $
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS""
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 * 
 * Java, the Duke mascot, and all variants of Sun"s Java "steaming coffee
 * cup" logo are trademarks of Sun Microsystems. Sun"s, and James Gosling"s,
 * pioneering role in inventing and promulgating (and standardizing) the Java 
 * language and environment is gratefully acknowledged.
 * 
 * The pioneering role of Dennis Ritchie and Bjarne Stroustrup, of AT&T, for
 * inventing predecessor languages C and C++ is also gratefully acknowledged.
 */
import java.util.*;
import java.io.*;
import java.util.concurrent.*;
/** Producer-Consumer in Java, for J2SE 1.5 using concurrent.
 */
public class ProdCons15 {
  protected boolean done = false;
  /** Inner class representing the Producer side */
  class Producer implements Runnable {
    protected BlockingQueue queue;
    Producer(BlockingQueue theQueue) { this.queue = theQueue; }
    public void run() {
      try {
        while (true) {
          Object justProduced = getRequestFromNetwork();
          queue.put(justProduced);
          System.out.println("Produced 1 object; List size now " + queue.size());
          if (done) {
            return;
          }
        }
      } catch (InterruptedException ex) {
        System.out.println("Producer INTERRUPTED");
      }
    }
    Object getRequestFromNetwork() {  // Simulation of reading from client
      try {
          Thread.sleep(10); // simulate time passing during read
      } catch (InterruptedException ex) {
         System.out.println("Producer Read INTERRUPTED");
      }
      return(new Object());
    }
  }
  /** Inner class representing the Consumer side */
  class Consumer implements Runnable {
    protected BlockingQueue queue;
    Consumer(BlockingQueue theQueue) { this.queue = theQueue; }
    public void run() {
      try {
        while (true) {
          Object obj = queue.take();
          int len = queue.size();
          System.out.println("List size now " + len);
          process(obj);
          if (done) {
            return;
          }
        }
      } catch (InterruptedException ex) {
          System.out.println("CONSUMER INTERRUPTED");
      }
    }
    void process(Object obj) {
      // Thread.sleep(xxx) // Simulate time passing
      System.out.println("Consuming object " + obj);
    }
  }
  ProdCons15(int nP, int nC) {
    BlockingQueue myQueue = new LinkedBlockingQueue();
    for (int i=0; i<nP; i++)
      new Thread(new Producer(myQueue)).start();
    for (int i=0; i<nC; i++)
      new Thread(new Consumer(myQueue)).start();
  }
  public static void main(String[] args)
  throws IOException, InterruptedException {
    // Start producers and consumers
    int numProducers = 4;
    int numConsumers = 3;
    ProdCons15 pc = new ProdCons15(numProducers, numConsumers);
    // Let the simulation run for, say, 10 seconds
    Thread.sleep(10*1000); 
    // End of simulation - shut down gracefully
    pc.done = true;
  }
}





Producer consumer in Java 1

/*
 * Copyright (c) Ian F. Darwin, http://www.darwinsys.ru/, 1996-2002.
 * All rights reserved. Software written by Ian F. Darwin and others.
 * $Id: LICENSE,v 1.8 2004/02/09 03:33:38 ian Exp $
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS""
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 * 
 * Java, the Duke mascot, and all variants of Sun"s Java "steaming coffee
 * cup" logo are trademarks of Sun Microsystems. Sun"s, and James Gosling"s,
 * pioneering role in inventing and promulgating (and standardizing) the Java 
 * language and environment is gratefully acknowledged.
 * 
 * The pioneering role of Dennis Ritchie and Bjarne Stroustrup, of AT&T, for
 * inventing predecessor languages C and C++ is also gratefully acknowledged.
 */
import java.util.*;
import java.io.*;
/** Producer-Consumer in Java. Version 1.
 */
public class ProdCons1 {
  protected LinkedList list = new LinkedList();
  protected void produce() {
    int len = 0;
    synchronized(list) {
      Object justProduced = new Object();
      list.addFirst(justProduced);
      len = list.size();
      list.notifyAll();
    }
    System.out.println("List size now " + len);
  }
  protected void consume() {
    Object obj = null;
    int len = 0;
    synchronized(list) {
      while (list.size() == 0) {
        try {
          list.wait();
        } catch (InterruptedException ex) {
          return;
        }
      }
      obj = list.removeLast();
      len = list.size();
    }
    System.out.println("Consuming object " + obj);
    System.out.println("List size now " + len);
  }
  public static void main(String[] args) throws IOException {
    ProdCons1 pc = new ProdCons1();
    System.out.println("Ready (p to produce, c to consume):");
    int i;
    while ((i = System.in.read()) != -1) {
      char ch = (char)i;
      switch(ch) {
        case "p":  pc.produce(); break;
        case "c":  pc.consume(); break;
      }
    }
  }
}





Producer consumer in Java 2

/*
 * Copyright (c) Ian F. Darwin, http://www.darwinsys.ru/, 1996-2002.
 * All rights reserved. Software written by Ian F. Darwin and others.
 * $Id: LICENSE,v 1.8 2004/02/09 03:33:38 ian Exp $
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS""
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 * 
 * Java, the Duke mascot, and all variants of Sun"s Java "steaming coffee
 * cup" logo are trademarks of Sun Microsystems. Sun"s, and James Gosling"s,
 * pioneering role in inventing and promulgating (and standardizing) the Java 
 * language and environment is gratefully acknowledged.
 * 
 * The pioneering role of Dennis Ritchie and Bjarne Stroustrup, of AT&T, for
 * inventing predecessor languages C and C++ is also gratefully acknowledged.
 */
import java.util.*;
import java.io.*;
/** Producer-consumer in Java, Take II.
 */
public class ProdCons2 {
  /** Throughout the code, this is the object we synchronize on so this
   * is also the object we wait() and notifyAll() on.
   */
  protected LinkedList list = new LinkedList();
  protected int MAX = 10;
  protected boolean done = false; // Also protected by lock on list.
  /** Inner class representing the Producer side */
  class Producer extends Thread {
    public void run() {
      while (true) {
        Object justProduced = getRequestFromNetwork();
        // Get request from the network - outside the synch section.
        // We"re simulating this actually reading from a client, and it
        // might have to wait for hours if the client is having coffee.
        synchronized(list) {
            while (list.size() == MAX) // queue "full"
            try {
              System.out.println("Producer WAITING");
              list.wait();   // Limit the size
            } catch (InterruptedException ex) {
              System.out.println("Producer INTERRUPTED");
            }
          list.addFirst(justProduced);
          list.notifyAll();  // must own the lock
          System.out.println("Produced 1; List size now " + list.size());
          if (done)
            break;
          // yield();  // Useful for green threads & demo programs.
        }
      }
    }
    Object getRequestFromNetwork() {  // Simulation of reading from client
      // try {
      //   Thread.sleep(10); // simulate time passing during read
      // } catch (InterruptedException ex) {
      //   System.out.println("Producer Read INTERRUPTED");
      // }
      return(new Object());
    }
  }
  /** Inner class representing the Consumer side */
  class Consumer extends Thread {
    public void run() {
      while (true) {
        Object obj = null;
        synchronized(list) {
          while (list.size() == 0) {
            try {
              System.out.println("CONSUMER WAITING");
              list.wait();  // must own the lock
            } catch (InterruptedException ex) {
              System.out.println("CONSUMER INTERRUPTED");
            }
          }
          obj = list.removeLast();
          list.notifyAll();
          int len = list.size();
          System.out.println("List size now " + len);
          if (done)
            break;
        }
        process(obj);  // Outside synch section (could take time)
        //yield(); DITTO
      }
    }
    void process(Object obj) {
      // Thread.sleep(xxx) // Simulate time passing
      System.out.println("Consuming object " + obj);
    }
  }
  ProdCons2(int nP, int nC) {
    for (int i=0; i<nP; i++)
      new Producer().start();
    for (int i=0; i<nC; i++)
      new Consumer().start();
  }
  public static void main(String[] args)
  throws IOException, InterruptedException {
    // Start producers and consumers
    int numProducers = 4;
    int numConsumers = 3;
    ProdCons2 pc = new ProdCons2(numProducers, numConsumers);
    // Let it run for, say, 10 seconds
    Thread.sleep(10*1000); 
    // End of simulation - shut down gracefully
    synchronized(pc.list) {
      pc.done = true;
      pc.list.notifyAll();
    }
  }
}





Producer Consumer Test

/* From http://java.sun.ru/docs/books/tutorial/index.html */
/*
 * Copyright (c) 2006 Sun Microsystems, Inc. All Rights Reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * -Redistribution of source code must retain the above copyright notice, this
 *  list of conditions and the following disclaimer.
 *
 * -Redistribution in binary form must reproduce the above copyright notice,
 *  this list of conditions and the following disclaimer in the documentation
 *  and/or other materials provided with the distribution.
 *
 * Neither the name of Sun Microsystems, Inc. or the names of contributors may
 * be used to endorse or promote products derived from this software without
 * specific prior written permission.
 *
 * This software is provided "AS IS," without a warranty of any kind. ALL
 * EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING
 * ANY IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE
 * OR NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN MIDROSYSTEMS, INC. ("SUN")
 * AND ITS LICENSORS SHALL NOT BE LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE
 * AS A RESULT OF USING, MODIFYING OR DISTRIBUTING THIS SOFTWARE OR ITS
 * DERIVATIVES. IN NO EVENT WILL SUN OR ITS LICENSORS BE LIABLE FOR ANY LOST
 * REVENUE, PROFIT OR DATA, OR FOR DIRECT, INDIRECT, SPECIAL, CONSEQUENTIAL,
 * INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER CAUSED AND REGARDLESS OF THE THEORY
 * OF LIABILITY, ARISING OUT OF THE USE OF OR INABILITY TO USE THIS SOFTWARE,
 * EVEN IF SUN HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
 *
 * You acknowledge that this software is not designed, licensed or intended
 * for use in the design, construction, operation or maintenance of any
 * nuclear facility.
 */
public class ProducerConsumerTest {
    public static void main(String[] args) {
        CubbyHole c = new CubbyHole();
        Producer p1 = new Producer(c, 1);
        Consumer c1 = new Consumer(c, 1);
        p1.start();
        c1.start();
    }
}
class CubbyHole {
    private int contents;
    private boolean available = false;
    public synchronized int get() {
        while (available == false) {
            try {
                wait();
            } catch (InterruptedException e) { }
        }
        available = false;
        notifyAll();
        return contents;
    }
    public synchronized void put(int value) {
        while (available == true) {
            try {
                wait();
            } catch (InterruptedException e) { }
        }
        contents = value;
        available = true;
        notifyAll();
    }
}

class Consumer extends Thread {
    private CubbyHole cubbyhole;
    private int number;
    public Consumer(CubbyHole c, int number) {
        cubbyhole = c;
        this.number = number;
    }
    public void run() {
        int value = 0;
        for (int i = 0; i < 10; i++) {
            value = cubbyhole.get();
            System.out.println("Consumer #" + this.number
                               + " got: " + value);
        }
    }
}

class Producer extends Thread {
    private CubbyHole cubbyhole;
    private int number;
    public Producer(CubbyHole c, int number) {
        cubbyhole = c;
        this.number = number;
    }
    public void run() {
        for (int i = 0; i < 10; i++) {
            cubbyhole.put(i);
            System.out.println("Producer #" + this.number
                               + " put: " + i);
            try {
                sleep((int)(Math.random() * 100));
            } catch (InterruptedException e) { }
        }
    }
}





The producer-consumer approach to thread cooperation

// : c13:Restaurant.java
// The producer-consumer approach to thread cooperation.
// From "Thinking in Java, 3rd ed." (c) Bruce Eckel 2002
// www.BruceEckel.ru. See copyright notice in CopyRight.txt.
class Order {
  private static int i = 0;
  private int count = i++;
  public Order() {
    if (count == 10) {
      System.out.println("Out of food, closing");
      System.exit(0);
    }
  }
  public String toString() {
    return "Order " + count;
  }
}
class WaitPerson extends Thread {
  private Restaurant restaurant;
  public WaitPerson(Restaurant r) {
    restaurant = r;
    start();
  }
  public void run() {
    while (true) {
      while (restaurant.order == null)
        synchronized (this) {
          try {
            wait();
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
        }
      System.out.println("Waitperson got " + restaurant.order);
      restaurant.order = null;
    }
  }
}
class Chef extends Thread {
  private Restaurant restaurant;
  private WaitPerson waitPerson;
  public Chef(Restaurant r, WaitPerson w) {
    restaurant = r;
    waitPerson = w;
    start();
  }
  public void run() {
    while (true) {
      if (restaurant.order == null) {
        restaurant.order = new Order();
        System.out.print("Order up! ");
        synchronized (waitPerson) {
          waitPerson.notify();
        }
      }
      try {
        sleep(100);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
  }
}
public class Restaurant {
  Order order; // Package access
  public static void main(String[] args) {
    Restaurant restaurant = new Restaurant();
    WaitPerson waitPerson = new WaitPerson(restaurant);
    Chef chef = new Chef(restaurant, waitPerson);
  }
} ///:~