001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Iterator;
022import java.util.LinkedList;
023import java.util.List;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.TimeUnit;
026
027import javax.jms.JMSException;
028
029import org.apache.activemq.broker.Broker;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
032import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
033import org.apache.activemq.command.ConsumerControl;
034import org.apache.activemq.command.ConsumerInfo;
035import org.apache.activemq.command.Message;
036import org.apache.activemq.command.MessageAck;
037import org.apache.activemq.command.MessageDispatch;
038import org.apache.activemq.command.MessageDispatchNotification;
039import org.apache.activemq.command.MessageId;
040import org.apache.activemq.command.MessagePull;
041import org.apache.activemq.command.Response;
042import org.apache.activemq.thread.Scheduler;
043import org.apache.activemq.transaction.Synchronization;
044import org.apache.activemq.transport.TransmitCallback;
045import org.apache.activemq.usage.SystemUsage;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * A subscription that honors the pre-fetch option of the ConsumerInfo.
051 */
052public abstract class PrefetchSubscription extends AbstractSubscription {
053
    private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);
    // Scheduler taken from the broker; used to run delayed pull timeouts.
    protected final Scheduler scheduler;

    // Cursor holding messages added to this subscription that have not been dispatched yet.
    protected PendingMessageCursor pending;
    // Messages that were dispatched to the consumer and are still awaiting acknowledgement.
    protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
    // NOTE(review): audit settings; their use is not visible in this part of the file.
    private int maxProducersToAudit=32;
    private int maxAuditDepth=2048;
    // Passed to the pending cursor when a new cursor is installed via setPending().
    protected final SystemUsage usageManager;
    // Monitor held while the pending cursor is read or modified.
    protected final Object pendingLock = new Object();
    // Monitor held while the dispatched list is read or modified.
    protected final Object dispatchLock = new Object();
    // Counted down on the first dispatch; acknowledge() ignores acks received before that.
    private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
065
066    public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws JMSException {
067        super(broker,context, info);
068        this.usageManager=usageManager;
069        pending = cursor;
070        try {
071            pending.start();
072        } catch (Exception e) {
073            throw new JMSException(e.getMessage());
074        }
075        this.scheduler = broker.getScheduler();
076    }
077
    /**
     * Creates a subscription whose pending messages are kept in a new
     * {@link VMPendingMessageCursor} constructed with {@code false}.
     *
     * @throws JMSException if the cursor cannot be started
     */
    public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException {
        this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
    }
