001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020
021import org.apache.activemq.broker.region.DurableTopicSubscription;
022import org.apache.activemq.broker.region.RegionBroker;
023import org.apache.activemq.broker.region.Subscription;
024import org.apache.activemq.broker.region.TopicRegion;
025import org.apache.activemq.command.ActiveMQDestination;
026import org.apache.activemq.command.ConsumerId;
027import org.apache.activemq.command.ConsumerInfo;
028import org.apache.activemq.command.RemoveSubscriptionInfo;
029import org.apache.activemq.filter.DestinationFilter;
030import org.apache.activemq.transport.Transport;
031import org.apache.activemq.util.NetworkBridgeUtils;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * Consolidates subscriptions
037 */
038public class DurableConduitBridge extends ConduitBridge {
039    private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class);
040
041    @Override
042    public String toString() {
043        return "DurableConduitBridge:" + configuration.getBrokerName() + "->" + getRemoteBrokerName();
044    }
045    /**
046     * Constructor
047     *
048     * @param configuration
049     *
050     * @param localBroker
051     * @param remoteBroker
052     */
053    public DurableConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
054                                Transport remoteBroker) {
055        super(configuration, localBroker, remoteBroker);
056    }
057
058    /**
059     * Subscriptions for these destinations are always created
060     *
061     */
062    @Override
063    protected void setupStaticDestinations() {
064        super.setupStaticDestinations();
065        ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations;
066        if (dests != null) {
067            for (ActiveMQDestination dest : dests) {
068                if (isPermissableDestination(dest) && !doesConsumerExist(dest)) {
069                    try {
070                        //Filtering by non-empty subscriptions, see AMQ-5875
071                        if (dest.isTopic()) {
072                            RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
073                            TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
074
075                            String candidateSubName = getSubscriberName(dest);
076                            for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
077                                String subName = subscription.getConsumerInfo().getSubscriptionName();
078                                String clientId = subscription.getContext().getClientId();
079                                if (subName != null && subName.equals(candidateSubName) && clientId.startsWith(configuration.getName())) {
080                                    DemandSubscription sub = createDemandSubscription(dest, subName);
081                                    if (sub != null) {
082                                        sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
083                                        sub.setStaticallyIncluded(true);
084                                        addSubscription(sub);
085                                        break;
086                                    }
087                                }
088                            }
089                        }
090                    } catch (IOException e) {
091                        LOG.error("Failed to add static destination {}", dest, e);
092                    }
093                    LOG.trace("Forwarding messages for durable destination: {}", dest);
094                } else if (configuration.isSyncDurableSubs() && !isPermissableDestination(dest)) {
095                    if (dest.isTopic()) {
096                        RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
097                        TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
098
099                        String candidateSubName = getSubscriberName(dest);
100                        for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
101                            String subName = subscription.getConsumerInfo().getSubscriptionName();
102                            if (subName != null && subName.equals(candidateSubName) &&
103                                    subscription instanceof DurableTopicSubscription) {
104                               try {
105                                    DurableTopicSubscription durableSub = (DurableTopicSubscription) subscription;
106                                    //check the clientId so we only remove subs for the matching bridge
107                                    if (durableSub.getSubscriptionKey().getClientId().equals(localClientId)) {
108                                        // remove the NC subscription as it is no longer for a permissible dest
109                                        RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
110                                        sending.setClientId(localClientId);
111                                        sending.setSubscriptionName(subName);
112                                        sending.setConnectionId(this.localConnectionInfo.getConnectionId());
113                                        localBroker.oneway(sending);
114                                    }
115                                } catch (IOException e) {
116                                    LOG.debug("Exception removing NC durable subscription: {}", subName, e);
117                                    serviceRemoteException(e);
118                                }
119                                break;
120                            }
121                        }
122                    }
123                }
124            }
125        }
126    }
127
128    @Override
129    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
130        boolean isForcedDurable = NetworkBridgeUtils.isForcedDurable(info,
131                dynamicallyIncludedDestinations, staticallyIncludedDestinations);
132
133        if (addToAlreadyInterestedConsumers(info, isForcedDurable)) {
134            return null; // don't want this subscription added
135        }
136        //add our original id to ourselves
137        info.addNetworkConsumerId(info.getConsumerId());
138        ConsumerId forcedDurableId = isForcedDurable ? info.getConsumerId() : null;
139
140        if(info.isDurable() || isForcedDurable) {
141            // set the subscriber name to something reproducible
142            info.setSubscriptionName(getSubscriberName(info.getDestination()));
143            // and override the consumerId with something unique so that it won't
144            // be removed if the durable subscriber (at the other end) goes away
145           info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
146                               consumerIdGenerator.getNextSequenceId()));
147        }
148        info.setSelector(null);
149        DemandSubscription demandSubscription = doCreateDemandSubscription(info);
150        if (forcedDurableId != null) {
151            demandSubscription.addForcedDurableConsumer(forcedDurableId);
152            forcedDurableRemoteId.add(forcedDurableId);
153        }
154        return demandSubscription;
155    }
156
157    protected String getSubscriberName(ActiveMQDestination dest) {
158        String subscriberName = DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName();
159        return subscriberName;
160    }
161
162    protected boolean doesConsumerExist(ActiveMQDestination dest) {
163        DestinationFilter filter = DestinationFilter.parseFilter(dest);
164        for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
165            if (filter.matches(ds.getLocalInfo().getDestination())) {
166                return true;
167            }
168        }
169        return false;
170    }
171}