00001
00022 package org.objectweb.jonas.discovery;
00023
00024 import java.io.ByteArrayInputStream;
00025 import java.io.IOException;
00026 import java.io.ObjectInputStream;
00027 import java.net.DatagramPacket;
00028 import java.net.DatagramSocket;
00029 import java.net.InetAddress;
00030 import java.net.MulticastSocket;
00031 import java.net.SocketException;
00032 import java.net.UnknownHostException;
00033
00034 import javax.management.Notification;
00035
00036 import org.objectweb.jonas.common.Log;
00037 import org.objectweb.util.monolog.api.BasicLevel;
00038 import org.objectweb.util.monolog.api.Logger;
00039
00044 public class DiscoveryClientListener implements Runnable {
00048 public static final String DISCOVERY_TYPE = "jonas.management.discovery";
00049
00050 private static int RECEIVE_BUFFER_SIZE = 1024;
00054 private DiscoveryClient discoveryClient;
00058 private MulticastSocket multicastSocket;
00062 private int port;
00066 private InetAddress destAddress;
00070 private int ttl;
00074 private DatagramSocket unicastSocket;
00078 private String sourceIp;
00082 private int sourcePort;
00086 private boolean notStopped = true;
00090 private long timeout = 1000;
00094 private long sequenceNumber = 0;
00098 private static Logger logger = Log.getLogger(Log.JONAS_DISCOVERY_PREFIX);
00103 public DiscoveryClientListener(DiscoveryClient discoveryClient) {
00104 this.port = discoveryClient.getListeningPort();
00105 try {
00106 this.destAddress = InetAddress.getByName(discoveryClient.getListeningIp());
00107 this.ttl = discoveryClient.getTimeToLive();
00108 } catch (UnknownHostException e) {
00109 e.printStackTrace();
00110 }
00111 this.timeout = discoveryClient.getTimeout();
00112 this.sourcePort = discoveryClient.getSourcePort();
00113 this.sourceIp = discoveryClient.getSourceIp();
00114 this.discoveryClient = discoveryClient;
00115 }
00116
00120 public void sendDiscoveryMessage(DiscMessage msg) {
00121 logger.log(BasicLevel.DEBUG, "DiscoveryClient : The message to send is "
00122 + msg);
00123
00124
00125 byte[] messageBytes = DiscMessage.objectToBytes(msg);
00126 if (messageBytes != null) {
00127 try {
00128 multicastSocket.send(new DatagramPacket(messageBytes,
00129 messageBytes.length, destAddress, port));
00130 } catch (IOException e1) {
00131 logger.log(BasicLevel.ERROR,
00132 "DiscoveryClient : Error to send discovery message");
00133 e1.printStackTrace();
00134 }
00135 }
00136 }
00137
00141 public void run() {
00142
00143 DatagramPacket datagram = new DatagramPacket(new byte[RECEIVE_BUFFER_SIZE],
00144 RECEIVE_BUFFER_SIZE);
00145
00146 Object objReceived = null;
00147 ObjectInputStream in = null;
00148
00149
00150 try {
00151 multicastSocket = new MulticastSocket(port);
00152 multicastSocket.setTimeToLive(ttl);
00153
00154
00155
00156
00157 } catch (IOException e) {
00158
00159 e.printStackTrace();
00160 }
00161
00162
00163 try {
00164 unicastSocket = new DatagramSocket(sourcePort);
00165 } catch (SocketException e2) {
00166 logger.log(BasicLevel.ERROR,
00167 "DiscoveryClient : Unable to create a Datagram socket");
00168 e2.printStackTrace();
00169 return;
00170 }
00171
00172
00173 DiscMessage msg = new DiscMessage(sourceIp, sourcePort);
00174
00175 sendDiscoveryMessage(msg);
00176 logger.log(BasicLevel.DEBUG, " DiscoveryClient: Sent Message is" + msg);
00177
00178
00179 long lastTime = timeout + System.currentTimeMillis();
00180 DiscEvent event = null;
00181 try {
00182 while ((notStopped) && System.currentTimeMillis() <= lastTime) {
00183 unicastSocket.receive(datagram);
00184 in = new ObjectInputStream(new ByteArrayInputStream(datagram.getData()));
00185 objReceived = in.readObject();
00186 if (objReceived != null) {
00187 if (objReceived instanceof DiscEvent) {
00188 event = (DiscEvent) objReceived;
00189 handleReceivedMessage(event);
00190 }
00191 }
00192 }
00193 } catch (SocketException e) {
00194 logger.log(BasicLevel.ERROR, "DiscoveryClient : Socket closed" + e);
00195 notStopped = false;
00196 } catch (IOException e1) {
00197 e1.printStackTrace();
00198 } catch (ClassNotFoundException e) {
00199 e.printStackTrace();
00200 }
00201 }
00202
00207 private void handleReceivedMessage(DiscEvent msg) {
00208 logger.log(BasicLevel.DEBUG, "discovery event received: " + msg);
00209
00210
00211 Notification notif = new Notification(DISCOVERY_TYPE, discoveryClient,
00212 sequenceNumber++, System.currentTimeMillis(), msg.getState());
00213 notif.setUserData(msg);
00214 discoveryClient.sendNotification(notif);
00215 }
00216
00220 public void stop() {
00221 notStopped = false;
00222 Thread.interrupted();
00223 }
00224 }