081
082    /**
083     * Allows a message to be pulled on demand by a client
084     */
085    @Override
086    public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception {
087        // The slave should not deliver pull messages.
088        // TODO: when the slave becomes a master, He should send a NULL message to all the
089        // consumers to 'wake them up' in case they were waiting for a message.
090        if (getPrefetchSize() == 0) {
091            prefetchExtension.set(pull.getQuantity());
092            final long dispatchCounterBeforePull = getSubscriptionStatistics().getDispatched().getCount();
093
094            // Have the destination push us some messages.
095            for (Destination dest : destinations) {
096                dest.iterate();
097            }
098            dispatchPending();
099
100            synchronized(this) {
101                // If there was nothing dispatched.. we may need to setup a timeout.
102                if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) {
103                    // immediate timeout used by receiveNoWait()
104                    if (pull.getTimeout() == -1) {
105                        // Null message indicates the pull is done or did not have pending.
106                        prefetchExtension.set(1);
107                        add(QueueMessageReference.NULL_MESSAGE);
108                        dispatchPending();
109                    }
110                    if (pull.getTimeout() > 0) {
111                        scheduler.executeAfterDelay(new Runnable() {
112                            @Override
113                            public void run() {
114                                pullTimeout(dispatchCounterBeforePull, pull.isAlwaysSignalDone());
115                            }
116                        }, pull.getTimeout());
117                    }
118                }
119            }
120        }
121        return null;
122    }
123
124    /**
125     * Occurs when a pull times out. If nothing has been dispatched since the
126     * timeout was setup, then send the NULL message.
127     */
128    final void pullTimeout(long dispatchCounterBeforePull, boolean alwaysSignalDone) {
129        synchronized (pendingLock) {
130            if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || alwaysSignalDone) {
131                try {
132                    prefetchExtension.set(1);
133                    add(QueueMessageReference.NULL_MESSAGE);
134                    dispatchPending();
135                } catch (Exception e) {
136                    context.getConnection().serviceException(e);
137                } finally {
138                    prefetchExtension.set(0);
139                }
140            }
141        }
142    }
143
144    @Override
145    public void add(MessageReference node) throws Exception {
146        synchronized (pendingLock) {
147            // The destination may have just been removed...
148            if (!destinations.contains(node.getRegionDestination()) && node != QueueMessageReference.NULL_MESSAGE) {
149                // perhaps we should inform the caller that we are no longer valid to dispatch to?
150                return;
151            }
152
153            // Don't increment for the pullTimeout control message.
154            if (!node.equals(QueueMessageReference.NULL_MESSAGE)) {
155                getSubscriptionStatistics().getEnqueues().increment();
156            }
157            pending.addMessageLast(node);
158        }
159        dispatchPending();
160    }
161
162    @Override
163    public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
164        synchronized(pendingLock) {
165            try {
166                pending.reset();
167                while (pending.hasNext()) {
168                    MessageReference node = pending.next();
169                    node.decrementReferenceCount();
170                    if (node.getMessageId().equals(mdn.getMessageId())) {
171                        // Synchronize between dispatched list and removal of messages from pending list
172                        // related to remove subscription action
173                        synchronized(dispatchLock) {
174                            pending.remove();
175                            createMessageDispatch(node, node.getMessage());
176                            dispatched.add(node);
177                            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
178                            onDispatch(node, node.getMessage());
179                        }
180                        return;
181                    }
182                }
183            } finally {
184                pending.release();
185            }
186        }
187        throw new JMSException(
188                "Slave broker out of sync with master: Dispatched message ("
189                        + mdn.getMessageId() + ") was not in the pending list for "
190                        + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
191    }
192
    /**
     * Processes an acknowledgement from the consumer against the dispatched list.
     * <p>
     * Acks that arrive before the first dispatch are ignored with a warning.
     * Otherwise the ack is handled by type (standard, individual, delivered,
     * expired, redelivered or poison) while holding {@code dispatchLock}. When a
     * dispatched message matching the ack is found, its destination is woken up
     * and pending messages are dispatched.
     *
     * @param context the connection context of the acknowledging consumer
     * @param ack the acknowledgement to process
     * @throws Exception a {@link JMSException} when the ack cannot be correlated
     *         with the dispatched list (for the ack types that require it), or
     *         any failure raised by the per-message acknowledge or dispatch
     */
    @Override
    public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
        // Handle the standard acknowledgment case.
        boolean callDispatchMatched = false;
        Destination destination = null;

        // Zero-timeout await: a non-blocking check of whether any dispatch has happened yet.
        if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
            // suppress unexpected ack exception in this expected case
            LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: {}", ack);
            return;
        }

        LOG.trace("ack: {}", ack);

        synchronized(dispatchLock) {
            if (ack.isStandardAck()) {
                // First check if the ack matches the dispatched. When using failover this might
                // not be the case. We don't ever want to ack the wrong messages.
                assertAckMatchesDispatched(ack);

                // Acknowledge all dispatched messages up till the message id of
                // the acknowledgment.
                boolean inAckRange = false;
                // Collected separately so the dispatched list is not modified while iterating it.
                List<MessageReference> removeList = new ArrayList<MessageReference>();
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        // Don't remove the nodes until we are committed.
                        if (!context.isInTransaction()) {
                            getSubscriptionStatistics().getDequeues().increment();
                            ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
                            removeList.add(node);
                            contractPrefetchExtension(1);
                        } else {
                            registerRemoveSync(context, node);
                        }
                        acknowledge(context, ack, node);
                        if (ack.getLastMessageId().equals(messageId)) {
                            destination = (Destination) node.getRegionDestination();
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                for (final MessageReference node : removeList) {
                    dispatched.remove(node);
                    getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
                }
                // this only happens after a reconnect - get an ack which is not
                // valid
                if (!callDispatchMatched) {
                    LOG.warn("Could not correlate acknowledgment with dispatched message: {}", ack);
                }
            } else if (ack.isIndividualAck()) {
                // Message was delivered and acknowledge - but only delete the
                // individual message
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getLastMessageId().equals(messageId)) {
                        // Don't remove the nodes until we are committed - immediateAck option
                        if (!context.isInTransaction()) {
                            getSubscriptionStatistics().getDequeues().increment();
                            ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
                            // Removing inside the for-each is safe only because the loop breaks below.
                            dispatched.remove(node);
                            getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
                            contractPrefetchExtension(1);
                        } else {
                            registerRemoveSync(context, node);
                            expandPrefetchExtension(1);
                        }
                        acknowledge(context, ack, node);
                        destination = (Destination) node.getRegionDestination();
                        callDispatchMatched = true;
                        break;
                    }
                }
            } else if (ack.isDeliveredAck()) {
                // Message was delivered but not acknowledged: update pre-fetch
                // counters.
                // NOTE(review): index is incremented but never read.
                int index = 0;
                for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
                    final MessageReference node = iter.next();
                    Destination nodeDest = (Destination) node.getRegionDestination();
                    if (ack.getLastMessageId().equals(node.getMessageId())) {
                        expandPrefetchExtension(ack.getMessageCount());
                        destination = nodeDest;
                        callDispatchMatched = true;
                        break;
                    }
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            } else if (ack.isExpiredAck()) {
                // Message was expired
                // NOTE(review): index is incremented but never read.
                int index = 0;
                boolean inAckRange = false;
                for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
                    final MessageReference node = iter.next();
                    Destination nodeDest = (Destination) node.getRegionDestination();
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        Destination regionDestination = nodeDest;
                        // Only report expiry when the broker also considers the message expired.
                        if (broker.isExpired(node)) {
                            regionDestination.messageExpired(context, this, node);
                        }
                        iter.remove();
                        nodeDest.getDestinationStatistics().getInflight().decrement();

                        if (ack.getLastMessageId().equals(messageId)) {
                            contractPrefetchExtension(1);
                            destination = (Destination) node.getRegionDestination();
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate expiration acknowledgment with dispatched message: "
                                    + ack);
                }
            } else if (ack.isRedeliveredAck()) {
                // Message was re-delivered but it was not yet considered to be
                // a DLQ message.
                boolean inAckRange = false;
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        if (ack.getLastMessageId().equals(messageId)) {
                            destination = (Destination) node.getRegionDestination();
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            } else if (ack.isPoisonAck()) {
                // TODO: what if the message is already in a DLQ???
                // Handle the poison ACK case: we need to send the message to a
                // DLQ
                if (ack.isInTransaction()) {
                    throw new JMSException("Poison ack cannot be transacted: "
                            + ack);
                }
                // NOTE(review): index is incremented but never read.
                int index = 0;
                boolean inAckRange = false;
                // Collected separately so the dispatched list is not modified while iterating it.
                List<MessageReference> removeList = new ArrayList<MessageReference>();
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        sendToDLQ(context, node, ack.getPoisonCause());
                        Destination nodeDest = (Destination) node.getRegionDestination();
                        nodeDest.getDestinationStatistics()
                        .getInflight().decrement();
                        removeList.add(node);
                        getSubscriptionStatistics().getDequeues().increment();
                        index++;
                        acknowledge(context, ack, node);
                        if (ack.getLastMessageId().equals(messageId)) {
                            contractPrefetchExtension(1);
                            destination = nodeDest;
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                for (final MessageReference node : removeList) {
                    dispatched.remove(node);
                    getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            }
        }
        // Wake-up and dispatch happen after dispatchLock has been released.
        if (callDispatchMatched && destination != null) {
            destination.wakeup();
            dispatchPending();

            if (pending.isEmpty()) {
                wakeupDestinationsForDispatch();
            }
        } else {
            LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack);
        }
    }
