001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.io.OutputStream;
022    import java.net.URI;
023    import java.net.URISyntaxException;
024    import java.util.HashMap;
025    import java.util.Iterator;
026    import java.util.Map;
027    import java.util.concurrent.*;
028    import java.util.concurrent.atomic.AtomicBoolean;
029    import java.util.concurrent.atomic.AtomicInteger;
030    
031    import javax.jms.Connection;
032    import javax.jms.ConnectionConsumer;
033    import javax.jms.ConnectionMetaData;
034    import javax.jms.DeliveryMode;
035    import javax.jms.Destination;
036    import javax.jms.ExceptionListener;
037    import javax.jms.IllegalStateException;
038    import javax.jms.InvalidDestinationException;
039    import javax.jms.JMSException;
040    import javax.jms.Queue;
041    import javax.jms.QueueConnection;
042    import javax.jms.QueueSession;
043    import javax.jms.ServerSessionPool;
044    import javax.jms.Session;
045    import javax.jms.Topic;
046    import javax.jms.TopicConnection;
047    import javax.jms.TopicSession;
048    import javax.jms.XAConnection;
049    
050    import org.apache.activemq.advisory.DestinationSource;
051    import org.apache.activemq.blob.BlobTransferPolicy;
052    import org.apache.activemq.command.ActiveMQDestination;
053    import org.apache.activemq.command.ActiveMQMessage;
054    import org.apache.activemq.command.ActiveMQTempDestination;
055    import org.apache.activemq.command.ActiveMQTempQueue;
056    import org.apache.activemq.command.ActiveMQTempTopic;
057    import org.apache.activemq.command.BrokerInfo;
058    import org.apache.activemq.command.Command;
059    import org.apache.activemq.command.CommandTypes;
060    import org.apache.activemq.command.ConnectionControl;
061    import org.apache.activemq.command.ConnectionError;
062    import org.apache.activemq.command.ConnectionId;
063    import org.apache.activemq.command.ConnectionInfo;
064    import org.apache.activemq.command.ConsumerControl;
065    import org.apache.activemq.command.ConsumerId;
066    import org.apache.activemq.command.ConsumerInfo;
067    import org.apache.activemq.command.ControlCommand;
068    import org.apache.activemq.command.DestinationInfo;
069    import org.apache.activemq.command.ExceptionResponse;
070    import org.apache.activemq.command.Message;
071    import org.apache.activemq.command.MessageDispatch;
072    import org.apache.activemq.command.MessageId;
073    import org.apache.activemq.command.ProducerAck;
074    import org.apache.activemq.command.ProducerId;
075    import org.apache.activemq.command.RemoveInfo;
076    import org.apache.activemq.command.RemoveSubscriptionInfo;
077    import org.apache.activemq.command.Response;
078    import org.apache.activemq.command.SessionId;
079    import org.apache.activemq.command.ShutdownInfo;
080    import org.apache.activemq.command.WireFormatInfo;
081    import org.apache.activemq.management.JMSConnectionStatsImpl;
082    import org.apache.activemq.management.JMSStatsImpl;
083    import org.apache.activemq.management.StatsCapable;
084    import org.apache.activemq.management.StatsImpl;
085    import org.apache.activemq.state.CommandVisitorAdapter;
086    import org.apache.activemq.thread.Scheduler;
087    import org.apache.activemq.thread.TaskRunnerFactory;
088    import org.apache.activemq.transport.FutureResponse;
089    import org.apache.activemq.transport.ResponseCallback;
090    import org.apache.activemq.transport.Transport;
091    import org.apache.activemq.transport.TransportListener;
092    import org.apache.activemq.transport.failover.FailoverTransport;
093    import org.apache.activemq.util.IdGenerator;
094    import org.apache.activemq.util.IntrospectionSupport;
095    import org.apache.activemq.util.JMSExceptionSupport;
096    import org.apache.activemq.util.LongSequenceGenerator;
097    import org.apache.activemq.util.ServiceSupport;
098    import org.slf4j.Logger;
099    import org.slf4j.LoggerFactory;
100    
101    public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
102    
    // Defaults re-exported from ActiveMQConnectionFactory.
    public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
    public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
    public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;

    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);

    // Public concurrent map whose keys and values are both ActiveMQTempDestination.
    public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();

    // NOTE(review): presumably exposed through isDispatchAsync() / isAlwaysSessionAsync(),
    // whose results createSession() hands to each new session - confirm against the accessors.
    protected boolean dispatchAsync=true;
    protected boolean alwaysSessionAsync = true;

    private TaskRunnerFactory sessionTaskRunner;
    // Built in the constructor with a single core thread.
    private final ThreadPoolExecutor executor;

    // Connection state variables
    // Created in the constructor from a freshly generated connection id.
    private final ConnectionInfo info;
    private ExceptionListener exceptionListener;
    private ClientInternalExceptionListener clientInternalExceptionListener;
    // clientIDSet and isConnectionInfoSentToBroker are the two guards checked by setClientID().
    private boolean clientIDSet;
    private boolean isConnectionInfoSentToBroker;
    // Set to true by both setClientID() and setDefaultClientID().
    private boolean userSpecifiedClientID;

    // Configuration options variables
    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
    private BlobTransferPolicy blobTransferPolicy;
    private RedeliveryPolicy redeliveryPolicy;
    private MessageTransformer transformer;

    private boolean disableTimeStampsByDefault;
    private boolean optimizedMessageDispatch = true;
    private boolean copyMessageOnSend = true;
    private boolean useCompression;
    private boolean objectMessageSerializationDefered;
    private boolean useAsyncSend;
    private boolean optimizeAcknowledge;
    private long optimizeAcknowledgeTimeOut = 0;
    private boolean nestedMapAndListEnabled = true;
    private boolean useRetroactiveConsumer;
    private boolean exclusiveConsumer;
    private boolean alwaysSyncSend;
    // NOTE(review): the units of the timeouts below are not shown here - presumably milliseconds; confirm.
    private int closeTimeout = 15000;
    private boolean watchTopicAdvisories = true;
    private long warnAboutUnstartedConnectionTimeout = 500L;
    private int sendTimeout =0;
    private boolean sendAcksAsync=true;
    private boolean checkForDuplicates = true;

    private final Transport transport;
    private final IdGenerator clientIdGenerator;
    private final JMSStatsImpl factoryStats;
    private final JMSConnectionStatsImpl stats;

    // Lifecycle flags: started is flipped by start()/stop(), closing is set by close().
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean closing = new AtomicBoolean(false);
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicBoolean transportFailed = new AtomicBoolean(false);
    private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
    private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
    private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
    private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
    private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();

    // Maps ConsumerIds to ActiveMQDispatcher objects
    private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
    // Maps ProducerIds to ActiveMQMessageProducer objects
    private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
    // Source of the sequence part of the SessionIds returned by getNextSessionId().
    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
    // Built in the constructor from the connection id and the sequence value -1.
    private final SessionId connectionSessionId;
    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();

    private AdvisoryConsumer advisoryConsumer;
    private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
    private BrokerInfo brokerInfo;
    private IOException firstFailureError;
    private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;

    // Assume that protocol is the latest. Change to the actual protocol
    // version when a WireFormatInfo is received.
    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
    // System.currentTimeMillis() captured at the end of construction.
    private final long timeCreated;
    // The constructor enables duplicate checking on this audit only when the transport is fault tolerant.
    private final ConnectionAudit connectionAudit = new ConnectionAudit();
    private DestinationSource destinationSource;
    private final Object ensureConnectionInfoSentMutex = new Object();
    private boolean useDedicatedTaskRunner;
    protected volatile CountDownLatch transportInterruptionProcessingComplete;
    private long consumerFailoverRedeliveryWaitPeriod;
    private Scheduler scheduler;
    private boolean messagePrioritySupported = true;
    private boolean transactedIndividualAck = false;
    private boolean nonBlockingRedelivery = false;
