001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.CopyOnWriteArrayList;
030import java.util.concurrent.CountDownLatch;
031import java.util.concurrent.LinkedBlockingQueue;
032import java.util.concurrent.RejectedExecutionHandler;
033import java.util.concurrent.ThreadFactory;
034import java.util.concurrent.ThreadPoolExecutor;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicBoolean;
037import java.util.concurrent.atomic.AtomicInteger;
038
039import javax.jms.Connection;
040import javax.jms.ConnectionConsumer;
041import javax.jms.ConnectionMetaData;
042import javax.jms.Destination;
043import javax.jms.ExceptionListener;
044import javax.jms.IllegalStateException;
045import javax.jms.InvalidDestinationException;
046import javax.jms.JMSException;
047import javax.jms.Queue;
048import javax.jms.QueueConnection;
049import javax.jms.QueueSession;
050import javax.jms.ServerSessionPool;
051import javax.jms.Session;
052import javax.jms.Topic;
053import javax.jms.TopicConnection;
054import javax.jms.TopicSession;
055import javax.jms.XAConnection;
056
057import org.apache.activemq.advisory.DestinationSource;
058import org.apache.activemq.blob.BlobTransferPolicy;
059import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
060import org.apache.activemq.command.ActiveMQDestination;
061import org.apache.activemq.command.ActiveMQMessage;
062import org.apache.activemq.command.ActiveMQTempDestination;
063import org.apache.activemq.command.ActiveMQTempQueue;
064import org.apache.activemq.command.ActiveMQTempTopic;
065import org.apache.activemq.command.BrokerInfo;
066import org.apache.activemq.command.Command;
067import org.apache.activemq.command.CommandTypes;
068import org.apache.activemq.command.ConnectionControl;
069import org.apache.activemq.command.ConnectionError;
070import org.apache.activemq.command.ConnectionId;
071import org.apache.activemq.command.ConnectionInfo;
072import org.apache.activemq.command.ConsumerControl;
073import org.apache.activemq.command.ConsumerId;
074import org.apache.activemq.command.ConsumerInfo;
075import org.apache.activemq.command.ControlCommand;
076import org.apache.activemq.command.DestinationInfo;
077import org.apache.activemq.command.ExceptionResponse;
078import org.apache.activemq.command.Message;
079import org.apache.activemq.command.MessageDispatch;
080import org.apache.activemq.command.MessageId;
081import org.apache.activemq.command.ProducerAck;
082import org.apache.activemq.command.ProducerId;
083import org.apache.activemq.command.RemoveInfo;
084import org.apache.activemq.command.RemoveSubscriptionInfo;
085import org.apache.activemq.command.Response;
086import org.apache.activemq.command.SessionId;
087import org.apache.activemq.command.ShutdownInfo;
088import org.apache.activemq.command.WireFormatInfo;
089import org.apache.activemq.management.JMSConnectionStatsImpl;
090import org.apache.activemq.management.JMSStatsImpl;
091import org.apache.activemq.management.StatsCapable;
092import org.apache.activemq.management.StatsImpl;
093import org.apache.activemq.state.CommandVisitorAdapter;
094import org.apache.activemq.thread.Scheduler;
095import org.apache.activemq.thread.TaskRunnerFactory;
096import org.apache.activemq.transport.FutureResponse;
097import org.apache.activemq.transport.RequestTimedOutIOException;
098import org.apache.activemq.transport.ResponseCallback;
099import org.apache.activemq.transport.Transport;
100import org.apache.activemq.transport.TransportListener;
101import org.apache.activemq.transport.failover.FailoverTransport;
102import org.apache.activemq.util.IdGenerator;
103import org.apache.activemq.util.IntrospectionSupport;
104import org.apache.activemq.util.JMSExceptionSupport;
105import org.apache.activemq.util.LongSequenceGenerator;
106import org.apache.activemq.util.ServiceSupport;
107import org.apache.activemq.util.ThreadPoolUtils;
108import org.slf4j.Logger;
109import org.slf4j.LoggerFactory;
110
111public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection {
112
113    public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
114    public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
115    public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
116    public static int DEFAULT_THREAD_POOL_SIZE = 1000;
117
118    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
119
120    public final ConcurrentMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
121
122    protected boolean dispatchAsync=true;
123    protected boolean alwaysSessionAsync = true;
124
125    private TaskRunnerFactory sessionTaskRunner;
126    private final ThreadPoolExecutor executor;
127
128    // Connection state variables
129    private final ConnectionInfo info;
130    private ExceptionListener exceptionListener;
131    private ClientInternalExceptionListener clientInternalExceptionListener;
132    private boolean clientIDSet;
133    private boolean isConnectionInfoSentToBroker;
134    private boolean userSpecifiedClientID;
135
136    // Configuration options variables
137    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
138    private BlobTransferPolicy blobTransferPolicy;
139    private RedeliveryPolicyMap redeliveryPolicyMap;
140    private MessageTransformer transformer;
141
142    private boolean disableTimeStampsByDefault;
143    private boolean optimizedMessageDispatch = true;
144    private boolean copyMessageOnSend = true;
145    private boolean useCompression;
146    private boolean objectMessageSerializationDefered;
147    private boolean useAsyncSend;
148    private boolean optimizeAcknowledge;
149    private long optimizeAcknowledgeTimeOut = 0;
150    private long optimizedAckScheduledAckInterval = 0;
151    private boolean nestedMapAndListEnabled = true;
152    private boolean useRetroactiveConsumer;
153    private boolean exclusiveConsumer;
154    private boolean alwaysSyncSend;
155    private int closeTimeout = 15000;
156    private boolean watchTopicAdvisories = true;
157    private long warnAboutUnstartedConnectionTimeout = 500L;
158    private int sendTimeout =0;
159    private boolean sendAcksAsync=true;
160    private boolean checkForDuplicates = true;
161    private boolean queueOnlyConnection = false;
162    private boolean consumerExpiryCheckEnabled = true;
163
164    private final Transport transport;
165    private final IdGenerator clientIdGenerator;
166    private final JMSStatsImpl factoryStats;
167    private final JMSConnectionStatsImpl stats;
168
169    private final AtomicBoolean started = new AtomicBoolean(false);
170    private final AtomicBoolean closing = new AtomicBoolean(false);
171    private final AtomicBoolean closed = new AtomicBoolean(false);
172    private final AtomicBoolean transportFailed = new AtomicBoolean(false);
173    private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
174    private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
175    private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
176
177    // Maps ConsumerIds to ActiveMQConsumer objects
178    private final ConcurrentMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
179    private final ConcurrentMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
180    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
181    private final SessionId connectionSessionId;
182    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
183    private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
184    private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
185
186    private AdvisoryConsumer advisoryConsumer;
187    private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
188    private BrokerInfo brokerInfo;
189    private IOException firstFailureError;
190    private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
191
192    // Assume that protocol is the latest. Change to the actual protocol
193    // version when a WireFormatInfo is received.
194    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
195    private final long timeCreated;
196    private final ConnectionAudit connectionAudit = new ConnectionAudit();
197    private DestinationSource destinationSource;
198    private final Object ensureConnectionInfoSentMutex = new Object();
199    private boolean useDedicatedTaskRunner;
200    protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0);
201    private long consumerFailoverRedeliveryWaitPeriod;
202    private Scheduler scheduler;
203    private boolean messagePrioritySupported = false;
204    private boolean transactedIndividualAck = false;
205    private boolean nonBlockingRedelivery = false;
206    private boolean rmIdFromConnectionId = false;
207
208    private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
209    private RejectedExecutionHandler rejectedTaskHandler = null;
210
211    private List<String> trustedPackages = new ArrayList<String>();
212    private boolean trustAllPackages = false;
213        private int connectResponseTimeout;
214
215    /**
216     * Construct an <code>ActiveMQConnection</code>
217     *
218     * @param transport
219     * @param factoryStats
220     * @throws Exception
221     */
222    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
223
224        this.transport = transport;
225        this.clientIdGenerator = clientIdGenerator;
226        this.factoryStats = factoryStats;
227
228        // Configure a single threaded executor who's core thread can timeout if
229        // idle
230        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
231            @Override
232            public Thread newThread(Runnable r) {
233                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
234                //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
235                //thread.setDaemon(true);
236                return thread;
237            }
238        });
239        // asyncConnectionThread.allowCoreThreadTimeOut(true);
240        String uniqueId = connectionIdGenerator.generateId();
241        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
242        this.info.setManageable(true);
243        this.info.setFaultTolerant(transport.isFaultTolerant());
244        this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
245
246        this.transport.setTransportListener(this);
247
248        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
249        this.factoryStats.addConnection(this);
250        this.timeCreated = System.currentTimeMillis();
251        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
252    }
253
254    protected void setUserName(String userName) {
255        this.info.setUserName(userName);
256    }
257
258    protected void setPassword(String password) {
259        this.info.setPassword(password);
260    }
261
262    /**
263     * A static helper method to create a new connection
264     *
265     * @return an ActiveMQConnection
266     * @throws JMSException
267     */
268    public static ActiveMQConnection makeConnection() throws JMSException {
269        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
270        return (ActiveMQConnection)factory.createConnection();
271    }
272
273    /**
274     * A static helper method to create a new connection
275     *
276     * @param uri
277     * @return and ActiveMQConnection
278     * @throws JMSException
279     */
280    public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
281        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
282        return (ActiveMQConnection)factory.createConnection();
283    }
284
285    /**
286     * A static helper method to create a new connection
287     *
288     * @param user
289     * @param password
290     * @param uri
291     * @return an ActiveMQConnection
292     * @throws JMSException
293     */
294    public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
295        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
296        return (ActiveMQConnection)factory.createConnection();
297    }
298
299    /**
300     * @return a number unique for this connection
301     */
302    public JMSConnectionStatsImpl getConnectionStats() {
303        return stats;
304    }
305
306    /**
307     * Creates a <CODE>Session</CODE> object.
308     *
309     * @param transacted indicates whether the session is transacted
310     * @param acknowledgeMode indicates whether the consumer or the client will
311     *                acknowledge any messages it receives; ignored if the
312     *                session is transacted. Legal values are
313     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
314     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
315     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
316     * @return a newly created session
317     * @throws JMSException if the <CODE>Connection</CODE> object fails to
318     *                 create a session due to some internal error or lack of
319     *                 support for the specific transaction and acknowledgement
320     *                 mode.
321     * @see Session#AUTO_ACKNOWLEDGE
322     * @see Session#CLIENT_ACKNOWLEDGE
323     * @see Session#DUPS_OK_ACKNOWLEDGE
324     * @since 1.1
325     */
326    @Override
327    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
328        checkClosedOrFailed();
329        ensureConnectionInfoSent();
330        if (!transacted) {
331            if (acknowledgeMode == Session.SESSION_TRANSACTED) {
332                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
333            } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
334                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
335                        "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
336            }
337        }
338        return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : acknowledgeMode, isDispatchAsync(), isAlwaysSessionAsync());
339    }
340
341    /**
342     * @return sessionId
343     */
344    protected SessionId getNextSessionId() {
345        return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
346    }
347
348    /**
349     * Gets the client identifier for this connection.
350     * <P>
351     * This value is specific to the JMS provider. It is either preconfigured by
352     * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
353     * dynamically by the application by calling the <code>setClientID</code>
354     * method.
355     *
356     * @return the unique client identifier
357     * @throws JMSException if the JMS provider fails to return the client ID
358     *                 for this connection due to some internal error.
359     */
360    @Override
361    public String getClientID() throws JMSException {
362        checkClosedOrFailed();
363        return this.info.getClientId();
364    }
365
366    /**
367     * Sets the client identifier for this connection.
368     * <P>
369     * The preferred way to assign a JMS client's client identifier is for it to
370     * be configured in a client-specific <CODE>ConnectionFactory</CODE>
371     * object and transparently assigned to the <CODE>Connection</CODE> object
372     * it creates.
373     * <P>
374     * Alternatively, a client can set a connection's client identifier using a
375     * provider-specific value. The facility to set a connection's client
376     * identifier explicitly is not a mechanism for overriding the identifier
377     * that has been administratively configured. It is provided for the case
378     * where no administratively specified identifier exists. If one does exist,
379     * an attempt to change it by setting it must throw an
380     * <CODE>IllegalStateException</CODE>. If a client sets the client
381     * identifier explicitly, it must do so immediately after it creates the
382     * connection and before any other action on the connection is taken. After
383     * this point, setting the client identifier is a programming error that
384     * should throw an <CODE>IllegalStateException</CODE>.
385     * <P>
386     * The purpose of the client identifier is to associate a connection and its
387     * objects with a state maintained on behalf of the client by a provider.
388     * The only such state identified by the JMS API is that required to support
389     * durable subscriptions.
390     * <P>
391     * If another connection with the same <code>clientID</code> is already
392     * running when this method is called, the JMS provider should detect the
393     * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
394     *
395     * @param newClientID the unique client identifier
396     * @throws JMSException if the JMS provider fails to set the client ID for
397     *                 this connection due to some internal error.
398     * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
399     *                 invalid or duplicate client ID.
400     * @throws javax.jms.IllegalStateException if the JMS client attempts to set
401     *                 a connection's client ID at the wrong time or when it has
402     *                 been administratively configured.
403     */
404    @Override
405    public void setClientID(String newClientID) throws JMSException {
406        checkClosedOrFailed();
407
408        if (this.clientIDSet) {
409            throw new IllegalStateException("The clientID has already been set");
410        }
411
412        if (this.isConnectionInfoSentToBroker) {
413            throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
414        }
415
416        this.info.setClientId(newClientID);
417        this.userSpecifiedClientID = true;
418        ensureConnectionInfoSent();
419    }
420
421    /**
422     * Sets the default client id that the connection will use if explicitly not
423     * set with the setClientId() call.
424     */
425    public void setDefaultClientID(String clientID) throws JMSException {
426        this.info.setClientId(clientID);
427        this.userSpecifiedClientID = true;
428    }
429
430    /**
431     * Gets the metadata for this connection.
432     *
433     * @return the connection metadata
434     * @throws JMSException if the JMS provider fails to get the connection
435     *                 metadata for this connection.
436     * @see javax.jms.ConnectionMetaData
437     */
438    @Override
439    public ConnectionMetaData getMetaData() throws JMSException {
440        checkClosedOrFailed();
441        return ActiveMQConnectionMetaData.INSTANCE;
442    }
443
444    /**
445     * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
446     * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
447     * associated with it.
448     *
449     * @return the <CODE>ExceptionListener</CODE> for this connection, or
450     *         null, if no <CODE>ExceptionListener</CODE> is associated with
451     *         this connection.
452     * @throws JMSException if the JMS provider fails to get the
453     *                 <CODE>ExceptionListener</CODE> for this connection.
454     * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
455     */
456    @Override
457    public ExceptionListener getExceptionListener() throws JMSException {
458        checkClosedOrFailed();
459        return this.exceptionListener;
460    }
461
462    /**
463     * Sets an exception listener for this connection.
464     * <P>
465     * If a JMS provider detects a serious problem with a connection, it informs
466     * the connection's <CODE> ExceptionListener</CODE>, if one has been
467     * registered. It does this by calling the listener's <CODE>onException
468     * </CODE>
469     * method, passing it a <CODE>JMSException</CODE> object describing the
470     * problem.
471     * <P>
472     * An exception listener allows a client to be notified of a problem
473     * asynchronously. Some connections only consume messages, so they would
474     * have no other way to learn their connection has failed.
475     * <P>
476     * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
477     * <P>
478     * A JMS provider should attempt to resolve connection problems itself
479     * before it notifies the client of them.
480     *
481     * @param listener the exception listener
482     * @throws JMSException if the JMS provider fails to set the exception
483     *                 listener for this connection.
484     */
485    @Override
486    public void setExceptionListener(ExceptionListener listener) throws JMSException {
487        checkClosedOrFailed();
488        this.exceptionListener = listener;
489    }
490
491    /**
492     * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
493     * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
494     * associated with it.
495     *
496     * @return the listener or <code>null</code> if no listener is registered with the connection.
497     */
498    public ClientInternalExceptionListener getClientInternalExceptionListener() {
499        return clientInternalExceptionListener;
500    }
501
502    /**
503     * Sets a client internal exception listener for this connection.
504     * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
505     * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
506     * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
507     * describing the problem.
508     *
509     * @param listener the exception listener
510     */
511    public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) {
512        this.clientInternalExceptionListener = listener;
513    }
514
515    /**
516     * Starts (or restarts) a connection's delivery of incoming messages. A call
517     * to <CODE>start</CODE> on a connection that has already been started is
518     * ignored.
519     *
520     * @throws JMSException if the JMS provider fails to start message delivery
521     *                 due to some internal error.
522     * @see javax.jms.Connection#stop()
523     */
524    @Override
525    public void start() throws JMSException {
526        checkClosedOrFailed();
527        ensureConnectionInfoSent();
528        if (started.compareAndSet(false, true)) {
529            for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
530                ActiveMQSession session = i.next();
531                session.start();
532            }
533        }
534    }
535
536    /**
537     * Temporarily stops a connection's delivery of incoming messages. Delivery
538     * can be restarted using the connection's <CODE>start</CODE> method. When
539     * the connection is stopped, delivery to all the connection's message
540     * consumers is inhibited: synchronous receives block, and messages are not
541     * delivered to message listeners.
542     * <P>
543     * This call blocks until receives and/or message listeners in progress have
544     * completed.
545     * <P>
546     * Stopping a connection has no effect on its ability to send messages. A
547     * call to <CODE>stop</CODE> on a connection that has already been stopped
548     * is ignored.
549     * <P>
550     * A call to <CODE>stop</CODE> must not return until delivery of messages
551     * has paused. This means that a client can rely on the fact that none of
552     * its message listeners will be called and that all threads of control
553     * waiting for <CODE>receive</CODE> calls to return will not return with a
554     * message until the connection is restarted. The receive timers for a
555     * stopped connection continue to advance, so receives may time out while
556     * the connection is stopped.
557     * <P>
558     * If message listeners are running when <CODE>stop</CODE> is invoked, the
559     * <CODE>stop</CODE> call must wait until all of them have returned before
560     * it may return. While these message listeners are completing, they must
561     * have the full services of the connection available to them.
562     *
563     * @throws JMSException if the JMS provider fails to stop message delivery
564     *                 due to some internal error.
565     * @see javax.jms.Connection#start()
566     */
567    @Override
568    public void stop() throws JMSException {
569        doStop(true);
570    }
571
572    /**
573     * @see #stop()
574     * @param checkClosed <tt>true</tt> to check for already closed and throw {@link java.lang.IllegalStateException} if already closed,
575     *                    <tt>false</tt> to skip this check
576     * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error.
577     */
578    void doStop(boolean checkClosed) throws JMSException {
579        if (checkClosed) {
580            checkClosedOrFailed();
581        }
582        if (started.compareAndSet(true, false)) {
583            synchronized(sessions) {
584                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
585                    ActiveMQSession s = i.next();
586                    s.stop();
587                }
588            }
589        }
590    }
591
592    /**
593     * Closes the connection.
594     * <P>
595     * Since a provider typically allocates significant resources outside the
596     * JVM on behalf of a connection, clients should close these resources when
597     * they are not needed. Relying on garbage collection to eventually reclaim
598     * these resources may not be timely enough.
599     * <P>
600     * There is no need to close the sessions, producers, and consumers of a
601     * closed connection.
602     * <P>
603     * Closing a connection causes all temporary destinations to be deleted.
604     * <P>
605     * When this method is invoked, it should not return until message
606     * processing has been shut down in an orderly fashion. This means that all
607     * message listeners that may have been running have returned, and that all
608     * pending receives have returned. A close terminates all pending message
609     * receives on the connection's sessions' consumers. The receives may return
610     * with a message or with null, depending on whether there was a message
611     * available at the time of the close. If one or more of the connection's
612     * sessions' message listeners is processing a message at the time when
613     * connection <CODE>close</CODE> is invoked, all the facilities of the
614     * connection and its sessions must remain available to those listeners
615     * until they return control to the JMS provider.
616     * <P>
617     * Closing a connection causes any of its sessions' transactions in progress
618     * to be rolled back. In the case where a session's work is coordinated by
619     * an external transaction manager, a session's <CODE>commit</CODE> and
620     * <CODE> rollback</CODE> methods are not used and the result of a closed
621     * session's work is determined later by the transaction manager. Closing a
622     * connection does NOT force an acknowledgment of client-acknowledged
623     * sessions.
624     * <P>
625     * Invoking the <CODE>acknowledge</CODE> method of a received message from
626     * a closed connection's session must throw an
627     * <CODE>IllegalStateException</CODE>. Closing a closed connection must
628     * NOT throw an exception.
629     *
630     * @throws JMSException if the JMS provider fails to close the connection
631     *                 due to some internal error. For example, a failure to
632     *                 release resources or to close a socket connection can
633     *                 cause this exception to be thrown.
634     */
635    @Override
636    public void close() throws JMSException {
637        try {
638            // If we were running, lets stop first.
639            if (!closed.get() && !transportFailed.get()) {
640                // do not fail if already closed as according to JMS spec we must not
641                // throw exception if already closed
642                doStop(false);
643            }
644
645            synchronized (this) {
646                if (!closed.get()) {
647                    closing.set(true);
648
649                    if (destinationSource != null) {
650                        destinationSource.stop();
651                        destinationSource = null;
652                    }
653                    if (advisoryConsumer != null) {
654                        advisoryConsumer.dispose();
655                        advisoryConsumer = null;
656                    }
657
658                    Scheduler scheduler = this.scheduler;
659                    if (scheduler != null) {
660                        try {
661                            scheduler.stop();
662                        } catch (Exception e) {
663                            JMSException ex =  JMSExceptionSupport.create(e);
664                            throw ex;
665                        }
666                    }
667
668                    long lastDeliveredSequenceId = -1;
669                    for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
670                        ActiveMQSession s = i.next();
671                        s.dispose();
672                        lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
673                    }
674                    for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
675                        ActiveMQConnectionConsumer c = i.next();
676                        c.dispose();
677                    }
678
679                    this.activeTempDestinations.clear();
680
681                    try {
682                        if (isConnectionInfoSentToBroker) {
683                            // If we announced ourselves to the broker.. Try to let the broker
684                            // know that the connection is being shutdown.
685                            RemoveInfo removeCommand = info.createRemoveCommand();
686                            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
687                            try {
688                                syncSendPacket(removeCommand, closeTimeout);
689                            } catch (JMSException e) {
690                                if (e.getCause() instanceof RequestTimedOutIOException) {
691                                    // expected
692                                } else {
693                                    throw e;
694                                }
695                            }
696                            doAsyncSendPacket(new ShutdownInfo());
697                        }
698                    } finally { // release anyway even if previous communication fails
699                        started.set(false);
700
701                        // TODO if we move the TaskRunnerFactory to the connection
702                        // factory
703                        // then we may need to call
704                        // factory.onConnectionClose(this);
705                        if (sessionTaskRunner != null) {
706                            sessionTaskRunner.shutdown();
707                        }
708                        closed.set(true);
709                        closing.set(false);
710                    }
711                }
712            }
713        } finally {
714            try {
715                if (executor != null) {
716                    ThreadPoolUtils.shutdown(executor);
717                }
718            } catch (Throwable e) {
719                LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
720            }
721
722            ServiceSupport.dispose(this.transport);
723
724            factoryStats.removeConnection(this);
725        }
726    }
727
728    /**
729     * Tells the broker to terminate its VM. This can be used to cleanly
730     * terminate a broker running in a standalone java process. Server must have
731     * property enable.vm.shutdown=true defined to allow this to work.
732     */
733    // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
734    // implemented.
735    /*
736     * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
737     * command = new BrokerAdminCommand();
738     * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
739     * asyncSendPacket(command); }
740     */
741
742    /**
743     * Create a durable connection consumer for this connection (optional
744     * operation). This is an expert facility not used by regular JMS clients.
745     *
746     * @param topic topic to access
747     * @param subscriptionName durable subscription name
748     * @param messageSelector only messages with properties matching the message
749     *                selector expression are delivered. A value of null or an
750     *                empty string indicates that there is no message selector
751     *                for the message consumer.
752     * @param sessionPool the server session pool to associate with this durable
753     *                connection consumer
754     * @param maxMessages the maximum number of messages that can be assigned to
755     *                a server session at one time
756     * @return the durable connection consumer
757     * @throws JMSException if the <CODE>Connection</CODE> object fails to
758     *                 create a connection consumer due to some internal error
759     *                 or invalid arguments for <CODE>sessionPool</CODE> and
760     *                 <CODE>messageSelector</CODE>.
761     * @throws javax.jms.InvalidDestinationException if an invalid destination
762     *                 is specified.
763     * @throws javax.jms.InvalidSelectorException if the message selector is
764     *                 invalid.
765     * @see javax.jms.ConnectionConsumer
766     * @since 1.1
767     */
768    @Override
769    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
770        throws JMSException {
771        return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
772    }
773
774    /**
775     * Create a durable connection consumer for this connection (optional
776     * operation). This is an expert facility not used by regular JMS clients.
777     *
778     * @param topic topic to access
779     * @param subscriptionName durable subscription name
780     * @param messageSelector only messages with properties matching the message
781     *                selector expression are delivered. A value of null or an
782     *                empty string indicates that there is no message selector
783     *                for the message consumer.
784     * @param sessionPool the server session pool to associate with this durable
785     *                connection consumer
786     * @param maxMessages the maximum number of messages that can be assigned to
787     *                a server session at one time
788     * @param noLocal set true if you want to filter out messages published
789     *                locally
790     * @return the durable connection consumer
791     * @throws JMSException if the <CODE>Connection</CODE> object fails to
792     *                 create a connection consumer due to some internal error
793     *                 or invalid arguments for <CODE>sessionPool</CODE> and
794     *                 <CODE>messageSelector</CODE>.
795     * @throws javax.jms.InvalidDestinationException if an invalid destination
796     *                 is specified.
797     * @throws javax.jms.InvalidSelectorException if the message selector is
798     *                 invalid.
799     * @see javax.jms.ConnectionConsumer
800     * @since 1.1
801     */
802    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
803                                                              boolean noLocal) throws JMSException {
804        checkClosedOrFailed();
805
806        if (queueOnlyConnection) {
807            throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
808        }
809
810        ensureConnectionInfoSent();
811        SessionId sessionId = new SessionId(info.getConnectionId(), -1);
812        ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
813        info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
814        info.setSubscriptionName(subscriptionName);
815        info.setSelector(messageSelector);
816        info.setPrefetchSize(maxMessages);
817        info.setDispatchAsync(isDispatchAsync());
818
819        // Allows the options on the destination to configure the consumerInfo
820        if (info.getDestination().getOptions() != null) {
821            Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
822            IntrospectionSupport.setProperties(this.info, options, "consumer.");
823        }
824
825        return new ActiveMQConnectionConsumer(this, sessionPool, info);
826    }
827
828    // Properties
829    // -------------------------------------------------------------------------
830
831    /**
832     * Returns true if this connection has been started
833     *
834     * @return true if this Connection is started
835     */
836    public boolean isStarted() {
837        return started.get();
838    }
839
840    /**
841     * Returns true if the connection is closed
842     */
843    public boolean isClosed() {
844        return closed.get();
845    }
846
847    /**
848     * Returns true if the connection is in the process of being closed
849     */
850    public boolean isClosing() {
851        return closing.get();
852    }
853
854    /**
855     * Returns true if the underlying transport has failed
856     */
857    public boolean isTransportFailed() {
858        return transportFailed.get();
859    }
860
861    /**
862     * @return Returns the prefetchPolicy.
863     */
864    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
865        return prefetchPolicy;
866    }
867
868    /**
869     * Sets the <a
870     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
871     * policy</a> for consumers created by this connection.
872     */
873    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
874        this.prefetchPolicy = prefetchPolicy;
875    }
876
877    /**
878     */
879    public Transport getTransportChannel() {
880        return transport;
881    }
882
883    /**
884     * @return Returns the clientID of the connection, forcing one to be
885     *         generated if one has not yet been configured.
886     */
887    public String getInitializedClientID() throws JMSException {
888        ensureConnectionInfoSent();
889        return info.getClientId();
890    }
891
892    /**
893     * @return Returns the timeStampsDisableByDefault.
894     */
895    public boolean isDisableTimeStampsByDefault() {
896        return disableTimeStampsByDefault;
897    }
898
899    /**
900     * Sets whether or not timestamps on messages should be disabled or not. If
901     * you disable them it adds a small performance boost.
902     */
903    public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
904        this.disableTimeStampsByDefault = timeStampsDisableByDefault;
905    }
906
907    /**
908     * @return Returns the dispatchOptimizedMessage.
909     */
910    public boolean isOptimizedMessageDispatch() {
911        return optimizedMessageDispatch;
912    }
913
914    /**
915     * If this flag is set then an larger prefetch limit is used - only
916     * applicable for durable topic subscribers.
917     */
918    public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
919        this.optimizedMessageDispatch = dispatchOptimizedMessage;
920    }
921
922    /**
923     * @return Returns the closeTimeout.
924     */
925    public int getCloseTimeout() {
926        return closeTimeout;
927    }
928
929    /**
930     * Sets the timeout before a close is considered complete. Normally a
931     * close() on a connection waits for confirmation from the broker; this
932     * allows that operation to timeout to save the client hanging if there is
933     * no broker
934     */
935    public void setCloseTimeout(int closeTimeout) {
936        this.closeTimeout = closeTimeout;
937    }
938
939    /**
940     * @return ConnectionInfo
941     */
942    public ConnectionInfo getConnectionInfo() {
943        return this.info;
944    }
945
946    public boolean isUseRetroactiveConsumer() {
947        return useRetroactiveConsumer;
948    }
949
950    /**
951     * Sets whether or not retroactive consumers are enabled. Retroactive
952     * consumers allow non-durable topic subscribers to receive old messages
953     * that were published before the non-durable subscriber started.
954     */
955    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
956        this.useRetroactiveConsumer = useRetroactiveConsumer;
957    }
958
959    public boolean isNestedMapAndListEnabled() {
960        return nestedMapAndListEnabled;
961    }
962
963    /**
964     * Enables/disables whether or not Message properties and MapMessage entries
965     * support <a
966     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
967     * Structures</a> of Map and List objects
968     */
969    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
970        this.nestedMapAndListEnabled = structuredMapsEnabled;
971    }
972
973    public boolean isExclusiveConsumer() {
974        return exclusiveConsumer;
975    }
976
977    /**
978     * Enables or disables whether or not queue consumers should be exclusive or
979     * not for example to preserve ordering when not using <a
980     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
981     *
982     * @param exclusiveConsumer
983     */
984    public void setExclusiveConsumer(boolean exclusiveConsumer) {
985        this.exclusiveConsumer = exclusiveConsumer;
986    }
987
988    /**
989     * Adds a transport listener so that a client can be notified of events in
990     * the underlying transport
991     */
992    public void addTransportListener(TransportListener transportListener) {
993        transportListeners.add(transportListener);
994    }
995
996    public void removeTransportListener(TransportListener transportListener) {
997        transportListeners.remove(transportListener);
998    }
999
1000    public boolean isUseDedicatedTaskRunner() {
1001        return useDedicatedTaskRunner;
1002    }
1003
1004    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1005        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1006    }
1007
1008    public TaskRunnerFactory getSessionTaskRunner() {
1009        synchronized (this) {
1010            if (sessionTaskRunner == null) {
1011                sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
1012                sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
1013            }
1014        }
1015        return sessionTaskRunner;
1016    }
1017
1018    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
1019        this.sessionTaskRunner = sessionTaskRunner;
1020    }
1021
1022    public MessageTransformer getTransformer() {
1023        return transformer;
1024    }
1025
1026    /**
1027     * Sets the transformer used to transform messages before they are sent on
1028     * to the JMS bus or when they are received from the bus but before they are
1029     * delivered to the JMS client
1030     */
1031    public void setTransformer(MessageTransformer transformer) {
1032        this.transformer = transformer;
1033    }
1034
1035    /**
1036     * @return the statsEnabled
1037     */
1038    public boolean isStatsEnabled() {
1039        return this.stats.isEnabled();
1040    }
1041
1042    /**
1043     * @param statsEnabled the statsEnabled to set
1044     */
1045    public void setStatsEnabled(boolean statsEnabled) {
1046        this.stats.setEnabled(statsEnabled);
1047    }
1048
1049    /**
1050     * Returns the {@link DestinationSource} object which can be used to listen to destinations
1051     * being created or destroyed or to enquire about the current destinations available on the broker
1052     *
1053     * @return a lazily created destination source
1054     * @throws JMSException
1055     */
1056    @Override
1057    public DestinationSource getDestinationSource() throws JMSException {
1058        if (destinationSource == null) {
1059            destinationSource = new DestinationSource(this);
1060            destinationSource.start();
1061        }
1062        return destinationSource;
1063    }
1064
1065    // Implementation methods
1066    // -------------------------------------------------------------------------
1067
1068    /**
1069     * Used internally for adding Sessions to the Connection
1070     *
1071     * @param session
1072     * @throws JMSException
1073     * @throws JMSException
1074     */
1075    protected void addSession(ActiveMQSession session) throws JMSException {
1076        this.sessions.add(session);
1077        if (sessions.size() > 1 || session.isTransacted()) {
1078            optimizedMessageDispatch = false;
1079        }
1080    }
1081
1082    /**
1083     * Used interanlly for removing Sessions from a Connection
1084     *
1085     * @param session
1086     */
1087    protected void removeSession(ActiveMQSession session) {
1088        this.sessions.remove(session);
1089        this.removeDispatcher(session);
1090    }
1091
1092    /**
1093     * Add a ConnectionConsumer
1094     *
1095     * @param connectionConsumer
1096     * @throws JMSException
1097     */
1098    protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1099        this.connectionConsumers.add(connectionConsumer);
1100    }
1101
1102    /**
1103     * Remove a ConnectionConsumer
1104     *
1105     * @param connectionConsumer
1106     */
1107    protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1108        this.connectionConsumers.remove(connectionConsumer);
1109        this.removeDispatcher(connectionConsumer);
1110    }
1111
1112    /**
1113     * Creates a <CODE>TopicSession</CODE> object.
1114     *
1115     * @param transacted indicates whether the session is transacted
1116     * @param acknowledgeMode indicates whether the consumer or the client will
1117     *                acknowledge any messages it receives; ignored if the
1118     *                session is transacted. Legal values are
1119     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1120     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1121     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1122     * @return a newly created topic session
1123     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1124     *                 to create a session due to some internal error or lack of
1125     *                 support for the specific transaction and acknowledgement
1126     *                 mode.
1127     * @see Session#AUTO_ACKNOWLEDGE
1128     * @see Session#CLIENT_ACKNOWLEDGE
1129     * @see Session#DUPS_OK_ACKNOWLEDGE
1130     */
1131    @Override
1132    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1133        return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1134    }
1135
1136    /**
1137     * Creates a connection consumer for this connection (optional operation).
1138     * This is an expert facility not used by regular JMS clients.
1139     *
1140     * @param topic the topic to access
1141     * @param messageSelector only messages with properties matching the message
1142     *                selector expression are delivered. A value of null or an
1143     *                empty string indicates that there is no message selector
1144     *                for the message consumer.
1145     * @param sessionPool the server session pool to associate with this
1146     *                connection consumer
1147     * @param maxMessages the maximum number of messages that can be assigned to
1148     *                a server session at one time
1149     * @return the connection consumer
1150     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1151     *                 to create a connection consumer due to some internal
1152     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1153     *                 and <CODE>messageSelector</CODE>.
1154     * @throws javax.jms.InvalidDestinationException if an invalid topic is
1155     *                 specified.
1156     * @throws javax.jms.InvalidSelectorException if the message selector is
1157     *                 invalid.
1158     * @see javax.jms.ConnectionConsumer
1159     */
1160    @Override
1161    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1162        return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1163    }
1164
1165    /**
1166     * Creates a connection consumer for this connection (optional operation).
1167     * This is an expert facility not used by regular JMS clients.
1168     *
1169     * @param queue the queue to access
1170     * @param messageSelector only messages with properties matching the message
1171     *                selector expression are delivered. A value of null or an
1172     *                empty string indicates that there is no message selector
1173     *                for the message consumer.
1174     * @param sessionPool the server session pool to associate with this
1175     *                connection consumer
1176     * @param maxMessages the maximum number of messages that can be assigned to
1177     *                a server session at one time
1178     * @return the connection consumer
1179     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1180     *                 to create a connection consumer due to some internal
1181     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1182     *                 and <CODE>messageSelector</CODE>.
1183     * @throws javax.jms.InvalidDestinationException if an invalid queue is
1184     *                 specified.
1185     * @throws javax.jms.InvalidSelectorException if the message selector is
1186     *                 invalid.
1187     * @see javax.jms.ConnectionConsumer
1188     */
1189    @Override
1190    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1191        return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1192    }
1193
1194    /**
1195     * Creates a connection consumer for this connection (optional operation).
1196     * This is an expert facility not used by regular JMS clients.
1197     *
1198     * @param destination the destination to access
1199     * @param messageSelector only messages with properties matching the message
1200     *                selector expression are delivered. A value of null or an
1201     *                empty string indicates that there is no message selector
1202     *                for the message consumer.
1203     * @param sessionPool the server session pool to associate with this
1204     *                connection consumer
1205     * @param maxMessages the maximum number of messages that can be assigned to
1206     *                a server session at one time
1207     * @return the connection consumer
1208     * @throws JMSException if the <CODE>Connection</CODE> object fails to
1209     *                 create a connection consumer due to some internal error
1210     *                 or invalid arguments for <CODE>sessionPool</CODE> and
1211     *                 <CODE>messageSelector</CODE>.
1212     * @throws javax.jms.InvalidDestinationException if an invalid destination
1213     *                 is specified.
1214     * @throws javax.jms.InvalidSelectorException if the message selector is
1215     *                 invalid.
1216     * @see javax.jms.ConnectionConsumer
1217     * @since 1.1
1218     */
1219    @Override
1220    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1221        return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1222    }
1223
1224    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1225        throws JMSException {
1226
1227        checkClosedOrFailed();
1228        ensureConnectionInfoSent();
1229
1230        ConsumerId consumerId = createConsumerId();
1231        ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1232        consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1233        consumerInfo.setSelector(messageSelector);
1234        consumerInfo.setPrefetchSize(maxMessages);
1235        consumerInfo.setNoLocal(noLocal);
1236        consumerInfo.setDispatchAsync(isDispatchAsync());
1237
1238        // Allows the options on the destination to configure the consumerInfo
1239        if (consumerInfo.getDestination().getOptions() != null) {
1240            Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1241            IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1242        }
1243
1244        return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1245    }
1246
1247    /**
1248     * @return a newly created ConsumedId unique to this connection session instance.
1249     */
1250    private ConsumerId createConsumerId() {
1251        return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1252    }
1253
1254    /**
1255     * Creates a <CODE>QueueSession</CODE> object.
1256     *
1257     * @param transacted indicates whether the session is transacted
1258     * @param acknowledgeMode indicates whether the consumer or the client will
1259     *                acknowledge any messages it receives; ignored if the
1260     *                session is transacted. Legal values are
1261     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1262     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1263     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1264     * @return a newly created queue session
1265     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1266     *                 to create a session due to some internal error or lack of
1267     *                 support for the specific transaction and acknowledgement
1268     *                 mode.
1269     * @see Session#AUTO_ACKNOWLEDGE
1270     * @see Session#CLIENT_ACKNOWLEDGE
1271     * @see Session#DUPS_OK_ACKNOWLEDGE
1272     */
1273    @Override
1274    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1275        return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1276    }
1277
1278    /**
1279     * Ensures that the clientID was manually specified and not auto-generated.
1280     * If the clientID was not specified this method will throw an exception.
1281     * This method is used to ensure that the clientID + durableSubscriber name
1282     * are used correctly.
1283     *
1284     * @throws JMSException
1285     */
1286    public void checkClientIDWasManuallySpecified() throws JMSException {
1287        if (!userSpecifiedClientID) {
1288            throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1289        }
1290    }
1291
1292    /**
1293     * send a Packet through the Connection - for internal use only
1294     *
1295     * @param command
1296     * @throws JMSException
1297     */
1298    public void asyncSendPacket(Command command) throws JMSException {
1299        if (isClosed()) {
1300            throw new ConnectionClosedException();
1301        } else {
1302            doAsyncSendPacket(command);
1303        }
1304    }
1305
1306    private void doAsyncSendPacket(Command command) throws JMSException {
1307        try {
1308            this.transport.oneway(command);
1309        } catch (IOException e) {
1310            throw JMSExceptionSupport.create(e);
1311        }
1312    }
1313
1314    /**
1315     * Send a packet through a Connection - for internal use only
1316     *
1317     * @param command
1318     *
1319     * @throws JMSException
1320     */
1321    public void syncSendPacket(final Command command, final AsyncCallback onComplete) throws JMSException {
1322        if(onComplete==null) {
1323            syncSendPacket(command);
1324        } else {
1325            if (isClosed()) {
1326                throw new ConnectionClosedException();
1327            }
1328            try {
1329                this.transport.asyncRequest(command, new ResponseCallback() {
1330                    @Override
1331                    public void onCompletion(FutureResponse resp) {
1332                        Response response;
1333                        Throwable exception = null;
1334                        try {
1335                            response = resp.getResult();
1336                            if (response.isException()) {
1337                                ExceptionResponse er = (ExceptionResponse)response;
1338                                exception = er.getException();
1339                            }
1340                        } catch (Exception e) {
1341                            exception = e;
1342                        }
1343                        if(exception!=null) {
1344                            if ( exception instanceof JMSException) {
1345                                onComplete.onException((JMSException) exception);
1346                            } else {
1347                                if (isClosed()||closing.get()) {
1348                                    LOG.debug("Received an exception but connection is closing");
1349                                }
1350                                JMSException jmsEx = null;
1351                                try {
1352                                    jmsEx = JMSExceptionSupport.create(exception);
1353                                } catch(Throwable e) {
1354                                    LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
1355                                }
1356                                // dispose of transport for security exceptions on connection initiation
1357                                if (exception instanceof SecurityException && command instanceof ConnectionInfo){
1358                                    forceCloseOnSecurityException(exception);
1359                                }
1360                                if (jmsEx !=null) {
1361                                    onComplete.onException(jmsEx);
1362                                }
1363                            }
1364                        } else {
1365                            onComplete.onSuccess();
1366                        }
1367                    }
1368                });
1369            } catch (IOException e) {
1370                throw JMSExceptionSupport.create(e);
1371            }
1372        }
1373    }
1374
1375    private void forceCloseOnSecurityException(Throwable exception) {
1376        LOG.trace("force close on security exception:" + this + ", transport=" + transport, exception);
1377        onException(new IOException("Force close due to SecurityException on connect", exception));
1378    }
1379
1380    public Response syncSendPacket(Command command, int timeout) throws JMSException {
1381        if (isClosed()) {
1382            throw new ConnectionClosedException();
1383        } else {
1384
1385            try {
1386                Response response = (Response)(timeout > 0
1387                        ? this.transport.request(command, timeout)
1388                        : this.transport.request(command));
1389                if (response.isException()) {
1390                    ExceptionResponse er = (ExceptionResponse)response;
1391                    if (er.getException() instanceof JMSException) {
1392                        throw (JMSException)er.getException();
1393                    } else {
1394                        if (isClosed()||closing.get()) {
1395                            LOG.debug("Received an exception but connection is closing");
1396                        }
1397                        JMSException jmsEx = null;
1398                        try {
1399                            jmsEx = JMSExceptionSupport.create(er.getException());
1400                        } catch(Throwable e) {
1401                            LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1402                        }
1403                        if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
1404                            forceCloseOnSecurityException(er.getException());
1405                        }
1406                        if (jmsEx !=null) {
1407                            throw jmsEx;
1408                        }
1409                    }
1410                }
1411                return response;
1412            } catch (IOException e) {
1413                throw JMSExceptionSupport.create(e);
1414            }
1415        }
1416    }
1417
1418    /**
1419     * Send a packet through a Connection - for internal use only
1420     *
1421     * @param command
1422     *
1423     * @return the broker Response for the given Command.
1424     *
1425     * @throws JMSException
1426     */
1427    public Response syncSendPacket(Command command) throws JMSException {
1428        return syncSendPacket(command, 0);
1429    }
1430
1431    /**
1432     * @return statistics for this Connection
1433     */
1434    @Override
1435    public StatsImpl getStats() {
1436        return stats;
1437    }
1438
1439    /**
1440     * simply throws an exception if the Connection is already closed or the
1441     * Transport has failed
1442     *
1443     * @throws JMSException
1444     */
1445    protected synchronized void checkClosedOrFailed() throws JMSException {
1446        checkClosed();
1447        if (transportFailed.get()) {
1448            throw new ConnectionFailedException(firstFailureError);
1449        }
1450    }
1451
1452    /**
1453     * simply throws an exception if the Connection is already closed
1454     *
1455     * @throws JMSException
1456     */
1457    protected synchronized void checkClosed() throws JMSException {
1458        if (closed.get()) {
1459            throw new ConnectionClosedException();
1460        }
1461    }
1462
1463    /**
1464     * Send the ConnectionInfo to the Broker
1465     *
1466     * @throws JMSException
1467     */
1468    protected void ensureConnectionInfoSent() throws JMSException {
1469        synchronized(this.ensureConnectionInfoSentMutex) {
1470            // Can we skip sending the ConnectionInfo packet??
1471            if (isConnectionInfoSentToBroker || closed.get()) {
1472                return;
1473            }
1474            //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1475            if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1476                info.setClientId(clientIdGenerator.generateId());
1477            }
1478            syncSendPacket(info.copy(), getConnectResponseTimeout());
1479
1480            this.isConnectionInfoSentToBroker = true;
1481            // Add a temp destination advisory consumer so that
1482            // We know what the valid temporary destinations are on the
1483            // broker without having to do an RPC to the broker.
1484
1485            ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1486            if (watchTopicAdvisories) {
1487                advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1488            }
1489        }
1490    }
1491
1492    public synchronized boolean isWatchTopicAdvisories() {
1493        return watchTopicAdvisories;
1494    }
1495
1496    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1497        this.watchTopicAdvisories = watchTopicAdvisories;
1498    }
1499
1500    /**
1501     * @return Returns the useAsyncSend.
1502     */
1503    public boolean isUseAsyncSend() {
1504        return useAsyncSend;
1505    }
1506
1507    /**
1508     * Forces the use of <a
1509     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1510     * adds a massive performance boost; but means that the send() method will
1511     * return immediately whether the message has been sent or not which could
1512     * lead to message loss.
1513     */
1514    public void setUseAsyncSend(boolean useAsyncSend) {
1515        this.useAsyncSend = useAsyncSend;
1516    }
1517
1518    /**
1519     * @return true if always sync send messages
1520     */
1521    public boolean isAlwaysSyncSend() {
1522        return this.alwaysSyncSend;
1523    }
1524
1525    /**
1526     * Set true if always require messages to be sync sent
1527     *
1528     * @param alwaysSyncSend
1529     */
1530    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1531        this.alwaysSyncSend = alwaysSyncSend;
1532    }
1533
1534    /**
1535     * @return the messagePrioritySupported
1536     */
1537    public boolean isMessagePrioritySupported() {
1538        return this.messagePrioritySupported;
1539    }
1540
1541    /**
1542     * @param messagePrioritySupported the messagePrioritySupported to set
1543     */
1544    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1545        this.messagePrioritySupported = messagePrioritySupported;
1546    }
1547
1548    /**
1549     * Cleans up this connection so that it's state is as if the connection was
1550     * just created. This allows the Resource Adapter to clean up a connection
1551     * so that it can be reused without having to close and recreate the
1552     * connection.
1553     */
1554    public void cleanup() throws JMSException {
1555        doCleanup(false);
1556    }
1557
1558    public boolean isUserSpecifiedClientID() {
1559        return userSpecifiedClientID;
1560    }
1561
1562    public void doCleanup(boolean removeConnection) throws JMSException {
1563        if (advisoryConsumer != null && !isTransportFailed()) {
1564            advisoryConsumer.dispose();
1565            advisoryConsumer = null;
1566        }
1567
1568        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1569            ActiveMQSession s = i.next();
1570            s.dispose();
1571        }
1572        for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1573            ActiveMQConnectionConsumer c = i.next();
1574            c.dispose();
1575        }
1576
1577        if (removeConnection) {
1578            if (isConnectionInfoSentToBroker) {
1579                if (!transportFailed.get() && !closing.get()) {
1580                    syncSendPacket(info.createRemoveCommand());
1581                }
1582                isConnectionInfoSentToBroker = false;
1583            }
1584            if (userSpecifiedClientID) {
1585                info.setClientId(null);
1586                userSpecifiedClientID = false;
1587            }
1588            clientIDSet = false;
1589        }
1590
1591        started.set(false);
1592    }
1593
1594    /**
1595     * Changes the associated username/password that is associated with this
1596     * connection. If the connection has been used, you must called cleanup()
1597     * before calling this method.
1598     *
1599     * @throws IllegalStateException if the connection is in used.
1600     */
1601    public void changeUserInfo(String userName, String password) throws JMSException {
1602        if (isConnectionInfoSentToBroker) {
1603            throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1604        }
1605        this.info.setUserName(userName);
1606        this.info.setPassword(password);
1607    }
1608
1609    /**
1610     * @return Returns the resourceManagerId.
1611     * @throws JMSException
1612     */
1613    public String getResourceManagerId() throws JMSException {
1614        if (isRmIdFromConnectionId()) {
1615            return info.getConnectionId().getValue();
1616        }
1617        waitForBrokerInfo();
1618        if (brokerInfo == null) {
1619            throw new JMSException("Connection failed before Broker info was received.");
1620        }
1621        return brokerInfo.getBrokerId().getValue();
1622    }
1623
1624    /**
1625     * Returns the broker name if one is available or null if one is not
1626     * available yet.
1627     */
1628    public String getBrokerName() {
1629        try {
1630            brokerInfoReceived.await(5, TimeUnit.SECONDS);
1631            if (brokerInfo == null) {
1632                return null;
1633            }
1634            return brokerInfo.getBrokerName();
1635        } catch (InterruptedException e) {
1636            Thread.currentThread().interrupt();
1637            return null;
1638        }
1639    }
1640
1641    /**
1642     * Returns the broker information if it is available or null if it is not
1643     * available yet.
1644     */
1645    public BrokerInfo getBrokerInfo() {
1646        return brokerInfo;
1647    }
1648
1649    /**
1650     * @return Returns the RedeliveryPolicy.
1651     * @throws JMSException
1652     */
1653    public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1654        return redeliveryPolicyMap.getDefaultEntry();
1655    }
1656
1657    /**
1658     * Sets the redelivery policy to be used when messages are rolled back
1659     */
1660    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1661        this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
1662    }
1663
1664    public BlobTransferPolicy getBlobTransferPolicy() {
1665        if (blobTransferPolicy == null) {
1666            blobTransferPolicy = createBlobTransferPolicy();
1667        }
1668        return blobTransferPolicy;
1669    }
1670
1671    /**
1672     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1673     * OBjects) are transferred from producers to brokers to consumers
1674     */
1675    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1676        this.blobTransferPolicy = blobTransferPolicy;
1677    }
1678
1679    /**
1680     * @return Returns the alwaysSessionAsync.
1681     */
1682    public boolean isAlwaysSessionAsync() {
1683        return alwaysSessionAsync;
1684    }
1685
1686    /**
1687     * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
1688     * the Connection. However, a separate thread is always used if there is more than one session, or the session
1689     * isn't in auto acknowledge or duplicates ok mode.  By default this value is set to true and session dispatch
1690     * happens asynchronously.
1691     */
1692    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1693        this.alwaysSessionAsync = alwaysSessionAsync;
1694    }
1695
1696    /**
1697     * @return Returns the optimizeAcknowledge.
1698     */
1699    public boolean isOptimizeAcknowledge() {
1700        return optimizeAcknowledge;
1701    }
1702
1703    /**
1704     * Enables an optimised acknowledgement mode where messages are acknowledged
1705     * in batches rather than individually
1706     *
1707     * @param optimizeAcknowledge The optimizeAcknowledge to set.
1708     */
1709    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1710        this.optimizeAcknowledge = optimizeAcknowledge;
1711    }
1712
1713    /**
1714     * The max time in milliseconds between optimized ack batches
1715     * @param optimizeAcknowledgeTimeOut
1716     */
1717    public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
1718        this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
1719    }
1720
1721    public long getOptimizeAcknowledgeTimeOut() {
1722        return optimizeAcknowledgeTimeOut;
1723    }
1724
1725    public long getWarnAboutUnstartedConnectionTimeout() {
1726        return warnAboutUnstartedConnectionTimeout;
1727    }
1728
1729    /**
1730     * Enables the timeout from a connection creation to when a warning is
1731     * generated if the connection is not properly started via {@link #start()}
1732     * and a message is received by a consumer. It is a very common gotcha to
1733     * forget to <a
1734     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1735     * the connection</a> so this option makes the default case to create a
1736     * warning if the user forgets. To disable the warning just set the value to <
1737     * 0 (say -1).
1738     */
1739    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1740        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1741    }
1742
1743    /**
1744     * @return the sendTimeout (in milliseconds)
1745     */
1746    public int getSendTimeout() {
1747        return sendTimeout;
1748    }
1749
1750    /**
1751     * @param sendTimeout the sendTimeout to set (in milliseconds)
1752     */
1753    public void setSendTimeout(int sendTimeout) {
1754        this.sendTimeout = sendTimeout;
1755    }
1756
1757    /**
1758     * @return the sendAcksAsync
1759     */
1760    public boolean isSendAcksAsync() {
1761        return sendAcksAsync;
1762    }
1763
1764    /**
1765     * @param sendAcksAsync the sendAcksAsync to set
1766     */
1767    public void setSendAcksAsync(boolean sendAcksAsync) {
1768        this.sendAcksAsync = sendAcksAsync;
1769    }
1770
1771    /**
1772     * Returns the time this connection was created
1773     */
1774    public long getTimeCreated() {
1775        return timeCreated;
1776    }
1777
1778    private void waitForBrokerInfo() throws JMSException {
1779        try {
1780            brokerInfoReceived.await();
1781        } catch (InterruptedException e) {
1782            Thread.currentThread().interrupt();
1783            throw JMSExceptionSupport.create(e);
1784        }
1785    }
1786
1787    // Package protected so that it can be used in unit tests
1788    public Transport getTransport() {
1789        return transport;
1790    }
1791
1792    public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1793        producers.put(producerId, producer);
1794    }
1795
1796    public void removeProducer(ProducerId producerId) {
1797        producers.remove(producerId);
1798    }
1799
1800    public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1801        dispatchers.put(consumerId, dispatcher);
1802    }
1803
1804    public void removeDispatcher(ConsumerId consumerId) {
1805        dispatchers.remove(consumerId);
1806    }
1807
1808    public boolean hasDispatcher(ConsumerId consumerId) {
1809        return dispatchers.containsKey(consumerId);
1810    }
1811
1812    /**
1813     * @param o - the command to consume
1814     */
1815    @Override
1816    public void onCommand(final Object o) {
1817        final Command command = (Command)o;
1818        if (!closed.get() && command != null) {
1819            try {
1820                command.visit(new CommandVisitorAdapter() {
1821                    @Override
1822                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
1823                        waitForTransportInterruptionProcessingToComplete();
1824                        ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1825                        if (dispatcher != null) {
1826                            // Copy in case a embedded broker is dispatching via
1827                            // vm://
1828                            // md.getMessage() == null to signal end of queue
1829                            // browse.
1830                            Message msg = md.getMessage();
1831                            if (msg != null) {
1832                                msg = msg.copy();
1833                                msg.setReadOnlyBody(true);
1834                                msg.setReadOnlyProperties(true);
1835                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
1836                                msg.setConnection(ActiveMQConnection.this);
1837                                msg.setMemoryUsage(null);
1838                                md.setMessage(msg);
1839                            }
1840                            dispatcher.dispatch(md);
1841                        } else {
1842                            LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers);
1843                        }
1844                        return null;
1845                    }
1846
1847                    @Override
1848                    public Response processProducerAck(ProducerAck pa) throws Exception {
1849                        if (pa != null && pa.getProducerId() != null) {
1850                            ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1851                            if (producer != null) {
1852                                producer.onProducerAck(pa);
1853                            }
1854                        }
1855                        return null;
1856                    }
1857
1858                    @Override
1859                    public Response processBrokerInfo(BrokerInfo info) throws Exception {
1860                        brokerInfo = info;
1861                        brokerInfoReceived.countDown();
1862                        optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1863                        getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1864                        return null;
1865                    }
1866
1867                    @Override
1868                    public Response processConnectionError(final ConnectionError error) throws Exception {
1869                        executor.execute(new Runnable() {
1870                            @Override
1871                            public void run() {
1872                                onAsyncException(error.getException());
1873                            }
1874                        });
1875                        return null;
1876                    }
1877
1878                    @Override
1879                    public Response processControlCommand(ControlCommand command) throws Exception {
1880                        return null;
1881                    }
1882
1883                    @Override
1884                    public Response processConnectionControl(ConnectionControl control) throws Exception {
1885                        onConnectionControl((ConnectionControl)command);
1886                        return null;
1887                    }
1888
1889                    @Override
1890                    public Response processConsumerControl(ConsumerControl control) throws Exception {
1891                        onConsumerControl((ConsumerControl)command);
1892                        return null;
1893                    }
1894
1895                    @Override
1896                    public Response processWireFormat(WireFormatInfo info) throws Exception {
1897                        onWireFormatInfo((WireFormatInfo)command);
1898                        return null;
1899                    }
1900                });
1901            } catch (Exception e) {
1902                onClientInternalException(e);
1903            }
1904        }
1905
1906        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1907            TransportListener listener = iter.next();
1908            listener.onCommand(command);
1909        }
1910    }
1911
1912    protected void onWireFormatInfo(WireFormatInfo info) {
1913        protocolVersion.set(info.getVersion());
1914    }
1915
1916    /**
1917     * Handles async client internal exceptions.
1918     * A client internal exception is usually one that has been thrown
1919     * by a container runtime component during asynchronous processing of a
1920     * message that does not affect the connection itself.
1921     * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1922     * its <code>onException</code> method, if one has been registered with this connection.
1923     *
1924     * @param error the exception that the problem
1925     */
1926    public void onClientInternalException(final Throwable error) {
1927        if ( !closed.get() && !closing.get() ) {
1928            if ( this.clientInternalExceptionListener != null ) {
1929                executor.execute(new Runnable() {
1930                    @Override
1931                    public void run() {
1932                        ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1933                    }
1934                });
1935            } else {
1936                LOG.debug("Async client internal exception occurred with no exception listener registered: "
1937                        + error, error);
1938            }
1939        }
1940    }
1941
1942    /**
1943     * Used for handling async exceptions
1944     *
1945     * @param error
1946     */
1947    public void onAsyncException(Throwable error) {
1948        if (!closed.get() && !closing.get()) {
1949            if (this.exceptionListener != null) {
1950
1951                if (!(error instanceof JMSException)) {
1952                    error = JMSExceptionSupport.create(error);
1953                }
1954                final JMSException e = (JMSException)error;
1955
1956                executor.execute(new Runnable() {
1957                    @Override
1958                    public void run() {
1959                        ActiveMQConnection.this.exceptionListener.onException(e);
1960                    }
1961                });
1962
1963            } else {
1964                LOG.debug("Async exception with no exception listener: " + error, error);
1965            }
1966        }
1967    }
1968
1969    @Override
1970    public void onException(final IOException error) {
1971        onAsyncException(error);
1972        if (!closed.get() && !closing.get()) {
1973            executor.execute(new Runnable() {
1974                @Override
1975                public void run() {
1976                    transportFailed(error);
1977                    ServiceSupport.dispose(ActiveMQConnection.this.transport);
1978                    brokerInfoReceived.countDown();
1979                    try {
1980                        doCleanup(true);
1981                    } catch (JMSException e) {
1982                        LOG.warn("Exception during connection cleanup, " + e, e);
1983                    }
1984                    for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1985                        TransportListener listener = iter.next();
1986                        listener.onException(error);
1987                    }
1988                }
1989            });
1990        }
1991    }
1992
1993    @Override
1994    public void transportInterupted() {
1995        transportInterruptionProcessingComplete.set(1);
1996        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1997            ActiveMQSession s = i.next();
1998            s.clearMessagesInProgress(transportInterruptionProcessingComplete);
1999        }
2000
2001        for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
2002            connectionConsumer.clearMessagesInProgress(transportInterruptionProcessingComplete);
2003        }
2004
2005        if (transportInterruptionProcessingComplete.decrementAndGet() > 0) {
2006            if (LOG.isDebugEnabled()) {
2007                LOG.debug("transport interrupted - processing required, dispatchers: " + transportInterruptionProcessingComplete.get());
2008            }
2009            signalInterruptionProcessingNeeded();
2010        }
2011
2012        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2013            TransportListener listener = iter.next();
2014            listener.transportInterupted();
2015        }
2016    }
2017
2018    @Override
2019    public void transportResumed() {
2020        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2021            TransportListener listener = iter.next();
2022            listener.transportResumed();
2023        }
2024    }
2025
2026    /**
2027     * Create the DestinationInfo object for the temporary destination.
2028     *
2029     * @param topic - if its true topic, else queue.
2030     * @return DestinationInfo
2031     * @throws JMSException
2032     */
2033    protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
2034
2035        // Check if Destination info is of temporary type.
2036        ActiveMQTempDestination dest;
2037        if (topic) {
2038            dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2039        } else {
2040            dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2041        }
2042
2043        DestinationInfo info = new DestinationInfo();
2044        info.setConnectionId(this.info.getConnectionId());
2045        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
2046        info.setDestination(dest);
2047        syncSendPacket(info);
2048
2049        dest.setConnection(this);
2050        activeTempDestinations.put(dest, dest);
2051        return dest;
2052    }
2053
2054    /**
2055     * @param destination
2056     * @throws JMSException
2057     */
2058    public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
2059
2060        checkClosedOrFailed();
2061
2062        for (ActiveMQSession session : this.sessions) {
2063            if (session.isInUse(destination)) {
2064                throw new JMSException("A consumer is consuming from the temporary destination");
2065            }
2066        }
2067
2068        activeTempDestinations.remove(destination);
2069
2070        DestinationInfo destInfo = new DestinationInfo();
2071        destInfo.setConnectionId(this.info.getConnectionId());
2072        destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2073        destInfo.setDestination(destination);
2074        destInfo.setTimeout(0);
2075        syncSendPacket(destInfo);
2076    }
2077
2078    public boolean isDeleted(ActiveMQDestination dest) {
2079
2080        // If we are not watching the advisories.. then
2081        // we will assume that the temp destination does exist.
2082        if (advisoryConsumer == null) {
2083            return false;
2084        }
2085
2086        return !activeTempDestinations.containsValue(dest);
2087    }
2088
2089    public boolean isCopyMessageOnSend() {
2090        return copyMessageOnSend;
2091    }
2092
2093    public LongSequenceGenerator getLocalTransactionIdGenerator() {
2094        return localTransactionIdGenerator;
2095    }
2096
2097    public boolean isUseCompression() {
2098        return useCompression;
2099    }
2100
2101    /**
2102     * Enables the use of compression of the message bodies
2103     */
2104    public void setUseCompression(boolean useCompression) {
2105        this.useCompression = useCompression;
2106    }
2107
2108    public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2109
2110        checkClosedOrFailed();
2111        ensureConnectionInfoSent();
2112
2113        DestinationInfo info = new DestinationInfo();
2114        info.setConnectionId(this.info.getConnectionId());
2115        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2116        info.setDestination(destination);
2117        info.setTimeout(0);
2118        syncSendPacket(info);
2119    }
2120
2121    public boolean isDispatchAsync() {
2122        return dispatchAsync;
2123    }
2124
2125    /**
2126     * Enables or disables the default setting of whether or not consumers have
2127     * their messages <a
2128     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2129     * synchronously or asynchronously by the broker</a>. For non-durable
2130     * topics for example we typically dispatch synchronously by default to
2131     * minimize context switches which boost performance. However sometimes its
2132     * better to go slower to ensure that a single blocked consumer socket does
2133     * not block delivery to other consumers.
2134     *
2135     * @param asyncDispatch If true then consumers created on this connection
2136     *                will default to having their messages dispatched
2137     *                asynchronously. The default value is true.
2138     */
2139    public void setDispatchAsync(boolean asyncDispatch) {
2140        this.dispatchAsync = asyncDispatch;
2141    }
2142
2143    public boolean isObjectMessageSerializationDefered() {
2144        return objectMessageSerializationDefered;
2145    }
2146
2147    /**
2148     * When an object is set on an ObjectMessage, the JMS spec requires the
2149     * object to be serialized by that set method. Enabling this flag causes the
2150     * object to not get serialized. The object may subsequently get serialized
2151     * if the message needs to be sent over a socket or stored to disk.
2152     */
2153    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2154        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2155    }
2156
2157    /**
2158     * Unsubscribes a durable subscription that has been created by a client.
2159     * <P>
2160     * This method deletes the state being maintained on behalf of the
2161     * subscriber by its provider.
2162     * <P>
2163     * It is erroneous for a client to delete a durable subscription while there
2164     * is an active <CODE>MessageConsumer </CODE> or
2165     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2166     * message is part of a pending transaction or has not been acknowledged in
2167     * the session.
2168     *
2169     * @param name the name used to identify this subscription
2170     * @throws JMSException if the session fails to unsubscribe to the durable
2171     *                 subscription due to some internal error.
2172     * @throws InvalidDestinationException if an invalid subscription name is
2173     *                 specified.
2174     * @since 1.1
2175     */
2176    public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2177        checkClosedOrFailed();
2178        RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2179        rsi.setConnectionId(getConnectionInfo().getConnectionId());
2180        rsi.setSubscriptionName(name);
2181        rsi.setClientId(getConnectionInfo().getClientId());
2182        syncSendPacket(rsi);
2183    }
2184
2185    /**
2186     * Internal send method optimized: - It does not copy the message - It can
2187     * only handle ActiveMQ messages. - You can specify if the send is async or
2188     * sync - Does not allow you to send /w a transaction.
2189     */
2190    void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2191        checkClosedOrFailed();
2192
2193        if (destination.isTemporary() && isDeleted(destination)) {
2194            throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2195        }
2196
2197        msg.setJMSDestination(destination);
2198        msg.setJMSDeliveryMode(deliveryMode);
2199        long expiration = 0L;
2200
2201        if (!isDisableTimeStampsByDefault()) {
2202            long timeStamp = System.currentTimeMillis();
2203            msg.setJMSTimestamp(timeStamp);
2204            if (timeToLive > 0) {
2205                expiration = timeToLive + timeStamp;
2206            }
2207        }
2208
2209        msg.setJMSExpiration(expiration);
2210        msg.setJMSPriority(priority);
2211        msg.setJMSRedelivered(false);
2212        msg.setMessageId(messageId);
2213        msg.onSend();
2214        msg.setProducerId(msg.getMessageId().getProducerId());
2215
2216        if (LOG.isDebugEnabled()) {
2217            LOG.debug("Sending message: " + msg);
2218        }
2219
2220        if (async) {
2221            asyncSendPacket(msg);
2222        } else {
2223            syncSendPacket(msg);
2224        }
2225    }
2226
2227    protected void onConnectionControl(ConnectionControl command) {
2228        if (command.isFaultTolerant()) {
2229            this.optimizeAcknowledge = false;
2230            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2231                ActiveMQSession s = i.next();
2232                s.setOptimizeAcknowledge(false);
2233            }
2234        }
2235    }
2236
2237    protected void onConsumerControl(ConsumerControl command) {
2238        if (command.isClose()) {
2239            for (ActiveMQSession session : this.sessions) {
2240                session.close(command.getConsumerId());
2241            }
2242        } else {
2243            for (ActiveMQSession session : this.sessions) {
2244                session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2245            }
2246            for (ActiveMQConnectionConsumer connectionConsumer: connectionConsumers) {
2247                ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
2248                if (consumerInfo.getConsumerId().equals(command.getConsumerId())) {
2249                    consumerInfo.setPrefetchSize(command.getPrefetch());
2250                }
2251            }
2252        }
2253    }
2254
2255    protected void transportFailed(IOException error) {
2256        transportFailed.set(true);
2257        if (firstFailureError == null) {
2258            firstFailureError = error;
2259        }
2260    }
2261
2262    /**
2263     * Should a JMS message be copied to a new JMS Message object as part of the
2264     * send() method in JMS. This is enabled by default to be compliant with the
2265     * JMS specification. You can disable it if you do not mutate JMS messages
2266     * after they are sent for a performance boost
2267     */
2268    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2269        this.copyMessageOnSend = copyMessageOnSend;
2270    }
2271
2272    @Override
2273    public String toString() {
2274        return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2275    }
2276
2277    protected BlobTransferPolicy createBlobTransferPolicy() {
2278        return new BlobTransferPolicy();
2279    }
2280
2281    public int getProtocolVersion() {
2282        return protocolVersion.get();
2283    }
2284
2285    public int getProducerWindowSize() {
2286        return producerWindowSize;
2287    }
2288
2289    public void setProducerWindowSize(int producerWindowSize) {
2290        this.producerWindowSize = producerWindowSize;
2291    }
2292
2293    public void setAuditDepth(int auditDepth) {
2294        connectionAudit.setAuditDepth(auditDepth);
2295    }
2296
2297    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2298        connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2299    }
2300
2301    protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2302        connectionAudit.removeDispatcher(dispatcher);
2303    }
2304
2305    protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2306        return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2307    }
2308
2309    protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2310        connectionAudit.rollbackDuplicate(dispatcher, message);
2311    }
2312
2313    public IOException getFirstFailureError() {
2314        return firstFailureError;
2315    }
2316
2317    protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2318        if (!closed.get() && !transportFailed.get() && transportInterruptionProcessingComplete.get()>0) {
2319            LOG.warn("dispatch with outstanding dispatch interruption processing count " + transportInterruptionProcessingComplete.get());
2320            signalInterruptionProcessingComplete();
2321        }
2322    }
2323
2324    protected void transportInterruptionProcessingComplete() {
2325        if (transportInterruptionProcessingComplete.decrementAndGet() == 0) {
2326            signalInterruptionProcessingComplete();
2327        }
2328    }
2329
2330    private void signalInterruptionProcessingComplete() {
2331            if (LOG.isDebugEnabled()) {
2332                LOG.debug("transportInterruptionProcessingComplete: " + transportInterruptionProcessingComplete.get()
2333                        + " for:" + this.getConnectionInfo().getConnectionId());
2334            }
2335
2336            FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2337            if (failoverTransport != null) {
2338                failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2339                if (LOG.isDebugEnabled()) {
2340                    LOG.debug("notified failover transport (" + failoverTransport
2341                            + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2342                }
2343            }
2344            transportInterruptionProcessingComplete.set(0);
2345    }
2346
2347    private void signalInterruptionProcessingNeeded() {
2348        FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2349        if (failoverTransport != null) {
2350            failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2351            if (LOG.isDebugEnabled()) {
2352                LOG.debug("notified failover transport (" + failoverTransport
2353                        + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2354            }
2355        }
2356    }
2357
2358    /*
2359     * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2360     * will wait to receive re dispatched messages.
2361     * default value is 0 so there is no wait by default.
2362     */
2363    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2364        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2365    }
2366
2367    public long getConsumerFailoverRedeliveryWaitPeriod() {
2368        return consumerFailoverRedeliveryWaitPeriod;
2369    }
2370
2371    protected Scheduler getScheduler() throws JMSException {
2372        Scheduler result = scheduler;
2373        if (result == null) {
2374            if (isClosing() || isClosed()) {
2375                // without lock contention report the closing state
2376                throw new ConnectionClosedException();
2377            }
2378            synchronized (this) {
2379                result = scheduler;
2380                if (result == null) {
2381                    checkClosed();
2382                    try {
2383                        result = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
2384                        result.start();
2385                        scheduler = result;
2386                    } catch(Exception e) {
2387                        throw JMSExceptionSupport.create(e);
2388                    }
2389                }
2390            }
2391        }
2392        return result;
2393    }
2394
2395    protected ThreadPoolExecutor getExecutor() {
2396        return this.executor;
2397    }
2398
2399    protected CopyOnWriteArrayList<ActiveMQSession> getSessions() {
2400        return sessions;
2401    }
2402
2403    /**
2404     * @return the checkForDuplicates
2405     */
2406    public boolean isCheckForDuplicates() {
2407        return this.checkForDuplicates;
2408    }
2409
2410    /**
2411     * @param checkForDuplicates the checkForDuplicates to set
2412     */
2413    public void setCheckForDuplicates(boolean checkForDuplicates) {
2414        this.checkForDuplicates = checkForDuplicates;
2415    }
2416
2417    public boolean isTransactedIndividualAck() {
2418        return transactedIndividualAck;
2419    }
2420
2421    public void setTransactedIndividualAck(boolean transactedIndividualAck) {
2422        this.transactedIndividualAck = transactedIndividualAck;
2423    }
2424
2425    public boolean isNonBlockingRedelivery() {
2426        return nonBlockingRedelivery;
2427    }
2428
2429    public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
2430        this.nonBlockingRedelivery = nonBlockingRedelivery;
2431    }
2432
2433    public boolean isRmIdFromConnectionId() {
2434        return rmIdFromConnectionId;
2435    }
2436
2437    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
2438        this.rmIdFromConnectionId = rmIdFromConnectionId;
2439    }
2440
2441    /**
2442     * Removes any TempDestinations that this connection has cached, ignoring
2443     * any exceptions generated because the destination is in use as they should
2444     * not be removed.
2445     * Used from a pooled connection, b/c it will not be explicitly closed.
2446     */
2447    public void cleanUpTempDestinations() {
2448
2449        if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
2450            return;
2451        }
2452
2453        Iterator<ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
2454            = this.activeTempDestinations.entrySet().iterator();
2455        while(entries.hasNext()) {
2456            ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
2457            try {
2458                // Only delete this temp destination if it was created from this connection. The connection used
2459                // for the advisory consumer may also have a reference to this temp destination.
2460                ActiveMQTempDestination dest = entry.getValue();
2461                String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString();
2462                if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) {
2463                    this.deleteTempDestination(entry.getValue());
2464                }
2465            } catch (Exception ex) {
2466                // the temp dest is in use so it can not be deleted.
2467                // it is ok to leave it to connection tear down phase
2468            }
2469        }
2470    }
2471
2472    /**
2473     * Sets the Connection wide RedeliveryPolicyMap for handling messages that are being rolled back.
2474     * @param redeliveryPolicyMap the redeliveryPolicyMap to set
2475     */
2476    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
2477        this.redeliveryPolicyMap = redeliveryPolicyMap;
2478    }
2479
2480    /**
2481     * Gets the Connection's configured RedeliveryPolicyMap which will be used by all the
2482     * Consumers when dealing with transaction messages that have been rolled back.
2483     *
2484     * @return the redeliveryPolicyMap
2485     */
2486    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
2487        return redeliveryPolicyMap;
2488    }
2489
2490    public int getMaxThreadPoolSize() {
2491        return maxThreadPoolSize;
2492    }
2493
2494    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
2495        this.maxThreadPoolSize = maxThreadPoolSize;
2496    }
2497
2498    /**
2499     * Enable enforcement of QueueConnection semantics.
2500     *
2501     * @return this object, useful for chaining
2502     */
2503    ActiveMQConnection enforceQueueOnlyConnection() {
2504        this.queueOnlyConnection = true;
2505        return this;
2506    }
2507
2508    public RejectedExecutionHandler getRejectedTaskHandler() {
2509        return rejectedTaskHandler;
2510    }
2511
2512    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
2513        this.rejectedTaskHandler = rejectedTaskHandler;
2514    }
2515
2516    /**
2517     * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
2518     * to send an ack for any outstanding Message Acks.  By default this value is set to zero meaning that the consumers
2519     * will not do any background Message acknowledgment.
2520     *
2521     * @return the scheduledOptimizedAckInterval
2522     */
2523    public long getOptimizedAckScheduledAckInterval() {
2524        return optimizedAckScheduledAckInterval;
2525    }
2526
2527    /**
2528     * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
2529     * have been configured with optimizeAcknowledge enabled.
2530     *
2531     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
2532     */
2533    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
2534        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
2535    }
2536
2537    /**
2538     * @return true if MessageConsumer instance will check for expired messages before dispatch.
2539     */
2540    public boolean isConsumerExpiryCheckEnabled() {
2541        return consumerExpiryCheckEnabled;
2542    }
2543
2544    /**
2545     * Controls whether message expiration checking is done in each MessageConsumer
2546     * prior to dispatching a message.  Disabling this check can lead to consumption
2547     * of expired messages.
2548     *
2549     * @param consumerExpiryCheckEnabled
2550     *        controls whether expiration checking is done prior to dispatch.
2551     */
2552    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
2553        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
2554    }
2555
2556    public List<String> getTrustedPackages() {
2557        return trustedPackages;
2558    }
2559
2560    public void setTrustedPackages(List<String> trustedPackages) {
2561        this.trustedPackages = trustedPackages;
2562    }
2563
2564    public boolean isTrustAllPackages() {
2565        return trustAllPackages;
2566    }
2567
2568    public void setTrustAllPackages(boolean trustAllPackages) {
2569        this.trustAllPackages = trustAllPackages;
2570    }
2571
2572    public int getConnectResponseTimeout() {
2573        return connectResponseTimeout;
2574    }
2575
2576        public void setConnectResponseTimeout(int connectResponseTimeout) {
2577                this.connectResponseTimeout = connectResponseTimeout;
2578        }
2579}