001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.ra;
018    
019    import java.lang.reflect.Method;
020    
021    import java.util.concurrent.atomic.AtomicBoolean;
022    import javax.jms.Connection;
023    import javax.jms.ConnectionConsumer;
024    import javax.jms.ExceptionListener;
025    import javax.jms.JMSException;
026    import javax.jms.Message;
027    import javax.jms.MessageListener;
028    import javax.jms.Session;
029    import javax.jms.Topic;
030    import javax.resource.ResourceException;
031    import javax.resource.spi.endpoint.MessageEndpointFactory;
032    import javax.resource.spi.work.Work;
033    import javax.resource.spi.work.WorkException;
034    import javax.resource.spi.work.WorkManager;
035    
036    import org.apache.activemq.ActiveMQConnection;
037    import org.apache.activemq.command.ActiveMQDestination;
038    import org.apache.activemq.command.ActiveMQQueue;
039    import org.apache.activemq.command.ActiveMQTopic;
040    import org.slf4j.Logger;
041    import org.slf4j.LoggerFactory;
042    
043    /**
044     *  $Date$
045     */
046    public class ActiveMQEndpointWorker {
047    
048        public static final Method ON_MESSAGE_METHOD;
049        private static final Logger LOG = LoggerFactory.getLogger(ActiveMQEndpointWorker.class);
050    
051        private static final long INITIAL_RECONNECT_DELAY = 1000; // 1 second.
052        private static final long MAX_RECONNECT_DELAY = 1000 * 30; // 30 seconds.
053        private static final ThreadLocal<Session> THREAD_LOCAL = new ThreadLocal<Session>();
054    
055        static {
056            try {
057                ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", new Class[] {
058                    Message.class
059                });
060            } catch (Exception e) {
061                throw new ExceptionInInitializerError(e);
062            }
063        }
064    
065        protected final ActiveMQEndpointActivationKey endpointActivationKey;
066        protected final MessageEndpointFactory endpointFactory;
067        protected final WorkManager workManager;
068        protected final boolean transacted;
069    
070        private final ActiveMQDestination dest;
071        private final Work connectWork;
072        private final AtomicBoolean connecting = new AtomicBoolean(false);    
073        private final Object shutdownMutex = new String("shutdownMutex");
074        
075        private ActiveMQConnection connection;
076        private ConnectionConsumer consumer;
077        private ServerSessionPoolImpl serverSessionPool;
078        private boolean running;
079    
080        protected ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
081            this.endpointActivationKey = key;
082            this.endpointFactory = endpointActivationKey.getMessageEndpointFactory();
083            this.workManager = adapter.getBootstrapContext().getWorkManager();
084            try {
085                this.transacted = endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD);
086            } catch (NoSuchMethodException e) {
087                throw new ResourceException("Endpoint does not implement the onMessage method.");
088            }
089    
090            connectWork = new Work() {
091                long currentReconnectDelay = INITIAL_RECONNECT_DELAY;
092    
093                public void release() {
094                    //
095                }
096    
097                public void run() {
098                    currentReconnectDelay = INITIAL_RECONNECT_DELAY;
099                    MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
100                    if ( LOG.isInfoEnabled() ) {
101                        LOG.info("Establishing connection to broker [" + adapter.getInfo().getServerUrl() + "]");
102                    }
103    
104                    while ( connecting.get() && running ) {
105                    try {
106                        connection = adapter.makeConnection(activationSpec);
107                        connection.setExceptionListener(new ExceptionListener() {
108                            public void onException(JMSException error) {
109                                if (!serverSessionPool.isClosing()) {
110                                        // initiate reconnection only once, i.e. on initial exception
111                                        // and only if not already trying to connect
112                                        LOG.error("Connection to broker failed: " + error.getMessage(), error);
113                                        if ( connecting.compareAndSet(false, true) ) {
114                                            synchronized ( connectWork ) {
115                                                disconnect();
116                                                serverSessionPool.closeIdleSessions();
117                                                connect();
118                                }
119                                        } else {
120                                            // connection attempt has already been initiated
121                                            LOG.info("Connection attempt already in progress, ignoring connection exception");
122                            }
123                                    }
124                                }
125                        });
126                            connection.start();
127    
128                            int prefetchSize = activationSpec.getMaxMessagesPerSessionsIntValue() * activationSpec.getMaxSessionsIntValue();
129                        if (activationSpec.isDurableSubscription()) {
130                                consumer = connection.createDurableConnectionConsumer(
131                                        (Topic) dest,
132                                        activationSpec.getSubscriptionName(), 
133                                        emptyToNull(activationSpec.getMessageSelector()),
134                                        serverSessionPool, 
135                                        prefetchSize,
136                                        activationSpec.getNoLocalBooleanValue());
137                        } else {
138                                consumer = connection.createConnectionConsumer(
139                                        dest, 
140                                        emptyToNull(activationSpec.getMessageSelector()), 
141                                        serverSessionPool, 
142                                        prefetchSize,
143                                                                           activationSpec.getNoLocalBooleanValue());
144                        }
145    
146    
147                            if ( connecting.compareAndSet(true, false) ) {
148                                if ( LOG.isInfoEnabled() ) {
149                                    LOG.info("Successfully established connection to broker [" + adapter.getInfo().getServerUrl() + "]");
150                                }
151                            } else {
152                                LOG.error("Could not release connection lock");
153                            }
154                    } catch (JMSException error) {
155                            if ( LOG.isDebugEnabled() ) {
156                                LOG.debug("Failed to connect: " + error.getMessage(), error);
157                    }
158                            disconnect();
159                            pause(error);
160                }
161                    }
162                }
163                
164                private void pause(JMSException error) {
165                    if (currentReconnectDelay == MAX_RECONNECT_DELAY) {
166                        LOG.error("Failed to connect to broker [" + adapter.getInfo().getServerUrl() + "]: " 
167                                + error.getMessage(), error);
168                        LOG.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds");
169                    }
170                    try {
171                        synchronized ( shutdownMutex ) {
172                            // shutdownMutex will be notified by stop() method in
173                            // order to accelerate shutdown of endpoint
174                            shutdownMutex.wait(currentReconnectDelay);
175                        }
176                    } catch ( InterruptedException e ) {
177                        Thread.interrupted();
178                    }
179                    currentReconnectDelay *= 2;
180                    if (currentReconnectDelay > MAX_RECONNECT_DELAY) {
181                        currentReconnectDelay = MAX_RECONNECT_DELAY;
182                    }                
183                }
184            };
185    
186            MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
187            if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
188                dest = new ActiveMQQueue(activationSpec.getDestination());
189            } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
190                dest = new ActiveMQTopic(activationSpec.getDestination());
191            } else {
192                throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
193            }
194    
195        }
196    
197        /**
198         * @param c
199         */
200        public static void safeClose(Connection c) {
201            try {
202                if (c != null) {
203                    LOG.debug("Closing connection to broker");
204                    c.close();
205                }
206            } catch (JMSException e) {
207                //
208            }
209        }
210    
211        /**
212         * @param cc
213         */
214        public static void safeClose(ConnectionConsumer cc) {
215            try {
216                if (cc != null) {
217                    LOG.debug("Closing ConnectionConsumer");
218                    cc.close();
219                }
220            } catch (JMSException e) {
221                //
222            }
223        }
224    
225        /**
226         * 
227         */
228        public void start() throws ResourceException {
229            synchronized (connectWork) {
230                if (running)
231                return;
232            running = true;
233    
234                if ( connecting.compareAndSet(false, true) ) {
235                    LOG.info("Starting");
236            serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
237            connect();
238                } else {
239                    LOG.warn("Ignoring start command, EndpointWorker is already trying to connect");
240        }
241            }
242        }
243    
244        /**
245         * 
246         */
247        public void stop() throws InterruptedException {
248            synchronized (shutdownMutex) {
249                if (!running)
250                    return;
251                running = false;
252                LOG.info("Stopping");
253                // wake up pausing reconnect attempt
254                shutdownMutex.notifyAll();
255                serverSessionPool.close();
256            }
257            disconnect();
258        }
259    
260        private boolean isRunning() {
261            return running;
262        }
263    
264        private void connect() {
265            synchronized ( connectWork ) {
266            if (!running) {
267                return;
268            }
269    
270            try {
271                workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null);
272            } catch (WorkException e) {
273                running = false;
274                LOG.error("Work Manager did not accept work: ", e);
275            }
276        }
277        }
278    
279        /**
280         * 
281         */
282        private void disconnect() {
283            synchronized ( connectWork ) {
284            safeClose(consumer);
285            consumer = null;
286            safeClose(connection);
287            connection = null;
288        }
289                }
290    
291        protected void registerThreadSession(Session session) {
292            THREAD_LOCAL.set(session);
293        }
294    
295        protected void unregisterThreadSession(Session session) {
296            THREAD_LOCAL.set(null);
297        }
298    
299        protected ActiveMQConnection getConnection() {
300            // make sure we only return a working connection
301            // in particular make sure that we do not return null
302            // after the resource adapter got disconnected from
303            // the broker via the disconnect() method
304            synchronized ( connectWork ) {
305                return connection;
306            }
307        }
308    
309        private String emptyToNull(String value) {
310            if (value == null || value.length() == 0) {
311                return null;
312            }
313            return value;
314        }
315    
316    }