ConnectionManagerImpl.java

00001 
00027 package org.objectweb.jonas.resource;
00028 
00029 import java.io.PrintWriter;
00030 import java.sql.Connection;
00031 import java.sql.PreparedStatement;
00032 import java.sql.ResultSet;
00033 import java.sql.SQLException;
00034 import java.util.Enumeration;
00035 import java.util.HashSet;
00036 import java.util.Hashtable;
00037 import java.util.Iterator;
00038 import java.util.Set;
00039 import java.util.Vector;
00040 
00041 import javax.naming.Context;
00042 import javax.naming.NamingException;
00043 import javax.resource.ResourceException;
00044 import javax.resource.spi.ConnectionEvent;
00045 import javax.resource.spi.ConnectionEventListener;
00046 import javax.resource.spi.ConnectionManager;
00047 import javax.resource.spi.ConnectionRequestInfo;
00048 import javax.resource.spi.ManagedConnection;
00049 import javax.resource.spi.ManagedConnectionFactory;
00050 import javax.resource.spi.ResourceAllocationException;
00051 import javax.resource.spi.ValidatingManagedConnectionFactory;
00052 import javax.security.auth.Subject;
00053 import javax.transaction.RollbackException;
00054 import javax.transaction.Transaction;
00055 import javax.transaction.xa.XAResource;
00056 
00057 import org.objectweb.jonas.resource.pool.api.Pool;
00058 import org.objectweb.jonas.resource.pool.api.PoolMatchFactory;
00059 import org.objectweb.jonas.resource.pool.lib.HArrayPool;
00060 import org.objectweb.jotm.Current;
00061 import org.objectweb.jotm.TransactionResourceManager;
00062 import org.objectweb.transaction.jta.ResourceManagerEventListener;
00063 import org.objectweb.transaction.jta.TransactionManager;
00064 import org.objectweb.util.monolog.api.BasicLevel;
00065 import org.objectweb.util.monolog.api.Logger;
00066 import org.objectweb.util.monolog.api.LoggerFactory;
00067 import org.objectweb.util.monolog.wrapper.printwriter.LoggerImpl;
00068 
00075 public class ConnectionManagerImpl implements ConnectionEventListener, ConnectionManager,
00076                                               PoolMatchFactory, SQLManager,
00077                                               TransactionResourceManager {
00078 
00082     protected static Logger trace = null;
00086     protected static Logger poolTrace = null;
00087 
00092     protected ResourceManagerEventListener rmel = null;
00093 
00097     private String resourceBundleName = null;
00098 
00102     protected TransactionManager tm;
00103 
00108     protected Hashtable mc2mci = null;
00109 
00113     private int mcMaxPoolSize = -1;
00114 
00118     private int mcMinPoolSize = 0;
00119 
00124     private ManagedConnectionFactory mcf;
00129     private ValidatingManagedConnectionFactory vmcf = null;
00130 
00134     protected Pool poolMCs = null;
00135 
00140     protected Hashtable usedMCs = null;
00141 
00145     static final int MAX_PSTMT_SIZE = 10;
00146 
00150     private int maxPstmtPoolSize = MAX_PSTMT_SIZE;
00151 
00155     private int jdbcConnLevel = 0;
00159     private String jdbcConnTestStmt = "";
00160 
00164     protected Vector mcs = new Vector();
00165 
00169     protected Vector synchros = new Vector();
00170 
00175     private ResourceSpec rs = null;
00176 
00180     private String transSupport = null;
00181 
00185     public final static String RESOURCE_BUNDLE_NAME = "resourceBundleName";
00189     public final static String LOGGER = "org.objectweb.util.monolog.logger";
00193     public final static String POOL_LOGGER = "org.objectweb.util.monolog.logger_pool";
00197     public final static String LOGGER_FACTORY = "org.objectweb.util.monolog.loggerFactory";
00201     public final static String TRANSACTION_MANAGER = "transactionManager";
00205     public final static String RESOURCE_MANAGER_EVENT_LISTENER = "resourceManagerEventListener";
00209     public final static String RESOURCE_ADAPTER = "resourceAdapter";
00213     public final static String PRINT_WRITER = "printWriter";
00214 
00215     /*
00216      * These constants define the different transaction support values
00217      */
00221     public final static String NO_TRANS_SUPPORT = "NoTransaction";
00225     public final static String LOCAL_TRANS_SUPPORT = "LocalTransaction";
00229     public final static String XA_TRANS_SUPPORT = "XATransaction";
00230 
00235     public final static int PSWRAP_1 = 1;
00236     public final static int PSWRAP_2 = 2;
00237     public final static int PSWRAP_3 = 3;
00238     public final static int PSWRAP_4 = 4;
00239     public final static int PSWRAP_5 = 5;
00240 
00241     // JOTM variables
00245     private ManagedConnection jotmMc = null;
00249     private XAResource jotmXar = null;
00253     private String xaName = null;
00254 
00259     public ConnectionManagerImpl(String transSupport) {
00260         if (transSupport.length() == 0) {
00261             transSupport = NO_TRANS_SUPPORT;
00262         } else {
00263             this.transSupport = transSupport;
00264         }
00265     }
00266 
00273     public void setLogger(Logger l) {
00274         trace = l;
00275     }
00276 
00277 
00284     public void setLoggerFactory(LoggerFactory lf) {
00285         if (resourceBundleName == null) {
00286             trace = lf.getLogger("org.objectweb.resource.server");
00287         } else {
00288             trace = lf.getLogger("org.objectweb.resource.server");
00289             //,resourceBundleName);
00290         }
00291     }
00292 
00293 
00300     public void setPrintWriter(PrintWriter pw) {
00301         trace = new LoggerImpl(pw);
00302     }
00303 
00304 
00311     public void setResourceManagerEventListener(
00312             ResourceManagerEventListener rmel) {
00313         this.rmel = rmel;
00314     }
00315 
00316 
00323     public void setTransactionManager(TransactionManager tm) {
00324         this.tm = tm;
00325     }
00326 
00327 
00336     public void setResourceAdapter(ManagedConnectionFactory tmcf)
00337             throws Exception {
00338         setResourceAdapter(tmcf, new ConnectionManagerPoolParams());
00339     }
00340 
00350     public void setResourceAdapter(ManagedConnectionFactory tmcf,
00351                                    ConnectionManagerPoolParams cmpp)
00352             throws Exception {
00353 
00354         // set the max/min pool values
00355         if (cmpp.getPoolMax() != 0) {
00356             mcMaxPoolSize = cmpp.getPoolMax();
00357         }
00358         if (cmpp.getPoolMin() > 0) {
00359             mcMinPoolSize = cmpp.getPoolMin();
00360         }
00361 
00362         // set the jdbc connection info
00363         jdbcConnLevel = cmpp.getJdbcConnLevel();
00364         jdbcConnTestStmt = cmpp.getJdbcConnTestStmt();
00365 
00366         mcf = tmcf;
00367         if (mcf instanceof ValidatingManagedConnectionFactory) {
00368             vmcf = (ValidatingManagedConnectionFactory) mcf;
00369         }
00370 
00371         poolMCs = new HArrayPool(poolTrace);
00372         poolMCs.setMatchFactory(this);
00373         poolMCs.setMaxSize(mcMaxPoolSize);
00374         poolMCs.setMinSize(mcMinPoolSize);
00375         poolMCs.setInitSize(cmpp.getPoolInit());
00376         
00377         if (cmpp.getPoolMaxAge() > 0) {
00378             poolMCs.setMaxAge(cmpp.getPoolMaxAge());
00379         } else {
00380             long tAge = cmpp.getPoolMaxAgeMinutes()* 60L * 1000L;
00381             poolMCs.setMaxAge(tAge);
00382         }
00383 
00384         if (cmpp.getPoolMaxOpentime() > 0) {
00385             poolMCs.setMaxOpentime(cmpp.getPoolMaxOpentime());
00386         }
00387         poolMCs.setMaxWaiters(cmpp.getPoolMaxWaiters());
00388         poolMCs.setMaxWaitTime(cmpp.getPoolMaxWaittime());
00389 
00390         poolMCs.startMonitor();
00391         poolMCs.setSamplingPeriod(cmpp.getPoolSamplingPeriod());
00392         maxPstmtPoolSize = cmpp.getPstmtMax();
00393 
00394         usedMCs = new Hashtable();
00395         rs = new ResourceSpec(null, null);
00396 
00397         if (trace.isLoggable(BasicLevel.DEBUG)) {
00398             trace.log(BasicLevel.DEBUG, "");
00399         }
00400     }
00401 
00402 
00418     public void init(Context ctx) throws Exception {
00419         mc2mci = new Hashtable();
00420 
00421         // Get the resource bundle name to internationalise the log
00422         // Optional
00423         String resourceBundleName = null;
00424         try {
00425             resourceBundleName = (String) ctx.lookup(RESOURCE_BUNDLE_NAME);
00426         } catch (NamingException e) {
00427         }
00428 
00429         // Get the logger or the logger factory or the printwriter
00430         try {
00431             trace = (Logger) ctx.lookup(LOGGER);
00432             poolTrace = (Logger) ctx.lookup(POOL_LOGGER);
00433         } catch (NamingException e) {
00434         }
00435        
00436         if (trace == null) {
00437             try {
00438                 setLoggerFactory((LoggerFactory) ctx.lookup(LOGGER_FACTORY));
00439             } catch (NamingException e2) {
00440             }
00441         }
00442         if (trace == null) {
00443             PrintWriter pw = null;
00444             try {
00445                 pw = (PrintWriter) ctx.lookup(PRINT_WRITER);
00446             } catch (NamingException e3) {
00447             }
00448             setPrintWriter(pw);
00449         }
00450 
00451         if (!transSupport.equalsIgnoreCase(NO_TRANS_SUPPORT)) {
00452             // Get the transaction manager
00453             tm = (TransactionManager) ctx.lookup(TRANSACTION_MANAGER);
00454         }
00455 
00456         // Get the set of connection
00457         try {
00458             rmel = (ResourceManagerEventListener)
00459                     ctx.lookup(RESOURCE_MANAGER_EVENT_LISTENER);
00460         } catch (NamingException ne) {
00461         }
00462 
00463         // Get the managedConnectionFactory instance which represents the resource
00464         // adapter
00465         try {
00466             setResourceAdapter(
00467                     (ManagedConnectionFactory) ctx.lookup(RESOURCE_ADAPTER));
00468         } catch (NamingException ne) {
00469         }
00470     }
00471 
00472 
00478     public void cleanResourceAdapter() throws ResourceException {
00479         // Delete the PreparedStatements
00480         PreparedStatementWrapper pw = null;
00481         while (mcs != null && mcs.size() > 0) {
00482             MCInfo mci = (MCInfo) mcs.remove(0);
00483             mci.usedCs.clear();
00484             synchronized (mci.pStmts) {
00485                 // Remove the PreparedStatements on the MC
00486                 while (mci.pStmts != null && mci.pStmts.size() > 0) {
00487                     pw = (PreparedStatementWrapper) mci.pStmts.remove(0);
00488                     try {
00489                         pw.destroy();
00490                     } catch (Exception ex) {
00491                     }
00492                 }
00493             }
00494             try {
00495                 mc2mci.remove(mci.mc);
00496                 mci.mc.destroy();
00497             } catch (Exception ex) {
00498             }
00499         }
00500         if (usedMCs != null) {
00501             for (Enumeration en = usedMCs.keys(); en.hasMoreElements();) {
00502                 Transaction tx = (Transaction) en.nextElement();
00503                 MCInfo mci = (MCInfo) usedMCs.get(tx);
00504                 if (mci == null) {
00505                     continue;
00506                 }
00507                 if (mci.rmeCalled) {
00508                     mci.rme.isValid = false;
00509                     rmel.connectionClosed(mci.rme);
00510                     mci.rmeCalled = false;
00511                 }
00512                 mci.usedCs.clear();
00513                 // Remove the PreparedStatements on the MC
00514                 synchronized (mci.pStmts) {
00515                     while (mci.pStmts != null && mci.pStmts.size() > 0) {
00516                         pw = (PreparedStatementWrapper) mci.pStmts.remove(0);
00517                         try {
00518                             pw.destroy();
00519                         } catch (Exception ex) {
00520                         }
00521                     }
00522                 }
00523                 try {
00524                     mc2mci.remove(mci.mc);
00525                     mci.mc.destroy();
00526                 } catch (Exception ex) {
00527                 }
00528             }
00529         }
00530         while (synchros != null && synchros.size() > 0) {
00531             MCInfo mci = (MCInfo) synchros.remove(0);
00532             mci.usedCs.clear();
00533             // Remove the PreparedStatements on the MC
00534             synchronized (mci.pStmts) {
00535                 while (mci.pStmts != null && mci.pStmts.size() > 0) {
00536                     pw = (PreparedStatementWrapper) mci.pStmts.remove(0);
00537                     try {
00538                         pw.destroy();
00539                     } catch (Exception ex) {
00540                     }
00541                 }
00542             }
00543             try {
00544                 mc2mci.remove(mci.mc);
00545                 mci.mc.destroy();
00546             } catch (Exception ex) {
00547             }
00548         }
00549     }
00550 
00551 
00559     public synchronized Object allocateConnection(
00560             ManagedConnectionFactory pMcf, ConnectionRequestInfo cxRequestInfo)
00561             throws ResourceException {
00562 
00563         MCInfo mci = null;
00564         Transaction currentTx = null;
00565         Object connection = null;
00566         int retries = 0;
00567         Subject subject = null;
00568 
00569         while (connection == null && retries < 20) {
00570             if (mcf != pMcf) {
00571                 throw new ResourceException(
00572                         "This ConnectionManager doesn't manage this RA:" + mcf);
00573             }
00574 
00575             currentTx = null;
00576             try {
00577                 if (tm != null) {
00578                     currentTx = tm.getTransaction();
00579                 }
00580             } catch (Exception e) {
00581                 trace.log(BasicLevel.ERROR,
00582                         "Impossible to get the current transaction", e,
00583                         "ConnectionManagerImpl", "allocateConnection");
00584             }
00585 
00586             //if there is a transaction check if a MC is already associated
00587             mci = (currentTx == null ? null : (MCInfo) usedMCs.get(currentTx));
00588             if (mci != null) {
00589                 if (mci.mc != null) {
00590                     // There are connections, try to match with the
00591                     // ManagedConnectionFactory
00592                     trace.log(BasicLevel.DEBUG, "MC (" + mci.mc + ") associated to the current Tx (" + currentTx + ") found");
00593                     Set s = new HashSet();
00594                     s.add(mci.mc);
00595                     if (mci.mc != mcf.matchManagedConnections(s, null, cxRequestInfo)) {
00596                         throw new ResourceException(
00597                                 "ConnectionManagerImpl.allocateConnection: illegal state : no mc is matched by mcf");
00598                     }
00599                     trace.log(BasicLevel.DEBUG, "XA Resource " + mci.getXAResource()
00600                                                 + " is already enlisted in Tx:" + mci.ctx);
00601                 } else {
00602                     // This connection occurred an error before
00603                     trace.log(BasicLevel.INFO, "remnant of an old failed connection");
00604                     mci.ctx = null;
00605                     mci = null;
00606                     usedMCs.remove(currentTx);
00607                 }
00608             }
00609 
00610             if (mci == null) {
00611                 // No managed connection found => get a free ManagedConnection
00612                 // from the right pool
00613 
00614                 //ri.rs.subject = null; // Never set => already at null
00615                 rs.cxRequestInfo = cxRequestInfo;
00616                 if (subject == null && cxRequestInfo != null) {
00617                     // Create a subject to pass on
00618                 }
00619                 try {
00620                     ManagedConnection mc = (ManagedConnection) poolMCs.getResource(rs);
00621                     if (mc == null) {
00622                         throw new ResourceException("ConnectionManagerImpl.allocateConnection: cannot allocate a ManagedConnection");
00623                     }
00624                     mci = (MCInfo) mc2mci.get(mc);
00625                     if (mci == null) {
00626                         mci = new MCInfo(mc);
00627                         mc2mci.put(mc, mci);
00628                     }
00629                     trace.log(BasicLevel.DEBUG, "get a MC from the ra pool, mc=" + mci.mc);
00630                     if (transSupport.equalsIgnoreCase(LOCAL_TRANS_SUPPORT)) {
00631                         if (mci.lw == null) {
00632                             mci.lw = new LocalXAWrapper(mci.mc.getLocalTransaction(), trace);
00633                         }
00634                     } else if (mci.lw != null) {
00635                         mci.lw = null;
00636                     }
00637                     if (!mci.connectionEventListener) {
00638                         mci.mc.addConnectionEventListener(this);
00639                         mci.connectionEventListener = true;
00640                     }
00641                     mci.synchro = null;
00642                     // If a global transaction is already started then enlist the
00643                     // ManagedConnection instance
00644                     if (currentTx != null) {
00645                         trace.log(BasicLevel.DEBUG, "Enlist the XA Resource "
00646                                                     + mci.getXAResource() + " in Tx:"
00647                                                     + currentTx);
00648                         currentTx.enlistResource(mci.getXAResource());
00649                         usedMCs.put(currentTx, mci);
00650                         mci.ctx = currentTx;
00651                     } else {
00652                         // There are not Transaction at the moment but the user can
00653                         // start a transaction after the getConnection call.
00654                         mci.ctx = null;
00655                         // must be clean
00656                         mcs.add(mci);
00657                         mci.rme = new RMEImpl(mci, trace);
00658                         trace.log(BasicLevel.DEBUG, "Register the managed connection (no tx)");
00659                         // Always put in list, fix bug on connection late enlistment
00660                         if (!mci.rmeCalled) {
00661                             mci.rme.isValid = true;
00662                             rmel.connectionOpened(mci.rme);
00663                             mci.rmeCalled = true;
00664                         }
00665                     }
00666                 } catch (ResourceException re) {
00667                     trace.log(BasicLevel.ERROR, re.getMessage(), re);
00668                     throw re;
00669                 } catch (Exception e) {
00670                     ResourceException re = new ResourceException("ResourceService : an error related allocation of ManagedConnection ");
00671                     re.initCause(e);
00672                     trace.log(BasicLevel.ERROR, re.getMessage(), e);
00673                     throw re;
00674                 }
00675 
00676             }
00677 
00678             //Fetch a free Connection from the ManagedConnection
00679             connection = mci.mc.getConnection(null, cxRequestInfo);
00680             // Want to add the non-wrapped Connection object
00681             mci.usedCs.add(connection);
00682             if (connection instanceof java.sql.Connection) {
00683                 try {
00684                     // Need a wrapper for non JOnAS jdbc ConnectionImpl
00685                     if (connection instanceof org.objectweb.jonas.jdbc.ConnectionImpl) {
00686                         ((org.objectweb.jonas.jdbc.ConnectionImpl) connection).setJonasInfo(mci, this);
00687                     } else {
00688                         connection = JonasSQLWrapper.createSQLWrapper(connection, mci, this, trace);
00689                     }
00690                     // Check the connection before reusing it
00691                     if (jdbcConnLevel > 0) {
00692                         try {
00693                             if (trace.isLoggable(BasicLevel.DEBUG)) {
00694                                 trace.log(BasicLevel.DEBUG, "Check the JDBC connection");
00695                             }
00696                             boolean isClosed = true;
00697                             // Used for logical testing of connection
00698                             if (connection instanceof org.objectweb.jonas.jdbc.ConnectionImpl) {
00699                                 isClosed = ((org.objectweb.jonas.jdbc.ConnectionImpl) connection).isPhysicallyClosed();
00700                             } else {
00701                                 isClosed = ((Connection) connection).isClosed();
00702                             }
00703                             if (isClosed) {
00704                                 connectionErrorOccurred(new ConnectionEvent(mci.mc,
00705                                         ConnectionEvent.CONNECTION_ERROR_OCCURRED));
00706                                 connection = null;
00707                                 retries++;
00708                                 continue;
00709                             }
00710                             if (jdbcConnLevel > 1 && jdbcConnTestStmt != null
00711                                     && jdbcConnTestStmt.length() > 0) {
00712                                 if (trace.isLoggable(BasicLevel.DEBUG)) {
00713                                     trace.log(BasicLevel.DEBUG, "retrying connection: " + jdbcConnTestStmt);
00714                                 }
00715                                 java.sql.Statement stmt = ((Connection) connection).createStatement();
00716                                 stmt.execute(jdbcConnTestStmt);
00717                                 stmt.close();
00718                             }
00719                         } catch (Exception e) {
00720                             trace.log(BasicLevel.ERROR, "Error on connection: removing invalid managed connection " + mci.mc + ": ", e);
00721                             connectionErrorOccurred(new ConnectionEvent(mci.mc,
00722                                     ConnectionEvent.CONNECTION_ERROR_OCCURRED));
00723                             connection = null;
00724                             retries++;
00725                             continue;
00726                         }
00727                     }
00728                 } catch (Exception ex) {
00729                     trace.log(BasicLevel.ERROR, ex.getMessage(), ex);
00730                     throw new ResourceException(ex.getMessage());
00731                 }
00732             }
00733 
00734         }
00735         trace.log(BasicLevel.DEBUG, "get a logical connection on MC:" + connection);
00736 
00737         if (connection == null) {
00738             if (retries > 0) {
00739                 throw new ResourceAllocationException(
00740                         "Unable to obtain a connection object.  Check the validity of the jdbc-test-statement");
00741             } else {
00742                 throw new ResourceAllocationException("Unable to obtain a connection object");
00743             }
00744         }
00745 
00746         return connection;
00747     }
00748 
00749 
00756     public boolean matchResource(Object res, Object hints) {
00757         return true;
00758     }
00759 
00766     public Object matchResource(Set res, Object hints) throws Exception {
00767         ResourceSpec spec = (hints != null) ? (ResourceSpec) hints : new ResourceSpec(null, null);
00768         return mcf.matchManagedConnections(res, null, spec.cxRequestInfo);
00769     }
00770 
00771 
00779     public Object createResource(Object hints) throws Exception {
00780         ResourceSpec spec = (hints != null) ? (ResourceSpec) hints : new ResourceSpec(null, null);
00781         ManagedConnection mc = mcf.createManagedConnection(spec.subject,
00782                 spec.cxRequestInfo);
00783         trace.log(BasicLevel.DEBUG, "Created MC: " + mc);
00784         return mc;
00785     }
00786 
00792     public void validateResource(Set res) throws Exception {
00793         if (vmcf == null) {
00794             return;
00795         }
00796         try {
00797             Set invMcs = vmcf.getInvalidConnections(res);
00798             Iterator it = invMcs.iterator();
00799             while (it.hasNext()) {
00800                 Object obj = it.next();
00801                 poolMCs.releaseResource(obj, true, true);
00802             }
00803         } catch (Exception ex) {
00804             trace.log(BasicLevel.DEBUG, "Error trying to validate Connections for " + mcf);
00805         }
00806     }
00807 
00808 
00811     public PreparedStatement getPStatement(MCInfo mcinfo, Object conn, String user, String sql)
00812             throws SQLException {
00813         return getPStatement(mcinfo, conn, user, sql, ResultSet.TYPE_FORWARD_ONLY,
00814                              ResultSet.CONCUR_READ_ONLY, -1, -1, null, null, PSWRAP_1);
00815 
00816     }
00817 
00818     public PreparedStatement getPStatement(MCInfo mcinfo, Object conn, String user, String sql,
00819                                            int resultSetType, int resultSetConcurrency)
00820             throws SQLException {
00821         return getPStatement(mcinfo, conn, user, sql, resultSetType, resultSetConcurrency,
00822                              -1, -1, null, null, PSWRAP_1);
00823     }
00824 
00825     // JDBC 3.0
00826     public PreparedStatement getPStatement(MCInfo mcinfo,
00827                                            Object conn,
00828                                            String user, String sql,
00829                                            int resultSetType,
00830                                            int resultSetConcurrency,
00831                                            int resultSetHoldability)
00832             throws SQLException {
00833               return getPStatement(mcinfo, conn, user, sql, resultSetType, resultSetConcurrency,
00834                       resultSetHoldability, -1, null, null, PSWRAP_2);
00835     }
00836 
00837     // JDBC 3.0
00838     public PreparedStatement getPStatement(MCInfo mcinfo,
00839                                            Object conn,
00840                                            String user, String sql,
00841                                            int autoGeneratedKeys)
00842             throws SQLException {
00843                 return getPStatement(mcinfo, conn, user, sql, -1, -1, -1,
00844                         autoGeneratedKeys, null, null, PSWRAP_3);
00845     }
00846 
00847     // JDBC 3.0
00848     public PreparedStatement getPStatement(MCInfo mcinfo,
00849                                            Object conn,
00850                                            String user, String sql,
00851                                            int[] columnIndexes)
00852             throws SQLException {
00853               return getPStatement(mcinfo, conn, user, sql, -1, -1, -1, -1,
00854                       columnIndexes, null, PSWRAP_4);
00855     }
00856 
00857     // JDBC 3.0
00858     public PreparedStatement getPStatement(MCInfo mcinfo,
00859                                            Object conn,
00860                                            String user, String sql,
00861                                            String[] columnNames)
00862             throws SQLException {
00863               return getPStatement(mcinfo, conn, user, sql, -1, -1, -1, -1,
00864                       null, columnNames, PSWRAP_5);
00865     }
00866 
00867 
00868     private PreparedStatement getPStatement(MCInfo mcinfo,
00869                                             Object conn,
00870                                             String user,
00871                                             String sql,
00872                                             int resultSetType,
00873                                             int resultSetConcurrency,
00874                                             int resultSetHoldability,
00875                                             int autoGeneratedKeys,
00876                                             int [] columnIndexes,
00877                                             String [] columnNames,
00878                                             int pswrapType)
00879             throws SQLException {
00880 
00881         trace.log(BasicLevel.DEBUG, "Sql: " + sql + " User: " + user);
00882         // Use for the equals test
00883         PreparedStatementWrapper psw = null;
00884         switch (pswrapType) {
00885             case PSWRAP_1:
00886                 psw = new PreparedStatementWrapper(user, sql,
00887                                                    resultSetType,
00888                                                    resultSetConcurrency,
00889                                                    trace);
00890                 break;
00891             case PSWRAP_2:
00892                 psw = new PreparedStatementWrapper(user, sql,
00893                                                    resultSetType,
00894                                                    resultSetConcurrency,
00895                                                    resultSetHoldability,
00896                                                    trace);
00897                 break;
00898             case PSWRAP_3:
00899                 psw = new PreparedStatementWrapper(user, sql,
00900                                                    autoGeneratedKeys,
00901                                                    trace);
00902                 break;
00903             case PSWRAP_4:
00904                 psw = new PreparedStatementWrapper(user, sql,
00905                                                    columnIndexes,
00906                                                    trace);
00907                 break;
00908             case PSWRAP_5:
00909                 psw = new PreparedStatementWrapper(user, sql,
00910                                                    columnNames,
00911                                                    trace);
00912                 break;
00913             default:
00914                 break;
00915         }
00916 
00917         synchronized (mcinfo.pStmts) {
00918             trace.log(BasicLevel.DEBUG, "MC pStmts: " + mcinfo.pStmts);
00919             //Check if statement matches ManagedConnection list & valid
00920             for (int i = 0; i < mcinfo.pStmts.size(); i++) {
00921                 if (mcinfo.pStmts.get(i).equals(psw)) {
00922                     PreparedStatementWrapper ps = (PreparedStatementWrapper) mcinfo.pStmts.remove(i);
00923                     ps.clearPstmtValues();
00924                     mcinfo.pStmts.add(ps);  // add last
00925                     mcinfo.stmtHash.add(ps);
00926                     return ps;
00927                 }
00928             }
00929 
00930             //If not in either, call con.prepareStatement, wrap the returned one
00931             // and add it to MC pool
00932             PreparedStatement ps = null;
00933             switch (pswrapType) {
00934                 case PSWRAP_1:
00935                     ps = ((java.sql.Connection) conn).
00936                            prepareStatement(sql, resultSetType, resultSetConcurrency);
00937                     break;
00938                 case PSWRAP_2:
00939                     ps = ((java.sql.Connection) conn).
00940                            prepareStatement(sql, resultSetType, resultSetConcurrency,
00941                                             resultSetHoldability);
00942                     break;
00943                 case PSWRAP_3:
00944                     ps = ((java.sql.Connection) conn).
00945                            prepareStatement(sql, autoGeneratedKeys);
00946                     break;
00947                 case PSWRAP_4:
00948                     ps = ((java.sql.Connection) conn).
00949                            prepareStatement(sql, columnIndexes);
00950                     break;
00951                 case PSWRAP_5:
00952                     ps = ((java.sql.Connection) conn).
00953                            prepareStatement(sql, columnNames);
00954                     break;
00955                 default:
00956                     break;
00957             }
00958             if (maxPstmtPoolSize < 0) {
00959                 // No prepared statement pool
00960                 trace.log(BasicLevel.DEBUG, "Pooling is disabled");
00961                 return ps;
00962             } else if (maxPstmtPoolSize == 0 || mcinfo.pStmts.size() < maxPstmtPoolSize) {
00963                 psw.setPreparedStatement(ps);
00964                 mcinfo.pStmts.add(psw);
00965                 mcinfo.stmtHash.add(ps);
00966                 trace.log(BasicLevel.DEBUG, "Adding PStmt: " + psw);
00967                 return psw;
00968             } else if (mcinfo.isFreeStmt()) {
00969                 // Remove an entry from the current pool
00970                 PreparedStatementWrapper pw = (PreparedStatementWrapper) mcinfo.pStmts.remove(0);
00971                 pw.destroy();
00972                 psw.setPreparedStatement(ps);
00973                 mcinfo.pStmts.add(psw);
00974                 mcinfo.stmtHash.add(ps);
00975                 trace.log(BasicLevel.DEBUG, "Replacing " + pw + " with " + psw);
00976                 return psw;
00977             } else {
00978                 // No room for the Wrapped statement just return the PreparedStatement
00979                 trace.log(BasicLevel.DEBUG, "No room in pool");
00980                 return ps;
00981             }
00982         }
00983     }
00984 
00985 
00991     public void releaseResource(Object rMc) throws Exception {
00992         trace.log(BasicLevel.DEBUG, "MC: " + rMc);
00993         if (rMc instanceof ManagedConnection) {
00994             ManagedConnection mc = (ManagedConnection) rMc;
00995             MCInfo mcinfo = (MCInfo) mc2mci.remove(mc);
00996             if (mcinfo != null) {
00997                 destroyPStmts(mcinfo);
00998             }
00999         }
01000     }
01001 
01007     public void destroyPStmts(MCInfo mcinfo) throws Exception {
01008         trace.log(BasicLevel.DEBUG, "MCInfo: " + mcinfo);
01009         synchronized (mcinfo.pStmts) {
01010             if (mcinfo.pStmts.size() <= 0) {
01011                 return;
01012             }
01013             int stmtSize = mcinfo.pStmts.size();
01014             try {
01015                 for (int i = 0; i < stmtSize; i++) {
01016                     PreparedStatementWrapper psw = (PreparedStatementWrapper)
01017                             mcinfo.pStmts.remove(0);
01018                     psw.closePstmt();
01019                 }
01020             } catch (Exception ex) {
01021                 throw ex;
01022             }
01023         }
01024     }
01025 
01037     public synchronized void connectionClosed(ConnectionEvent event) {
01038         ManagedConnection mc = (ManagedConnection) event.getSource();
01039         if (mc == null) {
01040             trace.log(BasicLevel.ERROR, "no mc found in Event!");
01041         }
01042 
01043         MCInfo mci = (MCInfo) mc2mci.get(mc);
01044         if (mci == null) {
01045             trace.log(BasicLevel.ERROR, "no mci found!");
01046             return;
01047         }
01048 
01049         if (poolTrace.isLoggable(BasicLevel.DEBUG)) {
01050             poolTrace.log(BasicLevel.DEBUG, "enter\n" + getState("\t"));
01051         }
01052 
01053         mci.usedCs.remove(event.getConnectionHandle());
01054 
01055         if (mci.usedCs.isEmpty()) {
01056             trace.log(BasicLevel.DEBUG, "Last Connection has just been removed");
01057 
01058             try {
01059                 Transaction currentTx = null;
01060                 if (tm != null) {
01061                     currentTx = tm.getTransaction();
01062                 }
01063 
01064                 if (mci.localTransaction && currentTx == null) {
01065                     trace.log(BasicLevel.ERROR, "The managed connection is being closed while a localtransaction is not finished");
01066                 }
01067 
01068                 if (currentTx != null) {
01069                     mci.ctx = currentTx;
01070 
01071                     if (mci.synchro == null) {
01072                         try {
01073                             currentTx.registerSynchronization(
01074                                     new MySynchro(this, mci));
01075                             trace.log(BasicLevel.DEBUG, "registerSynchro mc=" + mci.mc);
01076                         } catch (RollbackException e) {
01077                             // The transaction has been marked as rollbackOnly
01078                             // but the synchronization is registered.
01079                             trace.log(BasicLevel.INFO, "registerSynchronization on transaction marked as Rollback only, mc=" + mci.mc);
01080                         }
01081                     }
01082                 } else if (mci.localTransaction) {
01083                     trace.log(BasicLevel.DEBUG, "MC isn't released because local transaction is not finished");
01084 
01085                 } else {
01086                     trace.log(BasicLevel.DEBUG, "no currentTx");
01087 
01088                     mcs.remove(mci);
01089 
01090                     if (mci.ctx != null) {
01091                         usedMCs.remove(mci.ctx);
01092                         mci.ctx = null;
01093                     }
01094 
01095                     mci.mc.cleanup();
01096 
01097                     // Free up the PreparedStatements
01098                     mci.stmtHash.clear();
01099 
01100                     // Release the MC from its pool
01101                     poolMCs.releaseResource(mci.mc, false, true);
01102 
01103                 }
01104                 if (mci.rmeCalled) {
01105                     //Signal to the RessourceManagerEventListener mc is released
01106                     mci.rme.isValid = false;
01107                     rmel.connectionClosed(mci.rme);
01108                     mci.rmeCalled = false;
01109                 }
01110             } catch (Exception e) {
01111                 trace.log(BasicLevel.ERROR,
01112                         "an error during delisting of ManagedConection: ", e);
01113             }
01114             if (poolTrace.isLoggable(BasicLevel.DEBUG)) {
01115                 poolTrace.log(BasicLevel.DEBUG, "exit\n" + getState("\t"));
01116             }
01117         }
01118     }
01119 
01120 
01131     public void connectionErrorOccurred(ConnectionEvent event) {
01132         ManagedConnection mc = (ManagedConnection) event.getSource();
01133         if (mc == null) {
01134             trace.log(BasicLevel.ERROR, "no mc found in Event!");
01135         }
01136 
01137         MCInfo mci = (MCInfo) mc2mci.get(mc);
01138         if (mci == null) {
01139             trace.log(BasicLevel.ERROR, "no mci found!");
01140             return;
01141         }
01142 
01143         if (poolTrace.isLoggable(BasicLevel.DEBUG)) {
01144             poolTrace.log(BasicLevel.DEBUG, "enter\n" + getState("\t"));
01145         }
01146 
01147         mci.usedCs.clear();
01148 
01149         try {
01150             if (mci.rmeCalled) {
01151                 //Signal to the RessourceManagerEventListener mc is released
01152                 mci.rme.isValid = false;
01153                 rmel.connectionErrorOccured(mci.rme);
01154                 mci.rmeCalled = false;
01155             }
01156             // Detach the connectionManager from the mc
01157             mc.removeConnectionEventListener(this);
01158             // Remove the association (transaction ctx / mc)
01159             if (mci.ctx != null) {
01160                 usedMCs.remove(mci.ctx);
01161                 mci.ctx = null;
01162             } else {
01163                 mcs.remove(mci);
01164             }
01165 
01166             trace.log(BasicLevel.DEBUG, "Destroying managed connection (" + mc + ")");
01167             // Destroy the PreparedStatements
01168             destroyPStmts(mci);
01169 
01170             poolMCs.releaseResource(mc, true, false);
01171             if (poolTrace.isLoggable(BasicLevel.DEBUG)) {
01172                 poolTrace.log(BasicLevel.DEBUG, "enter\n" + getState("\t"));
01173             }
01174             
01175             mc2mci.remove(mc);
01176 
01177         } catch (Exception e) {
01178             trace.log(BasicLevel.ERROR,
01179                     "an error related during delisting of ManagedConection");
01180         }
01181     }
01182 
01183 
01190     public void localTransactionCommitted(ConnectionEvent event) {
01191         ManagedConnection mc = (ManagedConnection) event.getSource();
01192         if (mc == null) {
01193             trace.log(BasicLevel.ERROR, "no mc found in Event!");
01194         }
01195 
01196         MCInfo mci = (MCInfo) mc2mci.get(mc);
01197         if (mci == null) {
01198             trace.log(BasicLevel.ERROR, "no mci found!");
01199             return;
01200         }
01201         mci.localTransaction = false;
01202 
01203         if (mci.usedCs.isEmpty()) {
01204             trace.log(BasicLevel.DEBUG, "Close the managed connection");
01205 
01206             if (mci.rmeCalled) {
01207                 //Signal to the RessourceManagerEventListener mc is released
01208                 mci.rme.isValid = false;
01209                 rmel.connectionClosed(mci.rme);
01210                 mci.rmeCalled = false;
01211             }
01212 
01213             if (mci.synchro == null) {
01214                 mcs.remove(mci);
01215                 if (mci.ctx != null) {
01216                     usedMCs.remove(mci.ctx);
01217                     mci.ctx = null;
01218                 }
01219 
01220                 try {
01221                     mci.mc.cleanup();
01222 
01223                     // Free up the PreparedStatements
01224                     mci.stmtHash.clear();
01225 
01226                     // Release the MC from its pool
01227                     poolMCs.releaseResource(mci.mc, false, true);
01228                 } catch (Exception e) {
01229                     trace.log(BasicLevel.ERROR,
01230                             "an error related ManagedConection release",
01231                             e, "ConnectionManagerImpl", "localTransactionCommitted");
01232                 }
01233             }
01234         }
01235     }
01236 
01237 
01244     public void localTransactionRolledback(ConnectionEvent event) {
01245         ManagedConnection mc = (ManagedConnection) event.getSource();
01246         if (mc == null) {
01247             trace.log(BasicLevel.ERROR, "no mc found in Event!");
01248         }
01249 
01250         MCInfo mci = (MCInfo) mc2mci.get(mc);
01251         if (mci == null) {
01252             trace.log(BasicLevel.ERROR, "no mci found!");
01253             return;
01254         }
01255         mci.localTransaction = false;
01256 
01257         if (mci.usedCs.isEmpty()) {
01258             trace.log(BasicLevel.DEBUG, "Close the managed connection");
01259 
01260             if (mci.rmeCalled) {
01261                 //Signal to the ResourceManagerEventListener mc is released
01262                 mci.rme.isValid = false;
01263                 rmel.connectionClosed(mci.rme);
01264                 mci.rmeCalled = false;
01265             }
01266 
01267             if (mci.synchro == null) {
01268                 mcs.remove(mci);
01269                 if (mci.ctx != null) {
01270                     usedMCs.remove(mci.ctx);
01271                     mci.ctx = null;
01272                 }
01273 
01274                 try {
01275                     mci.mc.cleanup();
01276 
01277                     // Free up the PreparedStatements
01278                     mci.stmtHash.clear();
01279 
01280                     // Release the MC from its pool
01281                     poolMCs.releaseResource(mci.mc, false, true);
01282                 } catch (Exception e) {
01283                     trace.log(BasicLevel.ERROR,
01284                             "an error related during ManagedConection release:",
01285                             e, "ConnectionManagerImpl", "localTransactionRolledback");
01286                 }
01287             }
01288         }
01289     }
01290 
01291 
01298     public void localTransactionStarted(ConnectionEvent event) {
01299         ManagedConnection mc = (ManagedConnection) event.getSource();
01300         if (mc == null) {
01301             trace.log(BasicLevel.ERROR, "no mc found in Event!");
01302         }
01303 
01304         MCInfo mci = (MCInfo) mc2mci.get(mc);
01305         if (mci == null) {
01306             trace.log(BasicLevel.ERROR, "no mci found!");
01307             return;
01308         }
01309         mci.localTransaction = true;
01310     }
01311 
01312 
01318     public String toString() {
01319         String m = super.toString();
01320         // remove package name
01321         int c1 = 0;
01322         int current = m.indexOf(".");
01323         while (current != -1) {
01324             c1 = current;
01325             current = m.indexOf(".", current + 1);
01326         }
01327         return m.substring(c1 + 1, m.length());
01328     }
01329 
01330 
01337     protected String getState(String prefix) {
01338         String res = prefix + "mcf=" + mcf + "\n";
01339         res += prefix + "ResourceSpec=" + rs.toString() + "\n";
01340         res += prefix + "size of MC pool:" + poolMCs.getSize() + "\n";
01341         res += prefix + "size of usedMCs:" + usedMCs.size() + "\n";
01342         res += prefix + "mcs attached to a tx:\n";
01343         for (Enumeration en = usedMCs.keys(); en.hasMoreElements();) {
01344             Object tx = en.nextElement();
01345             MCInfo mci = (MCInfo) usedMCs.get(tx);
01346             res += prefix + "MCI : tx=" + tx + "\n";
01347             res += mci.getState(prefix + "\t");
01348         }
01349         res += prefix + "mcs not attached to a tx:\n";
01350         for (Enumeration en = mcs.elements(); en.hasMoreElements();) {
01351             MCInfo mci = (MCInfo) en.nextElement();
01352             res += mci.getState(prefix + "\t");
01353         }
01354         res += prefix + "mcs waiting for tx commit or rollback:\n";
01355         for (Enumeration en = synchros.elements(); en.hasMoreElements();) {
01356             MCInfo mci = (MCInfo) en.nextElement();
01357             res += mci.getState(prefix + "\t");
01358         }
01359         return res;
01360     }
01361 
01366     public void setXAName(String xanm) {
01367         xaName = xanm;
01368     }
01369 
01374     public String getXAName() {
01375         return (xaName);
01376     }
01377 
01381     public void registerXAResource() {
01382         // If no RM or the RAR doesn't support XA, then just return
01383         if (tm == null) {
01384             return;
01385         }
01386         if (!transSupport.equalsIgnoreCase(XA_TRANS_SUPPORT)) {
01387             return;
01388         }
01389 
01390         // Find an entry in free pool
01391         // If none, then create one
01392         MCInfo mci = null;
01393         XAResource xar = null;
01394         try {
01395             ManagedConnection mc = (ManagedConnection) poolMCs.getResource(rs);
01396             if (mc == null) {
01397                 trace.log(BasicLevel.DEBUG, "Cannot allocate a ManagedConnection for registerXAResource");
01398                 return;
01399             }
01400 
01401             mci = (MCInfo) mc2mci.get(mc);
01402             if (mci == null) {
01403                 mci = new MCInfo(mc);
01404                 mc2mci.put(mc, mci);
01405             }
01406 
01407             xar = mc.getXAResource();
01408             trace.log(BasicLevel.DEBUG, "got a MC from the ra pool, mc=" + mci.mc
01409                                         + " xar=" + xar);
01410 
01411             if (!mci.connectionEventListener) {
01412                 mci.mc.addConnectionEventListener(this);
01413                 mci.connectionEventListener = true;
01414             }
01415 
01416             mci.synchro = null;
01417 
01418         } catch (ResourceException re) {
01419             return;
01420         } catch (Exception e) {
01421             trace.log(BasicLevel.ERROR, e.getMessage(), e);
01422             return;
01423         }
01424 
01425         // Setup globals so it can be returned to the pool
01426         jotmMc = mci.mc;
01427         jotmXar = xar;
01428 
01429         // Call to register it
01430         try {
01431             trace.log(BasicLevel.INFO, "Registering name = " + xaName + " xar = " + jotmXar);
01432             ((Current) tm).getTransactionRecovery().registerResourceManager(xaName, jotmXar, "", this);
01433        } catch (Exception ex) {
01434             trace.log(BasicLevel.ERROR, ex.getMessage(), ex);
01435             returnXAResource(xaName, jotmXar);
01436         }
01437     }
01438 
01446     public void returnXAResource(String rmName, XAResource rmXares) {
01447         // Get the associated MC
01448         // Clean it up and return it to the free pool
01449         trace.log(BasicLevel.INFO, "Removing name = " + xaName + " xar = " + jotmXar);
01450         if (jotmXar == null) {
01451             return;
01452         }
01453         if (!rmXares.equals(jotmXar)) {
01454             trace.log(BasicLevel.ERROR, "XAResource of " + rmXares + " and "
01455                                         + jotmXar + " not equal!");
01456             return;
01457         }
01458 
01459         MCInfo mci = (MCInfo) mc2mci.get(jotmMc);
01460         if (mci == null) {
01461             trace.log(BasicLevel.ERROR, "no mci found for " + jotmMc);
01462             return;
01463         }
01464 
01465         try {
01466             mci.mc.cleanup();
01467 
01468             // Release the MC from its pool
01469             poolMCs.releaseResource(mci.mc, false, true);
01470         } catch (Exception ex) {
01471             trace.log(BasicLevel.ERROR, ex.getMessage(), ex);
01472         }
01473 
01474         jotmMc = null;
01475         jotmXar = null;
01476     }
01477 }

Generated on Tue Feb 15 15:05:16 2005 for JOnAS by  doxygen 1.3.9.1