Mega Code Archive

 
Categories / Java / Threads
 

Byte FIFO

import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; public class ByteFIFOTest extends Object {   private ByteFIFO fifo;   private byte[] srcData;   public ByteFIFOTest() throws IOException {     fifo = new ByteFIFO(20);     makeSrcData();     System.out.println("srcData.length=" + srcData.length);     Runnable srcRunnable = new Runnable() {         public void run() {           src();         }       };     Thread srcThread = new Thread(srcRunnable);     srcThread.start();     Runnable dstRunnable = new Runnable() {         public void run() {           dst();         }       };     Thread dstThread = new Thread(dstRunnable);     dstThread.start();   }   private void makeSrcData() throws IOException {     String[] list = {         "The first string is right here",         "The second string is a bit longer and also right here",         "The third string",         "ABCDEFGHIJKLMNOPQRSTUVWXYZ",         "0123456789",         "The last string in the list"       };     ByteArrayOutputStream baos = new ByteArrayOutputStream();     ObjectOutputStream oos = new ObjectOutputStream(baos);     oos.writeObject(list);     oos.flush();     oos.close();          srcData = baos.toByteArray();   }   private void src() {     try {       boolean justAddOne = true;       int count = 0;       while ( count < srcData.length ) {         if ( !justAddOne ) {           int writeSize = (int) ( 40.0 * Math.random() );           writeSize = Math.min(writeSize, srcData.length - count);           byte[] buf = new byte[writeSize];           System.arraycopy(srcData, count, buf, 0, writeSize);           fifo.add(buf);           count += writeSize;           System.out.println("just added " + writeSize + " bytes");         } else {           fifo.add(srcData[count]);           count++;           System.out.println("just added exactly 1 byte");         }         justAddOne = !justAddOne;       }     } catch ( InterruptedException x ) {       x.printStackTrace();     }   }   private void dst() {     try {       boolean justAddOne = true;       int count = 0;       byte[] dstData = new byte[srcData.length];       while ( count < dstData.length ) {         if ( !justAddOne ) {           byte[] buf = fifo.removeAll();           if ( buf.length > 0 ) {             System.arraycopy(buf, 0, dstData, count, buf.length);             count += buf.length;           }           System.out.println(             "just removed " + buf.length + " bytes");         } else {           byte b = fifo.remove();           dstData[count] = b;           count++;           System.out.println(             "just removed exactly 1 byte");         }         justAddOne = !justAddOne;       }       System.out.println("received all data, count=" + count);       ObjectInputStream ois = new ObjectInputStream(           new ByteArrayInputStream(dstData));       String[] line = (String[]) ois.readObject();       for ( int i = 0; i < line.length; i++ ) {         System.out.println("line[" + i + "]=" + line[i]);       }     } catch ( ClassNotFoundException x1 ) {       x1.printStackTrace();     } catch ( IOException iox ) {       iox.printStackTrace();     } catch ( InterruptedException x ) {       x.printStackTrace();     }   }   public static void main(String[] args) {     try {       new ByteFIFOTest();     } catch ( IOException iox ) {       iox.printStackTrace();     }   } } class ByteFIFO extends Object {   private byte[] queue;   private int capacity;   private int size;   private int head;   private int tail;   public ByteFIFO(int cap) {     capacity = ( cap > 0 ) ? cap : 1; // at least 1     queue = new byte[capacity];     head = 0;     tail = 0;     size = 0;   }   public int getCapacity() {     return capacity;   }   public synchronized int getSize() {     return size;   }   public synchronized boolean isEmpty() {     return ( size == 0 );   }   public synchronized boolean isFull() {     return ( size == capacity );   }   public synchronized void add(byte b)        throws InterruptedException {     waitWhileFull();     queue[head] = b;     head = ( head + 1 ) % capacity;     size++;     notifyAll(); // let any waiting threads know about change   }   public synchronized void add(byte[] list)        throws InterruptedException {     // For efficiency, the bytes are copied in blocks     // instead of one at a time. As space becomes available,     // more bytes are copied until all of them have been     // added.     int ptr = 0;     while ( ptr < list.length ) {       // If full, the lock will be released to allow        // another thread to come in and remove bytes.       waitWhileFull();       int space = capacity - size;       int distToEnd = capacity - head;       int blockLen = Math.min(space, distToEnd);       int bytesRemaining = list.length - ptr;       int copyLen = Math.min(blockLen, bytesRemaining);       System.arraycopy(list, ptr, queue, head, copyLen);       head = ( head + copyLen ) % capacity;       size += copyLen;       ptr += copyLen;       // Keep the lock, but let any waiting threads        // know that something has changed.       notifyAll();     }   }   public synchronized byte remove()        throws InterruptedException {     waitWhileEmpty();          byte b = queue[tail];     tail = ( tail + 1 ) % capacity;     size--;     notifyAll(); // let any waiting threads know about change     return b;   }   public synchronized byte[] removeAll() {     // For efficiency, the bytes are copied in blocks     // instead of one at a time.      if ( isEmpty() ) {       // Nothing to remove, return a zero-length       // array and do not bother with notification       // since nothing was removed.       return new byte[0];      }     // based on the current size     byte[] list = new byte[size];      // copy in the block from tail to the end     int distToEnd = capacity - tail;     int copyLen = Math.min(size, distToEnd);     System.arraycopy(queue, tail, list, 0, copyLen);     // If data wraps around, copy the remaining data     // from the front of the array.     if ( size > copyLen ) {       System.arraycopy(           queue, 0, list, copyLen, size - copyLen);     }     tail = ( tail + size ) % capacity;     size = 0; // everything has been removed     // Signal any and all waiting threads that      // something has changed.     notifyAll();      return list;    }   public synchronized byte[] removeAtLeastOne()        throws InterruptedException {     waitWhileEmpty(); // wait for a least one to be in FIFO     return removeAll();   }   public synchronized boolean waitUntilEmpty(long msTimeout)        throws InterruptedException {     if ( msTimeout == 0L ) {       waitUntilEmpty();  // use other method       return true;     }     // wait only for the specified amount of time     long endTime = System.currentTimeMillis() + msTimeout;     long msRemaining = msTimeout;     while ( !isEmpty() && ( msRemaining > 0L ) ) {       wait(msRemaining);       msRemaining = endTime - System.currentTimeMillis();     }     // May have timed out, or may have met condition,      // calc return value.     return isEmpty();   }   public synchronized void waitUntilEmpty()        throws InterruptedException {     while ( !isEmpty() ) {       wait();     }   }   public synchronized void waitWhileEmpty()        throws InterruptedException {     while ( isEmpty() ) {       wait();     }   }   public synchronized void waitUntilFull()        throws InterruptedException {     while ( !isFull() ) {       wait();     }   }   public synchronized void waitWhileFull()        throws InterruptedException {     while ( isFull() ) {       wait();     }   } }