Mega Code Archive

 
Categories / Java / Threads
 

Simple thread pool A task is executed by obtaining a thread from the pool

/*  * Licensed to the Apache Software Foundation (ASF) under one  * or more contributor license agreements.  See the NOTICE file  * distributed with this work for additional information  * regarding copyright ownership.  The ASF licenses this file  * to you under the Apache License, Version 2.0 (the  * "License"); you may not use this file except in compliance  * with the License.  You may obtain a copy of the License at  *  *   http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing,  * software distributed under the License is distributed on an  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY  * KIND, either express or implied.  See the License for the  * specific language governing permissions and limitations  * under the License.      */ import java.util.ArrayList; import java.util.List; /** Simple thread pool. A task is executed by obtaining a thread from  * the pool  */ public class ThreadPool {   /** The thread pool contains instances of {@link ThreadPool.Task}.    */   public interface Task {     /** Performs the task.      * @throws Throwable The task failed, and the worker thread won't be used again.      */     void run() throws Throwable;   }     /** A task, which may be interrupted, if the pool is shutting down.       */     public interface InterruptableTask extends Task {         /** Interrupts the task.          * @throws Throwable Shutting down the task failed.          */         void shutdown() throws Throwable;     }     private class Poolable {         private boolean shuttingDown;         private Task task;         private Thread thread;         Poolable(ThreadGroup pGroup, int pNum) {             thread = new Thread(pGroup, pGroup.getName() + "-" + pNum){                 public void run() {                     while (!isShuttingDown()) {                         final Task t = getTask();                         if (t == null) {                             try {                                 synchronized (this) {                                     if (!isShuttingDown()  &&  getTask() == null) {                                         wait();                                     }                                 }                             } catch (InterruptedException e) {                                 // Do nothing                             }                         } else {                             try {                                 t.run();                                 resetTask();                                 repool(Poolable.this);                             } catch (Throwable e) {                                 discard(Poolable.this);                                 resetTask();                             }                         }                     }                 }             };             thread.start();         }         synchronized void shutdown() {             shuttingDown = true;             final Task t = getTask();             if (t != null  &&  t instanceof InterruptableTask) {                 try {                     ((InterruptableTask) t).shutdown();                 } catch (Throwable th) {                     // Ignore me                 }             }             task = null;             synchronized (thread) {                 thread.notify();             }         }         private synchronized boolean isShuttingDown() { return shuttingDown; }         String getName() { return thread.getName(); }         private synchronized Task getTask() {             return task;         }         private synchronized void resetTask() {             task = null;         }         synchronized void start(Task pTask) {             task = pTask;             synchronized (thread) {                 thread.notify();             }         }     }   private final ThreadGroup threadGroup;   private final int maxSize;   private final List waitingThreads = new ArrayList();   private final List runningThreads = new ArrayList();   private final List waitingTasks = new ArrayList();   private int num;   /** Creates a new instance.    * @param pMaxSize Maximum number of concurrent threads.    * @param pName Thread group name.    */   public ThreadPool(int pMaxSize, String pName) {     maxSize = pMaxSize;     threadGroup = new ThreadGroup(pName);   }   synchronized void discard(Poolable pPoolable) {     pPoolable.shutdown();         runningThreads.remove(pPoolable);         waitingThreads.remove(pPoolable);   }   synchronized void repool(Poolable pPoolable) {         if (runningThreads.remove(pPoolable)) {             if (maxSize != 0  &&  runningThreads.size() + waitingThreads.size() >= maxSize) {                 discard(pPoolable);             } else {                 waitingThreads.add(pPoolable);                 if (waitingTasks.size() > 0) {                     Task task = (Task) waitingTasks.remove(waitingTasks.size() - 1);                     startTask(task);                 }             }         } else {             discard(pPoolable);         }   }   /** Starts a task immediately.    * @param pTask The task being started.    * @return True, if the task could be started immediately. False, if    * the maxmimum number of concurrent tasks was exceeded. If so, you    * might consider to use the {@link #addTask(ThreadPool.Task)} method instead.    */   public synchronized boolean startTask(Task pTask) {     if (maxSize != 0  &&  runningThreads.size() >= maxSize) {       return false;     }         Poolable poolable;     if (waitingThreads.size() > 0) {         poolable = (Poolable) waitingThreads.remove(waitingThreads.size()-1);     } else {             poolable = new Poolable(threadGroup, num++);     }     runningThreads.add(poolable);         poolable.start(pTask);     return true;   }   /** Adds a task for immediate or deferred execution.    * @param pTask The task being added.    * @return True, if the task was started immediately. False, if    * the task will be executed later.    */   public synchronized boolean addTask(Task pTask) {     if (startTask(pTask)) {       return true;     }     waitingTasks.add(pTask);     return false;   }   /** Closes the pool.    */   public synchronized void shutdown() {         while (!waitingThreads.isEmpty()) {             Poolable poolable = (Poolable) waitingThreads.remove(waitingThreads.size()-1);             poolable.shutdown();         }         while (!runningThreads.isEmpty()) {             Poolable poolable = (Poolable) runningThreads.remove(runningThreads.size()-1);             poolable.shutdown();         }   }   /** Returns the maximum number of concurrent threads.    * @return Maximum number of threads.    */   public int getMaxThreads() { return maxSize; }   /** Returns the number of threads, which have actually been created,      * as opposed to the number of currently running threads.    */     public synchronized int getNumThreads() { return num; } }