ThreadPool.java

00001 
00026 package org.objectweb.jonas_ejb.container;
00027 
00028 import java.util.ArrayList;
00029 
00030 import org.objectweb.util.monolog.api.BasicLevel;
00031 
00038 public class ThreadPool {
00039 
00040     public static final int DEFAULT_POOL_MIN_SIZE = 1;
00041 
00042     public static final int DEFAULT_POOL_MAX_SIZE = 100;
00043 
00044     protected static int poolnumber = 0;
00045     private static int threadnumber = 0;
00046 
00047     protected int poolsz; // current size of thread pool
00048 
00049     protected int maxpoolsz; // max size of thread pool
00050     protected int minpoolsz; // min size of thread pool
00051 
00052     protected ArrayList runnablelist = null; // List of runnable ready to run
00053 
00054     protected boolean valid = true; // set to false when pool is removed.
00055     
00056     protected int freeThreads;  // free threads ready to work
00057 
00058     private static final int MAX_WAITING_TIME = 20000;  // 20 sec.
00059 
00063     public ThreadPool() {
00064         this(DEFAULT_POOL_MIN_SIZE, DEFAULT_POOL_MAX_SIZE);
00065     }
00066 
00071     public ThreadPool(int size) {
00072         this(size, size > DEFAULT_POOL_MAX_SIZE ? size : DEFAULT_POOL_MAX_SIZE);
00073     }
00074 
00080     public ThreadPool(int minsz, int maxsz) {
00081         TraceEjb.thread.log(BasicLevel.DEBUG, "minpoolsize= " + minsz + " maxpoolsize= " + maxsz);
00082         runnablelist = new ArrayList(minsz);
00083         poolnumber++;
00084         for (poolsz = 0; poolsz < minsz; poolsz++) {
00085             ServerThread st = new ServerThread(this, threadnumber++, poolnumber);
00086             st.start();
00087         }
00088         minpoolsz = minsz;
00089         maxpoolsz = maxsz;
00090     }
00091 
00096     public synchronized void addRunnable(Runnable runner) {
00097         TraceEjb.thread.log(BasicLevel.DEBUG, "");
00098         runnablelist.add(runner);
00099         // The policy here is to create 1 more thread when there are more
00100         // waiting runnable than free threads. 
00101         // Do not create more than maxpoolsz threads.
00102         if (TraceEjb.isDebugThread()) {
00103             TraceEjb.thread.log(BasicLevel.DEBUG, "runnablelist size = " + runnablelist.size() + " with " + freeThreads
00104                     + " free Server threads");
00105         }
00106         if (poolsz < maxpoolsz && runnablelist.size() > freeThreads) {
00107             poolsz++;
00108             ServerThread st = new ServerThread(this, threadnumber++, poolnumber);
00109             st.start();
00110         } else {
00111             // Wake up 1 thread if it's already waiting for a Runnable
00112             notify();
00113         }
00114     }
00115 
00119     public synchronized Runnable nextRunnable() {
00120         Runnable run = null;
00121         boolean haswait = false;
00122         while (runnablelist.isEmpty()) {
00123             try {
00124                 if ((haswait && freeThreads > minpoolsz) || !valid) {
00125                     poolsz--;
00126                     TraceEjb.thread.log(BasicLevel.DEBUG, "ending");
00127                     return null;
00128                 }
00129                 freeThreads++;
00130                 wait(MAX_WAITING_TIME);
00131                 freeThreads--;
00132                 haswait = true;
00133             } catch (InterruptedException ex) {
00134                 freeThreads--;
00135                 poolsz--;
00136                 TraceEjb.thread.log(BasicLevel.ERROR, "interrupted");
00137                 return null;
00138             }
00139         }
00140         run = (Runnable) runnablelist.remove(0);
00141         TraceEjb.thread.log(BasicLevel.DEBUG, "got a runnable");
00142         return run;
00143     }
00144 
00148     public synchronized void stopThreads() {
00149         valid = false;
00150         notifyAll();
00151         poolnumber--;
00152     }
00153     
00157     protected class ServerThread extends Thread {
00158 
00159         ThreadPool pool;
00160         int number;
00161 
00162         ServerThread(ThreadPool p, int num, int pinb) {
00163             pool = p;
00164             number = num;
00165             setName("ServerThread-" + pinb + "/" + num);
00166         }
00167 
00168         public void run() {
00169             if (TraceEjb.isDebugThread()) {
00170                 TraceEjb.thread.log(BasicLevel.DEBUG, getName() + " started");
00171             }
00172             while (true) {
00173                 try {
00174                     // get next runnable, or wait if empty list.
00175                     if (TraceEjb.isDebugThread()) {
00176                         TraceEjb.thread.log(BasicLevel.DEBUG, getName() + " waiting");
00177                     }
00178                     Runnable runner = pool.nextRunnable();
00179                     if (runner == null) {
00180                         TraceEjb.thread.log(BasicLevel.DEBUG, getName() + " ended");
00181                         return;
00182                     }
00183                     if (TraceEjb.isDebugThread()) {
00184                         TraceEjb.thread.log(BasicLevel.DEBUG, getName() + " running");
00185                     }
00186                     runner.run();
00187                 } catch (Exception ex) {
00188                 }
00189             }
00190         }
00191     }
00192 }
00193 

Generated on Tue Feb 15 15:05:43 2005 for JOnAS by  doxygen 1.3.9.1