001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.advisory;
018
019import java.util.ArrayList;
020import java.util.Arrays;
021import java.util.Collection;
022import java.util.Collections;
023import java.util.Iterator;
024import java.util.LinkedHashMap;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.locks.ReentrantReadWriteLock;
030
031import org.apache.activemq.broker.Broker;
032import org.apache.activemq.broker.BrokerFilter;
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.ConnectionContext;
035import org.apache.activemq.broker.ProducerBrokerExchange;
036import org.apache.activemq.broker.TransportConnector;
037import org.apache.activemq.broker.region.BaseDestination;
038import org.apache.activemq.broker.region.Destination;
039import org.apache.activemq.broker.region.DurableTopicSubscription;
040import org.apache.activemq.broker.region.MessageReference;
041import org.apache.activemq.broker.region.RegionBroker;
042import org.apache.activemq.broker.region.Subscription;
043import org.apache.activemq.broker.region.TopicRegion;
044import org.apache.activemq.broker.region.TopicSubscription;
045import org.apache.activemq.broker.region.virtual.VirtualDestination;
046import org.apache.activemq.broker.region.virtual.VirtualTopic;
047import org.apache.activemq.command.ActiveMQDestination;
048import org.apache.activemq.command.ActiveMQMessage;
049import org.apache.activemq.command.ActiveMQTopic;
050import org.apache.activemq.command.BrokerInfo;
051import org.apache.activemq.command.Command;
052import org.apache.activemq.command.ConnectionId;
053import org.apache.activemq.command.ConnectionInfo;
054import org.apache.activemq.command.ConsumerId;
055import org.apache.activemq.command.ConsumerInfo;
056import org.apache.activemq.command.DestinationInfo;
057import org.apache.activemq.command.Message;
058import org.apache.activemq.command.MessageId;
059import org.apache.activemq.command.ProducerId;
060import org.apache.activemq.command.ProducerInfo;
061import org.apache.activemq.command.RemoveSubscriptionInfo;
062import org.apache.activemq.command.SessionId;
063import org.apache.activemq.filter.DestinationPath;
064import org.apache.activemq.security.SecurityContext;
065import org.apache.activemq.state.ProducerState;
066import org.apache.activemq.usage.Usage;
067import org.apache.activemq.util.IdGenerator;
068import org.apache.activemq.util.LongSequenceGenerator;
069import org.apache.activemq.util.SubscriptionKey;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073/**
074 * This broker filter handles tracking the state of the broker for purposes of
075 * publishing advisory messages to advisory consumers.
076 */
077public class AdvisoryBroker extends BrokerFilter {
078
079    private static final Logger LOG = LoggerFactory.getLogger(AdvisoryBroker.class);
080    private static final IdGenerator ID_GENERATOR = new IdGenerator();
081
082    protected final ConcurrentMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
083
084    private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
085    protected final Map<ConsumerId, ConsumerInfo> consumers = new LinkedHashMap<ConsumerId, ConsumerInfo>();
086
087    /**
088     * This is a set to track all of the virtual destinations that have been added to the broker so
089     * they can be easily referenced later.
090     */
091    protected final Set<VirtualDestination> virtualDestinations = Collections.newSetFromMap(new ConcurrentHashMap<VirtualDestination, Boolean>());
092    /**
093     * This is a map to track all consumers that exist on the virtual destination so that we can fire
094     * an advisory later when they go away to remove the demand.
095     */
096    protected final ConcurrentMap<ConsumerInfo, VirtualDestination> virtualDestinationConsumers = new ConcurrentHashMap<>();
097    /**
098     * This is a map to track unique demand for the existence of a virtual destination so we make sure
099     * we don't send duplicate advisories.
100     */
101    protected final ConcurrentMap<VirtualConsumerPair, ConsumerInfo> brokerConsumerDests = new ConcurrentHashMap<>();
102
103    protected final ConcurrentMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
104    protected final ConcurrentMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
105    protected final ConcurrentMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
106    protected final ProducerId advisoryProducerId = new ProducerId();
107
108    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
109
110    private VirtualDestinationMatcher virtualDestinationMatcher = new DestinationFilterVirtualDestinationMatcher();
111
112    public AdvisoryBroker(Broker next) {
113        super(next);
114        advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
115    }
116
117    @Override
118    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
119        super.addConnection(context, info);
120
121        ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
122        // do not distribute passwords in advisory messages. usernames okay
123        ConnectionInfo copy = info.copy();
124        copy.setPassword("");
125        fireAdvisory(context, topic, copy);
126        connections.put(copy.getConnectionId(), copy);
127    }
128
129    @Override
130    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
131        Subscription answer = super.addConsumer(context, info);
132
133        // Don't advise advisory topics.
134        if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
135            ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
136            consumersLock.writeLock().lock();
137            try {
138                consumers.put(info.getConsumerId(), info);
139
140                //check if this is a consumer on a destination that matches a virtual destination
141                if (getBrokerService().isUseVirtualDestSubs()) {
142                    for (VirtualDestination virtualDestination : virtualDestinations) {
143                        if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) {
144                            fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination);
145                        }
146                    }
147                }
148            } finally {
149                consumersLock.writeLock().unlock();
150            }
151            fireConsumerAdvisory(context, info.getDestination(), topic, info);
152        } else {
153            // We need to replay all the previously collected state objects
154            // for this newly added consumer.
155            if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) {
156                // Replay the connections.
157                for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext(); ) {
158                    ConnectionInfo value = iter.next();
159                    ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
160                    fireAdvisory(context, topic, value, info.getConsumerId());
161                }
162            }
163
164            // We check here whether the Destination is Temporary Destination specific or not since we
165            // can avoid sending advisory messages to the consumer if it only wants Temporary Destination
166            // notifications.  If its not just temporary destination related destinations then we have
167            // to send them all, a composite destination could want both.
168            if (AdvisorySupport.isTempDestinationAdvisoryTopic(info.getDestination())) {
169                // Replay the temporary destinations.
170                for (DestinationInfo destination : destinations.values()) {
171                    if (destination.getDestination().isTemporary()) {
172                        ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination());
173                        fireAdvisory(context, topic, destination, info.getConsumerId());
174                    }
175                }
176            } else if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) {
177                // Replay all the destinations.
178                for (DestinationInfo destination : destinations.values()) {
179                    ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination());
180                    fireAdvisory(context, topic, destination, info.getConsumerId());
181                }
182            }
183
184            // Replay the producers.
185            if (AdvisorySupport.isProducerAdvisoryTopic(info.getDestination())) {
186                for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext(); ) {
187                    ProducerInfo value = iter.next();
188                    ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
189                    fireProducerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId());
190                }
191            }
192
193            // Replay the consumers.
194            if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
195                consumersLock.readLock().lock();
196                try {
197                    for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) {
198                        ConsumerInfo value = iter.next();
199                        ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
200                        fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId());
201                    }
202                } finally {
203                    consumersLock.readLock().unlock();
204                }
205            }
206
207            // Replay the virtual destination consumers.
208            if (AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) {
209                for (Iterator<ConsumerInfo> iter = virtualDestinationConsumers.keySet().iterator(); iter.hasNext(); ) {
210                    ConsumerInfo key = iter.next();
211                    ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(key.getDestination());
212                    fireConsumerAdvisory(context, key.getDestination(), topic, key);
213              }
214            }
215
216            // Replay network bridges
217            if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) {
218                for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext(); ) {
219                    BrokerInfo key = iter.next();
220                    ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
221                    fireAdvisory(context, topic, key, null, networkBridges.get(key));
222                }
223            }
224        }
225        return answer;
226    }
227
228    @Override
229    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
230        super.addProducer(context, info);
231
232        // Don't advise advisory topics.
233        if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
234            ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
235            fireProducerAdvisory(context, info.getDestination(), topic, info);
236            producers.put(info.getProducerId(), info);
237        }
238    }
239
240    @Override
241    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception {
242        Destination answer = super.addDestination(context, destination, create);
243        if (!AdvisorySupport.isAdvisoryTopic(destination)) {
244            //for queues, create demand if isUseVirtualDestSubsOnCreation is true
245            if (getBrokerService().isUseVirtualDestSubsOnCreation() && destination.isQueue()) {
246                //check if this new destination matches a virtual destination that exists
247                for (VirtualDestination virtualDestination : virtualDestinations) {
248                    if (virtualDestinationMatcher.matches(virtualDestination, destination)) {
249                        fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination);
250                    }
251                }
252            }
253
254            DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
255            DestinationInfo previous = destinations.putIfAbsent(destination, info);
256            if (previous == null) {
257                ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
258                fireAdvisory(context, topic, info);
259            }
260        }
261        return answer;
262    }
263
264    @Override
265    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
266        ActiveMQDestination destination = info.getDestination();
267        next.addDestinationInfo(context, info);
268
269        if (!AdvisorySupport.isAdvisoryTopic(destination)) {
270            DestinationInfo previous = destinations.putIfAbsent(destination, info);
271            if (previous == null) {
272                ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
273                fireAdvisory(context, topic, info);
274            }
275        }
276    }
277
278    @Override
279    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
280        super.removeDestination(context, destination, timeout);
281        DestinationInfo info = destinations.remove(destination);
282        if (info != null) {
283
284            //on destination removal, remove all demand if using virtual dest subs
285            if (getBrokerService().isUseVirtualDestSubs()) {
286                for (ConsumerInfo consumerInfo : virtualDestinationConsumers.keySet()) {
287                    //find all consumers for this virtual destination
288                    VirtualDestination virtualDestination = virtualDestinationConsumers.get(consumerInfo);
289
290                    //find a consumer that matches this virtualDest and destination
291                    if (virtualDestinationMatcher.matches(virtualDestination, destination)) {
292                        //in case of multiple matches
293                        VirtualConsumerPair key = new VirtualConsumerPair(virtualDestination, destination);
294                        ConsumerInfo i = brokerConsumerDests.get(key);
295                        if (consumerInfo.equals(i) && brokerConsumerDests.remove(key) != null) {
296                            LOG.debug("Virtual consumer pair removed: {} for consumer: {} ", key, i);
297                            fireVirtualDestinationRemoveAdvisory(context, consumerInfo);
298                            break;
299                        }
300                    }
301                }
302            }
303
304            // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate
305            info = info.copy();
306            info.setDestination(destination);
307            info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
308            ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
309            fireAdvisory(context, topic, info);
310            ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destination);
311            for (ActiveMQTopic advisoryDestination : advisoryDestinations) {
312                try {
313                    next.removeDestination(context, advisoryDestination, -1);
314                } catch (Exception expectedIfDestinationDidNotExistYet) {
315                }
316            }
317        }
318    }
319
320    @Override
321    public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
322        super.removeDestinationInfo(context, destInfo);
323        DestinationInfo info = destinations.remove(destInfo.getDestination());
324        if (info != null) {
325            // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate
326            info = info.copy();
327            info.setDestination(destInfo.getDestination());
328            info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
329            ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination());
330            fireAdvisory(context, topic, info);
331            ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destInfo.getDestination());
332            for (ActiveMQTopic advisoryDestination : advisoryDestinations) {
333                try {
334                    next.removeDestination(context, advisoryDestination, -1);
335                } catch (Exception expectedIfDestinationDidNotExistYet) {
336                }
337            }
338        }
339    }
340
341    @Override
342    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
343        super.removeConnection(context, info, error);
344
345        ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
346        fireAdvisory(context, topic, info.createRemoveCommand());
347        connections.remove(info.getConnectionId());
348    }
349
350    @Override
351    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
352        super.removeConsumer(context, info);
353
354        // Don't advise advisory topics.
355        ActiveMQDestination dest = info.getDestination();
356        if (!AdvisorySupport.isAdvisoryTopic(dest)) {
357            ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
358            consumersLock.writeLock().lock();
359            try {
360                consumers.remove(info.getConsumerId());
361
362                //remove the demand for this consumer if it matches a virtual destination
363                if(getBrokerService().isUseVirtualDestSubs()) {
364                    fireVirtualDestinationRemoveAdvisory(context, info);
365                }
366            } finally {
367                consumersLock.writeLock().unlock();
368            }
369            if (!dest.isTemporary() || destinations.containsKey(dest)) {
370                fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand());
371            }
372        }
373    }
374
375    @Override
376    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
377        SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
378
379        RegionBroker regionBroker = null;
380        if (next instanceof RegionBroker) {
381            regionBroker = (RegionBroker) next;
382        } else {
383            BrokerService service = next.getBrokerService();
384            regionBroker = (RegionBroker) service.getRegionBroker();
385        }
386
387        if (regionBroker == null) {
388            LOG.warn("Cannot locate a RegionBroker instance to pass along the removeSubscription call");
389            throw new IllegalStateException("No RegionBroker found.");
390        }
391
392        DurableTopicSubscription sub = ((TopicRegion) regionBroker.getTopicRegion()).getDurableSubscription(key);
393
394        super.removeSubscription(context, info);
395
396        if (sub == null) {
397            LOG.warn("We cannot send an advisory message for a durable sub removal when we don't know about the durable sub");
398            return;
399        }
400
401        ActiveMQDestination dest = sub.getConsumerInfo().getDestination();
402
403        // Don't advise advisory topics.
404        if (!AdvisorySupport.isAdvisoryTopic(dest)) {
405            ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
406            fireConsumerAdvisory(context, dest, topic, info);
407        }
408
409    }
410
411    @Override
412    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
413        super.removeProducer(context, info);
414
415        // Don't advise advisory topics.
416        ActiveMQDestination dest = info.getDestination();
417        if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) {
418            ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest);
419            producers.remove(info.getProducerId());
420            if (!dest.isTemporary() || destinations.containsKey(dest)) {
421                fireProducerAdvisory(context, dest, topic, info.createRemoveCommand());
422            }
423        }
424    }
425
426    @Override
427    public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
428        super.messageExpired(context, messageReference, subscription);
429        try {
430            if (!messageReference.isAdvisory()) {
431                BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
432                ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(baseDestination.getActiveMQDestination());
433                Message payload = messageReference.getMessage().copy();
434                if (!baseDestination.isIncludeBodyForAdvisory()) {
435                    payload.clearBody();
436                }
437                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
438                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
439                fireAdvisory(context, topic, payload, null, advisoryMessage);
440            }
441        } catch (Exception e) {
442            handleFireFailure("expired", e);
443        }
444    }
445
446    @Override
447    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
448        super.messageConsumed(context, messageReference);
449        try {
450            if (!messageReference.isAdvisory()) {
451                BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
452                ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(baseDestination.getActiveMQDestination());
453                Message payload = messageReference.getMessage().copy();
454                if (!baseDestination.isIncludeBodyForAdvisory()) {
455                    payload.clearBody();
456                }
457                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
458                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
459                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName());
460                fireAdvisory(context, topic, payload, null, advisoryMessage);
461            }
462        } catch (Exception e) {
463            handleFireFailure("consumed", e);
464        }
465    }
466
467    @Override
468    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
469        super.messageDelivered(context, messageReference);
470        try {
471            if (!messageReference.isAdvisory()) {
472                BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
473                ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(baseDestination.getActiveMQDestination());
474                Message payload = messageReference.getMessage().copy();
475                if (!baseDestination.isIncludeBodyForAdvisory()) {
476                    payload.clearBody();
477                }
478                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
479                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
480                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName());
481                fireAdvisory(context, topic, payload, null, advisoryMessage);
482            }
483        } catch (Exception e) {
484            handleFireFailure("delivered", e);
485        }
486    }
487
488    @Override
489    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
490        super.messageDiscarded(context, sub, messageReference);
491        try {
492            if (!messageReference.isAdvisory()) {
493                BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
494                ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(baseDestination.getActiveMQDestination());
495                Message payload = messageReference.getMessage().copy();
496                if (!baseDestination.isIncludeBodyForAdvisory()) {
497                    payload.clearBody();
498                }
499                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
500                if (sub instanceof TopicSubscription) {
501                    advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded());
502                }
503                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
504                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString());
505                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName());
506
507                fireAdvisory(context, topic, payload, null, advisoryMessage);
508            }
509        } catch (Exception e) {
510            handleFireFailure("discarded", e);
511        }
512    }
513
514    @Override
515    public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
516        super.slowConsumer(context, destination, subs);
517        try {
518            if (!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
519                ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
520                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
521                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString());
522                fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage);
523            }
524        } catch (Exception e) {
525            handleFireFailure("slow consumer", e);
526        }
527    }
528
529    @Override
530    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo, ActiveMQDestination destination) {
531        super.fastProducer(context, producerInfo, destination);
532        try {
533            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
534                ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(destination);
535                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
536                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString());
537                fireAdvisory(context, topic, producerInfo, null, advisoryMessage);
538            }
539        } catch (Exception e) {
540            handleFireFailure("fast producer", e);
541        }
542    }
543
    // Id sources used by fireVirtualDestinationAddAdvisory to build the connection, session and
    // consumer ids of the ConsumerInfo it creates when it is called without a consumer.
    private final IdGenerator connectionIdGenerator = new IdGenerator("advisory");
    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
