00001
00026 package org.objectweb.jonas_ejb.container;
00027
00028 import java.lang.reflect.Method;
00029 import java.lang.reflect.Proxy;
00030 import java.util.ArrayList;
00031 import java.util.List;
00032 import java.util.LinkedList;
00033 import java.util.ListIterator;
00034
00035 import javax.ejb.EJBException;
00036 import javax.ejb.MessageDrivenBean;
00037 import javax.ejb.MessageDrivenContext;
00038 import javax.ejb.Timer;
00039 import javax.ejb.TimerService;
00040 import javax.naming.Context;
00041 import javax.resource.spi.ActivationSpec;
00042 import javax.resource.spi.UnavailableException;
00043 import javax.resource.spi.endpoint.MessageEndpoint;
00044 import javax.resource.spi.endpoint.MessageEndpointFactory;
00045 import javax.transaction.SystemException;
00046 import javax.transaction.xa.XAResource;
00047
00048 import org.objectweb.jonas_ejb.deployment.api.MessageDrivenDesc;
00049 import org.objectweb.jonas_ejb.deployment.api.MethodDesc;
00050 import org.objectweb.jonas_ejb.deployment.api.ActivationConfigPropertyDesc;
00051
00052 import org.objectweb.jonas.resource.Rar;
00053 import org.objectweb.jotm.Current;
00054
00055 import org.objectweb.util.monolog.api.BasicLevel;
00056
00062 public class JMdbEndpointFactory extends JFactory implements MessageEndpointFactory {
00063
00067 private List endpool = new ArrayList();
00068
00072 protected int instanceCount = 0;
00073
00077 protected int minPoolSize = 0;
00078
00082 protected int maxCacheSize = 0;
00083
00087 private ActivationSpec as = null;
00088
00092 private String msglistenType = null;
00093
00097 private Rar rar = null;
00098
00102 private boolean activated = false;
00103
00107 private static String JMESSAGEENDPOINT_CLASS = "org.objectweb.jonas_ejb.container.JMessageEndpointIntf";
00108
00112 private static String MESSAGEENDPOINT_CLASS = "javax.resource.spi.endpoint.MessageEndpoint";
00113
00117 private static String MESSAGEDRIVENCONTEXT_CLASS = "javax.ejb.MessageDrivenContext";
00118
00125 public JMdbEndpointFactory(MessageDrivenDesc dd, JContainer cont, ActivationSpec as) {
00126 super(dd, cont);
00127 String dest = dd.getDestinationJndiName();
00128
00129 List acpl = null;
00130 if (dd.getMdActivationConfigDesc() != null) {
00131 acpl = dd.getMdActivationConfigDesc().getActivationConfigPropertyList();
00132 }
00133 List jAcpl = null;
00134 if (dd.getJonasMdActivationConfigDesc() != null) {
00135 jAcpl = dd.getJonasMdActivationConfigDesc().getActivationConfigPropertyList();
00136 }
00137 mdbEFInit(dd, dest, acpl, jAcpl, as);
00138 }
00139
00147 public JMdbEndpointFactory(MessageDrivenDesc dd, String destination, JContainer cont,
00148 ActivationSpec as) {
00149 super(dd, cont);
00150
00151
00152
00153 List jAcpl = new LinkedList();
00154 List acpl = null;
00155 if (dd.getMdActivationConfigDesc() != null) {
00156 acpl = dd.getMdActivationConfigDesc().getActivationConfigPropertyList();
00157 }
00158 buildACL(dd, jAcpl);
00159
00160 mdbEFInit(dd, destination, acpl, jAcpl, as);
00161 }
00162
00168 private void buildACL(MessageDrivenDesc dd, List jacl) {
00169
00170 String [] aclNames = {"destination", "destinationType", "messageSelector",
00171 "acknowledgeMode", "subscriptionDurability" };
00172
00173 ActivationConfigPropertyDesc acp = null;
00174
00175 acp = new ActivationConfigPropertyDesc();
00176 acp.setActivationConfigPropertyName("destination");
00177 acp.setActivationConfigPropertyValue(dd.getDestinationJndiName());
00178 jacl.add(acp);
00179
00180 acp = new ActivationConfigPropertyDesc();
00181 acp.setActivationConfigPropertyName("destinationType");
00182 if (dd.isTopicDestination()) {
00183 acp.setActivationConfigPropertyValue("javax.jms.Topic");
00184 } else {
00185 acp.setActivationConfigPropertyValue("javax.jms.Queue");
00186 }
00187 jacl.add(acp);
00188
00189 acp = new ActivationConfigPropertyDesc();
00190 acp.setActivationConfigPropertyName("messageSelector");
00191 acp.setActivationConfigPropertyValue(dd.getSelector());
00192 jacl.add(acp);
00193
00194 acp = new ActivationConfigPropertyDesc();
00195 acp.setActivationConfigPropertyName("acknowledgeMode");
00196 if (dd.getAcknowledgeMode() == MessageDrivenDesc.AUTO_ACKNOWLEDGE) {
00197 acp.setActivationConfigPropertyValue("Auto-acknowledge");
00198 } else {
00199 acp.setActivationConfigPropertyValue("Dups-ok-acknowledge");
00200 }
00201 jacl.add(acp);
00202
00203 acp = new ActivationConfigPropertyDesc();
00204 acp.setActivationConfigPropertyName("subscriptionDurability");
00205 if (dd.getSubscriptionDurability() == MessageDrivenDesc.SUBS_DURABLE) {
00206 acp.setActivationConfigPropertyValue("Durable");
00207 } else {
00208 acp.setActivationConfigPropertyValue("NonDurable");
00209 }
00210 jacl.add(acp);
00211
00212 if (dd.getSubscriptionDurability() == MessageDrivenDesc.SUBS_DURABLE) {
00213 acp = new ActivationConfigPropertyDesc();
00214 acp.setActivationConfigPropertyName("subscriptionName");
00215 acp.setActivationConfigPropertyValue(dd.getEjbName());
00216 jacl.add(acp);
00217 }
00218
00219
00220 List acpl = null;
00221 if (dd.getJonasMdActivationConfigDesc() != null) {
00222 acpl = dd.getJonasMdActivationConfigDesc().getActivationConfigPropertyList();
00223
00224 String acpName = null;
00225 boolean found = false;
00226 for (int i = 0; i < acpl.size(); i++) {
00227 acp = (ActivationConfigPropertyDesc) acpl.get(i);
00228 acpName = acp.getActivationConfigPropertyName();
00229 found = false;
00230 for (int j = 0; j < aclNames.length; j++) {
00231 if (acpName.equals(aclNames[j])) {
00232 found = true;
00233 break;
00234 }
00235 }
00236 if (!found) {
00237 jacl.add(acp);
00238 }
00239 }
00240
00241 }
00242 }
00243
00252 private void mdbEFInit(MessageDrivenDesc dd, String dest, List acpl,
00253 List jAcpl, ActivationSpec aSpec) {
00254
00255 String methName = "mdbEFInit: ";
00256 String ejbName = dd.getEjbName();
00257
00258
00259 txbeanmanaged = dd.isBeanManagedTransaction();
00260
00261
00262 as = aSpec;
00263
00264 minPoolSize = dd.getPoolMin();
00265 maxCacheSize = dd.getCacheMax();
00266 trace(BasicLevel.DEBUG, methName + "maxCacheSize = " + maxCacheSize + " minPoolSize = " + minPoolSize);
00267
00268
00269 rar = Rar.getRar(dest);
00270 if (rar == null) {
00271 trace(BasicLevel.ERROR, methName + "cannot retrieve associated Resource Adapter ");
00272 throw new EJBException("cannot retrieve associated Resource Adapter ");
00273 }
00274 msglistenType = rar.getInterface(dest);
00275
00276 try {
00277 rar.configureAS(as, acpl, jAcpl, dest, ejbName);
00278 } catch (Exception ex) {
00279 trace(BasicLevel.ERROR, methName + "cannot configure activationspec " + ex);
00280 ex.printStackTrace();
00281 throw new EJBException("cannot configure activationspec ", ex);
00282 }
00283
00284
00285 synchronized (endpool) {
00286 for (int i = 0; i < minPoolSize; i++) {
00287 JMessageEndpoint ep = null;
00288 try {
00289 ep = createNewInstance();
00290 endpool.add(ep);
00291 } catch (Exception e) {
00292 trace(BasicLevel.ERROR, methName + "cannot init pool of instances ");
00293 throw new EJBException("cannot init pool of instances ", e);
00294 }
00295 }
00296 }
00297 if (minPoolSize != 0) {
00298 trace(BasicLevel.INFO, methName + "pre-allocate a set of " + minPoolSize
00299 + " message driven bean instances");
00300 }
00301
00302
00303 try {
00304 ActivationSpec [] asArray = new ActivationSpec[1];
00305 asArray[0] = as;
00306 XAResource [] xar = rar.getResourceAdapter().getXAResources(asArray);
00307 if (xar != null && xar.length > 0) {
00308 ((Current) tm).getTransactionRecovery().
00309 registerResourceManager(ejbName+msglistenType,
00310 xar[0], "", null);
00311 }
00312 } catch (Exception ex) {
00313 trace(BasicLevel.ERROR, ex.getMessage(), ex);
00314 }
00315
00316 activated = true;
00317
00318 try {
00319 rar.getResourceAdapter().endpointActivation(this, as);
00320 } catch (Exception ex) {
00321 activated = false;
00322 trace(BasicLevel.ERROR, methName + "cannot activate endpoint " + ex);
00323 ex.printStackTrace();
00324 throw new EJBException("cannot activate endpoint ", ex);
00325 }
00326 }
00327
00328
00329
00330
00331
00335 public void initInstancePool() {
00336 }
00337
00341 public int getPoolSize() {
00342 return endpool.size();
00343 }
00344
00349 public void stop() {
00350 String methName = "stop: ";
00351 trace(BasicLevel.DEBUG, methName);
00352 try {
00353
00354 rar.getResourceAdapter().endpointDeactivation(this, as);
00355
00356 synchronized (endpool) {
00357 trace(BasicLevel.DEBUG, methName + "stopping " + this);
00358 JMessageEndpoint ep = null;
00359 while (endpool.size() > 0) {
00360 ep = (JMessageEndpoint) endpool.remove(0);
00361 instanceCount--;
00362 try {
00363 ep.mdb.ejbRemove();
00364 } catch (Exception e) {
00365 trace(BasicLevel.ERROR, methName + "Cannot remove mdb: "
00366 + ep.mdb);
00367 }
00368 }
00369 }
00370 } catch (Exception e) {
00371 trace(BasicLevel.WARN, methName + "Cannot deactivate the endpoint");
00372 }
00373 }
00374
00378 public void sync() {
00379 }
00380
00384 public JHome getHome() {
00385 return null;
00386 }
00387
00391 public JLocalHome getLocalHome() {
00392 return null;
00393 }
00394
00395
00396
00397
00398
00406 public MessageEndpoint createEndpoint(XAResource xaResource) throws UnavailableException {
00407 String methName = "createEndpoint: ";
00408 trace(BasicLevel.DEBUG, methName);
00409
00410 if (!activated) {
00411 trace(BasicLevel.ERROR, methName + "mdb is not usuable either initial deployment or undeployment ");
00412 throw new UnavailableException("mdb is not usuable either initial deployment or undeployment ");
00413 }
00414
00415 JMessageEndpoint ep = null;
00416 try {
00417 ep = getNewInstance(xaResource);
00418 } catch (Exception ex) {
00419 trace(BasicLevel.ERROR, methName + "cannot create an endpoint ");
00420 throw new UnavailableException("cannot create an endpoint ", ex);
00421 }
00422 return (MessageEndpoint) ep.mep;
00423 }
00424
00432 public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
00433 int txAttribute = 0;
00434 try {
00435 txAttribute = dd.getMethodDesc(method).getTxAttribute();
00436 } catch (Exception ex) {
00437 trace(BasicLevel.ERROR, "isDeliveryTransacted: No such method exists. " + method.getName());
00438 throw new NoSuchMethodException("No such method exists. " + method.getName());
00439 }
00440 return txAttribute == MethodDesc.TX_REQUIRED ? true : false;
00441 }
00442
00443
00444
00445
00446
00454 public JMessageEndpoint getMessageEndpoint() throws Exception {
00455 trace(BasicLevel.DEBUG, "getMessageEndpoint: ");
00456
00457 return getNewInstance(null);
00458 }
00459
00464 public void releaseEndpoint(JMessageEndpoint ep) {
00465 String methName = "releaseEndpoint: ";
00466 trace(BasicLevel.DEBUG, methName + ep);
00467
00468 ep.setReleasedState(true);
00469 synchronized (endpool) {
00470 endpool.add(ep);
00471 trace(BasicLevel.DEBUG, methName + "notifyAll " );
00472 endpool.notifyAll();
00473 }
00474 trace(BasicLevel.DEBUG, methName + "nb instances " + getCacheSize());
00475 trace(BasicLevel.DEBUG, methName + "nb free cached instances " + getPoolSize());
00476
00477 }
00478
00479
00480
00481
00482
00487 public TimerService getTimerService() {
00488 if (myTimerService == null) {
00489
00490 myTimerService = new JTimerService(this);
00491 }
00492 return myTimerService;
00493 }
00494
00498 public int getMinPoolSize() {
00499 return minPoolSize;
00500 }
00501
00505 public int getMaxCacheSize() {
00506 return maxCacheSize;
00507 }
00508
00512 public int getCacheSize() {
00513 return instanceCount;
00514 }
00515
00519 public int getTransactionAttribute() {
00520 return ((MessageDrivenDesc) dd).getTxAttribute();
00521 }
00522
00528 public void checkTransaction(RequestCtx rctx) {
00529 String methName = "checkTransaction: ";
00530 if (rctx.txAttr == MethodDesc.TX_REQUIRED) {
00531 try {
00532 if (txbeanmanaged) {
00533 if (tm.getTransaction() == null) {
00534 trace(BasicLevel.ERROR, methName + "No transaction and need one");
00535 return;
00536 }
00537 } else {
00538 if (tm.getTransaction() == null) {
00539 tm.begin();
00540 }
00541 }
00542 rctx.mustCommit = true;
00543 rctx.currTx = tm.getTransaction();
00544 } catch (Exception e) {
00545
00546 trace(BasicLevel.ERROR, methName + "cannot start tx:" + e);
00547 return;
00548 }
00549 } else {
00550 if (rctx.txAttr != MethodDesc.TX_NOT_SUPPORTED) {
00551 trace(BasicLevel.ERROR, methName + "Bad transaction attribute: " + rctx.txAttr);
00552 }
00553 try {
00554 rctx.currTx = tm.getTransaction();
00555 if (rctx.currTx != null) {
00556 trace(BasicLevel.DEBUG, methName + "Suspending client tx");
00557 rctx.clientTx = tm.suspend();
00558 rctx.currTx = null;
00559 }
00560 } catch (SystemException e) {
00561 trace(BasicLevel.ERROR, methName + "cannot suspend transaction:\n", e);
00562 return;
00563 }
00564 }
00565 }
00566
00571 public void reduceCache() {
00572 String methName = "reduceCache: ";
00573 trace(BasicLevel.DEBUG, methName);
00574
00575 int poolsz = minPoolSize;
00576 synchronized (endpool) {
00577 trace(BasicLevel.DEBUG, methName + "try to reduce " + endpool.size() + " to " + poolsz);
00578 while (endpool.size() > poolsz) {
00579 ListIterator i = endpool.listIterator();
00580 if (i.hasNext()) {
00581 i.next();
00582 i.remove();
00583 instanceCount--;
00584 }
00585 }
00586 }
00587 trace(BasicLevel.DEBUG, methName + "cacheSize= " + getCacheSize());
00588
00589 }
00590
00595 public void notifyTimeout(Timer timer) {
00596 String methName = "notifyTimeout: ";
00597 trace(BasicLevel.DEBUG, methName);
00598
00599
00600 JMessageEndpoint ep = null;
00601 try {
00602 ep = getNewInstance(null);
00603 } catch (Exception e) {
00604 trace(BasicLevel.ERROR, methName + "exception:" + e);
00605 throw new EJBException("Cannot deliver the timeout: " + e);
00606 }
00607
00608
00609 ep.deliverTimeout(timer);
00610
00611
00612 releaseEndpoint(ep);
00613 }
00614
00615
00616
00617
00618
00627 private JMessageEndpoint getNewInstance(XAResource xaResource) throws Exception {
00628 String methName = "getNewInstance: ";
00629 trace(BasicLevel.DEBUG, methName + "Factory: " + this + " XAResource: " + xaResource);
00630
00631
00632 JMessageEndpoint ep = null;
00633
00634
00635 synchronized (endpool) {
00636 if (!endpool.isEmpty()) {
00637 try {
00638 ep = (JMessageEndpoint) endpool.remove(0);
00639 } catch (Exception ex) {
00640
00641 trace(BasicLevel.ERROR, methName + "Exception:" + ex);
00642 throw new EJBException("Cannot get an instance from the pool");
00643 }
00644 } else {
00645 trace(BasicLevel.DEBUG, methName + "pool is empty");
00646 if (maxCacheSize == 0 || instanceCount < maxCacheSize) {
00647
00648 try {
00649 ep = createNewInstance();
00650 } catch (Exception e) {
00651 trace(BasicLevel.ERROR, methName + "exception:" + e);
00652 throw new EJBException("Cannot create a new instance");
00653 }
00654 } else {
00655 while (endpool.isEmpty()) {
00656 trace(BasicLevel.DEBUG, methName + "endpool.isEmpty() = true --> wait()");
00657 try {
00658 endpool.wait();
00659 trace(BasicLevel.DEBUG, methName + "endpool notified");
00660 } catch (InterruptedException e) {
00661 trace(BasicLevel.DEBUG, methName + "endpool waiting interrupted");
00662 } catch (Exception e) {
00663 throw new EJBException("synchronization pb", e);
00664 }
00665 }
00666 try {
00667 ep = (JMessageEndpoint) endpool.remove(0);
00668 } catch (Exception ex) {
00669
00670 trace(BasicLevel.ERROR, methName + "Exception:" + ex);
00671 throw new EJBException("Cannot get an instance from the pool");
00672 }
00673 }
00674
00675 }
00676 trace(BasicLevel.DEBUG, methName + "nb instances " + getCacheSize());
00677 ep.setXAResource(xaResource);
00678 ep.setReleasedState(false);
00679 trace(BasicLevel.DEBUG, methName + "Returning " + ep);
00680 return ep;
00681 }
00682 }
00683
00690 private JMessageEndpoint createNewInstance() throws Exception {
00691 String methName = "createNewInstance: ";
00692 trace(BasicLevel.DEBUG, methName);
00693 JMessageEndpointProxy epProxy = null;
00694 MessageEndpoint ep = null;
00695 JMessageEndpoint jep = null;
00696 MessageDrivenDesc mdd = (MessageDrivenDesc) dd;
00697 ClassLoader cls = myClassLoader();
00698
00699
00700
00701
00702 Thread.currentThread().setContextClassLoader(cls);
00703
00704
00705 MessageDrivenBean mdb = null;
00706 try {
00707 mdb = (MessageDrivenBean) beanclass.newInstance();
00708 } catch (Exception e) {
00709 trace(BasicLevel.ERROR, methName + "failed to create instance:", e);
00710 throw new EJBException("Container failed to create instance of Message Driven Bean", e);
00711 }
00712
00713
00714 jep = new JMessageEndpoint(this, mdb);
00715 epProxy = new JMessageEndpointProxy(this, mdb, jep);
00716 Class endpointClass = cls.loadClass(MESSAGEENDPOINT_CLASS);
00717 Class msgListenerClass = cls.loadClass(msglistenType);
00718 ep = (MessageEndpoint) Proxy.newProxyInstance(cls, new Class[] {endpointClass, msgListenerClass}, epProxy);
00719
00720 jep.setProxy(ep);
00721
00722
00723
00724 Context ctxsave = setComponentContext();
00725 mdb.setMessageDrivenContext((MessageDrivenContext) jep);
00726 try {
00727 beanclass.getMethod("ejbCreate", (Class[]) null).invoke(mdb, (Object[]) null);
00728 } catch (Exception e) {
00729 trace(BasicLevel.ERROR, methName + "cannot call ejbCreate on message driven bean instance ", e);
00730 throw new EJBException(" Container fails to call ejbCreate on message driven bean instance", e);
00731 } finally {
00732 resetComponentContext(ctxsave);
00733 }
00734
00735 synchronized (endpool) {
00736 instanceCount++;
00737 }
00738 return jep;
00739 }
00740
00741
00742 private void trace(int level, String str, Exception ex) {
00743 if (TraceEjb.isDebugJms()) {
00744 TraceEjb.mdb.log(level, str, ex);
00745 }
00746 }
00747 private void trace(int level, String str) {
00748 if (TraceEjb.isDebugJms()) {
00749 TraceEjb.mdb.log(level, str);
00750 }
00751 }
00752 }