001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.List; 022 023import org.apache.activemq.command.BrokerId; 024import org.apache.activemq.command.ConsumerId; 025import org.apache.activemq.command.ConsumerInfo; 026import org.apache.activemq.command.SubscriptionInfo; 027import org.apache.activemq.filter.DestinationFilter; 028import org.apache.activemq.transport.Transport; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * Consolidates subscriptions 034 */ 035public class ConduitBridge extends DemandForwardingBridge { 036 private static final Logger LOG = LoggerFactory.getLogger(ConduitBridge.class); 037 038 /** 039 * Constructor 040 * 041 * @param localBroker 042 * @param remoteBroker 043 */ 044 public ConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { 045 super(configuration, localBroker, remoteBroker); 046 } 047 048 @Override 049 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { 050 if (addToAlreadyInterestedConsumers(info, false)) { 051 return null; // don't want this subscription added 052 } 053 //add our original id to ourselves 054 info.addNetworkConsumerId(info.getConsumerId()); 055 info.setSelector(null); 056 return doCreateDemandSubscription(info); 057 } 058 059 protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info, boolean isForcedDurable) { 060 //If a network subscription and a queue check if isConduitNetworkQueueSubscriptions is true 061 //If true then we want to try and conduit 062 //For topics we always want to conduit regardless of network subscription or not 063 if (info.isNetworkSubscription() && info.getDestination().isQueue() && 064 !configuration.isConduitNetworkQueueSubscriptions()) { 065 return false; 066 } 067 boolean matched = false; 068 069 // search through existing subscriptions and see if we have a match 070 for (DemandSubscription ds : subscriptionMapByLocalId.values()) { 071 DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination()); 072 if (canConduit(ds) && filter.matches(info.getDestination())) { 073 LOG.debug("{} {} with ids {} matched (add interest) {}", new Object[]{ 074 configuration.getBrokerName(), info, info.getNetworkConsumerIds(), ds 075 }); 076 // add the interest in the subscription 077 if (!info.isDurable()) { 078 ds.add(info.getConsumerId()); 079 if (isForcedDurable) { 080 forcedDurableRemoteId.add(info.getConsumerId()); 081 ds.addForcedDurableConsumer(info.getConsumerId()); 082 } 083 } else { 084 //Handle the demand generated by proxy network subscriptions 085 //The broker path is case is normal 086 if (isProxyNSConsumerBrokerPath(info) && 087 info.getSubscriptionName() != null && info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) { 088 final BrokerId[] path = info.getBrokerPath(); 089 addProxyNetworkSubscriptionBrokerPath(ds, path, info.getSubscriptionName()); 090 //This is the durable sync case on broker restart 091 } else if (isProxyNSConsumerClientId(info.getClientId()) && 092 isProxyBridgeSubscription(info.getClientId(), info.getSubscriptionName())) { 093 addProxyNetworkSubscriptionClientId(ds, info.getClientId(), info.getSubscriptionName()); 094 } else { 095 ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); 096 } 097 } 098 matched = true; 099 // continue - we want interest to any existing DemandSubscriptions 100 } 101 } 102 return matched; 103 } 104 105 // we want to conduit statically included consumers which are local networkSubs 106 // but we don't want to conduit remote network queue subs i.e. (proxy proxy) consumers 107 // unless isConduitNetworkQueueSubscriptions is true 108 // We always want to conduit topic subscriptions 109 private boolean canConduit(DemandSubscription ds) { 110 return ds.isStaticallyIncluded() || ds.getRemoteInfo().getDestination().isTopic() || 111 !ds.getRemoteInfo().isNetworkSubscription() || 112 (ds.getRemoteInfo().getDestination().isQueue() && configuration.isConduitNetworkQueueSubscriptions()); 113 } 114 115 @Override 116 protected void removeDemandSubscription(ConsumerId id) throws IOException { 117 List<DemandSubscription> tmpList = new ArrayList<DemandSubscription>(); 118 119 for (DemandSubscription ds : subscriptionMapByLocalId.values()) { 120 if (ds.remove(id)) { 121 LOG.debug("{} on {} from {} removed interest for: {} from {}", new Object[]{ 122 configuration.getBrokerName(), localBroker, remoteBrokerName, id, ds 123 }); 124 } 125 if (ds.isEmpty()) { 126 tmpList.add(ds); 127 } 128 } 129 130 for (DemandSubscription ds : tmpList) { 131 removeSubscription(ds); 132 LOG.debug("{} on {} from {} removed {}", new Object[]{ 133 configuration.getBrokerName(), localBroker, remoteBrokerName, ds 134 }); 135 } 136 } 137}