Clover coverage report - Maven Clover report
Coverage timestamp: Sun Jun 1 2008 19:59:48 EST
file stats: LOC: 316   Methods: 31
NCLOC: 249   Classes: 2
 
 Source file Conditionals Statements Methods TOTAL
AbstractDispatcher.java 39.3% 60% 71% 58.7%
coverage coverage
 1    /**
 2    *
 3    * Copyright 2003-2005 Core Developers Network Ltd.
 4    *
 5    * Licensed under the Apache License, Version 2.0 (the "License");
 6    * you may not use this file except in compliance with the License.
 7    * You may obtain a copy of the License at
 8    *
 9    * http://www.apache.org/licenses/LICENSE-2.0
 10    *
 11    * Unless required by applicable law or agreed to in writing, software
 12    * distributed under the License is distributed on an "AS IS" BASIS,
 13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14    * See the License for the specific language governing permissions and
 15    * limitations under the License.
 16    */
 17    package org.codehaus.wadi.group.impl;
 18   
 19    import java.io.Serializable;
 20    import java.util.ArrayList;
 21    import java.util.Collection;
 22    import java.util.List;
 23    import java.util.Map;
 24    import java.util.concurrent.ConcurrentHashMap;
 25    import java.util.concurrent.CopyOnWriteArrayList;
 26   
 27    import org.apache.commons.logging.Log;
 28    import org.apache.commons.logging.LogFactory;
 29    import org.codehaus.wadi.group.Address;
 30    import org.codehaus.wadi.group.Dispatcher;
 31    import org.codehaus.wadi.group.DispatcherContext;
 32    import org.codehaus.wadi.group.Envelope;
 33    import org.codehaus.wadi.group.EnvelopeInterceptor;
 34    import org.codehaus.wadi.group.MessageExchangeException;
 35    import org.codehaus.wadi.group.Quipu;
 36    import org.codehaus.wadi.group.QuipuException;
 37    import org.codehaus.wadi.group.ServiceEndpoint;
 38   
 39    /**
 40    * The portable aspects of a Dispatcher implementation
 41    *
 42    * @version $Revision: 1595 $
 43    */
 44    public abstract class AbstractDispatcher implements Dispatcher {
 45    protected final Log log = LogFactory.getLog(getClass());
 46    protected final ThreadPool _executor;
 47    protected final Map<String, Quipu> rvMap = new ConcurrentHashMap<String, Quipu>();
 48    protected final List<EnvelopeInterceptor> interceptors;
 49    private final DispatcherContext context;
 50    private final EnvelopeDispatcherManager inboundEnvelopeDispatcher;
 51   
 52  8 public AbstractDispatcher(ThreadPool executor) {
 53  8 _executor = executor;
 54   
 55  8 interceptors = new CopyOnWriteArrayList<EnvelopeInterceptor>();
 56  8 inboundEnvelopeDispatcher = new BasicEnvelopeDispatcherManager(this, _executor);
 57  8 context = new BasicDispatcherContext();
 58    }
 59   
 60  8 public AbstractDispatcher() {
 61  8 this(new PooledExecutorAdapter(10));
 62    }
 63   
 64  0 public DispatcherContext getContext() {
 65  0 return context;
 66    }
 67   
 68  0 public void addInterceptor(EnvelopeInterceptor interceptor) {
 69  0 interceptor.registerLoopbackEnvelopeListener(this);
 70  0 interceptors.add(interceptor);
 71    }
 72   
 73  0 public void removeInterceptor(EnvelopeInterceptor interceptor) {
 74  0 interceptors.remove(interceptor);
 75  0 interceptor.unregisterLoopbackEnvelopeListener(this);
 76    }
 77   
 78  0 public List<EnvelopeInterceptor> getInterceptors() {
 79  0 return new ArrayList<EnvelopeInterceptor>(interceptors);
 80    }
 81   
 82  3 public void register(ServiceEndpoint msgDispatcher) {
 83  3 inboundEnvelopeDispatcher.register(msgDispatcher);
 84    }
 85   
 86  3 public void unregister(ServiceEndpoint msgDispatcher, int nbAttemp, long delayMillis) {
 87  3 inboundEnvelopeDispatcher.unregister(msgDispatcher, nbAttemp, delayMillis);
 88    }
 89   
 90  3 public final void onEnvelope(Envelope envelope) {
 91  3 try {
 92  3 envelope = onInboundEnvelope(envelope);
 93    } catch (MessageExchangeException e) {
 94  0 log.error("See nested", e);
 95    }
 96  3 if (null == envelope) {
 97  0 return;
 98    }
 99   
 100  3 doOnEnvelope(envelope);
 101    }
 102   
 103  3 protected void doOnEnvelope(Envelope envelope) {
 104  3 inboundEnvelopeDispatcher.onEnvelope(envelope);
 105    }
 106   
 107    class SimpleCorrelationIDFactory {
 108   
 109    protected int count;
 110   
 111  1 public synchronized String create() {
 112  1 return Integer.toString(count++);
 113    }
 114   
 115    }
 116   
 117    protected final SimpleCorrelationIDFactory _factory = new SimpleCorrelationIDFactory();
 118   
 119  1 public void addRendezVousEnvelope(Envelope envelope) {
 120  1 String targetCorrelationId = envelope.getTargetCorrelationId();
 121  1 if (null == targetCorrelationId) {
 122  0 throw new IllegalStateException("No targetCorrelationId");
 123    }
 124  1 Quipu rv= rvMap.get(targetCorrelationId);
 125  1 if (null == rv) {
 126  0 if (log.isTraceEnabled()) {
 127  0 log.trace("no one waiting for [" + targetCorrelationId + "]");
 128    }
 129    } else {
 130  1 if (log.isTraceEnabled()) {
 131  0 log.trace("successful correlation [" + targetCorrelationId + "]");
 132    }
 133  1 rv.putResult(envelope);
 134    }
 135    }
 136   
 137  1 public Quipu newRendezVous(int numLlamas) {
 138  1 return setRendezVous(_factory.create(), numLlamas);
 139    }
 140   
 141  1 public Envelope attemptRendezVous(Quipu rv, long timeout) throws MessageExchangeException {
 142  1 Collection messages = attemptMultiRendezVous(rv, timeout);
 143  1 if (messages.size() > 1) {
 144  0 throw new MessageExchangeException("[" + messages.size() + "] result messages. Expected only one.");
 145    }
 146  1 return (Envelope) messages.iterator().next();
 147    }
 148   
 149  1 public Collection attemptMultiRendezVous(Quipu rv, long timeout) throws MessageExchangeException {
 150  1 Collection response = null;
 151  1 try {
 152  1 do {
 153  1 try {
 154  1 long startTime = System.currentTimeMillis();
 155  1 if (rv.waitFor(timeout)) {
 156  1 response = rv.getResults();
 157  1 long elapsedTime = System.currentTimeMillis()-startTime;
 158  1 if (log.isTraceEnabled()) {
 159  0 log.trace("successful message exchange within timeframe (" + elapsedTime + "<" + timeout + " millis) {" + rv + "}");
 160    }
 161    } else {
 162  0 log.debug("unsuccessful message exchange within timeframe (" + timeout +" millis) {" + rv + "}", new Exception());
 163    }
 164    } catch (InterruptedException e) {
 165  0 log.debug("waiting for response - interruption ignored");
 166    } catch (QuipuException e) {
 167  0 throw new MessageExchangeException(e);
 168    }
 169  1 } while (Thread.interrupted());
 170    } finally {
 171  1 rvMap.remove(rv.getCorrelationId());
 172    }
 173  1 if (null == response) {
 174  0 throw new MessageExchangeException("No correlated messages received within [" + timeout + "]ms");
 175    }
 176  1 return response;
 177    }
 178   
 179  1 public Envelope exchangeSend(Address to, Serializable body, long timeout) throws MessageExchangeException {
 180  1 return exchangeSend(to, body, timeout, null);
 181    }
 182   
 183  1 public void reply(Envelope envelope, Serializable body) throws MessageExchangeException {
 184  1 Envelope reply = createEnvelope();
 185  1 reply.setPayload(body);
 186  1 reply(envelope, reply);
 187    }
 188   
 189  1 public void reply(Envelope request, Envelope reply) throws MessageExchangeException {
 190  1 Address from = getCluster().getLocalPeer().getAddress();
 191  1 reply.setReplyTo(from);
 192  1 Address to = request.getReplyTo();
 193  1 reply.setAddress(to);
 194  1 String incomingCorrelationId = request.getSourceCorrelationId();
 195  1 reply.setTargetCorrelationId(incomingCorrelationId);
 196  1 EnvelopeHelper.setAsReply(reply);
 197  1 send(to, reply);
 198    }
 199   
 200  1 public void send(Address to, Serializable body) throws MessageExchangeException {
 201  1 try {
 202  1 Envelope envelope = createEnvelope();
 203  1 envelope.setReplyTo(getCluster().getLocalPeer().getAddress());
 204  1 envelope.setAddress(to);
 205  1 envelope.setPayload(body);
 206  1 send(to, envelope);
 207    } catch (Exception e) {
 208  0 log.error("problem sending " + body, e);
 209    }
 210    }
 211   
 212  0 public void send(Address target, Quipu quipu, Serializable pojo) throws MessageExchangeException {
 213  0 try {
 214  0 Envelope envelope = createEnvelope();
 215  0 envelope.setReplyTo(getCluster().getLocalPeer().getAddress());
 216  0 envelope.setAddress(target);
 217  0 envelope.setPayload(pojo);
 218  0 envelope.setQuipu(quipu);
 219  0 send(target, envelope);
 220    } catch (Exception e) {
 221  0 log.error("problem sending " + pojo, e);
 222    }
 223    }
 224   
 225  0 public void send(Address source, Address target, Quipu quipu, Serializable pojo) throws MessageExchangeException {
 226  0 Envelope envelope = createEnvelope();
 227  0 envelope.setReplyTo(source);
 228  0 envelope.setAddress(target);
 229  0 envelope.setQuipu(quipu);
 230  0 envelope.setPayload(pojo);
 231  0 send(target, envelope);
 232    }
 233   
 234  3 public final void send(Address target, Envelope envelope) throws MessageExchangeException {
 235  3 envelope = onOutboundEnvelope(envelope);
 236  3 if (null == envelope) {
 237  0 return;
 238    }
 239   
 240  3 doSend(target, envelope);
 241    }
 242   
 243  1 public Envelope exchangeSend(Address target, Serializable pojo, long timeout, String targetCorrelationId) throws MessageExchangeException {
 244  1 Envelope envelope = createEnvelope();
 245  1 envelope.setPayload(pojo);
 246  1 return exchangeSend(target, envelope, timeout, targetCorrelationId);
 247    }
 248   
 249  0 public Envelope exchangeSend(Address target, Envelope envelope, long timeout) throws MessageExchangeException {
 250  0 return exchangeSend(target, envelope, timeout, null);
 251    }
 252   
 253  1 public Envelope exchangeSend(Address target, Envelope envelope, long timeout, String targetCorrelationId) throws MessageExchangeException {
 254  1 Address from=getCluster().getLocalPeer().getAddress();
 255  1 envelope.setReplyTo(from);
 256  1 envelope.setAddress(target);
 257   
 258  1 Quipu rv = newRendezVous(1);
 259  1 envelope.setQuipu(rv);
 260   
 261  1 if (targetCorrelationId!=null) {
 262  0 envelope.setTargetCorrelationId(targetCorrelationId);
 263    }
 264   
 265  1 send(target, envelope);
 266  1 return attemptRendezVous(rv, timeout);
 267    }
 268   
 269  0 public void reply(Address from, Address to, String incomingCorrelationId, Serializable body)
 270    throws MessageExchangeException {
 271  0 Envelope envelope = createEnvelope();
 272  0 envelope.setReplyTo(from);
 273  0 envelope.setAddress(to);
 274  0 envelope.setTargetCorrelationId(incomingCorrelationId);
 275  0 envelope.setPayload(body);
 276  0 EnvelopeHelper.setAsReply(envelope);
 277  0 send(to, envelope);
 278    }
 279   
 280  1 protected Quipu setRendezVous(String correlationId, int numLlamas) {
 281  1 Quipu rv = new Quipu(numLlamas, correlationId);
 282  1 rvMap.put(correlationId, rv);
 283  1 return rv;
 284    }
 285   
 286  3 protected void hook() {
 287  3 AbstractCluster.clusterThreadLocal.set(getCluster());
 288    }
 289   
 290  0 public ThreadPool getExecutor() {
 291  0 return _executor;
 292    }
 293   
 294    protected abstract void doSend(Address target, Envelope envelope) throws MessageExchangeException;
 295   
 296  3 protected Envelope onOutboundEnvelope(Envelope envelope) throws MessageExchangeException {
 297  3 for (EnvelopeInterceptor interceptor : interceptors) {
 298  0 envelope = interceptor.onOutboundEnvelope(envelope);
 299  0 if (null == envelope) {
 300  0 return null;
 301    }
 302    }
 303  3 return envelope;
 304    }
 305   
 306  3 protected Envelope onInboundEnvelope(Envelope envelope) throws MessageExchangeException {
 307  3 for (EnvelopeInterceptor interceptor : interceptors) {
 308  0 envelope = interceptor.onInboundEnvelope(envelope);
 309  0 if (null == envelope) {
 310  0 return null;
 311    }
 312    }
 313  3 return envelope;
 314    }
 315   
 316    }