DiscoveryComm.java

00001 
00022 package org.objectweb.jonas.discovery;
00023 
00024 import java.io.ByteArrayInputStream;
00025 import java.io.IOException;
00026 import java.io.ObjectInputStream;
00027 import java.net.DatagramPacket;
00028 import java.net.DatagramSocket;
00029 import java.net.InetAddress;
00030 import java.net.MulticastSocket;
00031 import java.net.SocketException;
00032 import java.net.UnknownHostException;
00033 
00034 import javax.management.MBeanServer;
00035 import javax.management.remote.JMXServiceURL;
00036 
00037 import org.objectweb.jonas.common.Log;
00038 import org.objectweb.util.monolog.api.BasicLevel;
00039 import org.objectweb.util.monolog.api.Logger;
00040 
00045 public class DiscoveryComm implements Runnable {
00046     private int              RECEIVE_BUFFER_SIZE = 1024;
00050     private MulticastSocket  multicastSocket;
00054     private DatagramSocket unicastSocket;
00058     private DiscoveryManager dm;
00062     private int port;
00066     private InetAddress destAddress;
00070     private boolean notStopped = true;
00074     private int ttl = 1; // why 1 ??
00075 
00076     private String jonasName = null;
00077     private String domainName = null;
00078     private String[] urls = null;
00079 
00083     private static Logger logger = Log.getLogger(Log.JONAS_DISCOVERY_PREFIX);
00088     public DiscoveryComm(DiscoveryManager dm) {
00089         this.port = dm.getListeningPort();
00090         try {
00091             this.destAddress = InetAddress.getByName(dm.getListeningIp());
00092             this.ttl = dm.getTimeToLive();
00093         } catch (UnknownHostException e) {
00094             e.printStackTrace();
00095         }
00096         this.dm = dm;
00097     }
00098     
00104     private void join() {
00105         try {
00106             multicastSocket = new MulticastSocket(port);
00107             multicastSocket.setTimeToLive(ttl);
00108             multicastSocket.joinGroup(destAddress);
00109             logger.log(BasicLevel.DEBUG, "multicast ip address is "
00110                     + destAddress);
00111 
00112             logger.log(BasicLevel.DEBUG, "multicast port is " + port);
00113 
00114         } catch (IOException e) {
00115             logger.log(BasicLevel.ERROR, "io problem");
00116             // TODO Auto-generated catch block
00117             e.printStackTrace();
00118         }
00119     }
00120 
00124     public void sendNotif(DiscEvent msg) {
00125         try {
00126             //send it on the multicast address
00127             //after transforming the object to a datagram
00128             logger.log(BasicLevel.DEBUG, msg);
00129             byte[] messageBytes = DiscEvent.objectToBytes(msg);
00130             multicastSocket.send(new DatagramPacket(messageBytes,
00131                     messageBytes.length, destAddress, port));
00132         } catch (IOException e1) {
00133             logger.log(BasicLevel.ERROR, "DiscoveryComm: Error to send notification");
00134             e1.printStackTrace();
00135         }
00136 
00137     }
00145     private void sendResponse(DiscEvent msg, InetAddress destAddress, int port) {
00146         logger.log(BasicLevel.DEBUG, "DiscoveryComm : The message to send is "
00147                 + msg);
00148         byte[] messageBytes = DiscEvent.objectToBytes(msg);
00149         if (messageBytes != null) {
00150             try {
00151                 // send the unicast response to the discovery client
00152                 unicastSocket.send(new DatagramPacket(messageBytes,
00153                         messageBytes.length, destAddress, port));
00154             } catch (IOException e) {
00155                 logger.log(BasicLevel.ERROR, "DiscoveryComm: Error to send response to discovery message");
00156                 e.printStackTrace();
00157             }
00158         }
00159     }
00160 
00171     public DiscEvent createNotifMessage(String state) throws Exception {
00172         String theHostName;
00173         MBeanServer mbeanServer;
00174         try {
00175             theHostName = InetAddress.getLocalHost().getHostName();
00176         } catch (UnknownHostException e) {
00177             e.printStackTrace();
00178             return null;
00179         }
00180 
00181         if (!state.equals(DiscEvent.RUNNING)) {
00182             urls = null;
00183         }
00184         // In the case of a notification, the field port is not important since the
00185         // notifier is not waiting for an acknowledfement.
00186         DiscEvent resp = new DiscEvent(theHostName, port, jonasName, domainName, urls);
00187         resp.setState(state);
00188         return resp;
00189     }
00190 
00195     public void run()  {
00196         // Create a DatagramPacket to receive messages
00197         DatagramPacket datagram = new DatagramPacket(new byte[RECEIVE_BUFFER_SIZE],
00198                 RECEIVE_BUFFER_SIZE);
00199         // Object reveived in a message
00200         Object objReceived = null;
00201         ObjectInputStream in = null;
00202         
00203         // Join the group in order to receive multicast messages
00204         join();
00205         // Create notification message containing a discovery event with state RUNNING
00206         DiscEvent msg = null;
00207         try {
00208             msg = createNotifMessage(DiscEvent.RUNNING);
00209         } catch (Exception e) {
00210             logger.log(BasicLevel.ERROR,
00211                 "DiscoveryComm:  Unable to create a notification message");
00212             e.printStackTrace();
00213         }
00214         if (msg != null) {
00215             // Multicast the message
00216             sendNotif(msg);
00217         }
00218         // Wait for notification messages
00219         // Create the socket to be used for responding
00220         try {
00221             unicastSocket = new DatagramSocket();
00222         } catch (SocketException e3) {
00223             e3.printStackTrace();
00224             return;
00225         }
00226         try {
00227             while (notStopped) {
00228                 multicastSocket.receive(datagram);
00229                 in = new ObjectInputStream(new ByteArrayInputStream(datagram.getData()));
00230                 objReceived = in.readObject();
00231 
00232                 if (objReceived != null) {
00233                     // The DiscEvents are ignored
00234                     if (objReceived instanceof DiscMessage) {
00235                         if (objReceived instanceof DiscEvent) {
00236                             logger.log(BasicLevel.DEBUG,
00237                                     "This discovery event is ignored " + objReceived);
00238                         } else {
00239                             DiscMessage request = (DiscMessage) objReceived;
00240                             logger.log(BasicLevel.DEBUG,
00241                                     "A dicovery message is received "
00242                                     + objReceived);
00243 
00244                             /* do not create  a new discovery event object with state RUNNING but 
00245                              * reuse the one created above
00246                              */
00247                             /*
00248                             DiscEvent msg = null;
00249                             try {
00250                                 msg = createNotifMessage(DiscEvent.RUNNING);
00251                             } catch (Exception e) {
00252                                 logger.log(BasicLevel.ERROR,
00253                                     "DiscoveryComm:  Unable to create a notification message");
00254                                 e.printStackTrace();
00255                             }
00256                             */
00257                             if (msg != null) {
00258                                 InetAddress destAddress = InetAddress.getByName(request.getSourceAddress());
00259                                 int destPort = request.getSourcePort();
00260                                 sendResponse(msg, destAddress, destPort);
00261                             }
00262                         }
00263                     }
00264                 }
00265             }
00266         } catch (SocketException e) {
00267             logger.log(BasicLevel.ERROR, "Socket closed: " + e);
00268             notStopped = false;
00269         } catch (IOException e1) {
00270             e1.printStackTrace();
00271         } catch (ClassNotFoundException e) {
00272             e.printStackTrace();
00273         }
00274     }
00275 
00280     public void stop() {
00281         // send a notification message of type STOPPING
00282         DiscEvent msg = null;
00283         try {
00284             msg = createNotifMessage(DiscEvent.STOPPING);
00285         } catch (Exception e) {
00286             e.printStackTrace();
00287         }
00288         if (msg != null) {
00289             sendNotif(msg);
00290         }
00291         Thread.interrupted();
00292     }
00293 
00297     protected void setJonasName(String jonasName) {
00298         this.jonasName = jonasName;
00299     }
00303     protected void setDomainName(String domainName) {
00304         this.domainName = domainName;
00305     }
00309     protected void setUrls(String[] urls) {
00310         this.urls = urls;
00311     }
00312 }

Generated on Tue Feb 15 15:05:15 2005 for JOnAS by  doxygen 1.3.9.1