00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 package org.objectweb.jonas_jms;
00028
00029 import java.util.StringTokenizer;
00030 import java.util.Vector;
00031
00032 import javax.jms.ConnectionFactory;
00033 import javax.jms.Queue;
00034 import javax.jms.QueueConnectionFactory;
00035 import javax.jms.Topic;
00036 import javax.jms.TopicConnectionFactory;
00037 import javax.jms.XAConnectionFactory;
00038 import javax.jms.XAQueueConnectionFactory;
00039 import javax.jms.XATopicConnectionFactory;
00040 import javax.naming.InitialContext;
00041 import javax.naming.NamingException;
00042
00043 import org.objectweb.common.Env;
00044 import org.objectweb.jonas_jms.api.JmsAdministration;
00045 import org.objectweb.joram.client.jms.admin.AdminModule;
00046 import org.objectweb.joram.client.jms.admin.User;
00047 import org.objectweb.joram.client.jms.local.XALocalConnectionFactory;
00048 import org.objectweb.joram.client.jms.local.XAQueueLocalConnectionFactory;
00049 import org.objectweb.joram.client.jms.local.XATopicLocalConnectionFactory;
00050 import org.objectweb.joram.client.jms.tcp.QueueTcpConnectionFactory;
00051 import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;
00052 import org.objectweb.joram.client.jms.tcp.TopicTcpConnectionFactory;
00053 import org.objectweb.joram.client.jms.tcp.XAQueueTcpConnectionFactory;
00054 import org.objectweb.joram.client.jms.tcp.XATcpConnectionFactory;
00055 import org.objectweb.joram.client.jms.tcp.XATopicTcpConnectionFactory;
00056 import org.objectweb.util.monolog.api.BasicLevel;
00057
00058 import fr.dyade.aaa.agent.AgentServer;
00059
00072 public class JmsAdminForJoram implements JmsAdministration {
00073
00074 private static String QUEUE_CONN_FACT_NAME="JQCF";
00075 private static String TOPIC_CONN_FACT_NAME="JTCF";
00076 private static String CONN_FACT_NAME="JCF";
00077 private boolean collocatedmom = false;
00078 private InitialContext ictx = null;
00079 private Vector namelist = new Vector();
00080 private XAConnectionFactory xacf = null;
00081 private XATopicConnectionFactory xatcf = null;
00082 private XAQueueConnectionFactory xaqcf = null;
00083 private ConnectionFactory jcf = null;
00084 private TopicConnectionFactory jtcf = null;
00085 private QueueConnectionFactory jqcf = null;
00086 private org.objectweb.joram.client.jms.admin.User user = null;
00087 private org.objectweb.joram.client.jms.admin.User jonasUser = null;
00088 private String host = null;
00089 private int port;
00090
00095 public JmsAdminForJoram() {
00096 }
00097
00098
00099
00100
00101
00112 private void startMOM() throws Exception {
00113 TraceJms.logger.log(BasicLevel.DEBUG,"");
00114
00115 try {
00116
00117 String TRANSACTION_PROP = "Transaction";
00118 String s1 = System.getProperty(TRANSACTION_PROP);
00119 if (s1 == null) {
00120 System.setProperty(TRANSACTION_PROP, "fr.dyade.aaa.util.NullTransaction");
00121 }
00122
00123
00124 String A3CONF_DIR_PROP = "fr.dyade.aaa.agent.A3CONF_DIR";
00125 String a3config_dir = System.getProperty(A3CONF_DIR_PROP);
00126 if (a3config_dir == null) {
00127 System.setProperty(A3CONF_DIR_PROP, System.getProperty("jonas.base")
00128 + System.getProperty("file.separator") + "conf");
00129 }
00130
00131 if(TraceJms.isDebug())
00132 TraceJms.logger.log(BasicLevel.DEBUG,"a3config_dir= "+a3config_dir);
00133
00134
00135
00136 short serverId = 0;
00137 AgentServer.init(serverId,"s0", TraceJms.getLoggerFactory());
00138 AgentServer.start();
00139
00140
00141 Thread.sleep(10);
00142
00143
00144
00145 short sid = AgentServer.getServerId();
00146 host = AgentServer.getHostname(sid);
00147 port = 16010;
00148 try {
00149 String s = AgentServer.getServiceArgs(sid, "org.objectweb.joram.mom.proxies.tcp.TcpProxyService");
00150 StringTokenizer st = new StringTokenizer(s);
00151 port = Integer.parseInt(st.nextToken());
00152 } catch (NumberFormatException exc) {
00153 TraceJms.logger.log(BasicLevel.ERROR,"exception: "+exc);
00154 } catch (Exception e) {
00155 TraceJms.logger.log(BasicLevel.ERROR,"MOM exception:"+e);
00156 throw e;
00157 }
00158 TraceJms.logger.log(BasicLevel.INFO, "starting MOM on host " + host + ", port " + port);
00159
00160 } catch (Exception ex) {
00161 TraceJms.logger.log(BasicLevel.ERROR,"exception: "+ex);
00162 throw ex;
00163 }
00164 }
00165
00166
00167
00168
00169
00179 public void start(boolean collocated, String url) throws Exception {
00180
00181
00182 if (collocated) {
00183 collocatedmom = true;
00184 startMOM();
00185 }
00186
00187
00188 ictx = new InitialContext();
00189
00190
00191 if (host != null) {
00192
00193 TraceJms.logger.log(BasicLevel.INFO, "starting JmsAdmin with host " + host + ", port " + port);
00194 AdminModule.collocatedConnect("root", "root");
00195 } else if (url != null && url.length() > 0) {
00196
00197 int indexOfHost = url.indexOf("//") + 2;
00198 int indexOfPort = url.indexOf(":", indexOfHost) + 1;
00199 host = url.substring(indexOfHost, indexOfPort - 1);
00200 port = Integer.parseInt(url.substring(indexOfPort));
00201 TraceJms.logger.log(BasicLevel.WARN, "starting JmsAdmin with host " + host + ", port " + port);
00202 AdminModule.connect(host, port, "root", "root", 100);
00203 } else {
00204
00205 TraceJms.logger.log(BasicLevel.WARN, "starting JmsAdmin with default url");
00206 AdminModule.connect("root", "root", 100);
00207 }
00208
00209 user = User.create("anonymous", "anonymous");
00210 jonasUser = User.create("jonas", "jonas");
00211
00212
00213 if (collocated) {
00214 xacf = XALocalConnectionFactory.create();
00215 xatcf = XATopicLocalConnectionFactory.create();
00216 xaqcf = XAQueueLocalConnectionFactory.create();
00217 }
00218 else {
00219 xacf = XATcpConnectionFactory.create(host, port);
00220 xatcf = XATopicTcpConnectionFactory.create(host, port);
00221 xaqcf = XAQueueTcpConnectionFactory.create(host, port);
00222 }
00223
00224
00225
00226 String name = CONN_FACT_NAME;
00227 try {
00228 jcf = TcpConnectionFactory.create(host, port);
00229 ictx.rebind(name, jcf);
00230 namelist.addElement(name);
00231 } catch (NamingException e) {
00232 TraceJms.logger.log(BasicLevel.ERROR,"cannot register "+name);
00233 }
00234
00235 name = QUEUE_CONN_FACT_NAME;
00236 try {
00237 jqcf = QueueTcpConnectionFactory.create(host, port);
00238 ictx.rebind(name, jqcf);
00239 namelist.addElement(name);
00240 } catch (NamingException e) {
00241 TraceJms.logger.log(BasicLevel.ERROR,"cannot register "+name);
00242 }
00243
00244 name = TOPIC_CONN_FACT_NAME;
00245 try {
00246 jtcf = TopicTcpConnectionFactory.create(host, port);
00247 ictx.rebind(name, jtcf);
00248 namelist.addElement(name);
00249 } catch (NamingException e) {
00250 TraceJms.logger.log(BasicLevel.ERROR,"cannot register "+name);
00251 }
00252 }
00253
00257 public void stop() {
00258 if ( Env.isJAVA4() ){
00259
00260 try {
00261 AdminModule.disconnect();
00262 } catch (Exception ex) {
00263 TraceJms.logger.log(BasicLevel.ERROR, "problem for closing admin object " + ex);
00264 }
00265 if (collocatedmom) {
00266 AgentServer.stop();
00267 if(TraceJms.isDebug())
00268 TraceJms.logger.log(BasicLevel.DEBUG, "agentServer stopped");
00269 }
00270 }
00271
00272 try {
00273 ictx.unbind(CONN_FACT_NAME);
00274 ictx.unbind(QUEUE_CONN_FACT_NAME);
00275 ictx.unbind(TOPIC_CONN_FACT_NAME);
00276 if(TraceJms.isDebug())
00277 TraceJms.logger.log(BasicLevel.DEBUG,"unbind connection factories");
00278 } catch(Exception ex) {
00279 TraceJms.logger.log(BasicLevel.ERROR,"cannot unbind connection factories");
00280 }
00281 }
00282
00283
00287 public XAConnectionFactory getXAConnectionFactory() {
00288 return xacf;
00289 }
00290
00294 public XATopicConnectionFactory getXATopicConnectionFactory() {
00295 return xatcf;
00296 }
00297
00301 public XAQueueConnectionFactory getXAQueueConnectionFactory() {
00302 return xaqcf;
00303 }
00304
00305
00309 public Queue createQueue(String name) throws Exception {
00310 TraceJms.logger.log(BasicLevel.DEBUG,"");
00311 org.objectweb.joram.client.jms.Queue queue = null;
00312 try {
00313 queue = (org.objectweb.joram.client.jms.Queue) ictx.lookup(name);
00314 } catch (Exception e) {
00315 queue = org.objectweb.joram.client.jms.Queue.create(name);
00316 ictx.rebind(name, queue);
00317 }
00318 queue.setWriter(user);
00319 queue.setReader(user);
00320 queue.setWriter(jonasUser);
00321 queue.setReader(jonasUser);
00322 return queue;
00323 }
00324
00328 public Topic createTopic(String name) throws Exception {
00329 TraceJms.logger.log(BasicLevel.DEBUG,"");
00330 org.objectweb.joram.client.jms.Topic topic = null;
00331 try {
00332 topic = (org.objectweb.joram.client.jms.Topic) ictx.lookup(name);
00333 } catch (Exception e) {
00334 topic = org.objectweb.joram.client.jms.Topic.create(name);
00335 ictx.rebind(name, topic);
00336 }
00337 topic.setWriter(user);
00338 topic.setReader(user);
00339 topic.setWriter(jonasUser);
00340 topic.setReader(jonasUser);
00341 return topic;
00342 }
00343
00347 public void deleteDestination(String name) throws Exception {
00348 TraceJms.logger.log(BasicLevel.DEBUG,"");
00349 try {
00350 org.objectweb.joram.client.jms.Destination dest = (org.objectweb.joram.client.jms.Destination) ictx.lookup(name);
00351 dest.delete();
00352 } catch (Exception e) {
00353 }
00354 }
00355
00361 public int getPendingMessages(javax.jms.Queue queue) throws Exception {
00362 return ((org.objectweb.joram.client.jms.Queue) queue).getPendingMessages();
00363 }
00364
00370 public int getPendingRequests(javax.jms.Queue queue) throws Exception {
00371 return ((org.objectweb.joram.client.jms.Queue) queue).getPendingRequests();
00372 }
00373
00379 public int getSubscriptions(javax.jms.Topic topic) throws Exception {
00380 return ((org.objectweb.joram.client.jms.Topic) topic).getSubscriptions();
00381 }
00382 }