DiscoveryListener.java

00001 
00022 package org.objectweb.jonas.discovery;
00023 
00024 import java.io.ByteArrayInputStream;
00025 import java.io.IOException;
00026 import java.io.ObjectInputStream;
00027 import java.net.DatagramPacket;
00028 import java.net.InetAddress;
00029 import java.net.MulticastSocket;
00030 import java.net.SocketException;
00031 import java.net.UnknownHostException;
00032 
00033 import javax.management.Notification;
00034 import org.objectweb.jonas.common.Log;
00035 import org.objectweb.util.monolog.api.BasicLevel;
00036 import org.objectweb.util.monolog.api.Logger;
00037 
00042 public class DiscoveryListener implements Runnable {
00046     public static final String DISCOVERY_TYPE = "jonas.management.discovery";
00047 
00048     private static int RECEIVE_BUFFER_SIZE = 1024;
00052     private int             port;
00056     private InetAddress     destAddress;
00060     private Enroller        enroller;
00064     private int             ttl = 1; // why 1 ??
00068     private MulticastSocket multicastSocket;
00072     private boolean         notStopped          = true;
00076     private long            sequenceNumber      = 0;
00077     private static Logger logger = Log.getLogger(Log.JONAS_DISCOVERY_PREFIX);
00078 
00083     public DiscoveryListener(Enroller enroller) {
00084         this.port = enroller.getListeningPort();
00085         try {
00086             this.destAddress = InetAddress.getByName(enroller.getListeningIp());
00087             this.ttl = enroller.getTimeToLive();
00088         } catch (UnknownHostException e) {
00089             e.printStackTrace();
00090         }
00091         this.enroller = enroller;
00092     }
00097     private void join() {
00098         try {
00099             multicastSocket = new MulticastSocket(port);
00100             multicastSocket.setTimeToLive(ttl);
00101             multicastSocket.joinGroup(destAddress);
00102             logger.log(BasicLevel.DEBUG, "multicast ip address is "
00103                     + destAddress);
00104 
00105             logger.log(BasicLevel.DEBUG, "multicast port is " + port);
00106 
00107         } catch (IOException e) {
00108             logger.log(BasicLevel.ERROR, "io problem");
00109             // TODO Auto-generated catch block
00110             e.printStackTrace();
00111         }
00112     }
00113 
00122     private void handleDiscEvent(DiscEvent msg) {
00123         logger.log(BasicLevel.DEBUG, "discovery event received: " + msg);
00124 
00125         //  create a JMX notification for all listeners of a discovery event
00126         Notification notif = new Notification(DISCOVERY_TYPE, enroller,
00127                 sequenceNumber++, System.currentTimeMillis(), msg.getState());
00128         notif.setUserData(msg);
00129         enroller.sendNotification(notif);
00130     }
00131 
00132     public void run() {
00133         // Create a DatagramPacket to receive messages (DiscoverEvents)
00134         DatagramPacket datagram = new DatagramPacket(new byte[RECEIVE_BUFFER_SIZE],
00135                 RECEIVE_BUFFER_SIZE);
00136         // Object contained in a message
00137         Object objReceived = null;
00138         ObjectInputStream in = null;
00139 
00140         // Join the group in order to receive multicast messages
00141         join();
00142 
00143         try {
00144             while (notStopped) {
00145                 multicastSocket.receive(datagram);
00146                 in = new ObjectInputStream(new ByteArrayInputStream(datagram.getData()));
00147                 objReceived = in.readObject();
00148 
00149                 if (objReceived != null) {
00150                     if (objReceived instanceof DiscEvent) {
00151                         // Treat DiscEvents
00152                         DiscEvent msg = (DiscEvent) objReceived;
00153                         handleDiscEvent(msg);
00154                     }
00155                 }
00156             }
00157         }
00158         catch (SocketException e) {
00159             logger.log(BasicLevel.ERROR, "Enroller: Socket closed" + e);
00160             notStopped = false;
00161         } catch (IOException e1) {
00162             e1.printStackTrace();
00163         } catch (ClassNotFoundException e) {
00164             e.printStackTrace();
00165         }
00166     }
00167 
00168     public void stopListener() {
00169         notStopped = false;
00170     }
00171 
00172 }

Generated on Tue Feb 15 15:05:15 2005 for JOnAS by  doxygen 1.3.9.1