001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.List;
021    import javax.jms.ResourceAllocationException;
022    import org.apache.activemq.advisory.AdvisorySupport;
023    import org.apache.activemq.broker.Broker;
024    import org.apache.activemq.broker.BrokerService;
025    import org.apache.activemq.broker.ConnectionContext;
026    import org.apache.activemq.broker.ProducerBrokerExchange;
027    import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
028    import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ActiveMQTopic;
031    import org.apache.activemq.command.Message;
032    import org.apache.activemq.command.MessageDispatchNotification;
033    import org.apache.activemq.command.ProducerInfo;
034    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
035    import org.apache.activemq.security.SecurityContext;
036    import org.apache.activemq.state.ProducerState;
037    import org.apache.activemq.store.MessageStore;
038    import org.apache.activemq.thread.Scheduler;
039    import org.apache.activemq.usage.MemoryUsage;
040    import org.apache.activemq.usage.SystemUsage;
041    import org.apache.activemq.usage.Usage;
042    import org.slf4j.Logger;
043    
044    /**
045     *
046     */
047    public abstract class BaseDestination implements Destination {
048        /**
049         * The maximum number of messages to page in to the destination from
050         * persistent storage
051         */
052        public static final int MAX_PAGE_SIZE = 200;
053        public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
054        public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
055        public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
056        public static final int MAX_PRODUCERS_TO_AUDIT = 64;
057        public static final int MAX_AUDIT_DEPTH = 2048;
058    
059        protected final ActiveMQDestination destination;
060        protected final Broker broker;
061        protected final MessageStore store;
062        protected SystemUsage systemUsage;
063        protected MemoryUsage memoryUsage;
064        private boolean producerFlowControl = true;
065        private boolean alwaysRetroactive = false;
066        protected boolean warnOnProducerFlowControl = true;
067        protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
068    
069        private int maxProducersToAudit = 1024;
070        private int maxAuditDepth = 2048;
071        private boolean enableAudit = true;
072        private int maxPageSize = MAX_PAGE_SIZE;
073        private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
074        private boolean useCache = true;
075        private int minimumMessageSize = 1024;
076        private boolean lazyDispatch = false;
077        private boolean advisoryForSlowConsumers;
078        private boolean advisdoryForFastProducers;
079        private boolean advisoryForDiscardingMessages;
080        private boolean advisoryWhenFull;
081        private boolean advisoryForDelivery;
082        private boolean advisoryForConsumed;
083        private boolean sendAdvisoryIfNoConsumers;
084        protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
085        protected final BrokerService brokerService;
086        protected final Broker regionBroker;
087        protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
088        protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
089        private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
090        protected int cursorMemoryHighWaterMark = 70;
091        protected int storeUsageHighWaterMark = 100;
092        private SlowConsumerStrategy slowConsumerStrategy;
093        private boolean prioritizedMessages;
094        private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
095        private boolean gcIfInactive;
096        private boolean gcWithNetworkConsumers;
097        private long lastActiveTime=0l;
098        private boolean reduceMemoryFootprint = false;
099        protected final Scheduler scheduler;
100        private boolean disposed = false;
101        private boolean doOptimzeMessageStorage = true;
102        /*
103         * percentage of in-flight messages above which optimize message store is disabled
104         */
105        private int optimizeMessageStoreInFlightLimit = 10;
106    
107        /**
108         * @param brokerService
109         * @param store
110         * @param destination
111         * @param parentStats
112         * @throws Exception
113         */
114        public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
115            this.brokerService = brokerService;
116            this.broker = brokerService.getBroker();
117            this.store = store;
118            this.destination = destination;
119            // let's copy the enabled property from the parent DestinationStatistics
120            this.destinationStatistics.setEnabled(parentStats.isEnabled());
121            this.destinationStatistics.setParent(parentStats);
122            this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString());
123            this.memoryUsage = this.systemUsage.getMemoryUsage();
124            this.memoryUsage.setUsagePortion(1.0f);
125            this.regionBroker = brokerService.getRegionBroker();
126            this.scheduler = brokerService.getBroker().getScheduler();
127        }
128    
129        /**
130         * initialize the destination
131         *
132         * @throws Exception
133         */
134        public void initialize() throws Exception {
135            // Let the store know what usage manager we are using so that he can
136            // flush messages to disk when usage gets high.
137            if (store != null) {
138                store.setMemoryUsage(this.memoryUsage);
139            }
140        }
141    
142        /**
143         * @return the producerFlowControl
144         */
145        public boolean isProducerFlowControl() {
146            return producerFlowControl;
147        }
148    
149        /**
150         * @param producerFlowControl the producerFlowControl to set
151         */
152        public void setProducerFlowControl(boolean producerFlowControl) {
153            this.producerFlowControl = producerFlowControl;
154        }
155    
156        public boolean isAlwaysRetroactive() {
157            return alwaysRetroactive;
158        }
159    
160        public void setAlwaysRetroactive(boolean alwaysRetroactive) {
161            this.alwaysRetroactive = alwaysRetroactive;
162        }
163    
164        /**
165         * Set's the interval at which warnings about producers being blocked by
166         * resource usage will be triggered. Values of 0 or less will disable
167         * warnings
168         *
169         * @param blockedProducerWarningInterval the interval at which warning about
170         *            blocked producers will be triggered.
171         */
172        public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
173            this.blockedProducerWarningInterval = blockedProducerWarningInterval;
174        }
175    
176        /**
177         *
178         * @return the interval at which warning about blocked producers will be
179         *         triggered.
180         */
181        public long getBlockedProducerWarningInterval() {
182            return blockedProducerWarningInterval;
183        }
184    
185        /**
186         * @return the maxProducersToAudit
187         */
188        public int getMaxProducersToAudit() {
189            return maxProducersToAudit;
190        }
191    
192        /**
193         * @param maxProducersToAudit the maxProducersToAudit to set
194         */
195        public void setMaxProducersToAudit(int maxProducersToAudit) {
196            this.maxProducersToAudit = maxProducersToAudit;
197        }
198    
199        /**
200         * @return the maxAuditDepth
201         */
202        public int getMaxAuditDepth() {
203            return maxAuditDepth;
204        }
205    
206        /**
207         * @param maxAuditDepth the maxAuditDepth to set
208         */
209        public void setMaxAuditDepth(int maxAuditDepth) {
210            this.maxAuditDepth = maxAuditDepth;
211        }
212    
213        /**
214         * @return the enableAudit
215         */
216        public boolean isEnableAudit() {
217            return enableAudit;
218        }
219    
220        /**
221         * @param enableAudit the enableAudit to set
222         */
223        public void setEnableAudit(boolean enableAudit) {
224            this.enableAudit = enableAudit;
225        }
226    
227        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
228            destinationStatistics.getProducers().increment();
229            this.lastActiveTime=0l;
230        }
231    
232        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
233            destinationStatistics.getProducers().decrement();
234        }
235    
236        public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
237            destinationStatistics.getConsumers().increment();
238            this.lastActiveTime=0l;
239        }
240    
241        public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
242            destinationStatistics.getConsumers().decrement();
243        }
244    
245    
246        public final MemoryUsage getMemoryUsage() {
247            return memoryUsage;
248        }
249    
250        public DestinationStatistics getDestinationStatistics() {
251            return destinationStatistics;
252        }
253    
254        public ActiveMQDestination getActiveMQDestination() {
255            return destination;
256        }
257    
258        public final String getName() {
259            return getActiveMQDestination().getPhysicalName();
260        }
261    
262        public final MessageStore getMessageStore() {
263            return store;
264        }
265    
266        public boolean isActive() {
267            boolean isActive = destinationStatistics.getConsumers().getCount() != 0 ||
268                               destinationStatistics.getProducers().getCount() != 0;
269            if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() != 0) {
270                isActive = hasRegularConsumers(getConsumers());
271            }
272            return isActive;
273        }
274    
275        public int getMaxPageSize() {
276            return maxPageSize;
277        }
278    
279        public void setMaxPageSize(int maxPageSize) {
280            this.maxPageSize = maxPageSize;
281        }
282    
283        public int getMaxBrowsePageSize() {
284            return this.maxBrowsePageSize;
285        }
286    
287        public void setMaxBrowsePageSize(int maxPageSize) {
288            this.maxBrowsePageSize = maxPageSize;
289        }
290    
291        public int getMaxExpirePageSize() {
292            return this.maxExpirePageSize;
293        }
294    
295        public void setMaxExpirePageSize(int maxPageSize) {
296            this.maxExpirePageSize = maxPageSize;
297        }
298    
299        public void setExpireMessagesPeriod(long expireMessagesPeriod) {
300            this.expireMessagesPeriod = expireMessagesPeriod;
301        }
302    
303        public long getExpireMessagesPeriod() {
304            return expireMessagesPeriod;
305        }
306    
307        public boolean isUseCache() {
308            return useCache;
309        }
310    
311        public void setUseCache(boolean useCache) {
312            this.useCache = useCache;
313        }
314    
315        public int getMinimumMessageSize() {
316            return minimumMessageSize;
317        }
318    
319        public void setMinimumMessageSize(int minimumMessageSize) {
320            this.minimumMessageSize = minimumMessageSize;
321        }
322    
323        public boolean isLazyDispatch() {
324            return lazyDispatch;
325        }
326    
327        public void setLazyDispatch(boolean lazyDispatch) {
328            this.lazyDispatch = lazyDispatch;
329        }
330    
331        protected long getDestinationSequenceId() {
332            return regionBroker.getBrokerSequenceId();
333        }
334    
335        /**
336         * @return the advisoryForSlowConsumers
337         */
338        public boolean isAdvisoryForSlowConsumers() {
339            return advisoryForSlowConsumers;
340        }
341    
342        /**
343         * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
344         */
345        public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
346            this.advisoryForSlowConsumers = advisoryForSlowConsumers;
347        }
348    
349        /**
350         * @return the advisoryForDiscardingMessages
351         */
352        public boolean isAdvisoryForDiscardingMessages() {
353            return advisoryForDiscardingMessages;
354        }
355    
356        /**
357         * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
358         *            set
359         */
360        public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
361            this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
362        }
363    
364        /**
365         * @return the advisoryWhenFull
366         */
367        public boolean isAdvisoryWhenFull() {
368            return advisoryWhenFull;
369        }
370    
371        /**
372         * @param advisoryWhenFull the advisoryWhenFull to set
373         */
374        public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
375            this.advisoryWhenFull = advisoryWhenFull;
376        }
377    
378        /**
379         * @return the advisoryForDelivery
380         */
381        public boolean isAdvisoryForDelivery() {
382            return advisoryForDelivery;
383        }
384    
385        /**
386         * @param advisoryForDelivery the advisoryForDelivery to set
387         */
388        public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
389            this.advisoryForDelivery = advisoryForDelivery;
390        }
391    
392        /**
393         * @return the advisoryForConsumed
394         */
395        public boolean isAdvisoryForConsumed() {
396            return advisoryForConsumed;
397        }
398    
399        /**
400         * @param advisoryForConsumed the advisoryForConsumed to set
401         */
402        public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
403            this.advisoryForConsumed = advisoryForConsumed;
404        }
405    
406        /**
407         * @return the advisdoryForFastProducers
408         */
409        public boolean isAdvisdoryForFastProducers() {
410            return advisdoryForFastProducers;
411        }
412    
413        /**
414         * @param advisdoryForFastProducers the advisdoryForFastProducers to set
415         */
416        public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
417            this.advisdoryForFastProducers = advisdoryForFastProducers;
418        }
419    
420        public boolean isSendAdvisoryIfNoConsumers() {
421            return sendAdvisoryIfNoConsumers;
422        }
423    
424        public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
425            this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
426        }
427    
428        /**
429         * @return the dead letter strategy
430         */
431        public DeadLetterStrategy getDeadLetterStrategy() {
432            return deadLetterStrategy;
433        }
434    
435        /**
436         * set the dead letter strategy
437         *
438         * @param deadLetterStrategy
439         */
440        public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
441            this.deadLetterStrategy = deadLetterStrategy;
442        }
443    
444        public int getCursorMemoryHighWaterMark() {
445            return this.cursorMemoryHighWaterMark;
446        }
447    
448        public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
449            this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
450        }
451    
452        /**
453         * called when message is consumed
454         *
455         * @param context
456         * @param messageReference
457         */
458        public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
459            if (advisoryForConsumed) {
460                broker.messageConsumed(context, messageReference);
461            }
462        }
463    
464        /**
465         * Called when message is delivered to the broker
466         *
467         * @param context
468         * @param messageReference
469         */
470        public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
471            if (advisoryForDelivery) {
472                broker.messageDelivered(context, messageReference);
473            }
474        }
475    
476        /**
477         * Called when a message is discarded - e.g. running low on memory This will
478         * happen only if the policy is enabled - e.g. non durable topics
479         *
480         * @param context
481         * @param messageReference
482         */
483        public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
484            if (advisoryForDiscardingMessages) {
485                broker.messageDiscarded(context, sub, messageReference);
486            }
487        }
488    
489        /**
490         * Called when there is a slow consumer
491         *
492         * @param context
493         * @param subs
494         */
495        public void slowConsumer(ConnectionContext context, Subscription subs) {
496            if (advisoryForSlowConsumers) {
497                broker.slowConsumer(context, this, subs);
498            }
499            if (slowConsumerStrategy != null) {
500                slowConsumerStrategy.slowConsumer(context, subs);
501            }
502        }
503    
504        /**
505         * Called to notify a producer is too fast
506         *
507         * @param context
508         * @param producerInfo
509         */
510        public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
511            if (advisdoryForFastProducers) {
512                broker.fastProducer(context, producerInfo);
513            }
514        }
515    
516        /**
517         * Called when a Usage reaches a limit
518         *
519         * @param context
520         * @param usage
521         */
522        public void isFull(ConnectionContext context, Usage<?> usage) {
523            if (advisoryWhenFull) {
524                broker.isFull(context, this, usage);
525            }
526        }
527    
528        public void dispose(ConnectionContext context) throws IOException {
529            if (this.store != null) {
530                this.store.removeAllMessages(context);
531                this.store.dispose(context);
532            }
533            this.destinationStatistics.setParent(null);
534            this.memoryUsage.stop();
535            this.disposed = true;
536        }
537    
538        public boolean isDisposed() {
539            return this.disposed;
540        }
541    
542        /**
543         * Provides a hook to allow messages with no consumer to be processed in
544         * some way - such as to send to a dead letter queue or something..
545         */
546        protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
547            if (!msg.isPersistent()) {
548                if (isSendAdvisoryIfNoConsumers()) {
549                    // allow messages with no consumers to be dispatched to a dead
550                    // letter queue
551                    if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
552    
553                        Message message = msg.copy();
554                        // The original destination and transaction id do not get
555                        // filled when the message is first sent,
556                        // it is only populated if the message is routed to another
557                        // destination like the DLQ
558                        if (message.getOriginalDestination() != null) {
559                            message.setOriginalDestination(message.getDestination());
560                        }
561                        if (message.getOriginalTransactionId() != null) {
562                            message.setOriginalTransactionId(message.getTransactionId());
563                        }
564    
565                        ActiveMQTopic advisoryTopic;
566                        if (destination.isQueue()) {
567                            advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
568                        } else {
569                            advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
570                        }
571                        message.setDestination(advisoryTopic);
572                        message.setTransactionId(null);
573    
574                        // Disable flow control for this since since we don't want
575                        // to block.
576                        boolean originalFlowControl = context.isProducerFlowControl();
577                        try {
578                            context.setProducerFlowControl(false);
579                            ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
580                            producerExchange.setMutable(false);
581                            producerExchange.setConnectionContext(context);
582                            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
583                            context.getBroker().send(producerExchange, message);
584                        } finally {
585                            context.setProducerFlowControl(originalFlowControl);
586                        }
587    
588                    }
589                }
590            }
591        }
592    
593        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
594        }
595    
596        public final int getStoreUsageHighWaterMark() {
597            return this.storeUsageHighWaterMark;
598        }
599    
600        public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
601            this.storeUsageHighWaterMark = storeUsageHighWaterMark;
602        }
603    
604        protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
605            waitForSpace(context, usage, 100, warning);
606        }
607    
608        protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
609            if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
610                getLog().debug("sendFailIfNoSpace, forcing exception on send, usage:  " + usage + ": " + warning);
611                throw new ResourceAllocationException(warning);
612            }
613            if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
614                if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
615                    getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: " + usage + ": " + warning);
616                    throw new ResourceAllocationException(warning);
617                }
618            } else {
619                long start = System.currentTimeMillis();
620                long nextWarn = start;
621                while (!usage.waitForSpace(1000, highWaterMark)) {
622                    if (context.getStopping().get()) {
623                        throw new IOException("Connection closed, send aborted.");
624                    }
625    
626                    long now = System.currentTimeMillis();
627                    if (now >= nextWarn) {
628                        getLog().info("" + usage + ": " + warning + " (blocking for: " + (now - start) / 1000 + "s)");
629                        nextWarn = now + blockedProducerWarningInterval;
630                    }
631                }
632            }
633        }
634    
635        protected abstract Logger getLog();
636    
637        public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
638            this.slowConsumerStrategy = slowConsumerStrategy;
639        }
640    
641        public SlowConsumerStrategy getSlowConsumerStrategy() {
642            return this.slowConsumerStrategy;
643        }
644    
645    
646        public boolean isPrioritizedMessages() {
647            return this.prioritizedMessages;
648        }
649    
650        public void setPrioritizedMessages(boolean prioritizedMessages) {
651            this.prioritizedMessages = prioritizedMessages;
652            if (store != null) {
653                store.setPrioritizedMessages(prioritizedMessages);
654            }
655        }
656    
657        /**
658         * @return the inactiveTimoutBeforeGC
659         */
660        public long getInactiveTimoutBeforeGC() {
661            return this.inactiveTimoutBeforeGC;
662        }
663    
664        /**
665         * @param inactiveTimoutBeforeGC the inactiveTimoutBeforeGC to set
666         */
667        public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
668            this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
669        }
670    
671        /**
672         * @return the gcIfInactive
673         */
674        public boolean isGcIfInactive() {
675            return this.gcIfInactive;
676        }
677    
678        /**
679         * @param gcIfInactive the gcIfInactive to set
680         */
681        public void setGcIfInactive(boolean gcIfInactive) {
682            this.gcIfInactive = gcIfInactive;
683        }
684    
685        /**
686         * Indicate if it is ok to gc destinations that have only network consumers
687         * @param gcWithNetworkConsumers
688         */
689        public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
690            this.gcWithNetworkConsumers = gcWithNetworkConsumers;
691        }
692    
693        public boolean isGcWithNetworkConsumers() {
694            return gcWithNetworkConsumers;
695        }
696    
697        public void markForGC(long timeStamp) {
698            if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
699                    && destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) {
700                this.lastActiveTime = timeStamp;
701            }
702        }
703    
704        public boolean canGC() {
705            boolean result = false;
706            if (isGcIfInactive()&& this.lastActiveTime != 0l) {
707                if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimoutBeforeGC()) {
708                    result = true;
709                }
710            }
711            return result;
712        }
713    
714        public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
715            this.reduceMemoryFootprint = reduceMemoryFootprint;
716        }
717    
718        protected boolean isReduceMemoryFootprint() {
719            return this.reduceMemoryFootprint;
720        }
721    
722        public boolean isDoOptimzeMessageStorage() {
723            return doOptimzeMessageStorage;
724        }
725    
726        public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
727            this.doOptimzeMessageStorage = doOptimzeMessageStorage;
728        }
729    
730        public int getOptimizeMessageStoreInFlightLimit() {
731            return optimizeMessageStoreInFlightLimit;
732        }
733    
734        public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
735            this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
736        }
737    
738    
739        public abstract List<Subscription> getConsumers();
740    
741        protected boolean hasRegularConsumers(List<Subscription> consumers) {
742            boolean hasRegularConsumers = false;
743            for (Subscription subscription: consumers) {
744                if (!subscription.getConsumerInfo().isNetworkSubscription()) {
745                    hasRegularConsumers = true;
746                    break;
747                }
748            }
749            return hasRegularConsumers;
750        }
751    
752        protected ConnectionContext createConnectionContext() {
753            ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
754            answer.setBroker(this.broker);
755            answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
756            answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
757            return answer;
758        }
759    }