Mega Code Archive

 
Categories / Java / Threads
 

Coordinates threads for multi-threaded operations

/* Copyright (C) 2005-2008 by Peter Eastman    This program is free software; you can redistribute it and/or modify it under the    terms of the GNU General Public License as published by the Free Software    Foundation; either version 2 of the License, or (at your option) any later version.    This program is distributed in the hope that it will be useful, but WITHOUT ANY     WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A     PARTICULAR PURPOSE.  See the GNU General Public License for more details. */ import java.util.concurrent.atomic.*; import java.util.*; /**  * This class coordinates threads for multi-threaded operations.  The execution model  * provided by this class is a single "task" (e.g. tracing a ray through a single pixel)  * which must be executed many times.  The task is parameterized by a single index  * (e.g. the column containing the pixel).  * <p>  * To use this class, pass it an object which implements the Task interface.  It  * automatically creates an appropriate number of threads based on the number of  * available processors.  When you call run(), the task is repeatedly executed by  * the worker threads, with the index running  * over the desired range.  You may invoke run() any number of times (e.g. once  * for each row of the image).  Finally, call finish() to clean up the worker threads.  */ public class ThreadManager {   private int numIndices;   private AtomicInteger nextIndex;   private Thread thread[];   private HashSet<Thread> waitingThreads;   private Task task;   private Object controller;   private boolean controllerWaiting;   /**    * Create a new uninitialized ThreadManager.  You must invoke setNumIndices() and setTask()    * to initialize it before calling run().    */   public ThreadManager()   {     this(0, null);   }   /**    * Create a new ThreadManager.    *    * @param numIndices      the number of values the index should take on (from 0 to numIndices-1)    * @param task            the task to perform    */   public ThreadManager(int numIndices, Task task)   {     this.numIndices = numIndices;     this.task = task;     nextIndex = new AtomicInteger(numIndices);     controller = new Object();     controllerWaiting = false;     waitingThreads = new HashSet<Thread>();   }   /**    * Create and start the worker threads.  This is invoked the first time run() is called.    */   private void createThreads()   {     // Create a worker thread for each processor.     thread = new Thread [Runtime.getRuntime().availableProcessors()];     for (int i = 0; i < thread.length; i++)     {       thread[i] = new Thread("Worker thread "+(i+1)) {         public void run()         {           // Repeatedly perform the task until we are finished.           while (true)           {             try             {               int index = nextIndex();               task.execute(index);             }             catch (InterruptedException ex)             {               task.cleanup();               return;             }             catch (Exception ex)             {               cancel();               ex.printStackTrace();             }           }         }       };       thread[i].start();     }   }   /**    * Set the number of values the index should take on.  This must be invoked from the same    * thread that instantiated the ThreadManager and that calls run().    */   public void setNumIndices(int numIndices)   {     this.numIndices = numIndices;     nextIndex.set(numIndices);   }   /**    * Set the Task to be executed by the worker threads.  If another Task has already been set,    * that one is discarded immediately and cleanup() will never be invoked on in.  This method    * must be invoked from the same thread that instantiated the ThreadManager and that calls run().    */   public void setTask(Task task)   {     this.task = task;   }   /**    * Perform the task the specified number of times.  This method blocks until all    * occurrences of the task are completed.  If the current thread is interrupted    * while this method is in progress, all of the worker threads will be interrupted    * and disposed of.    */   public void run()   {     controllerWaiting = false;     nextIndex.set(0);     waitingThreads.clear();     if (thread == null)       createThreads();     // Notify all the worker threads, then wait for them to finish.     synchronized (this)     {       notifyAll();     }     synchronized (controller)     {       try       {         controllerWaiting = true;         controller.wait();       }       catch (InterruptedException ex)       {         finish();       }     }   }   /**    * Cancel a run which is in progress.  Calling this method does not interrupt any tasks that    * are currently executing, but it prevents any more from being started until the next time    * run() is called.    */   public void cancel()   {     nextIndex.set(numIndices);   }   /**    * Dispose of all the worker threads.  Once this has been called, do not call run() again.    */   public void finish()   {     if (thread != null)       for (int i = 0; i < thread.length; i++)         thread[i].interrupt();   }   private int nextIndex() throws InterruptedException   {     int index;     while ((index = nextIndex.getAndIncrement()) >= numIndices)     {       // Wait until run() is called again.       synchronized (this)       {         waitingThreads.add(Thread.currentThread());         if (waitingThreads.size() == thread.length)         {           while (!controllerWaiting)             wait(1);           synchronized (controller)           {             controller.notify();           }         }         wait();       }     }     return index;   }   /**    * This interface defines a task to be performed by the worker threads.    */   public static interface Task   {     /**      * Execute the task for the specified index.      */     public void execute(int index);     /**      * This is called once from each worker thread when finish() is called.  It gives a chance      * to do any necessary cleanup.      */     public void cleanup();   } }