001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.net.URI; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Locale; 026import java.util.Map; 027import java.util.Set; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.CopyOnWriteArrayList; 030import java.util.concurrent.ThreadPoolExecutor; 031import java.util.concurrent.locks.ReentrantReadWriteLock; 032 033import javax.jms.InvalidClientIDException; 034import javax.jms.JMSException; 035 036import org.apache.activemq.broker.Broker; 037import org.apache.activemq.broker.BrokerService; 038import org.apache.activemq.broker.Connection; 039import org.apache.activemq.broker.ConnectionContext; 040import org.apache.activemq.broker.ConsumerBrokerExchange; 041import org.apache.activemq.broker.EmptyBroker; 042import org.apache.activemq.broker.ProducerBrokerExchange; 043import org.apache.activemq.broker.TransportConnection; 044import org.apache.activemq.broker.TransportConnector; 045import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 046import org.apache.activemq.broker.region.policy.PolicyMap; 047import org.apache.activemq.command.ActiveMQDestination; 048import org.apache.activemq.command.ActiveMQMessage; 049import org.apache.activemq.command.BrokerId; 050import org.apache.activemq.command.BrokerInfo; 051import org.apache.activemq.command.ConnectionId; 052import org.apache.activemq.command.ConnectionInfo; 053import org.apache.activemq.command.ConsumerControl; 054import org.apache.activemq.command.ConsumerInfo; 055import org.apache.activemq.command.DestinationInfo; 056import org.apache.activemq.command.Message; 057import org.apache.activemq.command.MessageAck; 058import org.apache.activemq.command.MessageDispatch; 059import org.apache.activemq.command.MessageDispatchNotification; 060import org.apache.activemq.command.MessagePull; 061import org.apache.activemq.command.ProducerInfo; 062import org.apache.activemq.command.RemoveSubscriptionInfo; 063import org.apache.activemq.command.Response; 064import org.apache.activemq.command.TransactionId; 065import org.apache.activemq.state.ConnectionState; 066import org.apache.activemq.store.PListStore; 067import org.apache.activemq.thread.Scheduler; 068import org.apache.activemq.thread.TaskRunnerFactory; 069import org.apache.activemq.transport.TransmitCallback; 070import org.apache.activemq.usage.SystemUsage; 071import org.apache.activemq.util.BrokerSupport; 072import org.apache.activemq.util.IdGenerator; 073import org.apache.activemq.util.InetAddressUtil; 074import org.apache.activemq.util.LongSequenceGenerator; 075import org.apache.activemq.util.ServiceStopper; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079/** 080 * Routes Broker operations to the correct messaging regions for processing. 081 */ 082public class RegionBroker extends EmptyBroker { 083 public static final String ORIGINAL_EXPIRATION = "originalExpiration"; 084 private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class); 085 private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator(); 086 087 protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); 088 protected DestinationFactory destinationFactory; 089 protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>()); 090 091 private final Region queueRegion; 092 private final Region topicRegion; 093 private final Region tempQueueRegion; 094 private final Region tempTopicRegion; 095 protected final BrokerService brokerService; 096 private boolean started; 097 private boolean keepDurableSubsActive; 098 099 private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>(); 100 private final Map<ActiveMQDestination, ActiveMQDestination> destinationGate = new HashMap<ActiveMQDestination, ActiveMQDestination>(); 101 private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); 102 private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>(); 103 104 private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); 105 private BrokerId brokerId; 106 private String brokerName; 107 private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>(); 108 private final DestinationInterceptor destinationInterceptor; 109 private ConnectionContext adminConnectionContext; 110 private final Scheduler scheduler; 111 private final ThreadPoolExecutor executor; 112 private boolean allowTempAutoCreationOnSend; 113 114 private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock(); 115 private final Runnable purgeInactiveDestinationsTask = new Runnable() { 116 @Override 117 public void run() { 118 purgeInactiveDestinations(); 119 } 120 }; 121 122 public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory, 123 DestinationInterceptor destinationInterceptor, Scheduler scheduler, ThreadPoolExecutor executor) throws IOException { 124 this.brokerService = brokerService; 125 this.executor = executor; 126 this.scheduler = scheduler; 127 if (destinationFactory == null) { 128 throw new IllegalArgumentException("null destinationFactory"); 129 } 130 this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId()); 131 this.destinationFactory = destinationFactory; 132 queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory); 133 topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory); 134 this.destinationInterceptor = destinationInterceptor; 135 tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory); 136 tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory); 137 } 138 139 @Override 140 public Map<ActiveMQDestination, Destination> getDestinationMap() { 141 Map<ActiveMQDestination, Destination> answer = new HashMap<ActiveMQDestination, Destination>(getQueueRegion().getDestinationMap()); 142 answer.putAll(getTopicRegion().getDestinationMap()); 143 return answer; 144 } 145 146 @Override 147 public Map<ActiveMQDestination, Destination> getDestinationMap(ActiveMQDestination destination) { 148 try { 149 return getRegion(destination).getDestinationMap(); 150 } catch (JMSException jmse) { 151 return Collections.emptyMap(); 152 } 153 } 154 155 @Override 156 public Set<Destination> getDestinations(ActiveMQDestination destination) { 157 try { 158 return getRegion(destination).getDestinations(destination); 159 } catch (JMSException jmse) { 160 return Collections.emptySet(); 161 } 162 } 163 164 public Region getQueueRegion() { 165 return queueRegion; 166 } 167 168 public Region getTempQueueRegion() { 169 return tempQueueRegion; 170 } 171 172 public Region getTempTopicRegion() { 173 return tempTopicRegion; 174 } 175 176 public Region getTopicRegion() { 177 return topicRegion; 178 } 179 180 protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 181 return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 182 } 183 184 protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 185 return new TempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 186 } 187 188 protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 189 return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 190 } 191 192 protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 193 return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 194 } 195 196 @Override 197 public void start() throws Exception { 198 started = true; 199 queueRegion.start(); 200 topicRegion.start(); 201 tempQueueRegion.start(); 202 tempTopicRegion.start(); 203 int period = this.brokerService.getSchedulePeriodForDestinationPurge(); 204 if (period > 0) { 205 this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period); 206 } 207 } 208 209 @Override 210 public void stop() throws Exception { 211 started = false; 212 this.scheduler.cancel(purgeInactiveDestinationsTask); 213 ServiceStopper ss = new ServiceStopper(); 214 doStop(ss); 215 ss.throwFirstException(); 216 // clear the state 217 clientIdSet.clear(); 218 connections.clear(); 219 destinations.clear(); 220 brokerInfos.clear(); 221 } 222 223 public PolicyMap getDestinationPolicy() { 224 return brokerService != null ? brokerService.getDestinationPolicy() : null; 225 } 226 227 public ConnectionContext getConnectionContext(String clientId) { 228 return clientIdSet.get(clientId); 229 } 230 231 @Override 232 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 233 String clientId = info.getClientId(); 234 if (clientId == null) { 235 throw new InvalidClientIDException("No clientID specified for connection request"); 236 } 237 238 ConnectionContext oldContext = null; 239 240 synchronized (clientIdSet) { 241 oldContext = clientIdSet.get(clientId); 242 if (oldContext != null) { 243 if (context.isAllowLinkStealing()) { 244 clientIdSet.put(clientId, context); 245 } else { 246 throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " 247 + oldContext.getConnection().getRemoteAddress()); 248 } 249 } else { 250 clientIdSet.put(clientId, context); 251 } 252 } 253 254 if (oldContext != null) { 255 if (oldContext.getConnection() != null) { 256 Connection connection = oldContext.getConnection(); 257 LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection()); 258 if (connection instanceof TransportConnection) { 259 TransportConnection transportConnection = (TransportConnection) connection; 260 transportConnection.stopAsync(new IOException("Stealing link for clientId " + clientId + " From Connection " + oldContext.getConnection().getConnectionId())); 261 } else { 262 connection.stop(); 263 } 264 } else { 265 LOG.error("No Connection found for {}", oldContext); 266 } 267 } 268 269 connections.add(context.getConnection()); 270 } 271 272 @Override 273 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 274 String clientId = info.getClientId(); 275 if (clientId == null) { 276 throw new InvalidClientIDException("No clientID specified for connection disconnect request"); 277 } 278 synchronized (clientIdSet) { 279 ConnectionContext oldValue = clientIdSet.get(clientId); 280 // we may be removing the duplicate connection, not the first connection to be created 281 // so lets check that their connection IDs are the same 282 if (oldValue == context) { 283 if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) { 284 clientIdSet.remove(clientId); 285 } 286 } 287 } 288 connections.remove(context.getConnection()); 289 } 290 291 protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) { 292 return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2)); 293 } 294 295 @Override 296 public Connection[] getClients() throws Exception { 297 ArrayList<Connection> l = new ArrayList<Connection>(connections); 298 Connection rc[] = new Connection[l.size()]; 299 l.toArray(rc); 300 return rc; 301 } 302 303 @Override 304 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemp) throws Exception { 305 306 Destination answer; 307 308 answer = destinations.get(destination); 309 if (answer != null) { 310 return answer; 311 } 312 313 synchronized (destinationGate) { 314 answer = destinations.get(destination); 315 if (answer != null) { 316 return answer; 317 } 318 319 if (destinationGate.get(destination) != null) { 320 // Guard against spurious wakeup. 321 while (destinationGate.containsKey(destination)) { 322 destinationGate.wait(); 323 } 324 answer = destinations.get(destination); 325 if (answer != null) { 326 return answer; 327 } else { 328 // In case of intermediate remove or add failure 329 destinationGate.put(destination, destination); 330 } 331 } 332 } 333 334 try { 335 boolean create = true; 336 if (destination.isTemporary()) { 337 create = createIfTemp; 338 } 339 answer = getRegion(destination).addDestination(context, destination, create); 340 destinations.put(destination, answer); 341 } finally { 342 synchronized (destinationGate) { 343 destinationGate.remove(destination); 344 destinationGate.notifyAll(); 345 } 346 } 347 348 return answer; 349 } 350 351 @Override 352 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 353 if (destinations.containsKey(destination)) { 354 getRegion(destination).removeDestination(context, destination, timeout); 355 destinations.remove(destination); 356 } 357 } 358 359 @Override 360 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 361 addDestination(context, info.getDestination(), true); 362 363 } 364 365 @Override 366 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 367 removeDestination(context, info.getDestination(), info.getTimeout()); 368 } 369 370 @Override 371 public ActiveMQDestination[] getDestinations() throws Exception { 372 ArrayList<ActiveMQDestination> l; 373 374 l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet()); 375 376 ActiveMQDestination rc[] = new ActiveMQDestination[l.size()]; 377 l.toArray(rc); 378 return rc; 379 } 380 381 @Override 382 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 383 ActiveMQDestination destination = info.getDestination(); 384 if (destination != null) { 385 inactiveDestinationsPurgeLock.readLock().lock(); 386 try { 387 // This seems to cause the destination to be added but without 388 // advisories firing... 389 context.getBroker().addDestination(context, destination, isAllowTempAutoCreationOnSend()); 390 getRegion(destination).addProducer(context, info); 391 } finally { 392 inactiveDestinationsPurgeLock.readLock().unlock(); 393 } 394 } 395 } 396 397 @Override 398 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 399 ActiveMQDestination destination = info.getDestination(); 400 if (destination != null) { 401 inactiveDestinationsPurgeLock.readLock().lock(); 402 try { 403 getRegion(destination).removeProducer(context, info); 404 } finally { 405 inactiveDestinationsPurgeLock.readLock().unlock(); 406 } 407 } 408 } 409 410 @Override 411 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 412 ActiveMQDestination destination = info.getDestination(); 413 if (destinationInterceptor != null) { 414 destinationInterceptor.create(this, context, destination); 415 } 416 inactiveDestinationsPurgeLock.readLock().lock(); 417 try { 418 return getRegion(destination).addConsumer(context, info); 419 } finally { 420 inactiveDestinationsPurgeLock.readLock().unlock(); 421 } 422 } 423 424 @Override 425 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 426 ActiveMQDestination destination = info.getDestination(); 427 inactiveDestinationsPurgeLock.readLock().lock(); 428 try { 429 getRegion(destination).removeConsumer(context, info); 430 } finally { 431 inactiveDestinationsPurgeLock.readLock().unlock(); 432 } 433 } 434 435 @Override 436 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 437 inactiveDestinationsPurgeLock.readLock().lock(); 438 try { 439 topicRegion.removeSubscription(context, info); 440 } finally { 441 inactiveDestinationsPurgeLock.readLock().unlock(); 442 443 } 444 } 445 446 @Override 447 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { 448 ActiveMQDestination destination = message.getDestination(); 449 message.setBrokerInTime(System.currentTimeMillis()); 450 if (producerExchange.isMutable() || producerExchange.getRegion() == null 451 || (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) { 452 // ensure the destination is registered with the RegionBroker 453 producerExchange.getConnectionContext().getBroker() 454 .addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend()); 455 producerExchange.setRegion(getRegion(destination)); 456 producerExchange.setRegionDestination(null); 457 } 458 459 producerExchange.getRegion().send(producerExchange, message); 460 461 // clean up so these references aren't kept (possible leak) in the producer exchange 462 // especially since temps are transitory 463 if (producerExchange.isMutable()) { 464 producerExchange.setRegionDestination(null); 465 producerExchange.setRegion(null); 466 } 467 } 468 469 @Override 470 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 471 if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) { 472 ActiveMQDestination destination = ack.getDestination(); 473 consumerExchange.setRegion(getRegion(destination)); 474 } 475 consumerExchange.getRegion().acknowledge(consumerExchange, ack); 476 } 477 478 public Region getRegion(ActiveMQDestination destination) throws JMSException { 479 switch (destination.getDestinationType()) { 480 case ActiveMQDestination.QUEUE_TYPE: 481 return queueRegion; 482 case ActiveMQDestination.TOPIC_TYPE: 483 return topicRegion; 484 case ActiveMQDestination.TEMP_QUEUE_TYPE: 485 return tempQueueRegion; 486 case ActiveMQDestination.TEMP_TOPIC_TYPE: 487 return tempTopicRegion; 488 default: 489 throw createUnknownDestinationTypeException(destination); 490 } 491 } 492 493 @Override 494 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 495 ActiveMQDestination destination = pull.getDestination(); 496 return getRegion(destination).messagePull(context, pull); 497 } 498 499 @Override 500 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { 501 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 502 } 503 504 @Override 505 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 506 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 507 } 508 509 @Override 510 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 511 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 512 } 513 514 @Override 515 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 516 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 517 } 518 519 @Override 520 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 521 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 522 } 523 524 @Override 525 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { 526 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 527 } 528 529 @Override 530 public void gc() { 531 queueRegion.gc(); 532 topicRegion.gc(); 533 } 534 535 @Override 536 public BrokerId getBrokerId() { 537 if (brokerId == null) { 538 brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId()); 539 } 540 return brokerId; 541 } 542 543 public void setBrokerId(BrokerId brokerId) { 544 this.brokerId = brokerId; 545 } 546 547 @Override 548 public String getBrokerName() { 549 if (brokerName == null) { 550 try { 551 brokerName = InetAddressUtil.getLocalHostName().toLowerCase(Locale.ENGLISH); 552 } catch (Exception e) { 553 brokerName = "localhost"; 554 } 555 } 556 return brokerName; 557 } 558 559 public void setBrokerName(String brokerName) { 560 this.brokerName = brokerName; 561 } 562 563 public DestinationStatistics getDestinationStatistics() { 564 return destinationStatistics; 565 } 566 567 protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) { 568 return new JMSException("Unknown destination type: " + destination.getDestinationType()); 569 } 570 571 @Override 572 public synchronized void addBroker(Connection connection, BrokerInfo info) { 573 BrokerInfo existing = brokerInfos.get(info.getBrokerId()); 574 if (existing == null) { 575 existing = info.copy(); 576 existing.setPeerBrokerInfos(null); 577 brokerInfos.put(info.getBrokerId(), existing); 578 } 579 existing.incrementRefCount(); 580 LOG.debug("{} addBroker: {} brokerInfo size: {}", new Object[]{ getBrokerName(), info.getBrokerName(), brokerInfos.size() }); 581 addBrokerInClusterUpdate(info); 582 } 583 584 @Override 585 public synchronized void removeBroker(Connection connection, BrokerInfo info) { 586 if (info != null) { 587 BrokerInfo existing = brokerInfos.get(info.getBrokerId()); 588 if (existing != null && existing.decrementRefCount() == 0) { 589 brokerInfos.remove(info.getBrokerId()); 590 } 591 LOG.debug("{} removeBroker: {} brokerInfo size: {}", new Object[]{ getBrokerName(), info.getBrokerName(), brokerInfos.size()}); 592 // When stopping don't send cluster updates since we are the one's tearing down 593 // our own bridges. 594 if (!brokerService.isStopping()) { 595 removeBrokerInClusterUpdate(info); 596 } 597 } 598 } 599 600 @Override 601 public synchronized BrokerInfo[] getPeerBrokerInfos() { 602 BrokerInfo[] result = new BrokerInfo[brokerInfos.size()]; 603 result = brokerInfos.values().toArray(result); 604 return result; 605 } 606 607 @Override 608 public void preProcessDispatch(final MessageDispatch messageDispatch) { 609 final Message message = messageDispatch.getMessage(); 610 if (message != null) { 611 long endTime = System.currentTimeMillis(); 612 message.setBrokerOutTime(endTime); 613 if (getBrokerService().isEnableStatistics()) { 614 long totalTime = endTime - message.getBrokerInTime(); 615 ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime); 616 } 617 if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered() && !message.isRedelivered()) { 618 final int originalValue = message.getRedeliveryCounter(); 619 message.incrementRedeliveryCounter(); 620 try { 621 if (message.isPersistent()) { 622 ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message); 623 } 624 messageDispatch.setTransmitCallback(new TransmitCallback() { 625 // dispatch is considered a delivery, so update sub state post dispatch otherwise 626 // on a disconnect/reconnect cached messages will not reflect initial delivery attempt 627 final TransmitCallback delegate = messageDispatch.getTransmitCallback(); 628 @Override 629 public void onSuccess() { 630 message.incrementRedeliveryCounter(); 631 if (delegate != null) { 632 delegate.onSuccess(); 633 } 634 } 635 636 @Override 637 public void onFailure() { 638 if (delegate != null) { 639 delegate.onFailure(); 640 } 641 } 642 }); 643 } catch (IOException error) { 644 RuntimeException runtimeException = new RuntimeException("Failed to persist JMSRedeliveryFlag on " + message.getMessageId() + " in " + message.getDestination(), error); 645 LOG.warn(runtimeException.getLocalizedMessage(), runtimeException); 646 throw runtimeException; 647 } finally { 648 message.setRedeliveryCounter(originalValue); 649 } 650 } 651 } 652 } 653 654 @Override 655 public void postProcessDispatch(MessageDispatch messageDispatch) { 656 } 657 658 @Override 659 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 660 ActiveMQDestination destination = messageDispatchNotification.getDestination(); 661 getRegion(destination).processDispatchNotification(messageDispatchNotification); 662 } 663 664 @Override 665 public boolean isStopped() { 666 return !started; 667 } 668 669 @Override 670 public Set<ActiveMQDestination> getDurableDestinations() { 671 return destinationFactory.getDestinations(); 672 } 673 674 protected void doStop(ServiceStopper ss) { 675 ss.stop(queueRegion); 676 ss.stop(topicRegion); 677 ss.stop(tempQueueRegion); 678 ss.stop(tempTopicRegion); 679 } 680 681 public boolean isKeepDurableSubsActive() { 682 return keepDurableSubsActive; 683 } 684 685 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 686 this.keepDurableSubsActive = keepDurableSubsActive; 687 ((TopicRegion) topicRegion).setKeepDurableSubsActive(keepDurableSubsActive); 688 } 689 690 public DestinationInterceptor getDestinationInterceptor() { 691 return destinationInterceptor; 692 } 693 694 @Override 695 public ConnectionContext getAdminConnectionContext() { 696 return adminConnectionContext; 697 } 698 699 @Override 700 public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { 701 this.adminConnectionContext = adminConnectionContext; 702 } 703 704 public Map<ConnectionId, ConnectionState> getConnectionStates() { 705 return connectionStates; 706 } 707 708 @Override 709 public PListStore getTempDataStore() { 710 return brokerService.getTempDataStore(); 711 } 712 713 @Override 714 public URI getVmConnectorURI() { 715 return brokerService.getVmConnectorURI(); 716 } 717 718 @Override 719 public void brokerServiceStarted() { 720 } 721 722 @Override 723 public BrokerService getBrokerService() { 724 return brokerService; 725 } 726 727 @Override 728 public boolean isExpired(MessageReference messageReference) { 729 return messageReference.canProcessAsExpired(); 730 } 731 732 private boolean stampAsExpired(Message message) throws IOException { 733 boolean stamped = false; 734 if (message.getProperty(ORIGINAL_EXPIRATION) == null) { 735 long expiration = message.getExpiration(); 736 message.setProperty(ORIGINAL_EXPIRATION, new Long(expiration)); 737 stamped = true; 738 } 739 return stamped; 740 } 741 742 @Override 743 public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) { 744 LOG.debug("Message expired {}", node); 745 getRoot().sendToDeadLetterQueue(context, node, subscription, new Throwable("Message Expired. Expiration:" + node.getExpiration())); 746 } 747 748 @Override 749 public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription, Throwable poisonCause) { 750 try { 751 if (node != null) { 752 Message message = node.getMessage(); 753 if (message != null && node.getRegionDestination() != null) { 754 DeadLetterStrategy deadLetterStrategy = ((Destination) node.getRegionDestination()).getDeadLetterStrategy(); 755 if (deadLetterStrategy != null) { 756 if (deadLetterStrategy.isSendToDeadLetterQueue(message)) { 757 ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message, subscription); 758 // Prevent a DLQ loop where same message is sent from a DLQ back to itself 759 if (deadLetterDestination.equals(message.getDestination())) { 760 LOG.debug("Not re-adding to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination()); 761 return false; 762 } 763 764 // message may be inflight to other subscriptions so do not modify 765 message = message.copy(); 766 long dlqExpiration = deadLetterStrategy.getExpiration(); 767 if (dlqExpiration > 0) { 768 dlqExpiration += System.currentTimeMillis(); 769 } else { 770 stampAsExpired(message); 771 } 772 message.setExpiration(dlqExpiration); 773 if (!message.isPersistent()) { 774 message.setPersistent(true); 775 message.setProperty("originalDeliveryMode", "NON_PERSISTENT"); 776 } 777 if (poisonCause != null) { 778 message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, 779 poisonCause.toString()); 780 } 781 // The original destination and transaction id do 782 // not get filled when the message is first sent, 783 // it is only populated if the message is routed to 784 // another destination like the DLQ 785 ConnectionContext adminContext = context; 786 if (context.getSecurityContext() == null || !context.getSecurityContext().isBrokerContext()) { 787 adminContext = BrokerSupport.getConnectionContext(this); 788 } 789 addDestination(adminContext, deadLetterDestination, false).getActiveMQDestination().setDLQ(true); 790 BrokerSupport.resendNoCopy(adminContext, message, deadLetterDestination); 791 return true; 792 } 793 } else { 794 LOG.debug("Dead Letter message with no DLQ strategy in place, message id: {}, destination: {}", message.getMessageId(), message.getDestination()); 795 } 796 } 797 } 798 } catch (Exception e) { 799 LOG.warn("Caught an exception sending to DLQ: {}", node, e); 800 } 801 802 return false; 803 } 804 805 @Override 806 public Broker getRoot() { 807 try { 808 return getBrokerService().getBroker(); 809 } catch (Exception e) { 810 LOG.error("Trying to get Root Broker", e); 811 throw new RuntimeException("The broker from the BrokerService should not throw an exception"); 812 } 813 } 814 815 /** 816 * @return the broker sequence id 817 */ 818 @Override 819 public long getBrokerSequenceId() { 820 synchronized (sequenceGenerator) { 821 return sequenceGenerator.getNextSequenceId(); 822 } 823 } 824 825 @Override 826 public Scheduler getScheduler() { 827 return this.scheduler; 828 } 829 830 @Override 831 public ThreadPoolExecutor getExecutor() { 832 return this.executor; 833 } 834 835 @Override 836 public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { 837 ActiveMQDestination destination = control.getDestination(); 838 try { 839 getRegion(destination).processConsumerControl(consumerExchange, control); 840 } catch (JMSException jmse) { 841 LOG.warn("unmatched destination: {}, in consumerControl: {}", destination, control); 842 } 843 } 844 845 protected void addBrokerInClusterUpdate(BrokerInfo info) { 846 List<TransportConnector> connectors = this.brokerService.getTransportConnectors(); 847 for (TransportConnector connector : connectors) { 848 if (connector.isUpdateClusterClients()) { 849 connector.addPeerBroker(info); 850 connector.updateClientClusterInfo(); 851 } 852 } 853 } 854 855 protected void removeBrokerInClusterUpdate(BrokerInfo info) { 856 List<TransportConnector> connectors = this.brokerService.getTransportConnectors(); 857 for (TransportConnector connector : connectors) { 858 if (connector.isUpdateClusterClients() && connector.isUpdateClusterClientsOnRemove()) { 859 connector.removePeerBroker(info); 860 connector.updateClientClusterInfo(); 861 } 862 } 863 } 864 865 protected void purgeInactiveDestinations() { 866 inactiveDestinationsPurgeLock.writeLock().lock(); 867 try { 868 List<Destination> list = new ArrayList<Destination>(); 869 Map<ActiveMQDestination, Destination> map = getDestinationMap(); 870 if (isAllowTempAutoCreationOnSend()) { 871 map.putAll(tempQueueRegion.getDestinationMap()); 872 map.putAll(tempTopicRegion.getDestinationMap()); 873 } 874 long maxPurgedDests = this.brokerService.getMaxPurgedDestinationsPerSweep(); 875 long timeStamp = System.currentTimeMillis(); 876 for (Destination d : map.values()) { 877 d.markForGC(timeStamp); 878 if (d.canGC()) { 879 list.add(d); 880 if (maxPurgedDests > 0 && list.size() == maxPurgedDests) { 881 break; 882 } 883 } 884 } 885 886 if (!list.isEmpty()) { 887 ConnectionContext context = BrokerSupport.getConnectionContext(this); 888 context.setBroker(this); 889 890 for (Destination dest : list) { 891 Logger log = LOG; 892 if (dest instanceof BaseDestination) { 893 log = ((BaseDestination) dest).getLog(); 894 } 895 log.info("{} Inactive for longer than {} ms - removing ...", dest.getName(), dest.getInactiveTimeoutBeforeGC()); 896 try { 897 getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0); 898 } catch (Exception e) { 899 LOG.error("Failed to remove inactive destination {}", dest, e); 900 } 901 } 902 } 903 } finally { 904 inactiveDestinationsPurgeLock.writeLock().unlock(); 905 } 906 } 907 908 public boolean isAllowTempAutoCreationOnSend() { 909 return allowTempAutoCreationOnSend; 910 } 911 912 public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) { 913 this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend; 914 } 915 916 @Override 917 public void reapplyInterceptor() { 918 queueRegion.reapplyInterceptor(); 919 topicRegion.reapplyInterceptor(); 920 tempQueueRegion.reapplyInterceptor(); 921 tempTopicRegion.reapplyInterceptor(); 922 } 923}