Pool.java

00001 
00026 package org.objectweb.jonas.dbm;
00027 
00028 import java.sql.SQLException;
00029 import java.util.HashMap;
00030 import java.util.Iterator;
00031 import java.util.LinkedList;
00032 import java.util.Map;
00033 
00034 import javax.sql.XAConnection;
00035 import javax.transaction.SystemException;
00036 import javax.transaction.Transaction;
00037 
00038 import org.objectweb.jonas.common.Log;
00039 import org.objectweb.jonas.jdbc_xa.ConnectionImpl;
00040 import org.objectweb.jonas.jdbc_xa.XADataSourceImpl;
00041 import org.objectweb.jonas.jtm.TransactionService;
00042 import org.objectweb.jonas.service.ServiceException;
00043 import org.objectweb.jonas.service.ServiceManager;
00044 import org.objectweb.transaction.jta.TransactionManager;
00045 import org.objectweb.util.monolog.api.BasicLevel;
00046 import org.objectweb.util.monolog.api.Logger;
00047 
00064 public class Pool {
00065 
00066     // ----------------------------------------------------------------
00067     // Private data
00068     // ----------------------------------------------------------------
00069 
00073     private static Logger logger = null;
00074 
00078     private TransactionManager tm = null;
00079 
00084     private PoolKeeper poolKeeper;
00085 
00090     private Map xac2item = new HashMap();
00091 
00097     private LinkedList freeList = new LinkedList();
00098 
00103     private Map tx2item = new HashMap();
00104 
00108     private ConnectionManager cmgr;
00109 
00113     private XADataSourceImpl xads;
00114 
00118     private int waiterCount = 0;
00119 
00123     private long waitingTime = 0;
00124 
00128     private int busyMax = 0;
00129 
00133     private int busyMin = 0;
00134 
00135     // ----------------------------------------------------------------
00136     // Configuration
00137     // Each attibute have setter and getter, plus a default value.
00138     // ----------------------------------------------------------------
00139 
00143     private static final int NO_LIMIT = 99999;
00144 
00148     private static final long ONE_DAY = 1440L * 60L * 1000L;
00149 
00154     private static final int MAX_REMOVE_FREELIST = 10;
00155 
00159     private int poolMin = 0;
00160 
00164     public int getPoolMin() {
00165         return poolMin;
00166     }
00167 
00171     public synchronized void setPoolMin(int min) {
00172         if (poolMin != min) {
00173             poolMin = min;
00174             adjust();
00175         }
00176     }
00177 
00182     private int poolMax = NO_LIMIT;
00183 
00187     public int getPoolMax() {
00188         return poolMax;
00189     }
00190 
00194     public synchronized void setPoolMax(int max) {
00195         if (poolMax != max) {
00196             if (max < 0 || max > NO_LIMIT) {
00197                 if (currentWaiters > 0) {
00198                     notify();
00199                 }
00200                 poolMax = NO_LIMIT;
00201             } else {
00202                 if (currentWaiters > 0 && poolMax < max) {
00203                     notify();
00204                 }
00205                 poolMax = max;
00206                 adjust();
00207             }
00208         }
00209     }
00210 
00216     private long maxAge = ONE_DAY;
00217 
00221     private int maxAgeMn;
00222 
00226     public int getMaxAge() {
00227         return maxAgeMn;
00228     }
00229 
00233     public long getMaxAgeMilli() {
00234         return maxAge;
00235     }
00236 
00240     public void setMaxAge(int mn) {
00241         maxAgeMn = mn;
00242         // set times in milliseconds
00243         maxAge = mn * 60L * 1000L;
00244     }
00245 
00249     private long maxOpenTime = ONE_DAY;
00250 
00254     private int maxOpenTimeMn;
00255 
00259     public int getMaxOpenTime() {
00260         return maxOpenTimeMn;
00261     }
00262 
00266     public long getMaxOpenTimeMilli() {
00267         return maxOpenTime;
00268     }
00269 
00273     public void setMaxOpenTime(int mn) {
00274         maxOpenTimeMn = mn;
00275         // set times in milliseconds
00276         maxOpenTime = mn * 60L * 1000L;
00277     }
00278 
00282     private long waiterTimeout = 10000;
00283 
00287     public int getMaxWaitTime() {
00288         return (int) (waiterTimeout / 1000L);
00289     }
00290 
00294     public void setMaxWaitTime(int sec) {
00295         waiterTimeout = sec * 1000L;
00296     }
00297 
00301     private int maxWaiters = 1000;
00302 
00306     public int getMaxWaiters() {
00307         return maxWaiters;
00308     }
00309 
00313     public void setMaxWaiters(int nb) {
00314         maxWaiters = nb;
00315     }
00316 
00320     private int samplingPeriod = 60; //default sampling period
00321 
00325     public int getSamplingPeriod() {
00326         return samplingPeriod;
00327     }
00328 
00332     public void setSamplingPeriod(int sec) {
00333         if (sec > 0) {
00334             samplingPeriod = sec;
00335             poolKeeper.setSamplingPeriod(sec);
00336         }
00337     }
00338 
00347     private int checkLevel = 1; // default = 1
00348 
00352     public int getCheckLevel() {
00353         return checkLevel;
00354     }
00355 
00359     public void setCheckLevel(int level) {
00360         checkLevel = level;
00361     }
00362 
00366     private String testStatement;
00367 
00371     public String getTestStatement() {
00372         return testStatement;
00373     }
00374 
00378     public void setTestStatement(String s) {
00379         testStatement = s;
00380     }
00381 
00382     // ----------------------------------------------------------------
00383     // Monitoring Attributes
00384     // Each attribute should have a get accessor.
00385     // ----------------------------------------------------------------
00386 
00390     private int busyMaxRecent = 0;
00391 
00395     public int getBusyMaxRecent() {
00396         return busyMaxRecent;
00397     }
00398 
00402     private int busyMinRecent = 0;
00403 
00407     public int getBusyMinRecent() {
00408         return busyMinRecent;
00409     }
00410 
00414     private int currentWaiters = 0;
00415 
00419     public int getCurrentWaiters() {
00420         return currentWaiters;
00421     }
00422 
00426     private int openedCount = 0;
00427 
00431     public int getOpenedCount() {
00432         return openedCount;
00433     }
00434 
00438     private int connectionFailures = 0;
00439 
00443     public int getConnectionFailures() {
00444         return connectionFailures;
00445     }
00446 
00452     private int connectionLeaks = 0;
00453 
00457     public int getConnectionLeaks() {
00458         return connectionLeaks;
00459     }
00460 
00464     private int servedOpen = 0;
00465 
00469     public int getServedOpen() {
00470         return servedOpen;
00471     }
00472 
00476     private int rejectedFull = 0;
00477 
00481     public int getRejectedFull() {
00482         return rejectedFull;
00483     }
00484 
00488     private int rejectedTimeout = 0;
00489 
00493     public int getRejectedTimeout() {
00494         return rejectedTimeout;
00495     }
00496 
00500     private int rejectedOther = 0;
00501 
00505     public int getRejectedOther() {
00506         return rejectedOther;
00507     }
00508 
00512     public int getRejectedOpen() {
00513         return rejectedFull + rejectedTimeout + rejectedOther;
00514     }
00515 
00519     private int waitersHigh = 0;
00520 
00524     public int getWaitersHigh() {
00525         return waitersHigh;
00526     }
00527 
00531     private int waitersHighRecent = 0;
00532 
00536     public int getWaitersHighRecent() {
00537         return waitersHighRecent;
00538     }
00539 
00543     private int totalWaiterCount = 0;
00544 
00548     public int getWaiterCount() {
00549         return totalWaiterCount;
00550     }
00551 
00555     private long totalWaitingTime = 0;
00556 
00560     public long getWaitingTime() {
00561         return totalWaitingTime;
00562     }
00563 
00567     private long waitingHigh = 0;
00568 
00572     public long getWaitingHigh() {
00573         return waitingHigh;
00574     }
00575 
00579     private long waitingHighRecent = 0;
00580 
00584     public long getWaitingHighRecent() {
00585         return waitingHighRecent;
00586     }
00587 
00588     // ----------------------------------------------------------------
00589     // Constructors
00590     // ----------------------------------------------------------------
00591 
00600     public Pool(ConnectionManager cmgr, XADataSourceImpl xads) throws Exception {
00601         logger = Log.getLogger(Log.JONAS_DBM_PREFIX);
00602         this.cmgr = cmgr;
00603         this.xads = xads;
00604         try {
00605             // Modified for use with the gcj compiler
00606             ServiceManager sm = ServiceManager.getInstance();
00607             TransactionService ts = (TransactionService) sm.getTransactionService();
00608             tm = ts.getTransactionManager();
00609         } catch (Exception e) {
00610             throw e;
00611         }
00612         poolKeeper = new PoolKeeper(this);
00613         poolKeeper.start();
00614     }
00615 
00616     // ----------------------------------------------------------------
00617     // Public Methods
00618     // ----------------------------------------------------------------
00619 
00623     public int getCurrentOpened() {
00624         return xac2item.size();
00625     }
00626 
00630     public int getCurrentBusy() {
00631         return xac2item.size() - freeList.size();
00632     }
00633 
00637     public void recomputeBusy() {
00638         int busy = getCurrentBusy();
00639         if (busyMax < busy) {
00640             busyMax = busy;
00641         }
00642         if (busyMin > busy) {
00643             busyMin = busy;
00644         }
00645     }
00646 
00650     public int getCurrentInTx() {
00651         return tx2item.size();
00652     }
00653 
00661     public synchronized PoolItem openConnection(String user, Transaction tx) throws SQLException {
00662         PoolItem item = null;
00663 
00664         // If a Connection exists already for this tx, just return it.
00665         // If no transaction, never reuse a connection already used.
00666         if (tx != null) {
00667             item = (PoolItem) tx2item.get(tx);
00668             if (item != null) {
00669                 logger.log(BasicLevel.DEBUG, "Reuse a Connection for same tx");
00670                 item.open();
00671                 servedOpen++;
00672                 return item;
00673             }
00674         }
00675 
00676         // Loop until a valid item is found
00677         long timetowait = waiterTimeout;
00678         long starttime = 0;
00679         while (item == null) {
00680             // try to find an item in the free list
00681             if (freeList.isEmpty()) {
00682                 // In case the pool is full, we must wait until a connection is released.
00683                 if (xac2item.size() >= poolMax) {
00684                     boolean stoplooping = true;
00685                     // If a timeout has been specified, wait, unless maxWaiters is reached.
00686                     if (timetowait > 0) {
00687                         if (currentWaiters < maxWaiters) {
00688                             currentWaiters++;
00689                             // Store the maximum concurrent waiters
00690                             if (waiterCount < currentWaiters) {
00691                                 waiterCount = currentWaiters;
00692                             }
00693                             if (starttime == 0) {
00694                                 starttime = System.currentTimeMillis();
00695                                 logger.log(BasicLevel.DEBUG, "Wait for a free Connection" + xac2item.size());
00696                             }
00697                             try {
00698                                 wait(timetowait);
00699                             } catch (InterruptedException ign) {
00700                                 logger.log(BasicLevel.WARN, "Interrupted");
00701                             } finally {
00702                                 currentWaiters--;
00703                             }
00704                             long stoptime = System.currentTimeMillis();
00705                             long stillwaited = stoptime - starttime;
00706                             timetowait = waiterTimeout - stillwaited;
00707                             stoplooping = (timetowait <= 0);
00708                             if (stoplooping) {
00709                                 // We have been waked up by the timeout.
00710                                 totalWaiterCount++;
00711                                 totalWaitingTime += stillwaited;
00712                                 if (waitingTime < stillwaited) {
00713                                     waitingTime = stillwaited;
00714                                 }
00715                             } else {
00716                                 if (!freeList.isEmpty() || xac2item.size() < poolMax) {
00717                                     // We have been notified by a connection released.
00718                                     logger.log(BasicLevel.DEBUG, "Notified after " + stillwaited);
00719                                     totalWaiterCount++;
00720                                     totalWaitingTime += stillwaited;
00721                                     if (waitingTime < stillwaited) {
00722                                         waitingTime = stillwaited;
00723                                     }
00724                                 }
00725                                 continue;
00726                             }
00727                         }
00728                     }
00729                     if (stoplooping && freeList.isEmpty() && xac2item.size() >= poolMax) {
00730                         if (starttime > 0) {
00731                             rejectedTimeout++;
00732                             logger.log(BasicLevel.WARN, "Cannot create a Connection - timeout");
00733                         } else {
00734                             rejectedFull++;
00735                             logger.log(BasicLevel.WARN, "Cannot create a Connection");
00736                         }
00737                         throw new SQLException("No more connections in " + cmgr.getDatasourceName());
00738                     }
00739                     continue;
00740                 }
00741                 logger.log(BasicLevel.DEBUG, "empty free list: Create a new Connection");
00742                 XAConnection xac = null;
00743                 try {
00744                     // create a new XA Connection
00745                     xac = xads.getXAConnection();
00746                     openedCount++;
00747                 } catch (SQLException e) {
00748                     connectionFailures++;
00749                     rejectedOther++;
00750                     logger.log(BasicLevel.WARN, "Cannot create new Connection for tx");
00751                     throw e;
00752                 }
00753                 item = new PoolItem(this, xac, user);
00754                 // Register the connection manager as a ConnectionEventListener
00755                 xac.addConnectionEventListener(cmgr);
00756                 xac2item.put(xac, item);
00757             } else {
00758                 item = (PoolItem) freeList.removeFirst();
00759 
00760                 // Check the connection before reusing it
00761                 if (checkLevel > 0) {
00762                     try {
00763                         ConnectionImpl conn = (ConnectionImpl) item.getXACon().getConnection();
00764                         if (conn.isPhysicallyClosed()) {
00765                             logger.log(BasicLevel.WARN, "The JDBC connection has been closed!");
00766                             destroyItem(item);
00767                             starttime = 0;
00768                             item = null;
00769                             continue;
00770                         }
00771                         if (checkLevel > 1) {
00772                             java.sql.Statement stmt = conn.createStatement();
00773                             stmt.execute(testStatement);
00774                             stmt.close();
00775                         }
00776                     } catch (Exception e) {
00777                         logger.log(BasicLevel.ERROR, "DataSource " + cmgr.getDatasourceName()
00778                                    + " error: removing invalid item:" + e);
00779                         destroyItem(item);
00780                         starttime = 0;
00781                         item = null;
00782                         continue;
00783                     }
00784                 }
00785             }
00786         }
00787         recomputeBusy();
00788         item.setTx(tx);
00789         if (tx == null) {
00790             logger.log(BasicLevel.DEBUG, "Got a Connection - no TX: ");
00791         } else {
00792             logger.log(BasicLevel.DEBUG, "Got a Connection for TX: ");
00793             // register synchronization
00794             try {
00795                 tx.registerSynchronization(item);
00796                 tx2item.put(tx, item);  // only if registerSynchronization was OK.
00797             } catch (javax.transaction.RollbackException e) {
00799                 logger.log(BasicLevel.WARN, "DataSource " + cmgr.getDatasourceName()
00800                            + " error: Pool item registered, but tx is rollback only");
00801             } catch (javax.transaction.SystemException e) {
00802                 logger.log(BasicLevel.ERROR, "DataSource " + cmgr.getDatasourceName()
00803                            + " error in pool: system exception from transaction manager " + e.getMessage());
00804             } catch (IllegalStateException e) {
00805                 // In case transaction has already committed, do as if no tx.
00806                 logger.log(BasicLevel.WARN, "Got a Connection - committed TX: ");
00807                 item.setTx(null);
00808             }
00809         }
00810         item.open();
00811         servedOpen++;
00812         return item;
00813     }
00814 
00820     public synchronized void freeConnections(Transaction tx) {
00821         if (logger.isLoggable(BasicLevel.DEBUG)) {
00822             logger.log(BasicLevel.DEBUG, "free connection for Tx = " + tx);
00823         }
00824         PoolItem item = (PoolItem) tx2item.remove(tx);
00825         if (item == null) {
00826             logger.log(BasicLevel.ERROR, "pool: no connection found to free for Tx = " + tx);
00827             return;
00828         }
00829         item.setTx(null);
00830         if (item.isOpen()) {
00831             // Connection may be not closed in case of stateful session bean that
00832             // keeps connection to be reused in another method of the bean.
00833             logger.log(BasicLevel.WARN, "WARNING: Connection not closed by caller");
00834             return;
00835         }
00836         freeItem(item);
00837     }
00838 
00847     public synchronized PoolItem closeConnection(XAConnection xac, int flag) {
00848         PoolItem item = (PoolItem) xac2item.get(xac);
00849         if (item == null) {
00850             logger.log(BasicLevel.ERROR, "Pool: no item for this xac!");
00851             return null;
00852         }
00853         // The connection will be available only if not associated
00854         // to a transaction. Else, it will be reusable only for the
00855         // same transaction.
00856         if (!item.close()) {
00857             return null;
00858         }
00859         if (item.getTx() != null) {
00860             logger.log(BasicLevel.DEBUG, "keep the Connection for this tx");
00861         } else {
00862             freeItem(item);
00863         }
00864 
00865         // delist Resource if in transaction
00866         Transaction tx = null;
00867         try {
00868             tx = tm.getTransaction();
00869         } catch (NullPointerException n) {
00870             // current is null: we are not in JOnAS Server.
00871             logger.log(BasicLevel.ERROR, "Pool: should not be used outside a JOnAS Server");
00872         } catch (SystemException e) {
00873             logger.log(BasicLevel.ERROR, "Pool: getTransaction failed:" + e);
00874         }
00875         if (tx != null && item.isClosed()) {
00876             try {
00877                 tx.delistResource(xac.getXAResource(), flag);
00878             } catch (Exception e) {
00879                 logger.log(BasicLevel.ERROR, "Pool: Exception while delisting resource:" + e);
00880             }
00881         }
00882 
00883         return item;
00884     }
00885 
00889     public synchronized void sampling() {
00890         waitingHighRecent = waitingTime;
00891         if (waitingHigh < waitingTime) {
00892             waitingHigh = waitingTime;
00893         }
00894         waitingTime = 0;
00895 
00896         waitersHighRecent = waiterCount;
00897         if (waitersHigh < waiterCount) {
00898             waitersHigh = waiterCount;
00899         }
00900         waiterCount = 0;
00901 
00902         busyMaxRecent = busyMax;
00903         busyMax = getCurrentBusy();
00904         busyMinRecent = busyMin;
00905         busyMin = getCurrentBusy();
00906     }
00907 
00912     public synchronized void adjust() {
00913         logger.log(BasicLevel.DEBUG, "");
00914 
00915         // Remove max aged elements in freelist
00916         // - Not more than MAX_REMOVE_FREELIST
00917         // - Don't reduce pool size less than poolMin
00918         int count = xac2item.size() - poolMin;
00919         if (count > 0) {
00920             if (count > MAX_REMOVE_FREELIST) {
00921                 count = MAX_REMOVE_FREELIST;
00922             }
00923             for (Iterator i = freeList.iterator(); i.hasNext();) {
00924                 PoolItem item = (PoolItem) i.next();
00925                 if (item.isAged()) {
00926                     logger.log(BasicLevel.DEBUG, "remove a timed out connection");
00927                     i.remove();
00928                     destroyItem(item);
00929                     count--;
00930                     if (count <= 0) {
00931                         break;
00932                     }
00933                 }
00934             }
00935         }
00936         recomputeBusy();
00937 
00938         // Close (physically) connections lost (opened for too long time)
00939         for (Iterator i = xac2item.values().iterator(); i.hasNext();) {
00940             PoolItem item = (PoolItem) i.next();
00941             if (item.inactive()) {
00942                 logger.log(BasicLevel.WARN, "close a timed out open connection");
00943                 i.remove();
00944                 // destroy item
00945                 item.remove();
00946                 connectionLeaks++;
00947                 // Notify 1 thread waiting for a Connection.
00948                 if (currentWaiters > 0) {
00949                     notify();
00950                 }
00951             }
00952         }
00953 
00954         // Shrink the pool in case of max pool size
00955         // This occurs when max pool size has been reduced by jonas admin.
00956         if (poolMax != NO_LIMIT) {
00957             while (freeList.size() > poolMin && xac2item.size() > poolMax) {
00958                 PoolItem item = (PoolItem) freeList.removeFirst();
00959                 destroyItem(item);
00960             }
00961         }
00962         recomputeBusy();
00963 
00964         // Recreate more Connections while poolMin is not reached
00965         while (xac2item.size() < poolMin) {
00966             XAConnection xac = null;
00967             try {
00968                 xac = xads.getXAConnection();
00969                 openedCount++;
00970             } catch (SQLException e) {
00971                 throw new ServiceException("Could not create " + poolMin + " items in the pool : " + e.toString());
00972             }
00973             // tx = null. Assumes maxage already configured.
00974             logger.log(BasicLevel.DEBUG, "Create a new available Connection");
00975             PoolItem item = new PoolItem(this, xac, null);
00976             freeList.addLast(item);
00977             xac2item.put(xac, item);
00978             xac.addConnectionEventListener(cmgr);
00979         }
00980     }
00981 
00985     public synchronized void closeAllConnections() {
00986         logger.log(BasicLevel.DEBUG, "");
00987 
00988         // Stop the pool keeper, since all connections will be closed.
00989         poolKeeper.stop();
00990 
00991         // Close physically all connections
00992         Iterator it = xac2item.keySet().iterator();
00993         try {
00994             while (it.hasNext()) {
00995                 XAConnection xac = (XAConnection) it.next();
00996                 xac.close();
00997             }
00998         } catch (java.sql.SQLException e) {
00999             logger.log(BasicLevel.ERROR, "Error while closing a Connection:" + e);
01000         }
01001     }
01002 
01003     // ----------------------------------------------------------------
01004     // Private Methods
01005     // ----------------------------------------------------------------
01006 
01011     private void freeItem(PoolItem item) {
01012         // Add it to the free list
01013         // Even if maxage is reached, because we avoids going under min pool size.
01014         // PoolKeeper will manage aged connections.
01015         freeList.addLast(item);
01016         if (logger.isLoggable(BasicLevel.DEBUG)) {
01017             logger.log(BasicLevel.DEBUG, "item added to freeList: " + freeList.size());
01018         }
01019         // Notify 1 thread waiting for a Connection.
01020         if (currentWaiters > 0) {
01021             notify();
01022         }
01023         recomputeBusy();
01024     }
01025 
01030     private void destroyItem(PoolItem item) {
01031         logger.log(BasicLevel.DEBUG, "remove an item in xac2item");
01032         xac2item.remove(item.getXACon());
01033         item.remove();
01034         // Notify 1 thread waiting for a Connection.
01035         if (currentWaiters > 0) {
01036             notify();
01037         }
01038         recomputeBusy();
01039     }
01040 
01041 }

Generated on Tue Feb 15 15:05:15 2005 for JOnAS by  doxygen 1.3.9.1