JmsManagerImpl.java

00001 /*
00002  * JOnAS: Java(TM) Open Application Server
00003  * Copyright (C) 1999 Bull S.A.
00004  * Contact: jonas-team@objectweb.org
00005  * 
00006  * This library is free software; you can redistribute it and/or
00007  * modify it under the terms of the GNU Lesser General Public
00008  * License as published by the Free Software Foundation; either
00009  * version 2.1 of the License, or any later version.
00010  * 
00011  * This library is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00014  * Lesser General Public License for more details.
00015  * 
00016  * You should have received a copy of the GNU Lesser General Public
00017  * License along with this library; if not, write to the Free Software
00018  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00019  * USA
00020  *
00021  * Initial developer(s): 
00022  * Contributor(s): 
00023  * Christophe Ney: for making easier Enhydra integration
00024  * Philippe Durieux
00025  * Jeff Mesnil: for JORAM 3.0 integration
00026  * --------------------------------------------------------------------------
00027  * $Id: JmsManagerImpl.java,v 1.28 2004/10/06 14:41:54 benoitf Exp $
00028  * --------------------------------------------------------------------------
00029  */
00030 
00031 
00032 package org.objectweb.jonas_jms;
00033 
00034 import java.util.Enumeration;
00035 import java.util.Hashtable;
00036 import java.util.Vector;
00037 
00038 import javax.jms.ConnectionFactory;
00039 import javax.jms.JMSException;
00040 import javax.jms.Queue;
00041 import javax.jms.QueueConnectionFactory;
00042 import javax.jms.Topic;
00043 import javax.jms.TopicConnectionFactory;
00044 import javax.jms.XAConnectionFactory;
00045 import javax.jms.XAQueueConnectionFactory;
00046 import javax.jms.XATopicConnectionFactory;
00047 import javax.naming.InitialContext;
00048 import javax.naming.NamingException;
00049 
00050 import org.objectweb.jonas_jms.api.JmsAdministration;
00051 import org.objectweb.jonas_jms.api.JmsManager;
00052 import org.objectweb.transaction.jta.TransactionManager;
00053 import org.objectweb.util.monolog.api.BasicLevel;
00054 
00066 public class JmsManagerImpl implements JmsManager, JmsJmxManagement {
00067 
00068     private JmsAdministration momadmin = null;
00069     private InitialContext ictx = null;
00070     private ConnectionFactory cf = null;
00071     private TopicConnectionFactory tcf = null;
00072     private QueueConnectionFactory qcf = null;
00073     private Hashtable queues = new Hashtable();
00074     private Hashtable topics = new Hashtable();
00075     private Vector namelist = new Vector();
00076 
00077     private static TransactionManager tm = null;
00078     private static JmsManagerImpl unique = null;
00079 
00083     private JmsManagerImpl() {
00084         TraceJms.logger.log(BasicLevel.DEBUG,"");
00085     }
00086 
00090     public static JmsManager getJmsManager() {
00091         if (unique == null) {
00092             unique = new JmsManagerImpl();
00093         }
00094         return (JmsManager) unique;
00095     }
00096 
00100     public static JmsJmxManagement getJmsJmxManagement() {
00101         if (unique == null) {
00102             unique = new JmsManagerImpl();
00103         }
00104         return (JmsJmxManagement) unique;
00105     }
00106 
00107     // -------------------------------------------------------------------
00108     // Internal Methods
00109     // -------------------------------------------------------------------
00110 
00114     public static TransactionManager getTransactionManager() {
00115         return tm;
00116     }
00117 
00118     // -------------------------------------------------------------------
00119     // JmsManager  Implementation
00120     // -------------------------------------------------------------------
00121 
00129     public void init(Class cl, boolean collocated, String url, TransactionManager trm) throws Exception  {
00130         TraceJms.logger.log(BasicLevel.DEBUG,"");
00131         tm = trm;
00132         int maxloops = collocated ? 1 : 5;
00133         for (int i = 0; i < maxloops; i++) {
00134             try {           
00135                 // Create an InitialContext
00136                 ictx = new InitialContext();
00137                 // Create the MOM instance and start it
00138                 momadmin  = (JmsAdministration) cl.newInstance();
00139                 momadmin.start(collocated, url);
00140                 break;
00141             } catch (NamingException e) {
00142                 TraceJms.logger.log(BasicLevel.ERROR, "cannot get Initial Context", e);
00143                 throw e;
00144             } catch (NullPointerException e) {
00145                 TraceJms.logger.log(BasicLevel.ERROR, "exception: ", e);
00146                 throw e;
00147             } catch (Exception e) {
00148                 if (i < maxloops) {
00149                     if(TraceJms.isDebug())
00150                         TraceJms.logger.log(BasicLevel.DEBUG, "cannot reach the MOM - retrying...");
00151                     try {
00152                         // Thread.sleep() -> force current Thread to sleep
00153                         Thread.sleep(2000*(i+1));
00154                     } catch (InterruptedException e2) {
00155                         throw new JMSException("Cannot reach the MOM");
00156                     }
00157                 } else {
00158                     TraceJms.logger.log(BasicLevel.ERROR, "cannot load admin class "+e);
00159                     throw e;
00160                 }
00161             }
00162         }
00163         // We must register these factories in JNDI before any client 
00164         // try to get them.
00165         getConnectionFactory();
00166         getTopicConnectionFactory();
00167         getQueueConnectionFactory();
00168     }   
00169 
00173     public Queue createQueue(String name) throws Exception {
00174         Queue queue  = null;
00175         // Don't recreate if already found in JNDI
00176         try {
00177             queue = (Queue) ictx.lookup(name);
00178             if(TraceJms.isDebug())
00179                 TraceJms.logger.log(BasicLevel.DEBUG, "queue " + name +" already found");
00180             return queue;
00181         } catch (NamingException ex) {
00182         }
00183         if(TraceJms.isDebug())
00184             TraceJms.logger.log(BasicLevel.DEBUG, "creating and registering queue "+name);
00185         queue = momadmin.createQueue(name);
00186         // rebind is already done in momadmin.createQueue()
00187         //ictx.rebind(name, queue);
00188         namelist.addElement(name);
00189         queues.put(name, queue);
00190         return queue;
00191     }
00192         
00196     public Topic createTopic(String name) throws Exception {
00197         Topic topic = null;
00198         // Don't recreate if already found in JNDI
00199         try {
00200             topic = (Topic) ictx.lookup(name);
00201             if(TraceJms.isDebug())
00202                 TraceJms.logger.log(BasicLevel.DEBUG, "topic " + name + " already found");
00203             return topic;
00204         } catch (NamingException ex) {
00205         }
00206         if(TraceJms.isDebug())
00207             TraceJms.logger.log(BasicLevel.DEBUG, "creating and registering topic " + name);
00208         topic = momadmin.createTopic(name);
00209         // rebind is already done in momadmin.createTopic()
00210         //ictx.rebind(name, topic);
00211         namelist.addElement(name);
00212         topics.put(name, topic);
00213         return topic;
00214     }
00215 
00219     public ConnectionFactory getConnectionFactory() {
00220         if (cf == null) {
00221             String name = "CF";
00222             cf = new org.objectweb.jonas_jms.JConnectionFactory(name);
00223             try {
00224                 if(TraceJms.isDebug())
00225                     TraceJms.logger.log(BasicLevel.DEBUG, "creating and registering "+name);
00226                 ictx.rebind(name, cf);
00227             } catch (NamingException e) {
00228                 TraceJms.logger.log(BasicLevel.ERROR, "cannot rebind "+name+" :"+e);
00229             }
00230         }
00231         return cf;
00232     }
00233 
00237     public TopicConnectionFactory getTopicConnectionFactory() {
00238         if (tcf == null) {
00239             String name = "TCF";
00240             tcf = new org.objectweb.jonas_jms.JTopicConnectionFactory(name);
00241             try {
00242                 if(TraceJms.isDebug())
00243                     TraceJms.logger.log(BasicLevel.DEBUG,"creating and registering " + name);
00244                 ictx.rebind(name, tcf);
00245             } catch (NamingException e) {
00246                 TraceJms.logger.log(BasicLevel.ERROR, "cannot rebind " + name + " :" + e);
00247             }
00248         }
00249         return tcf;
00250     }
00251  
00252     /*
00253      *  Get the unique QueueConnectionFactory
00254      */
00255     public QueueConnectionFactory getQueueConnectionFactory() {
00256         if (qcf == null) {
00257             String name = "QCF";
00258             qcf = new org.objectweb.jonas_jms.JQueueConnectionFactory(name);
00259             try {
00260                 if(TraceJms.isDebug())
00261                     TraceJms.logger.log(BasicLevel.DEBUG, "creating and registering " + name);
00262                 ictx.rebind(name, qcf);
00263             } catch (NamingException e) {
00264                 TraceJms.logger.log(BasicLevel.ERROR, "cannot rebind " + name + " :" + e);
00265             }
00266         }
00267         return qcf;
00268     }
00269 
00273     public XAConnectionFactory getXAConnectionFactory() {
00274         return momadmin.getXAConnectionFactory();
00275     }
00276 
00280     public XATopicConnectionFactory getXATopicConnectionFactory() {
00281         return momadmin.getXATopicConnectionFactory();
00282     }
00283  
00287     public XAQueueConnectionFactory getXAQueueConnectionFactory() {
00288         return momadmin.getXAQueueConnectionFactory();
00289     }
00290 
00294     public Queue getQueue(String name) throws Exception {
00295         Queue q = (Queue)queues.get(name);
00296         if (q == null) {
00297             q = createQueue(name);
00298         }
00299         return q;
00300     }
00301 
00305     public Topic getTopic(String name) throws Exception {
00306         Topic t = (Topic)topics.get(name);
00307         if (t == null) {
00308             t = createTopic(name);
00309         }
00310         return t;
00311     }
00312 
00316     public Enumeration getTopicsNames() {
00317         return topics.keys();
00318     }   
00319 
00323     public Enumeration getQueuesNames() {
00324         return queues.keys();
00325     }
00326 
00330     public void stop() throws Exception {
00331         // Before stopping the MOM clean up the connection pools
00332         if (cf != null) {
00333             ((JConnectionFactory) cf).cleanPool();
00334         }
00335 
00336         if (tcf != null) {
00337             ((JConnectionFactory) tcf).cleanPool();
00338         }
00339 
00340         if (qcf != null) {
00341             ((JConnectionFactory) qcf).cleanPool();
00342         }
00343 
00344         // Stop the MOM
00345         if (momadmin != null) {
00346             momadmin.stop();
00347         }
00348 
00349         // clean up JNDI        
00350         Enumeration ln = namelist.elements();
00351         while (ln.hasMoreElements()) {
00352             String name = (String)ln.nextElement();
00353             try {                   
00354                 ictx.unbind(name);              
00355                 if(TraceJms.isDebug())
00356                     TraceJms.logger.log(BasicLevel.DEBUG, "unbind " + name);
00357             } catch(NamingException e) {
00358                 if(TraceJms.isDebug())
00359                     TraceJms.logger.log(BasicLevel.ERROR, "cannot unbind " + name);
00360             }
00361         }
00362         
00363     }
00364 
00365     // -------------------------------------------------------------
00366     // Management methods
00367     // -------------------------------------------------------------
00368 
00372     public int getCurrentNumberOfJmsConnectionFactory() {
00373         int result = 0;
00374         if (cf != null) result++;
00375         return result;
00376     }
00377 
00381     public int getCurrentNumberOfJmsTopicConnectionFactory() {
00382         int result = 0;
00383         if (tcf != null) result++;
00384         return result;
00385     }
00386 
00390     public int getCurrentNumberOfJmsQueueConnectionFactory() {
00391         int result = 0;
00392         if (qcf != null) result++;
00393         return result;
00394     }
00395 
00399     public int getCurrentNumberOfJmsTopicDestination() {
00400         return topics.size();
00401     }
00402 
00406     public int getCurrentNumberOfJmsQueueDestination() {
00407         return queues.size();
00408     }
00409 
00415     public String removeJmsDestination(String jndiName) throws Exception {
00416 
00417         momadmin.deleteDestination(jndiName);
00418 
00419         String destType = null;
00420         if ((queues.containsKey(jndiName))||(topics.containsKey(jndiName))) {
00421             try {           
00422                 InitialContext ictx = new InitialContext();
00423                 ictx.unbind(jndiName);
00424                 Object o = null; // object to which the jndiName is mapped
00425                 o = queues.remove(jndiName);
00426                 if (o == null) {
00427                     // jndiName did not have a mapping, try in the topics hashtable
00428                     o = topics.remove(jndiName);
00429                     if (o != null) {
00430                         destType = "topic"; 
00431                     } else {
00432                         // nither queue nor topic ??? 
00433                         destType = "unknown";
00434                     }
00435                 } else {
00436                     destType = "queue";
00437                 }
00438                 return destType;
00439             } catch(Exception ex) {
00440                 throw new Exception("JmsManagerImpl remove destination : cannot unbind " + jndiName );
00441             }
00442         } else {
00443             throw new Exception("Unexisting jms destination :" + jndiName +" remove abord");
00444         }
00445     }
00446 
00450     public String getDefaultConnectionFactoryName() {
00451         return "CF"; 
00452     }
00453     
00457     public String getDefaultQueueConnectionFactoryName() {
00458         return "QCF"; 
00459     }
00460 
00464     public String getDefaultTopicConnectionFactoryName() {
00465         return "TCF"; 
00466     }
00467 
00468     // Monitoring methods
00469 
00476     public String getConnectionFactoryMode(String jndiName) throws Exception {
00477         TraceJms.logger.log(BasicLevel.DEBUG, "");
00478 
00479         if (jndiName.equals("CF"))
00480             return "Point-To-Point and Publish/Subscribe";
00481         else if (jndiName.equals("QCF"))
00482             return "Point-To-Point";
00483         else if (jndiName.equals("TCF"))
00484             return "Publish/Subscribe";
00485         else
00486             throw new IllegalStateException("Unknown factory.");
00487     }
00488 
00489  
00496     public int getPendingMessages(String jndiName) throws Exception {
00497         TraceJms.logger.log(BasicLevel.DEBUG, "");
00498         Queue queue = (Queue)queues.get(jndiName);
00499         int n;
00500         if (queue != null) {
00501             // existent queue (created by JOnAS)
00502             n = momadmin.getPendingMessages(queue);
00503         } else {
00504             throw new IllegalStateException(jndiName + " not a queue created by the jms service");
00505         }
00506         return n;
00507     }
00508 
00515     public int getPendingRequests(String jndiName) throws Exception {
00516         TraceJms.logger.log(BasicLevel.DEBUG, "");
00517         Queue queue = (Queue)queues.get(jndiName);
00518         int n;
00519         if (queue != null) {
00520             // existent queue (created by JOnAS)
00521             n = momadmin.getPendingRequests(queue);
00522         } else {
00523             throw new IllegalStateException(jndiName + " not a queue created by the jms service");
00524         }
00525         return n;
00526     } 
00527 
00534     public int getSubscriptions(String jndiName) throws Exception {
00535         Topic topic = (Topic)topics.get(jndiName);
00536         int n;
00537         if (topic != null) {
00538             // existent topic (created by JOnAS)
00539             n = momadmin.getSubscriptions(topic);
00540         } else {
00541             throw new IllegalStateException(jndiName + " not a topic created by the jms service");
00542         }
00543         return n;
00544     }  
00545 }

Generated on Tue Feb 15 15:05:49 2005 for JOnAS by  doxygen 1.3.9.1