00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 package org.objectweb.jonas_jms;
00027
00028 import java.io.Serializable;
00029
00030 import javax.jms.BytesMessage;
00031 import javax.jms.Destination;
00032 import javax.jms.JMSException;
00033 import javax.jms.MapMessage;
00034 import javax.jms.Message;
00035 import javax.jms.MessageConsumer;
00036 import javax.jms.MessageListener;
00037 import javax.jms.MessageProducer;
00038 import javax.jms.ObjectMessage;
00039 import javax.jms.Queue;
00040 import javax.jms.QueueBrowser;
00041 import javax.jms.Session;
00042 import javax.jms.StreamMessage;
00043 import javax.jms.TemporaryQueue;
00044 import javax.jms.TemporaryTopic;
00045 import javax.jms.TextMessage;
00046 import javax.jms.Topic;
00047 import javax.jms.TopicSubscriber;
00048 import javax.jms.XAConnection;
00049 import javax.jms.XASession;
00050 import javax.transaction.RollbackException;
00051 import javax.transaction.Synchronization;
00052 import javax.transaction.SystemException;
00053 import javax.transaction.Transaction;
00054 import javax.transaction.xa.XAResource;
00055
00056 import org.objectweb.transaction.jta.TransactionManager;
00057 import org.objectweb.util.monolog.api.BasicLevel;
00058
00059
00068 public class JSession implements Session, Synchronization {
00069
00070 protected XAResource xares = null;
00071 protected boolean txover = true;
00072 protected Transaction currtx = null;
00073 protected boolean closed = false;
00074 protected JConnection jconn;
00075 protected static TransactionManager tm = null;
00076
00077 protected XAConnection xac;
00078 protected Session sess = null;
00079 protected XASession xasess = null;
00080
00085 protected JSession(JConnection jconn) {
00086 this.jconn = jconn;
00087 if (tm == null) {
00088 tm = JmsManagerImpl.getTransactionManager();
00089 }
00090 }
00091
00095 public JSession(JConnection jconn, XAConnection xac) {
00096 this(jconn);
00097 this.xac = xac;
00098 TraceJms.logger.log(BasicLevel.DEBUG,"");
00099 }
00100
00101
00102
00103
00104
00109 protected XAResource getXAResource() {
00110 return xares;
00111 }
00112
00117 protected Session getMOMSession() throws JMSException {
00118 Transaction tx = null;
00119 try {
00120 tx = tm.getTransaction();
00121 } catch (SystemException e) {
00122 TraceJms.logger.log(BasicLevel.ERROR,"cannot get Transaction");
00123 }
00124 if (tx == null) {
00125 if (sess == null) {
00126 sess = xac.createSession(false, Session.AUTO_ACKNOWLEDGE);
00127 jconn.sessionOpen(this);
00128 }
00129 return sess;
00130 } else {
00131 if (xasess == null) {
00132 xasess = xac.createXASession();
00133 if (currtx != null) {
00134 TraceJms.logger.log(BasicLevel.ERROR,"mixed transactions");
00135 }
00136 currtx = tx;
00137 xares = xasess.getXAResource();
00138 try {
00139 tx.enlistResource(xares);
00140 txover = false;
00141 } catch (SystemException e) {
00142 TraceJms.logger.log(BasicLevel.ERROR,"cannot enlist session:"+e);
00143 throw new JMSException(e.toString());
00144 } catch (RollbackException e) {
00145 TraceJms.logger.log(BasicLevel.ERROR,"transaction rolled back");
00146 throw new JMSException(e.toString());
00147 }
00148 }
00149 return xasess.getSession();
00150 }
00151 }
00152
00153 protected void MOMSessionClose() {
00154 try {
00155 if (xasess != null) {
00156 xasess.close();
00157 xasess = null;
00158 }
00159 if (sess != null) {
00160 sess.close();
00161 sess = null;
00162 jconn.sessionClose(this);
00163 }
00164 } catch (JMSException e) {
00165 TraceJms.logger.log(BasicLevel.ERROR,"exception:"+e);
00166 }
00167 }
00168
00172 protected void PhysicalClose() {
00173 MOMSessionClose();
00174 }
00175
00176
00177
00178
00179
00183 public void close() throws JMSException {
00184 TraceJms.logger.log(BasicLevel.DEBUG, "");
00185 closed = true;
00186 if (txover) {
00187 PhysicalClose();
00188 } else {
00189
00190 if (currtx == null) {
00191 TraceJms.logger.log(BasicLevel.ERROR, "should be in a tx");
00192 } else {
00193 try {
00194 currtx.delistResource(xares, XAResource.TMSUCCESS);
00195 } catch (SystemException e) {
00196 TraceJms.logger.log(BasicLevel.ERROR, "cannot delist session:"+e);
00197 throw new JMSException(e.toString());
00198 }
00199 }
00200 }
00201 }
00202
00206 public void commit() throws JMSException {
00207 TraceJms.logger.log(BasicLevel.DEBUG, "");
00208 throw new JMSException("JSession: commit Operation Not Allowed");
00209 }
00210
00214 public QueueBrowser createBrowser(Queue queue)
00215 throws JMSException {
00216 TraceJms.logger.log(BasicLevel.DEBUG, "");
00217 return getMOMSession().createBrowser(queue);
00218 }
00219
00223 public QueueBrowser createBrowser(Queue queue, java.lang.String messageSelector)
00224 throws JMSException {
00225 TraceJms.logger.log(BasicLevel.DEBUG, "");
00226 return getMOMSession().createBrowser(queue, messageSelector);
00227 }
00228
00232 public BytesMessage createBytesMessage() throws JMSException {
00233 TraceJms.logger.log(BasicLevel.DEBUG, "");
00234 return getMOMSession().createBytesMessage();
00235 }
00236
00237
00241 public MessageConsumer createConsumer(Destination destination)
00242 throws JMSException {
00243 TraceJms.logger.log(BasicLevel.DEBUG, "");
00244 return getMOMSession().createConsumer(destination);
00245 }
00246
00247
00251 public MessageConsumer createConsumer(Destination destination,
00252 String messageSelector)
00253 throws JMSException {
00254 TraceJms.logger.log(BasicLevel.DEBUG, "");
00255 return getMOMSession().createConsumer(destination, messageSelector);
00256 }
00257
00261 public MessageConsumer createConsumer(Destination destination,
00262 String messageSelector,
00263 boolean NoLocal)
00264 throws JMSException {
00265 TraceJms.logger.log(BasicLevel.DEBUG, "");
00266 return getMOMSession().createConsumer(destination, messageSelector,NoLocal );
00267 }
00268
00272 public TopicSubscriber createDurableSubscriber(Topic topic,
00273 String name)
00274 throws JMSException {
00275 TraceJms.logger.log(BasicLevel.DEBUG, "");
00276 return getMOMSession().createDurableSubscriber(topic, name);
00277 }
00278
00282 public TopicSubscriber createDurableSubscriber(Topic topic,
00283 String name,
00284 String messageSelector,
00285 boolean noLocal)
00286 throws JMSException {
00287 TraceJms.logger.log(BasicLevel.DEBUG, "");
00288 return getMOMSession().createDurableSubscriber(topic, name, messageSelector, noLocal);
00289 }
00290
00294 public MapMessage createMapMessage() throws JMSException {
00295 TraceJms.logger.log(BasicLevel.DEBUG, "");
00296 return getMOMSession().createMapMessage();
00297 }
00298
00302 public Message createMessage() throws JMSException {
00303 TraceJms.logger.log(BasicLevel.DEBUG, "");
00304 return getMOMSession().createMessage();
00305 }
00306
00310 public ObjectMessage createObjectMessage() throws JMSException {
00311 TraceJms.logger.log(BasicLevel.DEBUG, "");
00312 return getMOMSession().createObjectMessage();
00313 }
00314
00318 public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
00319 TraceJms.logger.log(BasicLevel.DEBUG, "");
00320 return getMOMSession().createObjectMessage(object);
00321 }
00322
00326 public MessageProducer createProducer(Destination destination)
00327 throws JMSException {
00328 TraceJms.logger.log(BasicLevel.DEBUG, "");
00329 return getMOMSession().createProducer(destination);
00330 }
00331
00336 public Queue createQueue(String queueName)
00337 throws JMSException {
00338 TraceJms.logger.log(BasicLevel.DEBUG, "");
00339 return getMOMSession().createQueue(queueName);
00340 }
00341
00345 public StreamMessage createStreamMessage() throws JMSException {
00346 TraceJms.logger.log(BasicLevel.DEBUG, "");
00347 return getMOMSession().createStreamMessage();
00348 }
00349
00353 public TemporaryQueue createTemporaryQueue()
00354 throws JMSException {
00355 TraceJms.logger.log(BasicLevel.DEBUG, "");
00356 return getMOMSession().createTemporaryQueue();
00357 }
00358
00362 public TemporaryTopic createTemporaryTopic()
00363 throws JMSException {
00364 TraceJms.logger.log(BasicLevel.DEBUG, "");
00365 return getMOMSession().createTemporaryTopic();
00366 }
00367
00371 public TextMessage createTextMessage() throws JMSException {
00372 TraceJms.logger.log(BasicLevel.DEBUG, "");
00373 return getMOMSession().createTextMessage();
00374 }
00375
00379 public TextMessage createTextMessage(String text) throws JMSException {
00380 TraceJms.logger.log(BasicLevel.DEBUG, "");
00381 return getMOMSession().createTextMessage(text);
00382 }
00383
00387 public Topic createTopic(String topicName)
00388 throws JMSException{
00389 TraceJms.logger.log(BasicLevel.DEBUG, "");
00390 return getMOMSession().createTopic(topicName);
00391 }
00392
00396 public MessageListener getMessageListener() throws JMSException {
00397 TraceJms.logger.log(BasicLevel.DEBUG, "");
00398 return getMOMSession().getMessageListener();
00399 }
00400
00404 public boolean getTransacted() throws JMSException {
00405 TraceJms.logger.log(BasicLevel.DEBUG, "");
00406 return getMOMSession().getTransacted();
00407 }
00408
00412 public int getAcknowledgeMode() throws JMSException {
00413 TraceJms.logger.log(BasicLevel.DEBUG, "");
00414 return getMOMSession().getAcknowledgeMode();
00415 }
00416
00417
00421 public void recover() throws JMSException {
00422 TraceJms.logger.log(BasicLevel.DEBUG, "");
00423 throw new JMSException("JSession: recover Operation Not Allowed");
00424 }
00425
00429 public void rollback() throws JMSException {
00430 TraceJms.logger.log(BasicLevel.DEBUG, "");
00431 throw new JMSException("JSession: rollback Operation Not Allowed");
00432 }
00433
00437 public void run() {
00438 TraceJms.logger.log(BasicLevel.DEBUG, "");
00439 try {
00440 getMOMSession().run();
00441 } catch (JMSException e) {
00442 TraceJms.logger.log(BasicLevel.ERROR, "exception: "+e);
00443 }
00444 }
00445
00449 public void setMessageListener(MessageListener listener) throws JMSException {
00450 TraceJms.logger.log(BasicLevel.DEBUG, "");
00451 getMOMSession().setMessageListener(listener);
00452 }
00453
00454
00458 public void unsubscribe(String name)
00459 throws JMSException {
00460 TraceJms.logger.log(BasicLevel.DEBUG, "");
00461 getMOMSession().unsubscribe(name);
00462 }
00463
00464
00465
00466
00467
00468
00473 public void beforeCompletion() {
00474 if(TraceJms.isDebug()){
00475 TraceJms.logger.log(BasicLevel.DEBUG, "");
00476 }
00477 }
00478
00484 public void afterCompletion(int status) {
00485 if(TraceJms.isDebug()){
00486 TraceJms.logger.log(BasicLevel.DEBUG, "");
00487 }
00488 txover = true;
00489 if (closed) {
00490 PhysicalClose();
00491 }
00492 }
00493
00494
00495 }
00496