001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.LinkedList;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Map.Entry;
027    import java.util.concurrent.ExecutorService;
028    import java.util.concurrent.Executors;
029    import java.util.concurrent.TimeUnit;
030    import java.util.concurrent.atomic.AtomicBoolean;
031    import java.util.concurrent.atomic.AtomicReference;
032    
033    import javax.jms.IllegalStateException;
034    import javax.jms.InvalidDestinationException;
035    import javax.jms.JMSException;
036    import javax.jms.Message;
037    import javax.jms.MessageConsumer;
038    import javax.jms.MessageListener;
039    import javax.jms.TransactionRolledBackException;
040    
041    import org.apache.activemq.blob.BlobDownloader;
042    import org.apache.activemq.command.ActiveMQBlobMessage;
043    import org.apache.activemq.command.ActiveMQDestination;
044    import org.apache.activemq.command.ActiveMQMessage;
045    import org.apache.activemq.command.ActiveMQTempDestination;
046    import org.apache.activemq.command.CommandTypes;
047    import org.apache.activemq.command.ConsumerId;
048    import org.apache.activemq.command.ConsumerInfo;
049    import org.apache.activemq.command.MessageAck;
050    import org.apache.activemq.command.MessageDispatch;
051    import org.apache.activemq.command.MessageId;
052    import org.apache.activemq.command.MessagePull;
053    import org.apache.activemq.command.RemoveInfo;
054    import org.apache.activemq.command.TransactionId;
055    import org.apache.activemq.management.JMSConsumerStatsImpl;
056    import org.apache.activemq.management.StatsCapable;
057    import org.apache.activemq.management.StatsImpl;
058    import org.apache.activemq.selector.SelectorParser;
059    import org.apache.activemq.transaction.Synchronization;
060    import org.apache.activemq.util.Callback;
061    import org.apache.activemq.util.IntrospectionSupport;
062    import org.apache.activemq.util.JMSExceptionSupport;
063    import org.slf4j.Logger;
064    import org.slf4j.LoggerFactory;
065    
066    /**
067     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
068     * from a destination. A <CODE> MessageConsumer</CODE> object is created by
069     * passing a <CODE>Destination</CODE> object to a message-consumer creation
070     * method supplied by a session.
071     * <P>
072     * <CODE>MessageConsumer</CODE> is the parent interface for all message
073     * consumers.
074     * <P>
075     * A message consumer can be created with a message selector. A message selector
076     * allows the client to restrict the messages delivered to the message consumer
077     * to those that match the selector.
078     * <P>
079     * A client may either synchronously receive a message consumer's messages or
080     * have the consumer asynchronously deliver them as they arrive.
081     * <P>
082     * For synchronous receipt, a client can request the next message from a message
083     * consumer using one of its <CODE> receive</CODE> methods. There are several
084     * variations of <CODE>receive</CODE> that allow a client to poll or wait for
085     * the next message.
086     * <P>
087     * For asynchronous delivery, a client can register a
088     * <CODE>MessageListener</CODE> object with a message consumer. As messages
089     * arrive at the message consumer, it delivers them by calling the
090     * <CODE>MessageListener</CODE>'s<CODE>
091     * onMessage</CODE> method.
092     * <P>
093     * It is a client programming error for a <CODE>MessageListener</CODE> to
094     * throw an exception.
095     *
096     *
097     * @see javax.jms.MessageConsumer
098     * @see javax.jms.QueueReceiver
099     * @see javax.jms.TopicSubscriber
100     * @see javax.jms.Session
101     */
102    public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
103    
104        @SuppressWarnings("serial")
105        class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> {
106            final TransactionId transactionId;
107            public PreviouslyDeliveredMap(TransactionId transactionId) {
108                this.transactionId = transactionId;
109            }
110        }
111    
112        private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
113        protected final ActiveMQSession session;
114        protected final ConsumerInfo info;
115    
116        // These are the messages waiting to be delivered to the client
117        protected final MessageDispatchChannel unconsumedMessages;
118    
119        // The are the messages that were delivered to the consumer but that have
120        // not been acknowledged. It's kept in reverse order since we
121        // Always walk list in reverse order.
122        private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
123        // track duplicate deliveries in a transaction such that the tx integrity can be validated
124        private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;
125        private int deliveredCounter;
126        private int additionalWindowSize;
127        private long redeliveryDelay;
128        private int ackCounter;
129        private int dispatchedCount;
130        private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
131        private final JMSConsumerStatsImpl stats;
132    
133        private final String selector;
134        private boolean synchronizationRegistered;
135        private final AtomicBoolean started = new AtomicBoolean(false);
136    
137        private MessageAvailableListener availableListener;
138    
139        private RedeliveryPolicy redeliveryPolicy;
140        private boolean optimizeAcknowledge;
141        private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
142        private ExecutorService executorService;
143        private MessageTransformer transformer;
144        private boolean clearDispatchList;
145        boolean inProgressClearRequiredFlag;
146    
147        private MessageAck pendingAck;
148        private long lastDeliveredSequenceId;
149    
150        private IOException failureError;
151    
152        private long optimizeAckTimestamp = System.currentTimeMillis();
153        private long optimizeAcknowledgeTimeOut = 0;
154        private long failoverRedeliveryWaitPeriod = 0;
155        private boolean transactedIndividualAck = false;
156        private boolean nonBlockingRedelivery = false;
157    
158        /**
159         * Create a MessageConsumer
160         *
161         * @param session
162         * @param dest
163         * @param name
164         * @param selector
165         * @param prefetch
166         * @param maximumPendingMessageCount
167         * @param noLocal
168         * @param browser
169         * @param dispatchAsync
170         * @param messageListener
171         * @throws JMSException
172         */
173        public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
174                String name, String selector, int prefetch,
175                int maximumPendingMessageCount, boolean noLocal, boolean browser,
176                boolean dispatchAsync, MessageListener messageListener) throws JMSException {
177            if (dest == null) {
178                throw new InvalidDestinationException("Don't understand null destinations");
179            } else if (dest.getPhysicalName() == null) {
180                throw new InvalidDestinationException("The destination object was not given a physical name.");
181            } else if (dest.isTemporary()) {
182                String physicalName = dest.getPhysicalName();
183    
184                if (physicalName == null) {
185                    throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
186                }
187    
188                String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
189    
190                if (physicalName.indexOf(connectionID) < 0) {
191                    throw new InvalidDestinationException(
192                                                          "Cannot use a Temporary destination from another Connection");
193                }
194    
195                if (session.connection.isDeleted(dest)) {
196                    throw new InvalidDestinationException(
197                                                          "Cannot use a Temporary destination that has been deleted");
198                }
199                if (prefetch < 0) {
200                    throw new JMSException("Cannot have a prefetch size less than zero");
201                }
202            }
203            if (session.connection.isMessagePrioritySupported()) {
204                this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
205            }else {
206                this.unconsumedMessages = new FifoMessageDispatchChannel();
207            }
208    
209            this.session = session;
210            this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
211            setTransformer(session.getTransformer());
212    
213            this.info = new ConsumerInfo(consumerId);
214            this.info.setExclusive(this.session.connection.isExclusiveConsumer());
215            this.info.setSubscriptionName(name);
216            this.info.setPrefetchSize(prefetch);
217            this.info.setCurrentPrefetchSize(prefetch);
218            this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
219            this.info.setNoLocal(noLocal);
220            this.info.setDispatchAsync(dispatchAsync);
221            this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
222            this.info.setSelector(null);
223    
224            // Allows the options on the destination to configure the consumerInfo
225            if (dest.getOptions() != null) {
226                Map<String, Object> options = IntrospectionSupport.extractProperties(
227                    new HashMap<String, Object>(dest.getOptions()), "consumer.");
228                IntrospectionSupport.setProperties(this.info, options);
229                if (options.size() > 0) {
230                    String msg = "There are " + options.size()
231                        + " consumer options that couldn't be set on the consumer."
232                        + " Check the options are spelled correctly."
233                        + " Unknown parameters=[" + options + "]."
234                        + " This consumer cannot be started.";
235                    LOG.warn(msg);
236                    throw new ConfigurationException(msg);
237                }
238            }
239    
240            this.info.setDestination(dest);
241            this.info.setBrowser(browser);
242            if (selector != null && selector.trim().length() != 0) {
243                // Validate the selector
244                SelectorParser.parse(selector);
245                this.info.setSelector(selector);
246                this.selector = selector;
247            } else if (info.getSelector() != null) {
248                // Validate the selector
249                SelectorParser.parse(this.info.getSelector());
250                this.selector = this.info.getSelector();
251            } else {
252                this.selector = null;
253            }
254    
255            this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
256            this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
257                                       && !info.isBrowser();
258            if (this.optimizeAcknowledge) {
259                this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
260            }
261            this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
262            this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
263            this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
264            this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery;
265            if (messageListener != null) {
266                setMessageListener(messageListener);
267            }
268            try {
269                this.session.addConsumer(this);
270                this.session.syncSendPacket(info);
271            } catch (JMSException e) {
272                this.session.removeConsumer(this);
273                throw e;
274            }
275    
276            if (session.connection.isStarted()) {
277                start();
278            }
279        }
280    
281        private boolean isAutoAcknowledgeEach() {
282            return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() );
283        }
284    
285        private boolean isAutoAcknowledgeBatch() {
286            return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
287        }
288    
289        public StatsImpl getStats() {
290            return stats;
291        }
292    
293        public JMSConsumerStatsImpl getConsumerStats() {
294            return stats;
295        }
296    
297        public RedeliveryPolicy getRedeliveryPolicy() {
298            return redeliveryPolicy;
299        }
300    
301        /**
302         * Sets the redelivery policy used when messages are redelivered
303         */
304        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
305            this.redeliveryPolicy = redeliveryPolicy;
306        }
307    
308        public MessageTransformer getTransformer() {
309            return transformer;
310        }
311    
312        /**
313         * Sets the transformer used to transform messages before they are sent on
314         * to the JMS bus
315         */
316        public void setTransformer(MessageTransformer transformer) {
317            this.transformer = transformer;
318        }
319    
320        /**
321         * @return Returns the value.
322         */
323        public ConsumerId getConsumerId() {
324            return info.getConsumerId();
325        }
326    
327        /**
328         * @return the consumer name - used for durable consumers
329         */
330        public String getConsumerName() {
331            return this.info.getSubscriptionName();
332        }
333    
334        /**
335         * @return true if this consumer does not accept locally produced messages
336         */
337        protected boolean isNoLocal() {
338            return info.isNoLocal();
339        }
340    
341        /**
342         * Retrieve is a browser
343         *
344         * @return true if a browser
345         */
346        protected boolean isBrowser() {
347            return info.isBrowser();
348        }
349    
350        /**
351         * @return ActiveMQDestination
352         */
353        protected ActiveMQDestination getDestination() {
354            return info.getDestination();
355        }
356    
357        /**
358         * @return Returns the prefetchNumber.
359         */
360        public int getPrefetchNumber() {
361            return info.getPrefetchSize();
362        }
363    
364        /**
365         * @return true if this is a durable topic subscriber
366         */
367        public boolean isDurableSubscriber() {
368            return info.getSubscriptionName() != null && info.getDestination().isTopic();
369        }
370    
371        /**
372         * Gets this message consumer's message selector expression.
373         *
374         * @return this message consumer's message selector, or null if no message
375         *         selector exists for the message consumer (that is, if the message
376         *         selector was not set or was set to null or the empty string)
377         * @throws JMSException if the JMS provider fails to receive the next
378         *                 message due to some internal error.
379         */
380        public String getMessageSelector() throws JMSException {
381            checkClosed();
382            return selector;
383        }
384    
385        /**
386         * Gets the message consumer's <CODE>MessageListener</CODE>.
387         *
388         * @return the listener for the message consumer, or null if no listener is
389         *         set
390         * @throws JMSException if the JMS provider fails to get the message
391         *                 listener due to some internal error.
392         * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
393         */
394        public MessageListener getMessageListener() throws JMSException {
395            checkClosed();
396            return this.messageListener.get();
397        }
398    
399        /**
400         * Sets the message consumer's <CODE>MessageListener</CODE>.
401         * <P>
402         * Setting the message listener to null is the equivalent of unsetting the
403         * message listener for the message consumer.
404         * <P>
405         * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
406         * while messages are being consumed by an existing listener or the consumer
407         * is being used to consume messages synchronously is undefined.
408         *
409         * @param listener the listener to which the messages are to be delivered
410         * @throws JMSException if the JMS provider fails to receive the next
411         *                 message due to some internal error.
412         * @see javax.jms.MessageConsumer#getMessageListener
413         */
414        public void setMessageListener(MessageListener listener) throws JMSException {
415            checkClosed();
416            if (info.getPrefetchSize() == 0) {
417                throw new JMSException(
418                                       "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
419            }
420            if (listener != null) {
421                boolean wasRunning = session.isRunning();
422                if (wasRunning) {
423                    session.stop();
424                }
425    
426                this.messageListener.set(listener);
427                session.redispatch(this, unconsumedMessages);
428    
429                if (wasRunning) {
430                    session.start();
431                }
432            } else {
433                this.messageListener.set(null);
434            }
435        }
436    
437        public MessageAvailableListener getAvailableListener() {
438            return availableListener;
439        }
440    
441        /**
442         * Sets the listener used to notify synchronous consumers that there is a
443         * message available so that the {@link MessageConsumer#receiveNoWait()} can
444         * be called.
445         */
446        public void setAvailableListener(MessageAvailableListener availableListener) {
447            this.availableListener = availableListener;
448        }
449    
450        /**
451         * Used to get an enqueued message from the unconsumedMessages list. The
452         * amount of time this method blocks is based on the timeout value. - if
453         * timeout==-1 then it blocks until a message is received. - if timeout==0
454         * then it it tries to not block at all, it returns a message if it is
455         * available - if timeout>0 then it blocks up to timeout amount of time.
456         * Expired messages will consumed by this method.
457         *
458         * @throws JMSException
459         * @return null if we timeout or if the consumer is closed.
460         */
461        private MessageDispatch dequeue(long timeout) throws JMSException {
462            try {
463                long deadline = 0;
464                if (timeout > 0) {
465                    deadline = System.currentTimeMillis() + timeout;
466                }
467                while (true) {
468                    MessageDispatch md = unconsumedMessages.dequeue(timeout);
469                    if (md == null) {
470                        if (timeout > 0 && !unconsumedMessages.isClosed()) {
471                            timeout = Math.max(deadline - System.currentTimeMillis(), 0);
472                        } else {
473                            if (failureError != null) {
474                                throw JMSExceptionSupport.create(failureError);
475                            } else {
476                                return null;
477                            }
478                        }
479                    } else if (md.getMessage() == null) {
480                        return null;
481                    } else if (md.getMessage().isExpired()) {
482                        if (LOG.isDebugEnabled()) {
483                            LOG.debug(getConsumerId() + " received expired message: " + md);
484                        }
485                        beforeMessageIsConsumed(md);
486                        afterMessageIsConsumed(md, true);
487                        if (timeout > 0) {
488                            timeout = Math.max(deadline - System.currentTimeMillis(), 0);
489                        }
490                    } else {
491                        if (LOG.isTraceEnabled()) {
492                            LOG.trace(getConsumerId() + " received message: " + md);
493                        }
494                        return md;
495                    }
496                }
497            } catch (InterruptedException e) {
498                Thread.currentThread().interrupt();
499                throw JMSExceptionSupport.create(e);
500            }
501        }
502    
503        /**
504         * Receives the next message produced for this message consumer.
505         * <P>
506         * This call blocks indefinitely until a message is produced or until this
507         * message consumer is closed.
508         * <P>
509         * If this <CODE>receive</CODE> is done within a transaction, the consumer
510         * retains the message until the transaction commits.
511         *
512         * @return the next message produced for this message consumer, or null if
513         *         this message consumer is concurrently closed
514         */
515        public Message receive() throws JMSException {
516            checkClosed();
517            checkMessageListener();
518    
519            sendPullCommand(0);
520            MessageDispatch md = dequeue(-1);
521            if (md == null) {
522                return null;
523            }
524    
525            beforeMessageIsConsumed(md);
526            afterMessageIsConsumed(md, false);
527    
528            return createActiveMQMessage(md);
529        }
530    
531        /**
532         * @param md
533         * @return
534         */
535        private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
536            ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
537            if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
538                ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
539            }
540            if (transformer != null) {
541                Message transformedMessage = transformer.consumerTransform(session, this, m);
542                if (transformedMessage != null) {
543                    m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
544                }
545            }
546            if (session.isClientAcknowledge()) {
547                m.setAcknowledgeCallback(new Callback() {
548                    public void execute() throws Exception {
549                        session.checkClosed();
550                        session.acknowledge();
551                    }
552                });
553            }else if (session.isIndividualAcknowledge()) {
554                m.setAcknowledgeCallback(new Callback() {
555                    public void execute() throws Exception {
556                        session.checkClosed();
557                        acknowledge(md);
558                    }
559                });
560            }
561            return m;
562        }
563    
564        /**
565         * Receives the next message that arrives within the specified timeout
566         * interval.
567         * <P>
568         * This call blocks until a message arrives, the timeout expires, or this
569         * message consumer is closed. A <CODE>timeout</CODE> of zero never
570         * expires, and the call blocks indefinitely.
571         *
572         * @param timeout the timeout value (in milliseconds), a time out of zero
573         *                never expires.
574         * @return the next message produced for this message consumer, or null if
575         *         the timeout expires or this message consumer is concurrently
576         *         closed
577         */
578        public Message receive(long timeout) throws JMSException {
579            checkClosed();
580            checkMessageListener();
581            if (timeout == 0) {
582                return this.receive();
583            }
584    
585            sendPullCommand(timeout);
586            while (timeout > 0) {
587    
588                MessageDispatch md;
589                if (info.getPrefetchSize() == 0) {
590                    md = dequeue(-1); // We let the broker let us know when we timeout.
591                } else {
592                    md = dequeue(timeout);
593                }
594    
595                if (md == null) {
596                    return null;
597                }
598    
599                beforeMessageIsConsumed(md);
600                afterMessageIsConsumed(md, false);
601                return createActiveMQMessage(md);
602            }
603            return null;
604        }
605    
606        /**
607         * Receives the next message if one is immediately available.
608         *
609         * @return the next message produced for this message consumer, or null if
610         *         one is not available
611         * @throws JMSException if the JMS provider fails to receive the next
612         *                 message due to some internal error.
613         */
614        public Message receiveNoWait() throws JMSException {
615            checkClosed();
616            checkMessageListener();
617            sendPullCommand(-1);
618    
619            MessageDispatch md;
620            if (info.getPrefetchSize() == 0) {
621                md = dequeue(-1); // We let the broker let us know when we
622                // timeout.
623            } else {
624                md = dequeue(0);
625            }
626    
627            if (md == null) {
628                return null;
629            }
630    
631            beforeMessageIsConsumed(md);
632            afterMessageIsConsumed(md, false);
633            return createActiveMQMessage(md);
634        }
635    
636        /**
637         * Closes the message consumer.
638         * <P>
639         * Since a provider may allocate some resources on behalf of a <CODE>
640         * MessageConsumer</CODE>
641         * outside the Java virtual machine, clients should close them when they are
642         * not needed. Relying on garbage collection to eventually reclaim these
643         * resources may not be timely enough.
644         * <P>
645         * This call blocks until a <CODE>receive</CODE> or message listener in
646         * progress has completed. A blocked message consumer <CODE>receive </CODE>
647         * call returns null when this message consumer is closed.
648         *
649         * @throws JMSException if the JMS provider fails to close the consumer due
650         *                 to some internal error.
651         */
652        public void close() throws JMSException {
653            if (!unconsumedMessages.isClosed()) {
654                if (session.getTransactionContext().isInTransaction()) {
655                    session.getTransactionContext().addSynchronization(new Synchronization() {
656                        @Override
657                        public void afterCommit() throws Exception {
658                            doClose();
659                        }
660    
661                        @Override
662                        public void afterRollback() throws Exception {
663                            doClose();
664                        }
665                    });
666                } else {
667                    doClose();
668                }
669            }
670        }
671    
672        void doClose() throws JMSException {
673            // Store interrupted state and clear so that Transport operations don't
674            // throw InterruptedException and we ensure that resources are clened up.
675            boolean interrupted = Thread.interrupted();
676            dispose();
677            RemoveInfo removeCommand = info.createRemoveCommand();
678            if (LOG.isDebugEnabled()) {
679                LOG.debug("remove: " + this.getConsumerId() + ", lastDeliveredSequenceId:" + lastDeliveredSequenceId);
680            }
681            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
682            this.session.asyncSendPacket(removeCommand);
683            if (interrupted) {
684                Thread.currentThread().interrupt();
685            }    }
686    
687        void inProgressClearRequired() {
688            inProgressClearRequiredFlag = true;
689            // deal with delivered messages async to avoid lock contention with in progress acks
690            clearDispatchList = true;
691        }
692    
693        void clearMessagesInProgress() {
694            if (inProgressClearRequiredFlag) {
695                synchronized (unconsumedMessages.getMutex()) {
696                    if (inProgressClearRequiredFlag) {
697                        if (LOG.isDebugEnabled()) {
698                            LOG.debug(getConsumerId() + " clearing unconsumed list (" + unconsumedMessages.size() + ") on transport interrupt");
699                        }
700                        // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
701                        List<MessageDispatch> list = unconsumedMessages.removeAll();
702                        if (!this.info.isBrowser()) {
703                            for (MessageDispatch old : list) {
704                                session.connection.rollbackDuplicate(this, old.getMessage());
705                            }
706                        }
707                        // allow dispatch on this connection to resume
708                        session.connection.transportInterruptionProcessingComplete();
709                        inProgressClearRequiredFlag = false;
710                    }
711                }
712            }
713        }
714    
715        void deliverAcks() {
716            MessageAck ack = null;
717            if (deliveryingAcknowledgements.compareAndSet(false, true)) {
718                if (isAutoAcknowledgeEach()) {
719                    synchronized(deliveredMessages) {
720                        ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
721                        if (ack != null) {
722                            deliveredMessages.clear();
723                            ackCounter = 0;
724                        } else {
725                            ack = pendingAck;
726                            pendingAck = null;
727                        }
728                    }
729                } else if (pendingAck != null && pendingAck.isStandardAck()) {
730                    ack = pendingAck;
731                    pendingAck = null;
732                }
733                if (ack != null) {
734                    final MessageAck ackToSend = ack;
735    
736                    if (executorService == null) {
737                        executorService = Executors.newSingleThreadExecutor();
738                    }
739                    executorService.submit(new Runnable() {
740                        public void run() {
741                            try {
742                                session.sendAck(ackToSend,true);
743                            } catch (JMSException e) {
744                                LOG.error(getConsumerId() + " failed to delivered acknowledgements", e);
745                            } finally {
746                                deliveryingAcknowledgements.set(false);
747                            }
748                        }
749                    });
750                } else {
751                    deliveryingAcknowledgements.set(false);
752                }
753            }
754        }
755    
756        public void dispose() throws JMSException {
757            if (!unconsumedMessages.isClosed()) {
758    
759                // Do we have any acks we need to send out before closing?
760                // Ack any delivered messages now.
761                if (!session.getTransacted()) {
762                    deliverAcks();
763                    if (isAutoAcknowledgeBatch()) {
764                        acknowledge();
765                    }
766                }
767                if (executorService != null) {
768                    executorService.shutdown();
769                    try {
770                        executorService.awaitTermination(60, TimeUnit.SECONDS);
771                    } catch (InterruptedException e) {
772                        Thread.currentThread().interrupt();
773                    }
774                }
775    
776                if (session.isClientAcknowledge()) {
777                    if (!this.info.isBrowser()) {
778                        // rollback duplicates that aren't acknowledged
779                        List<MessageDispatch> tmp = null;
780                        synchronized (this.deliveredMessages) {
781                            tmp = new ArrayList<MessageDispatch>(this.deliveredMessages);
782                        }
783                        for (MessageDispatch old : tmp) {
784                            this.session.connection.rollbackDuplicate(this, old.getMessage());
785                        }
786                        tmp.clear();
787                    }
788                }
789                if (!session.isTransacted()) {
790                    synchronized(deliveredMessages) {
791                        deliveredMessages.clear();
792                    }
793                }
794                unconsumedMessages.close();
795                this.session.removeConsumer(this);
796                List<MessageDispatch> list = unconsumedMessages.removeAll();
797                if (!this.info.isBrowser()) {
798                    for (MessageDispatch old : list) {
799                        // ensure we don't filter this as a duplicate
800                        session.connection.rollbackDuplicate(this, old.getMessage());
801                    }
802                }
803            }
804        }
805    
806        /**
807         * @throws IllegalStateException
808         */
809        protected void checkClosed() throws IllegalStateException {
810            if (unconsumedMessages.isClosed()) {
811                throw new IllegalStateException("The Consumer is closed");
812            }
813        }
814    
815        /**
816         * If we have a zero prefetch specified then send a pull command to the
817         * broker to pull a message we are about to receive
818         */
819        protected void sendPullCommand(long timeout) throws JMSException {
820            clearDispatchList();
821            if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
822                MessagePull messagePull = new MessagePull();
823                messagePull.configure(info);
824                messagePull.setTimeout(timeout);
825                session.asyncSendPacket(messagePull);
826            }
827        }
828    
829        protected void checkMessageListener() throws JMSException {
830            session.checkMessageListener();
831        }
832    
833        protected void setOptimizeAcknowledge(boolean value) {
834            if (optimizeAcknowledge && !value) {
835                deliverAcks();
836            }
837            optimizeAcknowledge = value;
838        }
839    
840        protected void setPrefetchSize(int prefetch) {
841            deliverAcks();
842            this.info.setCurrentPrefetchSize(prefetch);
843        }
844    
845        private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
846            md.setDeliverySequenceId(session.getNextDeliveryId());
847            lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
848            if (!isAutoAcknowledgeBatch()) {
849                synchronized(deliveredMessages) {
850                    deliveredMessages.addFirst(md);
851                }
852                if (session.getTransacted()) {
853                    if (transactedIndividualAck) {
854                        immediateIndividualTransactedAck(md);
855                    } else {
856                        ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
857                    }
858                }
859            }
860        }
861    
862        private void immediateIndividualTransactedAck(MessageDispatch md) throws JMSException {
863            // acks accumulate on the broker pending transaction completion to indicate
864            // delivery status
865            registerSync();
866            MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
867            ack.setTransactionId(session.getTransactionContext().getTransactionId());
868            session.syncSendPacket(ack);
869        }
870    
871        private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
872            if (unconsumedMessages.isClosed()) {
873                return;
874            }
875            if (messageExpired) {
876                synchronized (deliveredMessages) {
877                    deliveredMessages.remove(md);
878                }
879                stats.getExpiredMessageCount().increment();
880                ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
881            } else {
882                stats.onMessage();
883                if (session.getTransacted()) {
884                    // Do nothing.
885                } else if (isAutoAcknowledgeEach()) {
886                    if (deliveryingAcknowledgements.compareAndSet(false, true)) {
887                        synchronized (deliveredMessages) {
888                            if (!deliveredMessages.isEmpty()) {
889                                if (optimizeAcknowledge) {
890                                    ackCounter++;
891                                    if (ackCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
892                                        MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
893                                        if (ack != null) {
894                                            deliveredMessages.clear();
895                                            ackCounter = 0;
896                                            session.sendAck(ack);
897                                            optimizeAckTimestamp = System.currentTimeMillis();
898                                        }
899                                    }
900                                } else {
901                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
902                                    if (ack!=null) {
903                                        deliveredMessages.clear();
904                                        session.sendAck(ack);
905                                    }
906                                }
907                            }
908                        }
909                        deliveryingAcknowledgements.set(false);
910                    }
911                } else if (isAutoAcknowledgeBatch()) {
912                    ackLater(md, MessageAck.STANDARD_ACK_TYPE);
913                } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
914                    boolean messageUnackedByConsumer = false;
915                    synchronized (deliveredMessages) {
916                        messageUnackedByConsumer = deliveredMessages.contains(md);
917                    }
918                    if (messageUnackedByConsumer) {
919                        ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
920                    }
921                }
922                else {
923                    throw new IllegalStateException("Invalid session state.");
924                }
925            }
926        }
927    
928        /**
929         * Creates a MessageAck for all messages contained in deliveredMessages.
930         * Caller should hold the lock for deliveredMessages.
931         *
932         * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE)
933         * @return <code>null</code> if nothing to ack.
934         */
935        private MessageAck makeAckForAllDeliveredMessages(byte type) {
936            synchronized (deliveredMessages) {
937                if (deliveredMessages.isEmpty())
938                    return null;
939    
940                MessageDispatch md = deliveredMessages.getFirst();
941                MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
942                ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
943                return ack;
944            }
945        }
946    
947        private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
948    
949            // Don't acknowledge now, but we may need to let the broker know the
950            // consumer got the message to expand the pre-fetch window
951            if (session.getTransacted()) {
952                registerSync();
953            }
954    
955            deliveredCounter++;
956    
957            MessageAck oldPendingAck = pendingAck;
958            pendingAck = new MessageAck(md, ackType, deliveredCounter);
959            pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
960            if( oldPendingAck==null ) {
961                pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
962            } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
963                pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
964            } else {
965                // old pending ack being superseded by ack of another type, if is is not a delivered
966                // ack and hence important, send it now so it is not lost.
967                if ( !oldPendingAck.isDeliveredAck()) {
968                    if (LOG.isDebugEnabled()) {
969                        LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
970                    }
971                    session.sendAck(oldPendingAck);
972                } else {
973                    if (LOG.isDebugEnabled()) {
974                        LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
975                    }
976                }
977            }
978    
979            if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
980                session.sendAck(pendingAck);
981                pendingAck=null;
982                deliveredCounter = 0;
983                additionalWindowSize = 0;
984            }
985        }
986    
987        private void registerSync() throws JMSException {
988            session.doStartTransaction();
989            if (!synchronizationRegistered) {
990                synchronizationRegistered = true;
991                session.getTransactionContext().addSynchronization(new Synchronization() {
992                    @Override
993                    public void beforeEnd() throws Exception {
994                        if (transactedIndividualAck) {
995                            clearDispatchList();
996                            waitForRedeliveries();
997                            synchronized(deliveredMessages) {
998                                rollbackOnFailedRecoveryRedelivery();
999                            }
1000                        } else {
1001                            acknowledge();
1002                        }
1003                        synchronizationRegistered = false;
1004                    }
1005    
1006                    @Override
1007                    public void afterCommit() throws Exception {
1008                        commit();
1009                        synchronizationRegistered = false;
1010                    }
1011    
1012                    @Override
1013                    public void afterRollback() throws Exception {
1014                        rollback();
1015                        synchronizationRegistered = false;
1016                    }
1017                });
1018            }
1019        }
1020    
1021        /**
1022         * Acknowledge all the messages that have been delivered to the client up to
1023         * this point.
1024         *
1025         * @throws JMSException
1026         */
1027        public void acknowledge() throws JMSException {
1028            clearDispatchList();
1029            waitForRedeliveries();
1030            synchronized(deliveredMessages) {
1031                // Acknowledge all messages so far.
1032                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
1033                if (ack == null)
1034                    return; // no msgs
1035    
1036                if (session.getTransacted()) {
1037                    rollbackOnFailedRecoveryRedelivery();
1038                    session.doStartTransaction();
1039                    ack.setTransactionId(session.getTransactionContext().getTransactionId());
1040                }
1041                session.sendAck(ack);
1042                pendingAck = null;
1043    
1044                // Adjust the counters
1045                deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
1046                additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1047    
1048                if (!session.getTransacted()) {
1049                    deliveredMessages.clear();
1050                }
1051            }
1052        }
1053    
1054        private void waitForRedeliveries() {
1055            if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) {
1056                long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod;
1057                int numberNotReplayed;
1058                do {
1059                    numberNotReplayed = 0;
1060                    synchronized(deliveredMessages) {
1061                        if (previouslyDeliveredMessages != null) {
1062                            for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1063                                if (!entry.getValue()) {
1064                                    numberNotReplayed++;
1065                                }
1066                            }
1067                        }
1068                    }
1069                    if (numberNotReplayed > 0) {
1070                        LOG.info("waiting for redelivery of " + numberNotReplayed + " in transaction: "
1071                                + previouslyDeliveredMessages.transactionId +  ", to consumer :" + this.getConsumerId());
1072                        try {
1073                            Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4));
1074                        } catch (InterruptedException outOfhere) {
1075                            break;
1076                        }
1077                    }
1078                } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis());
1079            }
1080        }
1081    
1082        /*
1083         * called with deliveredMessages locked
1084         */
1085        private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
1086            if (previouslyDeliveredMessages != null) {
1087                // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback
1088                // as messages have been dispatched else where.
1089                int numberNotReplayed = 0;
1090                for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1091                    if (!entry.getValue()) {
1092                        numberNotReplayed++;
1093                        if (LOG.isDebugEnabled()) {
1094                            LOG.debug("previously delivered message has not been replayed in transaction: "
1095                                    + previouslyDeliveredMessages.transactionId
1096                                    + " , messageId: " + entry.getKey());
1097                        }
1098                    }
1099                }
1100                if (numberNotReplayed > 0) {
1101                    String message = "rolling back transaction ("
1102                        + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed
1103                        + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId();
1104                    LOG.warn(message);
1105                    throw new TransactionRolledBackException(message);
1106                }
1107            }
1108        }
1109    
1110        void acknowledge(MessageDispatch md) throws JMSException {
1111            MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
1112            session.sendAck(ack);
1113            synchronized(deliveredMessages){
1114                deliveredMessages.remove(md);
1115            }
1116        }
1117    
1118        public void commit() throws JMSException {
1119            synchronized (deliveredMessages) {
1120                deliveredMessages.clear();
1121                clearPreviouslyDelivered();
1122            }
1123            redeliveryDelay = 0;
1124        }
1125    
1126        public void rollback() throws JMSException {
1127            synchronized (unconsumedMessages.getMutex()) {
1128                if (optimizeAcknowledge) {
1129                    // remove messages read but not acked at the broker yet through
1130                    // optimizeAcknowledge
1131                    if (!this.info.isBrowser()) {
1132                        synchronized(deliveredMessages) {
1133                            for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
1134                                // ensure we don't filter this as a duplicate
1135                                MessageDispatch md = deliveredMessages.removeLast();
1136                                session.connection.rollbackDuplicate(this, md.getMessage());
1137                            }
1138                        }
1139                    }
1140                }
1141                synchronized(deliveredMessages) {
1142                    rollbackPreviouslyDeliveredAndNotRedelivered();
1143                    if (deliveredMessages.isEmpty()) {
1144                        return;
1145                    }
1146    
1147                    // use initial delay for first redelivery
1148                    MessageDispatch lastMd = deliveredMessages.getFirst();
1149                    final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
1150                    if (currentRedeliveryCount > 0) {
1151                        redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
1152                    } else {
1153                        redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
1154                    }
1155                    MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
1156    
1157                    for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
1158                        MessageDispatch md = iter.next();
1159                        md.getMessage().onMessageRolledBack();
1160                        // ensure we don't filter this as a duplicate
1161                        session.connection.rollbackDuplicate(this, md.getMessage());
1162                    }
1163    
1164                    if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
1165                        && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
1166                        // We need to NACK the messages so that they get sent to the
1167                        // DLQ.
1168                        // Acknowledge the last message.
1169    
1170                        MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
1171                        ack.setPoisonCause(lastMd.getRollbackCause());
1172                        ack.setFirstMessageId(firstMsgId);
1173                        session.sendAck(ack,true);
1174                        // Adjust the window size.
1175                        additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1176                        redeliveryDelay = 0;
1177                    } else {
1178    
1179                        // only redelivery_ack after first delivery
1180                        if (currentRedeliveryCount > 0) {
1181                            MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
1182                            ack.setFirstMessageId(firstMsgId);
1183                            session.sendAck(ack,true);
1184                        }
1185    
1186                        // stop the delivery of messages.
1187                        if (nonBlockingRedelivery) {
1188                            if (!unconsumedMessages.isClosed()) {
1189    
1190                                final LinkedList<MessageDispatch> pendingRedeliveries =
1191                                    new LinkedList<MessageDispatch>(deliveredMessages);
1192    
1193                                // Start up the delivery again a little later.
1194                                session.getScheduler().executeAfterDelay(new Runnable() {
1195                                    public void run() {
1196                                        try {
1197                                            if (!unconsumedMessages.isClosed()) {
1198                                                for(MessageDispatch dispatch : pendingRedeliveries) {
1199                                                    session.dispatch(dispatch);
1200                                                }
1201                                            }
1202                                        } catch (Exception e) {
1203                                            session.connection.onAsyncException(e);
1204                                        }
1205                                    }
1206                                }, redeliveryDelay);
1207                            }
1208    
1209                        } else {
1210                            unconsumedMessages.stop();
1211    
1212                            for (MessageDispatch md : deliveredMessages) {
1213                                unconsumedMessages.enqueueFirst(md);
1214                            }
1215    
1216                            if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
1217                                // Start up the delivery again a little later.
1218                                session.getScheduler().executeAfterDelay(new Runnable() {
1219                                    public void run() {
1220                                        try {
1221                                            if (started.get()) {
1222                                                start();
1223                                            }
1224                                        } catch (JMSException e) {
1225                                            session.connection.onAsyncException(e);
1226                                        }
1227                                    }
1228                                }, redeliveryDelay);
1229                            } else {
1230                                start();
1231                            }
1232                        }
1233                    }
1234                    deliveredCounter -= deliveredMessages.size();
1235                    deliveredMessages.clear();
1236                }
1237            }
1238            if (messageListener.get() != null) {
1239                session.redispatch(this, unconsumedMessages);
1240            }
1241        }
1242    
1243        /*
1244         * called with unconsumedMessages && deliveredMessages locked
1245         * remove any message not re-delivered as they can't be replayed to this
1246         * consumer on rollback
1247         */
1248        private void rollbackPreviouslyDeliveredAndNotRedelivered() {
1249            if (previouslyDeliveredMessages != null) {
1250                for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1251                    if (!entry.getValue()) {
1252                        removeFromDeliveredMessages(entry.getKey());
1253                    }
1254                }
1255                clearPreviouslyDelivered();
1256            }
1257        }
1258    
1259        /*
1260         * called with deliveredMessages locked
1261         */
1262        private void removeFromDeliveredMessages(MessageId key) {
1263            Iterator<MessageDispatch> iterator = deliveredMessages.iterator();
1264            while (iterator.hasNext()) {
1265                MessageDispatch candidate = iterator.next();
1266                if (key.equals(candidate.getMessage().getMessageId())) {
1267                    session.connection.rollbackDuplicate(this, candidate.getMessage());
1268                    iterator.remove();
1269                    break;
1270                }
1271            }
1272        }
1273    
1274        /*
1275         * called with deliveredMessages locked
1276         */
1277        private void clearPreviouslyDelivered() {
1278            if (previouslyDeliveredMessages != null) {
1279                previouslyDeliveredMessages.clear();
1280                previouslyDeliveredMessages = null;
1281            }
1282        }
1283    
1284        public void dispatch(MessageDispatch md) {
1285            MessageListener listener = this.messageListener.get();
1286            try {
1287                clearMessagesInProgress();
1288                clearDispatchList();
1289                synchronized (unconsumedMessages.getMutex()) {
1290                    if (!unconsumedMessages.isClosed()) {
1291                        if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
1292                            if (listener != null && unconsumedMessages.isRunning()) {
1293                                ActiveMQMessage message = createActiveMQMessage(md);
1294                                beforeMessageIsConsumed(md);
1295                                try {
1296                                    boolean expired = message.isExpired();
1297                                    if (!expired) {
1298                                        listener.onMessage(message);
1299                                    }
1300                                    afterMessageIsConsumed(md, expired);
1301                                } catch (RuntimeException e) {
1302                                    LOG.error(getConsumerId() + " Exception while processing message: " + md.getMessage().getMessageId(), e);
1303                                    if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
1304                                        // schedual redelivery and possible dlq processing
1305                                        md.setRollbackCause(e);
1306                                        rollback();
1307                                    } else {
1308                                        // Transacted or Client ack: Deliver the
1309                                        // next message.
1310                                        afterMessageIsConsumed(md, false);
1311                                    }
1312                                }
1313                            } else {
1314                                if (!unconsumedMessages.isRunning()) {
1315                                    // delayed redelivery, ensure it can be re delivered
1316                                    session.connection.rollbackDuplicate(this, md.getMessage());
1317                                }
1318                                unconsumedMessages.enqueue(md);
1319                                if (availableListener != null) {
1320                                    availableListener.onMessageAvailable(this);
1321                                }
1322                            }
1323                        } else {
1324                            if (!session.isTransacted()) {
1325                                LOG.warn("Duplicate dispatch on connection: " + session.getConnection().getConnectionInfo().getConnectionId()
1326                                        + " to consumer: "  + getConsumerId() + ", ignoring (auto acking) duplicate: " + md);
1327                                MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
1328                                session.sendAck(ack);
1329                            } else {
1330                                if (LOG.isDebugEnabled()) {
1331                                    LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
1332                                }
1333                                boolean needsPoisonAck = false;
1334                                synchronized (deliveredMessages) {
1335                                    if (previouslyDeliveredMessages != null) {
1336                                        previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
1337                                    } else {
1338                                        // delivery while pending redelivery to another consumer on the same connection
1339                                        // not waiting for redelivery will help here
1340                                        needsPoisonAck = true;
1341                                    }
1342                                }
1343                                if (needsPoisonAck) {
1344                                    MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
1345                                    poisonAck.setFirstMessageId(md.getMessage().getMessageId());
1346                                    poisonAck.setPoisonCause(new JMSException("Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
1347                                            + session.getConnection().getConnectionInfo().getConnectionId()));
1348                                    LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
1349                                            + " consumer on this connection, failoverRedeliveryWaitPeriod="
1350                                            + failoverRedeliveryWaitPeriod + ". Message: " + md + ", poisonAck: " + poisonAck);
1351                                    session.sendAck(poisonAck);
1352                                } else {
1353                                    if (transactedIndividualAck) {
1354                                        immediateIndividualTransactedAck(md);
1355                                    } else {
1356                                        ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
1357                                    }
1358                                }
1359                            }
1360                        }
1361                    }
1362                }
1363                if (++dispatchedCount % 1000 == 0) {
1364                    dispatchedCount = 0;
1365                    Thread.yield();
1366                }
1367            } catch (Exception e) {
1368                session.connection.onClientInternalException(e);
1369            }
1370        }
1371    
1372        // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
1373        private void clearDispatchList() {
1374            if (clearDispatchList) {
1375                synchronized (deliveredMessages) {
1376                    if (clearDispatchList) {
1377                        if (!deliveredMessages.isEmpty()) {
1378                            if (session.isTransacted()) {
1379                                if (LOG.isDebugEnabled()) {
1380                                    LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1381                                }
1382                                if (previouslyDeliveredMessages == null) {
1383                                    previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId());
1384                                }
1385                                for (MessageDispatch delivered : deliveredMessages) {
1386                                    previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
1387                                }
1388                            } else {
1389                                if (LOG.isDebugEnabled()) {
1390                                    LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1391                                }
1392                                deliveredMessages.clear();
1393                                pendingAck = null;
1394                            }
1395                        }
1396                        clearDispatchList = false;
1397                    }
1398                }
1399            }
1400        }
1401    
1402        public int getMessageSize() {
1403            return unconsumedMessages.size();
1404        }
1405    
1406        public void start() throws JMSException {
1407            if (unconsumedMessages.isClosed()) {
1408                return;
1409            }
1410            started.set(true);
1411            unconsumedMessages.start();
1412            session.executor.wakeup();
1413        }
1414    
1415        public void stop() {
1416            started.set(false);
1417            unconsumedMessages.stop();
1418        }
1419    
1420        @Override
1421        public String toString() {
1422            return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get()
1423                   + " }";
1424        }
1425    
1426        /**
1427         * Delivers a message to the message listener.
1428         *
1429         * @return
1430         * @throws JMSException
1431         */
1432        public boolean iterate() {
1433            MessageListener listener = this.messageListener.get();
1434            if (listener != null) {
1435                MessageDispatch md = unconsumedMessages.dequeueNoWait();
1436                if (md != null) {
1437                    dispatch(md);
1438                    return true;
1439                }
1440            }
1441            return false;
1442        }
1443    
1444        public boolean isInUse(ActiveMQTempDestination destination) {
1445            return info.getDestination().equals(destination);
1446        }
1447    
1448        public long getLastDeliveredSequenceId() {
1449            return lastDeliveredSequenceId;
1450        }
1451    
1452        public IOException getFailureError() {
1453            return failureError;
1454        }
1455    
1456        public void setFailureError(IOException failureError) {
1457            this.failureError = failureError;
1458        }
1459    }