001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.io.IOException; 020 021import org.apache.activemq.broker.region.DurableTopicSubscription; 022import org.apache.activemq.broker.region.RegionBroker; 023import org.apache.activemq.broker.region.Subscription; 024import org.apache.activemq.broker.region.TopicRegion; 025import org.apache.activemq.command.ActiveMQDestination; 026import org.apache.activemq.command.ConsumerId; 027import org.apache.activemq.command.ConsumerInfo; 028import org.apache.activemq.command.RemoveSubscriptionInfo; 029import org.apache.activemq.filter.DestinationFilter; 030import org.apache.activemq.transport.Transport; 031import org.apache.activemq.util.NetworkBridgeUtils; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * Consolidates subscriptions 037 */ 038public class DurableConduitBridge extends ConduitBridge { 039 private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class); 040 041 @Override 042 public String toString() { 043 return "DurableConduitBridge:" + configuration.getBrokerName() + "->" + getRemoteBrokerName(); 044 } 045 /** 046 * Constructor 047 * 048 * @param configuration 049 * 050 * @param localBroker 051 * @param remoteBroker 052 */ 053 public DurableConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker, 054 Transport remoteBroker) { 055 super(configuration, localBroker, remoteBroker); 056 } 057 058 /** 059 * Subscriptions for these destinations are always created 060 * 061 */ 062 @Override 063 protected void setupStaticDestinations() { 064 super.setupStaticDestinations(); 065 ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations; 066 if (dests != null) { 067 for (ActiveMQDestination dest : dests) { 068 if (isPermissableDestination(dest) && !doesConsumerExist(dest)) { 069 try { 070 //Filtering by non-empty subscriptions, see AMQ-5875 071 if (dest.isTopic()) { 072 RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); 073 TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); 074 075 String candidateSubName = getSubscriberName(dest); 076 for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) { 077 String subName = subscription.getConsumerInfo().getSubscriptionName(); 078 String clientId = subscription.getContext().getClientId(); 079 if (subName != null && subName.equals(candidateSubName) && clientId.startsWith(configuration.getName())) { 080 DemandSubscription sub = createDemandSubscription(dest, subName); 081 if (sub != null) { 082 sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest)); 083 sub.setStaticallyIncluded(true); 084 addSubscription(sub); 085 break; 086 } 087 } 088 } 089 } 090 } catch (IOException e) { 091 LOG.error("Failed to add static destination {}", dest, e); 092 } 093 LOG.trace("Forwarding messages for durable destination: {}", dest); 094 } else if (configuration.isSyncDurableSubs() && !isPermissableDestination(dest)) { 095 if (dest.isTopic()) { 096 RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); 097 TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); 098 099 String candidateSubName = getSubscriberName(dest); 100 for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) { 101 String subName = subscription.getConsumerInfo().getSubscriptionName(); 102 if (subName != null && subName.equals(candidateSubName) && 103 subscription instanceof DurableTopicSubscription) { 104 try { 105 DurableTopicSubscription durableSub = (DurableTopicSubscription) subscription; 106 //check the clientId so we only remove subs for the matching bridge 107 if (durableSub.getSubscriptionKey().getClientId().equals(localClientId)) { 108 // remove the NC subscription as it is no longer for a permissible dest 109 RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo(); 110 sending.setClientId(localClientId); 111 sending.setSubscriptionName(subName); 112 sending.setConnectionId(this.localConnectionInfo.getConnectionId()); 113 localBroker.oneway(sending); 114 } 115 } catch (IOException e) { 116 LOG.debug("Exception removing NC durable subscription: {}", subName, e); 117 serviceRemoteException(e); 118 } 119 break; 120 } 121 } 122 } 123 } 124 } 125 } 126 } 127 128 @Override 129 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { 130 boolean isForcedDurable = NetworkBridgeUtils.isForcedDurable(info, 131 dynamicallyIncludedDestinations, staticallyIncludedDestinations); 132 133 if (addToAlreadyInterestedConsumers(info, isForcedDurable)) { 134 return null; // don't want this subscription added 135 } 136 //add our original id to ourselves 137 info.addNetworkConsumerId(info.getConsumerId()); 138 ConsumerId forcedDurableId = isForcedDurable ? info.getConsumerId() : null; 139 140 if(info.isDurable() || isForcedDurable) { 141 // set the subscriber name to something reproducible 142 info.setSubscriptionName(getSubscriberName(info.getDestination())); 143 // and override the consumerId with something unique so that it won't 144 // be removed if the durable subscriber (at the other end) goes away 145 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), 146 consumerIdGenerator.getNextSequenceId())); 147 } 148 info.setSelector(null); 149 DemandSubscription demandSubscription = doCreateDemandSubscription(info); 150 if (forcedDurableId != null) { 151 demandSubscription.addForcedDurableConsumer(forcedDurableId); 152 forcedDurableRemoteId.add(forcedDurableId); 153 } 154 return demandSubscription; 155 } 156 157 protected String getSubscriberName(ActiveMQDestination dest) { 158 String subscriberName = DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName(); 159 return subscriberName; 160 } 161 162 protected boolean doesConsumerExist(ActiveMQDestination dest) { 163 DestinationFilter filter = DestinationFilter.parseFilter(dest); 164 for (DemandSubscription ds : subscriptionMapByLocalId.values()) { 165 if (filter.matches(ds.getLocalInfo().getDestination())) { 166 return true; 167 } 168 } 169 return false; 170 } 171}