001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.advisory; 018 019import java.util.ArrayList; 020import java.util.Arrays; 021import java.util.Collection; 022import java.util.Collections; 023import java.util.Iterator; 024import java.util.LinkedHashMap; 025import java.util.Map; 026import java.util.Set; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.concurrent.locks.ReentrantReadWriteLock; 030 031import org.apache.activemq.broker.Broker; 032import org.apache.activemq.broker.BrokerFilter; 033import org.apache.activemq.broker.BrokerService; 034import org.apache.activemq.broker.ConnectionContext; 035import org.apache.activemq.broker.ProducerBrokerExchange; 036import org.apache.activemq.broker.TransportConnector; 037import org.apache.activemq.broker.region.BaseDestination; 038import org.apache.activemq.broker.region.Destination; 039import org.apache.activemq.broker.region.DurableTopicSubscription; 040import org.apache.activemq.broker.region.MessageReference; 041import org.apache.activemq.broker.region.RegionBroker; 042import org.apache.activemq.broker.region.Subscription; 043import org.apache.activemq.broker.region.TopicRegion; 044import org.apache.activemq.broker.region.TopicSubscription; 045import org.apache.activemq.broker.region.virtual.VirtualDestination; 046import org.apache.activemq.broker.region.virtual.VirtualTopic; 047import org.apache.activemq.command.ActiveMQDestination; 048import org.apache.activemq.command.ActiveMQMessage; 049import org.apache.activemq.command.ActiveMQTopic; 050import org.apache.activemq.command.BrokerInfo; 051import org.apache.activemq.command.Command; 052import org.apache.activemq.command.ConnectionId; 053import org.apache.activemq.command.ConnectionInfo; 054import org.apache.activemq.command.ConsumerId; 055import org.apache.activemq.command.ConsumerInfo; 056import org.apache.activemq.command.DestinationInfo; 057import org.apache.activemq.command.Message; 058import org.apache.activemq.command.MessageId; 059import org.apache.activemq.command.ProducerId; 060import org.apache.activemq.command.ProducerInfo; 061import org.apache.activemq.command.RemoveSubscriptionInfo; 062import org.apache.activemq.command.SessionId; 063import org.apache.activemq.filter.DestinationPath; 064import org.apache.activemq.security.SecurityContext; 065import org.apache.activemq.state.ProducerState; 066import org.apache.activemq.usage.Usage; 067import org.apache.activemq.util.IdGenerator; 068import org.apache.activemq.util.LongSequenceGenerator; 069import org.apache.activemq.util.SubscriptionKey; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072 073/** 074 * This broker filter handles tracking the state of the broker for purposes of 075 * publishing advisory messages to advisory consumers. 076 */ 077public class AdvisoryBroker extends BrokerFilter { 078 079 private static final Logger LOG = LoggerFactory.getLogger(AdvisoryBroker.class); 080 private static final IdGenerator ID_GENERATOR = new IdGenerator(); 081 082 protected final ConcurrentMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>(); 083 084 private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock(); 085 protected final Map<ConsumerId, ConsumerInfo> consumers = new LinkedHashMap<ConsumerId, ConsumerInfo>(); 086 087 /** 088 * This is a set to track all of the virtual destinations that have been added to the broker so 089 * they can be easily referenced later. 090 */ 091 protected final Set<VirtualDestination> virtualDestinations = Collections.newSetFromMap(new ConcurrentHashMap<VirtualDestination, Boolean>()); 092 /** 093 * This is a map to track all consumers that exist on the virtual destination so that we can fire 094 * an advisory later when they go away to remove the demand. 095 */ 096 protected final ConcurrentMap<ConsumerInfo, VirtualDestination> virtualDestinationConsumers = new ConcurrentHashMap<>(); 097 /** 098 * This is a map to track unique demand for the existence of a virtual destination so we make sure 099 * we don't send duplicate advisories. 100 */ 101 protected final ConcurrentMap<VirtualConsumerPair, ConsumerInfo> brokerConsumerDests = new ConcurrentHashMap<>(); 102 103 protected final ConcurrentMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>(); 104 protected final ConcurrentMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>(); 105 protected final ConcurrentMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>(); 106 protected final ProducerId advisoryProducerId = new ProducerId(); 107 108 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 109 110 private VirtualDestinationMatcher virtualDestinationMatcher = new DestinationFilterVirtualDestinationMatcher(); 111 112 public AdvisoryBroker(Broker next) { 113 super(next); 114 advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); 115 } 116 117 @Override 118 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 119 super.addConnection(context, info); 120 121 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic(); 122 // do not distribute passwords in advisory messages. usernames okay 123 ConnectionInfo copy = info.copy(); 124 copy.setPassword(""); 125 fireAdvisory(context, topic, copy); 126 connections.put(copy.getConnectionId(), copy); 127 } 128 129 @Override 130 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 131 Subscription answer = super.addConsumer(context, info); 132 133 // Don't advise advisory topics. 134 if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 135 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); 136 consumersLock.writeLock().lock(); 137 try { 138 consumers.put(info.getConsumerId(), info); 139 140 //check if this is a consumer on a destination that matches a virtual destination 141 if (getBrokerService().isUseVirtualDestSubs()) { 142 for (VirtualDestination virtualDestination : virtualDestinations) { 143 if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) { 144 fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination); 145 } 146 } 147 } 148 } finally { 149 consumersLock.writeLock().unlock(); 150 } 151 fireConsumerAdvisory(context, info.getDestination(), topic, info); 152 } else { 153 // We need to replay all the previously collected state objects 154 // for this newly added consumer. 155 if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) { 156 // Replay the connections. 157 for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext(); ) { 158 ConnectionInfo value = iter.next(); 159 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic(); 160 fireAdvisory(context, topic, value, info.getConsumerId()); 161 } 162 } 163 164 // We check here whether the Destination is Temporary Destination specific or not since we 165 // can avoid sending advisory messages to the consumer if it only wants Temporary Destination 166 // notifications. If its not just temporary destination related destinations then we have 167 // to send them all, a composite destination could want both. 168 if (AdvisorySupport.isTempDestinationAdvisoryTopic(info.getDestination())) { 169 // Replay the temporary destinations. 170 for (DestinationInfo destination : destinations.values()) { 171 if (destination.getDestination().isTemporary()) { 172 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination()); 173 fireAdvisory(context, topic, destination, info.getConsumerId()); 174 } 175 } 176 } else if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) { 177 // Replay all the destinations. 178 for (DestinationInfo destination : destinations.values()) { 179 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination()); 180 fireAdvisory(context, topic, destination, info.getConsumerId()); 181 } 182 } 183 184 // Replay the producers. 185 if (AdvisorySupport.isProducerAdvisoryTopic(info.getDestination())) { 186 for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext(); ) { 187 ProducerInfo value = iter.next(); 188 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination()); 189 fireProducerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId()); 190 } 191 } 192 193 // Replay the consumers. 194 if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) { 195 consumersLock.readLock().lock(); 196 try { 197 for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) { 198 ConsumerInfo value = iter.next(); 199 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination()); 200 fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId()); 201 } 202 } finally { 203 consumersLock.readLock().unlock(); 204 } 205 } 206 207 // Replay the virtual destination consumers. 208 if (AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) { 209 for (Iterator<ConsumerInfo> iter = virtualDestinationConsumers.keySet().iterator(); iter.hasNext(); ) { 210 ConsumerInfo key = iter.next(); 211 ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(key.getDestination()); 212 fireConsumerAdvisory(context, key.getDestination(), topic, key); 213 } 214 } 215 216 // Replay network bridges 217 if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) { 218 for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext(); ) { 219 BrokerInfo key = iter.next(); 220 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); 221 fireAdvisory(context, topic, key, null, networkBridges.get(key)); 222 } 223 } 224 } 225 return answer; 226 } 227 228 @Override 229 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 230 super.addProducer(context, info); 231 232 // Don't advise advisory topics. 233 if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 234 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()); 235 fireProducerAdvisory(context, info.getDestination(), topic, info); 236 producers.put(info.getProducerId(), info); 237 } 238 } 239 240 @Override 241 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception { 242 Destination answer = super.addDestination(context, destination, create); 243 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 244 //for queues, create demand if isUseVirtualDestSubsOnCreation is true 245 if (getBrokerService().isUseVirtualDestSubsOnCreation() && destination.isQueue()) { 246 //check if this new destination matches a virtual destination that exists 247 for (VirtualDestination virtualDestination : virtualDestinations) { 248 if (virtualDestinationMatcher.matches(virtualDestination, destination)) { 249 fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination); 250 } 251 } 252 } 253 254 DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); 255 DestinationInfo previous = destinations.putIfAbsent(destination, info); 256 if (previous == null) { 257 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); 258 fireAdvisory(context, topic, info); 259 } 260 } 261 return answer; 262 } 263 264 @Override 265 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 266 ActiveMQDestination destination = info.getDestination(); 267 next.addDestinationInfo(context, info); 268 269 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 270 DestinationInfo previous = destinations.putIfAbsent(destination, info); 271 if (previous == null) { 272 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); 273 fireAdvisory(context, topic, info); 274 } 275 } 276 } 277 278 @Override 279 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 280 super.removeDestination(context, destination, timeout); 281 DestinationInfo info = destinations.remove(destination); 282 if (info != null) { 283 284 //on destination removal, remove all demand if using virtual dest subs 285 if (getBrokerService().isUseVirtualDestSubs()) { 286 for (ConsumerInfo consumerInfo : virtualDestinationConsumers.keySet()) { 287 //find all consumers for this virtual destination 288 VirtualDestination virtualDestination = virtualDestinationConsumers.get(consumerInfo); 289 290 //find a consumer that matches this virtualDest and destination 291 if (virtualDestinationMatcher.matches(virtualDestination, destination)) { 292 //in case of multiple matches 293 VirtualConsumerPair key = new VirtualConsumerPair(virtualDestination, destination); 294 ConsumerInfo i = brokerConsumerDests.get(key); 295 if (consumerInfo.equals(i) && brokerConsumerDests.remove(key) != null) { 296 LOG.debug("Virtual consumer pair removed: {} for consumer: {} ", key, i); 297 fireVirtualDestinationRemoveAdvisory(context, consumerInfo); 298 break; 299 } 300 } 301 } 302 } 303 304 // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate 305 info = info.copy(); 306 info.setDestination(destination); 307 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 308 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); 309 fireAdvisory(context, topic, info); 310 ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destination); 311 for (ActiveMQTopic advisoryDestination : advisoryDestinations) { 312 try { 313 next.removeDestination(context, advisoryDestination, -1); 314 } catch (Exception expectedIfDestinationDidNotExistYet) { 315 } 316 } 317 } 318 } 319 320 @Override 321 public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception { 322 super.removeDestinationInfo(context, destInfo); 323 DestinationInfo info = destinations.remove(destInfo.getDestination()); 324 if (info != null) { 325 // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate 326 info = info.copy(); 327 info.setDestination(destInfo.getDestination()); 328 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 329 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination()); 330 fireAdvisory(context, topic, info); 331 ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destInfo.getDestination()); 332 for (ActiveMQTopic advisoryDestination : advisoryDestinations) { 333 try { 334 next.removeDestination(context, advisoryDestination, -1); 335 } catch (Exception expectedIfDestinationDidNotExistYet) { 336 } 337 } 338 } 339 } 340 341 @Override 342 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 343 super.removeConnection(context, info, error); 344 345 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic(); 346 fireAdvisory(context, topic, info.createRemoveCommand()); 347 connections.remove(info.getConnectionId()); 348 } 349 350 @Override 351 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 352 super.removeConsumer(context, info); 353 354 // Don't advise advisory topics. 355 ActiveMQDestination dest = info.getDestination(); 356 if (!AdvisorySupport.isAdvisoryTopic(dest)) { 357 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest); 358 consumersLock.writeLock().lock(); 359 try { 360 consumers.remove(info.getConsumerId()); 361 362 //remove the demand for this consumer if it matches a virtual destination 363 if(getBrokerService().isUseVirtualDestSubs()) { 364 fireVirtualDestinationRemoveAdvisory(context, info); 365 } 366 } finally { 367 consumersLock.writeLock().unlock(); 368 } 369 if (!dest.isTemporary() || destinations.containsKey(dest)) { 370 fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand()); 371 } 372 } 373 } 374 375 @Override 376 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 377 SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 378 379 RegionBroker regionBroker = null; 380 if (next instanceof RegionBroker) { 381 regionBroker = (RegionBroker) next; 382 } else { 383 BrokerService service = next.getBrokerService(); 384 regionBroker = (RegionBroker) service.getRegionBroker(); 385 } 386 387 if (regionBroker == null) { 388 LOG.warn("Cannot locate a RegionBroker instance to pass along the removeSubscription call"); 389 throw new IllegalStateException("No RegionBroker found."); 390 } 391 392 DurableTopicSubscription sub = ((TopicRegion) regionBroker.getTopicRegion()).getDurableSubscription(key); 393 394 super.removeSubscription(context, info); 395 396 if (sub == null) { 397 LOG.warn("We cannot send an advisory message for a durable sub removal when we don't know about the durable sub"); 398 return; 399 } 400 401 ActiveMQDestination dest = sub.getConsumerInfo().getDestination(); 402 403 // Don't advise advisory topics. 404 if (!AdvisorySupport.isAdvisoryTopic(dest)) { 405 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest); 406 fireConsumerAdvisory(context, dest, topic, info); 407 } 408 409 } 410 411 @Override 412 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 413 super.removeProducer(context, info); 414 415 // Don't advise advisory topics. 416 ActiveMQDestination dest = info.getDestination(); 417 if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) { 418 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest); 419 producers.remove(info.getProducerId()); 420 if (!dest.isTemporary() || destinations.containsKey(dest)) { 421 fireProducerAdvisory(context, dest, topic, info.createRemoveCommand()); 422 } 423 } 424 } 425 426 @Override 427 public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription) { 428 super.messageExpired(context, messageReference, subscription); 429 try { 430 if (!messageReference.isAdvisory()) { 431 BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); 432 ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(baseDestination.getActiveMQDestination()); 433 Message payload = messageReference.getMessage().copy(); 434 if (!baseDestination.isIncludeBodyForAdvisory()) { 435 payload.clearBody(); 436 } 437 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 438 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); 439 fireAdvisory(context, topic, payload, null, advisoryMessage); 440 } 441 } catch (Exception e) { 442 handleFireFailure("expired", e); 443 } 444 } 445 446 @Override 447 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 448 super.messageConsumed(context, messageReference); 449 try { 450 if (!messageReference.isAdvisory()) { 451 BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); 452 ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(baseDestination.getActiveMQDestination()); 453 Message payload = messageReference.getMessage().copy(); 454 if (!baseDestination.isIncludeBodyForAdvisory()) { 455 payload.clearBody(); 456 } 457 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 458 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); 459 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName()); 460 fireAdvisory(context, topic, payload, null, advisoryMessage); 461 } 462 } catch (Exception e) { 463 handleFireFailure("consumed", e); 464 } 465 } 466 467 @Override 468 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 469 super.messageDelivered(context, messageReference); 470 try { 471 if (!messageReference.isAdvisory()) { 472 BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); 473 ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(baseDestination.getActiveMQDestination()); 474 Message payload = messageReference.getMessage().copy(); 475 if (!baseDestination.isIncludeBodyForAdvisory()) { 476 payload.clearBody(); 477 } 478 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 479 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); 480 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName()); 481 fireAdvisory(context, topic, payload, null, advisoryMessage); 482 } 483 } catch (Exception e) { 484 handleFireFailure("delivered", e); 485 } 486 } 487 488 @Override 489 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { 490 super.messageDiscarded(context, sub, messageReference); 491 try { 492 if (!messageReference.isAdvisory()) { 493 BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); 494 ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(baseDestination.getActiveMQDestination()); 495 Message payload = messageReference.getMessage().copy(); 496 if (!baseDestination.isIncludeBodyForAdvisory()) { 497 payload.clearBody(); 498 } 499 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 500 if (sub instanceof TopicSubscription) { 501 advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded()); 502 } 503 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); 504 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString()); 505 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName()); 506 507 fireAdvisory(context, topic, payload, null, advisoryMessage); 508 } 509 } catch (Exception e) { 510 handleFireFailure("discarded", e); 511 } 512 } 513 514 @Override 515 public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) { 516 super.slowConsumer(context, destination, subs); 517 try { 518 if (!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) { 519 ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination()); 520 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 521 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString()); 522 fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage); 523 } 524 } catch (Exception e) { 525 handleFireFailure("slow consumer", e); 526 } 527 } 528 529 @Override 530 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo, ActiveMQDestination destination) { 531 super.fastProducer(context, producerInfo, destination); 532 try { 533 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 534 ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(destination); 535 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 536 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString()); 537 fireAdvisory(context, topic, producerInfo, null, advisoryMessage); 538 } 539 } catch (Exception e) { 540 handleFireFailure("fast producer", e); 541 } 542 } 543 544 private final IdGenerator connectionIdGenerator = new IdGenerator("advisory"); 545 private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); 546 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 547 548 @Override 549 public void virtualDestinationAdded(ConnectionContext context, 550 VirtualDestination virtualDestination) { 551 super.virtualDestinationAdded(context, virtualDestination); 552 553 if (virtualDestinations.add(virtualDestination)) { 554 LOG.debug("Virtual destination added: {}", virtualDestination); 555 try { 556 // Don't advise advisory topics. 557 if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) { 558 559 //create demand for consumers on virtual destinations 560 consumersLock.readLock().lock(); 561 try { 562 //loop through existing destinations to see if any match this newly 563 //created virtual destination 564 if (getBrokerService().isUseVirtualDestSubsOnCreation()) { 565 //for matches that are a queue, fire an advisory for demand 566 for (ActiveMQDestination destination : destinations.keySet()) { 567 if(destination.isQueue()) { 568 if (virtualDestinationMatcher.matches(virtualDestination, destination)) { 569 fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination); 570 } 571 } 572 } 573 } 574 575 //loop through existing consumers to see if any of them are consuming on a destination 576 //that matches the new virtual destination 577 for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) { 578 ConsumerInfo info = iter.next(); 579 if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) { 580 fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination); 581 } 582 } 583 } finally { 584 consumersLock.readLock().unlock(); 585 } 586 } 587 } catch (Exception e) { 588 handleFireFailure("virtualDestinationAdded", e); 589 } 590 } 591 } 592 593 private void fireVirtualDestinationAddAdvisory(ConnectionContext context, ConsumerInfo info, ActiveMQDestination activeMQDest, 594 VirtualDestination virtualDestination) throws Exception { 595 //if no consumer info, we need to create one - this is the case when an advisory is fired 596 //because of the existence of a destination matching a virtual destination 597 if (info == null) { 598 599 //store the virtual destination and the activeMQDestination as a pair so that we can keep track 600 //of all matching forwarded destinations that caused demand 601 VirtualConsumerPair pair = new VirtualConsumerPair(virtualDestination, activeMQDest); 602 if (brokerConsumerDests.get(pair) == null) { 603 ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId()); 604 SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId()); 605 ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); 606 info = new ConsumerInfo(consumerId); 607 608 if(brokerConsumerDests.putIfAbsent(pair, info) == null) { 609 LOG.debug("Virtual consumer pair added: {} for consumer: {} ", pair, info); 610 setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest); 611 ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); 612 613 if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { 614 LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination); 615 fireConsumerAdvisory(context, info.getDestination(), topic, info); 616 } 617 } 618 } 619 //this is the case of a real consumer coming online 620 } else { 621 info = info.copy(); 622 setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest); 623 ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); 624 625 if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { 626 LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination); 627 fireConsumerAdvisory(context, info.getDestination(), topic, info); 628 } 629 } 630 } 631 632 /** 633 * Sets the virtual destination on the ConsumerInfo 634 * If this is a VirtualTopic then the destination used will be the actual topic subscribed 635 * to in order to track demand properly 636 * 637 * @param info 638 * @param virtualDestination 639 * @param activeMQDest 640 */ 641 private void setConsumerInfoVirtualDest(ConsumerInfo info, VirtualDestination virtualDestination, ActiveMQDestination activeMQDest) { 642 info.setDestination(virtualDestination.getVirtualDestination()); 643 if (virtualDestination instanceof VirtualTopic) { 644 VirtualTopic vt = (VirtualTopic) virtualDestination; 645 String prefix = vt.getPrefix() != null ? vt.getPrefix() : ""; 646 String postfix = vt.getPostfix() != null ? vt.getPostfix() : ""; 647 if (prefix.endsWith(".")) { 648 prefix = prefix.substring(0, prefix.length() - 1); 649 } 650 if (postfix.startsWith(".")) { 651 postfix = postfix.substring(1, postfix.length()); 652 } 653 ActiveMQDestination prefixDestination = prefix.length() > 0 ? new ActiveMQTopic(prefix) : null; 654 ActiveMQDestination postfixDestination = postfix.length() > 0 ? new ActiveMQTopic(postfix) : null; 655 656 String[] prefixPaths = prefixDestination != null ? prefixDestination.getDestinationPaths() : new String[] {}; 657 String[] activeMQDestPaths = activeMQDest.getDestinationPaths(); 658 String[] postfixPaths = postfixDestination != null ? postfixDestination.getDestinationPaths() : new String[] {}; 659 660 //sanity check 661 if (activeMQDestPaths.length > prefixPaths.length + postfixPaths.length) { 662 String[] topicPath = Arrays.copyOfRange(activeMQDestPaths, 0 + prefixPaths.length, 663 activeMQDestPaths.length - postfixPaths.length); 664 665 ActiveMQTopic newTopic = new ActiveMQTopic(DestinationPath.toString(topicPath)); 666 info.setDestination(newTopic); 667 } 668 } 669 } 670 671 @Override 672 public void virtualDestinationRemoved(ConnectionContext context, 673 VirtualDestination virtualDestination) { 674 super.virtualDestinationRemoved(context, virtualDestination); 675 676 if (virtualDestinations.remove(virtualDestination)) { 677 LOG.debug("Virtual destination removed: {}", virtualDestination); 678 try { 679 consumersLock.readLock().lock(); 680 try { 681 // remove the demand created by the addition of the virtual destination 682 if (getBrokerService().isUseVirtualDestSubsOnCreation()) { 683 if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) { 684 for (ConsumerInfo info : virtualDestinationConsumers.keySet()) { 685 //find all consumers for this virtual destination 686 if (virtualDestinationConsumers.get(info).equals(virtualDestination)) { 687 fireVirtualDestinationRemoveAdvisory(context, info); 688 689 //check consumers created for the existence of a destination to see if they 690 //match the consumerinfo and clean up 691 for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) { 692 ConsumerInfo i = brokerConsumerDests.get(activeMQDest); 693 if (info.equals(i) && brokerConsumerDests.remove(activeMQDest) != null) { 694 LOG.debug("Virtual consumer pair removed: {} for consumer: {} ", activeMQDest, i); 695 } 696 } 697 } 698 699 } 700 } 701 } 702 } finally { 703 consumersLock.readLock().unlock(); 704 } 705 } catch (Exception e) { 706 handleFireFailure("virtualDestinationAdded", e); 707 } 708 } 709 } 710 711 private void fireVirtualDestinationRemoveAdvisory(ConnectionContext context, 712 ConsumerInfo info) throws Exception { 713 714 VirtualDestination virtualDestination = virtualDestinationConsumers.remove(info); 715 if (virtualDestination != null) { 716 LOG.debug("Virtual consumer removed: {}, for virtual destination: {}", info, virtualDestination); 717 ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(virtualDestination.getVirtualDestination()); 718 719 ActiveMQDestination dest = info.getDestination(); 720 721 if (!dest.isTemporary() || destinations.containsKey(dest)) { 722 fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand()); 723 } 724 } 725 } 726 727 @Override 728 public void isFull(ConnectionContext context, Destination destination, Usage<?> usage) { 729 super.isFull(context, destination, usage); 730 if (AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination()) == false) { 731 try { 732 733 ActiveMQTopic topic = AdvisorySupport.getFullAdvisoryTopic(destination.getActiveMQDestination()); 734 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 735 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_USAGE_NAME, usage.getName()); 736 advisoryMessage.setLongProperty(AdvisorySupport.MSG_PROPERTY_USAGE_COUNT, usage.getUsage()); 737 fireAdvisory(context, topic, null, null, advisoryMessage); 738 739 } catch (Exception e) { 740 handleFireFailure("is full", e); 741 } 742 } 743 } 744 745 @Override 746 public void nowMasterBroker() { 747 super.nowMasterBroker(); 748 try { 749 ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic(); 750 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 751 ConnectionContext context = new ConnectionContext(); 752 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 753 context.setBroker(getBrokerService().getBroker()); 754 fireAdvisory(context, topic, null, null, advisoryMessage); 755 } catch (Exception e) { 756 handleFireFailure("now master broker", e); 757 } 758 } 759 760 @Override 761 public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, 762 Subscription subscription, Throwable poisonCause) { 763 boolean wasDLQd = super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause); 764 if (wasDLQd) { 765 try { 766 if (!messageReference.isAdvisory()) { 767 BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); 768 ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(baseDestination.getActiveMQDestination()); 769 Message payload = messageReference.getMessage().copy(); 770 if (!baseDestination.isIncludeBodyForAdvisory()) { 771 payload.clearBody(); 772 } 773 fireAdvisory(context, topic, payload); 774 } 775 } catch (Exception e) { 776 handleFireFailure("add to DLQ", e); 777 } 778 } 779 780 return wasDLQd; 781 } 782 783 @Override 784 public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) { 785 try { 786 if (brokerInfo != null) { 787 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 788 advisoryMessage.setBooleanProperty("started", true); 789 advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex); 790 advisoryMessage.setStringProperty("remoteIp", remoteIp); 791 networkBridges.putIfAbsent(brokerInfo, advisoryMessage); 792 793 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); 794 795 ConnectionContext context = new ConnectionContext(); 796 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 797 context.setBroker(getBrokerService().getBroker()); 798 fireAdvisory(context, topic, brokerInfo, null, advisoryMessage); 799 } 800 } catch (Exception e) { 801 handleFireFailure("network bridge started", e); 802 } 803 } 804 805 @Override 806 public void networkBridgeStopped(BrokerInfo brokerInfo) { 807 try { 808 if (brokerInfo != null) { 809 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 810 advisoryMessage.setBooleanProperty("started", false); 811 networkBridges.remove(brokerInfo); 812 813 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); 814 815 ConnectionContext context = new ConnectionContext(); 816 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 817 context.setBroker(getBrokerService().getBroker()); 818 fireAdvisory(context, topic, brokerInfo, null, advisoryMessage); 819 } 820 } catch (Exception e) { 821 handleFireFailure("network bridge stopped", e); 822 } 823 } 824 825 private void handleFireFailure(String message, Throwable cause) { 826 LOG.warn("Failed to fire {} advisory, reason: {}", message, cause); 827 LOG.debug("{} detail: {}", message, cause, cause); 828 } 829 830 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception { 831 fireAdvisory(context, topic, command, null); 832 } 833 834 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { 835 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 836 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); 837 } 838 839 protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command) throws Exception { 840 fireConsumerAdvisory(context, consumerDestination, topic, command, null); 841 } 842 843 protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { 844 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 845 int count = 0; 846 Set<Destination> set = getDestinations(consumerDestination); 847 if (set != null) { 848 for (Destination dest : set) { 849 count += dest.getDestinationStatistics().getConsumers().getCount(); 850 } 851 } 852 advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_COUNT, count); 853 854 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); 855 } 856 857 protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception { 858 fireProducerAdvisory(context, producerDestination, topic, command, null); 859 } 860 861 protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { 862 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 863 int count = 0; 864 if (producerDestination != null) { 865 Set<Destination> set = getDestinations(producerDestination); 866 if (set != null) { 867 for (Destination dest : set) { 868 count += dest.getDestinationStatistics().getProducers().getCount(); 869 } 870 } 871 } 872 advisoryMessage.setIntProperty("producerCount", count); 873 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); 874 } 875 876 public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception { 877 //set properties 878 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName()); 879 String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET"; 880 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id); 881 882 String url = getBrokerService().getVmConnectorURI().toString(); 883 //try and find the URL on the transport connector and use if it exists else 884 //try and find a default URL 885 if (context.getConnector() instanceof TransportConnector 886 && ((TransportConnector) context.getConnector()).getPublishableConnectString() != null) { 887 url = ((TransportConnector) context.getConnector()).getPublishableConnectString(); 888 } else if (getBrokerService().getDefaultSocketURIString() != null) { 889 url = getBrokerService().getDefaultSocketURIString(); 890 } 891 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url); 892 893 //set the data structure 894 advisoryMessage.setDataStructure(command); 895 advisoryMessage.setPersistent(false); 896 advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); 897 advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId())); 898 advisoryMessage.setTargetConsumerId(targetConsumerId); 899 advisoryMessage.setDestination(topic); 900 advisoryMessage.setResponseRequired(false); 901 advisoryMessage.setProducerId(advisoryProducerId); 902 boolean originalFlowControl = context.isProducerFlowControl(); 903 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 904 producerExchange.setConnectionContext(context); 905 producerExchange.setMutable(true); 906 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 907 try { 908 context.setProducerFlowControl(false); 909 next.send(producerExchange, advisoryMessage); 910 } finally { 911 context.setProducerFlowControl(originalFlowControl); 912 } 913 } 914 915 public Map<ConnectionId, ConnectionInfo> getAdvisoryConnections() { 916 return connections; 917 } 918 919 public Collection<ConsumerInfo> getAdvisoryConsumers() { 920 consumersLock.readLock().lock(); 921 try { 922 return new ArrayList<ConsumerInfo>(consumers.values()); 923 } finally { 924 consumersLock.readLock().unlock(); 925 } 926 } 927 928 public Map<ProducerId, ProducerInfo> getAdvisoryProducers() { 929 return producers; 930 } 931 932 public Map<ActiveMQDestination, DestinationInfo> getAdvisoryDestinations() { 933 return destinations; 934 } 935 936 public ConcurrentMap<ConsumerInfo, VirtualDestination> getVirtualDestinationConsumers() { 937 return virtualDestinationConsumers; 938 } 939 940 private class VirtualConsumerPair { 941 private final VirtualDestination virtualDestination; 942 943 //destination that matches this virtualDestination as part target 944 //this is so we can keep track of more than one destination that might 945 //match the virtualDestination and cause demand 946 private final ActiveMQDestination activeMQDestination; 947 948 public VirtualConsumerPair(VirtualDestination virtualDestination, 949 ActiveMQDestination activeMQDestination) { 950 super(); 951 this.virtualDestination = virtualDestination; 952 this.activeMQDestination = activeMQDestination; 953 } 954 955 @Override 956 public int hashCode() { 957 final int prime = 31; 958 int result = 1; 959 result = prime * result + getOuterType().hashCode(); 960 result = prime 961 * result 962 + ((activeMQDestination == null) ? 0 : activeMQDestination 963 .hashCode()); 964 result = prime 965 * result 966 + ((virtualDestination == null) ? 0 : virtualDestination 967 .hashCode()); 968 return result; 969 } 970 971 @Override 972 public boolean equals(Object obj) { 973 if (this == obj) 974 return true; 975 if (obj == null) 976 return false; 977 if (getClass() != obj.getClass()) 978 return false; 979 VirtualConsumerPair other = (VirtualConsumerPair) obj; 980 if (!getOuterType().equals(other.getOuterType())) 981 return false; 982 if (activeMQDestination == null) { 983 if (other.activeMQDestination != null) 984 return false; 985 } else if (!activeMQDestination.equals(other.activeMQDestination)) 986 return false; 987 if (virtualDestination == null) { 988 if (other.virtualDestination != null) 989 return false; 990 } else if (!virtualDestination.equals(other.virtualDestination)) 991 return false; 992 return true; 993 } 994 995 @Override 996 public String toString() { 997 return "VirtualConsumerPair [virtualDestination=" + virtualDestination + ", activeMQDestination=" 998 + activeMQDestination + "]"; 999 } 1000 1001 private AdvisoryBroker getOuterType() { 1002 return AdvisoryBroker.this; 1003 } 1004 } 1005}