001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import org.apache.activemq.broker.Broker; 020 import org.apache.activemq.broker.BrokerService; 021 import org.apache.activemq.broker.Connection; 022 import org.apache.activemq.broker.ConnectionContext; 023 import org.apache.activemq.broker.ConsumerBrokerExchange; 024 import org.apache.activemq.broker.EmptyBroker; 025 import org.apache.activemq.broker.ProducerBrokerExchange; 026 import org.apache.activemq.broker.TransportConnector; 027 import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 028 import org.apache.activemq.broker.region.policy.PolicyMap; 029 import org.apache.activemq.command.*; 030 import org.apache.activemq.state.ConnectionState; 031 import org.apache.activemq.store.kahadb.plist.PListStore; 032 import org.apache.activemq.thread.Scheduler; 033 import org.apache.activemq.thread.TaskRunnerFactory; 034 import org.apache.activemq.usage.SystemUsage; 035 import org.apache.activemq.util.BrokerSupport; 036 import org.apache.activemq.util.IdGenerator; 037 import org.apache.activemq.util.InetAddressUtil; 038 import org.apache.activemq.util.LongSequenceGenerator; 039 import org.apache.activemq.util.ServiceStopper; 040 import org.slf4j.Logger; 041 import org.slf4j.LoggerFactory; 042 043 import javax.jms.InvalidClientIDException; 044 import javax.jms.JMSException; 045 import java.io.IOException; 046 import java.net.URI; 047 import java.util.ArrayList; 048 import java.util.Collections; 049 import java.util.HashMap; 050 import java.util.List; 051 import java.util.Map; 052 import java.util.Set; 053 import java.util.concurrent.ConcurrentHashMap; 054 import java.util.concurrent.CopyOnWriteArrayList; 055 import java.util.concurrent.ThreadPoolExecutor; 056 import java.util.concurrent.locks.ReentrantReadWriteLock; 057 058 /** 059 * Routes Broker operations to the correct messaging regions for processing. 060 * 061 * 062 */ 063 public class RegionBroker extends EmptyBroker { 064 public static final String ORIGINAL_EXPIRATION = "originalExpiration"; 065 private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class); 066 private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator(); 067 068 protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); 069 protected DestinationFactory destinationFactory; 070 protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>()); 071 072 private final Region queueRegion; 073 private final Region topicRegion; 074 private final Region tempQueueRegion; 075 private final Region tempTopicRegion; 076 protected final BrokerService brokerService; 077 private boolean started; 078 private boolean keepDurableSubsActive; 079 080 private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>(); 081 private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); 082 private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>(); 083 084 private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); 085 private BrokerId brokerId; 086 private String brokerName; 087 private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>(); 088 private final DestinationInterceptor destinationInterceptor; 089 private ConnectionContext adminConnectionContext; 090 private final Scheduler scheduler; 091 private final ThreadPoolExecutor executor; 092 private boolean allowTempAutoCreationOnSend; 093 094 private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock(); 095 private final Runnable purgeInactiveDestinationsTask = new Runnable() { 096 public void run() { 097 purgeInactiveDestinations(); 098 } 099 }; 100 101 public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory, 102 DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException { 103 this.brokerService = brokerService; 104 this.executor=executor; 105 this.scheduler = scheduler; 106 if (destinationFactory == null) { 107 throw new IllegalArgumentException("null destinationFactory"); 108 } 109 this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId()); 110 this.destinationFactory = destinationFactory; 111 queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory); 112 topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory); 113 this.destinationInterceptor = destinationInterceptor; 114 tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory); 115 tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory); 116 } 117 118 @Override 119 public Map<ActiveMQDestination, Destination> getDestinationMap() { 120 Map<ActiveMQDestination, Destination> answer = new HashMap<ActiveMQDestination, Destination>(getQueueRegion().getDestinationMap()); 121 answer.putAll(getTopicRegion().getDestinationMap()); 122 return answer; 123 } 124 125 @Override 126 public Set <Destination> getDestinations(ActiveMQDestination destination) { 127 switch (destination.getDestinationType()) { 128 case ActiveMQDestination.QUEUE_TYPE: 129 return queueRegion.getDestinations(destination); 130 case ActiveMQDestination.TOPIC_TYPE: 131 return topicRegion.getDestinations(destination); 132 case ActiveMQDestination.TEMP_QUEUE_TYPE: 133 return tempQueueRegion.getDestinations(destination); 134 case ActiveMQDestination.TEMP_TOPIC_TYPE: 135 return tempTopicRegion.getDestinations(destination); 136 default: 137 return Collections.emptySet(); 138 } 139 } 140 141 @Override 142 @SuppressWarnings("rawtypes") 143 public Broker getAdaptor(Class type) { 144 if (type.isInstance(this)) { 145 return this; 146 } 147 return null; 148 } 149 150 public Region getQueueRegion() { 151 return queueRegion; 152 } 153 154 public Region getTempQueueRegion() { 155 return tempQueueRegion; 156 } 157 158 public Region getTempTopicRegion() { 159 return tempTopicRegion; 160 } 161 162 public Region getTopicRegion() { 163 return topicRegion; 164 } 165 166 protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 167 return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 168 } 169 170 protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 171 return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 172 } 173 174 protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 175 return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 176 } 177 178 protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 179 return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 180 } 181 182 @Override 183 public void start() throws Exception { 184 ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive); 185 started = true; 186 queueRegion.start(); 187 topicRegion.start(); 188 tempQueueRegion.start(); 189 tempTopicRegion.start(); 190 int period = this.brokerService.getSchedulePeriodForDestinationPurge(); 191 if (period > 0) { 192 this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period); 193 } 194 } 195 196 @Override 197 public void stop() throws Exception { 198 started = false; 199 this.scheduler.cancel(purgeInactiveDestinationsTask); 200 ServiceStopper ss = new ServiceStopper(); 201 doStop(ss); 202 ss.throwFirstException(); 203 // clear the state 204 clientIdSet.clear(); 205 connections.clear(); 206 destinations.clear(); 207 brokerInfos.clear(); 208 } 209 210 public PolicyMap getDestinationPolicy() { 211 return brokerService != null ? brokerService.getDestinationPolicy() : null; 212 } 213 214 @Override 215 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 216 String clientId = info.getClientId(); 217 if (clientId == null) { 218 throw new InvalidClientIDException("No clientID specified for connection request"); 219 } 220 synchronized (clientIdSet) { 221 ConnectionContext oldContext = clientIdSet.get(clientId); 222 if (oldContext != null) { 223 throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " 224 + oldContext.getConnection().getRemoteAddress()); 225 } else { 226 clientIdSet.put(clientId, context); 227 } 228 } 229 230 connections.add(context.getConnection()); 231 } 232 233 @Override 234 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 235 String clientId = info.getClientId(); 236 if (clientId == null) { 237 throw new InvalidClientIDException("No clientID specified for connection disconnect request"); 238 } 239 synchronized (clientIdSet) { 240 ConnectionContext oldValue = clientIdSet.get(clientId); 241 // we may be removing the duplicate connection, not the first 242 // connection to be created 243 // so lets check that their connection IDs are the same 244 if (oldValue == context) { 245 if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) { 246 clientIdSet.remove(clientId); 247 } 248 } 249 } 250 connections.remove(context.getConnection()); 251 } 252 253 protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) { 254 return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2)); 255 } 256 257 @Override 258 public Connection[] getClients() throws Exception { 259 ArrayList<Connection> l = new ArrayList<Connection>(connections); 260 Connection rc[] = new Connection[l.size()]; 261 l.toArray(rc); 262 return rc; 263 } 264 265 @Override 266 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemp) throws Exception { 267 268 Destination answer; 269 270 answer = destinations.get(destination); 271 if (answer != null) { 272 return answer; 273 } 274 275 synchronized (destinations) { 276 answer = destinations.get(destination); 277 if (answer != null) { 278 return answer; 279 } 280 281 switch (destination.getDestinationType()) { 282 case ActiveMQDestination.QUEUE_TYPE: 283 answer = queueRegion.addDestination(context, destination,true); 284 break; 285 case ActiveMQDestination.TOPIC_TYPE: 286 answer = topicRegion.addDestination(context, destination,true); 287 break; 288 case ActiveMQDestination.TEMP_QUEUE_TYPE: 289 answer = tempQueueRegion.addDestination(context, destination, createIfTemp); 290 break; 291 case ActiveMQDestination.TEMP_TOPIC_TYPE: 292 answer = tempTopicRegion.addDestination(context, destination, createIfTemp); 293 break; 294 default: 295 throw createUnknownDestinationTypeException(destination); 296 } 297 298 destinations.put(destination, answer); 299 return answer; 300 } 301 302 } 303 304 @Override 305 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 306 307 if (destinations.containsKey(destination)) { 308 switch (destination.getDestinationType()) { 309 case ActiveMQDestination.QUEUE_TYPE: 310 queueRegion.removeDestination(context, destination, timeout); 311 break; 312 case ActiveMQDestination.TOPIC_TYPE: 313 topicRegion.removeDestination(context, destination, timeout); 314 break; 315 case ActiveMQDestination.TEMP_QUEUE_TYPE: 316 tempQueueRegion.removeDestination(context, destination, timeout); 317 break; 318 case ActiveMQDestination.TEMP_TOPIC_TYPE: 319 tempTopicRegion.removeDestination(context, destination, timeout); 320 break; 321 default: 322 throw createUnknownDestinationTypeException(destination); 323 } 324 destinations.remove(destination); 325 326 } 327 328 } 329 330 @Override 331 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 332 addDestination(context, info.getDestination(),true); 333 334 } 335 336 @Override 337 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 338 removeDestination(context, info.getDestination(), info.getTimeout()); 339 340 } 341 342 @Override 343 public ActiveMQDestination[] getDestinations() throws Exception { 344 ArrayList<ActiveMQDestination> l; 345 346 l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet()); 347 348 ActiveMQDestination rc[] = new ActiveMQDestination[l.size()]; 349 l.toArray(rc); 350 return rc; 351 } 352 353 @Override 354 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 355 ActiveMQDestination destination = info.getDestination(); 356 if (destination != null) { 357 inactiveDestinationsPurgeLock.readLock().lock(); 358 try { 359 // This seems to cause the destination to be added but without 360 // advisories firing... 361 context.getBroker().addDestination(context, destination, isAllowTempAutoCreationOnSend()); 362 switch (destination.getDestinationType()) { 363 case ActiveMQDestination.QUEUE_TYPE: 364 queueRegion.addProducer(context, info); 365 break; 366 case ActiveMQDestination.TOPIC_TYPE: 367 topicRegion.addProducer(context, info); 368 break; 369 case ActiveMQDestination.TEMP_QUEUE_TYPE: 370 tempQueueRegion.addProducer(context, info); 371 break; 372 case ActiveMQDestination.TEMP_TOPIC_TYPE: 373 tempTopicRegion.addProducer(context, info); 374 break; 375 } 376 } finally { 377 inactiveDestinationsPurgeLock.readLock().unlock(); 378 } 379 } 380 } 381 382 @Override 383 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 384 ActiveMQDestination destination = info.getDestination(); 385 if (destination != null) { 386 inactiveDestinationsPurgeLock.readLock().lock(); 387 try { 388 switch (destination.getDestinationType()) { 389 case ActiveMQDestination.QUEUE_TYPE: 390 queueRegion.removeProducer(context, info); 391 break; 392 case ActiveMQDestination.TOPIC_TYPE: 393 topicRegion.removeProducer(context, info); 394 break; 395 case ActiveMQDestination.TEMP_QUEUE_TYPE: 396 tempQueueRegion.removeProducer(context, info); 397 break; 398 case ActiveMQDestination.TEMP_TOPIC_TYPE: 399 tempTopicRegion.removeProducer(context, info); 400 break; 401 } 402 } finally { 403 inactiveDestinationsPurgeLock.readLock().unlock(); 404 } 405 } 406 } 407 408 @Override 409 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 410 ActiveMQDestination destination = info.getDestination(); 411 if (destinationInterceptor != null) { 412 destinationInterceptor.create(this, context, destination); 413 } 414 inactiveDestinationsPurgeLock.readLock().lock(); 415 try { 416 switch (destination.getDestinationType()) { 417 case ActiveMQDestination.QUEUE_TYPE: 418 return queueRegion.addConsumer(context, info); 419 420 case ActiveMQDestination.TOPIC_TYPE: 421 return topicRegion.addConsumer(context, info); 422 423 case ActiveMQDestination.TEMP_QUEUE_TYPE: 424 return tempQueueRegion.addConsumer(context, info); 425 426 case ActiveMQDestination.TEMP_TOPIC_TYPE: 427 return tempTopicRegion.addConsumer(context, info); 428 429 default: 430 throw createUnknownDestinationTypeException(destination); 431 } 432 } finally { 433 inactiveDestinationsPurgeLock.readLock().unlock(); 434 } 435 } 436 437 @Override 438 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 439 ActiveMQDestination destination = info.getDestination(); 440 inactiveDestinationsPurgeLock.readLock().lock(); 441 try { 442 switch (destination.getDestinationType()) { 443 444 case ActiveMQDestination.QUEUE_TYPE: 445 queueRegion.removeConsumer(context, info); 446 break; 447 case ActiveMQDestination.TOPIC_TYPE: 448 topicRegion.removeConsumer(context, info); 449 break; 450 case ActiveMQDestination.TEMP_QUEUE_TYPE: 451 tempQueueRegion.removeConsumer(context, info); 452 break; 453 case ActiveMQDestination.TEMP_TOPIC_TYPE: 454 tempTopicRegion.removeConsumer(context, info); 455 break; 456 default: 457 throw createUnknownDestinationTypeException(destination); 458 } 459 } finally { 460 inactiveDestinationsPurgeLock.readLock().unlock(); 461 } 462 } 463 464 @Override 465 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 466 inactiveDestinationsPurgeLock.readLock().lock(); 467 try { 468 topicRegion.removeSubscription(context, info); 469 } finally { 470 inactiveDestinationsPurgeLock.readLock().unlock(); 471 } 472 } 473 474 @Override 475 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { 476 message.setBrokerInTime(System.currentTimeMillis()); 477 if (producerExchange.isMutable() || producerExchange.getRegion() == null 478 || (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) { 479 ActiveMQDestination destination = message.getDestination(); 480 // ensure the destination is registered with the RegionBroker 481 producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend()); 482 Region region; 483 switch (destination.getDestinationType()) { 484 case ActiveMQDestination.QUEUE_TYPE: 485 region = queueRegion; 486 break; 487 case ActiveMQDestination.TOPIC_TYPE: 488 region = topicRegion; 489 break; 490 case ActiveMQDestination.TEMP_QUEUE_TYPE: 491 region = tempQueueRegion; 492 break; 493 case ActiveMQDestination.TEMP_TOPIC_TYPE: 494 region = tempTopicRegion; 495 break; 496 default: 497 throw createUnknownDestinationTypeException(destination); 498 } 499 producerExchange.setRegion(region); 500 producerExchange.setRegionDestination(null); 501 } 502 503 producerExchange.getRegion().send(producerExchange, message); 504 } 505 506 @Override 507 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 508 if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) { 509 ActiveMQDestination destination = ack.getDestination(); 510 Region region; 511 switch (destination.getDestinationType()) { 512 case ActiveMQDestination.QUEUE_TYPE: 513 region = queueRegion; 514 break; 515 case ActiveMQDestination.TOPIC_TYPE: 516 region = topicRegion; 517 break; 518 case ActiveMQDestination.TEMP_QUEUE_TYPE: 519 region = tempQueueRegion; 520 break; 521 case ActiveMQDestination.TEMP_TOPIC_TYPE: 522 region = tempTopicRegion; 523 break; 524 default: 525 throw createUnknownDestinationTypeException(destination); 526 } 527 consumerExchange.setRegion(region); 528 } 529 consumerExchange.getRegion().acknowledge(consumerExchange, ack); 530 } 531 532 @Override 533 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 534 ActiveMQDestination destination = pull.getDestination(); 535 switch (destination.getDestinationType()) { 536 case ActiveMQDestination.QUEUE_TYPE: 537 return queueRegion.messagePull(context, pull); 538 539 case ActiveMQDestination.TOPIC_TYPE: 540 return topicRegion.messagePull(context, pull); 541 542 case ActiveMQDestination.TEMP_QUEUE_TYPE: 543 return tempQueueRegion.messagePull(context, pull); 544 545 case ActiveMQDestination.TEMP_TOPIC_TYPE: 546 return tempTopicRegion.messagePull(context, pull); 547 default: 548 throw createUnknownDestinationTypeException(destination); 549 } 550 } 551 552 @Override 553 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { 554 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 555 } 556 557 @Override 558 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 559 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 560 } 561 562 @Override 563 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 564 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 565 } 566 567 @Override 568 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 569 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 570 } 571 572 @Override 573 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 574 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 575 } 576 577 @Override 578 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { 579 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 580 } 581 582 @Override 583 public void gc() { 584 queueRegion.gc(); 585 topicRegion.gc(); 586 } 587 588 @Override 589 public BrokerId getBrokerId() { 590 if (brokerId == null) { 591 brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId()); 592 } 593 return brokerId; 594 } 595 596 public void setBrokerId(BrokerId brokerId) { 597 this.brokerId = brokerId; 598 } 599 600 @Override 601 public String getBrokerName() { 602 if (brokerName == null) { 603 try { 604 brokerName = InetAddressUtil.getLocalHostName().toLowerCase(); 605 } catch (Exception e) { 606 brokerName = "localhost"; 607 } 608 } 609 return brokerName; 610 } 611 612 public void setBrokerName(String brokerName) { 613 this.brokerName = brokerName; 614 } 615 616 public DestinationStatistics getDestinationStatistics() { 617 return destinationStatistics; 618 } 619 620 protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) { 621 return new JMSException("Unknown destination type: " + destination.getDestinationType()); 622 } 623 624 @Override 625 public synchronized void addBroker(Connection connection, BrokerInfo info) { 626 BrokerInfo existing = brokerInfos.get(info.getBrokerId()); 627 if (existing == null) { 628 existing = info.copy(); 629 existing.setPeerBrokerInfos(null); 630 brokerInfos.put(info.getBrokerId(), existing); 631 } 632 existing.incrementRefCount(); 633 if (LOG.isDebugEnabled()) { 634 LOG.debug(getBrokerName() + " addBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size()); 635 } 636 addBrokerInClusterUpdate(info); 637 } 638 639 @Override 640 public synchronized void removeBroker(Connection connection, BrokerInfo info) { 641 if (info != null) { 642 BrokerInfo existing = brokerInfos.get(info.getBrokerId()); 643 if (existing != null && existing.decrementRefCount() == 0) { 644 brokerInfos.remove(info.getBrokerId()); 645 } 646 if (LOG.isDebugEnabled()) { 647 LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size()); 648 } 649 removeBrokerInClusterUpdate(info); 650 } 651 } 652 653 @Override 654 public synchronized BrokerInfo[] getPeerBrokerInfos() { 655 BrokerInfo[] result = new BrokerInfo[brokerInfos.size()]; 656 result = brokerInfos.values().toArray(result); 657 return result; 658 } 659 660 @Override 661 public void preProcessDispatch(MessageDispatch messageDispatch) { 662 Message message = messageDispatch.getMessage(); 663 if (message != null) { 664 long endTime = System.currentTimeMillis(); 665 message.setBrokerOutTime(endTime); 666 if (getBrokerService().isEnableStatistics()) { 667 long totalTime = endTime - message.getBrokerInTime(); 668 message.getRegionDestination().getDestinationStatistics().getProcessTime().addTime(totalTime); 669 } 670 } 671 } 672 673 @Override 674 public void postProcessDispatch(MessageDispatch messageDispatch) { 675 } 676 677 @Override 678 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 679 ActiveMQDestination destination = messageDispatchNotification.getDestination(); 680 switch (destination.getDestinationType()) { 681 case ActiveMQDestination.QUEUE_TYPE: 682 queueRegion.processDispatchNotification(messageDispatchNotification); 683 break; 684 case ActiveMQDestination.TOPIC_TYPE: 685 topicRegion.processDispatchNotification(messageDispatchNotification); 686 break; 687 case ActiveMQDestination.TEMP_QUEUE_TYPE: 688 tempQueueRegion.processDispatchNotification(messageDispatchNotification); 689 break; 690 case ActiveMQDestination.TEMP_TOPIC_TYPE: 691 tempTopicRegion.processDispatchNotification(messageDispatchNotification); 692 break; 693 default: 694 throw createUnknownDestinationTypeException(destination); 695 } 696 } 697 698 public boolean isSlaveBroker() { 699 return brokerService.isSlave(); 700 } 701 702 @Override 703 public boolean isStopped() { 704 return !started; 705 } 706 707 @Override 708 public Set<ActiveMQDestination> getDurableDestinations() { 709 return destinationFactory.getDestinations(); 710 } 711 712 protected void doStop(ServiceStopper ss) { 713 ss.stop(queueRegion); 714 ss.stop(topicRegion); 715 ss.stop(tempQueueRegion); 716 ss.stop(tempTopicRegion); 717 } 718 719 public boolean isKeepDurableSubsActive() { 720 return keepDurableSubsActive; 721 } 722 723 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 724 this.keepDurableSubsActive = keepDurableSubsActive; 725 } 726 727 public DestinationInterceptor getDestinationInterceptor() { 728 return destinationInterceptor; 729 } 730 731 @Override 732 public ConnectionContext getAdminConnectionContext() { 733 return adminConnectionContext; 734 } 735 736 @Override 737 public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { 738 this.adminConnectionContext = adminConnectionContext; 739 } 740 741 public Map<ConnectionId, ConnectionState> getConnectionStates() { 742 return connectionStates; 743 } 744 745 @Override 746 public PListStore getTempDataStore() { 747 return brokerService.getTempDataStore(); 748 } 749 750 @Override 751 public URI getVmConnectorURI() { 752 return brokerService.getVmConnectorURI(); 753 } 754 755 @Override 756 public void brokerServiceStarted() { 757 } 758 759 @Override 760 public BrokerService getBrokerService() { 761 return brokerService; 762 } 763 764 @Override 765 public boolean isExpired(MessageReference messageReference) { 766 boolean expired = false; 767 if (messageReference.isExpired()) { 768 try { 769 // prevent duplicate expiry processing 770 Message message = messageReference.getMessage(); 771 synchronized (message) { 772 expired = stampAsExpired(message); 773 } 774 } catch (IOException e) { 775 LOG.warn("unexpected exception on message expiry determination for: " + messageReference, e); 776 } 777 } 778 return expired; 779 } 780 781 private boolean stampAsExpired(Message message) throws IOException { 782 boolean stamped=false; 783 if (message.getProperty(ORIGINAL_EXPIRATION) == null) { 784 long expiration=message.getExpiration(); 785 message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration)); 786 stamped = true; 787 } 788 return stamped; 789 } 790 791 792 @Override 793 public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) { 794 if (LOG.isDebugEnabled()) { 795 LOG.debug("Message expired " + node); 796 } 797 getRoot().sendToDeadLetterQueue(context, node, subscription); 798 } 799 800 @Override 801 public void sendToDeadLetterQueue(ConnectionContext context, 802 MessageReference node, Subscription subscription){ 803 try{ 804 if(node!=null){ 805 Message message=node.getMessage(); 806 if(message!=null && node.getRegionDestination()!=null){ 807 DeadLetterStrategy deadLetterStrategy=node 808 .getRegionDestination().getDeadLetterStrategy(); 809 if(deadLetterStrategy!=null){ 810 if(deadLetterStrategy.isSendToDeadLetterQueue(message)){ 811 // message may be inflight to other subscriptions so do not modify 812 message = message.copy(); 813 stampAsExpired(message); 814 message.setExpiration(0); 815 if(!message.isPersistent()){ 816 message.setPersistent(true); 817 message.setProperty("originalDeliveryMode", 818 "NON_PERSISTENT"); 819 } 820 // The original destination and transaction id do 821 // not get filled when the message is first sent, 822 // it is only populated if the message is routed to 823 // another destination like the DLQ 824 ActiveMQDestination deadLetterDestination=deadLetterStrategy 825 .getDeadLetterQueueFor(message, subscription); 826 if (context.getBroker()==null) { 827 context.setBroker(getRoot()); 828 } 829 BrokerSupport.resendNoCopy(context,message, 830 deadLetterDestination); 831 } 832 } else { 833 if (LOG.isDebugEnabled()) { 834 LOG.debug("Dead Letter message with no DLQ strategy in place, message id: " 835 + message.getMessageId() + ", destination: " + message.getDestination()); 836 } 837 } 838 } 839 } 840 }catch(Exception e){ 841 LOG.warn("Caught an exception sending to DLQ: "+node,e); 842 } 843 } 844 845 @Override 846 public Broker getRoot() { 847 try { 848 return getBrokerService().getBroker(); 849 } catch (Exception e) { 850 LOG.error("Trying to get Root Broker " + e); 851 throw new RuntimeException("The broker from the BrokerService should not throw an exception"); 852 } 853 } 854 855 /** 856 * @return the broker sequence id 857 */ 858 @Override 859 public long getBrokerSequenceId() { 860 synchronized(sequenceGenerator) { 861 return sequenceGenerator.getNextSequenceId(); 862 } 863 } 864 865 866 @Override 867 public Scheduler getScheduler() { 868 return this.scheduler; 869 } 870 871 public ThreadPoolExecutor getExecutor() { 872 return this.executor; 873 } 874 875 @Override 876 public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { 877 ActiveMQDestination destination = control.getDestination(); 878 switch (destination.getDestinationType()) { 879 case ActiveMQDestination.QUEUE_TYPE: 880 queueRegion.processConsumerControl(consumerExchange, control); 881 break; 882 883 case ActiveMQDestination.TOPIC_TYPE: 884 topicRegion.processConsumerControl(consumerExchange, control); 885 break; 886 887 case ActiveMQDestination.TEMP_QUEUE_TYPE: 888 tempQueueRegion.processConsumerControl(consumerExchange, control); 889 break; 890 891 case ActiveMQDestination.TEMP_TOPIC_TYPE: 892 tempTopicRegion.processConsumerControl(consumerExchange, control); 893 break; 894 895 default: 896 LOG.warn("unmatched destination: " + destination + ", in consumerControl: " + control); 897 } 898 } 899 900 protected void addBrokerInClusterUpdate(BrokerInfo info) { 901 List<TransportConnector> connectors = this.brokerService.getTransportConnectors(); 902 for (TransportConnector connector : connectors) { 903 if (connector.isUpdateClusterClients()) { 904 connector.addPeerBroker(info); 905 connector.updateClientClusterInfo(); 906 } 907 } 908 } 909 910 protected void removeBrokerInClusterUpdate(BrokerInfo info) { 911 List<TransportConnector> connectors = this.brokerService.getTransportConnectors(); 912 for (TransportConnector connector : connectors) { 913 if (connector.isUpdateClusterClients() && connector.isUpdateClusterClientsOnRemove()) { 914 connector.removePeerBroker(info); 915 connector.updateClientClusterInfo(); 916 } 917 } 918 } 919 920 protected void purgeInactiveDestinations() { 921 inactiveDestinationsPurgeLock.writeLock().lock(); 922 try { 923 List<Destination> list = new ArrayList<Destination>(); 924 Map<ActiveMQDestination, Destination> map = getDestinationMap(); 925 if (isAllowTempAutoCreationOnSend()) { 926 map.putAll(tempQueueRegion.getDestinationMap()); 927 map.putAll(tempTopicRegion.getDestinationMap()); 928 } 929 long maxPurgedDests = this.brokerService.getMaxPurgedDestinationsPerSweep(); 930 long timeStamp = System.currentTimeMillis(); 931 for (Destination d : map.values()) { 932 d.markForGC(timeStamp); 933 if (d.canGC()) { 934 list.add(d); 935 if (maxPurgedDests > 0 && list.size() == maxPurgedDests) { 936 break; 937 } 938 } 939 } 940 941 if (!list.isEmpty()) { 942 ConnectionContext context = BrokerSupport.getConnectionContext(this); 943 context.setBroker(this); 944 945 for (Destination dest : list) { 946 Logger log = LOG; 947 if (dest instanceof BaseDestination) { 948 log = ((BaseDestination) dest).getLog(); 949 } 950 log.info(dest.getName() + " Inactive for longer than " + 951 dest.getInactiveTimoutBeforeGC() + " ms - removing ..."); 952 try { 953 getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0); 954 } catch (Exception e) { 955 LOG.error("Failed to remove inactive destination " + dest, e); 956 } 957 } 958 } 959 } finally { 960 inactiveDestinationsPurgeLock.writeLock().unlock(); 961 } 962 } 963 964 public boolean isAllowTempAutoCreationOnSend() { 965 return allowTempAutoCreationOnSend; 966 } 967 968 public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) { 969 this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend; 970 } 971 }