00001
00026 package org.objectweb.jonas.dbm;
00027
00028 import java.sql.SQLException;
00029 import java.util.HashMap;
00030 import java.util.Iterator;
00031 import java.util.LinkedList;
00032 import java.util.Map;
00033
00034 import javax.sql.XAConnection;
00035 import javax.transaction.SystemException;
00036 import javax.transaction.Transaction;
00037
00038 import org.objectweb.jonas.common.Log;
00039 import org.objectweb.jonas.jdbc_xa.ConnectionImpl;
00040 import org.objectweb.jonas.jdbc_xa.XADataSourceImpl;
00041 import org.objectweb.jonas.jtm.TransactionService;
00042 import org.objectweb.jonas.service.ServiceException;
00043 import org.objectweb.jonas.service.ServiceManager;
00044 import org.objectweb.transaction.jta.TransactionManager;
00045 import org.objectweb.util.monolog.api.BasicLevel;
00046 import org.objectweb.util.monolog.api.Logger;
00047
00064 public class Pool {
00065
00066
00067
00068
00069
00073 private static Logger logger = null;
00074
00078 private TransactionManager tm = null;
00079
00084 private PoolKeeper poolKeeper;
00085
00090 private Map xac2item = new HashMap();
00091
00097 private LinkedList freeList = new LinkedList();
00098
00103 private Map tx2item = new HashMap();
00104
00108 private ConnectionManager cmgr;
00109
00113 private XADataSourceImpl xads;
00114
00118 private int waiterCount = 0;
00119
00123 private long waitingTime = 0;
00124
00128 private int busyMax = 0;
00129
00133 private int busyMin = 0;
00134
00135
00136
00137
00138
00139
00143 private static final int NO_LIMIT = 99999;
00144
00148 private static final long ONE_DAY = 1440L * 60L * 1000L;
00149
00154 private static final int MAX_REMOVE_FREELIST = 10;
00155
00159 private int poolMin = 0;
00160
00164 public int getPoolMin() {
00165 return poolMin;
00166 }
00167
00171 public synchronized void setPoolMin(int min) {
00172 if (poolMin != min) {
00173 poolMin = min;
00174 adjust();
00175 }
00176 }
00177
00182 private int poolMax = NO_LIMIT;
00183
00187 public int getPoolMax() {
00188 return poolMax;
00189 }
00190
00194 public synchronized void setPoolMax(int max) {
00195 if (poolMax != max) {
00196 if (max < 0 || max > NO_LIMIT) {
00197 if (currentWaiters > 0) {
00198 notify();
00199 }
00200 poolMax = NO_LIMIT;
00201 } else {
00202 if (currentWaiters > 0 && poolMax < max) {
00203 notify();
00204 }
00205 poolMax = max;
00206 adjust();
00207 }
00208 }
00209 }
00210
00216 private long maxAge = ONE_DAY;
00217
00221 private int maxAgeMn;
00222
00226 public int getMaxAge() {
00227 return maxAgeMn;
00228 }
00229
00233 public long getMaxAgeMilli() {
00234 return maxAge;
00235 }
00236
00240 public void setMaxAge(int mn) {
00241 maxAgeMn = mn;
00242
00243 maxAge = mn * 60L * 1000L;
00244 }
00245
00249 private long maxOpenTime = ONE_DAY;
00250
00254 private int maxOpenTimeMn;
00255
00259 public int getMaxOpenTime() {
00260 return maxOpenTimeMn;
00261 }
00262
00266 public long getMaxOpenTimeMilli() {
00267 return maxOpenTime;
00268 }
00269
00273 public void setMaxOpenTime(int mn) {
00274 maxOpenTimeMn = mn;
00275
00276 maxOpenTime = mn * 60L * 1000L;
00277 }
00278
00282 private long waiterTimeout = 10000;
00283
00287 public int getMaxWaitTime() {
00288 return (int) (waiterTimeout / 1000L);
00289 }
00290
00294 public void setMaxWaitTime(int sec) {
00295 waiterTimeout = sec * 1000L;
00296 }
00297
00301 private int maxWaiters = 1000;
00302
00306 public int getMaxWaiters() {
00307 return maxWaiters;
00308 }
00309
00313 public void setMaxWaiters(int nb) {
00314 maxWaiters = nb;
00315 }
00316
00320 private int samplingPeriod = 60;
00321
00325 public int getSamplingPeriod() {
00326 return samplingPeriod;
00327 }
00328
00332 public void setSamplingPeriod(int sec) {
00333 if (sec > 0) {
00334 samplingPeriod = sec;
00335 poolKeeper.setSamplingPeriod(sec);
00336 }
00337 }
00338
00347 private int checkLevel = 1;
00348
00352 public int getCheckLevel() {
00353 return checkLevel;
00354 }
00355
00359 public void setCheckLevel(int level) {
00360 checkLevel = level;
00361 }
00362
00366 private String testStatement;
00367
00371 public String getTestStatement() {
00372 return testStatement;
00373 }
00374
00378 public void setTestStatement(String s) {
00379 testStatement = s;
00380 }
00381
00382
00383
00384
00385
00386
00390 private int busyMaxRecent = 0;
00391
00395 public int getBusyMaxRecent() {
00396 return busyMaxRecent;
00397 }
00398
00402 private int busyMinRecent = 0;
00403
00407 public int getBusyMinRecent() {
00408 return busyMinRecent;
00409 }
00410
00414 private int currentWaiters = 0;
00415
00419 public int getCurrentWaiters() {
00420 return currentWaiters;
00421 }
00422
00426 private int openedCount = 0;
00427
00431 public int getOpenedCount() {
00432 return openedCount;
00433 }
00434
00438 private int connectionFailures = 0;
00439
00443 public int getConnectionFailures() {
00444 return connectionFailures;
00445 }
00446
00452 private int connectionLeaks = 0;
00453
00457 public int getConnectionLeaks() {
00458 return connectionLeaks;
00459 }
00460
00464 private int servedOpen = 0;
00465
00469 public int getServedOpen() {
00470 return servedOpen;
00471 }
00472
00476 private int rejectedFull = 0;
00477
00481 public int getRejectedFull() {
00482 return rejectedFull;
00483 }
00484
00488 private int rejectedTimeout = 0;
00489
00493 public int getRejectedTimeout() {
00494 return rejectedTimeout;
00495 }
00496
00500 private int rejectedOther = 0;
00501
00505 public int getRejectedOther() {
00506 return rejectedOther;
00507 }
00508
00512 public int getRejectedOpen() {
00513 return rejectedFull + rejectedTimeout + rejectedOther;
00514 }
00515
00519 private int waitersHigh = 0;
00520
00524 public int getWaitersHigh() {
00525 return waitersHigh;
00526 }
00527
00531 private int waitersHighRecent = 0;
00532
00536 public int getWaitersHighRecent() {
00537 return waitersHighRecent;
00538 }
00539
00543 private int totalWaiterCount = 0;
00544
00548 public int getWaiterCount() {
00549 return totalWaiterCount;
00550 }
00551
00555 private long totalWaitingTime = 0;
00556
00560 public long getWaitingTime() {
00561 return totalWaitingTime;
00562 }
00563
00567 private long waitingHigh = 0;
00568
00572 public long getWaitingHigh() {
00573 return waitingHigh;
00574 }
00575
00579 private long waitingHighRecent = 0;
00580
00584 public long getWaitingHighRecent() {
00585 return waitingHighRecent;
00586 }
00587
00588
00589
00590
00591
00600 public Pool(ConnectionManager cmgr, XADataSourceImpl xads) throws Exception {
00601 logger = Log.getLogger(Log.JONAS_DBM_PREFIX);
00602 this.cmgr = cmgr;
00603 this.xads = xads;
00604 try {
00605
00606 ServiceManager sm = ServiceManager.getInstance();
00607 TransactionService ts = (TransactionService) sm.getTransactionService();
00608 tm = ts.getTransactionManager();
00609 } catch (Exception e) {
00610 throw e;
00611 }
00612 poolKeeper = new PoolKeeper(this);
00613 poolKeeper.start();
00614 }
00615
00616
00617
00618
00619
00623 public int getCurrentOpened() {
00624 return xac2item.size();
00625 }
00626
00630 public int getCurrentBusy() {
00631 return xac2item.size() - freeList.size();
00632 }
00633
00637 public void recomputeBusy() {
00638 int busy = getCurrentBusy();
00639 if (busyMax < busy) {
00640 busyMax = busy;
00641 }
00642 if (busyMin > busy) {
00643 busyMin = busy;
00644 }
00645 }
00646
00650 public int getCurrentInTx() {
00651 return tx2item.size();
00652 }
00653
00661 public synchronized PoolItem openConnection(String user, Transaction tx) throws SQLException {
00662 PoolItem item = null;
00663
00664
00665
00666 if (tx != null) {
00667 item = (PoolItem) tx2item.get(tx);
00668 if (item != null) {
00669 logger.log(BasicLevel.DEBUG, "Reuse a Connection for same tx");
00670 item.open();
00671 servedOpen++;
00672 return item;
00673 }
00674 }
00675
00676
00677 long timetowait = waiterTimeout;
00678 long starttime = 0;
00679 while (item == null) {
00680
00681 if (freeList.isEmpty()) {
00682
00683 if (xac2item.size() >= poolMax) {
00684 boolean stoplooping = true;
00685
00686 if (timetowait > 0) {
00687 if (currentWaiters < maxWaiters) {
00688 currentWaiters++;
00689
00690 if (waiterCount < currentWaiters) {
00691 waiterCount = currentWaiters;
00692 }
00693 if (starttime == 0) {
00694 starttime = System.currentTimeMillis();
00695 logger.log(BasicLevel.DEBUG, "Wait for a free Connection" + xac2item.size());
00696 }
00697 try {
00698 wait(timetowait);
00699 } catch (InterruptedException ign) {
00700 logger.log(BasicLevel.WARN, "Interrupted");
00701 } finally {
00702 currentWaiters--;
00703 }
00704 long stoptime = System.currentTimeMillis();
00705 long stillwaited = stoptime - starttime;
00706 timetowait = waiterTimeout - stillwaited;
00707 stoplooping = (timetowait <= 0);
00708 if (stoplooping) {
00709
00710 totalWaiterCount++;
00711 totalWaitingTime += stillwaited;
00712 if (waitingTime < stillwaited) {
00713 waitingTime = stillwaited;
00714 }
00715 } else {
00716 if (!freeList.isEmpty() || xac2item.size() < poolMax) {
00717
00718 logger.log(BasicLevel.DEBUG, "Notified after " + stillwaited);
00719 totalWaiterCount++;
00720 totalWaitingTime += stillwaited;
00721 if (waitingTime < stillwaited) {
00722 waitingTime = stillwaited;
00723 }
00724 }
00725 continue;
00726 }
00727 }
00728 }
00729 if (stoplooping && freeList.isEmpty() && xac2item.size() >= poolMax) {
00730 if (starttime > 0) {
00731 rejectedTimeout++;
00732 logger.log(BasicLevel.WARN, "Cannot create a Connection - timeout");
00733 } else {
00734 rejectedFull++;
00735 logger.log(BasicLevel.WARN, "Cannot create a Connection");
00736 }
00737 throw new SQLException("No more connections in " + cmgr.getDatasourceName());
00738 }
00739 continue;
00740 }
00741 logger.log(BasicLevel.DEBUG, "empty free list: Create a new Connection");
00742 XAConnection xac = null;
00743 try {
00744
00745 xac = xads.getXAConnection();
00746 openedCount++;
00747 } catch (SQLException e) {
00748 connectionFailures++;
00749 rejectedOther++;
00750 logger.log(BasicLevel.WARN, "Cannot create new Connection for tx");
00751 throw e;
00752 }
00753 item = new PoolItem(this, xac, user);
00754
00755 xac.addConnectionEventListener(cmgr);
00756 xac2item.put(xac, item);
00757 } else {
00758 item = (PoolItem) freeList.removeFirst();
00759
00760
00761 if (checkLevel > 0) {
00762 try {
00763 ConnectionImpl conn = (ConnectionImpl) item.getXACon().getConnection();
00764 if (conn.isPhysicallyClosed()) {
00765 logger.log(BasicLevel.WARN, "The JDBC connection has been closed!");
00766 destroyItem(item);
00767 starttime = 0;
00768 item = null;
00769 continue;
00770 }
00771 if (checkLevel > 1) {
00772 java.sql.Statement stmt = conn.createStatement();
00773 stmt.execute(testStatement);
00774 stmt.close();
00775 }
00776 } catch (Exception e) {
00777 logger.log(BasicLevel.ERROR, "DataSource " + cmgr.getDatasourceName()
00778 + " error: removing invalid item:" + e);
00779 destroyItem(item);
00780 starttime = 0;
00781 item = null;
00782 continue;
00783 }
00784 }
00785 }
00786 }
00787 recomputeBusy();
00788 item.setTx(tx);
00789 if (tx == null) {
00790 logger.log(BasicLevel.DEBUG, "Got a Connection - no TX: ");
00791 } else {
00792 logger.log(BasicLevel.DEBUG, "Got a Connection for TX: ");
00793
00794 try {
00795 tx.registerSynchronization(item);
00796 tx2item.put(tx, item);
00797 } catch (javax.transaction.RollbackException e) {
00799 logger.log(BasicLevel.WARN, "DataSource " + cmgr.getDatasourceName()
00800 + " error: Pool item registered, but tx is rollback only");
00801 } catch (javax.transaction.SystemException e) {
00802 logger.log(BasicLevel.ERROR, "DataSource " + cmgr.getDatasourceName()
00803 + " error in pool: system exception from transaction manager " + e.getMessage());
00804 } catch (IllegalStateException e) {
00805
00806 logger.log(BasicLevel.WARN, "Got a Connection - committed TX: ");
00807 item.setTx(null);
00808 }
00809 }
00810 item.open();
00811 servedOpen++;
00812 return item;
00813 }
00814
00820 public synchronized void freeConnections(Transaction tx) {
00821 if (logger.isLoggable(BasicLevel.DEBUG)) {
00822 logger.log(BasicLevel.DEBUG, "free connection for Tx = " + tx);
00823 }
00824 PoolItem item = (PoolItem) tx2item.remove(tx);
00825 if (item == null) {
00826 logger.log(BasicLevel.ERROR, "pool: no connection found to free for Tx = " + tx);
00827 return;
00828 }
00829 item.setTx(null);
00830 if (item.isOpen()) {
00831
00832
00833 logger.log(BasicLevel.WARN, "WARNING: Connection not closed by caller");
00834 return;
00835 }
00836 freeItem(item);
00837 }
00838
00847 public synchronized PoolItem closeConnection(XAConnection xac, int flag) {
00848 PoolItem item = (PoolItem) xac2item.get(xac);
00849 if (item == null) {
00850 logger.log(BasicLevel.ERROR, "Pool: no item for this xac!");
00851 return null;
00852 }
00853
00854
00855
00856 if (!item.close()) {
00857 return null;
00858 }
00859 if (item.getTx() != null) {
00860 logger.log(BasicLevel.DEBUG, "keep the Connection for this tx");
00861 } else {
00862 freeItem(item);
00863 }
00864
00865
00866 Transaction tx = null;
00867 try {
00868 tx = tm.getTransaction();
00869 } catch (NullPointerException n) {
00870
00871 logger.log(BasicLevel.ERROR, "Pool: should not be used outside a JOnAS Server");
00872 } catch (SystemException e) {
00873 logger.log(BasicLevel.ERROR, "Pool: getTransaction failed:" + e);
00874 }
00875 if (tx != null && item.isClosed()) {
00876 try {
00877 tx.delistResource(xac.getXAResource(), flag);
00878 } catch (Exception e) {
00879 logger.log(BasicLevel.ERROR, "Pool: Exception while delisting resource:" + e);
00880 }
00881 }
00882
00883 return item;
00884 }
00885
00889 public synchronized void sampling() {
00890 waitingHighRecent = waitingTime;
00891 if (waitingHigh < waitingTime) {
00892 waitingHigh = waitingTime;
00893 }
00894 waitingTime = 0;
00895
00896 waitersHighRecent = waiterCount;
00897 if (waitersHigh < waiterCount) {
00898 waitersHigh = waiterCount;
00899 }
00900 waiterCount = 0;
00901
00902 busyMaxRecent = busyMax;
00903 busyMax = getCurrentBusy();
00904 busyMinRecent = busyMin;
00905 busyMin = getCurrentBusy();
00906 }
00907
00912 public synchronized void adjust() {
00913 logger.log(BasicLevel.DEBUG, "");
00914
00915
00916
00917
00918 int count = xac2item.size() - poolMin;
00919 if (count > 0) {
00920 if (count > MAX_REMOVE_FREELIST) {
00921 count = MAX_REMOVE_FREELIST;
00922 }
00923 for (Iterator i = freeList.iterator(); i.hasNext();) {
00924 PoolItem item = (PoolItem) i.next();
00925 if (item.isAged()) {
00926 logger.log(BasicLevel.DEBUG, "remove a timed out connection");
00927 i.remove();
00928 destroyItem(item);
00929 count--;
00930 if (count <= 0) {
00931 break;
00932 }
00933 }
00934 }
00935 }
00936 recomputeBusy();
00937
00938
00939 for (Iterator i = xac2item.values().iterator(); i.hasNext();) {
00940 PoolItem item = (PoolItem) i.next();
00941 if (item.inactive()) {
00942 logger.log(BasicLevel.WARN, "close a timed out open connection");
00943 i.remove();
00944
00945 item.remove();
00946 connectionLeaks++;
00947
00948 if (currentWaiters > 0) {
00949 notify();
00950 }
00951 }
00952 }
00953
00954
00955
00956 if (poolMax != NO_LIMIT) {
00957 while (freeList.size() > poolMin && xac2item.size() > poolMax) {
00958 PoolItem item = (PoolItem) freeList.removeFirst();
00959 destroyItem(item);
00960 }
00961 }
00962 recomputeBusy();
00963
00964
00965 while (xac2item.size() < poolMin) {
00966 XAConnection xac = null;
00967 try {
00968 xac = xads.getXAConnection();
00969 openedCount++;
00970 } catch (SQLException e) {
00971 throw new ServiceException("Could not create " + poolMin + " items in the pool : " + e.toString());
00972 }
00973
00974 logger.log(BasicLevel.DEBUG, "Create a new available Connection");
00975 PoolItem item = new PoolItem(this, xac, null);
00976 freeList.addLast(item);
00977 xac2item.put(xac, item);
00978 xac.addConnectionEventListener(cmgr);
00979 }
00980 }
00981
00985 public synchronized void closeAllConnections() {
00986 logger.log(BasicLevel.DEBUG, "");
00987
00988
00989 poolKeeper.stop();
00990
00991
00992 Iterator it = xac2item.keySet().iterator();
00993 try {
00994 while (it.hasNext()) {
00995 XAConnection xac = (XAConnection) it.next();
00996 xac.close();
00997 }
00998 } catch (java.sql.SQLException e) {
00999 logger.log(BasicLevel.ERROR, "Error while closing a Connection:" + e);
01000 }
01001 }
01002
01003
01004
01005
01006
01011 private void freeItem(PoolItem item) {
01012
01013
01014
01015 freeList.addLast(item);
01016 if (logger.isLoggable(BasicLevel.DEBUG)) {
01017 logger.log(BasicLevel.DEBUG, "item added to freeList: " + freeList.size());
01018 }
01019
01020 if (currentWaiters > 0) {
01021 notify();
01022 }
01023 recomputeBusy();
01024 }
01025
01030 private void destroyItem(PoolItem item) {
01031 logger.log(BasicLevel.DEBUG, "remove an item in xac2item");
01032 xac2item.remove(item.getXACon());
01033 item.remove();
01034
01035 if (currentWaiters > 0) {
01036 notify();
01037 }
01038 recomputeBusy();
01039 }
01040
01041 }