001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.pool;
018    
019    import java.util.concurrent.CopyOnWriteArrayList;
020    
021    import javax.jms.Connection;
022    import javax.jms.ConnectionConsumer;
023    import javax.jms.ConnectionMetaData;
024    import javax.jms.Destination;
025    import javax.jms.ExceptionListener;
026    import javax.jms.JMSException;
027    import javax.jms.Queue;
028    import javax.jms.QueueConnection;
029    import javax.jms.QueueSession;
030    import javax.jms.ServerSessionPool;
031    import javax.jms.Session;
032    import javax.jms.TemporaryQueue;
033    import javax.jms.TemporaryTopic;
034    import javax.jms.Topic;
035    import javax.jms.TopicConnection;
036    import javax.jms.TopicSession;
037    
038    import org.apache.activemq.ActiveMQConnection;
039    import org.apache.activemq.ActiveMQSession;
040    import org.apache.activemq.AlreadyClosedException;
041    import org.apache.activemq.EnhancedConnection;
042    import org.apache.activemq.advisory.DestinationSource;
043    import org.slf4j.Logger;
044    import org.slf4j.LoggerFactory;
045    
046    /**
047     * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
048     * {@link QueueConnection} which is pooled and on {@link #close()} will return
049     * itself to the sessionPool.
050     *
051     * <b>NOTE</b> this implementation is only intended for use when sending
052     * messages. It does not deal with pooling of consumers; for that look at a
053     * library like <a href="http://jencks.org/">Jencks</a> such as in <a
054     * href="http://jencks.org/Message+Driven+POJOs">this example</a>
055     *
056     */
057    public class PooledConnection implements TopicConnection, QueueConnection, EnhancedConnection {
058        private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnection.class);
059    
060        private ConnectionPool pool;
061        private boolean stopped;
062        private final CopyOnWriteArrayList<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
063        private final CopyOnWriteArrayList<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
064    
065        public PooledConnection(ConnectionPool pool) {
066            this.pool = pool;
067            this.pool.incrementReferenceCount();
068        }
069    
070        /**
071         * Factory method to create a new instance.
072         */
073        public PooledConnection newInstance() {
074            return new PooledConnection(pool);
075        }
076    
077        public void close() throws JMSException {
078            this.cleanupConnectionTemporaryDestinations();
079            if (this.pool != null) {
080                this.pool.decrementReferenceCount();
081                this.pool = null;
082            }
083        }
084    
085        public void start() throws JMSException {
086            assertNotClosed();
087            pool.start();
088        }
089    
090        public void stop() throws JMSException {
091            stopped = true;
092        }
093    
094        public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages)
095                throws JMSException {
096            return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
097        }
098    
099        public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
100            return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
101        }
102    
103        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i)
104                throws JMSException {
105            return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
106        }
107    
108        public String getClientID() throws JMSException {
109            return getConnection().getClientID();
110        }
111    
112        public ExceptionListener getExceptionListener() throws JMSException {
113            return getConnection().getExceptionListener();
114        }
115    
116        public ConnectionMetaData getMetaData() throws JMSException {
117            return getConnection().getMetaData();
118        }
119    
120        public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
121            getConnection().setExceptionListener(exceptionListener);
122        }
123    
124        public void setClientID(String clientID) throws JMSException {
125    
126            // ignore repeated calls to setClientID() with the same client id
127            // this could happen when a JMS component such as Spring that uses a
128            // PooledConnectionFactory shuts down and reinitializes.
129            if (this.getConnection().getClientID() == null || !this.getClientID().equals(clientID)) {
130                getConnection().setClientID(clientID);
131            }
132        }
133    
134        public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
135            return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
136        }
137    
138        // Session factory methods
139        // -------------------------------------------------------------------------
140        public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
141            return (QueueSession) createSession(transacted, ackMode);
142        }
143    
144        public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
145            return (TopicSession) createSession(transacted, ackMode);
146        }
147    
148        public Session createSession(boolean transacted, int ackMode) throws JMSException {
149            PooledSession result;
150            result = (PooledSession) pool.createSession(transacted, ackMode);
151    
152            // Add a temporary destination event listener to the session that notifies us when
153            // the session creates temporary destinations.
154            result.addTempDestEventListener(new PooledSessionEventListener() {
155    
156                @Override
157                public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
158                    connTempQueues.add(tempQueue);
159                }
160    
161                @Override
162                public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
163                    connTempTopics.add(tempTopic);
164                }
165            });
166    
167            return (Session) result;
168        }
169    
170        // EnhancedCollection API
171        // -------------------------------------------------------------------------
172    
173        public DestinationSource getDestinationSource() throws JMSException {
174            return getConnection().getDestinationSource();
175        }
176    
177        // Implementation methods
178        // -------------------------------------------------------------------------
179    
180        public ActiveMQConnection getConnection() throws JMSException {
181            assertNotClosed();
182            return pool.getConnection();
183        }
184    
185        protected void assertNotClosed() throws AlreadyClosedException {
186            if (stopped || pool == null) {
187                throw new AlreadyClosedException();
188            }
189        }
190    
191        protected ActiveMQSession createSession(SessionKey key) throws JMSException {
192            return (ActiveMQSession) getConnection().createSession(key.isTransacted(), key.getAckMode());
193        }
194    
195        public String toString() {
196            return "PooledConnection { " + pool + " }";
197        }
198    
199        /**
200         * Remove all of the temporary destinations created for this connection.
201         * This is important since the underlying connection may be reused over a
202         * long period of time, accumulating all of the temporary destinations from
203         * each use. However, from the perspective of the lifecycle from the
204         * client's view, close() closes the connection and, therefore, deletes all
205         * of the temporary destinations created.
206         */
207        protected void cleanupConnectionTemporaryDestinations() {
208    
209            for (TemporaryQueue tempQueue : connTempQueues) {
210                try {
211                    tempQueue.delete();
212                } catch (JMSException ex) {
213                    LOG.info("failed to delete Temporary Queue \"" + tempQueue.toString() + "\" on closing pooled connection: " + ex.getMessage());
214                }
215            }
216            connTempQueues.clear();
217    
218            for (TemporaryTopic tempTopic : connTempTopics) {
219                try {
220                    tempTopic.delete();
221                } catch (JMSException ex) {
222                    LOG.info("failed to delete Temporary Topic \"" + tempTopic.toString() + "\" on closing pooled connection: " + ex.getMessage());
223                }
224            }
225            connTempTopics.clear();
226        }
227    }