001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network.jms;
018    
019    import java.util.concurrent.atomic.AtomicBoolean;
020    import javax.jms.Connection;
021    import javax.jms.Destination;
022    import javax.jms.JMSException;
023    import javax.jms.Message;
024    import javax.jms.MessageConsumer;
025    import javax.jms.MessageListener;
026    import javax.jms.MessageProducer;
027    import org.apache.activemq.Service;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    /**
032     * A Destination bridge is used to bridge between to different JMS systems
033     */
034    public abstract class DestinationBridge implements Service, MessageListener {
035    
036        private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class);
037    
038        protected MessageConsumer consumer;
039        protected AtomicBoolean started = new AtomicBoolean(false);
040        protected JmsMesageConvertor jmsMessageConvertor;
041        protected boolean doHandleReplyTo = true;
042        protected JmsConnector jmsConnector;
043    
044        /**
045         * @return Returns the consumer.
046         */
047        public MessageConsumer getConsumer() {
048            return consumer;
049        }
050    
051        /**
052         * @param consumer The consumer to set.
053         */
054        public void setConsumer(MessageConsumer consumer) {
055            this.consumer = consumer;
056        }
057    
058        /**
059         * @param connector
060         */
061        public void setJmsConnector(JmsConnector connector) {
062            this.jmsConnector = connector;
063        }
064    
065        /**
066         * @return Returns the inboundMessageConvertor.
067         */
068        public JmsMesageConvertor getJmsMessageConvertor() {
069            return jmsMessageConvertor;
070        }
071    
072        /**
073         * @param jmsMessageConvertor
074         */
075        public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
076            this.jmsMessageConvertor = jmsMessageConvertor;
077        }
078    
079        protected Destination processReplyToDestination(Destination destination) {
080            return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer());
081        }
082    
083        public void start() throws Exception {
084            if (started.compareAndSet(false, true)) {
085                createConsumer();
086                createProducer();
087            }
088        }
089    
090        public void stop() throws Exception {
091            started.set(false);
092        }
093    
094        public void onMessage(Message message) {
095    
096            int attempt = 0;
097            final int maxRetries = jmsConnector.getReconnectionPolicy().getMaxSendRetries();
098    
099            while (started.get() && message != null && ++attempt <= maxRetries) {
100    
101                try {
102    
103                    if (attempt > 0) {
104                        try {
105                            Thread.sleep(jmsConnector.getReconnectionPolicy().getNextDelay(attempt));
106                        } catch(InterruptedException e) {
107                            break;
108                        }
109                    }
110    
111                    Message converted;
112                    if (jmsMessageConvertor != null) {
113                        if (doHandleReplyTo) {
114                            Destination replyTo = message.getJMSReplyTo();
115                            if (replyTo != null) {
116                                converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
117                            } else {
118                                converted = jmsMessageConvertor.convert(message);
119                            }
120                        } else {
121                            message.setJMSReplyTo(null);
122                            converted = jmsMessageConvertor.convert(message);
123                        }
124                    } else {
125                        // The Producer side is not up or not yet configured, retry.
126                        continue;
127                    }
128    
129                    try {
130                        sendMessage(converted);
131                    } catch(Exception e) {
132                        jmsConnector.handleConnectionFailure(getConnectionForProducer());
133                        continue;
134                    }
135    
136                    try {
137                        message.acknowledge();
138                    } catch(Exception e) {
139                        jmsConnector.handleConnectionFailure(getConnnectionForConsumer());
140                        continue;
141                    }
142    
143                    // if we got here then it made it out and was ack'd
144                    return;
145    
146                } catch (Exception e) {
147                    LOG.info("failed to forward message on attempt: " + attempt +
148                             " reason: " + e + " message: " + message, e);
149                }
150            }
151        }
152    
153        /**
154         * @return Returns the doHandleReplyTo.
155         */
156        protected boolean isDoHandleReplyTo() {
157            return doHandleReplyTo;
158        }
159    
160        /**
161         * @param doHandleReplyTo The doHandleReplyTo to set.
162         */
163        protected void setDoHandleReplyTo(boolean doHandleReplyTo) {
164            this.doHandleReplyTo = doHandleReplyTo;
165        }
166    
167        protected abstract MessageConsumer createConsumer() throws JMSException;
168    
169        protected abstract MessageProducer createProducer() throws JMSException;
170    
171        protected abstract void sendMessage(Message message) throws JMSException;
172    
173        protected abstract Connection getConnnectionForConsumer();
174    
175        protected abstract Connection getConnectionForProducer();
176    
177    }