195    
196        /**
197         * Construct an <code>ActiveMQConnection</code>
198         *
199         * @param transport
200         * @param factoryStats
201         * @throws Exception
202         */
203        protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
204    
205            this.transport = transport;
206            this.clientIdGenerator = clientIdGenerator;
207            this.factoryStats = factoryStats;
208    
209            // Configure a single threaded executor who's core thread can timeout if
210            // idle
211            executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
212                public Thread newThread(Runnable r) {
213                    Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
214                    //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
215                    //thread.setDaemon(true);
216                    return thread;
217                }
218            });
219            // asyncConnectionThread.allowCoreThreadTimeOut(true);
220            String uniqueId = connectionIdGenerator.generateId();
221            this.info = new ConnectionInfo(new ConnectionId(uniqueId));
222            this.info.setManageable(true);
223            this.info.setFaultTolerant(transport.isFaultTolerant());
224            this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
225    
226            this.transport.setTransportListener(this);
227    
228            this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
229            this.factoryStats.addConnection(this);
230            this.timeCreated = System.currentTimeMillis();
231            this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
232        }
233    
234        protected void setUserName(String userName) {
235            this.info.setUserName(userName);
236        }
237    
238        protected void setPassword(String password) {
239            this.info.setPassword(password);
240        }
241    
242        /**
243         * A static helper method to create a new connection
244         *
245         * @return an ActiveMQConnection
246         * @throws JMSException
247         */
248        public static ActiveMQConnection makeConnection() throws JMSException {
249            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
250            return (ActiveMQConnection)factory.createConnection();
251        }
252    
253        /**
254         * A static helper method to create a new connection
255         *
256         * @param uri
257         * @return and ActiveMQConnection
258         * @throws JMSException
259         */
260        public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
261            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
262            return (ActiveMQConnection)factory.createConnection();
263        }
264    
265        /**
266         * A static helper method to create a new connection
267         *
268         * @param user
269         * @param password
270         * @param uri
271         * @return an ActiveMQConnection
272         * @throws JMSException
273         */
274        public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
275            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
276            return (ActiveMQConnection)factory.createConnection();
277        }
278    
279        /**
280         * @return a number unique for this connection
281         */
282        public JMSConnectionStatsImpl getConnectionStats() {
283            return stats;
284        }
285    
286        /**
287         * Creates a <CODE>Session</CODE> object.
288         *
289         * @param transacted indicates whether the session is transacted
290         * @param acknowledgeMode indicates whether the consumer or the client will
291         *                acknowledge any messages it receives; ignored if the
292         *                session is transacted. Legal values are
293         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
294         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
295         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
296         * @return a newly created session
297         * @throws JMSException if the <CODE>Connection</CODE> object fails to
298         *                 create a session due to some internal error or lack of
299         *                 support for the specific transaction and acknowledgement
300         *                 mode.
301         * @see Session#AUTO_ACKNOWLEDGE
302         * @see Session#CLIENT_ACKNOWLEDGE
303         * @see Session#DUPS_OK_ACKNOWLEDGE
304         * @since 1.1
305         */
306        public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
307            checkClosedOrFailed();
308            ensureConnectionInfoSent();
309            if(!transacted) {
310                if (acknowledgeMode==Session.SESSION_TRANSACTED) {
311                    throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
312                } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
313                    throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
314                            "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
315                }
316            }
317            return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
318                ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
319        }
320    
321        /**
322         * @return sessionId
323         */
324        protected SessionId getNextSessionId() {
325            return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
326        }
327    
328        /**
329         * Gets the client identifier for this connection.
330         * <P>
331         * This value is specific to the JMS provider. It is either preconfigured by
332         * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
333         * dynamically by the application by calling the <code>setClientID</code>
334         * method.
335         *
336         * @return the unique client identifier
337         * @throws JMSException if the JMS provider fails to return the client ID
338         *                 for this connection due to some internal error.
339         */
340        public String getClientID() throws JMSException {
341            checkClosedOrFailed();
342            return this.info.getClientId();
343        }
344    
345        /**
346         * Sets the client identifier for this connection.
347         * <P>
348         * The preferred way to assign a JMS client's client identifier is for it to
349         * be configured in a client-specific <CODE>ConnectionFactory</CODE>
350         * object and transparently assigned to the <CODE>Connection</CODE> object
351         * it creates.
352         * <P>
353         * Alternatively, a client can set a connection's client identifier using a
354         * provider-specific value. The facility to set a connection's client
355         * identifier explicitly is not a mechanism for overriding the identifier
356         * that has been administratively configured. It is provided for the case
357         * where no administratively specified identifier exists. If one does exist,
358         * an attempt to change it by setting it must throw an
359         * <CODE>IllegalStateException</CODE>. If a client sets the client
360         * identifier explicitly, it must do so immediately after it creates the
361         * connection and before any other action on the connection is taken. After
362         * this point, setting the client identifier is a programming error that
363         * should throw an <CODE>IllegalStateException</CODE>.
364         * <P>
365         * The purpose of the client identifier is to associate a connection and its
366         * objects with a state maintained on behalf of the client by a provider.
367         * The only such state identified by the JMS API is that required to support
368         * durable subscriptions.
369         * <P>
370         * If another connection with the same <code>clientID</code> is already
371         * running when this method is called, the JMS provider should detect the
372         * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
373         *
374         * @param newClientID the unique client identifier
375         * @throws JMSException if the JMS provider fails to set the client ID for
376         *                 this connection due to some internal error.
377         * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
378         *                 invalid or duplicate client ID.
379         * @throws javax.jms.IllegalStateException if the JMS client attempts to set
380         *                 a connection's client ID at the wrong time or when it has
381         *                 been administratively configured.
382         */
383        public void setClientID(String newClientID) throws JMSException {
384            checkClosedOrFailed();
385    
386            if (this.clientIDSet) {
387                throw new IllegalStateException("The clientID has already been set");
388            }
389    
390            if (this.isConnectionInfoSentToBroker) {
391                throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
392            }
393    
394            this.info.setClientId(newClientID);
395            this.userSpecifiedClientID = true;
396            ensureConnectionInfoSent();
397        }
398    
399        /**
400         * Sets the default client id that the connection will use if explicitly not
401         * set with the setClientId() call.
402         */
403        public void setDefaultClientID(String clientID) throws JMSException {
404            this.info.setClientId(clientID);
405            this.userSpecifiedClientID = true;
406        }
407    
408        /**
409         * Gets the metadata for this connection.
410         *
411         * @return the connection metadata
412         * @throws JMSException if the JMS provider fails to get the connection
413         *                 metadata for this connection.
414         * @see javax.jms.ConnectionMetaData
415         */
416        public ConnectionMetaData getMetaData() throws JMSException {
417            checkClosedOrFailed();
418            return ActiveMQConnectionMetaData.INSTANCE;
419        }
420    
421        /**
422         * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
423         * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
424         * associated with it.
425         *
426         * @return the <CODE>ExceptionListener</CODE> for this connection, or
427         *         null, if no <CODE>ExceptionListener</CODE> is associated with
428         *         this connection.
429         * @throws JMSException if the JMS provider fails to get the
430         *                 <CODE>ExceptionListener</CODE> for this connection.
431         * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
432         */
433        public ExceptionListener getExceptionListener() throws JMSException {
434            checkClosedOrFailed();
435            return this.exceptionListener;
436        }
437    
438        /**
439         * Sets an exception listener for this connection.
440         * <P>
441         * If a JMS provider detects a serious problem with a connection, it informs
442         * the connection's <CODE> ExceptionListener</CODE>, if one has been
443         * registered. It does this by calling the listener's <CODE>onException
444         * </CODE>
445         * method, passing it a <CODE>JMSException</CODE> object describing the
446         * problem.
447         * <P>
448         * An exception listener allows a client to be notified of a problem
449         * asynchronously. Some connections only consume messages, so they would
450         * have no other way to learn their connection has failed.
451         * <P>
452         * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
453         * <P>
454         * A JMS provider should attempt to resolve connection problems itself
455         * before it notifies the client of them.
456         *
457         * @param listener the exception listener
458         * @throws JMSException if the JMS provider fails to set the exception
459         *                 listener for this connection.
460         */
461        public void setExceptionListener(ExceptionListener listener) throws JMSException {
462            checkClosedOrFailed();
463            this.exceptionListener = listener;
464        }
465    
466        /**
467         * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
468         * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
469         * associated with it.
470         *
471         * @return the listener or <code>null</code> if no listener is registered with the connection.
472         */
473        public ClientInternalExceptionListener getClientInternalExceptionListener()
474        {
475            return clientInternalExceptionListener;
476        }
477    
478        /**
479         * Sets a client internal exception listener for this connection.
480         * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
481         * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
482         * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
483         * describing the problem.
484         *
485         * @param listener the exception listener
486         */
487        public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
488        {
489            this.clientInternalExceptionListener = listener;
490        }
491    
492        /**
493         * Starts (or restarts) a connection's delivery of incoming messages. A call
494         * to <CODE>start</CODE> on a connection that has already been started is
495         * ignored.
496         *
497         * @throws JMSException if the JMS provider fails to start message delivery
498         *                 due to some internal error.
499         * @see javax.jms.Connection#stop()
500         */
501        public void start() throws JMSException {
502            checkClosedOrFailed();
503            ensureConnectionInfoSent();
504            if (started.compareAndSet(false, true)) {
505                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
506                    ActiveMQSession session = i.next();
507                    session.start();
508                }
509            }
510        }
511    
512        /**
513         * Temporarily stops a connection's delivery of incoming messages. Delivery
514         * can be restarted using the connection's <CODE>start</CODE> method. When
515         * the connection is stopped, delivery to all the connection's message
516         * consumers is inhibited: synchronous receives block, and messages are not
517         * delivered to message listeners.
518         * <P>
519         * This call blocks until receives and/or message listeners in progress have
520         * completed.
521         * <P>
522         * Stopping a connection has no effect on its ability to send messages. A
523         * call to <CODE>stop</CODE> on a connection that has already been stopped
524         * is ignored.
525         * <P>
526         * A call to <CODE>stop</CODE> must not return until delivery of messages
527         * has paused. This means that a client can rely on the fact that none of
528         * its message listeners will be called and that all threads of control
529         * waiting for <CODE>receive</CODE> calls to return will not return with a
530         * message until the connection is restarted. The receive timers for a
531         * stopped connection continue to advance, so receives may time out while
532         * the connection is stopped.
533         * <P>
534         * If message listeners are running when <CODE>stop</CODE> is invoked, the
535         * <CODE>stop</CODE> call must wait until all of them have returned before
536         * it may return. While these message listeners are completing, they must
537         * have the full services of the connection available to them.
538         *
539         * @throws JMSException if the JMS provider fails to stop message delivery
540         *                 due to some internal error.
541         * @see javax.jms.Connection#start()
542         */
543        public void stop() throws JMSException {
544            checkClosedOrFailed();
545            if (started.compareAndSet(true, false)) {
546                synchronized(sessions) {
547                    for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
548                        ActiveMQSession s = i.next();
549                        s.stop();
550                    }
551                }
552            }
553        }
554    
555        /**
556         * Closes the connection.
557         * <P>
558         * Since a provider typically allocates significant resources outside the
559         * JVM on behalf of a connection, clients should close these resources when
560         * they are not needed. Relying on garbage collection to eventually reclaim
561         * these resources may not be timely enough.
562         * <P>
563         * There is no need to close the sessions, producers, and consumers of a
564         * closed connection.
565         * <P>
566         * Closing a connection causes all temporary destinations to be deleted.
567         * <P>
568         * When this method is invoked, it should not return until message
569         * processing has been shut down in an orderly fashion. This means that all
570         * message listeners that may have been running have returned, and that all
571         * pending receives have returned. A close terminates all pending message
572         * receives on the connection's sessions' consumers. The receives may return
573         * with a message or with null, depending on whether there was a message
574         * available at the time of the close. If one or more of the connection's
575         * sessions' message listeners is processing a message at the time when
576         * connection <CODE>close</CODE> is invoked, all the facilities of the
577         * connection and its sessions must remain available to those listeners
578         * until they return control to the JMS provider.
579         * <P>
580         * Closing a connection causes any of its sessions' transactions in progress
581         * to be rolled back. In the case where a session's work is coordinated by
582         * an external transaction manager, a session's <CODE>commit</CODE> and
583         * <CODE> rollback</CODE> methods are not used and the result of a closed
584         * session's work is determined later by the transaction manager. Closing a
585         * connection does NOT force an acknowledgment of client-acknowledged
586         * sessions.
587         * <P>
588         * Invoking the <CODE>acknowledge</CODE> method of a received message from
589         * a closed connection's session must throw an
590         * <CODE>IllegalStateException</CODE>. Closing a closed connection must
591         * NOT throw an exception.
592         *
593         * @throws JMSException if the JMS provider fails to close the connection
594         *                 due to some internal error. For example, a failure to
595         *                 release resources or to close a socket connection can
596         *                 cause this exception to be thrown.
597         */
598        public void close() throws JMSException {
599            // Store the interrupted state and clear so that cleanup happens without
600            // leaking connection resources.  Reset in finally to preserve state.
601            boolean interrupted = Thread.interrupted();
602    
603            try {
604    
605                // If we were running, lets stop first.
606                if (!closed.get() && !transportFailed.get()) {
607                    stop();
608                }
609    
610                synchronized (this) {
611                    if (!closed.get()) {
612                        closing.set(true);
613    
614                        if (destinationSource != null) {
615                            destinationSource.stop();
616                            destinationSource = null;
617                        }
618                        if (advisoryConsumer != null) {
619                            advisoryConsumer.dispose();
620                            advisoryConsumer = null;
621                        }
622    
623                        Scheduler scheduler = this.scheduler;
624                        if (scheduler != null) {
625                            try {
626                                scheduler.stop();
627                            } catch (Exception e) {
628                                JMSException ex =  JMSExceptionSupport.create(e);
629                                throw ex;
630                            }
631                        }
632    
633                        long lastDeliveredSequenceId = 0;
634                        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
635                            ActiveMQSession s = i.next();
636                            s.dispose();
637                            lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
638                        }
639                        for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
640                            ActiveMQConnectionConsumer c = i.next();
641                            c.dispose();
642                        }
643                        for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
644                            ActiveMQInputStream c = i.next();
645                            c.dispose();
646                        }
647                        for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
648                            ActiveMQOutputStream c = i.next();
649                            c.dispose();
650                        }
651    
652                        // As TemporaryQueue and TemporaryTopic instances are bound
653                        // to a connection we should just delete them after the connection
654                        // is closed to free up memory
655                        for (Iterator<ActiveMQTempDestination> i = this.activeTempDestinations.values().iterator(); i.hasNext();) {
656                            ActiveMQTempDestination c = i.next();
657                            c.delete();
658                        }
659    
660                        if (isConnectionInfoSentToBroker) {
661                            // If we announced ourselfs to the broker.. Try to let
662                            // the broker
663                            // know that the connection is being shutdown.
664                            RemoveInfo removeCommand = info.createRemoveCommand();
665                            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
666                            doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
667                            doAsyncSendPacket(new ShutdownInfo());
668                        }
669    
670                        started.set(false);
671    
672                        // TODO if we move the TaskRunnerFactory to the connection
673                        // factory
674                        // then we may need to call
675                        // factory.onConnectionClose(this);
676                        if (sessionTaskRunner != null) {
677                            sessionTaskRunner.shutdown();
678                        }
679                        closed.set(true);
680                        closing.set(false);
681                    }
682                }
683            } finally {
684                try {
685                    if (executor != null) {
686                        executor.shutdown();
687                    }
688                } catch (Throwable e) {
689                    LOG.error("Error shutting down thread pool " + e, e);
690                }
691    
692                ServiceSupport.dispose(this.transport);
693    
694                factoryStats.removeConnection(this);
695                if (interrupted) {
696                    Thread.currentThread().interrupt();
697                }
698            }
699        }
700    
701        /**
702         * Tells the broker to terminate its VM. This can be used to cleanly
703         * terminate a broker running in a standalone java process. Server must have
704         * property enable.vm.shutdown=true defined to allow this to work.
705         */
706        // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
707        // implemented.
708        /*
709         * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
710         * command = new BrokerAdminCommand();
711         * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
712         * asyncSendPacket(command); }
713         */
714    
715        /**
716         * Create a durable connection consumer for this connection (optional
717         * operation). This is an expert facility not used by regular JMS clients.
718         *
719         * @param topic topic to access
720         * @param subscriptionName durable subscription name
721         * @param messageSelector only messages with properties matching the message
722         *                selector expression are delivered. A value of null or an
723         *                empty string indicates that there is no message selector
724         *                for the message consumer.
725         * @param sessionPool the server session pool to associate with this durable
726         *                connection consumer
727         * @param maxMessages the maximum number of messages that can be assigned to
728         *                a server session at one time
729         * @return the durable connection consumer
730         * @throws JMSException if the <CODE>Connection</CODE> object fails to
731         *                 create a connection consumer due to some internal error
732         *                 or invalid arguments for <CODE>sessionPool</CODE> and
733         *                 <CODE>messageSelector</CODE>.
734         * @throws javax.jms.InvalidDestinationException if an invalid destination
735         *                 is specified.
736         * @throws javax.jms.InvalidSelectorException if the message selector is
737         *                 invalid.
738         * @see javax.jms.ConnectionConsumer
739         * @since 1.1
740         */
741        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
742            throws JMSException {
743            return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
744        }
745    
746        /**
747         * Create a durable connection consumer for this connection (optional
748         * operation). This is an expert facility not used by regular JMS clients.
749         *
750         * @param topic topic to access
751         * @param subscriptionName durable subscription name
752         * @param messageSelector only messages with properties matching the message
753         *                selector expression are delivered. A value of null or an
754         *                empty string indicates that there is no message selector
755         *                for the message consumer.
756         * @param sessionPool the server session pool to associate with this durable
757         *                connection consumer
758         * @param maxMessages the maximum number of messages that can be assigned to
759         *                a server session at one time
760         * @param noLocal set true if you want to filter out messages published
761         *                locally
762         * @return the durable connection consumer
763         * @throws JMSException if the <CODE>Connection</CODE> object fails to
764         *                 create a connection consumer due to some internal error
765         *                 or invalid arguments for <CODE>sessionPool</CODE> and
766         *                 <CODE>messageSelector</CODE>.
767         * @throws javax.jms.InvalidDestinationException if an invalid destination
768         *                 is specified.
769         * @throws javax.jms.InvalidSelectorException if the message selector is
770         *                 invalid.
771         * @see javax.jms.ConnectionConsumer
772         * @since 1.1
773         */
774        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
775                                                                  boolean noLocal) throws JMSException {
776            checkClosedOrFailed();
777            ensureConnectionInfoSent();
778            SessionId sessionId = new SessionId(info.getConnectionId(), -1);
779            ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
780            info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
781            info.setSubscriptionName(subscriptionName);
782            info.setSelector(messageSelector);
783            info.setPrefetchSize(maxMessages);
784            info.setDispatchAsync(isDispatchAsync());
785    
786            // Allows the options on the destination to configure the consumerInfo
787            if (info.getDestination().getOptions() != null) {
788                Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
789                IntrospectionSupport.setProperties(this.info, options, "consumer.");
790            }
791    
792            return new ActiveMQConnectionConsumer(this, sessionPool, info);
793        }
794    
795        // Properties
796        // -------------------------------------------------------------------------
797    
798        /**
799         * Returns true if this connection has been started
800         *
801         * @return true if this Connection is started
802         */
803        public boolean isStarted() {
804            return started.get();
805        }
806    
807        /**
808         * Returns true if the connection is closed
809         */
810        public boolean isClosed() {
811            return closed.get();
812        }
813    
814        /**
815         * Returns true if the connection is in the process of being closed
816         */
817        public boolean isClosing() {
818            return closing.get();
819        }
820    
821        /**
822         * Returns true if the underlying transport has failed
823         */
824        public boolean isTransportFailed() {
825            return transportFailed.get();
826        }
827    
828        /**
829         * @return Returns the prefetchPolicy.
830         */
831        public ActiveMQPrefetchPolicy getPrefetchPolicy() {
832            return prefetchPolicy;
833        }
834    
835        /**
836         * Sets the <a
837         * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
838         * policy</a> for consumers created by this connection.
839         */
840        public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
841            this.prefetchPolicy = prefetchPolicy;
842        }
843    
844        /**
845         */
846        public Transport getTransportChannel() {
847            return transport;
848        }
849    
850        /**
851         * @return Returns the clientID of the connection, forcing one to be
852         *         generated if one has not yet been configured.
853         */
854        public String getInitializedClientID() throws JMSException {
855            ensureConnectionInfoSent();
856            return info.getClientId();
857        }
858    
859        /**
860         * @return Returns the timeStampsDisableByDefault.
861         */
862        public boolean isDisableTimeStampsByDefault() {
863            return disableTimeStampsByDefault;
864        }
865    
866        /**
867         * Sets whether or not timestamps on messages should be disabled or not. If
868         * you disable them it adds a small performance boost.
869         */
870        public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
871            this.disableTimeStampsByDefault = timeStampsDisableByDefault;
872        }
873    
874        /**
875         * @return Returns the dispatchOptimizedMessage.
876         */
877        public boolean isOptimizedMessageDispatch() {
878            return optimizedMessageDispatch;
879        }
880    
881        /**
882         * If this flag is set then an larger prefetch limit is used - only
883         * applicable for durable topic subscribers.
884         */
885        public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
886            this.optimizedMessageDispatch = dispatchOptimizedMessage;
887        }
888    
889        /**
890         * @return Returns the closeTimeout.
891         */
892        public int getCloseTimeout() {
893            return closeTimeout;
894        }
895    
896        /**
897         * Sets the timeout before a close is considered complete. Normally a
898         * close() on a connection waits for confirmation from the broker; this
899         * allows that operation to timeout to save the client hanging if there is
900         * no broker
901         */
902        public void setCloseTimeout(int closeTimeout) {
903            this.closeTimeout = closeTimeout;
904        }
905    
906        /**
907         * @return ConnectionInfo
908         */
909        public ConnectionInfo getConnectionInfo() {
910            return this.info;
911        }
912    
913        public boolean isUseRetroactiveConsumer() {
914            return useRetroactiveConsumer;
915        }
916    
917        /**
918         * Sets whether or not retroactive consumers are enabled. Retroactive
919         * consumers allow non-durable topic subscribers to receive old messages
920         * that were published before the non-durable subscriber started.
921         */
922        public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
923            this.useRetroactiveConsumer = useRetroactiveConsumer;
924        }
925    
926        public boolean isNestedMapAndListEnabled() {
927            return nestedMapAndListEnabled;
928        }
929    
930        /**
931         * Enables/disables whether or not Message properties and MapMessage entries
932         * support <a
933         * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
934         * Structures</a> of Map and List objects
935         */
936        public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
937            this.nestedMapAndListEnabled = structuredMapsEnabled;
938        }
939    
940        public boolean isExclusiveConsumer() {
941            return exclusiveConsumer;
942        }
943    
944        /**
945         * Enables or disables whether or not queue consumers should be exclusive or
946         * not for example to preserve ordering when not using <a
947         * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
948         *
949         * @param exclusiveConsumer
950         */
951        public void setExclusiveConsumer(boolean exclusiveConsumer) {
952            this.exclusiveConsumer = exclusiveConsumer;
953        }
954    
955        /**
956         * Adds a transport listener so that a client can be notified of events in
957         * the underlying transport
958         */
959        public void addTransportListener(TransportListener transportListener) {
960            transportListeners.add(transportListener);
961        }
962    
963        public void removeTransportListener(TransportListener transportListener) {
964            transportListeners.remove(transportListener);
965        }
966    
967        public boolean isUseDedicatedTaskRunner() {
968            return useDedicatedTaskRunner;
969        }
970    
    /**
     * Sets the dedicated-task-runner flag. The flag is handed to the
     * <CODE>TaskRunnerFactory</CODE> that getSessionTaskRunner() creates
     * lazily, so it only influences a factory created after this call.
     *
     * @param useDedicatedTaskRunner the new value of the flag
     */
    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
    }
