00001
00026 package org.objectweb.jonas_ejb.container;
00027
00028
00029 import java.util.ArrayList;
00030 import java.util.List;
00031 import java.util.ListIterator;
00032
00033 import javax.ejb.EJBException;
00034 import javax.ejb.MessageDrivenBean;
00035 import javax.ejb.MessageDrivenContext;
00036 import javax.ejb.Timer;
00037 import javax.ejb.TimerService;
00038 import javax.jms.ConnectionConsumer;
00039 import javax.jms.JMSException;
00040 import javax.jms.MessageListener;
00041 import javax.jms.Queue;
00042 import javax.jms.ServerSession;
00043 import javax.jms.ServerSessionPool;
00044 import javax.jms.Session;
00045 import javax.jms.Topic;
00046 import javax.jms.XAQueueConnection;
00047 import javax.jms.XAQueueConnectionFactory;
00048 import javax.jms.XATopicConnection;
00049 import javax.jms.XATopicConnectionFactory;
00050 import javax.naming.Context;
00051
00052 import org.objectweb.jonas_ejb.deployment.api.MessageDrivenDesc;
00053 import org.objectweb.jonas_ejb.deployment.api.MethodDesc;
00054
00055 import org.objectweb.jonas_jms.api.JmsManager;
00056
00057 import org.objectweb.util.monolog.api.BasicLevel;
00058
00066 public class JMdbFactory extends JFactory implements ServerSessionPool {
00067
00071 private JmsManager jms = null;
00072
00076 ConnectionConsumer cc = null;
00077
00081 private List sspool = new ArrayList();
00082
00083 protected int instanceCount = 0;
00084
00085 protected int minPoolSize = 0;
00086
00087 protected int maxCacheSize = 0;
00088
00092 protected ThreadPool threadpool = null;
00093
00098 protected XATopicConnection tconn = null;
00099
00104 protected XAQueueConnection qconn = null;
00105
00112 public JMdbFactory(MessageDrivenDesc dd, JContainer cont, ThreadPool thp) {
00113 super(dd, cont);
00114 threadpool = thp;
00115
00116
00117 txbeanmanaged = dd.isBeanManagedTransaction();
00118
00119
00120 jms = cont.getJmsManager();
00121 if (jms == null) {
00122 TraceEjb.logger.log(BasicLevel.ERROR, "cannot deploy a message driven bean without the JMS Service");
00123 throw new EJBException("JMS Service must be run");
00124 }
00125
00126
00127 String selector = dd.getSelector();
00128 String dest = dd.getDestinationJndiName();
00129
00130
00131
00132
00133
00134
00135 int maxMessages = 1;
00136
00137 if (dest == null) {
00138 throw new EJBException("The destination JNDI name is null in bean " + dd.getEjbName());
00139 }
00140
00141 try {
00142 if (dd.isTopicDestination()) {
00143
00144 XATopicConnectionFactory tcf = jms.getXATopicConnectionFactory();
00145 tconn = tcf.createXATopicConnection();
00146 Topic t = jms.getTopic(dest);
00147 if (dd.isSubscriptionDurable()) {
00148 TraceEjb.mdb.log(BasicLevel.DEBUG, "createDurableConnectionConsumer for "+ejbname);
00149 cc = tconn.createDurableConnectionConsumer(t, ejbname, selector, this, maxMessages);
00150 } else {
00151 TraceEjb.mdb.log(BasicLevel.DEBUG, "createConnectionConsumer for "+dest);
00152 cc = tconn.createConnectionConsumer(t, selector, this, maxMessages);
00153 }
00154 tconn.start();
00155 } else {
00156
00157 XAQueueConnectionFactory qcf = jms.getXAQueueConnectionFactory();
00158 qconn = qcf.createXAQueueConnection();
00159 Queue q = jms.getQueue(dest);
00160 TraceEjb.mdb.log(BasicLevel.DEBUG, "createConnectionConsumer for "+dest);
00161 cc = qconn.createConnectionConsumer(q, selector, this, maxMessages);
00162 qconn.start();
00163 }
00164 } catch (Exception e) {
00165 throw new EJBException("Cannot create connection consumer in bean " + dd.getEjbName() + " :", e);
00166 }
00167
00168 minPoolSize = dd.getPoolMin();
00169 maxCacheSize = dd.getCacheMax();
00170 if(TraceEjb.isDebugSwapper()) {
00171 TraceEjb.swapper.log(BasicLevel.DEBUG," maxCacheSize = "+ maxCacheSize +
00172 " minPoolSize = "+minPoolSize);
00173 }
00174 }
00175
00176
00177
00178
00179
00183 public void initInstancePool() {
00184 if (minPoolSize != 0) {
00185 TraceEjb.mdb.log(BasicLevel.INFO, "pre-allocate a set of " + minPoolSize
00186 + " message driven bean instances");
00187
00188 synchronized (sspool) {
00189 for (int i = 0; i < minPoolSize; i++) {
00190 ServerSession ss = null;
00191 try {
00192 ss = createNewInstance();
00193 sspool.add(ss);
00194 } catch (Exception e) {
00195 TraceEjb.mdb.log(BasicLevel.ERROR, "cannot init pool of instances ");
00196 throw new EJBException("cannot init pool of instances ", e);
00197 }
00198 }
00199 }
00200 }
00201 }
00202
00206 public int getPoolSize() {
00207 return sspool.size();
00208 }
00209
00216 public void stop() {
00217 TraceEjb.mdb.log(BasicLevel.DEBUG, "");
00218 try {
00219 cc.close();
00220 if (tconn != null)
00221 tconn.close();
00222 if (qconn != null)
00223 qconn.close();
00224 } catch(javax.jms.JMSException e) {
00225 TraceEjb.logger.log(BasicLevel.WARN, "unregister: Cannot close Connection Consumer");
00226 }
00227 }
00228
00232 public void sync() {
00233 }
00234
00235
00239 public JHome getHome() {
00240 return null;
00241 }
00242
00246 public JLocalHome getLocalHome() {
00247 return null;
00248 }
00249
00250
00251
00252
00253
00261 public ServerSession getServerSession() throws JMSException {
00262 TraceEjb.mdb.log(BasicLevel.DEBUG, "");
00263
00264 return getNewInstance();
00265 }
00266
00267
00268
00269
00270
00271
00272
00277 public void releaseServerSession(ServerSession ss) {
00278 TraceEjb.mdb.log(BasicLevel.DEBUG, "");
00279
00280 synchronized (sspool) {
00281 sspool.add(ss);
00282 if(TraceEjb.isDebugSwapper()) {
00283 TraceEjb.swapper.log(BasicLevel.DEBUG, "notifyAll " );
00284 }
00285 sspool.notifyAll();
00286 }
00287 if(TraceEjb.isDebugJms()) {
00288 TraceEjb.mdb.log(BasicLevel.DEBUG, "nb instances " + getCacheSize());
00289 }
00290
00291 }
00292
00293
00294
00295
00296
00301 public TimerService getTimerService() {
00302 if (myTimerService == null) {
00303
00304 myTimerService = new JTimerService(this);
00305 }
00306 return myTimerService;
00307 }
00308
00313 public int getMinPoolSize() {
00314 return minPoolSize;
00315 }
00316
00321 public int getMaxCacheSize() {
00322 return maxCacheSize;
00323 }
00324
00329 public int getCacheSize() {
00330 return instanceCount;
00331 }
00332
00336 public int getTransactionAttribute() {
00337 return ((MessageDrivenDesc)dd).getTxAttribute();
00338 }
00339
00344 public void checkTransaction(RequestCtx rctx) {
00345 if (rctx.txAttr == MethodDesc.TX_REQUIRED) {
00346 try {
00347 if (tm.getTransaction() != null) {
00348
00349 TraceEjb.logger.log(BasicLevel.ERROR, "Transaction already opened by this thread.");
00350 TraceEjb.logger.log(BasicLevel.ERROR, "Transaction status = " + tm.getStatus());
00351 TraceEjb.logger.log(BasicLevel.ERROR, "Transaction = " + tm.getTransaction());
00352 Thread.dumpStack();
00353 return;
00354 }
00355 tm.begin();
00356 rctx.mustCommit = true;
00357 rctx.currTx = tm.getTransaction();
00358 TraceEjb.tx.log(BasicLevel.DEBUG, "Transaction started: " + rctx.currTx);
00359 } catch (Exception e) {
00360
00361 TraceEjb.logger.log(BasicLevel.ERROR, "cannot start tx:"+e);
00362 return;
00363 }
00364 }
00365 }
00366
00371 public void reduceCache() {
00372 TraceEjb.swapper.log(BasicLevel.DEBUG, "");
00373
00374 int poolsz = minPoolSize;
00375 synchronized (sspool) {
00376 if(TraceEjb.isDebugSwapper()) {
00377 TraceEjb.swapper.log(BasicLevel.DEBUG, "try to reduce " + sspool.size() +
00378 " to " + poolsz);
00379 }
00380 while (sspool.size() > poolsz) {
00381 ListIterator i = sspool.listIterator();
00382 if (i.hasNext()) {
00383 i.next();
00384 i.remove();
00385 instanceCount--;
00386 }
00387 }
00388 }
00389 if (TraceEjb.isDebugSwapper()) {
00390 TraceEjb.swapper.log(BasicLevel.DEBUG, "cacheSize= " + getCacheSize());
00391 }
00392
00393 }
00394
00399 public void notifyTimeout(Timer timer) {
00400 TraceEjb.mdb.log(BasicLevel.DEBUG, "");
00401
00402
00403 JMessageDrivenBean jmdb = null;
00404 try {
00405 jmdb = getNewInstance();
00406 } catch (JMSException e) {
00407 TraceEjb.logger.log(BasicLevel.ERROR, "exception:" + e);
00408 throw new EJBException("Cannot deliver the timeout: " + e);
00409 }
00410
00411
00412 jmdb.deliverTimeout(timer);
00413
00414
00415 releaseServerSession(jmdb);
00416 }
00417
00418
00419
00420
00421
00426 private JMessageDrivenBean getNewInstance() throws JMSException {
00427 TraceEjb.mdb.log(BasicLevel.DEBUG, "");
00428
00429
00430 JMessageDrivenBean ss = null;
00431
00432
00433 synchronized(sspool) {
00434 if (!sspool.isEmpty()) {
00435 try {
00436 ss = (JMessageDrivenBean) sspool.remove(0);
00437 return ss;
00438 } catch(IndexOutOfBoundsException ex) {
00439
00440 TraceEjb.logger.log(BasicLevel.ERROR, "exception:"+ex);
00441 throw new EJBException("Cannot get an instance from the pool");
00442 }
00443 } else {
00444 TraceEjb.mdb.log(BasicLevel.DEBUG,"pool is empty");
00445 if (maxCacheSize == 0 || instanceCount < maxCacheSize) {
00446
00447 try {
00448 ss = createNewInstance();
00449 } catch (Exception e) {
00450 TraceEjb.logger.log(BasicLevel.ERROR, "exception:"+e);
00451 throw new EJBException("Cannot create a new instance");
00452 }
00453 } else {
00454 while (sspool.isEmpty()) {
00455 TraceEjb.swapper.log(BasicLevel.DEBUG,"sspool.isEmpty() = true --> wait()");
00456 try {
00457 sspool.wait();
00458 TraceEjb.swapper.log(BasicLevel.DEBUG,"sspool notified");
00459 } catch (InterruptedException e) {
00460 TraceEjb.swapper.log(BasicLevel.DEBUG, "sspool waiting interrupted");
00461 } catch (Exception e) {
00462 throw new EJBException("synchronization pb", e);
00463 }
00464 }
00465 try {
00466 ListIterator i = sspool.listIterator();
00467 if (i.hasNext()) {
00468 ss = (JMessageDrivenBean) i.next();
00469 i.remove();
00470 }
00471 return ss;
00472 } catch(IndexOutOfBoundsException ex) {
00473
00474 }
00475 }
00476
00477 }
00478 if(TraceEjb.isDebugSwapper()) {
00479 TraceEjb.swapper.log(BasicLevel.DEBUG, "nb instances " + getCacheSize());
00480 }
00481 return ss;
00482 }
00483 }
00484
00488 private JMessageDrivenBean createNewInstance() throws Exception {
00489 TraceEjb.mdb.log(BasicLevel.DEBUG, "");
00490 Session sess = null;
00491 JMessageDrivenBean ss = null;
00492 MessageDrivenDesc mdd = (MessageDrivenDesc) dd;
00493 if (tconn != null) {
00494 if (mdd.isRequired()) {
00495 sess = tconn.createXATopicSession();
00496 } else {
00497 sess = tconn.createTopicSession(false, mdd.getAcknowledgeMode());
00498 }
00499 } else if (qconn != null) {
00500 if (mdd.isRequired()) {
00501 sess = qconn.createXAQueueSession();
00502 } else {
00503 sess = qconn.createQueueSession(false, mdd.getAcknowledgeMode());
00504 }
00505 } else {
00506 TraceEjb.mdb.log(BasicLevel.ERROR, "connection not initialized");
00507 throw new Exception("JMS connection not initialized");
00508 }
00509
00510
00511
00512 Thread.currentThread().setContextClassLoader(myClassLoader());
00513
00514
00515 MessageDrivenBean mdb = null;
00516 try {
00517 mdb = (MessageDrivenBean) beanclass.newInstance();
00518 } catch (Exception e) {
00519 TraceEjb.logger.log(BasicLevel.ERROR, "failed to create instance:", e);
00520 throw new EJBException("Container failed to create instance of Message Driven Bean", e);
00521 }
00522
00523
00524
00525 ss = new JMessageDrivenBean(this, sess, mdb, threadpool);
00526 sess.setMessageListener((MessageListener)ss);
00527
00528
00529
00530
00531 Context ctxsave = setComponentContext();
00532 mdb.setMessageDrivenContext((MessageDrivenContext)ss);
00533 try {
00534 beanclass.getMethod("ejbCreate", (Class[]) null).invoke(mdb, (Object[]) null);
00535 } catch (Exception e) {
00536 TraceEjb.logger.log(BasicLevel.ERROR, "cannot call ejbCreate on message driven bean instance ", e);
00537 throw new EJBException(" Container fails to call ejbCreate on message driven bean instance", e);
00538 } finally {
00539 resetComponentContext(ctxsave);
00540 }
00541
00542 synchronized (sspool) {
00543 instanceCount++;
00544 }
00545 return ss;
00546 }
00547
00548 }