00001
00022 package org.objectweb.jonas.discovery;
00023
00024 import java.io.ByteArrayInputStream;
00025 import java.io.IOException;
00026 import java.io.ObjectInputStream;
00027 import java.net.DatagramPacket;
00028 import java.net.DatagramSocket;
00029 import java.net.InetAddress;
00030 import java.net.MulticastSocket;
00031 import java.net.SocketException;
00032 import java.net.UnknownHostException;
00033
00034 import javax.management.MBeanServer;
00035 import javax.management.remote.JMXServiceURL;
00036
00037 import org.objectweb.jonas.common.Log;
00038 import org.objectweb.util.monolog.api.BasicLevel;
00039 import org.objectweb.util.monolog.api.Logger;
00040
00045 public class DiscoveryComm implements Runnable {
00046 private int RECEIVE_BUFFER_SIZE = 1024;
00050 private MulticastSocket multicastSocket;
00054 private DatagramSocket unicastSocket;
00058 private DiscoveryManager dm;
00062 private int port;
00066 private InetAddress destAddress;
00070 private boolean notStopped = true;
00074 private int ttl = 1;
00075
00076 private String jonasName = null;
00077 private String domainName = null;
00078 private String[] urls = null;
00079
00083 private static Logger logger = Log.getLogger(Log.JONAS_DISCOVERY_PREFIX);
00088 public DiscoveryComm(DiscoveryManager dm) {
00089 this.port = dm.getListeningPort();
00090 try {
00091 this.destAddress = InetAddress.getByName(dm.getListeningIp());
00092 this.ttl = dm.getTimeToLive();
00093 } catch (UnknownHostException e) {
00094 e.printStackTrace();
00095 }
00096 this.dm = dm;
00097 }
00098
00104 private void join() {
00105 try {
00106 multicastSocket = new MulticastSocket(port);
00107 multicastSocket.setTimeToLive(ttl);
00108 multicastSocket.joinGroup(destAddress);
00109 logger.log(BasicLevel.DEBUG, "multicast ip address is "
00110 + destAddress);
00111
00112 logger.log(BasicLevel.DEBUG, "multicast port is " + port);
00113
00114 } catch (IOException e) {
00115 logger.log(BasicLevel.ERROR, "io problem");
00116
00117 e.printStackTrace();
00118 }
00119 }
00120
00124 public void sendNotif(DiscEvent msg) {
00125 try {
00126
00127
00128 logger.log(BasicLevel.DEBUG, msg);
00129 byte[] messageBytes = DiscEvent.objectToBytes(msg);
00130 multicastSocket.send(new DatagramPacket(messageBytes,
00131 messageBytes.length, destAddress, port));
00132 } catch (IOException e1) {
00133 logger.log(BasicLevel.ERROR, "DiscoveryComm: Error to send notification");
00134 e1.printStackTrace();
00135 }
00136
00137 }
00145 private void sendResponse(DiscEvent msg, InetAddress destAddress, int port) {
00146 logger.log(BasicLevel.DEBUG, "DiscoveryComm : The message to send is "
00147 + msg);
00148 byte[] messageBytes = DiscEvent.objectToBytes(msg);
00149 if (messageBytes != null) {
00150 try {
00151
00152 unicastSocket.send(new DatagramPacket(messageBytes,
00153 messageBytes.length, destAddress, port));
00154 } catch (IOException e) {
00155 logger.log(BasicLevel.ERROR, "DiscoveryComm: Error to send response to discovery message");
00156 e.printStackTrace();
00157 }
00158 }
00159 }
00160
00171 public DiscEvent createNotifMessage(String state) throws Exception {
00172 String theHostName;
00173 MBeanServer mbeanServer;
00174 try {
00175 theHostName = InetAddress.getLocalHost().getHostName();
00176 } catch (UnknownHostException e) {
00177 e.printStackTrace();
00178 return null;
00179 }
00180
00181 if (!state.equals(DiscEvent.RUNNING)) {
00182 urls = null;
00183 }
00184
00185
00186 DiscEvent resp = new DiscEvent(theHostName, port, jonasName, domainName, urls);
00187 resp.setState(state);
00188 return resp;
00189 }
00190
00195 public void run() {
00196
00197 DatagramPacket datagram = new DatagramPacket(new byte[RECEIVE_BUFFER_SIZE],
00198 RECEIVE_BUFFER_SIZE);
00199
00200 Object objReceived = null;
00201 ObjectInputStream in = null;
00202
00203
00204 join();
00205
00206 DiscEvent msg = null;
00207 try {
00208 msg = createNotifMessage(DiscEvent.RUNNING);
00209 } catch (Exception e) {
00210 logger.log(BasicLevel.ERROR,
00211 "DiscoveryComm: Unable to create a notification message");
00212 e.printStackTrace();
00213 }
00214 if (msg != null) {
00215
00216 sendNotif(msg);
00217 }
00218
00219
00220 try {
00221 unicastSocket = new DatagramSocket();
00222 } catch (SocketException e3) {
00223 e3.printStackTrace();
00224 return;
00225 }
00226 try {
00227 while (notStopped) {
00228 multicastSocket.receive(datagram);
00229 in = new ObjectInputStream(new ByteArrayInputStream(datagram.getData()));
00230 objReceived = in.readObject();
00231
00232 if (objReceived != null) {
00233
00234 if (objReceived instanceof DiscMessage) {
00235 if (objReceived instanceof DiscEvent) {
00236 logger.log(BasicLevel.DEBUG,
00237 "This discovery event is ignored " + objReceived);
00238 } else {
00239 DiscMessage request = (DiscMessage) objReceived;
00240 logger.log(BasicLevel.DEBUG,
00241 "A dicovery message is received "
00242 + objReceived);
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257 if (msg != null) {
00258 InetAddress destAddress = InetAddress.getByName(request.getSourceAddress());
00259 int destPort = request.getSourcePort();
00260 sendResponse(msg, destAddress, destPort);
00261 }
00262 }
00263 }
00264 }
00265 }
00266 } catch (SocketException e) {
00267 logger.log(BasicLevel.ERROR, "Socket closed: " + e);
00268 notStopped = false;
00269 } catch (IOException e1) {
00270 e1.printStackTrace();
00271 } catch (ClassNotFoundException e) {
00272 e.printStackTrace();
00273 }
00274 }
00275
00280 public void stop() {
00281
00282 DiscEvent msg = null;
00283 try {
00284 msg = createNotifMessage(DiscEvent.STOPPING);
00285 } catch (Exception e) {
00286 e.printStackTrace();
00287 }
00288 if (msg != null) {
00289 sendNotif(msg);
00290 }
00291 Thread.interrupted();
00292 }
00293
00297 protected void setJonasName(String jonasName) {
00298 this.jonasName = jonasName;
00299 }
00303 protected void setDomainName(String domainName) {
00304 this.domainName = domainName;
00305 }
00309 protected void setUrls(String[] urls) {
00310 this.urls = urls;
00311 }
00312 }