00001
00022 package org.objectweb.jonas.discovery;
00023
00024 import java.io.ByteArrayInputStream;
00025 import java.io.IOException;
00026 import java.io.ObjectInputStream;
00027 import java.net.DatagramPacket;
00028 import java.net.InetAddress;
00029 import java.net.MulticastSocket;
00030 import java.net.SocketException;
00031 import java.net.UnknownHostException;
00032
00033 import javax.management.Notification;
00034 import org.objectweb.jonas.common.Log;
00035 import org.objectweb.util.monolog.api.BasicLevel;
00036 import org.objectweb.util.monolog.api.Logger;
00037
00042 public class DiscoveryListener implements Runnable {
00046 public static final String DISCOVERY_TYPE = "jonas.management.discovery";
00047
00048 private static int RECEIVE_BUFFER_SIZE = 1024;
00052 private int port;
00056 private InetAddress destAddress;
00060 private Enroller enroller;
00064 private int ttl = 1;
00068 private MulticastSocket multicastSocket;
00072 private boolean notStopped = true;
00076 private long sequenceNumber = 0;
00077 private static Logger logger = Log.getLogger(Log.JONAS_DISCOVERY_PREFIX);
00078
00083 public DiscoveryListener(Enroller enroller) {
00084 this.port = enroller.getListeningPort();
00085 try {
00086 this.destAddress = InetAddress.getByName(enroller.getListeningIp());
00087 this.ttl = enroller.getTimeToLive();
00088 } catch (UnknownHostException e) {
00089 e.printStackTrace();
00090 }
00091 this.enroller = enroller;
00092 }
00097 private void join() {
00098 try {
00099 multicastSocket = new MulticastSocket(port);
00100 multicastSocket.setTimeToLive(ttl);
00101 multicastSocket.joinGroup(destAddress);
00102 logger.log(BasicLevel.DEBUG, "multicast ip address is "
00103 + destAddress);
00104
00105 logger.log(BasicLevel.DEBUG, "multicast port is " + port);
00106
00107 } catch (IOException e) {
00108 logger.log(BasicLevel.ERROR, "io problem");
00109
00110 e.printStackTrace();
00111 }
00112 }
00113
00122 private void handleDiscEvent(DiscEvent msg) {
00123 logger.log(BasicLevel.DEBUG, "discovery event received: " + msg);
00124
00125
00126 Notification notif = new Notification(DISCOVERY_TYPE, enroller,
00127 sequenceNumber++, System.currentTimeMillis(), msg.getState());
00128 notif.setUserData(msg);
00129 enroller.sendNotification(notif);
00130 }
00131
00132 public void run() {
00133
00134 DatagramPacket datagram = new DatagramPacket(new byte[RECEIVE_BUFFER_SIZE],
00135 RECEIVE_BUFFER_SIZE);
00136
00137 Object objReceived = null;
00138 ObjectInputStream in = null;
00139
00140
00141 join();
00142
00143 try {
00144 while (notStopped) {
00145 multicastSocket.receive(datagram);
00146 in = new ObjectInputStream(new ByteArrayInputStream(datagram.getData()));
00147 objReceived = in.readObject();
00148
00149 if (objReceived != null) {
00150 if (objReceived instanceof DiscEvent) {
00151
00152 DiscEvent msg = (DiscEvent) objReceived;
00153 handleDiscEvent(msg);
00154 }
00155 }
00156 }
00157 }
00158 catch (SocketException e) {
00159 logger.log(BasicLevel.ERROR, "Enroller: Socket closed" + e);
00160 notStopped = false;
00161 } catch (IOException e1) {
00162 e1.printStackTrace();
00163 } catch (ClassNotFoundException e) {
00164 e.printStackTrace();
00165 }
00166 }
00167
00168 public void stopListener() {
00169 notStopped = false;
00170 }
00171
00172 }