403
404    private void registerRemoveSync(ConnectionContext context, final MessageReference node) {
405        // setup a Synchronization to remove nodes from the
406        // dispatched list.
407        context.getTransaction().addSynchronization(
408                new Synchronization() {
409
410                    @Override
411                    public void afterCommit()
412                            throws Exception {
413                        Destination nodeDest = (Destination) node.getRegionDestination();
414                        synchronized (dispatchLock) {
415                            getSubscriptionStatistics().getDequeues().increment();
416                            dispatched.remove(node);
417                            getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
418                            nodeDest.getDestinationStatistics().getInflight().decrement();
419                        }
420                        contractPrefetchExtension(1);
421                        nodeDest.wakeup();
422                        dispatchPending();
423                    }
424
425                    @Override
426                    public void afterRollback() throws Exception {
427                        contractPrefetchExtension(1);
428                    }
429                });
430    }
431
432    /**
433     * Checks an ack versus the contents of the dispatched list.
434     *  called with dispatchLock held
435     * @param ack
436     * @throws JMSException if it does not match
437     */
438    protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
439        MessageId firstAckedMsg = ack.getFirstMessageId();
440        MessageId lastAckedMsg = ack.getLastMessageId();
441        int checkCount = 0;
442        boolean checkFoundStart = false;
443        boolean checkFoundEnd = false;
444        for (MessageReference node : dispatched) {
445
446            if (firstAckedMsg == null) {
447                checkFoundStart = true;
448            } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
449                checkFoundStart = true;
450            }
451
452            if (checkFoundStart) {
453                checkCount++;
454            }
455
456            if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
457                checkFoundEnd = true;
458                break;
459            }
460        }
461        if (!checkFoundStart && firstAckedMsg != null)
462            throw new JMSException("Unmatched acknowledge: " + ack
463                    + "; Could not find Message-ID " + firstAckedMsg
464                    + " in dispatched-list (start of ack)");
465        if (!checkFoundEnd && lastAckedMsg != null)
466            throw new JMSException("Unmatched acknowledge: " + ack
467                    + "; Could not find Message-ID " + lastAckedMsg
468                    + " in dispatched-list (end of ack)");
469        if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
470            throw new JMSException("Unmatched acknowledge: " + ack
471                    + "; Expected message count (" + ack.getMessageCount()
472                    + ") differs from count in dispatched-list (" + checkCount
473                    + ")");
474        }
475    }
476
    /**
     * Hands a message to the root broker's dead letter queue handling on behalf
     * of this subscription.
     *
     * @param context the connection context of the consumer
     * @param node the message to send to the dead letter queue
     * @param poisonCause the cause reported with the poison ack
     * @throws IOException declared by this method; see the root broker call
     * @throws Exception if the root broker call fails
     */
    protected void sendToDLQ(final ConnectionContext context, final MessageReference node, Throwable poisonCause) throws IOException, Exception {
        broker.getRoot().sendToDeadLetterQueue(context, node, this, poisonCause);
    }
