Mega Code Archive

 
Categories / Java / Threads
 

Object FIFO

public class ObjectFIFOTest extends Object {   private static void fullCheck(ObjectFIFO fifo) {     try {       // Sync'd to allow messages to print while       // condition is still true.       synchronized (fifo) {         while (true) {           fifo.waitUntilFull();           print("FULL");           fifo.waitWhileFull();           print("NO LONGER FULL");         }       }     } catch (InterruptedException ix) {       return;     }   }   private static void emptyCheck(ObjectFIFO fifo) {     try {       // Sync'd to allow messages to print while       // condition is still true.       synchronized (fifo) {         while (true) {           fifo.waitUntilEmpty();           print("EMPTY");           fifo.waitWhileEmpty();           print("NO LONGER EMPTY");         }       }     } catch (InterruptedException ix) {       return;     }   }   private static void consumer(ObjectFIFO fifo) {     try {       print("just entered consumer()");       for (int i = 0; i < 3; i++) {         synchronized (fifo) {           Object obj = fifo.remove();           print("DATA-OUT - did remove(), obj=" + obj);         }         Thread.sleep(3000);       }       synchronized (fifo) {         boolean resultOfWait = fifo.waitUntilEmpty(500);         print("did waitUntilEmpty(500), resultOfWait=" + resultOfWait             + ", getSize()=" + fifo.getSize());       }       for (int i = 0; i < 3; i++) {         synchronized (fifo) {           Object[] list = fifo.removeAll();           print("did removeAll(), list.length=" + list.length);           for (int j = 0; j < list.length; j++) {             print("DATA-OUT - list[" + j + "]=" + list[j]);           }         }         Thread.sleep(100);       }       for (int i = 0; i < 3; i++) {         synchronized (fifo) {           Object[] list = fifo.removeAtLeastOne();           print("did removeAtLeastOne(), list.length=" + list.length);           for (int j = 0; j < list.length; j++) {             print("DATA-OUT - list[" + j + "]=" + list[j]);           }         }         Thread.sleep(1000);       }       while (!fifo.isEmpty()) {         synchronized (fifo) {           Object obj = fifo.remove();           print("DATA-OUT - did remove(), obj=" + obj);         }         Thread.sleep(1000);       }       print("leaving consumer()");     } catch (InterruptedException ix) {       return;     }   }   private static void producer(ObjectFIFO fifo) {     try {       print("just entered producer()");       int count = 0;       Object obj0 = new Integer(count);       count++;       synchronized (fifo) {         fifo.add(obj0);         print("DATA-IN - did add(), obj0=" + obj0);         boolean resultOfWait = fifo.waitUntilEmpty(500);         print("did waitUntilEmpty(500), resultOfWait=" + resultOfWait             + ", getSize()=" + fifo.getSize());       }       for (int i = 0; i < 10; i++) {         Object obj = new Integer(count);         count++;         synchronized (fifo) {           fifo.add(obj);           print("DATA-IN - did add(), obj=" + obj);         }         Thread.sleep(1000);       }       Thread.sleep(2000);       Object obj = new Integer(count);       count++;       synchronized (fifo) {         fifo.add(obj);         print("DATA-IN - did add(), obj=" + obj);       }       Thread.sleep(500);       Integer[] list1 = new Integer[3];       for (int i = 0; i < list1.length; i++) {         list1[i] = new Integer(count);         count++;       }       synchronized (fifo) {         fifo.addEach(list1);         print("did addEach(), list1.length=" + list1.length);       }       Integer[] list2 = new Integer[8];       for (int i = 0; i < list2.length; i++) {         list2[i] = new Integer(count);         count++;       }       synchronized (fifo) {         fifo.addEach(list2);         print("did addEach(), list2.length=" + list2.length);       }       synchronized (fifo) {         fifo.waitUntilEmpty();         print("fifo.isEmpty()=" + fifo.isEmpty());       }       print("leaving producer()");     } catch (InterruptedException ix) {       return;     }   }   private static synchronized void print(String msg) {     System.out.println(Thread.currentThread().getName() + ": " + msg);   }   public static void main(String[] args) {     final ObjectFIFO fifo = new ObjectFIFO(5);     Runnable fullCheckRunnable = new Runnable() {       public void run() {         fullCheck(fifo);       }     };     Thread fullCheckThread = new Thread(fullCheckRunnable, "fchk");     fullCheckThread.setPriority(9);     fullCheckThread.setDaemon(true); // die automatically     fullCheckThread.start();     Runnable emptyCheckRunnable = new Runnable() {       public void run() {         emptyCheck(fifo);       }     };     Thread emptyCheckThread = new Thread(emptyCheckRunnable, "echk");     emptyCheckThread.setPriority(8);     emptyCheckThread.setDaemon(true); // die automatically     emptyCheckThread.start();     Runnable consumerRunnable = new Runnable() {       public void run() {         consumer(fifo);       }     };     Thread consumerThread = new Thread(consumerRunnable, "cons");     consumerThread.setPriority(7);     consumerThread.start();     Runnable producerRunnable = new Runnable() {       public void run() {         producer(fifo);       }     };     Thread producerThread = new Thread(producerRunnable, "prod");     producerThread.setPriority(6);     producerThread.start();   } } class ObjectFIFO extends Object {   private Object[] queue;   private int capacity;   private int size;   private int head;   private int tail;   public ObjectFIFO(int cap) {     capacity = (cap > 0) ? cap : 1; // at least 1     queue = new Object[capacity];     head = 0;     tail = 0;     size = 0;   }   public int getCapacity() {     return capacity;   }   public synchronized int getSize() {     return size;   }   public synchronized boolean isEmpty() {     return (size == 0);   }   public synchronized boolean isFull() {     return (size == capacity);   }   public synchronized void add(Object obj) throws InterruptedException {     waitWhileFull();     queue[head] = obj;     head = (head + 1) % capacity;     size++;     notifyAll(); // let any waiting threads know about change   }   public synchronized void addEach(Object[] list) throws InterruptedException {     //     // You might want to code a more efficient     // implementation here ... (see ByteFIFO.java)     //     for (int i = 0; i < list.length; i++) {       add(list[i]);     }   }   public synchronized Object remove() throws InterruptedException {     waitWhileEmpty();     Object obj = queue[tail];     // don't block GC by keeping unnecessary reference     queue[tail] = null;     tail = (tail + 1) % capacity;     size--;     notifyAll(); // let any waiting threads know about change     return obj;   }   public synchronized Object[] removeAll() throws InterruptedException {     //     // You might want to code a more efficient     // implementation here ... (see ByteFIFO.java)     //     Object[] list = new Object[size]; // use the current size     for (int i = 0; i < list.length; i++) {       list[i] = remove();     }     // if FIFO was empty, a zero-length array is returned     return list;   }   public synchronized Object[] removeAtLeastOne() throws InterruptedException {     waitWhileEmpty(); // wait for a least one to be in FIFO     return removeAll();   }   public synchronized boolean waitUntilEmpty(long msTimeout)       throws InterruptedException {     if (msTimeout == 0L) {       waitUntilEmpty(); // use other method       return true;     }     // wait only for the specified amount of time     long endTime = System.currentTimeMillis() + msTimeout;     long msRemaining = msTimeout;     while (!isEmpty() && (msRemaining > 0L)) {       wait(msRemaining);       msRemaining = endTime - System.currentTimeMillis();     }     // May have timed out, or may have met condition,     // calc return value.     return isEmpty();   }   public synchronized void waitUntilEmpty() throws InterruptedException {     while (!isEmpty()) {       wait();     }   }   public synchronized void waitWhileEmpty() throws InterruptedException {     while (isEmpty()) {       wait();     }   }   public synchronized void waitUntilFull() throws InterruptedException {     while (!isFull()) {       wait();     }   }   public synchronized void waitWhileFull() throws InterruptedException {     while (isFull()) {       wait();     }   } }