00001
00026 package org.objectweb.jonas_ejb.container;
00027
00028 import java.util.ArrayList;
00029
00030 import org.objectweb.util.monolog.api.BasicLevel;
00031
00038 public class ThreadPool {
00039
00040 public static final int DEFAULT_POOL_MIN_SIZE = 1;
00041
00042 public static final int DEFAULT_POOL_MAX_SIZE = 100;
00043
00044 protected static int poolnumber = 0;
00045 private static int threadnumber = 0;
00046
00047 protected int poolsz;
00048
00049 protected int maxpoolsz;
00050 protected int minpoolsz;
00051
00052 protected ArrayList runnablelist = null;
00053
00054 protected boolean valid = true;
00055
00056 protected int freeThreads;
00057
00058 private static final int MAX_WAITING_TIME = 20000;
00059
00063 public ThreadPool() {
00064 this(DEFAULT_POOL_MIN_SIZE, DEFAULT_POOL_MAX_SIZE);
00065 }
00066
00071 public ThreadPool(int size) {
00072 this(size, size > DEFAULT_POOL_MAX_SIZE ? size : DEFAULT_POOL_MAX_SIZE);
00073 }
00074
00080 public ThreadPool(int minsz, int maxsz) {
00081 TraceEjb.thread.log(BasicLevel.DEBUG, "minpoolsize= " + minsz + " maxpoolsize= " + maxsz);
00082 runnablelist = new ArrayList(minsz);
00083 poolnumber++;
00084 for (poolsz = 0; poolsz < minsz; poolsz++) {
00085 ServerThread st = new ServerThread(this, threadnumber++, poolnumber);
00086 st.start();
00087 }
00088 minpoolsz = minsz;
00089 maxpoolsz = maxsz;
00090 }
00091
00096 public synchronized void addRunnable(Runnable runner) {
00097 TraceEjb.thread.log(BasicLevel.DEBUG, "");
00098 runnablelist.add(runner);
00099
00100
00101
00102 if (TraceEjb.isDebugThread()) {
00103 TraceEjb.thread.log(BasicLevel.DEBUG, "runnablelist size = " + runnablelist.size() + " with " + freeThreads
00104 + " free Server threads");
00105 }
00106 if (poolsz < maxpoolsz && runnablelist.size() > freeThreads) {
00107 poolsz++;
00108 ServerThread st = new ServerThread(this, threadnumber++, poolnumber);
00109 st.start();
00110 } else {
00111
00112 notify();
00113 }
00114 }
00115
00119 public synchronized Runnable nextRunnable() {
00120 Runnable run = null;
00121 boolean haswait = false;
00122 while (runnablelist.isEmpty()) {
00123 try {
00124 if ((haswait && freeThreads > minpoolsz) || !valid) {
00125 poolsz--;
00126 TraceEjb.thread.log(BasicLevel.DEBUG, "ending");
00127 return null;
00128 }
00129 freeThreads++;
00130 wait(MAX_WAITING_TIME);
00131 freeThreads--;
00132 haswait = true;
00133 } catch (InterruptedException ex) {
00134 freeThreads--;
00135 poolsz--;
00136 TraceEjb.thread.log(BasicLevel.ERROR, "interrupted");
00137 return null;
00138 }
00139 }
00140 run = (Runnable) runnablelist.remove(0);
00141 TraceEjb.thread.log(BasicLevel.DEBUG, "got a runnable");
00142 return run;
00143 }
00144
00148 public synchronized void stopThreads() {
00149 valid = false;
00150 notifyAll();
00151 poolnumber--;
00152 }
00153
00157 protected class ServerThread extends Thread {
00158
00159 ThreadPool pool;
00160 int number;
00161
00162 ServerThread(ThreadPool p, int num, int pinb) {
00163 pool = p;
00164 number = num;
00165 setName("ServerThread-" + pinb + "/" + num);
00166 }
00167
00168 public void run() {
00169 if (TraceEjb.isDebugThread()) {
00170 TraceEjb.thread.log(BasicLevel.DEBUG, getName() + " started");
00171 }
00172 while (true) {
00173 try {
00174
00175 if (TraceEjb.isDebugThread()) {
00176 TraceEjb.thread.log(BasicLevel.DEBUG, getName() + " waiting");
00177 }
00178 Runnable runner = pool.nextRunnable();
00179 if (runner == null) {
00180 TraceEjb.thread.log(BasicLevel.DEBUG, getName() + " ended");
00181 return;
00182 }
00183 if (TraceEjb.isDebugThread()) {
00184 TraceEjb.thread.log(BasicLevel.DEBUG, getName() + " running");
00185 }
00186 runner.run();
00187 } catch (Exception ex) {
00188 }
00189 }
00190 }
00191 }
00192 }
00193