View Javadoc

1   /***
2    *
3    * Copyright 2003-2005 Core Developers Network Ltd.
4    *
5    *  Licensed under the Apache License, Version 2.0 (the "License");
6    *  you may not use this file except in compliance with the License.
7    *  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   */
17  package org.codehaus.wadi.jgroups;
18  
19  import java.io.Serializable;
20  import java.util.Collections;
21  import java.util.Iterator;
22  import java.util.Map;
23  import java.util.Set;
24  import java.util.TreeSet;
25  import java.util.concurrent.CountDownLatch;
26  import java.util.concurrent.SynchronousQueue;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.codehaus.wadi.group.Address;
30  import org.codehaus.wadi.group.ClusterException;
31  import org.codehaus.wadi.group.EndPoint;
32  import org.codehaus.wadi.group.Envelope;
33  import org.codehaus.wadi.group.LocalPeer;
34  import org.codehaus.wadi.group.MessageExchangeException;
35  import org.codehaus.wadi.group.Peer;
36  import org.codehaus.wadi.group.command.BootRemotePeer;
37  import org.codehaus.wadi.group.command.ClusterCommand;
38  import org.codehaus.wadi.group.impl.AbstractCluster;
39  import org.jgroups.Channel;
40  import org.jgroups.ChannelException;
41  import org.jgroups.JChannel;
42  import org.jgroups.MembershipListener;
43  import org.jgroups.MergeView;
44  import org.jgroups.MessageListener;
45  import org.jgroups.View;
46  
47  
48  /***
49   * A WADI Cluster mapped onto a JGroups Channel
50   *
51   * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell</a>
52   * @version $Revision: 2421 $
53   */
54  public class JGroupsCluster extends AbstractCluster implements MembershipListener, MessageListener {
55      protected final boolean _excludeSelf = true;
56      // should probably be initialised in start() and dumped in stop()
57      protected final CountDownLatch _viewLatch = new CountDownLatch(1);
58      protected final ViewThread _viewThread = new ViewThread("WADI/JGroups View Thread");
59      protected final Channel _channel;
60      protected final JGroupsDispatcher _dispatcher;
61  
62      // initialised in init()
63      protected org.jgroups.Address _localJGAddress;
64  
65      public JGroupsCluster(String clusterName, String localPeerName, String config, JGroupsDispatcher dispatcher, EndPoint endPoint) throws ChannelException {
66          super(clusterName, localPeerName, dispatcher);
67          _dispatcher = dispatcher;
68          clusterPeer = new JGroupsClusterPeer(this, clusterName);
69          localPeer = new JGroupsLocalPeer(this, localPeerName, endPoint);
70          _channel = new JChannel(config);
71          clusterThreadLocal.set(this);
72      }
73  
74      public String toString() {
75          return "JGroupsCluster [" + localPeerName + "/" + clusterName + "]";
76      }
77  
78      public LocalPeer getLocalPeer() {
79          return localPeer;
80      }
81  
82      public Peer getPeerFromAddress(Address address) {
83          return (JGroupsPeer) address;
84      }
85  
86      public synchronized void start() throws ClusterException {
87          try {
88              // _channel.setOpt(Channel.LOCAL, Boolean.FALSE); // exclude
89              // ourselves from our own broadcasts... - BUT also from Unicasts :-(
90              _channel.connect(clusterName);
91              log.info(localPeerName + " - " + "connected to channel");
92              _localJGAddress = _channel.getLocalAddress();
93              ((JGroupsLocalPeer) localPeer).init(_localJGAddress);
94              backendKeyToPeer.put(_localJGAddress, localPeer);
95          } catch (Exception e) {
96              log.warn("unexpected JGroups problem", e);
97              throw new ClusterException(e);
98          }
99  
100         // start accepting new views...
101         _viewThread.start();
102         // wait for the first view to be accepted before continuing...
103         if (log.isTraceEnabled())
104             log.trace(localPeerName + " - " + "acquiring viewLatch...");
105         try {
106             _viewLatch.await();
107         } catch (InterruptedException e) {
108             log.warn("unexpected interruption", e);
109         }
110         if (log.isTraceEnabled())
111             log.trace(localPeerName + " - " + "...acquired viewLatch");
112     }
113 
114     public synchronized void stop() throws ClusterException {
115         _viewThread.stop();
116         _channel.disconnect();
117         _channel.close();
118         log.info(localPeerName + " - " + "disconnected from channel");
119         // TODO
120         // hmmm... sometimes we get messages hitting the Dispatcher even after
121         // we have closed the channel - we need to wait for them somehow...
122         // _clusterTopic=null;
123         // _localDestination=null;
124         // _localAddress=null;
125     }
126 
127     // JGroups MembershipListener API
128 
129     protected final SynchronousQueue<Set> _viewQueue = new SynchronousQueue<Set>();
130 
131     public void viewAccepted(View newView) {
132         // this is meant to happen if a network split is healed and two
133         // clusters try to reconcile their separate states into one -
134         // I have a plan...
135         if (newView instanceof MergeView) {
136             log.warn("NYI - merging: view is " + newView);
137         }
138 
139         Set members = new TreeSet(newView.getMembers());
140         try {
141             if (log.isTraceEnabled()) {
142                 log.trace(localPeerName + " - " + "handling JGroups viewAccepted(" + newView + ")...");
143             }
144             _viewQueue.put(members);
145         } catch (InterruptedException e) {
146             log.warn("unexpected interruption", e);
147         }
148     }
149 
150     public class ViewThread implements Runnable {
151         protected boolean _running;
152         protected Thread _thread;
153 
154         ViewThread(String name) {
155             _thread = new Thread(this, name);
156         }
157 
158         public void start() {
159             _running = true;
160             _thread.start();
161         }
162 
163         public void stop() {
164             _running = false;
165         }
166 
167         public void run() {
168             while (_running) {
169                 try {
170                     Set members = (Set) _viewQueue.poll(2000, TimeUnit.MILLISECONDS);
171                     if (members != null) {
172                         nextView(members);
173                     }
174                 } catch (InterruptedException e) {
175                     log.warn("unexpected interruption", e);
176                 }
177             }
178         }
179     }
180 
181     public void nextView(Set newMembers) {
182         Set joiners = new TreeSet();
183         Set leavers = new TreeSet();
184 
185         newMembers.remove(_localJGAddress);
186         synchronized (addressToPeer) {
187             // leavers :
188             for (Iterator i = addressToPeer.entrySet().iterator(); i.hasNext();) {
189                 Map.Entry entry = (Map.Entry) i.next();
190                 JGroupsPeer address = (JGroupsPeer) entry.getKey();
191                 Peer peer = (Peer) entry.getValue();
192                 log.trace("checking (leaver?): " + peer);
193                 if (!newMembers.contains(address.getJGAddress())) {
194                     leavers.add(peer);
195                     i.remove(); // remove from _AddressToPeer
196                     // remove peer
197                     synchronized (backendKeyToPeer) {
198                         backendKeyToPeer.remove(address.getJGAddress());
199                     }
200                 }
201             }
202             // joiners :
203             for (Iterator i = newMembers.iterator(); i.hasNext();) {
204                 org.jgroups.Address jgaddress = (org.jgroups.Address) i.next();
205 
206                 JGroupsPeer remotePeer = new JGroupsPeer(this, "UNDEFINED", null);
207                 remotePeer.init(jgaddress);
208                 BootRemotePeer command = new BootRemotePeer(this, remotePeer);
209                 remotePeer = (JGroupsPeer) command.getSerializedPeer();
210                 if (null == remotePeer) {
211                     return;
212                 }
213                 
214                 JGroupsPeer peer = (JGroupsPeer) getPeer(remotePeer);
215                 log.trace("checking (joiner?): " + peer);
216                 if (!addressToPeer.containsKey(peer)) {
217                     addressToPeer.put(peer, peer);
218                     joiners.add(peer);
219                 }
220             }
221         }
222 
223         // ensure that all joiners are entered into our model...
224         joiners = Collections.unmodifiableSet(joiners);
225         leavers = Collections.unmodifiableSet(leavers);
226 
227         // notify listeners of changed membership
228         notifyMembershipChanged(joiners, leavers);
229 
230         // release latch so that start() can complete
231         if (log.isTraceEnabled()) {
232             log.trace(localPeerName + " - " + "releasing viewLatch (viewAccepted)...");
233         }
234         _viewLatch.countDown();
235         if (log.isTraceEnabled()) {
236             log.trace(localPeerName + " - " + "...released viewLatch (viewAccepted)");
237         }
238     }
239 
240     // JGroups 'MembershipListener' API
241     public void suspect(org.jgroups.Address suspected_mbr) {
242         if (log.isTraceEnabled())
243             log.trace(localPeerName + " - " + "handling suspect(" + suspected_mbr + ")...");
244         if (log.isWarnEnabled())
245             log.warn("cluster suspects member may have been lost: " + suspected_mbr);
246         if (log.isTraceEnabled())
247             log.trace(localPeerName + " - " + "...suspect() handled");
248     }
249 
250     // Block sending and receiving of messages until viewAccepted() is called
251     public void block() {
252         if (log.isTraceEnabled())
253             log.trace(localPeerName + " - " + "handling block()...");
254         // NYI
255         if (log.isTraceEnabled())
256             log.trace(localPeerName + " - " + "... block() handled");
257 
258     }
259 
260     // JGroups 'MessageListener' API
261     public void receive(org.jgroups.Message msg) {
262         if (log.isTraceEnabled()) {
263             log.trace(localPeerName + " - message arrived: " + msg);
264         }
265         org.jgroups.Address src = msg.getSrc();
266         org.jgroups.Address dest = msg.getDest();
267         if (_excludeSelf && (dest == null || dest.isMulticastAddress()) && src == _localJGAddress) {
268             if (log.isTraceEnabled()) {
269                 log.trace(localPeerName + " - " + "ignoring message from self: " + msg);
270             }
271         } else {
272             // setup a ThreadLocal to be read during deserialisation...
273             clusterThreadLocal.set(this);
274             Object o = msg.getObject();
275             JGroupsEnvelope wadiMsg = (JGroupsEnvelope) o;
276             Serializable payload = wadiMsg.getPayload();
277             if (payload instanceof ClusterCommand) {
278                 ((ClusterCommand) payload).execute(wadiMsg, this);
279                 return;
280             }
281 
282             wadiMsg.setCluster(this);
283             wadiMsg.setAddress((JGroupsPeer) getPeer(wadiMsg.getAddress()));
284             wadiMsg.setReplyTo((JGroupsPeer) getPeer(wadiMsg.getReplyTo()));
285             _dispatcher.onEnvelope(wadiMsg);
286         }
287     }
288 
289     public byte[] getState() {
290         throw new UnsupportedOperationException("we do not use JGroups' state exchange protocol");
291     }
292 
293     public void setState(byte[] state) {
294         throw new UnsupportedOperationException("we do not use JGroups' state exchange protocol");
295     }
296 
297     // we should be able to pull this up into AbstractCluster...
298     public Address getAddress() {
299         return (JGroupsPeer) clusterPeer;
300     }
301 
302     // 'JGroupsCluster' API
303     public void send(Address target, Envelope message) throws MessageExchangeException {
304         JGroupsEnvelope msg = (JGroupsEnvelope) message;
305         JGroupsPeer tgt = (JGroupsPeer) target;
306         try {
307             msg.setCluster(this);
308             msg.setAddress(target);
309             msg.setCluster(null);
310             _channel.send(tgt.getJGAddress(), _localJGAddress, msg);
311         } catch (Exception e) {
312             log.warn("unexpected JGroups problem", e);
313             throw new MessageExchangeException(e);
314         }
315     }
316 
317     public Channel getChannel() {
318         return _channel;
319     }
320 
321     protected Object extractKeyFromPeerSerialization(Object backend) {
322         JGroupsPeer remotePeer = (JGroupsPeer) backend;
323         return remotePeer.getJGAddress();
324     }
325     
326     protected Peer createPeerFromPeerSerialization(Object backend) {
327         JGroupsPeer remotePeer = (JGroupsPeer) backend;
328         org.jgroups.Address jgAddress = remotePeer.getJGAddress();
329         JGroupsPeer peer;
330         if (jgAddress.isMulticastAddress()) {
331             peer = (JGroupsPeer) clusterPeer;
332             // I can't find a way to initialise the clusterAddress from the client side
333             // so I wait to receive a message, sent to the cluster, then use the address from that - clumsy
334             if (peer.getJGAddress() == null) {
335                 peer.init(jgAddress);
336             }
337         } else {
338             peer = new JGroupsRemotePeer(this, remotePeer);
339         }
340         return peer;
341     }
342 
343 }