488
    /**
     * @return the current size of the dispatched list (read without taking dispatchLock)
     */
    @Override
    public int getInFlightSize() {
        return dispatched.size();
    }
493
494    /**
495     * Used to determine if the broker can dispatch to the consumer.
496     *
497     * @return true if the subscription is full
498     */
499    @Override
500    public boolean isFull() {
501        return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
502    }
503
504    /**
505     * @return true when 60% or more room is left for dispatching messages
506     */
507    @Override
508    public boolean isLowWaterMark() {
509        return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
510    }
511
512    /**
513     * @return true when 10% or less room is left for dispatching messages
514     */
515    @Override
516    public boolean isHighWaterMark() {
517        return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
518    }
519
520    @Override
521    public int countBeforeFull() {
522        return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
523    }
524
    /**
     * @return the size reported by the pending cursor (read without taking pendingLock)
     */
    @Override
    public int getPendingQueueSize() {
        return pending.size();
    }
529
    /**
     * @return the message size reported by the pending cursor, read under pendingLock
     */
    @Override
    public long getPendingMessageSize() {
        synchronized (pendingLock) {
            return pending.messageSize();
        }
    }
536
    /**
     * @return the current size of the dispatched list (read without taking dispatchLock)
     */
    @Override
    public int getDispatchedQueueSize() {
        return dispatched.size();
    }
