/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.broker.util.InsertionCountList;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.NoLocalSubscriptionAware;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The Topic is a destination that sends a copy of a message to every active
 * Subscription registered.
 *
 * <p>Locking used by this class, as visible in the code below:
 * <ul>
 * <li>{@code synchronized (consumers)} guards compound check-then-act
 * operations on the consumer list;</li>
 * <li>{@code dispatchLock} is taken in read mode while dispatching and in
 * write mode while a subscription is being recovered or activated, so that
 * no new message is dispatched during recovery;</li>
 * <li>{@code synchronized (messagesWaitingForSpace)} guards the queue of
 * sends deferred by producer flow control.</li>
 * </ul>
 */
public class Topic extends BaseDestination implements Task {
    protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);

    /** Backing store for this topic; {@code null} when the topic is not persisted. */
    private final TopicMessageStore topicStore;

    /** Subscriptions that currently receive dispatched messages. */
    protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();

    /** Read lock: dispatching. Write lock: recovering / activating a subscription. */
    private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();

    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;

    /** All durable subscriptions registered on this topic (active or not), by key. */
    private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();

    private final TaskRunner taskRunner;

    /** Sends deferred because memory usage was full; drained by {@link #iterate()}. */
    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();

    /** Callback that wakes the task runner so {@link #iterate()} drains the deferred sends. */
    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
        @Override
        public void run() {
            try {
                Topic.this.taskRunner.wakeup();
            } catch (InterruptedException e) {
                // Nothing to clean up here, but do not lose the interrupt:
                // restore the flag so the calling thread can react to it.
                Thread.currentThread().interrupt();
            }
        }
    };

    /**
     * Creates the topic and its task runner.
     *
     * @param brokerService owning broker service
     * @param destination the destination this topic represents
     * @param store message store for the topic, may be {@code null}
     * @param parentStats parent statistics passed to the base destination
     * @param taskFactory factory used to create this topic's {@link TaskRunner}
     * @throws Exception if the base destination or the task runner cannot be created
     */
    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
        super(brokerService, store, destination, parentStats);
        this.topicStore = store;
        subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
        this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
    }

    /**
     * Initializes the base destination, forces a last-image recovery policy on
     * the master-broker advisory topic, and starts the store when there is one.
     *
     * @throws Exception if base initialization or starting the store fails
     */
    @Override
    public void initialize() throws Exception {
        super.initialize();
        // set non default subscription recovery policy (override policyEntries)
        if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
            subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
            setAlwaysRetroactive(true);
        }
        if (store != null) {
            // AMQ-2586: the message-count statistic is deliberately NOT seeded
            // from the store. Better to leave this stat at zero than to give
            // the user misleading metrics.
            store.start();
        }
    }

    /**
     * @return a snapshot copy of the current consumer list
     */
    @Override
    public List<Subscription> getConsumers() {
        synchronized (consumers) {
            return new ArrayList<Subscription>(consumers);
        }
    }

    /**
     * Always grants the lock.
     *
     * @param node ignored
     * @param sub ignored
     * @return always {@code true}
     */
    public boolean lock(MessageReference node, LockOwner sub) {
        return true;
    }

    /**
     * Registers a subscription on this topic.
     *
     * <p>Non-durable subscriptions are added to the consumer list once; when
     * the subscription (or the destination) is retroactive the recovery policy
     * is run under the dispatch write lock. Durable subscriptions are always
     * recorded in the durable-subscriber map and are added to the consumer
     * list only when active and no consumer with the same key is present.
     *
     * @param context connection context of the subscriber
     * @param sub the subscription to add
     * @throws Exception if registration or recovery fails
     */
    @Override
    public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
        if (!sub.getConsumerInfo().isDurable()) {

            // Do a retroactive recovery if needed.
            if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {

                // synchronize with dispatch method so that no new messages are sent
                // while we are recovering a subscription to avoid out of order messages.
                dispatchLock.writeLock().lock();
                try {
                    boolean applyRecovery = false;
                    synchronized (consumers) {
                        if (!consumers.contains(sub)) {
                            sub.add(context, this);
                            consumers.add(sub);
                            applyRecovery = true;
                            super.addSubscription(context, sub);
                        }
                    }
                    if (applyRecovery) {
                        subscriptionRecoveryPolicy.recover(context, this, sub);
                    }
                } finally {
                    dispatchLock.writeLock().unlock();
                }

            } else {
                synchronized (consumers) {
                    if (!consumers.contains(sub)) {
                        sub.add(context, this);
                        consumers.add(sub);
                        super.addSubscription(context, sub);
                    }
                }
            }
        } else {
            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
            super.addSubscription(context, sub);
            sub.add(context, this);
            if (dsub.isActive()) {
                synchronized (consumers) {
                    // Look for an existing durable consumer with the same key;
                    // an empty list simply yields no match.
                    boolean hasSubscription = false;
                    for (Subscription currentSub : consumers) {
                        if (currentSub.getConsumerInfo().isDurable()) {
                            DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
                            if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
                                hasSubscription = true;
                                break;
                            }
                        }
                    }

                    if (!hasSubscription) {
                        consumers.add(sub);
                    }
                }
            }
            durableSubscribers.put(dsub.getSubscriptionKey(), dsub);
        }
    }

    /**
     * Removes a subscription. Only non-durable subscriptions are taken out of
     * the consumer list here; {@code sub.remove(context, this)} is called for
     * every subscription.
     *
     * @param context connection context of the subscriber
     * @param sub the subscription to remove
     * @param lastDeliveredSequenceId passed through to the base implementation
     * @throws Exception if removal fails
     */
    @Override
    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
        if (!sub.getConsumerInfo().isDurable()) {
            boolean removed = false;
            synchronized (consumers) {
                removed = consumers.remove(sub);
            }
            if (removed) {
                super.removeSubscription(context, sub, lastDeliveredSequenceId);
            }
        }
        sub.remove(context, this);
    }

    /**
     * Deletes a durable subscription from the store and, when it was known to
     * this topic, deactivates it and removes it from the consumer list.
     * Does nothing when the topic has no store.
     *
     * @param context connection context (not used by this implementation)
     * @param key identifies the durable subscription
     * @throws Exception if the store or the deactivation fails
     */
    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
        if (topicStore != null) {
            topicStore.deleteSubscription(key.clientId, key.subscriptionName);
            DurableTopicSubscription removed = durableSubscribers.remove(key);
            if (removed != null) {
                destinationStatistics.getConsumers().decrement();
                // deactivate and remove
                removed.deactivate(false, 0L);
                consumers.remove(removed);
            }
        }
    }

    /**
     * @return {@code true} when the selector or the noLocal flag of the stored
     *         subscription differs from the consumer's current settings
     */
    private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException {
        return hasSelectorChanged(info1, info2) || hasNoLocalChanged(info1, info2);
    }

    /**
     * Compares the noLocal flags, but only when the persistence adapter is
     * {@link NoLocalSubscriptionAware}.
     */
    private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException {
        //Not all persistence adapters store the noLocal value for a subscription
        PersistenceAdapter adapter = broker.getBrokerService().getPersistenceAdapter();
        return adapter instanceof NoLocalSubscriptionAware && (info1.isNoLocal() ^ info2.isNoLocal());
    }

    /**
     * @return {@code true} when exactly one selector is {@code null}, or both
     *         are set and differ
     */
    private boolean hasSelectorChanged(SubscriptionInfo info1, ConsumerInfo info2) {
        if ((info1.getSelector() != null) ^ (info2.getSelector() != null)) {
            return true;
        }

        return info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector());
    }

    /**
     * Activates a durable subscription: reconciles it with the stored
     * subscription info (recreating it when the selector or noLocal flag
     * changed), adds it to the consumer list and, if the subscription requires
     * it, replays matching stored messages into it. Runs under the dispatch
     * write lock. Does nothing when the topic has no store.
     *
     * @param context connection context (not used by this implementation)
     * @param subscription the durable subscription being activated
     * @throws Exception if the store cannot be read or updated
     */
    public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
        // synchronize with dispatch method so that no new messages are sent
        // while we are recovering a subscription to avoid out of order messages.
        dispatchLock.writeLock().lock();
        try {

            if (topicStore == null) {
                return;
            }

            // Recover the durable subscription.
            String clientId = subscription.getSubscriptionKey().getClientId();
            String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
            SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
            if (info != null) {
                // Check to see if selector changed.
                if (hasDurableSubChanged(info, subscription.getConsumerInfo())) {
                    // Need to delete the subscription
                    topicStore.deleteSubscription(clientId, subscriptionName);
                    info = null;
                    // Force a rebuild of the selector chain for the subscription otherwise
                    // the stored subscription is updated but the selector expression is not
                    // and the subscription will not behave according to the new configuration.
                    subscription.setSelector(subscription.getConsumerInfo().getSelector());
                    synchronized (consumers) {
                        consumers.remove(subscription);
                    }
                } else {
                    synchronized (consumers) {
                        if (!consumers.contains(subscription)) {
                            consumers.add(subscription);
                        }
                    }
                }
            }

            // Do we need to create the subscription?
            if (info == null) {
                info = new SubscriptionInfo();
                info.setClientId(clientId);
                info.setSelector(subscription.getConsumerInfo().getSelector());
                info.setSubscriptionName(subscriptionName);
                // This destination is an actual destination id.
                info.setDestination(getActiveMQDestination());
                info.setNoLocal(subscription.getConsumerInfo().isNoLocal());
                // This destination might be a pattern
                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
                synchronized (consumers) {
                    consumers.add(subscription);
                    topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive());
                }
            }

            final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
            msgContext.setDestination(destination);
            if (subscription.isRecoveryRequired()) {
                topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
                    @Override
                    public boolean recoverMessage(Message message) throws Exception {
                        message.setRegionDestination(Topic.this);
                        try {
                            msgContext.setMessageReference(message);
                            if (subscription.matches(message, msgContext)) {
                                subscription.add(message);
                            }
                        } catch (IOException e) {
                            LOG.error("Failed to recover this message {}", message, e);
                        }
                        return true;
                    }

                    @Override
                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
                        throw new RuntimeException("Should not be called.");
                    }

                    @Override
                    public boolean hasSpace() {
                        return true;
                    }

                    @Override
                    public boolean isDuplicate(MessageId id) {
                        return false;
                    }
                });
            }
        } finally {
            dispatchLock.writeLock().unlock();
        }
    }

    /**
     * Takes a durable subscription out of the consumer list and lets it
     * release its hold on this destination.
     *
     * @param context connection context
     * @param sub the durable subscription being deactivated
     * @param dispatched messages already dispatched to the subscription, passed on to {@code sub.remove}
     * @throws Exception if the subscription fails to detach
     */
    public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception {
        synchronized (consumers) {
            consumers.remove(sub);
        }
        sub.remove(context, this, dispatched);
    }

    /**
     * Runs the recovery policy for the subscription, but only when its
     * consumer info is flagged retroactive.
     *
     * @param context connection context
     * @param subscription subscription to recover messages for
     * @throws Exception if recovery fails
     */
    public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
        if (subscription.getConsumerInfo().isRetroactive()) {
            subscriptionRecoveryPolicy.recover(context, this, subscription);
        }
    }

    /**
     * Accepts a message from a producer.
     *
     * <p>Already-expired messages are reported to the broker and dropped.
     * When memory usage is full and producer flow control applies, the send is
     * either rejected ({@code sendFailIfNoSpace} on a non-network connection),
     * deferred until space frees up (producer window or sync send), or the
     * calling thread is blocked until space is available. Otherwise the
     * message is stored/dispatched immediately via
     * {@link #doMessageSend(ProducerBrokerExchange, Message)}.
     *
     * @param producerExchange exchange carrying the producer state and connection context
     * @param message the message to send
     * @throws Exception on resource exhaustion, a closed connection while waiting, or a store/dispatch failure
     */
    @Override
    public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
        final ConnectionContext context = producerExchange.getConnectionContext();

        final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
        producerExchange.incrementSend();
        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
                && !context.isInRecoveryMode();

        message.setRegionDestination(this);

        // There is delay between the client sending it and it arriving at the
        // destination.. it may have expired.
        if (message.isExpired()) {
            broker.messageExpired(context, message, null);
            getDestinationStatistics().getExpired().increment();
            if (sendProducerAck) {
                dispatchProducerAck(context, producerInfo, message);
            }
            return;
        }

        if (memoryUsage.isFull()) {
            isFull(context, memoryUsage);
            fastProducer(context, producerInfo);

            if (isProducerFlowControl() && context.isProducerFlowControl()) {

                if (isFlowControlLogRequired()) {
                    LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which"
                            + " messages are removed from this destination to prevent flooding it."
                            + " See http://activemq.apache.org/producer-flow-control.html for more info.",
                            getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
                }

                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
                            + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
                }

                // We can avoid blocking due to low usage if the producer is sending a sync message or
                // if it is using a producer window
                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
                    synchronized (messagesWaitingForSpace) {
                        messagesWaitingForSpace.add(new Runnable() {
                            @Override
                            public void run() {
                                try {

                                    // While waiting for space to free up... the
                                    // message may have expired.
                                    if (message.isExpired()) {
                                        broker.messageExpired(context, message, null);
                                        getDestinationStatistics().getExpired().increment();
                                    } else {
                                        doMessageSend(producerExchange, message);
                                    }

                                    if (sendProducerAck) {
                                        dispatchProducerAck(context, producerInfo, message);
                                    } else {
                                        Response response = new Response();
                                        response.setCorrelationId(message.getCommandId());
                                        context.getConnection().dispatchAsync(response);
                                    }

                                } catch (Exception e) {
                                    // Only a producer that is waiting for a response can be told about the failure.
                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
                                        ExceptionResponse response = new ExceptionResponse(e);
                                        response.setCorrelationId(message.getCommandId());
                                        context.getConnection().dispatchAsync(response);
                                    }
                                }
                            }
                        });

                        registerCallbackForNotFullNotification();
                        // The deferred task sends the response itself.
                        context.setDontSendReponse(true);
                        return;
                    }

                } else {
                    // Producer flow control cannot be used, so we have do the flow control
                    // at the broker by blocking this thread until there is space available.

                    if (memoryUsage.isFull()) {
                        if (context.isInTransaction()) {

                            int count = 0;
                            while (!memoryUsage.waitForSpace(1000)) {
                                if (context.getStopping().get()) {
                                    throw new IOException("Connection closed, send aborted.");
                                }
                                // Warn periodically rather than on every failed wait.
                                if (count > 2 && context.isInTransaction()) {
                                    count = 0;
                                    int size = context.getTransaction().size();
                                    LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message);
                                }
                                count++;
                            }
                        } else {
                            waitForSpace(
                                    context,
                                    producerExchange,
                                    memoryUsage,
                                    "Usage Manager Memory Usage limit reached. Stopping producer ("
                                            + message.getProducerId()
                                            + ") to prevent flooding "
                                            + getActiveMQDestination().getQualifiedName()
                                            + "."
                                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
                        }
                    }

                    // The usage manager could have delayed us by the time
                    // we unblock the message could have expired..
                    if (message.isExpired()) {
                        getDestinationStatistics().getExpired().increment();
                        LOG.debug("Expired message: {}", message);
                        return;
                    }
                }
            }
        }

        doMessageSend(producerExchange, message);
        messageDelivered(context, message);
        if (sendProducerAck) {
            dispatchProducerAck(context, producerInfo, message);
        }
    }

    /** Asynchronously sends the producer a {@link ProducerAck} sized by the message. */
    private void dispatchProducerAck(ConnectionContext context, ProducerInfo producerInfo, Message message) {
        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
        context.getConnection().dispatchAsync(ack);
    }

    /**
     * do send the message - this needs to be synchronized to ensure messages
     * are stored AND dispatched in the right order
     *
     * <p>Persistent messages are written to the store unless there are no
     * durable subscribers. Inside a transaction the dispatch is postponed to
     * {@code afterCommit}; otherwise the message is dispatched immediately.
     * Finally the method waits for the asynchronous store write, if any.
     *
     * @param producerExchange exchange carrying the connection context
     * @param message the message to store and dispatch
     * @throws IOException
     * @throws Exception
     */
    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
            throws IOException, Exception {
        final ConnectionContext context = producerExchange.getConnectionContext();
        message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
        Future<Object> result = null;

        if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
            if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
                final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
                        + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
                    throw new javax.jms.ResourceAllocationException(logMessage);
                }

                waitForSpace(context, producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
            }
            result = topicStore.asyncAddTopicMessage(context, message, isOptimizeStorage());

            //Moved the reduceMemoryfootprint clearing to the dispatch method
        }

        // Hold a reference until the message has been dispatched (or the
        // transaction has completed); every path below releases it.
        message.incrementReferenceCount();

        if (context.isInTransaction()) {
            context.getTransaction().addSynchronization(new Synchronization() {
                @Override
                public void afterCommit() throws Exception {
                    // It could take while before we receive the commit
                    // operation.. by that time the message could have
                    // expired..
                    if (message.isExpired()) {
                        if (broker.isExpired(message)) {
                            getDestinationStatistics().getExpired().increment();
                            broker.messageExpired(context, message, null);
                        }
                        message.decrementReferenceCount();
                        return;
                    }
                    try {
                        dispatch(context, message);
                    } finally {
                        message.decrementReferenceCount();
                    }
                }

                @Override
                public void afterRollback() throws Exception {
                    message.decrementReferenceCount();
                }
            });

        } else {
            try {
                dispatch(context, message);
            } finally {
                message.decrementReferenceCount();
            }
        }

        if (result != null && !result.isCancelled()) {
            try {
                result.get();
            } catch (CancellationException e) {
                // ignore - the task has been cancelled if the message
                // has already been deleted
            }
        }
    }

    /** @return {@code true} when there is no durable subscriber, so nothing needs to be stored */
    private boolean canOptimizeOutPersistence() {
        return durableSubscribers.isEmpty();
    }

    @Override
    public String toString() {
        return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
    }

    /**
     * Acknowledges a message. For a persistent message on a stored topic the
     * acknowledgement is written to the store under the durable subscription's
     * key; in that case {@code sub} is cast to {@link DurableTopicSubscription}.
     *
     * @param context connection context
     * @param sub the acknowledging subscription
     * @param ack the acknowledgement
     * @param node the acknowledged message
     * @throws IOException if the store cannot record the acknowledgement
     */
    @Override
    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
            final MessageReference node) throws IOException {
        if (topicStore != null && node.isPersistent()) {
            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
            SubscriptionKey key = dsub.getSubscriptionKey();
            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
                    convertToNonRangedAck(ack, node));
        }
        messageConsumed(context, node);
    }

    /** No-op for topics. */
    @Override
    public void gc() {
    }

    /**
     * @param messageId id of the message to load
     * @return the message from the store, or {@code null} when the topic has no store
     * @throws IOException if the store lookup fails
     */
    public Message loadMessage(MessageId messageId) throws IOException {
        return topicStore != null ? topicStore.getMessage(messageId) : null;
    }

    /**
     * Starts the recovery policy and memory usage tracking and, for
     * non-advisory topics with a positive expiry period, schedules the
     * periodic expiry task. Has no effect if already started.
     */
    @Override
    public void start() throws Exception {
        if (started.compareAndSet(false, true)) {
            this.subscriptionRecoveryPolicy.start();
            if (memoryUsage != null) {
                memoryUsage.start();
            }

            if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) {
                scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
            }
        }
    }

    /**
     * Shuts down the task runner, recovery policy, memory usage and store, and
     * cancels the expiry task. Has no effect if not started.
     */
    @Override
    public void stop() throws Exception {
        if (started.compareAndSet(true, false)) {
            if (taskRunner != null) {
                taskRunner.shutdown();
            }
            this.subscriptionRecoveryPolicy.stop();
            if (memoryUsage != null) {
                memoryUsage.stop();
            }
            if (this.topicStore != null) {
                this.topicStore.stop();
            }

            scheduler.cancel(expireMessagesTask);
        }
    }

    /**
     * @return messages found by browsing the store and the recovery policy,
     *         limited by {@code getMaxBrowsePageSize()}; empty when there is no store
     */
    @Override
    public Message[] browse() {
        final List<Message> result = new ArrayList<Message>();
        doBrowse(result, getMaxBrowsePageSize());
        return result.toArray(new Message[result.size()]);
    }

    /**
     * Fills {@code browseList} from the store and then from the recovery
     * policy. Expired messages found in the store are still added to the list
     * and are additionally expired for every inactive durable subscription.
     * Any failure is logged and swallowed so callers (including the scheduled
     * expiry task) never see an exception.
     *
     * @param browseList list receiving the browsed messages
     * @param max upper bound used to stop recovery and to cap policy messages
     */
    private void doBrowse(final List<Message> browseList, final int max) {
        try {
            if (topicStore != null) {
                final List<Message> toExpire = new ArrayList<Message>();
                topicStore.recover(new MessageRecoveryListener() {
                    @Override
                    public boolean recoverMessage(Message message) throws Exception {
                        if (message.isExpired()) {
                            toExpire.add(message);
                        }
                        browseList.add(message);
                        return true;
                    }

                    @Override
                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
                        return true;
                    }

                    @Override
                    public boolean hasSpace() {
                        return browseList.size() < max;
                    }

                    @Override
                    public boolean isDuplicate(MessageId id) {
                        return false;
                    }
                });
                final ConnectionContext connectionContext = createConnectionContext();
                for (Message message : toExpire) {
                    for (DurableTopicSubscription sub : durableSubscribers.values()) {
                        if (!sub.isActive()) {
                            message.setRegionDestination(this);
                            messageExpired(connectionContext, sub, message);
                        }
                    }
                }
                Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
                if (msgs != null) {
                    for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
                        browseList.add(msgs[i]);
                    }
                }
            }
        } catch (Throwable e) {
            LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e);
        }
    }

    /**
     * Task-runner callback: runs deferred sends while memory is not full and
     * re-registers for a "not full" notification when some remain.
     *
     * @return always {@code false}
     */
    @Override
    public boolean iterate() {
        synchronized (messagesWaitingForSpace) {
            while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
                Runnable op = messagesWaitingForSpace.removeFirst();
                op.run();
            }

            if (!messagesWaitingForSpace.isEmpty()) {
                registerCallbackForNotFullNotification();
            }
        }
        return false;
    }

    /** Asks memory usage to run the wake-up task once it is no longer full. */
    private void registerCallbackForNotFullNotification() {
        // If the usage manager is not full, then the task will not
        // get called..
        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
            // so call it directly here.
            sendMessagesWaitingForSpaceTask.run();
        }
    }

    // Properties
    // -------------------------------------------------------------------------

    public DispatchPolicy getDispatchPolicy() {
        return dispatchPolicy;
    }

    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
        this.dispatchPolicy = dispatchPolicy;
    }

    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
        return subscriptionRecoveryPolicy;
    }

    /**
     * Sets the recovery policy. While the current policy is a
     * {@link RetainedMessageSubscriptionRecoveryPolicy}, the new policy is
     * wrapped by it instead of replacing it.
     *
     * @param recoveryPolicy the policy to install or wrap
     */
    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) {
        if (this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) {
            // allow users to combine retained message policy with other ActiveMQ policies
            RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
            policy.setWrapped(recoveryPolicy);
        } else {
            this.subscriptionRecoveryPolicy = recoveryPolicy;
        }
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /** No-op for topics. */
    @Override
    public final void wakeup() {
    }

    /**
     * Updates enqueue statistics, offers the message to the recovery policy
     * and hands it to the dispatch policy under the dispatch read lock.
     * {@code onMessageWithNoConsumers} is invoked when there are no consumers
     * or the dispatch policy reports that nothing was dispatched.
     *
     * @param context connection context supplying the evaluation context
     * @param message the message to dispatch
     * @throws Exception if the recovery or dispatch policy fails
     */
    protected void dispatch(final ConnectionContext context, Message message) throws Exception {
        // AMQ-2586: the message-count statistic is deliberately not incremented.
        // Better to leave this stat at zero than to give the user
        // misleading metrics.
        destinationStatistics.getEnqueues().increment();
        destinationStatistics.getMessageSize().addSize(message.getSize());
        MessageEvaluationContext msgContext = null;

        dispatchLock.readLock().lock();
        try {
            if (!subscriptionRecoveryPolicy.add(context, message)) {
                return;
            }
            synchronized (consumers) {
                if (consumers.isEmpty()) {
                    onMessageWithNoConsumers(context, message);
                    return;
                }
            }

            // Clear memory before dispatch - need to clear here because the call to
            //subscriptionRecoveryPolicy.add() will unmarshall the state
            if (isReduceMemoryFootprint() && message.isMarshalled()) {
                message.clearUnMarshalledState();
            }

            msgContext = context.getMessageEvaluationContext();
            msgContext.setDestination(destination);
            msgContext.setMessageReference(message);
            if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
                onMessageWithNoConsumers(context, message);
            }

        } finally {
            dispatchLock.readLock().unlock();
            if (msgContext != null) {
                msgContext.clear();
            }
        }
    }

    /** Periodic task: browsing has the side effect of expiring messages, the browsed list is discarded. */
    private final Runnable expireMessagesTask = new Runnable() {
        @Override
        public void run() {
            List<Message> browsedMessages = new InsertionCountList<Message>();
            doBrowse(browsedMessages, getMaxExpirePageSize());
        }
    };

    /**
     * Reports the expiry to the broker, counts it, removes the message from a
     * durable subscription's pending list and acknowledges it with a standard
     * ack. Failures while acknowledging are logged, not propagated.
     *
     * @param context connection context
     * @param subs the subscription the message expired for
     * @param reference the expired message
     */
    @Override
    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
        broker.messageExpired(context, reference, subs);
        // AMQ-2586: the message-count statistic is deliberately not decremented.
        // Better to leave this stat at zero than to give the user
        // misleading metrics.
        destinationStatistics.getExpired().increment();
        MessageAck ack = new MessageAck();
        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
        ack.setDestination(destination);
        ack.setMessageID(reference.getMessageId());
        try {
            if (subs instanceof DurableTopicSubscription) {
                ((DurableTopicSubscription) subs).removePending(reference);
            }
            acknowledge(context, subs, ack, reference);
        } catch (Exception e) {
            LOG.error("Failed to remove expired Message from the store ", e);
        }
    }

    @Override
    protected Logger getLog() {
        return LOG;
    }

    /**
     * @return {@code true} only when message-storage optimization is enabled,
     *         at least one durable subscriber exists, and every durable
     *         subscriber is active, has a non-zero prefetch, is not a slow
     *         consumer and is within the in-flight limit
     */
    protected boolean isOptimizeStorage() {
        if (!isDoOptimzeMessageStorage() || durableSubscribers.isEmpty()) {
            return false;
        }
        for (DurableTopicSubscription s : durableSubscribers.values()) {
            if (!s.isActive()
                    || s.getPrefetchSize() == 0
                    || s.isSlowConsumer()
                    || s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()) {
                return false;
            }
        }
        return true;
    }

    /**
     * force a reread of the store - after transaction recovery completion
     */
    @Override
    public void clearPendingMessages() {
        dispatchLock.readLock().lock();
        try {
            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
                clearPendingAndDispatch(durableTopicSubscription);
            }
        } finally {
            dispatchLock.readLock().unlock();
        }
    }

    /**
     * Clears the subscription's pending cursor and triggers a dispatch, all
     * under the subscription's pending lock. A dispatch failure is logged.
     */
    private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
        synchronized (durableTopicSubscription.pendingLock) {
            durableTopicSubscription.pending.clear();
            try {
                durableTopicSubscription.dispatchPending();
            } catch (IOException exception) {
                // Pass the arguments as varargs: wrapping them in an Object[] and
                // adding the exception binds to warn(String, Object, Object), which
                // fills only the first placeholder with the whole array.
                LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}",
                        durableTopicSubscription,
                        destination,
                        durableTopicSubscription.pending,
                        exception);
            }
        }
    }

    /**
     * @return the live map of durable subscriptions by key (not a copy)
     */
    public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
        return durableSubscribers;
    }
}