001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.io.IOException;
020    import java.util.concurrent.atomic.AtomicLong;
021    
022    import org.apache.activemq.Service;
023    import org.apache.activemq.command.ActiveMQQueue;
024    import org.apache.activemq.command.ActiveMQTopic;
025    import org.apache.activemq.command.BrokerId;
026    import org.apache.activemq.command.BrokerInfo;
027    import org.apache.activemq.command.Command;
028    import org.apache.activemq.command.ConnectionId;
029    import org.apache.activemq.command.ConnectionInfo;
030    import org.apache.activemq.command.ConsumerInfo;
031    import org.apache.activemq.command.ExceptionResponse;
032    import org.apache.activemq.command.Message;
033    import org.apache.activemq.command.MessageAck;
034    import org.apache.activemq.command.MessageDispatch;
035    import org.apache.activemq.command.ProducerInfo;
036    import org.apache.activemq.command.Response;
037    import org.apache.activemq.command.SessionInfo;
038    import org.apache.activemq.command.ShutdownInfo;
039    import org.apache.activemq.transport.DefaultTransportListener;
040    import org.apache.activemq.transport.FutureResponse;
041    import org.apache.activemq.transport.ResponseCallback;
042    import org.apache.activemq.transport.Transport;
043    import org.apache.activemq.util.IdGenerator;
044    import org.apache.activemq.util.ServiceStopper;
045    import org.apache.activemq.util.ServiceSupport;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    /**
050     * Forwards all messages from the local broker to the remote broker.
051     * 
052     * @org.apache.xbean.XBean
053     * 
054     * 
055     */
056    public class ForwardingBridge implements Service {
057    
058        private static final IdGenerator ID_GENERATOR = new IdGenerator();
059        private static final Logger LOG = LoggerFactory.getLogger(ForwardingBridge.class);
060    
061        final AtomicLong enqueueCounter = new AtomicLong();
062        final AtomicLong dequeueCounter = new AtomicLong();
063        ConnectionInfo connectionInfo;
064        SessionInfo sessionInfo;
065        ProducerInfo producerInfo;
066        ConsumerInfo queueConsumerInfo;
067        ConsumerInfo topicConsumerInfo;
068        BrokerId localBrokerId;
069        BrokerId remoteBrokerId;
070        BrokerInfo localBrokerInfo;
071        BrokerInfo remoteBrokerInfo;
072    
073        private final Transport localBroker;
074        private final Transport remoteBroker;
075        private String clientId;
076        private int prefetchSize = 1000;
077        private boolean dispatchAsync;
078        private String destinationFilter = ">";
079        private NetworkBridgeListener bridgeFailedListener;
080    
081        public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
082            this.localBroker = localBroker;
083            this.remoteBroker = remoteBroker;
084        }
085    
086        public void start() throws Exception {
087            LOG.info("Starting a network connection between " + localBroker + " and " + remoteBroker
088                     + " has been established.");
089    
090            localBroker.setTransportListener(new DefaultTransportListener() {
091                public void onCommand(Object o) {
092                    Command command = (Command)o;
093                    serviceLocalCommand(command);
094                }
095    
096                public void onException(IOException error) {
097                    serviceLocalException(error);
098                }
099            });
100    
101            remoteBroker.setTransportListener(new DefaultTransportListener() {
102                public void onCommand(Object o) {
103                    Command command = (Command)o;
104                    serviceRemoteCommand(command);
105                }
106    
107                public void onException(IOException error) {
108                    serviceRemoteException(error);
109                }
110            });
111    
112            localBroker.start();
113            remoteBroker.start();
114        }
115    
116        protected void triggerStartBridge() throws IOException {
117            Thread thead = new Thread() {
118                public void run() {
119                    try {
120                        startBridge();
121                    } catch (IOException e) {
122                        LOG.error("Failed to start network bridge: " + e, e);
123                    }
124                }
125            };
126            thead.start();
127        }
128    
129        /**
130         * @throws IOException
131         */
132        final void startBridge() throws IOException {
133            connectionInfo = new ConnectionInfo();
134            connectionInfo.setConnectionId(new ConnectionId(ID_GENERATOR.generateId()));
135            connectionInfo.setClientId(clientId);
136            localBroker.oneway(connectionInfo);
137            remoteBroker.oneway(connectionInfo);
138    
139            sessionInfo = new SessionInfo(connectionInfo, 1);
140            localBroker.oneway(sessionInfo);
141            remoteBroker.oneway(sessionInfo);
142    
143            queueConsumerInfo = new ConsumerInfo(sessionInfo, 1);
144            queueConsumerInfo.setDispatchAsync(dispatchAsync);
145            queueConsumerInfo.setDestination(new ActiveMQQueue(destinationFilter));
146            queueConsumerInfo.setPrefetchSize(prefetchSize);
147            queueConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
148            localBroker.oneway(queueConsumerInfo);
149    
150            producerInfo = new ProducerInfo(sessionInfo, 1);
151            producerInfo.setResponseRequired(false);
152            remoteBroker.oneway(producerInfo);
153    
154            if (connectionInfo.getClientId() != null) {
155                topicConsumerInfo = new ConsumerInfo(sessionInfo, 2);
156                topicConsumerInfo.setDispatchAsync(dispatchAsync);
157                topicConsumerInfo.setSubscriptionName("topic-bridge");
158                topicConsumerInfo.setRetroactive(true);
159                topicConsumerInfo.setDestination(new ActiveMQTopic(destinationFilter));
160                topicConsumerInfo.setPrefetchSize(prefetchSize);
161                topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
162                localBroker.oneway(topicConsumerInfo);
163            }
164            LOG.info("Network connection between " + localBroker + " and " + remoteBroker
165                     + " has been established.");
166        }
167    
168        public void stop() throws Exception {
169            try {
170                if (connectionInfo != null) {
171                    localBroker.request(connectionInfo.createRemoveCommand());
172                    remoteBroker.request(connectionInfo.createRemoveCommand());
173                }
174                localBroker.setTransportListener(null);
175                remoteBroker.setTransportListener(null);
176                localBroker.oneway(new ShutdownInfo());
177                remoteBroker.oneway(new ShutdownInfo());
178            } finally {
179                ServiceStopper ss = new ServiceStopper();
180                ss.stop(localBroker);
181                ss.stop(remoteBroker);
182                ss.throwFirstException();
183            }
184        }
185    
186        public void serviceRemoteException(Throwable error) {
187            LOG.info("Unexpected remote exception: " + error);
188            LOG.debug("Exception trace: ", error);
189        }
190    
191        protected void serviceRemoteCommand(Command command) {
192            try {
193                if (command.isBrokerInfo()) {
194                    synchronized (this) {
195                        remoteBrokerInfo = (BrokerInfo)command;
196                        remoteBrokerId = remoteBrokerInfo.getBrokerId();
197                        if (localBrokerId != null) {
198                            if (localBrokerId.equals(remoteBrokerId)) {
199                                LOG.info("Disconnecting loop back connection.");
200                                ServiceSupport.dispose(this);
201                            } else {
202                                triggerStartBridge();
203                            }
204                        }
205                    }
206                } else {
207                    LOG.warn("Unexpected remote command: " + command);
208                }
209            } catch (IOException e) {
210                serviceLocalException(e);
211            }
212        }
213    
214        public void serviceLocalException(Throwable error) {
215            LOG.info("Unexpected local exception: " + error);
216            LOG.debug("Exception trace: ", error);
217            fireBridgeFailed();
218        }
219    
220        protected void serviceLocalCommand(Command command) {
221            try {
222                if (command.isMessageDispatch()) {
223    
224                    enqueueCounter.incrementAndGet();
225    
226                    final MessageDispatch md = (MessageDispatch)command;
227                    Message message = md.getMessage();
228                    message.setProducerId(producerInfo.getProducerId());
229    
230                    if (message.getOriginalTransactionId() == null) {
231                        message.setOriginalTransactionId(message.getTransactionId());
232                    }
233                    message.setTransactionId(null);
234    
235                    if (!message.isResponseRequired()) {
236                        // If the message was originally sent using async send, we
237                        // will preserve that QOS
238                        // by bridging it using an async send (small chance of
239                        // message loss).
240                        remoteBroker.oneway(message);
241                        dequeueCounter.incrementAndGet();
242                        localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
243    
244                    } else {
245    
246                        // The message was not sent using async send, so we should
247                        // only ack the local
248                        // broker when we get confirmation that the remote broker
249                        // has received the message.
250                        ResponseCallback callback = new ResponseCallback() {
251                            public void onCompletion(FutureResponse future) {
252                                try {
253                                    Response response = future.getResult();
254                                    if (response.isException()) {
255                                        ExceptionResponse er = (ExceptionResponse)response;
256                                        serviceLocalException(er.getException());
257                                    } else {
258                                        dequeueCounter.incrementAndGet();
259                                        localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
260                                    }
261                                } catch (IOException e) {
262                                    serviceLocalException(e);
263                                }
264                            }
265                        };
266    
267                        remoteBroker.asyncRequest(message, callback);
268                    }
269    
270                    // Ack on every message since we don't know if the broker is
271                    // blocked due to memory
272                    // usage and is waiting for an Ack to un-block him.
273    
274                    // Acking a range is more efficient, but also more prone to
275                    // locking up a server
276                    // Perhaps doing something like the following should be policy
277                    // based.
278                    // if(
279                    // md.getConsumerId().equals(queueConsumerInfo.getConsumerId())
280                    // ) {
281                    // queueDispatched++;
282                    // if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2)
283                    // ) {
284                    // localBroker.oneway(new MessageAck(md,
285                    // MessageAck.STANDARD_ACK_TYPE, queueDispatched));
286                    // queueDispatched=0;
287                    // }
288                    // } else {
289                    // topicDispatched++;
290                    // if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2)
291                    // ) {
292                    // localBroker.oneway(new MessageAck(md,
293                    // MessageAck.STANDARD_ACK_TYPE, topicDispatched));
294                    // topicDispatched=0;
295                    // }
296                    // }
297                } else if (command.isBrokerInfo()) {
298                    synchronized (this) {
299                        localBrokerInfo = (BrokerInfo)command;
300                        localBrokerId = localBrokerInfo.getBrokerId();
301                        if (remoteBrokerId != null) {
302                            if (remoteBrokerId.equals(localBrokerId)) {
303                                LOG.info("Disconnecting loop back connection.");
304                                ServiceSupport.dispose(this);
305                            } else {
306                                triggerStartBridge();
307                            }
308                        }
309                    }
310                } else {
311                    LOG.debug("Unexpected local command: " + command);
312                }
313            } catch (IOException e) {
314                serviceLocalException(e);
315            }
316        }
317    
318        public String getClientId() {
319            return clientId;
320        }
321    
322        public void setClientId(String clientId) {
323            this.clientId = clientId;
324        }
325    
326        public int getPrefetchSize() {
327            return prefetchSize;
328        }
329    
330        public void setPrefetchSize(int prefetchSize) {
331            this.prefetchSize = prefetchSize;
332        }
333    
334        public boolean isDispatchAsync() {
335            return dispatchAsync;
336        }
337    
338        public void setDispatchAsync(boolean dispatchAsync) {
339            this.dispatchAsync = dispatchAsync;
340        }
341    
342        public String getDestinationFilter() {
343            return destinationFilter;
344        }
345    
346        public void setDestinationFilter(String destinationFilter) {
347            this.destinationFilter = destinationFilter;
348        }
349    
350        public void setNetworkBridgeFailedListener(NetworkBridgeListener listener) {
351            this.bridgeFailedListener = listener;
352        }
353    
354        private void fireBridgeFailed() {
355            NetworkBridgeListener l = this.bridgeFailedListener;
356            if (l != null) {
357                l.bridgeFailed();
358            }
359        }
360    
361        public String getRemoteAddress() {
362            return remoteBroker.getRemoteAddress();
363        }
364    
365        public String getLocalAddress() {
366            return localBroker.getRemoteAddress();
367        }
368    
369        public String getLocalBrokerName() {
370            return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
371        }
372    
373        public String getRemoteBrokerName() {
374            return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
375        }
376    
377        public long getDequeueCounter() {
378            return dequeueCounter.get();
379        }
380    
381        public long getEnqueueCounter() {
382            return enqueueCounter.get();
383        }
384    
385    }