Authors : Yannick Braeuner, Benoit Pelletier
February 12, 2008
Version : 2.0
JORAM distributed configuration
What's Load-balancing for Queue
First scenario for Queue : distribution of the load at the server side
Second scenario for Queue : load-balancing at the client side
The JMS API provides a separate domain for each messaging approach, point-to-point or publish/subscribe. The point-to-point domain is built around the concept of queues, senders and receivers. The publish/subscribe domain is built around the concept of topic, publisher and subscriber. Additionally it provides a unified domain with common interfaces that enable the use of queue and topic. This domain defines the concept of producers and consumers. The classic sample uses a very simple configuration (centralized) made of one server hosting a queue and a topic. The server is administratively configured for accepting connection requests from the anonymous user.
JMS clustering aims to offer a solution for both the scalability and the high availability for the JMS accesses. This document gives an overview of the JORAM capabilities for clustering a JMS application in the J2EE context. The load-balancing and fail-over mechanisms are described and a user guide describing how to build such a configuration is provided. Further information is available in the JORAM documentation here.
The following information will be presented:
Getting started :
Two instances of JOnAS are configured ("J1" and "J2"). Each JOnAS instance has a dedicated collocated JORAM server: server "S1" for JOnAS "J1", "S2" for "J2". Those two servers are aware of each other.
Set a JORAM distributed configuration:
<?xml version="1.0"?> <config <domain name="D1"/> <property name="Transaction" value="fr.dyade.aaa.util.NTransaction"/> <server id="1" name="S1" hostname="localhost"> <network domain="D1" port="16301"/> <service class="org.objectweb.joram.mom.proxies.ConnectionManager" args="root root"/> <service class="org.objectweb.joram.mom.proxies.tcp.TcpProxyService" args="16010"/> </server> <server id="2" name="S2" hostname="localhost"> <network domain="D1" port="16302"/> <service class="org.objectweb.joram.mom.proxies.ConnectionManager" args="root root"/> <service class="org.objectweb.joram.mom.proxies.tcp.TcpProxyService" args="16020"/> </server> </config>
<config-property> <config-property-name>ServerId</config-property-name> <config-property-type>java.lang.Short</config-property-type> <config-property-value>1</config-property-value> </config-property>
<config-property> <config-property-name>ServerName</config-property-name> <config-property-type>java.lang.String</config-property-type> <config-property-value>s1</config-property-value> </config-property>
<config-property> <config-property-name>HostName</config-property-name> <config-property-type>java.lang.String</config-property-type> <config-property-value>localhost</config-property-value> </config-property>
<config-property> <config-property-name>ServerPort</config-property-name> <config-property-type>java.lang.Integer</config-property-type> <config-property-value>16010</config-property-value> </config-property>
<config-property> <config-property-name>PersistentPlatform</config-property-name> <config-property-type>java.lang.Boolean</config-property-type> <config-property-value>true</config-property-value> </config-property>
<User name="anonymous" password="anonymous" serverId="1"/>
<ConnectionFactory className="org.objectweb.joram.client.jms.tcp.TcpConnectionFactory"> <tcp host="localhost" port="16010"/> <jndi name="JCF"/> </ConnectionFactory> <ConnectionFactory className="org.objectweb.joram.client.jms.tcp.QueueTcpConnectionFactory"> <tcp host="localhost" port="16010"/> <jndi name="JQCF"/> </ConnectionFactory> <ConnectionFactory className="org.objectweb.joram.client.jms.tcp.TopicTcpConnectionFactory"> <tcp host="localhost" port="16010"/> <jndi name="JTCF"/> </ConnectionFactory>
A non hierarchical topic might also be distributed among many servers. Such a topic, to be considered as a single logical topic, is made of topic representatives, one per server. Such an architecture allows a publisher to publish messages on a representative of the topic. In the example, the publisher works with the representative on server 1. If a subscriber subscribed to any other representative (on server 2 in the example), it will get the messages produced by the publisher.
Load balancing of topics is very useful because it allows distributed topic subscriptions across the cluster.
The following scenario and general settings are proposed:
The cluster definition with the topics must be added in $JONAS_BASE/conf/joramAdmin.xml file. The connection factories and the anonymous user must be defined with the local server id and the local server port number according to the a3servers.xml content. Here only the cluster related elements are shown:
<Topic name="mdbTopic1" serverId="1"> <freeReader/> <freeWriter/> <jndi name="mdbTopic"/> </Topic> <Topic name="mdbTopic2" serverId="2"> <freeReader/> <freeWriter/> <jndi name="mdbTopic2"/> </Topic> <ClusterTopic> <ClusterElement name="mdbTopic1" location="s1"/> <ClusterElement name="mdbTopic2" location="s2"/> <jndi name="clusterMdbTopic"/> </ClusterTopic>
<Topic name="mdbTopic1" serverId="1"> <freeReader/> <freeWriter/> <jndi name="mdbTopic1"/> </Topic> <Topic name="mdbTopic2" serverId="2"> <freeReader/> <freeWriter/> <jndi name="mdbTopic"/> </Topic> <ClusterTopic> <ClusterElement name="mdbTopic1" location="s1"/> <ClusterElement name="mdbTopic2" location="s2"/> <jndi name="clusterMdbTopic"/> </ClusterTopic>
The joramAdmin.xml file has to be loaded when all cluster members are started since some remote cluster elements are defined. An alternative consists in splitting the configuration into two different files joramAdmin-local.xml and joramAdmin-cluster.xml, the first one containing only the local elements and the second one, both local and remote elements. At the JOnAS starting, a script could copy the right file to joramAdmin.xml according to the others members presence (joramAdmin-local.xml if it's the first member which starts and joramAdmin-cluster.xml if all the cluster members are started).
Run the sampleDeploy the application, for example, create a deploy.sh file:
#!/bin/ksh export JONAS_BASE=$PWD/jb1 cp $JONAS_ROOT/examples/output/ejbjars/newsamplemdb.jar $JONAS_BASE/ejbjars/ jonas admin -a newsamplemdb.jar -n node1 export JONAS_BASE=$PWD/jb2 cp $JONAS_ROOT/examples/output/ejbjars/newsamplemdb.jar $JONAS_BASE/ejbjars/ jonas admin -a newsamplemdb.jar -n node2
Finally launch the client
jclient newsamplemdb.MdbClient
Something similar to this should appear in the client console :
ClientContainer.info : Starting client... JMS client: tcf = TCF:localhost-16010 JMS client: topic = topic#1.1.1026 JMS client: tc = Cnx:#0.0.1026:5 MDBsample is Ok
In addition, the following should appear on each JOnAS instance console:
Message received: Message6 MdbBean onMessage Message received: Message7 MdbBean onMessage Message received: Message8
The fact that each message appears on the two different JOnAS servers consoles shows the messages broadcasting between the topic elements.
Globally, the load balancing in the context of queues may be meaningless in comparison of load balancing topic. It would be a bit like load balancing a stateful session bean instance (which just requires failover). But the JORAM distributed architecture enables :
A load balancing message queue may be needed for a high rate of messages. A clustered queue is a cluster of queues exchanging messages depending on their load. The example has a cluster of two queues. A heavy producer accesses its local queue and sends messages. It quickly becomes loaded and decides to forward messages to the other queue of its cluster which is not under heavy load.
For this case some parameters must be set:
For further information, see the JORAM documentation here.
The scenario for Queue is similar to the topic one. A client sent messages to a queue in S1. MDB gets messages from each local cluster queue representative. After having sent a burst of messages to the server S1, the load distribution should occur and message should be moved to S2.
The Queue definition in $JONAS_BASE/conf/joramAdmin.xml file is as following :
<Queue name="mdbQueue1" serverId="1" className="org.objectweb.joram.mom.dest.ClusterQueue"> <freeReader/> <freeWriter/> <property name="period" value="10000"/> <property name="producThreshold" value="50"/> <property name="consumThreshold" value="2"/> <property name="autoEvalThreshold" value="false"/> <property name="waitAfterClusterReq" value="1000"/> <jndi name="mdbQueue"/> </Queue> <Queue name="mdbQueue2" serverId="2" className="org.objectweb.joram.mom.dest.ClusterQueue"> <freeReader/> <freeWriter/> <property name="period" value="10000"/> <property name="producThreshold" value="50"/> <property name="consumThreshold" value="2"/> <property name="autoEvalThreshold" value="false"/> <property name="waitAfterClusterReq" value="1000"/> <jndi name="mdbQueue2"/> <ClusterQueue> <ClusterElement name="mdbQueue1" location="s1"/> <ClusterElement name="mdbQueue2" location="s2"/> <jndi name="mdbQueueCluster"/> </ClusterQueue>
<Queue name="mdbQueue1" serverId="1" className="org.objectweb.joram.mom.dest.ClusterQueue"> <freeReader/> <freeWriter/> <property name="period" value="10000"/> <property name="producThreshold" value="50"/> <property name="consumThreshold" value="2"/> <property name="autoEvalThreshold" value="false"/> <property name="waitAfterClusterReq" value="1000"/> <jndi name="mdbQueue1"/> </Queue> <Queue name="mdbQueue2" serverId="2" className="org.objectweb.joram.mom.dest.ClusterQueue"> <freeReader/> <freeWriter/> <property name="period" value="10000"/> <property name="producThreshold" value="50"/> <property name="consumThreshold" value="2"/> <property name="autoEvalThreshold" value="false"/> <property name="waitAfterClusterReq" value="1000"/> <jndi name="mdbQueue"/> <ClusterQueue> <ClusterElement name="mdbQueue1" location="s1"/> <ClusterElement name="mdbQueue2" location="s2"/> <jndi name="mdbQueueCluster"/> </ClusterQueue>Run the sample
The procedure is similar to the topic example described above, just use the newsamplemdb2 example rather than newsample one.
The load-balancing is done at the client side. A server is selected randomly among the cluster members at the first message sending or through the 'location' java property. And then, for a given client, all the messages are sent to the same server unless the java property resetting.
For setting the load-balancing at the client side, the client application must use a clustered connection factory that embeds the network connection parameters of the cluster members. This factory must be registered in the JORAM's distributed JNDI for ensuring that the client gets an up to date object. A complete description of the JORAM JNDI setting is given here and the main parameters are given below :
Setting of the JORAM's distributed jndiAt first, the a3servers.xml file must enhanced with the JORAM's jndi service as following :
<?xml version="1.0"?> <config> <domain name="D1"/> <property name="Transaction" value="fr.dyade.aaa.util.NTransaction"/> <server id="1" name="s1" hostname="localhost"> <network domain="D1" port="16301"/> <service class="org.objectweb.joram.mom.proxies.ConnectionManager" args="root root"/> <service class="org.objectweb.joram.mom.proxies.tcp.TcpProxyService" args="16010"/> <service class="fr.dyade.aaa.jndi2.distributed.DistributedJndiServer" args="16401 0 1"/> </server> <server id="2" name="s2" hostname="localhost"> <network domain="D1" port="16302"/> <service class="org.objectweb.joram.mom.proxies.ConnectionManager" args="root root"/> <service class="org.objectweb.joram.mom.proxies.tcp.TcpProxyService" args="16020"/> <service class="fr.dyade.aaa.jndi2.distributed.DistributedJndiServer" args="16402 1 0"/> </server> </config>
Only the JMS objects must be registered in the JORAM's jndi. The standard routing mechanism is used through a jndi.properties file put in each $JONAS_BASE/conf directory :
java.naming.factory.url.pkgs org.objectweb.jonas.naming:fr.dyade.aaa.jndi2 scn.naming.factory.host localhost scn.naming.factory.port 16402
The port number must be adapted according to the local server configuration (16401 for S1 and 16402 for S2). The 'scn' prefix is defined for identifying the objects to bind or to lookup in this registry.
Setting of the clustered connection factoriesThe clustered connection factories are defined in the $JONAS_BASE/conf/joramAdmin.xml file as following :
<ConnectionFactory name="JQCF1" className="org.objectweb.joram.client.jms.tcp.QueueTcpConnectionFactory"> <tcp host="localhost" port="16010"/> <jndi name="scn:comp/JQCF1"/> </ConnectionFactory> <ConnectionFactory name="JQCF2" className="org.objectweb.joram.client.jms.tcp.QueueTcpConnectionFactory"> <tcp host="localhost" port="16020"/> <jndi name="scn:comp/JQCF2"/> </ConnectionFactory> <ClusterCF> <ClusterElement name="JQCF1" location="s1"/> <ClusterElement name="JQCF2" location="s2"/> <jndi name="scn:comp/clusterJQCF"/> </ClusterCF>
The 'scn:comp/' prefix in the jndi name indicates that the object must be bound in the JORAM's jndi.
Cluster queue definitionThe cluster queue is defined in the $JONAS_BASE/conf/joramAdmin.xml file :
<Queue name="mdbQueue0" serverId="1" className="org.objectweb.joram.mom.dest.ClusterQueue"> <freeReader/> <freeWriter/> <jndi name="scn:comp/mdbQueue1"/> </Queue> <Queue name="mdbQueue1" serverId="2" className="org.objectweb.joram.mom.dest.ClusterQueue"> <freeReader/> <freeWriter/> <jndi name="scn:comp/mdbQueue2"/> </Queue> <ClusterQueue> <ClusterElement name="mdbQueue1" location="s1"/> <ClusterElement name="mdbQueue2" location="s2"/> <jndi name="scn:comp/mdbQueue"/> </ClusterQueue>
Note that the cluster queue definition is symetric accross the cluster members. The well known jndi name is set on the cluster object (and not in the local representative as for the topic cluster).
Note that same for the topic declaration, the joramAdmin.xml file has to be loaded when all cluster members are started since some remote cluster elements are defined. An alternative consists in splitting the configuration into two different files joramAdmin-local.xml and joramAdmin-cluster.xml, the first one containing only the local elements and the second one, both local and remote elements. At the JOnAS starting, a script could copy the right file to joramAdmin.xml according to the others members presence (joramAdmin-local.xml if it's the first member which starts and joramAdmin-cluster.xml if all the cluster members are started).
MDB configurationThe message driven bean must be configured with the queue registered in the JORAM jndi ('scn:/comp' selector). Edit the deployment descriptor file (ejb-jar.xml) :
<message-driven> <description>Describe here the message driven bean Mdb</description> <display-name>Message Driven Bean Mdb</display-name> <ejb-name>Mdb</ejb-name> <ejb-class>newsamplemdb2.MdbBean</ejb-class> <messaging-type>javax.jms.MessageListener</messaging-type> <transaction-type>Container</transaction-type> <message-destination-type>javax.jms.Queue</message-destination-type> <activation-config> <activation-config-property> <activation-config-property-name>destination</activation-config-property-name> <activation-config-property-value>scn:comp/mdbQueue</activation-config-property-value> </activation-config-property> <activation-config-property> <activation-config-property-name>destinationType</activation-config-property-name> <activation-config-property-value>javax.jms.Queue</activation-config-property-value> </activation-config-property> <activation-config-property> <activation-config-property-name>subscriptionDurability</activation-config-property-name> <activation-config-property-value>NonDurable</activation-config-property-value> </activation-config-property> </activation-config> </message-driven>Client code
The client must lookup the clustered objects in the JORAM's jndi by using the 'scn:/comp' selector.
static String queueName = "scn:comp/mdbQueue"; static String conFactName = "scn:comp/clusterJQCF"; Context ictx = new InitialContext(); ConnectionFactory qcf = (ConnectionFactory) ictx.lookup(conFactName ); Queue queue = (Queue) ictx.lookup(queueName);
The connection creation, session creation and producer are quite classic:
Connection qc = qcf.createConnection(); Session session = qc.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer qp = session.createProducer(queue);
A server is chosen at the first message sending. A switch may be forced through the resetting of the 'location' java property. Below a new server election is requested for each odd iteration.
TextMessage message; for (int i=0;i<10;i++){ message = session.createTextMessage(); message.setText("Msg "+i); qp.send(message); System.out.println("location=" + System.getProperty("location")); if (i%2 == 0) { System.setProperty("location", ""); } }Run the sample
The procedure is similar to the topic and queue ones described above, just adapt the newsamplemdb2 example with the configuration and code given previously.
An HA server is actually a group of servers, one of which is the master server that coordinates the other slave servers. An external server that communicates with the HA server is actually connected to the master server.
Each replicated JORAM server executes the same code as a standard server except for the communication with the clients.
In the example, the collocated clients use a client module (newsamplemdb). If the server replica is the master, then the connection is active enabling the client to use the HA JORAM server. If the replica is a slave, then the connection opening is blocked until the replica becomes the master.
Several files must be changed to create a JORAM HA configuration:
A clustered server is defined by the element "cluster". A cluster owns an identifier and a name defined by the attributes "id" and "name" (exactly like a standard server). Two properties must be defined:
<?xml version="1.0"?/> <config> <domain name="D1"/> <property name="Transaction" value="fr.dyade.aaa.util.NullTransaction"/> <cluster id="0" name="s0"> <property name="Engine" value="fr.dyade.aaa.agent.HAEngine" /> <property name="nbClusterExpected" value="1" />
For each replica, an element "server" must be added. The attribute "id" defines the identifier of the replica inside the cluster. The attribute "hostname" gives the address of the host where the replica is running. The network is used by the replica to communicate with external agent servers, i.e., servers located outside of the cluster and not replicas.
This is the entire configuration for the a3servers.xml file of the first JOnAS instance jb1:
<?xml version="1.0"?> <config< <domain name="D1"/> <property name="Transaction" value="fr.dyade.aaa.util.NullTransaction"/> <cluster id="0" name="s0"> <property name="Engine" value="fr.dyade.aaa.agent.HAEngine" /> <property name="nbClusterExpected" value="1" /> <server id="1" hostname="localhost"> <network domain="D1" port="16300"/> <service class="org.objectweb.joram.mom.proxies.ConnectionManager" args="root root"/> <service class="org.objectweb.joram.mom.proxies.tcp.TcpProxyService" args="16010"/> <service class="org.objectweb.joram.client.jms.ha.local.HALocalConnection"/> </server> <server id="2" hostname="localhost"> <network domain="D1" port="16301"/> <service class="org.objectweb.joram.mom.proxies.ConnectionManager" args="root root"/> <service class="org.objectweb.joram.mom.proxies.tcp.TcpProxyService" args="16020"/> <service class="org.objectweb.joram.client.jms.ha.local.HALocalConnection"/> </server> </cluster> </config>The cluster id = 0 and the name S0. It is exactly the same file for the second instance of JOnAS.
<?xml version="1.0"?> <JoramAdmin> <AdminModule> <collocatedConnect name="root" password="root"/> </AdminModule> <ConnectionFactory className="org.objectweb.joram.client.jms.ha.tcp.HATcpConnectionFactory"> <hatcp url="hajoram://localhost:16010,localhost:16020" reliableClass="org.objectweb.joram.client.jms.tcp.ReliableTcpClient"/> <jndi name="JCF"/> </ConnectionFactory> <ConnectionFactory className="org.objectweb.joram.client.jms.ha.tcp.QueueHATcpConnectionFactory"> <hatcp url="hajoram://localhost:16010,localhost:16020" reliableClass="org.objectweb.joram.client.jms.tcp.ReliableTcpClient"/> <jndi name="JQCF"/> </ConnectionFactory> <ConnectionFactory className="org.objectweb.joram.client.jms.ha.tcp.TopicHATcpConnectionFactory"> <hatcp url="hajoram://localhost:16010,localhost:16020" reliableClass="org.objectweb.joram.client.jms.tcp.ReliableTcpClient"/> <jndi name="JTCF"/> </ConnectionFactory>
Each connection factory has its own specification. One is in case of the Queue, one for Topic, and one for no define arguments. Each time the hatcp url must be entered, the url of the two instances. In the example, it is localhost:16010 and localhost:16020. It allows the client to change the instance when the first one is dead.
After this definition the user, the queue and topic can be created.
First, in order to recognize the cluster, a new parameter must be declared in these files.
<config-property> <config-property-name>ClusterId</config-property-name> <config-property-type>java.lang.Short</config-property-type> <config-property-value>0</config-property-value> </config-property>
Here the name is not really appropriate but in order to keep some coherence this name was used. In fact it represents a replica so it would have been better to call it replicaId.
Consequently, for the first JOnAS instance, copy the code just above. For the second instance, change the value to 1 (in order to signify this is another replica).
First launch the two JOnAS bases. Create a runHa.sh file in which the following code will be added:
export JONAS_BASE=$PWD/jb1 export CATALINA_BASE=$JONAS_BASE rm -f $JONAS_BASE/logs/* jonas start -win -n node1 -Ddomain.name=HA
Then do the same for the second JOnAS base. After that launch the script.
One of the two JOnAS bases (the one which is the slowest) will be in a waiting state when reading the joramAdmin.xml
JoramAdapter.start : - Collocated JORAM server has successfully started. JoramAdapter.start : - Reading the provided admin file: joramAdmin.xml
whereas the other one is launched successfully.
Then launch (through a script or not) the newsamplemdb example:
jclient -cp /JONAS_BASE/jb1/ejbjars/newsamplemdb.jar:/JONAS_ROOT/examples/classes -carolFile clientConfig/carol.properties newsamplemdb.MdbClient
Messages are sent on the JOnAS base which was launched before. Launch it again and kill the current JOnAS. The second JOnAS will automatically wake up and take care of the other messages.
This is a proposal for building an MDB clustering based application.
This is like contested Queues. i.e., there is more than one receiver on different machines receiving from the queue. This load balances the work done by the queue receiver, not the queue itself.
The HA mechanism can be mixed with the load balancing policy based on clustered destinations. The load is balanced between several HA servers. Each element of a clustered destination is deployed on a separate HA server.
Here is the supposed configuration (supposed because it has not been verifed).
The configuration must now be tested, as follows: