00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 package org.objectweb.jonas_jms;
00033
00034 import java.util.Enumeration;
00035 import java.util.Hashtable;
00036 import java.util.Vector;
00037
00038 import javax.jms.ConnectionFactory;
00039 import javax.jms.JMSException;
00040 import javax.jms.Queue;
00041 import javax.jms.QueueConnectionFactory;
00042 import javax.jms.Topic;
00043 import javax.jms.TopicConnectionFactory;
00044 import javax.jms.XAConnectionFactory;
00045 import javax.jms.XAQueueConnectionFactory;
00046 import javax.jms.XATopicConnectionFactory;
00047 import javax.naming.InitialContext;
00048 import javax.naming.NamingException;
00049
00050 import org.objectweb.jonas_jms.api.JmsAdministration;
00051 import org.objectweb.jonas_jms.api.JmsManager;
00052 import org.objectweb.transaction.jta.TransactionManager;
00053 import org.objectweb.util.monolog.api.BasicLevel;
00054
00066 public class JmsManagerImpl implements JmsManager, JmsJmxManagement {
00067
00068 private JmsAdministration momadmin = null;
00069 private InitialContext ictx = null;
00070 private ConnectionFactory cf = null;
00071 private TopicConnectionFactory tcf = null;
00072 private QueueConnectionFactory qcf = null;
00073 private Hashtable queues = new Hashtable();
00074 private Hashtable topics = new Hashtable();
00075 private Vector namelist = new Vector();
00076
00077 private static TransactionManager tm = null;
00078 private static JmsManagerImpl unique = null;
00079
/**
 * Private constructor: instances are only obtained through the static
 * accessors of this class. Emits an empty DEBUG trace.
 */
private JmsManagerImpl() {
    TraceJms.logger.log(BasicLevel.DEBUG, "");
}
00086
00090 public static JmsManager getJmsManager() {
00091 if (unique == null) {
00092 unique = new JmsManagerImpl();
00093 }
00094 return (JmsManager) unique;
00095 }
00096
00100 public static JmsJmxManagement getJmsJmxManagement() {
00101 if (unique == null) {
00102 unique = new JmsManagerImpl();
00103 }
00104 return (JmsJmxManagement) unique;
00105 }
00106
00107
00108
00109
00110
00114 public static TransactionManager getTransactionManager() {
00115 return tm;
00116 }
00117
00118
00119
00120
00121
00129 public void init(Class cl, boolean collocated, String url, TransactionManager trm) throws Exception {
00130 TraceJms.logger.log(BasicLevel.DEBUG,"");
00131 tm = trm;
00132 int maxloops = collocated ? 1 : 5;
00133 for (int i = 0; i < maxloops; i++) {
00134 try {
00135
00136 ictx = new InitialContext();
00137
00138 momadmin = (JmsAdministration) cl.newInstance();
00139 momadmin.start(collocated, url);
00140 break;
00141 } catch (NamingException e) {
00142 TraceJms.logger.log(BasicLevel.ERROR, "cannot get Initial Context", e);
00143 throw e;
00144 } catch (NullPointerException e) {
00145 TraceJms.logger.log(BasicLevel.ERROR, "exception: ", e);
00146 throw e;
00147 } catch (Exception e) {
00148 if (i < maxloops) {
00149 if(TraceJms.isDebug())
00150 TraceJms.logger.log(BasicLevel.DEBUG, "cannot reach the MOM - retrying...");
00151 try {
00152
00153 Thread.sleep(2000*(i+1));
00154 } catch (InterruptedException e2) {
00155 throw new JMSException("Cannot reach the MOM");
00156 }
00157 } else {
00158 TraceJms.logger.log(BasicLevel.ERROR, "cannot load admin class "+e);
00159 throw e;
00160 }
00161 }
00162 }
00163
00164
00165 getConnectionFactory();
00166 getTopicConnectionFactory();
00167 getQueueConnectionFactory();
00168 }
00169
00173 public Queue createQueue(String name) throws Exception {
00174 Queue queue = null;
00175
00176 try {
00177 queue = (Queue) ictx.lookup(name);
00178 if(TraceJms.isDebug())
00179 TraceJms.logger.log(BasicLevel.DEBUG, "queue " + name +" already found");
00180 return queue;
00181 } catch (NamingException ex) {
00182 }
00183 if(TraceJms.isDebug())
00184 TraceJms.logger.log(BasicLevel.DEBUG, "creating and registering queue "+name);
00185 queue = momadmin.createQueue(name);
00186
00187
00188 namelist.addElement(name);
00189 queues.put(name, queue);
00190 return queue;
00191 }
00192
00196 public Topic createTopic(String name) throws Exception {
00197 Topic topic = null;
00198
00199 try {
00200 topic = (Topic) ictx.lookup(name);
00201 if(TraceJms.isDebug())
00202 TraceJms.logger.log(BasicLevel.DEBUG, "topic " + name + " already found");
00203 return topic;
00204 } catch (NamingException ex) {
00205 }
00206 if(TraceJms.isDebug())
00207 TraceJms.logger.log(BasicLevel.DEBUG, "creating and registering topic " + name);
00208 topic = momadmin.createTopic(name);
00209
00210
00211 namelist.addElement(name);
00212 topics.put(name, topic);
00213 return topic;
00214 }
00215
00219 public ConnectionFactory getConnectionFactory() {
00220 if (cf == null) {
00221 String name = "CF";
00222 cf = new org.objectweb.jonas_jms.JConnectionFactory(name);
00223 try {
00224 if(TraceJms.isDebug())
00225 TraceJms.logger.log(BasicLevel.DEBUG, "creating and registering "+name);
00226 ictx.rebind(name, cf);
00227 } catch (NamingException e) {
00228 TraceJms.logger.log(BasicLevel.ERROR, "cannot rebind "+name+" :"+e);
00229 }
00230 }
00231 return cf;
00232 }
00233
00237 public TopicConnectionFactory getTopicConnectionFactory() {
00238 if (tcf == null) {
00239 String name = "TCF";
00240 tcf = new org.objectweb.jonas_jms.JTopicConnectionFactory(name);
00241 try {
00242 if(TraceJms.isDebug())
00243 TraceJms.logger.log(BasicLevel.DEBUG,"creating and registering " + name);
00244 ictx.rebind(name, tcf);
00245 } catch (NamingException e) {
00246 TraceJms.logger.log(BasicLevel.ERROR, "cannot rebind " + name + " :" + e);
00247 }
00248 }
00249 return tcf;
00250 }
00251
00252
00253
00254
00255 public QueueConnectionFactory getQueueConnectionFactory() {
00256 if (qcf == null) {
00257 String name = "QCF";
00258 qcf = new org.objectweb.jonas_jms.JQueueConnectionFactory(name);
00259 try {
00260 if(TraceJms.isDebug())
00261 TraceJms.logger.log(BasicLevel.DEBUG, "creating and registering " + name);
00262 ictx.rebind(name, qcf);
00263 } catch (NamingException e) {
00264 TraceJms.logger.log(BasicLevel.ERROR, "cannot rebind " + name + " :" + e);
00265 }
00266 }
00267 return qcf;
00268 }
00269
00273 public XAConnectionFactory getXAConnectionFactory() {
00274 return momadmin.getXAConnectionFactory();
00275 }
00276
00280 public XATopicConnectionFactory getXATopicConnectionFactory() {
00281 return momadmin.getXATopicConnectionFactory();
00282 }
00283
00287 public XAQueueConnectionFactory getXAQueueConnectionFactory() {
00288 return momadmin.getXAQueueConnectionFactory();
00289 }
00290
00294 public Queue getQueue(String name) throws Exception {
00295 Queue q = (Queue)queues.get(name);
00296 if (q == null) {
00297 q = createQueue(name);
00298 }
00299 return q;
00300 }
00301
00305 public Topic getTopic(String name) throws Exception {
00306 Topic t = (Topic)topics.get(name);
00307 if (t == null) {
00308 t = createTopic(name);
00309 }
00310 return t;
00311 }
00312
00316 public Enumeration getTopicsNames() {
00317 return topics.keys();
00318 }
00319
00323 public Enumeration getQueuesNames() {
00324 return queues.keys();
00325 }
00326
00330 public void stop() throws Exception {
00331
00332 if (cf != null) {
00333 ((JConnectionFactory) cf).cleanPool();
00334 }
00335
00336 if (tcf != null) {
00337 ((JConnectionFactory) tcf).cleanPool();
00338 }
00339
00340 if (qcf != null) {
00341 ((JConnectionFactory) qcf).cleanPool();
00342 }
00343
00344
00345 if (momadmin != null) {
00346 momadmin.stop();
00347 }
00348
00349
00350 Enumeration ln = namelist.elements();
00351 while (ln.hasMoreElements()) {
00352 String name = (String)ln.nextElement();
00353 try {
00354 ictx.unbind(name);
00355 if(TraceJms.isDebug())
00356 TraceJms.logger.log(BasicLevel.DEBUG, "unbind " + name);
00357 } catch(NamingException e) {
00358 if(TraceJms.isDebug())
00359 TraceJms.logger.log(BasicLevel.ERROR, "cannot unbind " + name);
00360 }
00361 }
00362
00363 }
00364
00365
00366
00367
00368
00372 public int getCurrentNumberOfJmsConnectionFactory() {
00373 int result = 0;
00374 if (cf != null) result++;
00375 return result;
00376 }
00377
00381 public int getCurrentNumberOfJmsTopicConnectionFactory() {
00382 int result = 0;
00383 if (tcf != null) result++;
00384 return result;
00385 }
00386
00390 public int getCurrentNumberOfJmsQueueConnectionFactory() {
00391 int result = 0;
00392 if (qcf != null) result++;
00393 return result;
00394 }
00395
00399 public int getCurrentNumberOfJmsTopicDestination() {
00400 return topics.size();
00401 }
00402
00406 public int getCurrentNumberOfJmsQueueDestination() {
00407 return queues.size();
00408 }
00409
00415 public String removeJmsDestination(String jndiName) throws Exception {
00416
00417 momadmin.deleteDestination(jndiName);
00418
00419 String destType = null;
00420 if ((queues.containsKey(jndiName))||(topics.containsKey(jndiName))) {
00421 try {
00422 InitialContext ictx = new InitialContext();
00423 ictx.unbind(jndiName);
00424 Object o = null;
00425 o = queues.remove(jndiName);
00426 if (o == null) {
00427
00428 o = topics.remove(jndiName);
00429 if (o != null) {
00430 destType = "topic";
00431 } else {
00432
00433 destType = "unknown";
00434 }
00435 } else {
00436 destType = "queue";
00437 }
00438 return destType;
00439 } catch(Exception ex) {
00440 throw new Exception("JmsManagerImpl remove destination : cannot unbind " + jndiName );
00441 }
00442 } else {
00443 throw new Exception("Unexisting jms destination :" + jndiName +" remove abord");
00444 }
00445 }
00446
00450 public String getDefaultConnectionFactoryName() {
00451 return "CF";
00452 }
00453
00457 public String getDefaultQueueConnectionFactoryName() {
00458 return "QCF";
00459 }
00460
00464 public String getDefaultTopicConnectionFactoryName() {
00465 return "TCF";
00466 }
00467
00468
00469
00476 public String getConnectionFactoryMode(String jndiName) throws Exception {
00477 TraceJms.logger.log(BasicLevel.DEBUG, "");
00478
00479 if (jndiName.equals("CF"))
00480 return "Point-To-Point and Publish/Subscribe";
00481 else if (jndiName.equals("QCF"))
00482 return "Point-To-Point";
00483 else if (jndiName.equals("TCF"))
00484 return "Publish/Subscribe";
00485 else
00486 throw new IllegalStateException("Unknown factory.");
00487 }
00488
00489
00496 public int getPendingMessages(String jndiName) throws Exception {
00497 TraceJms.logger.log(BasicLevel.DEBUG, "");
00498 Queue queue = (Queue)queues.get(jndiName);
00499 int n;
00500 if (queue != null) {
00501
00502 n = momadmin.getPendingMessages(queue);
00503 } else {
00504 throw new IllegalStateException(jndiName + " not a queue created by the jms service");
00505 }
00506 return n;
00507 }
00508
00515 public int getPendingRequests(String jndiName) throws Exception {
00516 TraceJms.logger.log(BasicLevel.DEBUG, "");
00517 Queue queue = (Queue)queues.get(jndiName);
00518 int n;
00519 if (queue != null) {
00520
00521 n = momadmin.getPendingRequests(queue);
00522 } else {
00523 throw new IllegalStateException(jndiName + " not a queue created by the jms service");
00524 }
00525 return n;
00526 }
00527
00534 public int getSubscriptions(String jndiName) throws Exception {
00535 Topic topic = (Topic)topics.get(jndiName);
00536 int n;
00537 if (topic != null) {
00538
00539 n = momadmin.getSubscriptions(topic);
00540 } else {
00541 throw new IllegalStateException(jndiName + " not a topic created by the jms service");
00542 }
00543 return n;
00544 }
00545 }