Mega Code Archive

 
Categories / Java Tutorial / Thread
 

Synchronized Queue with Producer and Consumer

public class ThreadTester {   public static void main(String[] args) {     SynchronizedQueue<String> queue = new SynchronizedQueue<String>(10);     final int GREETING_COUNT = 100;     Runnable run1 = new Producer("Hello, World!", queue, GREETING_COUNT);     Runnable run2 = new Producer("Goodbye, World!", queue, GREETING_COUNT);     Runnable run3 = new Consumer(queue, 2 * GREETING_COUNT);     Thread thread1 = new Thread(run1);     Thread thread2 = new Thread(run2);     Thread thread3 = new Thread(run3);     thread1.start();     thread2.start();     thread3.start();   } } class Producer implements Runnable {   private String greeting;   private SynchronizedQueue<String> queue;   private int greetingCount;   public Producer(String aGreeting, SynchronizedQueue<String> aQueue, int count) {     greeting = aGreeting;     queue = aQueue;     greetingCount = count;   }   public void run() {     try {       int i = 1;       while (i <= greetingCount) {         queue.add(i + ": " + greeting);         i++;         Thread.sleep(2000);       }     } catch (InterruptedException exception) {     }   } } class Consumer implements Runnable {   private SynchronizedQueue<String> queue;   private int greetingCount;   public Consumer(SynchronizedQueue<String> aQueue, int count) {     queue = aQueue;     greetingCount = count;   }   public void run() {     try {       int i = 1;       while (i <= greetingCount) {         String greeting = queue.remove();         System.out.println(greeting);         i++;         Thread.sleep(3000);       }     } catch (InterruptedException exception) {     }   } } class SynchronizedQueue<V> {   private Object[] elements;   private int head;   private int tail;   private int size;   public SynchronizedQueue(int capacity) {     elements = new Object[capacity];     head = 0;     tail = 0;     size = 0;   }   public synchronized V remove() throws InterruptedException {     while (size == 0)       wait();     V r = (V) elements[head];     head++;     size--;     if (head == elements.length)       head = 0;     notifyAll();     return r;   }   public synchronized void add(V newValue) throws InterruptedException {     while (size == elements.length)       wait();     elements[tail] = newValue;     tail++;     size++;     if (tail == elements.length)       tail = 0;     notifyAll();   } }