1 /***
2 *
3 * Copyright 2003-2005 Core Developers Network Ltd.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.codehaus.wadi.jgroups;
18
19 import java.io.Serializable;
20 import java.util.Collections;
21 import java.util.Iterator;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.TreeSet;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.SynchronousQueue;
27 import java.util.concurrent.TimeUnit;
28
29 import org.codehaus.wadi.group.Address;
30 import org.codehaus.wadi.group.ClusterException;
31 import org.codehaus.wadi.group.EndPoint;
32 import org.codehaus.wadi.group.Envelope;
33 import org.codehaus.wadi.group.LocalPeer;
34 import org.codehaus.wadi.group.MessageExchangeException;
35 import org.codehaus.wadi.group.Peer;
36 import org.codehaus.wadi.group.command.BootRemotePeer;
37 import org.codehaus.wadi.group.command.ClusterCommand;
38 import org.codehaus.wadi.group.impl.AbstractCluster;
39 import org.jgroups.Channel;
40 import org.jgroups.ChannelException;
41 import org.jgroups.JChannel;
42 import org.jgroups.MembershipListener;
43 import org.jgroups.MergeView;
44 import org.jgroups.MessageListener;
45 import org.jgroups.View;
46
47
48 /***
49 * A WADI Cluster mapped onto a JGroups Channel
50 *
51 * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell</a>
52 * @version $Revision: 2421 $
53 */
54 public class JGroupsCluster extends AbstractCluster implements MembershipListener, MessageListener {
55 protected final boolean _excludeSelf = true;
56
57 protected final CountDownLatch _viewLatch = new CountDownLatch(1);
58 protected final ViewThread _viewThread = new ViewThread("WADI/JGroups View Thread");
59 protected final Channel _channel;
60 protected final JGroupsDispatcher _dispatcher;
61
62
63 protected org.jgroups.Address _localJGAddress;
64
65 public JGroupsCluster(String clusterName, String localPeerName, String config, JGroupsDispatcher dispatcher, EndPoint endPoint) throws ChannelException {
66 super(clusterName, localPeerName, dispatcher);
67 _dispatcher = dispatcher;
68 clusterPeer = new JGroupsClusterPeer(this, clusterName);
69 localPeer = new JGroupsLocalPeer(this, localPeerName, endPoint);
70 _channel = new JChannel(config);
71 clusterThreadLocal.set(this);
72 }
73
74 public String toString() {
75 return "JGroupsCluster [" + localPeerName + "/" + clusterName + "]";
76 }
77
78 public LocalPeer getLocalPeer() {
79 return localPeer;
80 }
81
82 public Peer getPeerFromAddress(Address address) {
83 return (JGroupsPeer) address;
84 }
85
86 public synchronized void start() throws ClusterException {
87 try {
88
89
90 _channel.connect(clusterName);
91 log.info(localPeerName + " - " + "connected to channel");
92 _localJGAddress = _channel.getLocalAddress();
93 ((JGroupsLocalPeer) localPeer).init(_localJGAddress);
94 backendKeyToPeer.put(_localJGAddress, localPeer);
95 } catch (Exception e) {
96 log.warn("unexpected JGroups problem", e);
97 throw new ClusterException(e);
98 }
99
100
101 _viewThread.start();
102
103 if (log.isTraceEnabled())
104 log.trace(localPeerName + " - " + "acquiring viewLatch...");
105 try {
106 _viewLatch.await();
107 } catch (InterruptedException e) {
108 log.warn("unexpected interruption", e);
109 }
110 if (log.isTraceEnabled())
111 log.trace(localPeerName + " - " + "...acquired viewLatch");
112 }
113
114 public synchronized void stop() throws ClusterException {
115 _viewThread.stop();
116 _channel.disconnect();
117 _channel.close();
118 log.info(localPeerName + " - " + "disconnected from channel");
119
120
121
122
123
124
125 }
126
127
128
129 protected final SynchronousQueue<Set> _viewQueue = new SynchronousQueue<Set>();
130
131 public void viewAccepted(View newView) {
132
133
134
135 if (newView instanceof MergeView) {
136 log.warn("NYI - merging: view is " + newView);
137 }
138
139 Set members = new TreeSet(newView.getMembers());
140 try {
141 if (log.isTraceEnabled()) {
142 log.trace(localPeerName + " - " + "handling JGroups viewAccepted(" + newView + ")...");
143 }
144 _viewQueue.put(members);
145 } catch (InterruptedException e) {
146 log.warn("unexpected interruption", e);
147 }
148 }
149
150 public class ViewThread implements Runnable {
151 protected boolean _running;
152 protected Thread _thread;
153
154 ViewThread(String name) {
155 _thread = new Thread(this, name);
156 }
157
158 public void start() {
159 _running = true;
160 _thread.start();
161 }
162
163 public void stop() {
164 _running = false;
165 }
166
167 public void run() {
168 while (_running) {
169 try {
170 Set members = (Set) _viewQueue.poll(2000, TimeUnit.MILLISECONDS);
171 if (members != null) {
172 nextView(members);
173 }
174 } catch (InterruptedException e) {
175 log.warn("unexpected interruption", e);
176 }
177 }
178 }
179 }
180
181 public void nextView(Set newMembers) {
182 Set joiners = new TreeSet();
183 Set leavers = new TreeSet();
184
185 newMembers.remove(_localJGAddress);
186 synchronized (addressToPeer) {
187
188 for (Iterator i = addressToPeer.entrySet().iterator(); i.hasNext();) {
189 Map.Entry entry = (Map.Entry) i.next();
190 JGroupsPeer address = (JGroupsPeer) entry.getKey();
191 Peer peer = (Peer) entry.getValue();
192 log.trace("checking (leaver?): " + peer);
193 if (!newMembers.contains(address.getJGAddress())) {
194 leavers.add(peer);
195 i.remove();
196
197 synchronized (backendKeyToPeer) {
198 backendKeyToPeer.remove(address.getJGAddress());
199 }
200 }
201 }
202
203 for (Iterator i = newMembers.iterator(); i.hasNext();) {
204 org.jgroups.Address jgaddress = (org.jgroups.Address) i.next();
205
206 JGroupsPeer remotePeer = new JGroupsPeer(this, "UNDEFINED", null);
207 remotePeer.init(jgaddress);
208 BootRemotePeer command = new BootRemotePeer(this, remotePeer);
209 remotePeer = (JGroupsPeer) command.getSerializedPeer();
210 if (null == remotePeer) {
211 return;
212 }
213
214 JGroupsPeer peer = (JGroupsPeer) getPeer(remotePeer);
215 log.trace("checking (joiner?): " + peer);
216 if (!addressToPeer.containsKey(peer)) {
217 addressToPeer.put(peer, peer);
218 joiners.add(peer);
219 }
220 }
221 }
222
223
224 joiners = Collections.unmodifiableSet(joiners);
225 leavers = Collections.unmodifiableSet(leavers);
226
227
228 notifyMembershipChanged(joiners, leavers);
229
230
231 if (log.isTraceEnabled()) {
232 log.trace(localPeerName + " - " + "releasing viewLatch (viewAccepted)...");
233 }
234 _viewLatch.countDown();
235 if (log.isTraceEnabled()) {
236 log.trace(localPeerName + " - " + "...released viewLatch (viewAccepted)");
237 }
238 }
239
240
241 public void suspect(org.jgroups.Address suspected_mbr) {
242 if (log.isTraceEnabled())
243 log.trace(localPeerName + " - " + "handling suspect(" + suspected_mbr + ")...");
244 if (log.isWarnEnabled())
245 log.warn("cluster suspects member may have been lost: " + suspected_mbr);
246 if (log.isTraceEnabled())
247 log.trace(localPeerName + " - " + "...suspect() handled");
248 }
249
250
251 public void block() {
252 if (log.isTraceEnabled())
253 log.trace(localPeerName + " - " + "handling block()...");
254
255 if (log.isTraceEnabled())
256 log.trace(localPeerName + " - " + "... block() handled");
257
258 }
259
260
261 public void receive(org.jgroups.Message msg) {
262 if (log.isTraceEnabled()) {
263 log.trace(localPeerName + " - message arrived: " + msg);
264 }
265 org.jgroups.Address src = msg.getSrc();
266 org.jgroups.Address dest = msg.getDest();
267 if (_excludeSelf && (dest == null || dest.isMulticastAddress()) && src == _localJGAddress) {
268 if (log.isTraceEnabled()) {
269 log.trace(localPeerName + " - " + "ignoring message from self: " + msg);
270 }
271 } else {
272
273 clusterThreadLocal.set(this);
274 Object o = msg.getObject();
275 JGroupsEnvelope wadiMsg = (JGroupsEnvelope) o;
276 Serializable payload = wadiMsg.getPayload();
277 if (payload instanceof ClusterCommand) {
278 ((ClusterCommand) payload).execute(wadiMsg, this);
279 return;
280 }
281
282 wadiMsg.setCluster(this);
283 wadiMsg.setAddress((JGroupsPeer) getPeer(wadiMsg.getAddress()));
284 wadiMsg.setReplyTo((JGroupsPeer) getPeer(wadiMsg.getReplyTo()));
285 _dispatcher.onEnvelope(wadiMsg);
286 }
287 }
288
289 public byte[] getState() {
290 throw new UnsupportedOperationException("we do not use JGroups' state exchange protocol");
291 }
292
293 public void setState(byte[] state) {
294 throw new UnsupportedOperationException("we do not use JGroups' state exchange protocol");
295 }
296
297
298 public Address getAddress() {
299 return (JGroupsPeer) clusterPeer;
300 }
301
302
303 public void send(Address target, Envelope message) throws MessageExchangeException {
304 JGroupsEnvelope msg = (JGroupsEnvelope) message;
305 JGroupsPeer tgt = (JGroupsPeer) target;
306 try {
307 msg.setCluster(this);
308 msg.setAddress(target);
309 msg.setCluster(null);
310 _channel.send(tgt.getJGAddress(), _localJGAddress, msg);
311 } catch (Exception e) {
312 log.warn("unexpected JGroups problem", e);
313 throw new MessageExchangeException(e);
314 }
315 }
316
317 public Channel getChannel() {
318 return _channel;
319 }
320
321 protected Object extractKeyFromPeerSerialization(Object backend) {
322 JGroupsPeer remotePeer = (JGroupsPeer) backend;
323 return remotePeer.getJGAddress();
324 }
325
326 protected Peer createPeerFromPeerSerialization(Object backend) {
327 JGroupsPeer remotePeer = (JGroupsPeer) backend;
328 org.jgroups.Address jgAddress = remotePeer.getJGAddress();
329 JGroupsPeer peer;
330 if (jgAddress.isMulticastAddress()) {
331 peer = (JGroupsPeer) clusterPeer;
332
333
334 if (peer.getJGAddress() == null) {
335 peer.init(jgAddress);
336 }
337 } else {
338 peer = new JGroupsRemotePeer(this, remotePeer);
339 }
340 return peer;
341 }
342
343 }