JMdbEndpointFactory.java

00001 
00026 package org.objectweb.jonas_ejb.container;
00027 
00028 import java.lang.reflect.Method;
00029 import java.lang.reflect.Proxy;
00030 import java.util.ArrayList;
00031 import java.util.List;
00032 import java.util.LinkedList;
00033 import java.util.ListIterator;
00034 
00035 import javax.ejb.EJBException;
00036 import javax.ejb.MessageDrivenBean;
00037 import javax.ejb.MessageDrivenContext;
00038 import javax.ejb.Timer;
00039 import javax.ejb.TimerService;
00040 import javax.naming.Context;
00041 import javax.resource.spi.ActivationSpec;
00042 import javax.resource.spi.UnavailableException;
00043 import javax.resource.spi.endpoint.MessageEndpoint;
00044 import javax.resource.spi.endpoint.MessageEndpointFactory;
00045 import javax.transaction.SystemException;
00046 import javax.transaction.xa.XAResource;
00047 
00048 import org.objectweb.jonas_ejb.deployment.api.MessageDrivenDesc;
00049 import org.objectweb.jonas_ejb.deployment.api.MethodDesc;
00050 import org.objectweb.jonas_ejb.deployment.api.ActivationConfigPropertyDesc;
00051 
00052 import org.objectweb.jonas.resource.Rar;
00053 import org.objectweb.jotm.Current;
00054 
00055 import org.objectweb.util.monolog.api.BasicLevel;
00056 
00062 public class JMdbEndpointFactory extends JFactory implements MessageEndpointFactory {
00063 
00067     private List endpool = new ArrayList();
00068 
00072     protected int instanceCount = 0;
00073 
00077     protected int minPoolSize = 0;
00078 
00082     protected int maxCacheSize = 0;
00083 
00087     private ActivationSpec as = null;
00088 
00092     private String msglistenType = null;
00093 
00097     private Rar rar = null;
00098 
00102     private boolean activated = false;
00103 
00107     private static String JMESSAGEENDPOINT_CLASS = "org.objectweb.jonas_ejb.container.JMessageEndpointIntf";
00108 
00112     private static String MESSAGEENDPOINT_CLASS = "javax.resource.spi.endpoint.MessageEndpoint";
00113 
00117     private static String MESSAGEDRIVENCONTEXT_CLASS = "javax.ejb.MessageDrivenContext";
00118 
00125     public JMdbEndpointFactory(MessageDrivenDesc dd, JContainer cont, ActivationSpec as) {
00126         super(dd, cont);
00127         String dest = dd.getDestinationJndiName();
00128         // Set the AS properties and validate the required config properties
00129         List acpl = null;
00130         if (dd.getMdActivationConfigDesc() != null) {
00131             acpl = dd.getMdActivationConfigDesc().getActivationConfigPropertyList();
00132         }
00133         List jAcpl = null;
00134         if (dd.getJonasMdActivationConfigDesc() != null) {
00135             jAcpl = dd.getJonasMdActivationConfigDesc().getActivationConfigPropertyList();
00136         }
00137         mdbEFInit(dd, dest, acpl, jAcpl, as);
00138     }
00139 
00147     public JMdbEndpointFactory(MessageDrivenDesc dd, String destination, JContainer cont,
00148                                ActivationSpec as) {
00149         super(dd, cont);
00150 
00151         // Build Activation spec linked list from variables
00152 
00153         List jAcpl = new LinkedList();
00154         List acpl = null;
00155         if (dd.getMdActivationConfigDesc() != null) {
00156             acpl = dd.getMdActivationConfigDesc().getActivationConfigPropertyList();
00157         }
00158         buildACL(dd, jAcpl);
00159 
00160         mdbEFInit(dd, destination, acpl, jAcpl, as);
00161     }
00162 
00168     private void buildACL(MessageDrivenDesc dd, List jacl) {
00169 
00170         String [] aclNames = {"destination", "destinationType", "messageSelector",
00171                               "acknowledgeMode", "subscriptionDurability" };
00172 
00173         ActivationConfigPropertyDesc acp = null;
00174 
00175         acp = new ActivationConfigPropertyDesc();
00176         acp.setActivationConfigPropertyName("destination");
00177         acp.setActivationConfigPropertyValue(dd.getDestinationJndiName());
00178         jacl.add(acp);
00179 
00180         acp = new ActivationConfigPropertyDesc();
00181         acp.setActivationConfigPropertyName("destinationType");
00182         if (dd.isTopicDestination()) {
00183             acp.setActivationConfigPropertyValue("javax.jms.Topic");
00184         } else {
00185             acp.setActivationConfigPropertyValue("javax.jms.Queue");
00186         }
00187         jacl.add(acp);
00188 
00189         acp = new ActivationConfigPropertyDesc();
00190         acp.setActivationConfigPropertyName("messageSelector");
00191         acp.setActivationConfigPropertyValue(dd.getSelector());
00192         jacl.add(acp);
00193 
00194         acp = new ActivationConfigPropertyDesc();
00195         acp.setActivationConfigPropertyName("acknowledgeMode");
00196         if (dd.getAcknowledgeMode() == MessageDrivenDesc.AUTO_ACKNOWLEDGE) {
00197             acp.setActivationConfigPropertyValue("Auto-acknowledge");
00198         } else {
00199             acp.setActivationConfigPropertyValue("Dups-ok-acknowledge");
00200         }
00201         jacl.add(acp);
00202 
00203         acp = new ActivationConfigPropertyDesc();
00204         acp.setActivationConfigPropertyName("subscriptionDurability");
00205         if (dd.getSubscriptionDurability() == MessageDrivenDesc.SUBS_DURABLE) {
00206             acp.setActivationConfigPropertyValue("Durable");
00207         } else {
00208             acp.setActivationConfigPropertyValue("NonDurable");
00209         }
00210         jacl.add(acp);
00211 
00212         if (dd.getSubscriptionDurability() == MessageDrivenDesc.SUBS_DURABLE) {
00213             acp = new ActivationConfigPropertyDesc();
00214             acp.setActivationConfigPropertyName("subscriptionName");
00215             acp.setActivationConfigPropertyValue(dd.getEjbName());
00216             jacl.add(acp);
00217         }
00218 
00219         // Check existing list to determine if additional items are configured, then pass them along also
00220         List acpl = null;
00221         if (dd.getJonasMdActivationConfigDesc() != null) {
00222             acpl = dd.getJonasMdActivationConfigDesc().getActivationConfigPropertyList();
00223 
00224             String acpName = null;
00225             boolean found = false;
00226             for (int i = 0; i < acpl.size(); i++) {
00227                 acp = (ActivationConfigPropertyDesc) acpl.get(i);
00228                 acpName = acp.getActivationConfigPropertyName();
00229                 found = false;
00230                 for (int j = 0; j < aclNames.length; j++) {
00231                     if (acpName.equals(aclNames[j])) {
00232                         found = true;
00233                         break;
00234                     }
00235                 }
00236                 if (!found) {
00237                     jacl.add(acp);
00238                 }
00239             }
00240 
00241         }
00242     }
00243 
00252     private void mdbEFInit(MessageDrivenDesc dd, String dest, List acpl,
00253                            List jAcpl, ActivationSpec aSpec) {
00254 
00255         String methName = "mdbEFInit: ";
00256         String ejbName = dd.getEjbName();
00257 
00258         // Check if tx managed by the bean or the container
00259         txbeanmanaged = dd.isBeanManagedTransaction();
00260 
00261         // Set ActivationSpec
00262         as = aSpec;
00263 
00264         minPoolSize = dd.getPoolMin();
00265         maxCacheSize = dd.getCacheMax();
00266         trace(BasicLevel.DEBUG, methName + "maxCacheSize = " + maxCacheSize + " minPoolSize = " + minPoolSize);
00267 
00268         // Get the associated Resource Adapter and messagelistenerType
00269         rar = Rar.getRar(dest);
00270         if (rar == null) {
00271             trace(BasicLevel.ERROR, methName + "cannot retrieve associated Resource Adapter ");
00272             throw new EJBException("cannot retrieve associated Resource Adapter ");
00273         }
00274         msglistenType = rar.getInterface(dest);
00275 
00276         try {
00277             rar.configureAS(as, acpl, jAcpl, dest, ejbName);
00278         } catch (Exception ex) {
00279             trace(BasicLevel.ERROR, methName + "cannot configure activationspec " + ex);
00280             ex.printStackTrace();
00281             throw new EJBException("cannot configure activationspec ", ex);
00282         }
00283 
00284         // pre-allocate a set of Endpoints
00285         synchronized (endpool) {
00286             for (int i = 0; i < minPoolSize; i++) {
00287                 JMessageEndpoint ep = null;
00288                 try {
00289                     ep = createNewInstance();
00290                     endpool.add(ep);
00291                 } catch (Exception e) {
00292                     trace(BasicLevel.ERROR, methName + "cannot init pool of instances ");
00293                     throw new EJBException("cannot init pool of instances ", e);
00294                 }
00295             }
00296         }
00297         if (minPoolSize != 0) {
00298             trace(BasicLevel.INFO, methName + "pre-allocate a set of " + minPoolSize
00299                                  + " message driven bean  instances");
00300         }
00301 
00302         // Call to register an XAResource with JOTM
00303         try {
00304             ActivationSpec [] asArray = new ActivationSpec[1];
00305             asArray[0] = as;
00306             XAResource [] xar = rar.getResourceAdapter().getXAResources(asArray);
00307             if (xar != null && xar.length > 0) {
00308                 ((Current) tm).getTransactionRecovery().
00309                            registerResourceManager(ejbName+msglistenType,
00310                                                    xar[0], "", null);
00311             }
00312         } catch (Exception ex) {
00313             trace(BasicLevel.ERROR, ex.getMessage(), ex);
00314         }
00315 
00316         activated = true;
00317         // EndpointActivation
00318         try {
00319             rar.getResourceAdapter().endpointActivation(this, as);
00320         } catch (Exception ex) {
00321             activated = false;
00322             trace(BasicLevel.ERROR, methName + "cannot activate endpoint " + ex);
00323             ex.printStackTrace();
00324             throw new EJBException("cannot activate endpoint ", ex);
00325         }
00326     }
00327 
00328     // ---------------------------------------------------------------
00329     // Specific BeanFactory implementation
00330     // ---------------------------------------------------------------
00331 
00335     public void initInstancePool() {
00336     }
00337     
00341     public int getPoolSize() {
00342         return endpool.size();
00343     }
00344 
00349     public void stop() {
00350         String methName = "stop: ";
00351         trace(BasicLevel.DEBUG, methName);
00352         try {
00353             // Deactivate the endpoint
00354             rar.getResourceAdapter().endpointDeactivation(this, as);
00355             // Loop thru the endpool and release them
00356             synchronized (endpool) {
00357                 trace(BasicLevel.DEBUG, methName + "stopping " + this);
00358                 JMessageEndpoint ep = null;
00359                 while (endpool.size() > 0) {
00360                     ep = (JMessageEndpoint) endpool.remove(0);
00361                     instanceCount--;
00362                     try {
00363                         ep.mdb.ejbRemove();
00364                     } catch (Exception e) {
00365                         trace(BasicLevel.ERROR, methName + "Cannot remove mdb: "
00366                                + ep.mdb);
00367                     }
00368                 }
00369             }
00370         } catch (Exception e) {
00371             trace(BasicLevel.WARN, methName + "Cannot deactivate the endpoint");
00372         }
00373     }
00374 
00378     public void sync() {
00379     }
00380 
00384     public JHome getHome() {
00385         return null;
00386     }
00387 
00391     public JLocalHome getLocalHome() {
00392         return null;
00393     }
00394 
00395     // ---------------------------------------------------------------
00396     // MessageEndpointFactory implementation
00397     // ---------------------------------------------------------------
00398 
00406     public MessageEndpoint createEndpoint(XAResource xaResource) throws UnavailableException {
00407         String methName = "createEndpoint: ";
00408         trace(BasicLevel.DEBUG, methName);
00409 
00410         if (!activated) {
00411             trace(BasicLevel.ERROR, methName + "mdb is not usuable either initial deployment or undeployment ");
00412             throw new UnavailableException("mdb is not usuable either initial deployment or undeployment ");
00413         }
00414 
00415         JMessageEndpoint ep = null;
00416         try {
00417             ep = getNewInstance(xaResource);
00418         } catch (Exception ex) {
00419             trace(BasicLevel.ERROR, methName + "cannot create an endpoint ");
00420             throw new UnavailableException("cannot create an endpoint ", ex);
00421         }
00422         return (MessageEndpoint) ep.mep;
00423     }
00424 
00432     public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
00433         int txAttribute = 0;
00434         try {
00435             txAttribute = dd.getMethodDesc(method).getTxAttribute();
00436         } catch (Exception ex) {
00437             trace(BasicLevel.ERROR, "isDeliveryTransacted: No such method exists. " + method.getName());
00438             throw new NoSuchMethodException("No such method exists. " + method.getName());
00439         }
00440         return txAttribute == MethodDesc.TX_REQUIRED ? true : false;
00441     }
00442 
00443     // ---------------------------------------------------------------
00444     // Other methods
00445     // ---------------------------------------------------------------
00446 
00454     public JMessageEndpoint getMessageEndpoint() throws Exception {
00455         trace(BasicLevel.DEBUG, "getMessageEndpoint: ");
00456 
00457         return getNewInstance(null);
00458     }
00459 
00464     public void releaseEndpoint(JMessageEndpoint ep) {
00465         String methName = "releaseEndpoint: ";
00466         trace(BasicLevel.DEBUG, methName + ep);
00467 
00468         ep.setReleasedState(true);
00469         synchronized (endpool) {
00470             endpool.add(ep);
00471             trace(BasicLevel.DEBUG, methName + "notifyAll " );
00472             endpool.notifyAll();
00473         }
00474         trace(BasicLevel.DEBUG, methName + "nb instances " + getCacheSize());
00475         trace(BasicLevel.DEBUG, methName + "nb free cached instances " + getPoolSize());
00476 
00477     }
00478 
00479     // ---------------------------------------------------------------
00480     // other public methods
00481     // ---------------------------------------------------------------
00482 
00487     public TimerService getTimerService() {
00488         if (myTimerService == null) {
00489             // TODO : Check that instance implements TimedObject ?
00490             myTimerService = new JTimerService(this);
00491         }
00492         return myTimerService;
00493     }
00494 
00498     public int getMinPoolSize() {
00499         return minPoolSize;
00500     }
00501 
00505     public int getMaxCacheSize() {
00506         return maxCacheSize;
00507     }
00508 
00512     public int getCacheSize() {
00513         return instanceCount;
00514     }
00515 
00519     public int getTransactionAttribute() {
00520         return ((MessageDrivenDesc) dd).getTxAttribute();
00521     }
00522 
00528     public void checkTransaction(RequestCtx rctx) {
00529         String methName = "checkTransaction: ";
00530         if (rctx.txAttr == MethodDesc.TX_REQUIRED) {
00531             try {
00532                 if (txbeanmanaged) {
00533                     if (tm.getTransaction() == null) {
00534                         trace(BasicLevel.ERROR, methName + "No transaction and need one");
00535                         return;
00536                     }
00537                 } else {
00538                     if (tm.getTransaction() == null) {
00539                         tm.begin();
00540                     }
00541                 }
00542                 rctx.mustCommit = true;
00543                 rctx.currTx = tm.getTransaction();
00544             } catch (Exception e) {
00545                 // No exception raised in case of MDB
00546                 trace(BasicLevel.ERROR, methName + "cannot start tx:" + e);
00547                 return;
00548             }
00549         } else {
00550             if (rctx.txAttr != MethodDesc.TX_NOT_SUPPORTED) {
00551                 trace(BasicLevel.ERROR, methName + "Bad transaction attribute: " + rctx.txAttr);
00552             }
00553             try {
00554                 rctx.currTx = tm.getTransaction();
00555                 if (rctx.currTx != null) {
00556                     trace(BasicLevel.DEBUG, methName + "Suspending client tx");
00557                     rctx.clientTx = tm.suspend();
00558                     rctx.currTx = null;
00559                 }
00560             } catch (SystemException e) {
00561                 trace(BasicLevel.ERROR, methName + "cannot suspend transaction:\n", e);
00562                 return;
00563             }
00564         }
00565     }
00566 
00571     public void reduceCache() {
00572         String methName = "reduceCache: ";
00573         trace(BasicLevel.DEBUG, methName);
00574         // reduce the pool to the minPoolSize
00575         int poolsz = minPoolSize;
00576         synchronized (endpool) {
00577             trace(BasicLevel.DEBUG, methName + "try to reduce " + endpool.size() + " to " + poolsz);
00578             while (endpool.size() > poolsz) {
00579                 ListIterator i = endpool.listIterator();
00580                 if (i.hasNext()) {
00581                     i.next();
00582                     i.remove();
00583                     instanceCount--;
00584                 }
00585             }
00586         }
00587         trace(BasicLevel.DEBUG, methName + "cacheSize= " + getCacheSize());
00588 
00589     }
00590 
00595     public void notifyTimeout(Timer timer) {
00596         String methName = "notifyTimeout: ";
00597         trace(BasicLevel.DEBUG, methName);
00598 
00599         // We need an instance from the pool to process the timeout.
00600         JMessageEndpoint ep = null;
00601         try {
00602             ep = getNewInstance(null);
00603         } catch (Exception e) {
00604             trace(BasicLevel.ERROR, methName + "exception:" + e);
00605             throw new EJBException("Cannot deliver the timeout: " + e);
00606         }
00607 
00608         // deliver the timeout to the bean
00609         ep.deliverTimeout(timer);
00610 
00611         // release the instance
00612         releaseEndpoint(ep);
00613     }
00614 
00615     // ---------------------------------------------------------------
00616     // private methods
00617     // ---------------------------------------------------------------
00618 
00627     private JMessageEndpoint getNewInstance(XAResource xaResource) throws Exception {
00628         String methName = "getNewInstance: ";
00629         trace(BasicLevel.DEBUG, methName + "Factory: " + this + " XAResource: " + xaResource);
00630 
00631         // try to get one from the Pool
00632         JMessageEndpoint ep = null;
00633 
00634         // try to find a free entry in the pool
00635         synchronized (endpool) {
00636             if (!endpool.isEmpty()) {
00637                 try {
00638                     ep = (JMessageEndpoint) endpool.remove(0);
00639                 } catch (Exception ex) {
00640                     // This should never happen
00641                     trace(BasicLevel.ERROR, methName + "Exception:" + ex);
00642                     throw new EJBException("Cannot get an instance from the pool");
00643                 }
00644             } else {
00645                 trace(BasicLevel.DEBUG, methName + "pool is empty");
00646                 if (maxCacheSize == 0 || instanceCount < maxCacheSize) {
00647                     // Pool has free slots, create a new MessageEndpoint object
00648                     try {
00649                         ep = createNewInstance();
00650                     } catch (Exception e) {
00651                         trace(BasicLevel.ERROR, methName + "exception:" + e);
00652                         throw new EJBException("Cannot create a new instance");
00653                     }
00654                 } else {
00655                     while (endpool.isEmpty()) {
00656                         trace(BasicLevel.DEBUG, methName + "endpool.isEmpty() = true --> wait()");
00657                         try {
00658                             endpool.wait();
00659                             trace(BasicLevel.DEBUG, methName + "endpool notified");
00660                         } catch (InterruptedException e) {
00661                             trace(BasicLevel.DEBUG, methName + "endpool waiting interrupted");
00662                         } catch (Exception e) {
00663                             throw new EJBException("synchronization pb", e);
00664                         }
00665                     }
00666                     try {
00667                         ep = (JMessageEndpoint) endpool.remove(0);
00668                     } catch (Exception ex) {
00669                         // This should never happen
00670                         trace(BasicLevel.ERROR, methName + "Exception:" + ex);
00671                         throw new EJBException("Cannot get an instance from the pool");
00672                     }
00673                 }
00674 
00675             }
00676             trace(BasicLevel.DEBUG, methName + "nb instances " + getCacheSize());
00677             ep.setXAResource(xaResource);
00678             ep.setReleasedState(false);
00679             trace(BasicLevel.DEBUG, methName + "Returning " + ep);
00680             return ep;
00681         }
00682     }
00683 
00690     private JMessageEndpoint createNewInstance() throws Exception {
00691         String methName = "createNewInstance: ";
00692         trace(BasicLevel.DEBUG, methName);
00693         JMessageEndpointProxy epProxy = null;
00694         MessageEndpoint ep = null;
00695         JMessageEndpoint jep = null;
00696         MessageDrivenDesc mdd = (MessageDrivenDesc) dd;
00697         ClassLoader cls = myClassLoader();
00698 
00699         // Set ContextClassLoader with the ejbclassloader.
00700         // This is necessary in case ejbCreate calls another bean in the same
00701         // jar.
00702         Thread.currentThread().setContextClassLoader(cls);
00703 
00704         // Creates the new instance
00705         MessageDrivenBean mdb = null;
00706         try {
00707             mdb = (MessageDrivenBean) beanclass.newInstance();
00708         } catch (Exception e) {
00709             trace(BasicLevel.ERROR, methName + "failed to create instance:", e);
00710             throw new EJBException("Container failed to create instance of Message Driven Bean", e);
00711         }
00712 
00713         // Instanciates a new JMessageEndpoint object
00714         jep = new JMessageEndpoint(this, mdb);
00715         epProxy = new JMessageEndpointProxy(this, mdb, jep);
00716         Class endpointClass = cls.loadClass(MESSAGEENDPOINT_CLASS);
00717         Class msgListenerClass = cls.loadClass(msglistenType);
00718         ep = (MessageEndpoint) Proxy.newProxyInstance(cls, new Class[] {endpointClass, msgListenerClass}, epProxy);
00719 
00720         jep.setProxy(ep);
00721         // starts the bean instance: setMessageDrivenContext() + ejbCreate()
00722         // see EJB spec. 2.0 page 322.
00723         // Both operations must be called with the correct ComponentContext
00724         Context ctxsave = setComponentContext();
00725         mdb.setMessageDrivenContext((MessageDrivenContext) jep);
00726         try {
00727             beanclass.getMethod("ejbCreate", (Class[]) null).invoke(mdb, (Object[]) null);
00728         } catch (Exception e) {
00729             trace(BasicLevel.ERROR, methName + "cannot call ejbCreate on message driven bean instance ", e);
00730             throw new EJBException(" Container fails to call ejbCreate on message driven bean instance", e);
00731         } finally {
00732             resetComponentContext(ctxsave);
00733         }
00734 
00735         synchronized (endpool) {
00736             instanceCount++;
00737         }
00738         return jep;
00739     }
00740 
00741     
00742     private void trace(int level, String str, Exception ex) {
00743         if (TraceEjb.isDebugJms()) {
00744             TraceEjb.mdb.log(level, str, ex);
00745         }
00746     }
00747     private void trace(int level, String str) {
00748         if (TraceEjb.isDebugJms()) {
00749             TraceEjb.mdb.log(level, str);
00750         }
00751     }
00752 }

Generated on Tue Feb 15 15:05:38 2005 for JOnAS by  doxygen 1.3.9.1