001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.net.URI;
020import java.net.URISyntaxException;
021import java.util.Collection;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027
028import javax.management.MalformedObjectNameException;
029import javax.management.ObjectName;
030
031import org.apache.activemq.Service;
032import org.apache.activemq.broker.BrokerService;
033import org.apache.activemq.broker.jmx.AnnotatedMBean;
034import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
035import org.apache.activemq.broker.jmx.NetworkBridgeView;
036import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
037import org.apache.activemq.command.ActiveMQDestination;
038import org.apache.activemq.command.ConsumerId;
039import org.apache.activemq.transport.Transport;
040import org.apache.activemq.transport.TransportFactory;
041import org.apache.activemq.util.ServiceStopper;
042import org.apache.activemq.util.ServiceSupport;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * Connector class for bridging broker networks.
048 */
049public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service {
050
051    private static final Logger LOG = LoggerFactory.getLogger(NetworkConnector.class);
052    protected URI localURI;
053    protected ConnectionFilter connectionFilter;
054    protected ConcurrentMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
055
056    protected ServiceSupport serviceSupport = new ServiceSupport() {
057
058        @Override
059        protected void doStart() throws Exception {
060            handleStart();
061        }
062
063        @Override
064        protected void doStop(ServiceStopper stopper) throws Exception {
065            handleStop(stopper);
066        }
067    };
068
069    private Set<ActiveMQDestination> durableDestinations;
070
071    private BrokerService brokerService;
072    private ObjectName objectName;
073
074    public NetworkConnector() {
075    }
076
077    public NetworkConnector(URI localURI) {
078        this.localURI = localURI;
079    }
080
081    public URI getLocalUri() throws URISyntaxException {
082        return localURI;
083    }
084
085    public void setLocalUri(URI localURI) {
086        this.localURI = localURI;
087    }
088
089    /**
090     * @return Returns the durableDestinations.
091     */
092    public Set<ActiveMQDestination> getDurableDestinations() {
093        return durableDestinations;
094    }
095
096    /**
097     * @param durableDestinations The durableDestinations to set.
098     */
099    public void setDurableDestinations(Set<ActiveMQDestination> durableDestinations) {
100        this.durableDestinations = durableDestinations;
101    }
102
103
104    public void addExcludedDestination(ActiveMQDestination destiantion) {
105        this.excludedDestinations.add(destiantion);
106    }
107
108
109    public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) {
110        this.staticallyIncludedDestinations.add(destiantion);
111    }
112
113
114    public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) {
115        this.dynamicallyIncludedDestinations.add(destiantion);
116    }
117
118    public ConnectionFilter getConnectionFilter() {
119        return connectionFilter;
120    }
121
122    public void setConnectionFilter(ConnectionFilter connectionFilter) {
123        this.connectionFilter = connectionFilter;
124    }
125
126    // Implementation methods
127    // -------------------------------------------------------------------------
128    protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result) {
129        List<ActiveMQDestination> destsList = getDynamicallyIncludedDestinations();
130        ActiveMQDestination dests[] = destsList.toArray(new ActiveMQDestination[destsList.size()]);
131        result.setDynamicallyIncludedDestinations(dests);
132        destsList = getExcludedDestinations();
133        dests = destsList.toArray(new ActiveMQDestination[destsList.size()]);
134        result.setExcludedDestinations(dests);
135        destsList = getStaticallyIncludedDestinations();
136        dests = destsList.toArray(new ActiveMQDestination[destsList.size()]);
137        result.setStaticallyIncludedDestinations(dests);
138        result.setDurableDestinations(getDurableTopicDestinations(durableDestinations));
139        return result;
140    }
141
142    protected Transport createLocalTransport() throws Exception {
143        return NetworkBridgeFactory.createLocalTransport(this, localURI);
144    }
145
146    public static ActiveMQDestination[] getDurableTopicDestinations(final Set<ActiveMQDestination> durableDestinations) {
147        if (durableDestinations != null) {
148
149            HashSet<ActiveMQDestination> topics = new HashSet<ActiveMQDestination>();
150            for (ActiveMQDestination d : durableDestinations) {
151                if( d.isTopic() ) {
152                    topics.add(d);
153                }
154            }
155
156            ActiveMQDestination[] dest = new ActiveMQDestination[topics.size()];
157            dest = topics.toArray(dest);
158            return dest;
159        }
160        return null;
161    }
162
163    @Override
164    public void start() throws Exception {
165        serviceSupport.start();
166    }
167
168    @Override
169    public void stop() throws Exception {
170        serviceSupport.stop();
171    }
172
173    protected void handleStart() throws Exception {
174        if (localURI == null) {
175            throw new IllegalStateException("You must configure the 'localURI' property");
176        }
177        LOG.info("Network Connector {} started", this);
178    }
179
180    protected void handleStop(ServiceStopper stopper) throws Exception {
181        LOG.info("Network Connector {} stopped", this);
182    }
183
184    public boolean isStarted() {
185        return serviceSupport.isStarted();
186    }
187
188    public boolean isStopped() {
189        return serviceSupport.isStopped();
190    }
191
192    public boolean isStopping() {
193        return serviceSupport.isStopping();
194    }
195
196    public ObjectName getObjectName() {
197        return objectName;
198    }
199
200    public void setObjectName(ObjectName objectName) {
201        this.objectName = objectName;
202    }
203
204    public BrokerService getBrokerService() {
205        return brokerService;
206    }
207
208    public void setBrokerService(BrokerService brokerService) {
209        this.brokerService = brokerService;
210    }
211
212    protected void registerNetworkBridgeMBean(NetworkBridge bridge) {
213        if (!getBrokerService().isUseJmx()) {
214            return;
215        }
216        NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
217        try {
218            ObjectName objectName = createNetworkBridgeObjectName(bridge);
219            AnnotatedMBean.registerMBean(getBrokerService().getManagementContext(), view, objectName);
220        } catch (Throwable e) {
221            LOG.debug("Network bridge could not be registered in JMX: {}", e.getMessage(), e);
222        }
223    }
224
225    protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) {
226        if (!getBrokerService().isUseJmx()) {
227            return;
228        }
229        try {
230            ObjectName objectName = createNetworkBridgeObjectName(bridge);
231            getBrokerService().getManagementContext().unregisterMBean(objectName);
232        } catch (Throwable e) {
233            LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
234        }
235    }
236
237    protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
238        return BrokerMBeanSupport.createNetworkBridgeObjectName(getObjectName(), bridge.getRemoteAddress());
239    }
240
241    // ask all the bridges as we can't know to which this consumer is tied
242    public boolean removeDemandSubscription(ConsumerId consumerId) {
243        boolean removeSucceeded = false;
244        for (NetworkBridge bridge : bridges.values()) {
245            if (bridge instanceof DemandForwardingBridgeSupport) {
246                DemandForwardingBridgeSupport demandBridge = (DemandForwardingBridgeSupport) bridge;
247                if (demandBridge.removeDemandSubscriptionByLocalId(consumerId)) {
248                    removeSucceeded = true;
249                    break;
250                }
251            }
252        }
253        return removeSucceeded;
254    }
255
256    public Collection<NetworkBridge> activeBridges() {
257        return bridges.values();
258    }
259}