001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network.jms;
018    
019    import java.util.Iterator;
020    import java.util.List;
021    import java.util.Map;
022    import java.util.concurrent.CopyOnWriteArrayList;
023    import java.util.concurrent.LinkedBlockingQueue;
024    import java.util.concurrent.ThreadFactory;
025    import java.util.concurrent.ThreadPoolExecutor;
026    import java.util.concurrent.TimeUnit;
027    import java.util.concurrent.atomic.AtomicBoolean;
028    import java.util.concurrent.atomic.AtomicReference;
029    
030    import javax.jms.Connection;
031    import javax.jms.Destination;
032    import javax.jms.QueueConnection;
033    
034    import org.apache.activemq.ActiveMQConnectionFactory;
035    import org.apache.activemq.Service;
036    import org.apache.activemq.broker.BrokerService;
037    import org.apache.activemq.util.LRUCache;
038    import org.slf4j.Logger;
039    import org.slf4j.LoggerFactory;
040    import org.springframework.jndi.JndiTemplate;
041    
042    /**
043     * This bridge joins the gap between foreign JMS providers and ActiveMQ As some
044     * JMS providers are still only in compliance with JMS v1.0.1 , this bridge itself
045     * aimed to be in compliance with the JMS 1.0.2 specification.
046     */
047    public abstract class JmsConnector implements Service {
048    
049        private static int nextId;
050        private static final Logger LOG = LoggerFactory.getLogger(JmsConnector.class);
051    
052        protected JndiTemplate jndiLocalTemplate;
053        protected JndiTemplate jndiOutboundTemplate;
054        protected JmsMesageConvertor inboundMessageConvertor;
055        protected JmsMesageConvertor outboundMessageConvertor;
056        protected AtomicBoolean initialized = new AtomicBoolean(false);
057        protected AtomicBoolean localSideInitialized = new AtomicBoolean(false);
058        protected AtomicBoolean foreignSideInitialized = new AtomicBoolean(false);
059        protected AtomicBoolean started = new AtomicBoolean(false);
060        protected AtomicBoolean failed = new AtomicBoolean();
061        protected AtomicReference<Connection> foreignConnection = new AtomicReference<Connection>();
062        protected AtomicReference<Connection> localConnection = new AtomicReference<Connection>();
063        protected ActiveMQConnectionFactory embeddedConnectionFactory;
064        protected int replyToDestinationCacheSize = 10000;
065        protected String outboundUsername;
066        protected String outboundPassword;
067        protected String localUsername;
068        protected String localPassword;
069        protected String outboundClientId;
070        protected String localClientId;
071        protected LRUCache<Destination, DestinationBridge> replyToBridges = createLRUCache();
072    
073        private ReconnectionPolicy policy = new ReconnectionPolicy();
074        protected ThreadPoolExecutor connectionSerivce;
075        private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
076        private List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
077        private String name;
078    
079        private static LRUCache<Destination, DestinationBridge> createLRUCache() {
080            return new LRUCache<Destination, DestinationBridge>() {
081                private static final long serialVersionUID = -7446792754185879286L;
082    
083                protected boolean removeEldestEntry(Map.Entry<Destination, DestinationBridge> enty) {
084                    if (size() > maxCacheSize) {
085                        Iterator<Map.Entry<Destination, DestinationBridge>> iter = entrySet().iterator();
086                        Map.Entry<Destination, DestinationBridge> lru = iter.next();
087                        remove(lru.getKey());
088                        DestinationBridge bridge = (DestinationBridge)lru.getValue();
089                        try {
090                            bridge.stop();
091                            LOG.info("Expired bridge: " + bridge);
092                        } catch (Exception e) {
093                            LOG.warn("stopping expired bridge" + bridge + " caused an exception", e);
094                        }
095                    }
096                    return false;
097                }
098            };
099        }
100    
101        public boolean init() {
102            boolean result = initialized.compareAndSet(false, true);
103            if (result) {
104                if (jndiLocalTemplate == null) {
105                    jndiLocalTemplate = new JndiTemplate();
106                }
107                if (jndiOutboundTemplate == null) {
108                    jndiOutboundTemplate = new JndiTemplate();
109                }
110                if (inboundMessageConvertor == null) {
111                    inboundMessageConvertor = new SimpleJmsMessageConvertor();
112                }
113                if (outboundMessageConvertor == null) {
114                    outboundMessageConvertor = new SimpleJmsMessageConvertor();
115                }
116                replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
117    
118                connectionSerivce = createExecutor();
119    
120                // Subclasses can override this to customize their own it.
121                result = doConnectorInit();
122            }
123            return result;
124        }
125    
126        protected boolean doConnectorInit() {
127    
128            // We try to make a connection via a sync call first so that the
129            // JmsConnector is fully initialized before the start call returns
130            // in order to avoid missing any messages that are dispatched
131            // immediately after startup.  If either side fails we queue an
132            // asynchronous task to manage the reconnect attempts.
133    
134            try {
135                initializeLocalConnection();
136                localSideInitialized.set(true);
137            } catch(Exception e) {
138                // Queue up the task to attempt the local connection.
139                scheduleAsyncLocalConnectionReconnect();
140            }
141    
142            try {
143                initializeForeignConnection();
144                foreignSideInitialized.set(true);
145            } catch(Exception e) {
146                // Queue up the task for the foreign connection now.
147                scheduleAsyncForeignConnectionReconnect();
148            }
149    
150            return true;
151        }
152    
153        public void start() throws Exception {
154            if (started.compareAndSet(false, true)) {
155                init();
156                for (DestinationBridge bridge : inboundBridges) {
157                    bridge.start();
158                }
159                for (DestinationBridge bridge : outboundBridges) {
160                    bridge.start();
161                }
162                LOG.info("JMS Connector " + getName() + " Started");
163            }
164        }
165    
166        public void stop() throws Exception {
167            if (started.compareAndSet(true, false)) {
168    
169                this.connectionSerivce.shutdown();
170    
171                for (DestinationBridge bridge : inboundBridges) {
172                    bridge.stop();
173                }
174                for (DestinationBridge bridge : outboundBridges) {
175                    bridge.stop();
176                }
177                LOG.info("JMS Connector " + getName() + " Stopped");
178            }
179        }
180    
181        public void clearBridges() {
182            inboundBridges.clear();
183            outboundBridges.clear();
184            replyToBridges.clear();
185        }
186    
187        protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);
188    
189        /**
190         * One way to configure the local connection - this is called by The
191         * BrokerService when the Connector is embedded
192         *
193         * @param service
194         */
195        public void setBrokerService(BrokerService service) {
196            embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
197        }
198    
199        public Connection getLocalConnection() {
200            return this.localConnection.get();
201        }
202    
203        public Connection getForeignConnection() {
204            return this.foreignConnection.get();
205        }
206    
207        /**
208         * @return Returns the jndiTemplate.
209         */
210        public JndiTemplate getJndiLocalTemplate() {
211            return jndiLocalTemplate;
212        }
213    
214        /**
215         * @param jndiTemplate The jndiTemplate to set.
216         */
217        public void setJndiLocalTemplate(JndiTemplate jndiTemplate) {
218            this.jndiLocalTemplate = jndiTemplate;
219        }
220    
221        /**
222         * @return Returns the jndiOutboundTemplate.
223         */
224        public JndiTemplate getJndiOutboundTemplate() {
225            return jndiOutboundTemplate;
226        }
227    
228        /**
229         * @param jndiOutboundTemplate The jndiOutboundTemplate to set.
230         */
231        public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate) {
232            this.jndiOutboundTemplate = jndiOutboundTemplate;
233        }
234    
235        /**
236         * @return Returns the inboundMessageConvertor.
237         */
238        public JmsMesageConvertor getInboundMessageConvertor() {
239            return inboundMessageConvertor;
240        }
241    
242        /**
243         * @param inboundMessageConvertor The inboundMessageConvertor to set.
244         */
245        public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
246            this.inboundMessageConvertor = jmsMessageConvertor;
247        }
248    
249        /**
250         * @return Returns the outboundMessageConvertor.
251         */
252        public JmsMesageConvertor getOutboundMessageConvertor() {
253            return outboundMessageConvertor;
254        }
255    
256        /**
257         * @param outboundMessageConvertor The outboundMessageConvertor to set.
258         */
259        public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) {
260            this.outboundMessageConvertor = outboundMessageConvertor;
261        }
262    
263        /**
264         * @return Returns the replyToDestinationCacheSize.
265         */
266        public int getReplyToDestinationCacheSize() {
267            return replyToDestinationCacheSize;
268        }
269    
270        /**
271         * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set.
272         */
273        public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
274            this.replyToDestinationCacheSize = replyToDestinationCacheSize;
275        }
276    
277        /**
278         * @return Returns the localPassword.
279         */
280        public String getLocalPassword() {
281            return localPassword;
282        }
283    
284        /**
285         * @param localPassword The localPassword to set.
286         */
287        public void setLocalPassword(String localPassword) {
288            this.localPassword = localPassword;
289        }
290    
291        /**
292         * @return Returns the localUsername.
293         */
294        public String getLocalUsername() {
295            return localUsername;
296        }
297    
298        /**
299         * @param localUsername The localUsername to set.
300         */
301        public void setLocalUsername(String localUsername) {
302            this.localUsername = localUsername;
303        }
304    
305        /**
306         * @return Returns the outboundPassword.
307         */
308        public String getOutboundPassword() {
309            return outboundPassword;
310        }
311    
312        /**
313         * @param outboundPassword The outboundPassword to set.
314         */
315        public void setOutboundPassword(String outboundPassword) {
316            this.outboundPassword = outboundPassword;
317        }
318    
319        /**
320         * @return Returns the outboundUsername.
321         */
322        public String getOutboundUsername() {
323            return outboundUsername;
324        }
325    
326        /**
327         * @param outboundUsername The outboundUsername to set.
328         */
329        public void setOutboundUsername(String outboundUsername) {
330            this.outboundUsername = outboundUsername;
331        }
332    
333        /**
334         * @return the outboundClientId
335         */
336        public String getOutboundClientId() {
337            return outboundClientId;
338        }
339    
340        /**
341         * @param outboundClientId the outboundClientId to set
342         */
343        public void setOutboundClientId(String outboundClientId) {
344            this.outboundClientId = outboundClientId;
345        }
346    
347        /**
348         * @return the localClientId
349         */
350        public String getLocalClientId() {
351            return localClientId;
352        }
353    
354        /**
355         * @param localClientId the localClientId to set
356         */
357        public void setLocalClientId(String localClientId) {
358            this.localClientId = localClientId;
359        }
360    
361        /**
362         * @return the currently configured reconnection policy.
363         */
364        public ReconnectionPolicy getReconnectionPolicy() {
365            return this.policy;
366        }
367    
368        /**
369         * @param policy The new reconnection policy this {@link JmsConnector} should use.
370         */
371        public void setReconnectionPolicy(ReconnectionPolicy policy) {
372            this.policy = policy;
373        }
374    
375        /**
376         * @return returns true if the {@link JmsConnector} is connected to both brokers.
377         */
378        public boolean isConnected() {
379            return localConnection.get() != null && foreignConnection.get() != null;
380        }
381    
382        protected void addInboundBridge(DestinationBridge bridge) {
383            if (!inboundBridges.contains(bridge)) {
384                inboundBridges.add(bridge);
385            }
386        }
387    
388        protected void addOutboundBridge(DestinationBridge bridge) {
389            if (!outboundBridges.contains(bridge)) {
390                outboundBridges.add(bridge);
391            }
392        }
393    
394        protected void removeInboundBridge(DestinationBridge bridge) {
395            inboundBridges.remove(bridge);
396        }
397    
398        protected void removeOutboundBridge(DestinationBridge bridge) {
399            outboundBridges.remove(bridge);
400        }
401    
402        public String getName() {
403            if (name == null) {
404                name = "Connector:" + getNextId();
405            }
406            return name;
407        }
408    
409        public void setName(String name) {
410            this.name = name;
411        }
412    
413        private static synchronized int getNextId() {
414            return nextId++;
415        }
416    
417        public boolean isFailed() {
418            return this.failed.get();
419        }
420    
421        /**
422         * Performs the work of connection to the local side of the Connection.
423         * <p>
424         * This creates the initial connection to the local end of the {@link JmsConnector}
425         * and then sets up all the destination bridges with the information needed to bridge
426         * on the local side of the connection.
427         *
428         * @throws Exception if the connection cannot be established for any reason.
429         */
430        protected abstract void initializeLocalConnection() throws Exception;
431    
432        /**
433         * Performs the work of connection to the foreign side of the Connection.
434         * <p>
435         * This creates the initial connection to the foreign end of the {@link JmsConnector}
436         * and then sets up all the destination bridges with the information needed to bridge
437         * on the foreign side of the connection.
438         *
439         * @throws Exception if the connection cannot be established for any reason.
440         */
441        protected abstract void initializeForeignConnection() throws Exception;
442    
443        /**
444         * Callback method that the Destination bridges can use to report an exception to occurs
445         * during normal bridging operations.
446         *
447         * @param connection
448         *          The connection that was in use when the failure occured.
449         */
450        void handleConnectionFailure(Connection connection) {
451    
452            // Can happen if async exception listener kicks in at the same time.
453            if (connection == null || !this.started.get()) {
454                return;
455            }
456    
457            LOG.info("JmsConnector handling loss of connection [" + connection.toString() + "]");
458    
459            // TODO - How do we handle the re-wiring of replyToBridges in this case.
460            replyToBridges.clear();
461    
462            if (this.foreignConnection.compareAndSet((QueueConnection)connection, null)) {
463    
464                // Stop the inbound bridges when the foreign connection is dropped since
465                // the bridge has no consumer and needs to be restarted once a new connection
466                // to the foreign side is made.
467                for (DestinationBridge bridge : inboundBridges) {
468                    try {
469                        bridge.stop();
470                    } catch(Exception e) {
471                    }
472                }
473    
474                // We got here first and cleared the connection, now we queue a reconnect.
475                this.connectionSerivce.execute(new Runnable() {
476    
477                    @Override
478                    public void run() {
479                        try {
480                            doInitializeConnection(false);
481                        } catch (Exception e) {
482                            LOG.error("Failed to initialize forgein connection for the JMSConnector", e);
483                        }
484                    }
485                });
486    
487            } else if (this.localConnection.compareAndSet((QueueConnection)connection, null)) {
488    
489                // Stop the outbound bridges when the local connection is dropped since
490                // the bridge has no consumer and needs to be restarted once a new connection
491                // to the local side is made.
492                for (DestinationBridge bridge : outboundBridges) {
493                    try {
494                        bridge.stop();
495                    } catch(Exception e) {
496                    }
497                }
498    
499                // We got here first and cleared the connection, now we queue a reconnect.
500                this.connectionSerivce.execute(new Runnable() {
501    
502                    @Override
503                    public void run() {
504                        try {
505                            doInitializeConnection(true);
506                        } catch (Exception e) {
507                            LOG.error("Failed to initialize local connection for the JMSConnector", e);
508                        }
509                    }
510                });
511            }
512        }
513    
514        private void scheduleAsyncLocalConnectionReconnect() {
515            this.connectionSerivce.execute(new Runnable() {
516                @Override
517                public void run() {
518                    try {
519                        doInitializeConnection(true);
520                    } catch (Exception e) {
521                        LOG.error("Failed to initialize local connection for the JMSConnector", e);
522                    }
523                }
524            });
525        }
526    
527        private void scheduleAsyncForeignConnectionReconnect() {
528            this.connectionSerivce.execute(new Runnable() {
529                @Override
530                public void run() {
531                    try {
532                        doInitializeConnection(false);
533                    } catch (Exception e) {
534                        LOG.error("Failed to initialize forgein connection for the JMSConnector", e);
535                    }
536                }
537            });
538        }
539    
540        private void doInitializeConnection(boolean local) throws Exception {
541    
542            int attempt = 0;
543    
544            final int maxRetries;
545            if (local) {
546                maxRetries = !localSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
547                                                           policy.getMaxReconnectAttempts();
548            } else {
549                maxRetries = !foreignSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
550                                                             policy.getMaxReconnectAttempts();
551            }
552    
553            do
554            {
555                if (attempt > 0) {
556                    try {
557                        Thread.sleep(policy.getNextDelay(attempt));
558                    } catch(InterruptedException e) {
559                    }
560                }
561    
562                if (connectionSerivce.isTerminating()) {
563                    return;
564                }
565    
566                try {
567    
568                    if (local) {
569                        initializeLocalConnection();
570                        localSideInitialized.set(true);
571                    } else {
572                        initializeForeignConnection();
573                        foreignSideInitialized.set(true);
574                    }
575    
576                    // Once we are connected we ensure all the bridges are started.
577                    if (localConnection.get() != null && foreignConnection.get() != null) {
578                        for (DestinationBridge bridge : inboundBridges) {
579                            bridge.start();
580                        }
581                        for (DestinationBridge bridge : outboundBridges) {
582                            bridge.start();
583                        }
584                    }
585    
586                    return;
587                } catch(Exception e) {
588                    LOG.debug("Failed to establish initial " + (local ? "local" : "foriegn") +
589                              " connection for JmsConnector [" + attempt + "]: " + e.getMessage());
590                }
591            }
592            while (maxRetries < ++attempt && !connectionSerivce.isTerminating());
593    
594            this.failed.set(true);
595        }
596    
597        private ThreadFactory factory = new ThreadFactory() {
598            public Thread newThread(Runnable runnable) {
599                Thread thread = new Thread(runnable, "JmsConnector Async Connection Task: ");
600                thread.setDaemon(true);
601                return thread;
602            }
603        };
604    
605        private ThreadPoolExecutor createExecutor() {
606            ThreadPoolExecutor exec = new ThreadPoolExecutor(0, 2, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
607            exec.allowCoreThreadTimeOut(true);
608            return exec;
609        }
610    }