547
548    @Override
549    public void virtualDestinationAdded(ConnectionContext context,
550            VirtualDestination virtualDestination) {
551        super.virtualDestinationAdded(context, virtualDestination);
552
553        if (virtualDestinations.add(virtualDestination)) {
554            LOG.debug("Virtual destination added: {}", virtualDestination);
555            try {
556                // Don't advise advisory topics.
557                if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) {
558
559                    //create demand for consumers on virtual destinations
560                    consumersLock.readLock().lock();
561                    try {
562                        //loop through existing destinations to see if any match this newly
563                        //created virtual destination
564                        if (getBrokerService().isUseVirtualDestSubsOnCreation()) {
565                            //for matches that are a queue, fire an advisory for demand
566                            for (ActiveMQDestination destination : destinations.keySet()) {
567                                if(destination.isQueue()) {
568                                    if (virtualDestinationMatcher.matches(virtualDestination, destination)) {
569                                        fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination);
570                                    }
571                                }
572                            }
573                        }
574
575                        //loop through existing consumers to see if any of them are consuming on a destination
576                        //that matches the new virtual destination
577                        for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) {
578                            ConsumerInfo info = iter.next();
579                            if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) {
580                                fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination);
581                            }
582                        }
583                    } finally {
584                        consumersLock.readLock().unlock();
585                    }
586                }
587            } catch (Exception e) {
588                handleFireFailure("virtualDestinationAdded", e);
589            }
590        }
591    }
592
593    private void fireVirtualDestinationAddAdvisory(ConnectionContext context, ConsumerInfo info, ActiveMQDestination activeMQDest,
594            VirtualDestination virtualDestination) throws Exception {
595        //if no consumer info, we need to create one - this is the case when an advisory is fired
596        //because of the existence of a destination matching a virtual destination
597        if (info == null) {
598
599            //store the virtual destination and the activeMQDestination as a pair so that we can keep track
600            //of all matching forwarded destinations that caused demand
601            VirtualConsumerPair pair = new VirtualConsumerPair(virtualDestination, activeMQDest);
602            if (brokerConsumerDests.get(pair) == null) {
603                ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
604                SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId());
605                ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
606                info = new ConsumerInfo(consumerId);
607
608                if(brokerConsumerDests.putIfAbsent(pair, info) == null) {
609                    LOG.debug("Virtual consumer pair added: {} for consumer: {} ", pair, info);
610                    setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest);
611                    ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
612
613                    if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
614                        LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination);
615                        fireConsumerAdvisory(context, info.getDestination(), topic, info);
616                    }
617                }
618            }
619        //this is the case of a real consumer coming online
620        } else {
621            info = info.copy();
622            setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest);
623            ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
624
625            if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
626                LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination);
627                fireConsumerAdvisory(context, info.getDestination(), topic, info);
628            }
629        }
630    }
631
632    /**
633     * Sets the virtual destination on the ConsumerInfo
634     * If this is a VirtualTopic then the destination used will be the actual topic subscribed
635     * to in order to track demand properly
636     *
637     * @param info
638     * @param virtualDestination
639     * @param activeMQDest
640     */
641    private void setConsumerInfoVirtualDest(ConsumerInfo info, VirtualDestination virtualDestination, ActiveMQDestination activeMQDest) {
642        info.setDestination(virtualDestination.getVirtualDestination());
643        if (virtualDestination instanceof VirtualTopic) {
644            VirtualTopic vt = (VirtualTopic) virtualDestination;
645            String prefix = vt.getPrefix() != null ? vt.getPrefix() : "";
646            String postfix = vt.getPostfix() != null ? vt.getPostfix() : "";
647            if (prefix.endsWith(".")) {
648                prefix = prefix.substring(0, prefix.length() - 1);
649            }
650            if (postfix.startsWith(".")) {
651                postfix = postfix.substring(1, postfix.length());
652            }
653            ActiveMQDestination prefixDestination = prefix.length() > 0 ? new ActiveMQTopic(prefix) : null;
654            ActiveMQDestination postfixDestination = postfix.length() > 0 ? new ActiveMQTopic(postfix) : null;
655
656            String[] prefixPaths = prefixDestination != null ? prefixDestination.getDestinationPaths() : new String[] {};
657            String[] activeMQDestPaths = activeMQDest.getDestinationPaths();
658            String[] postfixPaths = postfixDestination != null ? postfixDestination.getDestinationPaths() : new String[] {};
659
660            //sanity check
661            if (activeMQDestPaths.length > prefixPaths.length + postfixPaths.length) {
662                String[] topicPath = Arrays.copyOfRange(activeMQDestPaths, 0 + prefixPaths.length,
663                        activeMQDestPaths.length - postfixPaths.length);
664
665                ActiveMQTopic newTopic = new ActiveMQTopic(DestinationPath.toString(topicPath));
666                info.setDestination(newTopic);
667            }
668        }
669    }
670
671    @Override
672    public void virtualDestinationRemoved(ConnectionContext context,
673            VirtualDestination virtualDestination) {
674        super.virtualDestinationRemoved(context, virtualDestination);
675
676        if (virtualDestinations.remove(virtualDestination)) {
677            LOG.debug("Virtual destination removed: {}", virtualDestination);
678            try {
679                consumersLock.readLock().lock();
680                try {
681                    // remove the demand created by the addition of the virtual destination
682                    if (getBrokerService().isUseVirtualDestSubsOnCreation()) {
683                        if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) {
684                            for (ConsumerInfo info : virtualDestinationConsumers.keySet()) {
685                                //find all consumers for this virtual destination
686                                if (virtualDestinationConsumers.get(info).equals(virtualDestination)) {
687                                    fireVirtualDestinationRemoveAdvisory(context, info);
688
689                                    //check consumers created for the existence of a destination to see if they
690                                    //match the consumerinfo and clean up
691                                    for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) {
692                                        ConsumerInfo i = brokerConsumerDests.get(activeMQDest);
693                                        if (info.equals(i) && brokerConsumerDests.remove(activeMQDest) != null) {
694                                            LOG.debug("Virtual consumer pair removed: {} for consumer: {} ", activeMQDest, i);
695                                        }
696                                    }
697                                }
698
699                            }
700                        }
701                    }
702                } finally {
703                    consumersLock.readLock().unlock();
704                }
705            } catch (Exception e) {
706                handleFireFailure("virtualDestinationAdded", e);
707            }
708        }
709    }
710
711    private void fireVirtualDestinationRemoveAdvisory(ConnectionContext context,
712            ConsumerInfo info) throws Exception {
713
714        VirtualDestination virtualDestination = virtualDestinationConsumers.remove(info);
715        if (virtualDestination != null) {
716            LOG.debug("Virtual consumer removed: {}, for virtual destination: {}", info, virtualDestination);
717            ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(virtualDestination.getVirtualDestination());
718
719            ActiveMQDestination dest = info.getDestination();
720
721            if (!dest.isTemporary() || destinations.containsKey(dest)) {
722                fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand());
723            }
724        }
725    }
726
727    @Override
728    public void isFull(ConnectionContext context, Destination destination, Usage<?> usage) {
729        super.isFull(context, destination, usage);
730        if (AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination()) == false) {
731            try {
732
733                ActiveMQTopic topic = AdvisorySupport.getFullAdvisoryTopic(destination.getActiveMQDestination());
734                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
735                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_USAGE_NAME, usage.getName());
736                advisoryMessage.setLongProperty(AdvisorySupport.MSG_PROPERTY_USAGE_COUNT, usage.getUsage());
737                fireAdvisory(context, topic, null, null, advisoryMessage);
738
739            } catch (Exception e) {
740                handleFireFailure("is full", e);
741            }
742        }
743    }
744
745    @Override
746    public void nowMasterBroker() {
747        super.nowMasterBroker();
748        try {
749            ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic();
750            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
751            ConnectionContext context = new ConnectionContext();
752            context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
753            context.setBroker(getBrokerService().getBroker());
754            fireAdvisory(context, topic, null, null, advisoryMessage);
755        } catch (Exception e) {
756            handleFireFailure("now master broker", e);
757        }
758    }
759
760    @Override
761    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
762                                         Subscription subscription, Throwable poisonCause) {
763        boolean wasDLQd = super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
764        if (wasDLQd) {
765            try {
766                if (!messageReference.isAdvisory()) {
767                    BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
768                    ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(baseDestination.getActiveMQDestination());
769                    Message payload = messageReference.getMessage().copy();
770                    if (!baseDestination.isIncludeBodyForAdvisory()) {
771                        payload.clearBody();
772                    }
773                    fireAdvisory(context, topic, payload);
774                }
775            } catch (Exception e) {
776                handleFireFailure("add to DLQ", e);
777            }
778        }
779
780        return wasDLQd;
781    }
782
783    @Override
784    public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
785        try {
786            if (brokerInfo != null) {
787                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
788                advisoryMessage.setBooleanProperty("started", true);
789                advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex);
790                advisoryMessage.setStringProperty("remoteIp", remoteIp);
791                networkBridges.putIfAbsent(brokerInfo, advisoryMessage);
792
793                ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
794
795                ConnectionContext context = new ConnectionContext();
796                context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
797                context.setBroker(getBrokerService().getBroker());
798                fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
799            }
800        } catch (Exception e) {
801            handleFireFailure("network bridge started", e);
802        }
803    }
804
805    @Override
806    public void networkBridgeStopped(BrokerInfo brokerInfo) {
807        try {
808            if (brokerInfo != null) {
809                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
810                advisoryMessage.setBooleanProperty("started", false);
811                networkBridges.remove(brokerInfo);
812
813                ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
814
815                ConnectionContext context = new ConnectionContext();
816                context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
817                context.setBroker(getBrokerService().getBroker());
818                fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
819            }
820        } catch (Exception e) {
821            handleFireFailure("network bridge stopped", e);
822        }
823    }
824
825    private void handleFireFailure(String message, Throwable cause) {
826        LOG.warn("Failed to fire {} advisory, reason: {}", message, cause);
827        LOG.debug("{} detail: {}", message, cause, cause);
828    }
829
830    protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
831        fireAdvisory(context, topic, command, null);
832    }
833
834    protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
835        ActiveMQMessage advisoryMessage = new ActiveMQMessage();
836        fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
837    }
838
839    protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command) throws Exception {
840        fireConsumerAdvisory(context, consumerDestination, topic, command, null);
841    }
842
843    protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
844        ActiveMQMessage advisoryMessage = new ActiveMQMessage();
845        int count = 0;
846        Set<Destination> set = getDestinations(consumerDestination);
847        if (set != null) {
848            for (Destination dest : set) {
849                count += dest.getDestinationStatistics().getConsumers().getCount();
850            }
851        }
852        advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_COUNT, count);
853
854        fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
855    }
856
857    protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception {
858        fireProducerAdvisory(context, producerDestination, topic, command, null);
859    }
860
861    protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
862        ActiveMQMessage advisoryMessage = new ActiveMQMessage();
863        int count = 0;
864        if (producerDestination != null) {
865            Set<Destination> set = getDestinations(producerDestination);
866            if (set != null) {
867                for (Destination dest : set) {
868                    count += dest.getDestinationStatistics().getProducers().getCount();
869                }
870            }
871        }
872        advisoryMessage.setIntProperty("producerCount", count);
873        fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
874    }
875
876    public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
877        //set properties
878        advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
879        String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
880        advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
881
882        String url = getBrokerService().getVmConnectorURI().toString();
883        //try and find the URL on the transport connector and use if it exists else
884        //try and find a default URL
885        if (context.getConnector() instanceof TransportConnector
886                && ((TransportConnector) context.getConnector()).getPublishableConnectString() != null) {
887            url = ((TransportConnector) context.getConnector()).getPublishableConnectString();
888        } else if (getBrokerService().getDefaultSocketURIString() != null) {
889            url = getBrokerService().getDefaultSocketURIString();
890        }
891        advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
892
893        //set the data structure
894        advisoryMessage.setDataStructure(command);
895        advisoryMessage.setPersistent(false);
896        advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
897        advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
898        advisoryMessage.setTargetConsumerId(targetConsumerId);
899        advisoryMessage.setDestination(topic);
900        advisoryMessage.setResponseRequired(false);
901        advisoryMessage.setProducerId(advisoryProducerId);
902        boolean originalFlowControl = context.isProducerFlowControl();
903        final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
904        producerExchange.setConnectionContext(context);
905        producerExchange.setMutable(true);
906        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
907        try {
908            context.setProducerFlowControl(false);
909            next.send(producerExchange, advisoryMessage);
910        } finally {
911            context.setProducerFlowControl(originalFlowControl);
912        }
913    }
914
915    public Map<ConnectionId, ConnectionInfo> getAdvisoryConnections() {
916        return connections;
917    }
918
919    public Collection<ConsumerInfo> getAdvisoryConsumers() {
920        consumersLock.readLock().lock();
921        try {
922            return new ArrayList<ConsumerInfo>(consumers.values());
923        } finally {
924            consumersLock.readLock().unlock();
925        }
926    }
927
928    public Map<ProducerId, ProducerInfo> getAdvisoryProducers() {
929        return producers;
930    }
931
932    public Map<ActiveMQDestination, DestinationInfo> getAdvisoryDestinations() {
933        return destinations;
934    }
935
936    public ConcurrentMap<ConsumerInfo, VirtualDestination> getVirtualDestinationConsumers() {
937        return virtualDestinationConsumers;
938    }
939
940    private class VirtualConsumerPair {
941        private final VirtualDestination virtualDestination;
942
943        //destination that matches this virtualDestination as part target
944        //this is so we can keep track of more than one destination that might
945        //match the virtualDestination and cause demand
946        private final ActiveMQDestination activeMQDestination;
947
948        public VirtualConsumerPair(VirtualDestination virtualDestination,
949                ActiveMQDestination activeMQDestination) {
950            super();
951            this.virtualDestination = virtualDestination;
952            this.activeMQDestination = activeMQDestination;
953        }
954
955        @Override
956        public int hashCode() {
957            final int prime = 31;
958            int result = 1;
959            result = prime * result + getOuterType().hashCode();
960            result = prime
961                    * result
962                    + ((activeMQDestination == null) ? 0 : activeMQDestination
963                            .hashCode());
964            result = prime
965                    * result
966                    + ((virtualDestination == null) ? 0 : virtualDestination
967                            .hashCode());
968            return result;
969        }
970
971        @Override
972        public boolean equals(Object obj) {
973            if (this == obj)
974                return true;
975            if (obj == null)
976                return false;
977            if (getClass() != obj.getClass())
978                return false;
979            VirtualConsumerPair other = (VirtualConsumerPair) obj;
980            if (!getOuterType().equals(other.getOuterType()))
981                return false;
982            if (activeMQDestination == null) {
983                if (other.activeMQDestination != null)
984                    return false;
985            } else if (!activeMQDestination.equals(other.activeMQDestination))
986                return false;
987            if (virtualDestination == null) {
988                if (other.virtualDestination != null)
989                    return false;
990            } else if (!virtualDestination.equals(other.virtualDestination))
991                return false;
992            return true;
993        }
994
995        @Override
996        public String toString() {
997            return "VirtualConsumerPair [virtualDestination=" + virtualDestination + ", activeMQDestination="
998                    + activeMQDestination + "]";
999        }
1000
1001        private AdvisoryBroker getOuterType() {
1002            return AdvisoryBroker.this;
1003        }
1004    }
1005}