001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.pool;
019    
020    import java.io.IOException;
021    import java.util.Iterator;
022    import java.util.concurrent.ConcurrentHashMap;
023    import java.util.concurrent.ConcurrentLinkedQueue;
024    import java.util.concurrent.atomic.AtomicBoolean;
025    
026    import javax.jms.JMSException;
027    import javax.jms.Session;
028    
029    import org.apache.activemq.ActiveMQConnection;
030    import org.apache.activemq.transport.TransportListener;
031    import org.apache.commons.pool.ObjectPoolFactory;
032    
033    /**
034     * Holds a real JMS connection along with the session pools associated with it.
035     *
036     *
037     */
038    public class ConnectionPool {
039    
040        private ActiveMQConnection connection;
041        private ConcurrentHashMap<SessionKey, SessionPool> cache;
042        private ConcurrentLinkedQueue<PooledSession> loanedSessions = new ConcurrentLinkedQueue<PooledSession>();
043        private AtomicBoolean started = new AtomicBoolean(false);
044        private int referenceCount;
045        private ObjectPoolFactory poolFactory;
046        private long lastUsed = System.currentTimeMillis();
047        private long firstUsed = lastUsed;
048        private boolean hasFailed;
049        private boolean hasExpired;
050        private int idleTimeout = 30 * 1000;
051        private long expiryTimeout = 0l;
052    
053        public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
054            this(connection, new ConcurrentHashMap<SessionKey, SessionPool>(), poolFactory);
055            // Add a transport Listener so that we can notice if this connection
056            // should be expired due to a connection failure.
057            connection.addTransportListener(new TransportListener() {
058                public void onCommand(Object command) {
059                }
060    
061                public void onException(IOException error) {
062                    synchronized (ConnectionPool.this) {
063                        hasFailed = true;
064                    }
065                }
066    
067                public void transportInterupted() {
068                }
069    
070                public void transportResumed() {
071                }
072            });
073    
074            // make sure that we set the hasFailed flag, in case the transport already failed
075            // prior to the addition of our new TransportListener
076            if(connection.isTransportFailed()) {
077                hasFailed = true;
078            }
079        }
080    
081        public ConnectionPool(ActiveMQConnection connection, ConcurrentHashMap<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory) {
082            this.connection = connection;
083            this.cache = cache;
084            this.poolFactory = poolFactory;
085        }
086    
087        public void start() throws JMSException {
088            if (started.compareAndSet(false, true)) {
089                try {
090                    connection.start();
091                } catch (JMSException e) {
092                    started.set(false);
093                    throw(e);
094                }
095            }
096        }
097    
098        public synchronized ActiveMQConnection getConnection() {
099            return connection;
100        }
101    
102        public Session createSession(boolean transacted, int ackMode) throws JMSException {
103            SessionKey key = new SessionKey(transacted, ackMode);
104            SessionPool pool = null;
105            pool = cache.get(key);
106            if (pool == null) {
107                SessionPool newPool = createSessionPool(key);
108                SessionPool prevPool = cache.putIfAbsent(key, newPool);
109                if (prevPool != null && prevPool != newPool) {
110                    // newPool was not the first one to be associated with this
111                    // key... close created session pool
112                    try {
113                        newPool.close();
114                    } catch (Exception e) {
115                        throw new JMSException(e.getMessage());
116                    }
117                }
118                pool = cache.get(key); // this will return a non-null value...
119            }
120            PooledSession session = pool.borrowSession();
121            this.loanedSessions.add(session);
122            return session;
123        }
124    
125        public synchronized void close() {
126            if (connection != null) {
127                try {
128                    Iterator<SessionPool> i = cache.values().iterator();
129                    while (i.hasNext()) {
130                        SessionPool pool = i.next();
131                        i.remove();
132                        try {
133                            pool.close();
134                        } catch (Exception e) {
135                        }
136                    }
137                } finally {
138                    try {
139                        connection.close();
140                    } catch (Exception e) {
141                    } finally {
142                        connection = null;
143                    }
144                }
145            }
146        }
147    
148        public synchronized void incrementReferenceCount() {
149            referenceCount++;
150            lastUsed = System.currentTimeMillis();
151        }
152    
153        public synchronized void decrementReferenceCount() {
154            referenceCount--;
155            lastUsed = System.currentTimeMillis();
156            if (referenceCount == 0) {
157                expiredCheck();
158    
159                for (PooledSession session : this.loanedSessions) {
160                    try {
161                        session.close();
162                    } catch (Exception e) {
163                    }
164                }
165                this.loanedSessions.clear();
166    
167                // only clean up temp destinations when all users
168                // of this connection have called close
169                if (getConnection() != null) {
170                    getConnection().cleanUpTempDestinations();
171                }
172            }
173        }
174    
175        /**
176         * @return true if this connection has expired.
177         */
178        public synchronized boolean expiredCheck() {
179            if (connection == null) {
180                return true;
181            }
182            if (hasExpired) {
183                if (referenceCount == 0) {
184                    close();
185                }
186                return true;
187            }
188            if (hasFailed
189                    || (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout)
190                    || expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
191                hasExpired = true;
192                if (referenceCount == 0) {
193                    close();
194                }
195                return true;
196            }
197            return false;
198        }
199    
200        public int getIdleTimeout() {
201            return idleTimeout;
202        }
203    
204        public void setIdleTimeout(int idleTimeout) {
205            this.idleTimeout = idleTimeout;
206        }
207    
208        protected SessionPool createSessionPool(SessionKey key) {
209            return new SessionPool(this, key, poolFactory.createPool());
210        }
211    
212        public void setExpiryTimeout(long expiryTimeout) {
213            this.expiryTimeout  = expiryTimeout;
214        }
215    
216        public long getExpiryTimeout() {
217            return expiryTimeout;
218        }
219    
220        void onSessionReturned(PooledSession session) {
221            this.loanedSessions.remove(session);
222        }
223    
224        void onSessionInvalidated(PooledSession session) {
225            this.loanedSessions.remove(session);
226        }
227    }