001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.policy;
018
019import java.util.Set;
020
021import org.apache.activemq.ActiveMQPrefetchPolicy;
022import org.apache.activemq.broker.Broker;
023import org.apache.activemq.broker.region.BaseDestination;
024import org.apache.activemq.broker.region.Destination;
025import org.apache.activemq.broker.region.DurableTopicSubscription;
026import org.apache.activemq.broker.region.Queue;
027import org.apache.activemq.broker.region.QueueBrowserSubscription;
028import org.apache.activemq.broker.region.QueueSubscription;
029import org.apache.activemq.broker.region.Subscription;
030import org.apache.activemq.broker.region.Topic;
031import org.apache.activemq.broker.region.TopicSubscription;
032import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
033import org.apache.activemq.broker.region.group.GroupFactoryFinder;
034import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
035import org.apache.activemq.filter.DestinationMapEntry;
036import org.apache.activemq.network.NetworkBridgeFilterFactory;
037import org.apache.activemq.usage.SystemUsage;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * Represents an entry in a {@link PolicyMap} for assigning policies to a
043 * specific destination or a hierarchical wildcard area of destinations.
044 *
045 * @org.apache.xbean.XBean
046 *
047 */
048public class PolicyEntry extends DestinationMapEntry {
049
050    private static final Logger LOG = LoggerFactory.getLogger(PolicyEntry.class);
051    private DispatchPolicy dispatchPolicy;
052    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
053    private boolean sendAdvisoryIfNoConsumers;
054    private DeadLetterStrategy deadLetterStrategy = Destination.DEFAULT_DEAD_LETTER_STRATEGY;
055    private PendingMessageLimitStrategy pendingMessageLimitStrategy;
056    private MessageEvictionStrategy messageEvictionStrategy;
057    private long memoryLimit;
058    private String messageGroupMapFactoryType = "cached";
059    private MessageGroupMapFactory messageGroupMapFactory;
060    private PendingQueueMessageStoragePolicy pendingQueuePolicy;
061    private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
062    private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
063    private int maxProducersToAudit=BaseDestination.MAX_PRODUCERS_TO_AUDIT;
064    private int maxAuditDepth=BaseDestination.MAX_AUDIT_DEPTH;
065    private int maxQueueAuditDepth=BaseDestination.MAX_AUDIT_DEPTH;
066    private boolean enableAudit=true;
067    private boolean producerFlowControl = true;
068    private boolean alwaysRetroactive = false;
069    private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
070    private boolean optimizedDispatch=false;
071    private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
072    private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE;
073    private boolean useCache=true;
074    private long minimumMessageSize=1024;
075    private boolean useConsumerPriority=true;
076    private boolean strictOrderDispatch=false;
077    private boolean lazyDispatch=false;
078    private int timeBeforeDispatchStarts = 0;
079    private int consumersBeforeDispatchStarts = 0;
080    private boolean advisoryForSlowConsumers;
081    private boolean advisoryForFastProducers;
082    private boolean advisoryForDiscardingMessages;
083    private boolean advisoryWhenFull;
084    private boolean advisoryForDelivery;
085    private boolean advisoryForConsumed;
086    private boolean includeBodyForAdvisory;
087    private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD;
088    private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE;
089    private int queuePrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
090    private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH;
091    private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
092    private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
093    private boolean usePrefetchExtension = true;
094    private int cursorMemoryHighWaterMark = 70;
095    private int storeUsageHighWaterMark = 100;
096    private SlowConsumerStrategy slowConsumerStrategy;
097    private boolean prioritizedMessages;
098    private boolean allConsumersExclusiveByDefault;
099    private boolean gcInactiveDestinations;
100    private boolean gcWithNetworkConsumers;
101    private long inactiveTimeoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
102    private boolean reduceMemoryFootprint;
103    private NetworkBridgeFilterFactory networkBridgeFilterFactory;
104    private boolean doOptimzeMessageStorage = true;
105    private int maxDestinations = -1;
106
107    /*
108     * percentage of in-flight messages above which optimize message store is disabled
109     */
110    private int optimizeMessageStoreInFlightLimit = 10;
111    private boolean persistJMSRedelivered = false;
112
113
114    public void configure(Broker broker,Queue queue) {
115        baseConfiguration(broker,queue);
116        if (dispatchPolicy != null) {
117            queue.setDispatchPolicy(dispatchPolicy);
118        }
119        queue.setDeadLetterStrategy(getDeadLetterStrategy());
120        queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
121        if (memoryLimit > 0) {
122            queue.getMemoryUsage().setLimit(memoryLimit);
123        }
124        if (pendingQueuePolicy != null) {
125            PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue);
126            queue.setMessages(messages);
127        }
128
129        queue.setUseConsumerPriority(isUseConsumerPriority());
130        queue.setStrictOrderDispatch(isStrictOrderDispatch());
131        queue.setOptimizedDispatch(isOptimizedDispatch());
132        queue.setLazyDispatch(isLazyDispatch());
133        queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
134        queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
135        queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
136        queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
137    }
138
139    public void update(Queue queue) {
140        update(queue, null);
141    }
142
143    /**
144     * Update a queue with this policy.  Only apply properties that
145     * match the includedProperties list.  Not all properties are eligible
146     * to be updated.
147     *
148     * If includedProperties is null then all of the properties will be set as
149     * isUpdate will return true
150     * @param baseDestination
151     * @param includedProperties
152     */
153    public void update(Queue queue, Set<String> includedProperties) {
154        baseUpdate(queue, includedProperties);
155        if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) {
156            queue.getMemoryUsage().setLimit(memoryLimit);
157        }
158        if (isUpdate("useConsumerPriority", includedProperties)) {
159            queue.setUseConsumerPriority(isUseConsumerPriority());
160        }
161        if (isUpdate("strictOrderDispatch", includedProperties)) {
162            queue.setStrictOrderDispatch(isStrictOrderDispatch());
163        }
164        if (isUpdate("optimizedDispatch", includedProperties)) {
165            queue.setOptimizedDispatch(isOptimizedDispatch());
166        }
167        if (isUpdate("lazyDispatch", includedProperties)) {
168            queue.setLazyDispatch(isLazyDispatch());
169        }
170        if (isUpdate("timeBeforeDispatchStarts", includedProperties)) {
171            queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
172        }
173        if (isUpdate("consumersBeforeDispatchStarts", includedProperties)) {
174            queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
175        }
176        if (isUpdate("allConsumersExclusiveByDefault", includedProperties)) {
177            queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
178        }
179        if (isUpdate("persistJMSRedelivered", includedProperties)) {
180            queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
181        }
182    }
183
184    public void configure(Broker broker,Topic topic) {
185        baseConfiguration(broker,topic);
186        if (dispatchPolicy != null) {
187            topic.setDispatchPolicy(dispatchPolicy);
188        }
189        topic.setDeadLetterStrategy(getDeadLetterStrategy());
190        if (subscriptionRecoveryPolicy != null) {
191            SubscriptionRecoveryPolicy srp = subscriptionRecoveryPolicy.copy();
192            srp.setBroker(broker);
193            topic.setSubscriptionRecoveryPolicy(srp);
194        }
195        if (memoryLimit > 0) {
196            topic.getMemoryUsage().setLimit(memoryLimit);
197        }
198        topic.setLazyDispatch(isLazyDispatch());
199    }
200
201    public void update(Topic topic) {
202        update(topic, null);
203    }
204
205    //If includedProperties is null then all of the properties will be set as
206    //isUpdate will return true
207    public void update(Topic topic, Set<String> includedProperties) {
208        baseUpdate(topic, includedProperties);
209        if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) {
210            topic.getMemoryUsage().setLimit(memoryLimit);
211        }
212        if (isUpdate("lazyDispatch", includedProperties)) {
213            topic.setLazyDispatch(isLazyDispatch());
214        }
215    }
216
217    // attributes that can change on the fly
218    public void baseUpdate(BaseDestination destination) {
219        baseUpdate(destination, null);
220    }
221
222    // attributes that can change on the fly
223    //If includedProperties is null then all of the properties will be set as
224    //isUpdate will return true
225    public void baseUpdate(BaseDestination destination, Set<String> includedProperties) {
226        if (isUpdate("producerFlowControl", includedProperties)) {
227            destination.setProducerFlowControl(isProducerFlowControl());
228        }
229        if (isUpdate("alwaysRetroactive", includedProperties)) {
230            destination.setAlwaysRetroactive(isAlwaysRetroactive());
231        }
232        if (isUpdate("blockedProducerWarningInterval", includedProperties)) {
233            destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
234        }
235        if (isUpdate("maxPageSize", includedProperties)) {
236            destination.setMaxPageSize(getMaxPageSize());
237        }
238        if (isUpdate("maxBrowsePageSize", includedProperties)) {
239            destination.setMaxBrowsePageSize(getMaxBrowsePageSize());
240        }
241
242        if (isUpdate("minimumMessageSize", includedProperties)) {
243            destination.setMinimumMessageSize((int) getMinimumMessageSize());
244        }
245        if (isUpdate("maxExpirePageSize", includedProperties)) {
246            destination.setMaxExpirePageSize(getMaxExpirePageSize());
247        }
248        if (isUpdate("cursorMemoryHighWaterMark", includedProperties)) {
249            destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
250        }
251        if (isUpdate("storeUsageHighWaterMark", includedProperties)) {
252            destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
253        }
254        if (isUpdate("gcInactiveDestinations", includedProperties)) {
255            destination.setGcIfInactive(isGcInactiveDestinations());
256        }
257        if (isUpdate("gcWithNetworkConsumers", includedProperties)) {
258            destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
259        }
260        if (isUpdate("inactiveTimeoutBeforeGc", includedProperties)) {
261            destination.setInactiveTimeoutBeforeGC(getInactiveTimeoutBeforeGC());
262        }
263        if (isUpdate("reduceMemoryFootprint", includedProperties)) {
264            destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
265        }
266        if (isUpdate("doOptimizeMessageStore", includedProperties)) {
267            destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
268        }
269        if (isUpdate("optimizeMessageStoreInFlightLimit", includedProperties)) {
270            destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit());
271        }
272        if (isUpdate("advisoryForConsumed", includedProperties)) {
273            destination.setAdvisoryForConsumed(isAdvisoryForConsumed());
274        }
275        if (isUpdate("advisoryForDelivery", includedProperties)) {
276            destination.setAdvisoryForDelivery(isAdvisoryForDelivery());
277        }
278        if (isUpdate("advisoryForDiscardingMessages", includedProperties)) {
279            destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages());
280        }
281        if (isUpdate("advisoryForSlowConsumers", includedProperties)) {
282            destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
283        }
284        if (isUpdate("advisoryForFastProducers", includedProperties)) {
285            destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
286        }
287        if (isUpdate("advisoryWhenFull", includedProperties)) {
288            destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
289        }
290        if (isUpdate("includeBodyForAdvisory", includedProperties)) {
291            destination.setIncludeBodyForAdvisory(isIncludeBodyForAdvisory());
292        }
293        if (isUpdate("sendAdvisoryIfNoConsumers", includedProperties)) {
294            destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
295        }
296    }
297
298    public void baseConfiguration(Broker broker, BaseDestination destination) {
299        baseUpdate(destination);
300        destination.setEnableAudit(isEnableAudit());
301        destination.setMaxAuditDepth(getMaxQueueAuditDepth());
302        destination.setMaxProducersToAudit(getMaxProducersToAudit());
303        destination.setUseCache(isUseCache());
304        destination.setExpireMessagesPeriod(getExpireMessagesPeriod());
305        SlowConsumerStrategy scs = getSlowConsumerStrategy();
306        if (scs != null) {
307            scs.setBrokerService(broker);
308            scs.addDestination(destination);
309        }
310        destination.setSlowConsumerStrategy(scs);
311        destination.setPrioritizedMessages(isPrioritizedMessages());
312    }
313
314    public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
315        configurePrefetch(subscription);
316        subscription.setUsePrefetchExtension(isUsePrefetchExtension());
317        subscription.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
318        if (pendingMessageLimitStrategy != null) {
319            int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
320            int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
321            if (consumerLimit > 0) {
322                if (value < 0 || consumerLimit < value) {
323                    value = consumerLimit;
324                }
325            }
326            if (value >= 0) {
327                LOG.debug("Setting the maximumPendingMessages size to: {} for consumer: {}", value, subscription.getInfo().getConsumerId());
328                subscription.setMaximumPendingMessages(value);
329            }
330        }
331        if (messageEvictionStrategy != null) {
332            subscription.setMessageEvictionStrategy(messageEvictionStrategy);
333        }
334        if (pendingSubscriberPolicy != null) {
335            String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
336            int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
337            subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize,subscription));
338        }
339        if (enableAudit) {
340            subscription.setEnableAudit(enableAudit);
341            subscription.setMaxProducersToAudit(maxProducersToAudit);
342            subscription.setMaxAuditDepth(maxAuditDepth);
343        }
344    }
345
346    public void configure(Broker broker, SystemUsage memoryManager, DurableTopicSubscription sub) {
347        String clientId = sub.getSubscriptionKey().getClientId();
348        String subName = sub.getSubscriptionKey().getSubscriptionName();
349        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
350        configurePrefetch(sub);
351        if (pendingDurableSubscriberPolicy != null) {
352            PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId, subName,sub.getPrefetchSize(),sub);
353            cursor.setSystemUsage(memoryManager);
354            sub.setPending(cursor);
355        }
356        int auditDepth = getMaxAuditDepth();
357        if (auditDepth == BaseDestination.MAX_AUDIT_DEPTH && this.isPrioritizedMessages()) {
358            sub.setMaxAuditDepth(auditDepth * 10);
359        } else {
360            sub.setMaxAuditDepth(auditDepth);
361        }
362        sub.setMaxProducersToAudit(getMaxProducersToAudit());
363        sub.setUsePrefetchExtension(isUsePrefetchExtension());
364    }
365
366    public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) {
367        configurePrefetch(sub);
368        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
369        sub.setUsePrefetchExtension(isUsePrefetchExtension());
370
371        // TODO
372        // We currently need an infinite audit because of the way that browser dispatch
373        // is done.  We should refactor the browsers to better handle message dispatch so
374        // we can remove this and perform a more efficient dispatch.
375        sub.setMaxProducersToAudit(Integer.MAX_VALUE);
376        sub.setMaxAuditDepth(Short.MAX_VALUE);
377
378        // part solution - dispatching to browsers needs to be restricted
379        sub.setMaxMessages(getMaxBrowsePageSize());
380    }
381
382    public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) {
383        configurePrefetch(sub);
384        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
385        sub.setUsePrefetchExtension(isUsePrefetchExtension());
386        sub.setMaxProducersToAudit(getMaxProducersToAudit());
387    }
388
389    public void configurePrefetch(Subscription subscription) {
390
391        final int currentPrefetch = subscription.getConsumerInfo().getPrefetchSize();
392        if (subscription instanceof QueueBrowserSubscription) {
393            if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH) {
394                ((QueueBrowserSubscription) subscription).setPrefetchSize(getQueueBrowserPrefetch());
395            }
396        } else if (subscription instanceof QueueSubscription) {
397            if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH) {
398                ((QueueSubscription) subscription).setPrefetchSize(getQueuePrefetch());
399            }
400        } else if (subscription instanceof DurableTopicSubscription) {
401            if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH ||
402                    subscription.getConsumerInfo().getPrefetchSize() == ActiveMQPrefetchPolicy.DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH) {
403                ((DurableTopicSubscription)subscription).setPrefetchSize(getDurableTopicPrefetch());
404            }
405        } else if (subscription instanceof TopicSubscription) {
406            if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH) {
407                ((TopicSubscription) subscription).setPrefetchSize(getTopicPrefetch());
408            }
409        }
410        if (currentPrefetch != 0 && subscription.getPrefetchSize() == 0) {
411            // tell the sub so that it can issue a pull request
412            subscription.updateConsumerPrefetch(0);
413        }
414    }
415
416    private boolean isUpdate(String property, Set<String> includedProperties) {
417        return includedProperties == null || includedProperties.contains(property);
418    }
419    // Properties
420    // -------------------------------------------------------------------------
421    public DispatchPolicy getDispatchPolicy() {
422        return dispatchPolicy;
423    }
424
425    public void setDispatchPolicy(DispatchPolicy policy) {
426        this.dispatchPolicy = policy;
427    }
428
429    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
430        return subscriptionRecoveryPolicy;
431    }
432
433    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
434        this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
435    }
436
437    public boolean isSendAdvisoryIfNoConsumers() {
438        return sendAdvisoryIfNoConsumers;
439    }
440
441    /**
442     * Sends an advisory message if a non-persistent message is sent and there
443     * are no active consumers
444     */
445    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
446        this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
447    }
448
449    public DeadLetterStrategy getDeadLetterStrategy() {
450        return deadLetterStrategy;
451    }
452
453    /**
454     * Sets the policy used to determine which dead letter queue destination
455     * should be used
456     */
457    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
458        this.deadLetterStrategy = deadLetterStrategy;
459    }
460
461    public PendingMessageLimitStrategy getPendingMessageLimitStrategy() {
462        return pendingMessageLimitStrategy;
463    }
464
465    /**
466     * Sets the strategy to calculate the maximum number of messages that are
467     * allowed to be pending on consumers (in addition to their prefetch sizes).
468     * Once the limit is reached, non-durable topics can then start discarding
469     * old messages. This allows us to keep dispatching messages to slow
470     * consumers while not blocking fast consumers and discarding the messages
471     * oldest first.
472     */
473    public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy) {
474        this.pendingMessageLimitStrategy = pendingMessageLimitStrategy;
475    }
476
477    public MessageEvictionStrategy getMessageEvictionStrategy() {
478        return messageEvictionStrategy;
479    }
480
481    /**
482     * Sets the eviction strategy used to decide which message to evict when the
483     * slow consumer needs to discard messages
484     */
485    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
486        this.messageEvictionStrategy = messageEvictionStrategy;
487    }
488
489    public long getMemoryLimit() {
490        return memoryLimit;
491    }
492
493    /**
494     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
495     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
496     */
497    public void setMemoryLimit(long memoryLimit) {
498        this.memoryLimit = memoryLimit;
499    }
500
501    public MessageGroupMapFactory getMessageGroupMapFactory() {
502        if (messageGroupMapFactory == null) {
503            try {
504            messageGroupMapFactory = GroupFactoryFinder.createMessageGroupMapFactory(getMessageGroupMapFactoryType());
505            }catch(Exception e){
506                LOG.error("Failed to create message group Factory ",e);
507            }
508        }
509        return messageGroupMapFactory;
510    }
511
512    /**
513     * Sets the factory used to create new instances of {MessageGroupMap} used
514     * to implement the <a
515     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
516     * functionality.
517     */
518    public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
519        this.messageGroupMapFactory = messageGroupMapFactory;
520    }
521
522
523    public String getMessageGroupMapFactoryType() {
524        return messageGroupMapFactoryType;
525    }
526
527    public void setMessageGroupMapFactoryType(String messageGroupMapFactoryType) {
528        this.messageGroupMapFactoryType = messageGroupMapFactoryType;
529    }
530
531
532    /**
533     * @return the pendingDurableSubscriberPolicy
534     */
535    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
536        return this.pendingDurableSubscriberPolicy;
537    }
538
539    /**
540     * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy
541     *                to set
542     */
543    public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
544        this.pendingDurableSubscriberPolicy = pendingDurableSubscriberPolicy;
545    }
546
547    /**
548     * @return the pendingQueuePolicy
549     */
550    public PendingQueueMessageStoragePolicy getPendingQueuePolicy() {
551        return this.pendingQueuePolicy;
552    }
553
554    /**
555     * @param pendingQueuePolicy the pendingQueuePolicy to set
556     */
557    public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy pendingQueuePolicy) {
558        this.pendingQueuePolicy = pendingQueuePolicy;
559    }
560
561    /**
562     * @return the pendingSubscriberPolicy
563     */
564    public PendingSubscriberMessageStoragePolicy getPendingSubscriberPolicy() {
565        return this.pendingSubscriberPolicy;
566    }
567
568    /**
569     * @param pendingSubscriberPolicy the pendingSubscriberPolicy to set
570     */
571    public void setPendingSubscriberPolicy(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) {
572        this.pendingSubscriberPolicy = pendingSubscriberPolicy;
573    }
574
575    /**
576     * @return true if producer flow control enabled
577     */
578    public boolean isProducerFlowControl() {
579        return producerFlowControl;
580    }
581
582    /**
583     * @param producerFlowControl
584     */
585    public void setProducerFlowControl(boolean producerFlowControl) {
586        this.producerFlowControl = producerFlowControl;
587    }
588
589    /**
590     * @return true if topic is always retroactive
591     */
592    public boolean isAlwaysRetroactive() {
593        return alwaysRetroactive;
594    }
595
596    /**
597     * @param alwaysRetroactive
598     */
599    public void setAlwaysRetroactive(boolean alwaysRetroactive) {
600        this.alwaysRetroactive = alwaysRetroactive;
601    }
602
603
604    /**
605     * Set's the interval at which warnings about producers being blocked by
606     * resource usage will be triggered. Values of 0 or less will disable
607     * warnings
608     *
609     * @param blockedProducerWarningInterval the interval at which warning about
610     *            blocked producers will be triggered.
611     */
612    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
613        this.blockedProducerWarningInterval = blockedProducerWarningInterval;
614    }
615
616    /**
617     *
618     * @return the interval at which warning about blocked producers will be
619     *         triggered.
620     */
621    public long getBlockedProducerWarningInterval() {
622        return blockedProducerWarningInterval;
623    }
624
625    /**
626     * @return the maxProducersToAudit
627     */
628    public int getMaxProducersToAudit() {
629        return maxProducersToAudit;
630    }
631
632    /**
633     * @param maxProducersToAudit the maxProducersToAudit to set
634     */
635    public void setMaxProducersToAudit(int maxProducersToAudit) {
636        this.maxProducersToAudit = maxProducersToAudit;
637    }
638
639    /**
640     * @return the maxAuditDepth
641     */
642    public int getMaxAuditDepth() {
643        return maxAuditDepth;
644    }
645
646    /**
647     * @param maxAuditDepth the maxAuditDepth to set
648     */
649    public void setMaxAuditDepth(int maxAuditDepth) {
650        this.maxAuditDepth = maxAuditDepth;
651    }
652
653    /**
654     * @return the enableAudit
655     */
656    public boolean isEnableAudit() {
657        return enableAudit;
658    }
659
660    /**
661     * @param enableAudit the enableAudit to set
662     */
663    public void setEnableAudit(boolean enableAudit) {
664        this.enableAudit = enableAudit;
665    }
666
667    public int getMaxQueueAuditDepth() {
668        return maxQueueAuditDepth;
669    }
670
671    public void setMaxQueueAuditDepth(int maxQueueAuditDepth) {
672        this.maxQueueAuditDepth = maxQueueAuditDepth;
673    }
674
675    public boolean isOptimizedDispatch() {
676        return optimizedDispatch;
677    }
678
679    public void setOptimizedDispatch(boolean optimizedDispatch) {
680        this.optimizedDispatch = optimizedDispatch;
681    }
682
683    public int getMaxPageSize() {
684        return maxPageSize;
685    }
686
687    public void setMaxPageSize(int maxPageSize) {
688        this.maxPageSize = maxPageSize;
689    }
690
691    public int getMaxBrowsePageSize() {
692        return maxBrowsePageSize;
693    }
694
695    public void setMaxBrowsePageSize(int maxPageSize) {
696        this.maxBrowsePageSize = maxPageSize;
697    }
698
699    public boolean isUseCache() {
700        return useCache;
701    }
702
703    public void setUseCache(boolean useCache) {
704        this.useCache = useCache;
705    }
706
707    public long getMinimumMessageSize() {
708        return minimumMessageSize;
709    }
710
711    public void setMinimumMessageSize(long minimumMessageSize) {
712        this.minimumMessageSize = minimumMessageSize;
713    }
714
715    public boolean isUseConsumerPriority() {
716        return useConsumerPriority;
717    }
718
719    public void setUseConsumerPriority(boolean useConsumerPriority) {
720        this.useConsumerPriority = useConsumerPriority;
721    }
722
723    public boolean isStrictOrderDispatch() {
724        return strictOrderDispatch;
725    }
726
727    public void setStrictOrderDispatch(boolean strictOrderDispatch) {
728        this.strictOrderDispatch = strictOrderDispatch;
729    }
730
731    public boolean isLazyDispatch() {
732        return lazyDispatch;
733    }
734
735    public void setLazyDispatch(boolean lazyDispatch) {
736        this.lazyDispatch = lazyDispatch;
737    }
738
739    public int getTimeBeforeDispatchStarts() {
740        return timeBeforeDispatchStarts;
741    }
742
743    public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
744        this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
745    }
746
747    public int getConsumersBeforeDispatchStarts() {
748        return consumersBeforeDispatchStarts;
749    }
750
751    public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
752        this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
753    }
754
755    /**
756     * @return the advisoryForSlowConsumers
757     */
758    public boolean isAdvisoryForSlowConsumers() {
759        return advisoryForSlowConsumers;
760    }
761
762    /**
763     * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
764     */
765    public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
766        this.advisoryForSlowConsumers = advisoryForSlowConsumers;
767    }
768
769    /**
770     * @return the advisoryForDiscardingMessages
771     */
772    public boolean isAdvisoryForDiscardingMessages() {
773        return advisoryForDiscardingMessages;
774    }
775
776    /**
777     * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to set
778     */
779    public void setAdvisoryForDiscardingMessages(
780            boolean advisoryForDiscardingMessages) {
781        this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
782    }
783
784    /**
785     * @return the advisoryWhenFull
786     */
787    public boolean isAdvisoryWhenFull() {
788        return advisoryWhenFull;
789    }
790
791    /**
792     * @param advisoryWhenFull the advisoryWhenFull to set
793     */
794    public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
795        this.advisoryWhenFull = advisoryWhenFull;
796    }
797
798    /**
799     * @return the advisoryForDelivery
800     */
801    public boolean isAdvisoryForDelivery() {
802        return advisoryForDelivery;
803    }
804
805    /**
806     * @param advisoryForDelivery the advisoryForDelivery to set
807     */
808    public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
809        this.advisoryForDelivery = advisoryForDelivery;
810    }
811
812    /**
813     * @return the advisoryForConsumed
814     */
815    public boolean isAdvisoryForConsumed() {
816        return advisoryForConsumed;
817    }
818
819    /**
820     * @param advisoryForConsumed the advisoryForConsumed to set
821     */
822    public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
823        this.advisoryForConsumed = advisoryForConsumed;
824    }
825
826    /**
827     * @return the advisdoryForFastProducers
828     */
829    public boolean isAdvisoryForFastProducers() {
830        return advisoryForFastProducers;
831    }
832
833    /**
834     * @param advisoryForFastProducers the advisdoryForFastProducers to set
835     */
836    public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
837        this.advisoryForFastProducers = advisoryForFastProducers;
838    }
839
840    /**
841     * Returns true if the original message body should be included when applicable
842     * for advisory messages
843     *
844     * @return
845     */
846    public boolean isIncludeBodyForAdvisory() {
847        return includeBodyForAdvisory;
848    }
849
850    /**
851     * Sets if the original message body should be included when applicable
852     * for advisory messages
853     *
854     * @param includeBodyForAdvisory
855     */
856    public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) {
857        this.includeBodyForAdvisory = includeBodyForAdvisory;
858    }
859
860    public void setMaxExpirePageSize(int maxExpirePageSize) {
861        this.maxExpirePageSize = maxExpirePageSize;
862    }
863
864    public int getMaxExpirePageSize() {
865        return maxExpirePageSize;
866    }
867
868    public void setExpireMessagesPeriod(long expireMessagesPeriod) {
869        this.expireMessagesPeriod = expireMessagesPeriod;
870    }
871
872    public long getExpireMessagesPeriod() {
873        return expireMessagesPeriod;
874    }
875
876    /**
877     * Get the queuePrefetch
878     * @return the queuePrefetch
879     */
880    public int getQueuePrefetch() {
881        return this.queuePrefetch;
882    }
883
884    /**
885     * Set the queuePrefetch
886     * @param queuePrefetch the queuePrefetch to set
887     */
888    public void setQueuePrefetch(int queuePrefetch) {
889        this.queuePrefetch = queuePrefetch;
890    }
891
892    /**
893     * Get the queueBrowserPrefetch
894     * @return the queueBrowserPrefetch
895     */
896    public int getQueueBrowserPrefetch() {
897        return this.queueBrowserPrefetch;
898    }
899
900    /**
901     * Set the queueBrowserPrefetch
902     * @param queueBrowserPrefetch the queueBrowserPrefetch to set
903     */
904    public void setQueueBrowserPrefetch(int queueBrowserPrefetch) {
905        this.queueBrowserPrefetch = queueBrowserPrefetch;
906    }
907
908    /**
909     * Get the topicPrefetch
910     * @return the topicPrefetch
911     */
912    public int getTopicPrefetch() {
913        return this.topicPrefetch;
914    }
915
916    /**
917     * Set the topicPrefetch
918     * @param topicPrefetch the topicPrefetch to set
919     */
920    public void setTopicPrefetch(int topicPrefetch) {
921        this.topicPrefetch = topicPrefetch;
922    }
923
924    /**
925     * Get the durableTopicPrefetch
926     * @return the durableTopicPrefetch
927     */
928    public int getDurableTopicPrefetch() {
929        return this.durableTopicPrefetch;
930    }
931
932    /**
933     * Set the durableTopicPrefetch
934     * @param durableTopicPrefetch the durableTopicPrefetch to set
935     */
936    public void setDurableTopicPrefetch(int durableTopicPrefetch) {
937        this.durableTopicPrefetch = durableTopicPrefetch;
938    }
939
940    public boolean isUsePrefetchExtension() {
941        return this.usePrefetchExtension;
942    }
943
944    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
945        this.usePrefetchExtension = usePrefetchExtension;
946    }
947
948    public int getCursorMemoryHighWaterMark() {
949        return this.cursorMemoryHighWaterMark;
950    }
951
952    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
953        this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
954    }
955
956    public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
957        this.storeUsageHighWaterMark = storeUsageHighWaterMark;
958    }
959
960    public int getStoreUsageHighWaterMark() {
961        return storeUsageHighWaterMark;
962    }
963
964    public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
965        this.slowConsumerStrategy = slowConsumerStrategy;
966    }
967
968    public SlowConsumerStrategy getSlowConsumerStrategy() {
969        return this.slowConsumerStrategy;
970    }
971
972
973    public boolean isPrioritizedMessages() {
974        return this.prioritizedMessages;
975    }
976
977    public void setPrioritizedMessages(boolean prioritizedMessages) {
978        this.prioritizedMessages = prioritizedMessages;
979    }
980
981    public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) {
982        this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault;
983    }
984
985    public boolean isAllConsumersExclusiveByDefault() {
986        return allConsumersExclusiveByDefault;
987    }
988
989    public boolean isGcInactiveDestinations() {
990        return this.gcInactiveDestinations;
991    }
992
993    public void setGcInactiveDestinations(boolean gcInactiveDestinations) {
994        this.gcInactiveDestinations = gcInactiveDestinations;
995    }
996
997    /**
998     * @return the amount of time spent inactive before GC of the destination kicks in.
999     *
1000     * @deprecated use getInactiveTimeoutBeforeGC instead.
1001     */
1002    @Deprecated
1003    public long getInactiveTimoutBeforeGC() {
1004        return getInactiveTimeoutBeforeGC();
1005    }
1006
1007    /**
1008     * Sets the amount of time a destination is inactive before it is marked for GC
1009     *
1010     * @param inactiveTimoutBeforeGC
1011     *        time in milliseconds to configure as the inactive timeout.
1012     *
1013     * @deprecated use getInactiveTimeoutBeforeGC instead.
1014     */
1015    @Deprecated
1016    public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
1017        setInactiveTimeoutBeforeGC(inactiveTimoutBeforeGC);
1018    }
1019
1020    /**
1021     * @return the amount of time spent inactive before GC of the destination kicks in.
1022     */
1023    public long getInactiveTimeoutBeforeGC() {
1024        return this.inactiveTimeoutBeforeGC;
1025    }
1026
1027    /**
1028     * Sets the amount of time a destination is inactive before it is marked for GC
1029     *
1030     * @param inactiveTimoutBeforeGC
1031     *        time in milliseconds to configure as the inactive timeout.
1032     */
1033    public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) {
1034        this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC;
1035    }
1036
1037    public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
1038        this.gcWithNetworkConsumers = gcWithNetworkConsumers;
1039    }
1040
1041    public boolean isGcWithNetworkConsumers() {
1042        return gcWithNetworkConsumers;
1043    }
1044
1045    public boolean isReduceMemoryFootprint() {
1046        return reduceMemoryFootprint;
1047    }
1048
1049    public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
1050        this.reduceMemoryFootprint = reduceMemoryFootprint;
1051    }
1052
1053    public void setNetworkBridgeFilterFactory(NetworkBridgeFilterFactory networkBridgeFilterFactory) {
1054        this.networkBridgeFilterFactory = networkBridgeFilterFactory;
1055    }
1056
1057    public NetworkBridgeFilterFactory getNetworkBridgeFilterFactory() {
1058        return networkBridgeFilterFactory;
1059    }
1060
1061    public boolean isDoOptimzeMessageStorage() {
1062        return doOptimzeMessageStorage;
1063    }
1064
1065    public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
1066        this.doOptimzeMessageStorage = doOptimzeMessageStorage;
1067    }
1068
1069    public int getOptimizeMessageStoreInFlightLimit() {
1070        return optimizeMessageStoreInFlightLimit;
1071    }
1072
1073    public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
1074        this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
1075    }
1076
1077    public void setPersistJMSRedelivered(boolean val) {
1078        this.persistJMSRedelivered = val;
1079    }
1080
1081    public boolean isPersistJMSRedelivered() {
1082        return persistJMSRedelivered;
1083    }
1084
1085    public int getMaxDestinations() {
1086        return maxDestinations;
1087    }
1088
1089    /**
1090     * Sets the maximum number of destinations that can be created
1091     *
1092     * @param maxDestinations
1093     *            maximum number of destinations
1094     */
1095    public void setMaxDestinations(int maxDestinations) {
1096        this.maxDestinations = maxDestinations;
1097    }
1098
1099    @Override
1100    public String toString() {
1101        return "PolicyEntry [" + destination + "]";
1102    }
1103}