541
    /**
     * @return the dequeue count from the subscription statistics
     */
    @Override
    public long getDequeueCounter() {
        return getSubscriptionStatistics().getDequeues().getCount();
    }
546
    /**
     * @return the dispatched count from the subscription statistics
     */
    @Override
    public long getDispatchedCounter() {
        return getSubscriptionStatistics().getDispatched().getCount();
    }
551
    /**
     * @return the enqueue count from the subscription statistics
     */
    @Override
    public long getEnqueueCounter() {
        return getSubscriptionStatistics().getEnqueues().getCount();
    }
556
    /**
     * @return whatever the pending cursor reports for {@code isRecoveryRequired()}
     */
    @Override
    public boolean isRecoveryRequired() {
        return pending.isRecoveryRequired();
    }
561
    /**
     * @return the cursor currently holding the messages pending dispatch
     */
    public PendingMessageCursor getPending() {
        return this.pending;
    }
565
566    public void setPending(PendingMessageCursor pending) {
567        this.pending = pending;
568        if (this.pending!=null) {
569            this.pending.setSystemUsage(usageManager);
570            this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
571        }
572    }
573
    /**
     * Attaches a destination to this subscription and to the pending cursor,
     * both under pendingLock.
     *
     * @param context the connection context of the consumer
     * @param destination the destination being added
     * @throws Exception if the superclass or the cursor rejects the destination
     */
    @Override
    public void add(ConnectionContext context, Destination destination) throws Exception {
        synchronized(pendingLock) {
            super.add(context, destination);
            pending.add(context, destination);
        }
    }
581
    /**
     * Detaches a destination, using this subscription's own dispatched list.
     *
     * @param context the connection context of the consumer
     * @param destination the destination being removed
     * @return the message references that should be redispatched
     * @throws Exception if the removal fails
     */
    @Override
    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
        return remove(context, destination, dispatched);
    }
586
587    public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception {
588        LinkedList<MessageReference> redispatch = new LinkedList<MessageReference>();
589        synchronized(pendingLock) {
590            super.remove(context, destination);
591            // Here is a potential problem concerning Inflight stat:
592            // Messages not already committed or rolled back may not be removed from dispatched list at the moment
593            // Except if each commit or rollback callback action comes before remove of subscriber.
594            redispatch.addAll(pending.remove(context, destination));
595
596            if (dispatched == null) {
597                return redispatch;
598            }
599
600            // Synchronized to DispatchLock if necessary
601            if (dispatched == this.dispatched) {
602                synchronized(dispatchLock) {
603                    addReferencesAndUpdateRedispatch(redispatch, destination, dispatched);
604                }
605            } else {
606                addReferencesAndUpdateRedispatch(redispatch, destination, dispatched);
607            }
608        }
609
610        return redispatch;
611    }
612
613    private void addReferencesAndUpdateRedispatch(LinkedList<MessageReference> redispatch, Destination destination, List<MessageReference> dispatched) {
614        ArrayList<MessageReference> references = new ArrayList<MessageReference>();
615        for (MessageReference r : dispatched) {
616            if (r.getRegionDestination() == destination) {
617                references.add(r);
618                getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize());
619            }
620        }
621        redispatch.addAll(0, references);
622        destination.getDestinationStatistics().getInflight().subtract(references.size());
623        dispatched.removeAll(references);
624    }
625
    /**
     * Dispatches messages from the pending cursor while the subscription has
     * room ({@code countBeforeFull() > 0} and not {@code isFull()}). When there
     * is no room and the consumer is not already flagged, it is marked as a slow
     * consumer and every destination is notified after pendingLock is released.
     *
     * @throws IOException if dispatching a message fails
     */
    // made public so it can be used in MQTTProtocolConverter
    public void dispatchPending() throws IOException {
        // Set only when the consumer has just been flagged as slow; notified outside the lock.
        List<Destination> slowConsumerTargets = null;

        synchronized(pendingLock) {
            try {
                int numberToDispatch = countBeforeFull();
                if (numberToDispatch > 0) {
                    setSlowConsumer(false);
                    setPendingBatchSize(pending, numberToDispatch);
                    int count = 0;
                    pending.reset();
                    while (count < numberToDispatch && !isFull() && pending.hasNext()) {
                        MessageReference node = pending.next();
                        if (node == null) {
                            break;
                        }

                        // Synchronize between dispatched list and remove of message from pending list
                        // related to remove subscription action
                        synchronized(dispatchLock) {
                            pending.remove();
                            if (!isDropped(node) && canDispatch(node)) {

                                // Message may have been sitting in the pending
                                // list a while waiting for the consumer to ack the message.
                                if (node != QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
                                    //increment number to dispatch
                                    numberToDispatch++;
                                    if (broker.isExpired(node)) {
                                        ((Destination)node.getRegionDestination()).messageExpired(context, this, node);
                                    }

                                    // Non-browsers skip the expired message; this continue bypasses the
                                    // decrement after the synchronized block, so it is done here.
                                    if (!isBrowser()) {
                                        node.decrementReferenceCount();
                                        continue;
                                    }
                                }
                                dispatch(node);
                                count++;
                            }
                        }
                        // decrement after dispatch has taken ownership to avoid usage jitter
                        node.decrementReferenceCount();
                    }
                } else if (!isSlowConsumer()) {
                    setSlowConsumer(true);
                    slowConsumerTargets = destinations;
                }
            } finally {
                pending.release();
            }
        }

        if (slowConsumerTargets != null) {
            for (Destination dest : slowConsumerTargets) {
                dest.slowConsumer(context, this);
            }
        }
    }
