1 package org.codehaus.wadi.tribes;
2
3 import java.util.Collection;
4 import java.util.HashSet;
5 import java.util.Iterator;
6 import java.util.LinkedHashMap;
7 import java.util.List;
8 import java.util.Map;
9 import java.util.Set;
10 import java.util.concurrent.CopyOnWriteArrayList;
11
12 import org.apache.catalina.tribes.Channel;
13 import org.apache.catalina.tribes.ChannelException;
14 import org.apache.catalina.tribes.Member;
15 import org.apache.catalina.tribes.MembershipListener;
16 import org.apache.catalina.tribes.group.GroupChannel;
17 import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
18 import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
19 import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
20 import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
21 import org.apache.catalina.tribes.membership.McastService;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.codehaus.wadi.group.Address;
25 import org.codehaus.wadi.group.Cluster;
26 import org.codehaus.wadi.group.ClusterException;
27 import org.codehaus.wadi.group.ClusterListener;
28 import org.codehaus.wadi.group.Dispatcher;
29 import org.codehaus.wadi.group.LocalPeer;
30 import org.codehaus.wadi.group.Peer;
31 import org.codehaus.wadi.group.PeerInfo;
32
33 public class TribesCluster implements Cluster {
34 private static final Log LOG = LogFactory.getLog(TribesCluster.class);
35 private static final String SYS_PROP_PREFER_IPV4_STACK = "java.net.preferIPv4Stack";
36 private static final String SYS_PROP_INHIBIT_AUTO_PREFER_IPV4_STACK =
37 "org.codehaus.wadi.tribes.inhibit.auto.preferIPv4Stack";
38
39 private final byte[] clusterDomain;
40 protected GroupChannel channel;
41 protected List<ClusterListener> listeners = new CopyOnWriteArrayList<ClusterListener>();
42 protected boolean initialized;
43 private final TribesDispatcher dispatcher;
44
45 public TribesCluster(byte[] clusterDomain,
46 TribesDispatcher dispatcher,
47 String localPeerName,
48 PeerInfo localPeerinfo) {
49 if (null == clusterDomain) {
50 throw new IllegalArgumentException("clusterDomain is required");
51 } else if (null == dispatcher) {
52 throw new IllegalArgumentException("dispatcher is required");
53 }
54 this.clusterDomain = clusterDomain;
55 this.dispatcher = dispatcher;
56
57 channel = new GroupChannel();
58 channel.addInterceptor(new WadiMemberInterceptor());
59
60
61
62 channel.addInterceptor(new MessageDispatchInterceptor());
63 channel.addMembershipListener(new WadiListener(this));
64
65 addStaticMembers(dispatcher);
66
67 ((McastService)channel.getMembershipService()).setMcastAddr("224.0.0.4");
68 ((McastService)channel.getMembershipService()).setDomain(clusterDomain);
69
70 byte[] payload = TribesPeer.writePayload(localPeerName, localPeerinfo);
71 channel.getMembershipService().setPayload(payload);
72
73 DomainFilterInterceptor filter = new DomainFilterInterceptor();
74 filter.setDomain(clusterDomain);
75 channel.addInterceptor(filter);
76
77 channel.addInterceptor(new TcpFailureDetector());
78 }
79
80 public String getClusterName() {
81 return new String(clusterDomain);
82 }
83
84 public Dispatcher getDispatcher() {
85 return dispatcher;
86 }
87
88 /***
89 * addClusterListener
90 *
91 * @param listener ClusterListener
92 * @todo Implement this org.codehaus.wadi.group.Cluster method
93 */
94 public void addClusterListener(ClusterListener listener) {
95 listeners.add(listener);
96
97 Set existing = new HashSet(getRemotePeers().values());
98 listener.onListenerRegistration(this, existing);
99 }
100
101 /***
102 * getAddress
103 *
104 * @return Address
105 * @todo Implement this org.codehaus.wadi.group.Cluster method
106 */
107 public Address getAddress() {
108 Member[] mbrs = channel.getMembers();
109 TribesPeer[] peers = new TribesPeer[mbrs.length + 1];
110 for (int i = 0; i < mbrs.length; i++) {
111 peers[i] = (TribesPeer) mbrs[i];
112 }
113 peers[peers.length - 1] = (TribesPeer) channel.getLocalMember(true);
114 return new TribesClusterAddress(peers);
115 }
116
117 /***
118 * @return - the number of millis that a Peer may remain silent before being declared suspect/dead..
119 *
120 * @return - the number of millis that a Peer may remain silent before being declared suspect/dead..
121 * @todo Implement this org.codehaus.wadi.group.Cluster method
122 */
123 public long getInactiveTime() {
124 return ((McastService)channel.getMembershipService()).getMcastDropTime();
125 }
126
127 /***
128 * getLocalPeer
129 *
130 * @return LocalPeer
131 * @todo Implement this org.codehaus.wadi.group.Cluster method
132 */
133 public LocalPeer getLocalPeer() {
134
135 return (LocalPeer)channel.getLocalMember(true);
136 }
137
138 /***
139 * getPeerCount
140 *
141 * @return int
142 * @todo Implement this org.codehaus.wadi.group.Cluster method
143 */
144 public int getPeerCount() {
145 return channel.getMembers().length+1;
146 }
147
148 /***
149 * getPeerFromAddress
150 *
151 * @param address Address
152 * @return Peer
153 * @todo Implement this org.codehaus.wadi.group.Cluster method
154 */
155 public Peer getPeerFromAddress(Address address) {
156 if ( address instanceof TribesPeer ) return (Peer)address;
157 return null;
158 }
159
160 /***
161 * getRemotePeers
162 *
163 * @return Map
164 * @todo Implement this org.codehaus.wadi.group.Cluster method
165 */
166 public Map<Address, Peer> getRemotePeers() {
167 Member[] mbrs = channel.getMembers();
168 LinkedHashMap<Address, Peer> result = new LinkedHashMap<Address, Peer>();
169 for (int i=0; i<mbrs.length; i++) result.put((TribesPeer) mbrs[i], (TribesPeer) mbrs[i]);
170 return result;
171 }
172
173 /***
174 * removeClusterListener
175 *
176 * @param listener ClusterListener
177 * @todo Implement this org.codehaus.wadi.group.Cluster method
178 */
179 public void removeClusterListener(ClusterListener listener) {
180 listeners.remove(listener);
181 }
182
183 public void init() throws ClusterException {
184 ensurePreferIPv4StackPropertyIsSet();
185
186 try {
187 channel.start(Channel.SND_RX_SEQ);
188 initialized = true;
189 }catch ( ChannelException x ) {
190 throw new ClusterException(x);
191 }
192 }
193
194 protected void ensurePreferIPv4StackPropertyIsSet() {
195 String preferIPv4Stack = System.getProperty(SYS_PROP_PREFER_IPV4_STACK);
196 if (null == preferIPv4Stack) {
197 if (null != System.getProperty(SYS_PROP_INHIBIT_AUTO_PREFER_IPV4_STACK)) {
198 LOG.warn("System property java.net.preferIPv4Stack is not set and auto-set if inhibited. Tribes " +
199 "multicasting will not properly work. You need to have a static cluster configuration.");
200 } else {
201 LOG.info("System property java.net.preferIPv4Stack has been set automatically to true so that " +
202 "Tribes multicasting properly works.");
203 System.setProperty(SYS_PROP_PREFER_IPV4_STACK, "true");
204 }
205 }
206 }
207
208 /***
209 * start
210 *
211 * @throws ClusterException
212 * @todo Implement this org.codehaus.wadi.group.Cluster method
213 */
214 public void start() throws ClusterException {
215 try {
216 if (!initialized) init();
217 channel.start(Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ | Channel.SND_TX_SEQ);
218 }catch ( ChannelException x ) {
219 throw new ClusterException(x);
220 }
221 }
222
223 /***
224 * stop
225 *
226 * @throws ClusterException
227 * @todo Implement this org.codehaus.wadi.group.Cluster method
228 */
229 public void stop() throws ClusterException {
230 try {
231 channel.stop(Channel.DEFAULT);
232 initialized = false;
233 }catch ( ChannelException x ) {
234 throw new ClusterException(x);
235 }
236
237 }
238
239 /***
240 *
241 * @param membershipCount - when membership reaches this number or we timeout this method will return
242 * @param timeout - the number of milliseconds to wait for membership to hit membershipCount
243 * @return whether or not expected membershipCount was hit within given time
244 * @throws InterruptedException
245 * @todo Implement this org.codehaus.wadi.group.Cluster method
246 */
247 public boolean waitOnMembershipCount(int membershipCount, long timeout) throws InterruptedException {
248 long start = System.currentTimeMillis();
249 long delta = System.currentTimeMillis()-start;
250 while ( delta < timeout ) {
251 if ( (channel.getMembers().length+1) == membershipCount ) return true;
252 else {
253 try { Thread.sleep(100); } catch (InterruptedException x) {Thread.interrupted();}
254 }
255 delta = System.currentTimeMillis()-start;
256 }
257 return false;
258 }
259
260 protected void addStaticMembers(TribesDispatcher dispatcher) {
261 Collection<Member> staticMembers = dispatcher.getStaticMembers();
262 if (!staticMembers.isEmpty()) {
263 StaticMembershipInterceptor smi = new StaticMembershipInterceptor();
264 for (Member member : staticMembers) {
265 smi.addStaticMember(member);
266 }
267 channel.addInterceptor(smi);
268 }
269 }
270
271 protected class WadiListener implements MembershipListener {
272 TribesCluster cluster;
273 public WadiListener(TribesCluster cluster) {
274 this.cluster = cluster;
275 }
276
277 public synchronized void memberAdded(Member member) {
278 HashSet added = new HashSet();
279 HashSet removed = new HashSet();
280 if ( !member.equals(cluster.channel.getLocalMember(false)) ) added.add(member);
281 for (Iterator<ClusterListener> iter = cluster.listeners.iterator(); iter.hasNext();) {
282 ClusterListener listener = iter.next();
283 listener.onMembershipChanged(cluster,added,removed);
284 }
285 }
286
287 public synchronized void memberDisappeared(Member member) {
288 HashSet added = new HashSet();
289 HashSet removed = new HashSet();
290 removed.add(member);
291 for (Iterator<ClusterListener> iter = cluster.listeners.iterator(); iter.hasNext();) {
292 ClusterListener listener = iter.next();
293 listener.onMembershipChanged(cluster, added, removed);
294 }
295 }
296 }
297
298 }