974    
975        public TaskRunnerFactory getSessionTaskRunner() {
976            synchronized (this) {
977                if (sessionTaskRunner == null) {
978                    sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner());
979                }
980            }
981            return sessionTaskRunner;
982        }
983    
    /**
     * Replaces the task runner factory used for sessions.
     *
     * NOTE(review): this assignment is not synchronized, unlike the lazy
     * creation in getSessionTaskRunner() - confirm callers only use it during
     * set-up.
     *
     * @param sessionTaskRunner the factory to store on this connection
     */
    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
        this.sessionTaskRunner = sessionTaskRunner;
    }
987    
988        public MessageTransformer getTransformer() {
989            return transformer;
990        }
991    
992        /**
993         * Sets the transformer used to transform messages before they are sent on
994         * to the JMS bus or when they are received from the bus but before they are
995         * delivered to the JMS client
996         */
997        public void setTransformer(MessageTransformer transformer) {
998            this.transformer = transformer;
999        }
1000    
1001        /**
1002         * @return the statsEnabled
1003         */
1004        public boolean isStatsEnabled() {
1005            return this.stats.isEnabled();
1006        }
1007    
1008        /**
1009         * @param statsEnabled the statsEnabled to set
1010         */
1011        public void setStatsEnabled(boolean statsEnabled) {
1012            this.stats.setEnabled(statsEnabled);
1013        }
1014    
1015        /**
1016         * Returns the {@link DestinationSource} object which can be used to listen to destinations
1017         * being created or destroyed or to enquire about the current destinations available on the broker
1018         *
1019         * @return a lazily created destination source
1020         * @throws JMSException
1021         */
1022        public DestinationSource getDestinationSource() throws JMSException {
1023            if (destinationSource == null) {
1024                destinationSource = new DestinationSource(this);
1025                destinationSource.start();
1026            }
1027            return destinationSource;
1028        }
1029    
1030        // Implementation methods
1031        // -------------------------------------------------------------------------
1032    
1033        /**
1034         * Used internally for adding Sessions to the Connection
1035         *
1036         * @param session
1037         * @throws JMSException
1038         * @throws JMSException
1039         */
1040        protected void addSession(ActiveMQSession session) throws JMSException {
1041            this.sessions.add(session);
1042            if (sessions.size() > 1 || session.isTransacted()) {
1043                optimizedMessageDispatch = false;
1044            }
1045        }
1046    
1047        /**
1048         * Used interanlly for removing Sessions from a Connection
1049         *
1050         * @param session
1051         */
1052        protected void removeSession(ActiveMQSession session) {
1053            this.sessions.remove(session);
1054            this.removeDispatcher(session);
1055        }
1056    
1057        /**
1058         * Add a ConnectionConsumer
1059         *
1060         * @param connectionConsumer
1061         * @throws JMSException
1062         */
1063        protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1064            this.connectionConsumers.add(connectionConsumer);
1065        }
1066    
1067        /**
1068         * Remove a ConnectionConsumer
1069         *
1070         * @param connectionConsumer
1071         */
1072        protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1073            this.connectionConsumers.remove(connectionConsumer);
1074            this.removeDispatcher(connectionConsumer);
1075        }
1076    
1077        /**
1078         * Creates a <CODE>TopicSession</CODE> object.
1079         *
1080         * @param transacted indicates whether the session is transacted
1081         * @param acknowledgeMode indicates whether the consumer or the client will
1082         *                acknowledge any messages it receives; ignored if the
1083         *                session is transacted. Legal values are
1084         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1085         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1086         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1087         * @return a newly created topic session
1088         * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1089         *                 to create a session due to some internal error or lack of
1090         *                 support for the specific transaction and acknowledgement
1091         *                 mode.
1092         * @see Session#AUTO_ACKNOWLEDGE
1093         * @see Session#CLIENT_ACKNOWLEDGE
1094         * @see Session#DUPS_OK_ACKNOWLEDGE
1095         */
1096        public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1097            return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1098        }
1099    
1100        /**
1101         * Creates a connection consumer for this connection (optional operation).
1102         * This is an expert facility not used by regular JMS clients.
1103         *
1104         * @param topic the topic to access
1105         * @param messageSelector only messages with properties matching the message
1106         *                selector expression are delivered. A value of null or an
1107         *                empty string indicates that there is no message selector
1108         *                for the message consumer.
1109         * @param sessionPool the server session pool to associate with this
1110         *                connection consumer
1111         * @param maxMessages the maximum number of messages that can be assigned to
1112         *                a server session at one time
1113         * @return the connection consumer
1114         * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1115         *                 to create a connection consumer due to some internal
1116         *                 error or invalid arguments for <CODE>sessionPool</CODE>
1117         *                 and <CODE>messageSelector</CODE>.
1118         * @throws javax.jms.InvalidDestinationException if an invalid topic is
1119         *                 specified.
1120         * @throws javax.jms.InvalidSelectorException if the message selector is
1121         *                 invalid.
1122         * @see javax.jms.ConnectionConsumer
1123         */
1124        public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1125            return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1126        }
1127    
1128        /**
1129         * Creates a connection consumer for this connection (optional operation).
1130         * This is an expert facility not used by regular JMS clients.
1131         *
1132         * @param queue the queue to access
1133         * @param messageSelector only messages with properties matching the message
1134         *                selector expression are delivered. A value of null or an
1135         *                empty string indicates that there is no message selector
1136         *                for the message consumer.
1137         * @param sessionPool the server session pool to associate with this
1138         *                connection consumer
1139         * @param maxMessages the maximum number of messages that can be assigned to
1140         *                a server session at one time
1141         * @return the connection consumer
1142         * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1143         *                 to create a connection consumer due to some internal
1144         *                 error or invalid arguments for <CODE>sessionPool</CODE>
1145         *                 and <CODE>messageSelector</CODE>.
1146         * @throws javax.jms.InvalidDestinationException if an invalid queue is
1147         *                 specified.
1148         * @throws javax.jms.InvalidSelectorException if the message selector is
1149         *                 invalid.
1150         * @see javax.jms.ConnectionConsumer
1151         */
1152        public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1153            return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1154        }
1155    
1156        /**
1157         * Creates a connection consumer for this connection (optional operation).
1158         * This is an expert facility not used by regular JMS clients.
1159         *
1160         * @param destination the destination to access
1161         * @param messageSelector only messages with properties matching the message
1162         *                selector expression are delivered. A value of null or an
1163         *                empty string indicates that there is no message selector
1164         *                for the message consumer.
1165         * @param sessionPool the server session pool to associate with this
1166         *                connection consumer
1167         * @param maxMessages the maximum number of messages that can be assigned to
1168         *                a server session at one time
1169         * @return the connection consumer
1170         * @throws JMSException if the <CODE>Connection</CODE> object fails to
1171         *                 create a connection consumer due to some internal error
1172         *                 or invalid arguments for <CODE>sessionPool</CODE> and
1173         *                 <CODE>messageSelector</CODE>.
1174         * @throws javax.jms.InvalidDestinationException if an invalid destination
1175         *                 is specified.
1176         * @throws javax.jms.InvalidSelectorException if the message selector is
1177         *                 invalid.
1178         * @see javax.jms.ConnectionConsumer
1179         * @since 1.1
1180         */
1181        public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1182            return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1183        }
1184    
1185        public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1186            throws JMSException {
1187    
1188            checkClosedOrFailed();
1189            ensureConnectionInfoSent();
1190    
1191            ConsumerId consumerId = createConsumerId();
1192            ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1193            consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1194            consumerInfo.setSelector(messageSelector);
1195            consumerInfo.setPrefetchSize(maxMessages);
1196            consumerInfo.setNoLocal(noLocal);
1197            consumerInfo.setDispatchAsync(isDispatchAsync());
1198    
1199            // Allows the options on the destination to configure the consumerInfo
1200            if (consumerInfo.getDestination().getOptions() != null) {
1201                Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1202                IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1203            }
1204    
1205            return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1206        }
1207    
1208        /**
1209         * @return
1210         */
1211        private ConsumerId createConsumerId() {
1212            return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1213        }
1214    
1215        /**
1216         * @return
1217         */
1218        private ProducerId createProducerId() {
1219            return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1220        }
1221    
1222        /**
1223         * Creates a <CODE>QueueSession</CODE> object.
1224         *
1225         * @param transacted indicates whether the session is transacted
1226         * @param acknowledgeMode indicates whether the consumer or the client will
1227         *                acknowledge any messages it receives; ignored if the
1228         *                session is transacted. Legal values are
1229         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1230         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1231         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1232         * @return a newly created queue session
1233         * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1234         *                 to create a session due to some internal error or lack of
1235         *                 support for the specific transaction and acknowledgement
1236         *                 mode.
1237         * @see Session#AUTO_ACKNOWLEDGE
1238         * @see Session#CLIENT_ACKNOWLEDGE
1239         * @see Session#DUPS_OK_ACKNOWLEDGE
1240         */
1241        public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1242            return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1243        }
1244    
1245        /**
1246         * Ensures that the clientID was manually specified and not auto-generated.
1247         * If the clientID was not specified this method will throw an exception.
1248         * This method is used to ensure that the clientID + durableSubscriber name
1249         * are used correctly.
1250         *
1251         * @throws JMSException
1252         */
1253        public void checkClientIDWasManuallySpecified() throws JMSException {
1254            if (!userSpecifiedClientID) {
1255                throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1256            }
1257        }
1258    
1259        /**
1260         * send a Packet through the Connection - for internal use only
1261         *
1262         * @param command
1263         * @throws JMSException
1264         */
1265        public void asyncSendPacket(Command command) throws JMSException {
1266            if (isClosed()) {
1267                throw new ConnectionClosedException();
1268            } else {
1269                doAsyncSendPacket(command);
1270            }
1271        }
1272    
1273        private void doAsyncSendPacket(Command command) throws JMSException {
1274            try {
1275                this.transport.oneway(command);
1276            } catch (IOException e) {
1277                throw JMSExceptionSupport.create(e);
1278            }
1279        }
1280    
1281        /**
1282         * Send a packet through a Connection - for internal use only
1283         *
1284         * @param command
1285         * @return
1286         * @throws JMSException
1287         */
1288        public void syncSendPacket(Command command, final AsyncCallback onComplete) throws JMSException {
1289            if(onComplete==null) {
1290                syncSendPacket(command);
1291            } else {
1292                if (isClosed()) {
1293                    throw new ConnectionClosedException();
1294                }
1295                try {
1296                    this.transport.asyncRequest(command, new ResponseCallback() {
1297                        @Override
1298                        public void onCompletion(FutureResponse resp) {
1299                            Response response;
1300                            Throwable exception = null;
1301                            try {
1302                                response = resp.getResult();
1303                                if (response.isException()) {
1304                                    ExceptionResponse er = (ExceptionResponse)response;
1305                                    exception = er.getException();
1306                                }
1307                            } catch (Exception e) {
1308                                exception = e;
1309                            }
1310                            if(exception!=null) {
1311                                if ( exception instanceof JMSException) {
1312                                    onComplete.onException((JMSException) exception);
1313                                } else {
1314                                    if (isClosed()||closing.get()) {
1315                                        LOG.debug("Received an exception but connection is closing");
1316                                    }
1317                                    JMSException jmsEx = null;
1318                                    try {
1319                                        jmsEx = JMSExceptionSupport.create(exception);
1320                                    } catch(Throwable e) {
1321                                        LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
1322                                    }
1323                                    //dispose of transport for security exceptions
1324                                    if (exception instanceof SecurityException){
1325                                        Transport t = transport;
1326                                        if (null != t){
1327                                            ServiceSupport.dispose(t);
1328                                        }
1329                                    }
1330                                    if (jmsEx !=null) {
1331                                        onComplete.onException(jmsEx);
1332                                    }
1333                                }
1334                            } else {
1335                                onComplete.onSuccess();
1336                            }
1337                        }
1338                    });
1339                } catch (IOException e) {
1340                    throw JMSExceptionSupport.create(e);
1341                }
1342            }
1343        }
1344    
1345        public Response syncSendPacket(Command command) throws JMSException {
1346            if (isClosed()) {
1347                throw new ConnectionClosedException();
1348            } else {
1349    
1350                try {
1351                    Response response = (Response)this.transport.request(command);
1352                    if (response.isException()) {
1353                        ExceptionResponse er = (ExceptionResponse)response;
1354                        if (er.getException() instanceof JMSException) {
1355                            throw (JMSException)er.getException();
1356                        } else {
1357                            if (isClosed()||closing.get()) {
1358                                LOG.debug("Received an exception but connection is closing");
1359                            }
1360                            JMSException jmsEx = null;
1361                            try {
1362                                jmsEx = JMSExceptionSupport.create(er.getException());
1363                            } catch(Throwable e) {
1364                                LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1365                            }
1366                            //dispose of transport for security exceptions
1367                            if (er.getException() instanceof SecurityException){
1368                                Transport t = this.transport;
1369                                if (null != t){
1370                                    ServiceSupport.dispose(t);
1371                                }
1372                            }
1373                            if (jmsEx !=null) {
1374                                throw jmsEx;
1375                            }
1376                        }
1377                    }
1378                    return response;
1379                } catch (IOException e) {
1380                    throw JMSExceptionSupport.create(e);
1381                }
1382            }
1383        }
1384    
1385        /**
1386         * Send a packet through a Connection - for internal use only
1387         *
1388         * @param command
1389         * @return
1390         * @throws JMSException
1391         */
1392        public Response syncSendPacket(Command command, int timeout) throws JMSException {
1393            if (isClosed() || closing.get()) {
1394                throw new ConnectionClosedException();
1395            } else {
1396                return doSyncSendPacket(command, timeout);
1397            }
1398        }
1399    
1400        private Response doSyncSendPacket(Command command, int timeout)
1401                throws JMSException {
1402            try {
1403                Response response = (Response) (timeout > 0
1404                        ? this.transport.request(command, timeout)
1405                        : this.transport.request(command));
1406                if (response != null && response.isException()) {
1407                    ExceptionResponse er = (ExceptionResponse)response;
1408                    if (er.getException() instanceof JMSException) {
1409                        throw (JMSException)er.getException();
1410                    } else {
1411                        throw JMSExceptionSupport.create(er.getException());
1412                    }
1413                }
1414                return response;
1415            } catch (IOException e) {
1416                throw JMSExceptionSupport.create(e);
1417            }
1418        }
1419    
1420        /**
1421         * @return statistics for this Connection
1422         */
1423        public StatsImpl getStats() {
1424            return stats;
1425        }
1426    
1427        /**
1428         * simply throws an exception if the Connection is already closed or the
1429         * Transport has failed
1430         *
1431         * @throws JMSException
1432         */
1433        protected synchronized void checkClosedOrFailed() throws JMSException {
1434            checkClosed();
1435            if (transportFailed.get()) {
1436                throw new ConnectionFailedException(firstFailureError);
1437            }
1438        }
1439    
1440        /**
1441         * simply throws an exception if the Connection is already closed
1442         *
1443         * @throws JMSException
1444         */
1445        protected synchronized void checkClosed() throws JMSException {
1446            if (closed.get()) {
1447                throw new ConnectionClosedException();
1448            }
1449        }
1450    
1451        /**
1452         * Send the ConnectionInfo to the Broker
1453         *
1454         * @throws JMSException
1455         */
1456        protected void ensureConnectionInfoSent() throws JMSException {
1457            synchronized(this.ensureConnectionInfoSentMutex) {
1458                // Can we skip sending the ConnectionInfo packet??
1459                if (isConnectionInfoSentToBroker || closed.get()) {
1460                    return;
1461                }
1462                //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1463                if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1464                    info.setClientId(clientIdGenerator.generateId());
1465                }
1466                syncSendPacket(info.copy());
1467    
1468                this.isConnectionInfoSentToBroker = true;
1469                // Add a temp destination advisory consumer so that
1470                // We know what the valid temporary destinations are on the
1471                // broker without having to do an RPC to the broker.
1472    
1473                ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1474                if (watchTopicAdvisories) {
1475                    advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1476                }
1477            }
1478        }
1479    
1480        public synchronized boolean isWatchTopicAdvisories() {
1481            return watchTopicAdvisories;
1482        }
1483    
1484        public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1485            this.watchTopicAdvisories = watchTopicAdvisories;
1486        }
1487    
1488        /**
1489         * @return Returns the useAsyncSend.
1490         */
1491        public boolean isUseAsyncSend() {
1492            return useAsyncSend;
1493        }
1494    
1495        /**
1496         * Forces the use of <a
1497         * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1498         * adds a massive performance boost; but means that the send() method will
1499         * return immediately whether the message has been sent or not which could
1500         * lead to message loss.
1501         */
1502        public void setUseAsyncSend(boolean useAsyncSend) {
1503            this.useAsyncSend = useAsyncSend;
1504        }
1505    
1506        /**
1507         * @return true if always sync send messages
1508         */
1509        public boolean isAlwaysSyncSend() {
1510            return this.alwaysSyncSend;
1511        }
1512    
1513        /**
1514         * Set true if always require messages to be sync sent
1515         *
1516         * @param alwaysSyncSend
1517         */
1518        public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1519            this.alwaysSyncSend = alwaysSyncSend;
1520        }
1521    
1522        /**
1523         * @return the messagePrioritySupported
1524         */
1525        public boolean isMessagePrioritySupported() {
1526            return this.messagePrioritySupported;
1527        }
1528    
1529        /**
1530         * @param messagePrioritySupported the messagePrioritySupported to set
1531         */
1532        public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1533            this.messagePrioritySupported = messagePrioritySupported;
1534        }
1535    
1536        /**
1537         * Cleans up this connection so that it's state is as if the connection was
1538         * just created. This allows the Resource Adapter to clean up a connection
1539         * so that it can be reused without having to close and recreate the
1540         * connection.
1541         */
1542        public void cleanup() throws JMSException {
1543    
1544            if (advisoryConsumer != null && !isTransportFailed()) {
1545                advisoryConsumer.dispose();
1546                advisoryConsumer = null;
1547            }
1548    
1549            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1550                ActiveMQSession s = i.next();
1551                s.dispose();
1552            }
1553            for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1554                ActiveMQConnectionConsumer c = i.next();
1555                c.dispose();
1556            }
1557            for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
1558                ActiveMQInputStream c = i.next();
1559                c.dispose();
1560            }
1561            for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
1562                ActiveMQOutputStream c = i.next();
1563                c.dispose();
1564            }
1565    
1566            if (isConnectionInfoSentToBroker) {
1567                if (!transportFailed.get() && !closing.get()) {
1568                    syncSendPacket(info.createRemoveCommand());
1569                }
1570                isConnectionInfoSentToBroker = false;
1571            }
1572            if (userSpecifiedClientID) {
1573                info.setClientId(null);
1574                userSpecifiedClientID = false;
1575            }
1576            clientIDSet = false;
1577    
1578            started.set(false);
1579        }
1580    
1581        public void finalize() throws Throwable{
1582            Scheduler s = this.scheduler;
1583            if (s != null){
1584                s.stop();
1585            }
1586        }
1587    
1588        /**
1589         * Changes the associated username/password that is associated with this
1590         * connection. If the connection has been used, you must called cleanup()
1591         * before calling this method.
1592         *
1593         * @throws IllegalStateException if the connection is in used.
1594         */
1595        public void changeUserInfo(String userName, String password) throws JMSException {
1596            if (isConnectionInfoSentToBroker) {
1597                throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1598            }
1599            this.info.setUserName(userName);
1600            this.info.setPassword(password);
1601        }
1602    
1603        /**
1604         * @return Returns the resourceManagerId.
1605         * @throws JMSException
1606         */
1607        public String getResourceManagerId() throws JMSException {
1608            waitForBrokerInfo();
1609            if (brokerInfo == null) {
1610                throw new JMSException("Connection failed before Broker info was received.");
1611            }
1612            return brokerInfo.getBrokerId().getValue();
1613        }
1614    
1615        /**
1616         * Returns the broker name if one is available or null if one is not
1617         * available yet.
1618         */
1619        public String getBrokerName() {
1620            try {
1621                brokerInfoReceived.await(5, TimeUnit.SECONDS);
1622                if (brokerInfo == null) {
1623                    return null;
1624                }
1625                return brokerInfo.getBrokerName();
1626            } catch (InterruptedException e) {
1627                Thread.currentThread().interrupt();
1628                return null;
1629            }
1630        }
1631    
1632        /**
1633         * Returns the broker information if it is available or null if it is not
1634         * available yet.
1635         */
1636        public BrokerInfo getBrokerInfo() {
1637            return brokerInfo;
1638        }
1639    
1640        /**
1641         * @return Returns the RedeliveryPolicy.
1642         * @throws JMSException
1643         */
1644        public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1645            return redeliveryPolicy;
1646        }
1647    
1648        /**
1649         * Sets the redelivery policy to be used when messages are rolled back
1650         */
1651        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1652            this.redeliveryPolicy = redeliveryPolicy;
1653        }
1654    
1655        public BlobTransferPolicy getBlobTransferPolicy() {
1656            if (blobTransferPolicy == null) {
1657                blobTransferPolicy = createBlobTransferPolicy();
1658            }
1659            return blobTransferPolicy;
1660        }
1661    
1662        /**
1663         * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1664         * OBjects) are transferred from producers to brokers to consumers
1665         */
1666        public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1667            this.blobTransferPolicy = blobTransferPolicy;
1668        }
1669    
1670        /**
1671         * @return Returns the alwaysSessionAsync.
1672         */
1673        public boolean isAlwaysSessionAsync() {
1674            return alwaysSessionAsync;
1675        }
1676    
1677        /**
1678         * If this flag is set then a separate thread is not used for dispatching
1679         * messages for each Session in the Connection. However, a separate thread
1680         * is always used if there is more than one session, or the session isn't in
1681         * auto acknowledge or duplicates ok mode
1682         */
1683        public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1684            this.alwaysSessionAsync = alwaysSessionAsync;
1685        }
1686    
1687        /**
1688         * @return Returns the optimizeAcknowledge.
1689         */
1690        public boolean isOptimizeAcknowledge() {
1691            return optimizeAcknowledge;
1692        }
1693    
1694        /**
1695         * Enables an optimised acknowledgement mode where messages are acknowledged
1696         * in batches rather than individually
1697         *
1698         * @param optimizeAcknowledge The optimizeAcknowledge to set.
1699         */
1700        public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1701            this.optimizeAcknowledge = optimizeAcknowledge;
1702        }
1703    
1704        /**
1705         * The max time in milliseconds between optimized ack batches
1706         * @param optimizeAcknowledgeTimeOut
1707         */
1708        public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
1709            this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
1710        }
1711    
1712        public long getOptimizeAcknowledgeTimeOut() {
1713            return optimizeAcknowledgeTimeOut;
1714        }
1715    
1716        public long getWarnAboutUnstartedConnectionTimeout() {
1717            return warnAboutUnstartedConnectionTimeout;
1718        }
1719    
1720        /**
1721         * Enables the timeout from a connection creation to when a warning is
1722         * generated if the connection is not properly started via {@link #start()}
1723         * and a message is received by a consumer. It is a very common gotcha to
1724         * forget to <a
1725         * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1726         * the connection</a> so this option makes the default case to create a
1727         * warning if the user forgets. To disable the warning just set the value to <
1728         * 0 (say -1).
1729         */
1730        public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1731            this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1732        }
1733    
1734        /**
1735         * @return the sendTimeout
1736         */
1737        public int getSendTimeout() {
1738            return sendTimeout;
1739        }
1740    
1741        /**
1742         * @param sendTimeout the sendTimeout to set
1743         */
1744        public void setSendTimeout(int sendTimeout) {
1745            this.sendTimeout = sendTimeout;
1746        }
1747    
1748        /**
1749         * @return the sendAcksAsync
1750         */
1751        public boolean isSendAcksAsync() {
1752            return sendAcksAsync;
1753        }
1754    
1755        /**
1756         * @param sendAcksAsync the sendAcksAsync to set
1757         */
1758        public void setSendAcksAsync(boolean sendAcksAsync) {
1759            this.sendAcksAsync = sendAcksAsync;
1760        }
1761    
1762    
1763        /**
1764         * Returns the time this connection was created
1765         */
1766        public long getTimeCreated() {
1767            return timeCreated;
1768        }
1769    
1770        private void waitForBrokerInfo() throws JMSException {
1771            try {
1772                brokerInfoReceived.await();
1773            } catch (InterruptedException e) {
1774                Thread.currentThread().interrupt();
1775                throw JMSExceptionSupport.create(e);
1776            }
1777        }
1778    
1779        // Package protected so that it can be used in unit tests
1780        public Transport getTransport() {
1781            return transport;
1782        }
1783    
1784        public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1785            producers.put(producerId, producer);
1786        }
1787    
1788        public void removeProducer(ProducerId producerId) {
1789            producers.remove(producerId);
1790        }
1791    
1792        public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1793            dispatchers.put(consumerId, dispatcher);
1794        }
1795    
1796        public void removeDispatcher(ConsumerId consumerId) {
1797            dispatchers.remove(consumerId);
1798        }
1799    
1800        /**
1801         * @param o - the command to consume
1802         */
1803        public void onCommand(final Object o) {
1804            final Command command = (Command)o;
1805            if (!closed.get() && command != null) {
1806                try {
1807                    command.visit(new CommandVisitorAdapter() {
1808                        @Override
1809                        public Response processMessageDispatch(MessageDispatch md) throws Exception {
1810                            waitForTransportInterruptionProcessingToComplete();
1811                            ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1812                            if (dispatcher != null) {
1813                                // Copy in case a embedded broker is dispatching via
1814                                // vm://
1815                                // md.getMessage() == null to signal end of queue
1816                                // browse.
1817                                Message msg = md.getMessage();
1818                                if (msg != null) {
1819                                    msg = msg.copy();
1820                                    msg.setReadOnlyBody(true);
1821                                    msg.setReadOnlyProperties(true);
1822                                    msg.setRedeliveryCounter(md.getRedeliveryCounter());
1823                                    msg.setConnection(ActiveMQConnection.this);
1824                                    md.setMessage(msg);
1825                                }
1826                                dispatcher.dispatch(md);
1827                            }
1828                            return null;
1829                        }
1830    
1831                        @Override
1832                        public Response processProducerAck(ProducerAck pa) throws Exception {
1833                            if (pa != null && pa.getProducerId() != null) {
1834                                ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1835                                if (producer != null) {
1836                                    producer.onProducerAck(pa);
1837                                }
1838                            }
1839                            return null;
1840                        }
1841    
1842                        @Override
1843                        public Response processBrokerInfo(BrokerInfo info) throws Exception {
1844                            brokerInfo = info;
1845                            brokerInfoReceived.countDown();
1846                            optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1847                            getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1848                            return null;
1849                        }
1850    
1851                        @Override
1852                        public Response processConnectionError(final ConnectionError error) throws Exception {
1853                            executor.execute(new Runnable() {
1854                                public void run() {
1855                                    onAsyncException(error.getException());
1856                                }
1857                            });
1858                            return null;
1859                        }
1860    
1861                        @Override
1862                        public Response processControlCommand(ControlCommand command) throws Exception {
1863                            onControlCommand(command);
1864                            return null;
1865                        }
1866    
1867                        @Override
1868                        public Response processConnectionControl(ConnectionControl control) throws Exception {
1869                            onConnectionControl((ConnectionControl)command);
1870                            return null;
1871                        }
1872    
1873                        @Override
1874                        public Response processConsumerControl(ConsumerControl control) throws Exception {
1875                            onConsumerControl((ConsumerControl)command);
1876                            return null;
1877                        }
1878    
1879                        @Override
1880                        public Response processWireFormat(WireFormatInfo info) throws Exception {
1881                            onWireFormatInfo((WireFormatInfo)command);
1882                            return null;
1883                        }
1884                    });
1885                } catch (Exception e) {
1886                    onClientInternalException(e);
1887                }
1888    
1889            }
1890            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1891                TransportListener listener = iter.next();
1892                listener.onCommand(command);
1893            }
1894        }
1895    
1896        protected void onWireFormatInfo(WireFormatInfo info) {
1897            protocolVersion.set(info.getVersion());
1898        }
1899    
1900        /**
1901         * Handles async client internal exceptions.
1902         * A client internal exception is usually one that has been thrown
1903         * by a container runtime component during asynchronous processing of a
1904         * message that does not affect the connection itself.
1905         * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1906         * its <code>onException</code> method, if one has been registered with this connection.
1907         *
1908         * @param error the exception that the problem
1909         */
1910        public void onClientInternalException(final Throwable error) {
1911            if ( !closed.get() && !closing.get() ) {
1912                if ( this.clientInternalExceptionListener != null ) {
1913                    executor.execute(new Runnable() {
1914                        public void run() {
1915                            ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1916                        }
1917                    });
1918                } else {
1919                    LOG.debug("Async client internal exception occurred with no exception listener registered: "
1920                            + error, error);
1921                }
1922            }
1923        }
1924        /**
1925         * Used for handling async exceptions
1926         *
1927         * @param error
1928         */
1929        public void onAsyncException(Throwable error) {
1930            if (!closed.get() && !closing.get()) {
1931                if (this.exceptionListener != null) {
1932    
1933                    if (!(error instanceof JMSException)) {
1934                        error = JMSExceptionSupport.create(error);
1935                    }
1936                    final JMSException e = (JMSException)error;
1937    
1938                    executor.execute(new Runnable() {
1939                        public void run() {
1940                            ActiveMQConnection.this.exceptionListener.onException(e);
1941                        }
1942                    });
1943    
1944                } else {
1945                    LOG.debug("Async exception with no exception listener: " + error, error);
1946                }
1947            }
1948        }
1949    
1950        public void onException(final IOException error) {
1951            onAsyncException(error);
1952            if (!closing.get() && !closed.get()) {
1953                executor.execute(new Runnable() {
1954                    public void run() {
1955                        transportFailed(error);
1956                        ServiceSupport.dispose(ActiveMQConnection.this.transport);
1957                        brokerInfoReceived.countDown();
1958                        try {
1959                            cleanup();
1960                        } catch (JMSException e) {
1961                            LOG.warn("Exception during connection cleanup, " + e, e);
1962                        }
1963                        for (Iterator<TransportListener> iter = transportListeners
1964                                .iterator(); iter.hasNext();) {
1965                            TransportListener listener = iter.next();
1966                            listener.onException(error);
1967                        }
1968                    }
1969                });
1970            }
1971        }
1972    
1973        public void transportInterupted() {
1974            this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
1975            if (LOG.isDebugEnabled()) {
1976                LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
1977            }
1978            signalInterruptionProcessingNeeded();
1979    
1980            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1981                ActiveMQSession s = i.next();
1982                s.clearMessagesInProgress();
1983            }
1984    
1985            for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
1986                connectionConsumer.clearMessagesInProgress();
1987            }
1988    
1989            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1990                TransportListener listener = iter.next();
1991                listener.transportInterupted();
1992            }
1993        }
1994    
1995        public void transportResumed() {
1996            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1997                TransportListener listener = iter.next();
1998                listener.transportResumed();
1999            }
2000        }
2001    
2002        /**
2003         * Create the DestinationInfo object for the temporary destination.
2004         *
2005         * @param topic - if its true topic, else queue.
2006         * @return DestinationInfo
2007         * @throws JMSException
2008         */
2009        protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
2010    
2011            // Check if Destination info is of temporary type.
2012            ActiveMQTempDestination dest;
2013            if (topic) {
2014                dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2015            } else {
2016                dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2017            }
2018    
2019            DestinationInfo info = new DestinationInfo();
2020            info.setConnectionId(this.info.getConnectionId());
2021            info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
2022            info.setDestination(dest);
2023            syncSendPacket(info);
2024    
2025            dest.setConnection(this);
2026            activeTempDestinations.put(dest, dest);
2027            return dest;
2028        }
2029    
2030        /**
2031         * @param destination
2032         * @throws JMSException
2033         */
2034        public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
2035    
2036            checkClosedOrFailed();
2037    
2038            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2039                ActiveMQSession s = i.next();
2040                if (s.isInUse(destination)) {
2041                    throw new JMSException("A consumer is consuming from the temporary destination");
2042                }
2043            }
2044    
2045            activeTempDestinations.remove(destination);
2046    
2047            DestinationInfo destInfo = new DestinationInfo();
2048            destInfo.setConnectionId(this.info.getConnectionId());
2049            destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2050            destInfo.setDestination(destination);
2051            destInfo.setTimeout(0);
2052            syncSendPacket(destInfo);
2053        }
2054    
2055        public boolean isDeleted(ActiveMQDestination dest) {
2056    
2057            // If we are not watching the advisories.. then
2058            // we will assume that the temp destination does exist.
2059            if (advisoryConsumer == null) {
2060                return false;
2061            }
2062    
2063            return !activeTempDestinations.contains(dest);
2064        }
2065    
2066        public boolean isCopyMessageOnSend() {
2067            return copyMessageOnSend;
2068        }
2069    
2070        public LongSequenceGenerator getLocalTransactionIdGenerator() {
2071            return localTransactionIdGenerator;
2072        }
2073    
2074        public boolean isUseCompression() {
2075            return useCompression;
2076        }
2077    
2078        /**
2079         * Enables the use of compression of the message bodies
2080         */
2081        public void setUseCompression(boolean useCompression) {
2082            this.useCompression = useCompression;
2083        }
2084    
2085        public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2086    
2087            checkClosedOrFailed();
2088            ensureConnectionInfoSent();
2089    
2090            DestinationInfo info = new DestinationInfo();
2091            info.setConnectionId(this.info.getConnectionId());
2092            info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2093            info.setDestination(destination);
2094            info.setTimeout(0);
2095            syncSendPacket(info);
2096    
2097        }
2098    
2099        public boolean isDispatchAsync() {
2100            return dispatchAsync;
2101        }
2102    
2103        /**
2104         * Enables or disables the default setting of whether or not consumers have
2105         * their messages <a
2106         * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2107         * synchronously or asynchronously by the broker</a>. For non-durable
2108         * topics for example we typically dispatch synchronously by default to
2109         * minimize context switches which boost performance. However sometimes its
2110         * better to go slower to ensure that a single blocked consumer socket does
2111         * not block delivery to other consumers.
2112         *
2113         * @param asyncDispatch If true then consumers created on this connection
2114         *                will default to having their messages dispatched
2115         *                asynchronously. The default value is true.
2116         */
2117        public void setDispatchAsync(boolean asyncDispatch) {
2118            this.dispatchAsync = asyncDispatch;
2119        }
2120    
2121        public boolean isObjectMessageSerializationDefered() {
2122            return objectMessageSerializationDefered;
2123        }
2124    
2125        /**
2126         * When an object is set on an ObjectMessage, the JMS spec requires the
2127         * object to be serialized by that set method. Enabling this flag causes the
2128         * object to not get serialized. The object may subsequently get serialized
2129         * if the message needs to be sent over a socket or stored to disk.
2130         */
2131        public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2132            this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2133        }
2134    
2135        public InputStream createInputStream(Destination dest) throws JMSException {
2136            return createInputStream(dest, null);
2137        }
2138    
2139        public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
2140            return createInputStream(dest, messageSelector, false);
2141        }
2142    
2143        public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
2144            return createInputStream(dest, messageSelector, noLocal,  -1);
2145        }
2146    
2147    
2148    
2149        public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2150            return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
2151        }
2152    
2153        public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
2154            return createInputStream(dest, null, false);
2155        }
2156    
2157        public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
2158            return createDurableInputStream(dest, name, messageSelector, false);
2159        }
2160    
2161        public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
2162            return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
2163        }
2164    
2165        public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2166            return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
2167        }
2168    
2169        private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
2170            checkClosedOrFailed();
2171            ensureConnectionInfoSent();
2172            return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
2173        }
2174    
2175        /**
2176         * Creates a persistent output stream; individual messages will be written
2177         * to disk/database by the broker
2178         */
2179        public OutputStream createOutputStream(Destination dest) throws JMSException {
2180            return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2181        }
2182    
2183        /**
2184         * Creates a non persistent output stream; messages will not be written to
2185         * disk
2186         */
2187        public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
2188            return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2189        }
2190    
2191        /**
2192         * Creates an output stream allowing full control over the delivery mode,
2193         * the priority and time to live of the messages and the properties added to
2194         * messages on the stream.
2195         *
2196         * @param streamProperties defines a map of key-value pairs where the keys
2197         *                are strings and the values are primitive values (numbers
2198         *                and strings) which are appended to the messages similarly
2199         *                to using the
2200         *                {@link javax.jms.Message#setObjectProperty(String, Object)}
2201         *                method
2202         */
2203        public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
2204            checkClosedOrFailed();
2205            ensureConnectionInfoSent();
2206            return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
2207        }
2208    
2209        /**
2210         * Unsubscribes a durable subscription that has been created by a client.
2211         * <P>
2212         * This method deletes the state being maintained on behalf of the
2213         * subscriber by its provider.
2214         * <P>
2215         * It is erroneous for a client to delete a durable subscription while there
2216         * is an active <CODE>MessageConsumer </CODE> or
2217         * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2218         * message is part of a pending transaction or has not been acknowledged in
2219         * the session.
2220         *
2221         * @param name the name used to identify this subscription
2222         * @throws JMSException if the session fails to unsubscribe to the durable
2223         *                 subscription due to some internal error.
2224         * @throws InvalidDestinationException if an invalid subscription name is
2225         *                 specified.
2226         * @since 1.1
2227         */
2228        public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2229            checkClosedOrFailed();
2230            RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2231            rsi.setConnectionId(getConnectionInfo().getConnectionId());
2232            rsi.setSubscriptionName(name);
2233            rsi.setClientId(getConnectionInfo().getClientId());
2234            syncSendPacket(rsi);
2235        }
2236    
2237        /**
2238         * Internal send method optimized: - It does not copy the message - It can
2239         * only handle ActiveMQ messages. - You can specify if the send is async or
2240         * sync - Does not allow you to send /w a transaction.
2241         */
2242        void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2243            checkClosedOrFailed();
2244    
2245            if (destination.isTemporary() && isDeleted(destination)) {
2246                throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2247            }
2248    
2249            msg.setJMSDestination(destination);
2250            msg.setJMSDeliveryMode(deliveryMode);
2251            long expiration = 0L;
2252    
2253            if (!isDisableTimeStampsByDefault()) {
2254                long timeStamp = System.currentTimeMillis();
2255                msg.setJMSTimestamp(timeStamp);
2256                if (timeToLive > 0) {
2257                    expiration = timeToLive + timeStamp;
2258                }
2259            }
2260    
2261            msg.setJMSExpiration(expiration);
2262            msg.setJMSPriority(priority);
2263    
2264            msg.setJMSRedelivered(false);
2265            msg.setMessageId(messageId);
2266    
2267            msg.onSend();
2268    
2269            msg.setProducerId(msg.getMessageId().getProducerId());
2270    
2271            if (LOG.isDebugEnabled()) {
2272                LOG.debug("Sending message: " + msg);
2273            }
2274    
2275            if (async) {
2276                asyncSendPacket(msg);
2277            } else {
2278                syncSendPacket(msg);
2279            }
2280    
2281        }
2282    
2283        public void addOutputStream(ActiveMQOutputStream stream) {
2284            outputStreams.add(stream);
2285        }
2286    
2287        public void removeOutputStream(ActiveMQOutputStream stream) {
2288            outputStreams.remove(stream);
2289        }
2290    
2291        public void addInputStream(ActiveMQInputStream stream) {
2292            inputStreams.add(stream);
2293        }
2294    
2295        public void removeInputStream(ActiveMQInputStream stream) {
2296            inputStreams.remove(stream);
2297        }
2298    
2299        protected void onControlCommand(ControlCommand command) {
2300            String text = command.getCommand();
2301            if (text != null) {
2302                if ("shutdown".equals(text)) {
2303                    LOG.info("JVM told to shutdown");
2304                    System.exit(0);
2305                }
2306                if (false && "close".equals(text)){
2307                    LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
2308                    try {
2309                        close();
2310                    } catch (JMSException e) {
2311                    }
2312                }
2313            }
2314        }
2315    
2316        protected void onConnectionControl(ConnectionControl command) {
2317            if (command.isFaultTolerant()) {
2318                this.optimizeAcknowledge = false;
2319                for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2320                    ActiveMQSession s = i.next();
2321                    s.setOptimizeAcknowledge(false);
2322                }
2323            }
2324        }
2325    
2326        protected void onConsumerControl(ConsumerControl command) {
2327            if (command.isClose()) {
2328                for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2329                    ActiveMQSession s = i.next();
2330                    s.close(command.getConsumerId());
2331                }
2332            } else {
2333                for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2334                    ActiveMQSession s = i.next();
2335                    s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2336                }
2337            }
2338        }
2339    
2340        protected void transportFailed(IOException error) {
2341            transportFailed.set(true);
2342            if (firstFailureError == null) {
2343                firstFailureError = error;
2344            }
2345        }
2346    
2347        /**
2348         * Should a JMS message be copied to a new JMS Message object as part of the
2349         * send() method in JMS. This is enabled by default to be compliant with the
2350         * JMS specification. You can disable it if you do not mutate JMS messages
2351         * after they are sent for a performance boost
2352         */
2353        public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2354            this.copyMessageOnSend = copyMessageOnSend;
2355        }
2356    
2357        @Override
2358        public String toString() {
2359            return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2360        }
2361    
2362        protected BlobTransferPolicy createBlobTransferPolicy() {
2363            return new BlobTransferPolicy();
2364        }
2365    
2366        public int getProtocolVersion() {
2367            return protocolVersion.get();
2368        }
2369    
2370        public int getProducerWindowSize() {
2371            return producerWindowSize;
2372        }
2373    
2374        public void setProducerWindowSize(int producerWindowSize) {
2375            this.producerWindowSize = producerWindowSize;
2376        }
2377    
2378        public void setAuditDepth(int auditDepth) {
2379            connectionAudit.setAuditDepth(auditDepth);
2380        }
2381    
2382        public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2383            connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2384        }
2385    
2386        protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2387            connectionAudit.removeDispatcher(dispatcher);
2388        }
2389    
2390        protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2391            return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2392        }
2393    
2394        protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2395            connectionAudit.rollbackDuplicate(dispatcher, message);
2396        }
2397    
2398        public IOException getFirstFailureError() {
2399            return firstFailureError;
2400        }
2401    
2402        protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2403            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2404            if (cdl != null) {
2405                if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
2406                    LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
2407                    cdl.await(10, TimeUnit.SECONDS);
2408                }
2409                signalInterruptionProcessingComplete();
2410            }
2411        }
2412    
2413        protected void transportInterruptionProcessingComplete() {
2414            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2415            if (cdl != null) {
2416                cdl.countDown();
2417                try {
2418                    signalInterruptionProcessingComplete();
2419                } catch (InterruptedException ignored) {}
2420            }
2421        }
2422    
2423        private void signalInterruptionProcessingComplete() throws InterruptedException {
2424            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2425            if (cdl.getCount()==0) {
2426                if (LOG.isDebugEnabled()) {
2427                    LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
2428                }
2429                this.transportInterruptionProcessingComplete = null;
2430    
2431                FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2432                if (failoverTransport != null) {
2433                    failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2434                    if (LOG.isDebugEnabled()) {
2435                        LOG.debug("notified failover transport (" + failoverTransport
2436                                + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2437                    }
2438                }
2439    
2440            }
2441        }
2442    
2443        private void signalInterruptionProcessingNeeded() {
2444            FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2445            if (failoverTransport != null) {
2446                failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2447                if (LOG.isDebugEnabled()) {
2448                    LOG.debug("notified failover transport (" + failoverTransport
2449                            + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2450                }
2451            }
2452        }
2453    
2454        /*
2455         * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2456         * will wait to receive re dispatched messages.
2457         * default value is 0 so there is no wait by default.
2458         */
2459        public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2460            this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2461        }
2462    
2463        public long getConsumerFailoverRedeliveryWaitPeriod() {
2464            return consumerFailoverRedeliveryWaitPeriod;
2465        }
2466    
2467        protected Scheduler getScheduler() throws JMSException {
2468            Scheduler result = scheduler;
2469            if (result == null) {
2470                synchronized (this) {
2471                    result = scheduler;
2472                    if (result == null) {
2473                        checkClosed();
2474                        try {
2475                            result = scheduler = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
2476                            scheduler.start();
2477                        } catch(Exception e) {
2478                            throw JMSExceptionSupport.create(e);
2479                        }
2480                    }
2481                }
2482            }
2483            return result;
2484        }
2485    
2486        protected ThreadPoolExecutor getExecutor() {
2487            return this.executor;
2488        }
2489    
2490        /**
2491         * @return the checkForDuplicates
2492         */
2493        public boolean isCheckForDuplicates() {
2494            return this.checkForDuplicates;
2495        }
2496    
2497        /**
2498         * @param checkForDuplicates the checkForDuplicates to set
2499         */
2500        public void setCheckForDuplicates(boolean checkForDuplicates) {
2501            this.checkForDuplicates = checkForDuplicates;
2502        }
2503    
2504    
2505        public boolean isTransactedIndividualAck() {
2506            return transactedIndividualAck;
2507        }
2508    
2509        public void setTransactedIndividualAck(boolean transactedIndividualAck) {
2510            this.transactedIndividualAck = transactedIndividualAck;
2511        }
2512    
2513        public boolean isNonBlockingRedelivery() {
2514            return nonBlockingRedelivery;
2515        }
2516    
2517        public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
2518            this.nonBlockingRedelivery = nonBlockingRedelivery;
2519        }
2520    
2521        /**
2522         * Removes any TempDestinations that this connection has cached, ignoring
2523         * any exceptions generated because the destination is in use as they should
2524         * not be removed.
2525         */
2526        public void cleanUpTempDestinations() {
2527    
2528            if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
2529                return;
2530            }
2531    
2532            Iterator<ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
2533                = this.activeTempDestinations.entrySet().iterator();
2534            while(entries.hasNext()) {
2535                ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
2536                try {
2537                    // Only delete this temp destination if it was created from this connection. The connection used
2538                    // for the advisory consumer may also have a reference to this temp destination.
2539                    ActiveMQTempDestination dest = entry.getValue();
2540                    String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString();
2541                    if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) {
2542                        this.deleteTempDestination(entry.getValue());
2543                    }
2544                } catch (Exception ex) {
2545                    // the temp dest is in use so it can not be deleted.
2546                    // it is ok to leave it to connection tear down phase
2547                }
2548            }
2549        }
2550    }