686
687    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
688        pending.setMaxBatchSize(numberToDispatch);
689    }
690
    /**
     * Hands a single message reference to the consumer's connection and records it
     * as dispatched.
     * <p>
     * Must be called with {@code dispatchLock} held.
     *
     * @param node the reference to dispatch; for any node other than
     *             {@code QueueMessageReference.NULL_MESSAGE} the subscription statistics
     *             and the {@code dispatched} list are updated
     * @return {@code false} if {@code node.getMessage()} yields {@code null} (nothing is
     *         dispatched), {@code true} once the dispatch has been handed to the connection
     * @throws IOException declared for callers; NOTE(review): the call that actually
     *                     throws is not visible in this method - confirm
     */
    protected boolean dispatch(final MessageReference node) throws IOException {
        final Message message = node.getMessage();
        if (message == null) {
            // No message behind the reference: nothing to send.
            return false;
        }

        // NOTE(review): presumably releases ack processing that waits until a dispatch
        // has happened - confirm against the users of this latch.
        okForAckAsDispatchDone.countDown();

        MessageDispatch md = createMessageDispatch(node, message);
        if (node != QueueMessageReference.NULL_MESSAGE) {
            // Subscription-level accounting is done up front, before the message is
            // handed to the connection.
            getSubscriptionStatistics().getDispatched().increment();
            dispatched.add(node);
            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
        }
        if (getPrefetchSize() == 0) {
            // With a zero prefetch, consume one unit of the prefetch extension, never
            // going below zero. The compare-and-set is retried until it succeeds so a
            // concurrent update of the extension is not lost.
            while (true) {
                int currentExtension = prefetchExtension.get();
                int newExtension = Math.max(0, currentExtension - 1);
                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
                    break;
                }
            }
        }
        if (info.isDispatchAsync()) {
            // Async path: destination-level bookkeeping is deferred to the transmit callback.
            md.setTransmitCallback(new TransmitCallback() {

                @Override
                public void onSuccess() {
                    // Since the message gets queued up in async dispatch, we don't want to
                    // decrease the reference count until it gets put on the wire.
                    onDispatch(node, message);
                }

                @Override
                public void onFailure() {
                    Destination nodeDest = (Destination) node.getRegionDestination();
                    if (nodeDest != null) {
                        if (node != QueueMessageReference.NULL_MESSAGE) {
                            // The destination counters are incremented here just as in onDispatch.
                            // NOTE(review): presumably so later accounting stays balanced even
                            // though transmission failed - confirm.
                            nodeDest.getDestinationStatistics().getDispatched().increment();
                            nodeDest.getDestinationStatistics().getInflight().increment();
                            LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
                        }
                    }
                    if (node instanceof QueueMessageReference) {
                        // Unlike onSuccess, a failed transmit unlocks queue references and does
                        // not trigger another dispatchPending() round.
                        ((QueueMessageReference) node).unlock();
                    }
                }
            });
            context.getConnection().dispatchAsync(md);
        } else {
            // Sync path: bookkeeping runs right after dispatchSync returns.
            context.getConnection().dispatchSync(md);
            onDispatch(node, message);
        }
        return true;
    }
