DiscoveryClientListener.java

00001 
00022 package org.objectweb.jonas.discovery;
00023 
00024 import java.io.ByteArrayInputStream;
00025 import java.io.IOException;
00026 import java.io.ObjectInputStream;
00027 import java.net.DatagramPacket;
00028 import java.net.DatagramSocket;
00029 import java.net.InetAddress;
00030 import java.net.MulticastSocket;
00031 import java.net.SocketException;
00032 import java.net.UnknownHostException;
00033 
00034 import javax.management.Notification;
00035 
00036 import org.objectweb.jonas.common.Log;
00037 import org.objectweb.util.monolog.api.BasicLevel;
00038 import org.objectweb.util.monolog.api.Logger;
00039 
00044 public class DiscoveryClientListener implements Runnable {
00048     public static final String DISCOVERY_TYPE = "jonas.management.discovery";
00049 
00050     private static int RECEIVE_BUFFER_SIZE = 1024;
00054     private DiscoveryClient discoveryClient;
00058     private MulticastSocket multicastSocket;
00062     private int port;
00066     private InetAddress destAddress;
00070     private int             ttl;
00074     private DatagramSocket  unicastSocket;
00078     private String          sourceIp;
00082     private int             sourcePort;
00086     private boolean         notStopped          = true;
00090     private long            timeout             = 1000;
00094     private long            sequenceNumber      = 0;
00098     private static Logger   logger              = Log.getLogger(Log.JONAS_DISCOVERY_PREFIX);
00103     public DiscoveryClientListener(DiscoveryClient discoveryClient) {
00104         this.port = discoveryClient.getListeningPort();
00105         try {
00106             this.destAddress = InetAddress.getByName(discoveryClient.getListeningIp());
00107             this.ttl = discoveryClient.getTimeToLive();
00108         } catch (UnknownHostException e) {
00109             e.printStackTrace();
00110         }
00111         this.timeout = discoveryClient.getTimeout();
00112         this.sourcePort = discoveryClient.getSourcePort();
00113         this.sourceIp = discoveryClient.getSourceIp();
00114         this.discoveryClient = discoveryClient;
00115     }
00116 
00120     public void sendDiscoveryMessage(DiscMessage msg) {
00121         logger.log(BasicLevel.DEBUG, "DiscoveryClient : The message to send is "
00122                 + msg);
00123         //send the message on the multicast socket
00124         //after packing it into a datagram
00125         byte[] messageBytes = DiscMessage.objectToBytes(msg);
00126         if (messageBytes != null) {
00127                 try {
00128                         multicastSocket.send(new DatagramPacket(messageBytes,
00129                     messageBytes.length, destAddress, port));
00130                 } catch (IOException e1) {
00131                         logger.log(BasicLevel.ERROR,
00132                         "DiscoveryClient :  Error to send discovery message");
00133                         e1.printStackTrace();
00134                 }
00135         }
00136     }
00137 
00141     public void run() {
00142         // Create a DatagramPacket to receive messages
00143         DatagramPacket datagram = new DatagramPacket(new byte[RECEIVE_BUFFER_SIZE],
00144                 RECEIVE_BUFFER_SIZE);
00145         // Object reveived in a message
00146         Object objReceived = null;
00147         ObjectInputStream in = null;
00148 
00149         // Create a multicast socket
00150         try {
00151             multicastSocket = new MulticastSocket(port);
00152             multicastSocket.setTimeToLive(ttl);
00153             /*
00154              * Not necessary as the DiscoveryClient does not waits for multicast events
00155              */
00156             //multicastSocket.joinGroup(destAddress);
00157         } catch (IOException e) {
00158             // TODO Auto-generated catch block
00159             e.printStackTrace();
00160         }
00161 
00162         // Create a unicast socket to receive responses
00163         try {
00164             unicastSocket = new DatagramSocket(sourcePort);
00165         } catch (SocketException e2) {
00166             logger.log(BasicLevel.ERROR,
00167             "DiscoveryClient : Unable to create a Datagram socket");
00168             e2.printStackTrace();
00169             return;
00170         }
00171 
00172         // Prepare a discovery message
00173         DiscMessage msg = new DiscMessage(sourceIp, sourcePort);
00174         //  First send a discovery message
00175         sendDiscoveryMessage(msg);
00176         logger.log(BasicLevel.DEBUG, " DiscoveryClient: Sent Message is" + msg);
00177 
00178         //  wait for responses during a Timeout period on the unicast socket.
00179         long lastTime = timeout + System.currentTimeMillis();
00180         DiscEvent event = null;
00181         try {
00182             while ((notStopped) && System.currentTimeMillis() <= lastTime) {
00183                 unicastSocket.receive(datagram);
00184                 in = new ObjectInputStream(new ByteArrayInputStream(datagram.getData()));
00185                 objReceived = in.readObject();
00186                 if (objReceived != null) {
00187                     if (objReceived instanceof DiscEvent) {
00188                         event = (DiscEvent) objReceived;
00189                         handleReceivedMessage(event);
00190                     }
00191                 }
00192             }
00193         } catch (SocketException e) {
00194             logger.log(BasicLevel.ERROR, "DiscoveryClient : Socket closed" + e);
00195             notStopped = false;
00196         } catch (IOException e1) {
00197             e1.printStackTrace();
00198         } catch (ClassNotFoundException e) {
00199             e.printStackTrace();
00200         }
00201     }
00202 
00207     private void handleReceivedMessage(DiscEvent msg) {
00208         logger.log(BasicLevel.DEBUG, "discovery event received: " + msg);
00209         
00210         // create a JMX notification for all listeners of a discovery event
00211         Notification notif = new Notification(DISCOVERY_TYPE, discoveryClient,
00212                 sequenceNumber++, System.currentTimeMillis(), msg.getState());
00213         notif.setUserData(msg);
00214         discoveryClient.sendNotification(notif);
00215     }
00216 
00220     public void stop() {
00221         notStopped = false;
00222         Thread.interrupted();
00223     }
00224 }

Generated on Tue Feb 15 15:05:15 2005 for JOnAS by  doxygen 1.3.9.1