View Javadoc

1   package org.codehaus.wadi.tribes;
2   
3   import java.util.Collection;
4   import java.util.HashSet;
5   import java.util.Iterator;
6   import java.util.LinkedHashMap;
7   import java.util.List;
8   import java.util.Map;
9   import java.util.Set;
10  import java.util.concurrent.CopyOnWriteArrayList;
11  
12  import org.apache.catalina.tribes.Channel;
13  import org.apache.catalina.tribes.ChannelException;
14  import org.apache.catalina.tribes.Member;
15  import org.apache.catalina.tribes.MembershipListener;
16  import org.apache.catalina.tribes.group.GroupChannel;
17  import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
18  import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
19  import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
20  import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
21  import org.apache.catalina.tribes.membership.McastService;
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.codehaus.wadi.group.Address;
25  import org.codehaus.wadi.group.Cluster;
26  import org.codehaus.wadi.group.ClusterException;
27  import org.codehaus.wadi.group.ClusterListener;
28  import org.codehaus.wadi.group.Dispatcher;
29  import org.codehaus.wadi.group.LocalPeer;
30  import org.codehaus.wadi.group.Peer;
31  import org.codehaus.wadi.group.PeerInfo;
32  
33  public class TribesCluster implements Cluster {
34      private static final Log LOG = LogFactory.getLog(TribesCluster.class);
35      private static final String SYS_PROP_PREFER_IPV4_STACK = "java.net.preferIPv4Stack";
36      private static final String SYS_PROP_INHIBIT_AUTO_PREFER_IPV4_STACK = 
37          "org.codehaus.wadi.tribes.inhibit.auto.preferIPv4Stack";
38  
39      private final byte[] clusterDomain;
40      protected GroupChannel channel;
41      protected List<ClusterListener> listeners = new CopyOnWriteArrayList<ClusterListener>();
42      protected boolean initialized;
43      private final TribesDispatcher dispatcher;
44  
45      public TribesCluster(byte[] clusterDomain,
46              TribesDispatcher dispatcher,
47              String localPeerName,
48              PeerInfo localPeerinfo) {
49          if (null == clusterDomain) {
50              throw new IllegalArgumentException("clusterDomain is required");
51          } else if (null == dispatcher) {
52              throw new IllegalArgumentException("dispatcher is required");
53          }
54          this.clusterDomain = clusterDomain;
55          this.dispatcher = dispatcher;
56          
57          channel = new GroupChannel();
58          channel.addInterceptor(new WadiMemberInterceptor());
59          //uncomment for java1.5
60          //channel.addInterceptor(new MessageDispatch15Interceptor());
61          //comment out for java 1.5
62          channel.addInterceptor(new MessageDispatchInterceptor());
63          channel.addMembershipListener(new WadiListener(this));
64          
65          addStaticMembers(dispatcher);
66          
67          ((McastService)channel.getMembershipService()).setMcastAddr("224.0.0.4");
68          ((McastService)channel.getMembershipService()).setDomain(clusterDomain);
69          
70          byte[] payload = TribesPeer.writePayload(localPeerName, localPeerinfo);
71          channel.getMembershipService().setPayload(payload);
72          
73          DomainFilterInterceptor filter = new DomainFilterInterceptor();
74          filter.setDomain(clusterDomain);
75          channel.addInterceptor(filter);
76          //channel.addInterceptor(new MessageTrackInterceptor());//for debug only
77          channel.addInterceptor(new TcpFailureDetector());//this one should always be at the bottom
78      }
79  
80      public String getClusterName() {
81          return new String(clusterDomain);
82      }
83      
84      public Dispatcher getDispatcher() {
85          return dispatcher;
86      }
87  
88      /***
89       * addClusterListener
90       *
91       * @param listener ClusterListener
92       * @todo Implement this org.codehaus.wadi.group.Cluster method
93       */
94      public void addClusterListener(ClusterListener listener) {
95          listeners.add(listener);
96          
97          Set existing = new HashSet(getRemotePeers().values());
98          listener.onListenerRegistration(this, existing);
99      }
100     
101     /***
102      * getAddress
103      *
104      * @return Address
105      * @todo Implement this org.codehaus.wadi.group.Cluster method
106      */
107     public Address getAddress() {
108         Member[] mbrs = channel.getMembers();
109         TribesPeer[] peers = new TribesPeer[mbrs.length + 1];
110         for (int i = 0; i < mbrs.length; i++) {
111             peers[i] = (TribesPeer) mbrs[i];
112         }
113         peers[peers.length - 1] = (TribesPeer) channel.getLocalMember(true);
114         return new TribesClusterAddress(peers);
115     }
116 
117     /***
118      * @return - the number of millis that a Peer may remain silent before being declared suspect/dead..
119      *
120      * @return - the number of millis that a Peer may remain silent before being declared suspect/dead..
121      * @todo Implement this org.codehaus.wadi.group.Cluster method
122      */
123     public long getInactiveTime() {
124         return ((McastService)channel.getMembershipService()).getMcastDropTime();
125     }
126 
127     /***
128      * getLocalPeer
129      *
130      * @return LocalPeer
131      * @todo Implement this org.codehaus.wadi.group.Cluster method
132      */
133     public LocalPeer getLocalPeer() {
134         //System.out.println("\n\n\n\n\n\n\nMEGA DEBUG Local peer="+channel.getLocalMember(true)+"\n\n\n\n");
135         return (LocalPeer)channel.getLocalMember(true);
136     }
137 
138     /***
139      * getPeerCount
140      *
141      * @return int
142      * @todo Implement this org.codehaus.wadi.group.Cluster method
143      */
144     public int getPeerCount() {
145         return channel.getMembers().length+1;
146     }
147 
148     /***
149      * getPeerFromAddress
150      *
151      * @param address Address
152      * @return Peer
153      * @todo Implement this org.codehaus.wadi.group.Cluster method
154      */
155     public Peer getPeerFromAddress(Address address) {
156         if ( address instanceof TribesPeer ) return (Peer)address;
157         return null;
158     }
159 
160     /***
161      * getRemotePeers
162      *
163      * @return Map
164      * @todo Implement this org.codehaus.wadi.group.Cluster method
165      */
166     public Map<Address, Peer> getRemotePeers() {
167         Member[] mbrs = channel.getMembers();
168         LinkedHashMap<Address, Peer> result = new LinkedHashMap<Address, Peer>();
169         for (int i=0; i<mbrs.length; i++) result.put((TribesPeer) mbrs[i], (TribesPeer) mbrs[i]);
170         return result;
171     }
172 
173     /***
174      * removeClusterListener
175      *
176      * @param listener ClusterListener
177      * @todo Implement this org.codehaus.wadi.group.Cluster method
178      */
179     public void removeClusterListener(ClusterListener listener) {
180         listeners.remove(listener);
181     }
182     
183     public void init() throws ClusterException {
184         ensurePreferIPv4StackPropertyIsSet();
185         
186         try {
187             channel.start(Channel.SND_RX_SEQ);
188             initialized = true;
189         }catch ( ChannelException x ) {
190             throw new ClusterException(x);
191         }
192     }
193 
194     protected void ensurePreferIPv4StackPropertyIsSet() {
195         String preferIPv4Stack = System.getProperty(SYS_PROP_PREFER_IPV4_STACK);
196         if (null == preferIPv4Stack) {
197             if (null != System.getProperty(SYS_PROP_INHIBIT_AUTO_PREFER_IPV4_STACK)) {
198                 LOG.warn("System property java.net.preferIPv4Stack is not set and auto-set if inhibited. Tribes " +
199                 		"multicasting will not properly work. You need to have a static cluster configuration.");
200             } else {
201                 LOG.info("System property java.net.preferIPv4Stack has been set automatically to true so that " +
202                 		"Tribes multicasting properly works.");
203                 System.setProperty(SYS_PROP_PREFER_IPV4_STACK, "true");
204             }
205         }
206     }
207 
208     /***
209      * start
210      *
211      * @throws ClusterException
212      * @todo Implement this org.codehaus.wadi.group.Cluster method
213      */
214     public void start() throws ClusterException {
215         try {
216             if (!initialized) init();
217             channel.start(Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ | Channel.SND_TX_SEQ);
218         }catch ( ChannelException x ) {
219             throw new ClusterException(x);
220         }
221     }
222 
223     /***
224      * stop
225      *
226      * @throws ClusterException
227      * @todo Implement this org.codehaus.wadi.group.Cluster method
228      */
229     public void stop() throws ClusterException {
230         try {
231             channel.stop(Channel.DEFAULT);
232             initialized = false;
233         }catch ( ChannelException x ) {
234             throw new ClusterException(x);
235         }
236 
237     }
238 
239     /***
240      *
241      * @param membershipCount - when membership reaches this number or we timeout this method will return
242      * @param timeout - the number of milliseconds to wait for membership to hit membershipCount
243      * @return whether or not expected membershipCount was hit within given time
244      * @throws InterruptedException
245      * @todo Implement this org.codehaus.wadi.group.Cluster method
246      */
247     public boolean waitOnMembershipCount(int membershipCount, long timeout) throws InterruptedException {
248         long start = System.currentTimeMillis();
249         long delta = System.currentTimeMillis()-start;
250         while ( delta < timeout ) {
251             if ( (channel.getMembers().length+1) == membershipCount ) return true;
252             else {
253                 try { Thread.sleep(100); } catch (InterruptedException x) {Thread.interrupted();}
254             }
255             delta = System.currentTimeMillis()-start;
256         }//while
257         return false;
258     }
259     
260     protected void addStaticMembers(TribesDispatcher dispatcher) {
261         Collection<Member> staticMembers = dispatcher.getStaticMembers();
262         if (!staticMembers.isEmpty()) {
263             StaticMembershipInterceptor smi = new StaticMembershipInterceptor();
264             for (Member member : staticMembers) {
265                 smi.addStaticMember(member);
266             }
267             channel.addInterceptor(smi);
268         }
269     }
270 
271     protected class WadiListener implements MembershipListener {
272         TribesCluster cluster;
273         public WadiListener(TribesCluster cluster) {
274             this.cluster = cluster;
275         }
276         
277         public synchronized void memberAdded(Member member) {
278             HashSet added = new HashSet();
279             HashSet removed = new HashSet();
280             if ( !member.equals(cluster.channel.getLocalMember(false)) ) added.add(member);
281             for (Iterator<ClusterListener> iter = cluster.listeners.iterator(); iter.hasNext();) {
282                 ClusterListener listener = iter.next();
283                 listener.onMembershipChanged(cluster,added,removed);
284             }
285         }
286         
287         public synchronized void memberDisappeared(Member member) {
288             HashSet added = new HashSet();
289             HashSet removed = new HashSet();
290             removed.add(member);
291             for (Iterator<ClusterListener> iter = cluster.listeners.iterator(); iter.hasNext();) {
292                 ClusterListener listener = iter.next();
293                 listener.onMembershipChanged(cluster, added, removed);
294             }
295         }
296     }
297 
298 }