747
748    protected void onDispatch(final MessageReference node, final Message message) {
749        Destination nodeDest = (Destination) node.getRegionDestination();
750        if (nodeDest != null) {
751            if (node != QueueMessageReference.NULL_MESSAGE) {
752                nodeDest.getDestinationStatistics().getDispatched().increment();
753                nodeDest.getDestinationStatistics().getInflight().increment();
754                LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
755            }
756        }
757
758        if (info.isDispatchAsync()) {
759            try {
760                dispatchPending();
761            } catch (IOException e) {
762                context.getConnection().serviceExceptionAsync(e);
763            }
764        }
765    }
766
767    /**
768     * inform the MessageConsumer on the client to change it's prefetch
769     *
770     * @param newPrefetch
771     */
772    @Override
773    public void updateConsumerPrefetch(int newPrefetch) {
774        if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
775            ConsumerControl cc = new ConsumerControl();
776            cc.setConsumerId(info.getConsumerId());
777            cc.setPrefetch(newPrefetch);
778            context.getConnection().dispatchAsync(cc);
779        }
780    }
781
782    /**
783     * @param node
784     * @param message
785     * @return MessageDispatch
786     */
787    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
788        MessageDispatch md = new MessageDispatch();
789        md.setConsumerId(info.getConsumerId());
790
791        if (node == QueueMessageReference.NULL_MESSAGE) {
792            md.setMessage(null);
793            md.setDestination(null);
794        } else {
795            Destination regionDestination = (Destination) node.getRegionDestination();
796            md.setDestination(regionDestination.getActiveMQDestination());
797            md.setMessage(message);
798            md.setRedeliveryCounter(node.getRedeliveryCounter());
799        }
800
801        return md;
802    }
803
804    /**
805     * Use when a matched message is about to be dispatched to the client.
806     *
807     * @param node
808     * @return false if the message should not be dispatched to the client
809     *         (another sub may have already dispatched it for example).
810     * @throws IOException
811     */
812    protected abstract boolean canDispatch(MessageReference node) throws IOException;
813
    /**
     * Reports whether the given message reference counts as dropped.
     * <p>
     * The pending-dispatch loop only dispatches a reference when this returns
     * {@code false}. NOTE(review): what exactly makes a reference "dropped" is
     * decided by the implementing subclass - confirm there.
     *
     * @param node the message reference to check
     * @return {@code true} if the reference is dropped and must not be dispatched
     */
    protected abstract boolean isDropped(MessageReference node);
815
816    /**
817     * Used during acknowledgment to remove the message.
818     *
819     * @throws IOException
820     */
821    protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
822
823
824    public int getMaxProducersToAudit() {
825        return maxProducersToAudit;
826    }
827
828    public void setMaxProducersToAudit(int maxProducersToAudit) {
829        this.maxProducersToAudit = maxProducersToAudit;
830        if (this.pending != null) {
831            this.pending.setMaxProducersToAudit(maxProducersToAudit);
832        }
833    }
834
835    public int getMaxAuditDepth() {
836        return maxAuditDepth;
837    }
838
839    public void setMaxAuditDepth(int maxAuditDepth) {
840        this.maxAuditDepth = maxAuditDepth;
841        if (this.pending != null) {
842            this.pending.setMaxAuditDepth(maxAuditDepth);
843        }
844    }
845
846    @Override
847    public void setPrefetchSize(int prefetchSize) {
848        this.info.setPrefetchSize(prefetchSize);
849        try {
850            this.dispatchPending();
851        } catch (Exception e) {
852            LOG.trace("Caught exception during dispatch after prefetch change.", e);
853        }
854    }
855}