JMdbFactory.java

00001 
00026 package org.objectweb.jonas_ejb.container;
00027 
00028 
00029 import java.util.ArrayList;
00030 import java.util.List;
00031 import java.util.ListIterator;
00032 
00033 import javax.ejb.EJBException;
00034 import javax.ejb.MessageDrivenBean;
00035 import javax.ejb.MessageDrivenContext;
00036 import javax.ejb.Timer;
00037 import javax.ejb.TimerService;
00038 import javax.jms.ConnectionConsumer;
00039 import javax.jms.JMSException;
00040 import javax.jms.MessageListener;
00041 import javax.jms.Queue;
00042 import javax.jms.ServerSession;
00043 import javax.jms.ServerSessionPool;
00044 import javax.jms.Session;
00045 import javax.jms.Topic;
00046 import javax.jms.XAQueueConnection;
00047 import javax.jms.XAQueueConnectionFactory;
00048 import javax.jms.XATopicConnection;
00049 import javax.jms.XATopicConnectionFactory;
00050 import javax.naming.Context;
00051 
00052 import org.objectweb.jonas_ejb.deployment.api.MessageDrivenDesc;
00053 import org.objectweb.jonas_ejb.deployment.api.MethodDesc;
00054 
00055 import org.objectweb.jonas_jms.api.JmsManager;
00056 
00057 import org.objectweb.util.monolog.api.BasicLevel;
00058 
00066 public class JMdbFactory extends JFactory implements ServerSessionPool {
00067 
00071     private JmsManager jms = null;
00072 
00076     ConnectionConsumer cc = null;
00077 
00081     private List  sspool = new ArrayList();
00082 
00083     protected int instanceCount = 0;
00084     // initial value for pool size
00085     protected int minPoolSize = 0;
00086     // nb max of instances in pool
00087     protected int maxCacheSize = 0;
00088 
00092     protected ThreadPool threadpool = null;
00093 
00098     protected XATopicConnection tconn = null;
00099 
00104     protected XAQueueConnection qconn = null;
00105 
00112     public JMdbFactory(MessageDrivenDesc dd, JContainer cont, ThreadPool thp) {
00113         super(dd, cont);
00114         threadpool = thp;
00115 
00116         // Check if tx managed by the bean or the container
00117         txbeanmanaged = dd.isBeanManagedTransaction();
00118 
00119         // Check that JMS Service has been run in the server
00120         jms = cont.getJmsManager();
00121         if (jms == null) {
00122             TraceEjb.logger.log(BasicLevel.ERROR, "cannot deploy a message driven bean without the JMS Service");
00123             throw new EJBException("JMS Service must be run");
00124         }
00125 
00126         // Create a Connection Consumer, depending on Deployment Descriptor
00127         String selector = dd.getSelector();
00128         String dest = dd.getDestinationJndiName();
00129         
00130         // Get the number max of messages sent on a Session at one time.
00131         // There is a bug in Joram: If the number is greater than 1, Joram
00132         // will wait until this nb is reached!
00133         // We set the number max to 1 to work around this bug.
00134         // LATER: maxMessage = dd.getMaxMessages(), with this value configurable.
00135         int maxMessages = 1;
00136 
00137         if (dest == null) {
00138                 throw new EJBException("The destination JNDI name is null in bean " + dd.getEjbName());
00139         }
00140 
00141         try {
00142             if (dd.isTopicDestination()) {
00143                 // topic
00144                 XATopicConnectionFactory tcf = jms.getXATopicConnectionFactory();
00145                 tconn = tcf.createXATopicConnection();
00146                 Topic t = jms.getTopic(dest);
00147                 if (dd.isSubscriptionDurable()) {
00148                     TraceEjb.mdb.log(BasicLevel.DEBUG, "createDurableConnectionConsumer for "+ejbname);
00149                     cc  = tconn.createDurableConnectionConsumer(t, ejbname, selector, this, maxMessages);
00150                 } else {
00151                     TraceEjb.mdb.log(BasicLevel.DEBUG, "createConnectionConsumer for "+dest);
00152                     cc  = tconn.createConnectionConsumer(t, selector, this, maxMessages);
00153                 }
00154                 tconn.start();
00155             } else {
00156                 // queue
00157                 XAQueueConnectionFactory qcf = jms.getXAQueueConnectionFactory();
00158                 qconn = qcf.createXAQueueConnection();
00159                 Queue q = jms.getQueue(dest);
00160                 TraceEjb.mdb.log(BasicLevel.DEBUG, "createConnectionConsumer for "+dest);
00161                 cc = qconn.createConnectionConsumer(q, selector, this, maxMessages);
00162                 qconn.start();
00163             }
00164         } catch (Exception e) {
00165             throw new EJBException("Cannot create connection consumer in bean " + dd.getEjbName() + " :", e);
00166         }
00167 
00168         minPoolSize = dd.getPoolMin();
00169         maxCacheSize = dd.getCacheMax();
00170         if(TraceEjb.isDebugSwapper()) {
00171             TraceEjb.swapper.log(BasicLevel.DEBUG," maxCacheSize = "+ maxCacheSize +
00172                                  " minPoolSize = "+minPoolSize);
00173         }
00174     }
00175 
00176     // ---------------------------------------------------------------
00177     // Specific BeanFactory implementation
00178     // ---------------------------------------------------------------
00179 
00183     public void initInstancePool() {
00184         if (minPoolSize != 0) {
00185             TraceEjb.mdb.log(BasicLevel.INFO, "pre-allocate a set of " + minPoolSize
00186                     + " message driven bean  instances");
00187             // pre-allocate a set of ServerSession
00188             synchronized (sspool) {
00189                 for (int i = 0; i < minPoolSize; i++) {
00190                     ServerSession ss = null;
00191                     try {
00192                         ss = createNewInstance();
00193                         sspool.add(ss);
00194                     } catch (Exception e) {
00195                         TraceEjb.mdb.log(BasicLevel.ERROR, "cannot init pool of instances ");
00196                         throw new EJBException("cannot init pool of instances ", e);
00197                     }
00198                 }
00199             }
00200         }
00201     }
00202     
00206     public int getPoolSize() {
00207         return sspool.size();
00208     }
00209 
00216     public void stop() {
00217         TraceEjb.mdb.log(BasicLevel.DEBUG, "");
00218         try {
00219             cc.close();
00220             if (tconn != null)
00221                 tconn.close();
00222             if (qconn != null)
00223                 qconn.close();
00224         } catch(javax.jms.JMSException e) {
00225             TraceEjb.logger.log(BasicLevel.WARN, "unregister: Cannot close Connection Consumer");
00226         }
00227     }
00228 
00232     public void sync() {
00233     }
00234 
00235 
00239     public JHome getHome() {
00240         return null;
00241     }
00242 
00246     public JLocalHome getLocalHome() {
00247         return null;
00248     }
00249 
00250     // ---------------------------------------------------------------
00251     // ServerSessionPool Implementation
00252     // ---------------------------------------------------------------
00253 
00261     public ServerSession getServerSession() throws JMSException {
00262         TraceEjb.mdb.log(BasicLevel.DEBUG, "");
00263 
00264         return getNewInstance();
00265     }
00266 
00267 
00268 
00269     // ---------------------------------------------------------------
00270     // Other methods
00271     // ---------------------------------------------------------------
00272 
00277     public void releaseServerSession(ServerSession ss) {
00278         TraceEjb.mdb.log(BasicLevel.DEBUG, "");
00279 
00280         synchronized (sspool) {
00281             sspool.add(ss);
00282             if(TraceEjb.isDebugSwapper()) {
00283                     TraceEjb.swapper.log(BasicLevel.DEBUG, "notifyAll " );
00284             }
00285             sspool.notifyAll();
00286         }
00287         if(TraceEjb.isDebugJms()) {
00288             TraceEjb.mdb.log(BasicLevel.DEBUG, "nb instances " + getCacheSize());
00289         }
00290 
00291     }
00292 
00293     // ---------------------------------------------------------------
00294     // other public methods
00295     // ---------------------------------------------------------------
00296 
00301     public TimerService getTimerService() {
00302         if (myTimerService == null) {
00303             // TODO : Check that instance implements TimedObject ?
00304             myTimerService = new JTimerService(this);
00305         }
00306         return myTimerService;
00307     }
00308 
00313     public int getMinPoolSize() {
00314         return minPoolSize;
00315     }
00316 
00321     public int getMaxCacheSize() {
00322         return maxCacheSize;
00323     }
00324 
00329     public int getCacheSize() {
00330         return instanceCount;
00331     }
00332 
00336     public int getTransactionAttribute() {
00337         return ((MessageDrivenDesc)dd).getTxAttribute();
00338     }
00339 
00344     public void checkTransaction(RequestCtx rctx) {
00345         if (rctx.txAttr == MethodDesc.TX_REQUIRED) {
00346             try {
00347                 if (tm.getTransaction() != null) {
00348                     // This should not occur (DEBUG)
00349                     TraceEjb.logger.log(BasicLevel.ERROR, "Transaction already opened by this thread.");
00350                     TraceEjb.logger.log(BasicLevel.ERROR, "Transaction status = " + tm.getStatus());
00351                     TraceEjb.logger.log(BasicLevel.ERROR, "Transaction = " + tm.getTransaction());
00352                     Thread.dumpStack();
00353                     return;
00354                 }
00355                 tm.begin();
00356                 rctx.mustCommit = true;
00357                 rctx.currTx = tm.getTransaction();
00358                 TraceEjb.tx.log(BasicLevel.DEBUG, "Transaction started: " + rctx.currTx);
00359             } catch (Exception e) {
00360                 // No exception raised in case of MDB
00361                 TraceEjb.logger.log(BasicLevel.ERROR, "cannot start tx:"+e);
00362                 return;
00363             }
00364         }
00365     }
00366 
00371     public void reduceCache() {
00372         TraceEjb.swapper.log(BasicLevel.DEBUG, "");
00373         // reduce the pool to the minPoolSize
00374         int poolsz = minPoolSize;
00375         synchronized (sspool) {
00376             if(TraceEjb.isDebugSwapper()) {
00377                 TraceEjb.swapper.log(BasicLevel.DEBUG, "try to reduce " + sspool.size() +
00378                                      " to " + poolsz);
00379             }
00380             while (sspool.size() > poolsz) {
00381                 ListIterator i = sspool.listIterator();
00382                 if (i.hasNext()) {
00383                     i.next();
00384                     i.remove();
00385                     instanceCount--;
00386                 }
00387             }
00388         }
00389         if (TraceEjb.isDebugSwapper()) {
00390             TraceEjb.swapper.log(BasicLevel.DEBUG, "cacheSize= " + getCacheSize());
00391         }
00392 
00393     }
00394 
00399     public void notifyTimeout(Timer timer) {
00400         TraceEjb.mdb.log(BasicLevel.DEBUG, "");
00401 
00402         // We need an instance from the pool to process the timeout.
00403         JMessageDrivenBean jmdb = null;
00404         try {
00405             jmdb = getNewInstance();
00406         } catch (JMSException e) {
00407             TraceEjb.logger.log(BasicLevel.ERROR, "exception:" + e);
00408             throw new EJBException("Cannot deliver the timeout: " + e);
00409         }
00410 
00411         // deliver the timeout to the bean
00412         jmdb.deliverTimeout(timer);
00413 
00414         // release the instance
00415         releaseServerSession(jmdb);
00416     }
00417 
00418     // ---------------------------------------------------------------
00419     // private methods
00420     // ---------------------------------------------------------------
00421 
00426     private JMessageDrivenBean getNewInstance() throws JMSException {
00427         TraceEjb.mdb.log(BasicLevel.DEBUG, "");
00428 
00429         // try to get one from the Pool
00430         JMessageDrivenBean ss = null;
00431 
00432         // try to find a free context in the pool
00433         synchronized(sspool) {
00434             if (!sspool.isEmpty()) {
00435                 try {
00436                     ss = (JMessageDrivenBean) sspool.remove(0);
00437                     return ss;
00438                 } catch(IndexOutOfBoundsException ex) {
00439                     // This should never happen
00440                     TraceEjb.logger.log(BasicLevel.ERROR, "exception:"+ex);
00441                     throw new EJBException("Cannot get an instance from the pool");
00442                 }
00443             } else {
00444                 TraceEjb.mdb.log(BasicLevel.DEBUG,"pool is empty");
00445                 if (maxCacheSize == 0 ||  instanceCount < maxCacheSize) {
00446                     // Pool is empty creates the ServerSession object
00447                     try {
00448                         ss = createNewInstance();
00449                     } catch (Exception e) {
00450                         TraceEjb.logger.log(BasicLevel.ERROR, "exception:"+e);
00451                         throw new EJBException("Cannot create a new instance");
00452                     }
00453                 } else {
00454                     while (sspool.isEmpty()) {
00455                         TraceEjb.swapper.log(BasicLevel.DEBUG,"sspool.isEmpty() = true --> wait()");
00456                         try {
00457                             sspool.wait();
00458                             TraceEjb.swapper.log(BasicLevel.DEBUG,"sspool notified");
00459                         } catch (InterruptedException e) {
00460                             TraceEjb.swapper.log(BasicLevel.DEBUG, "sspool waiting interrupted");
00461                         } catch (Exception e) {
00462                             throw new EJBException("synchronization pb", e);
00463                         }
00464                     }
00465                     try {
00466                         ListIterator i = sspool.listIterator();
00467                         if (i.hasNext()) {
00468                             ss = (JMessageDrivenBean) i.next();
00469                             i.remove();
00470                         }
00471                         return ss;
00472                     } catch(IndexOutOfBoundsException ex) {
00473                         // pool is empty
00474                     }
00475                 }
00476 
00477             }
00478             if(TraceEjb.isDebugSwapper()) {
00479                 TraceEjb.swapper.log(BasicLevel.DEBUG, "nb instances " + getCacheSize());
00480             }
00481             return ss;
00482         }
00483     }
00484 
00488     private JMessageDrivenBean createNewInstance() throws Exception {
00489         TraceEjb.mdb.log(BasicLevel.DEBUG, "");
00490         Session sess = null;
00491         JMessageDrivenBean ss = null;
00492         MessageDrivenDesc mdd = (MessageDrivenDesc) dd;
00493         if (tconn != null) {
00494             if (mdd.isRequired()) {
00495                 sess = tconn.createXATopicSession();
00496             } else {
00497                 sess = tconn.createTopicSession(false, mdd.getAcknowledgeMode());
00498             }
00499         } else if (qconn != null) {
00500             if (mdd.isRequired()) {
00501                 sess = qconn.createXAQueueSession();
00502             } else {
00503                 sess = qconn.createQueueSession(false, mdd.getAcknowledgeMode());
00504             }
00505         } else {
00506             TraceEjb.mdb.log(BasicLevel.ERROR, "connection not initialized");
00507             throw new Exception("JMS connection not initialized");
00508         }
00509 
00510         // Set ContextClassLoader with the ejbclassloader.
00511         // This is necessary in case ejbCreate calls another bean in the same jar.
00512         Thread.currentThread().setContextClassLoader(myClassLoader());
00513 
00514         // Creates the new instance
00515         MessageDrivenBean mdb = null;
00516         try {
00517             mdb  = (MessageDrivenBean) beanclass.newInstance();
00518         } catch (Exception e) {
00519             TraceEjb.logger.log(BasicLevel.ERROR, "failed to create instance:", e);
00520             throw new EJBException("Container failed to create instance of Message Driven Bean", e);
00521         }
00522 
00523         // Instanciates a new JMessageDrivenBean object
00524         // and set it as the MessageListener for this Session.
00525         ss = new JMessageDrivenBean(this, sess, mdb, threadpool);
00526         sess.setMessageListener((MessageListener)ss);
00527 
00528         // starts the bean instance: setMessageDrivenContext() + ejbCreate()
00529         // see EJB spec. 2.0 page 322.
00530         // Both operations must be called with the correct ComponentContext
00531         Context ctxsave = setComponentContext();
00532         mdb.setMessageDrivenContext((MessageDrivenContext)ss);
00533         try {
00534             beanclass.getMethod("ejbCreate", (Class[]) null).invoke(mdb, (Object[]) null);
00535         } catch (Exception e) {
00536             TraceEjb.logger.log(BasicLevel.ERROR, "cannot call ejbCreate on message driven bean instance ", e);
00537             throw new EJBException(" Container fails to call ejbCreate on message driven bean instance", e);
00538         } finally {
00539             resetComponentContext(ctxsave);
00540         }
00541 
00542         synchronized (sspool) {
00543             instanceCount++;
00544         }
00545         return ss;
00546     }
00547 
00548 }

Generated on Tue Feb 15 15:05:38 2005 for JOnAS by  doxygen 1.3.9.1