00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 package org.objectweb.jonas_jms;
00027
00028 import java.util.LinkedList;
00029
00030 import javax.jms.Connection;
00031 import javax.jms.ConnectionConsumer;
00032 import javax.jms.ConnectionMetaData;
00033 import javax.jms.Destination;
00034 import javax.jms.ExceptionListener;
00035 import javax.jms.JMSException;
00036 import javax.jms.ServerSessionPool;
00037 import javax.jms.Session;
00038 import javax.jms.Topic;
00039 import javax.jms.XAConnection;
00040 import javax.jms.XAConnectionFactory;
00041
00042 import org.objectweb.transaction.jta.TransactionManager;
00043 import org.objectweb.util.monolog.api.BasicLevel;
00044
00055 public class JConnection implements Connection {
00056
00057
00058 protected XAConnection xac;
00059
00060 protected boolean closed;
00061 protected String user;
00062 protected boolean globaltx = false;
00063 protected static TransactionManager tm;
00064 protected JConnectionFactory jcf;
00065 protected LinkedList sessionlist = new LinkedList();
00066
00067
00068
00069 protected static final String INTERNAL_USER_NAME = "anInternalNameUsedOnlyByJOnAS";
00070
00074 protected JConnection(JConnectionFactory jcf, String user) throws JMSException {
00075 this.user = user;
00076 this.jcf = jcf;
00077 closed = false;
00078 if (tm == null) {
00079 tm = JmsManagerImpl.getTransactionManager();
00080 }
00081
00082
00083 try {
00084 globaltx = (tm.getTransaction() != null);
00085 } catch (Exception e) {
00086 }
00087 }
00088
00089
00096 public JConnection(JConnectionFactory jcf, XAConnectionFactory xacf, String user, String passwd)
00097 throws JMSException {
00098 this(jcf, user);
00099
00100 xac = xacf.createXAConnection(user, passwd);
00101 }
00102
00106 public JConnection(JConnectionFactory jcf, XAConnectionFactory xacf) throws JMSException {
00107 this(jcf, INTERNAL_USER_NAME);
00108
00109 xac = xacf.createXAConnection();
00110 }
00111
00112
00113
00114
00115
00119 protected synchronized boolean sessionOpen(Session s) {
00120 if (!closed) {
00121 sessionlist.add(s);
00122 return true;
00123 } else {
00124 return false;
00125 }
00126 }
00127
00131 protected synchronized void sessionClose(Session s) {
00132 sessionlist.remove(s);
00133 if (sessionlist.size() == 0 && closed) {
00134 notify();
00135 }
00136 }
00137
00141 public String getUser() {
00142 return user;
00143 }
00144
00145
00146
00147
00148
00158 public void close() throws JMSException {
00159 TraceJms.logger.log(BasicLevel.DEBUG, "");
00160 if (globaltx) {
00161
00162
00163
00164
00165 jcf.freeJConnection(this);
00166 } else {
00167
00168
00169 synchronized(this) {
00170 while (sessionlist.size() > 0) {
00171 try {
00172 wait();
00173 } catch (InterruptedException e) {
00174 TraceJms.logger.log(BasicLevel.ERROR, "interrupted");
00175 }
00176 }
00177 }
00178 closed = true;
00179 xac.close();
00180 }
00181 }
00182 public void finalClose() throws JMSException {
00183 if (!closed) {
00184 xac.close();
00185 }
00186 }
00187
00201 public ConnectionConsumer createConnectionConsumer(Destination destination,
00202 java.lang.String messageSelector,
00203 ServerSessionPool sessionPool,
00204 int maxMessages)
00205 throws JMSException {
00206 TraceJms.logger.log(BasicLevel.DEBUG, "");
00207 return xac.createConnectionConsumer(destination,
00208 messageSelector,
00209 sessionPool,
00210 maxMessages);
00211 }
00212
00226 public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
00227 java.lang.String subscriptionName,
00228 java.lang.String messageSelector,
00229 ServerSessionPool sessionPool,
00230 int maxMessages)
00231 throws JMSException {
00232 TraceJms.logger.log(BasicLevel.DEBUG, "");
00233 return xac.createDurableConnectionConsumer(topic,
00234 subscriptionName,
00235 messageSelector,
00236 sessionPool,
00237 maxMessages);
00238 }
00239
00240
00251 public Session createSession(boolean transacted,int acknowledgeMode) throws JMSException {
00252 TraceJms.logger.log(BasicLevel.DEBUG, "");
00253 return new JSession(this, xac);
00254 }
00255
00256
00265 public String getClientID() throws JMSException {
00266 TraceJms.logger.log(BasicLevel.DEBUG, "");
00267 return xac.getClientID();
00268 }
00269
00281 public void setClientID(String clientID) throws JMSException {
00282 TraceJms.logger.log(BasicLevel.DEBUG, "");
00283 xac.setClientID(clientID);
00284 }
00285
00292 public ConnectionMetaData getMetaData() throws JMSException {
00293 TraceJms.logger.log(BasicLevel.DEBUG, "");
00294 return xac.getMetaData();
00295 }
00296
00303 public ExceptionListener getExceptionListener() throws JMSException {
00304 TraceJms.logger.log(BasicLevel.DEBUG, "");
00305 return xac.getExceptionListener();
00306 }
00307
00314 public void setExceptionListener(ExceptionListener listener) throws JMSException {
00315 TraceJms.logger.log(BasicLevel.DEBUG, "");
00316 xac.setExceptionListener(listener);
00317 }
00318
00324 public void start() throws JMSException {
00325 TraceJms.logger.log(BasicLevel.DEBUG, "");
00326 xac.start();
00327 }
00328
00337 public void stop() throws JMSException {
00338 TraceJms.logger.log(BasicLevel.DEBUG, "");
00339 xac.stop();
00340 }
00341 }