001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.network; 018 019 import java.io.IOException; 020 import java.net.URI; 021 import java.net.URISyntaxException; 022 import java.util.HashMap; 023 import java.util.Iterator; 024 import java.util.Map; 025 026 import org.apache.activemq.broker.BrokerService; 027 import org.apache.activemq.broker.SslContext; 028 import org.apache.activemq.command.DiscoveryEvent; 029 import org.apache.activemq.transport.Transport; 030 import org.apache.activemq.transport.TransportDisposedIOException; 031 import org.apache.activemq.transport.TransportFactory; 032 import org.apache.activemq.transport.discovery.DiscoveryAgent; 033 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; 034 import org.apache.activemq.transport.discovery.DiscoveryListener; 035 import org.apache.activemq.util.IntrospectionSupport; 036 import org.apache.activemq.util.ServiceStopper; 037 import org.apache.activemq.util.ServiceSupport; 038 import org.apache.activemq.util.URISupport; 039 import org.apache.activemq.util.URISupport.CompositeData; 040 import org.slf4j.Logger; 041 import org.slf4j.LoggerFactory; 042 043 import javax.management.ObjectName; 044 045 /** 046 * A network connector which uses a discovery agent to detect the remote brokers 047 * available and setup a connection to each available remote broker 048 * 049 * @org.apache.xbean.XBean element="networkConnector" 050 * 051 */ 052 public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener { 053 private static final Logger LOG = LoggerFactory.getLogger(DiscoveryNetworkConnector.class); 054 055 private DiscoveryAgent discoveryAgent; 056 057 private Map<String, String> parameters; 058 059 public DiscoveryNetworkConnector() { 060 } 061 062 public DiscoveryNetworkConnector(URI discoveryURI) throws IOException { 063 setUri(discoveryURI); 064 } 065 066 public void setUri(URI discoveryURI) throws IOException { 067 setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI)); 068 try { 069 parameters = URISupport.parseParameters(discoveryURI); 070 // allow discovery agent to grab it's parameters 071 IntrospectionSupport.setProperties(getDiscoveryAgent(), parameters); 072 } catch (URISyntaxException e) { 073 LOG.warn("failed to parse query parameters from discoveryURI: " + discoveryURI, e); 074 } 075 076 } 077 078 public void onServiceAdd(DiscoveryEvent event) { 079 // Ignore events once we start stopping. 080 if (serviceSupport.isStopped() || serviceSupport.isStopping()) { 081 return; 082 } 083 String url = event.getServiceName(); 084 if (url != null) { 085 URI uri; 086 try { 087 uri = new URI(url); 088 } catch (URISyntaxException e) { 089 LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); 090 return; 091 } 092 // Should we try to connect to that URI? 093 synchronized (bridges) { 094 if( bridges.containsKey(uri) ) { 095 LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri ); 096 return; 097 } 098 } 099 if (localURI.equals(uri)) { 100 LOG.debug("not connecting loopback: " + uri); 101 return; 102 } 103 104 if (connectionFilter != null && !connectionFilter.connectTo(uri)) { 105 LOG.debug("connectionFilter disallows connection to: " + uri); 106 return; 107 } 108 109 URI connectUri = uri; 110 try { 111 connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX); 112 } catch (URISyntaxException e) { 113 LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e); 114 } 115 LOG.info("Establishing network connection from " + localURI + " to " + connectUri); 116 117 Transport remoteTransport; 118 Transport localTransport; 119 try { 120 // Allows the transport to access the broker's ssl configuration. 121 SslContext.setCurrentSslContext(getBrokerService().getSslContext()); 122 try { 123 remoteTransport = TransportFactory.connect(connectUri); 124 } catch (Exception e) { 125 LOG.warn("Could not connect to remote URI: " + connectUri + ": " + e.getMessage()); 126 LOG.debug("Connection failure exception: " + e, e); 127 return; 128 } 129 try { 130 localTransport = createLocalTransport(); 131 } catch (Exception e) { 132 ServiceSupport.dispose(remoteTransport); 133 LOG.warn("Could not connect to local URI: " + localURI + ": " + e.getMessage()); 134 LOG.debug("Connection failure exception: " + e, e); 135 return; 136 } 137 } finally { 138 SslContext.setCurrentSslContext(null); 139 } 140 NetworkBridge bridge = createBridge(localTransport, remoteTransport, event); 141 try { 142 bridge.start(); 143 synchronized (bridges) { 144 bridges.put(uri, bridge); 145 } 146 } catch (Exception e) { 147 ServiceSupport.dispose(localTransport); 148 ServiceSupport.dispose(remoteTransport); 149 LOG.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e); 150 LOG.debug("Start failure exception: " + e, e); 151 try { 152 discoveryAgent.serviceFailed(event); 153 } catch (IOException e1) { 154 LOG.debug("Discovery agent failure while handling failure event: " + e1.getMessage(), e1); 155 } 156 } 157 } 158 } 159 160 public void onServiceRemove(DiscoveryEvent event) { 161 String url = event.getServiceName(); 162 if (url != null) { 163 URI uri; 164 try { 165 uri = new URI(url); 166 } catch (URISyntaxException e) { 167 LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); 168 return; 169 } 170 171 NetworkBridge bridge; 172 synchronized (bridges) { 173 bridge = bridges.remove(uri); 174 } 175 } 176 } 177 178 public DiscoveryAgent getDiscoveryAgent() { 179 return discoveryAgent; 180 } 181 182 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { 183 this.discoveryAgent = discoveryAgent; 184 if (discoveryAgent != null) { 185 this.discoveryAgent.setDiscoveryListener(this); 186 } 187 } 188 189 protected void handleStart() throws Exception { 190 if (discoveryAgent == null) { 191 throw new IllegalStateException("You must configure the 'discoveryAgent' property"); 192 } 193 this.discoveryAgent.start(); 194 super.handleStart(); 195 } 196 197 protected void handleStop(ServiceStopper stopper) throws Exception { 198 for (Iterator<NetworkBridge> i = bridges.values().iterator(); i.hasNext();) { 199 NetworkBridge bridge = i.next(); 200 try { 201 bridge.stop(); 202 } catch (Exception e) { 203 stopper.onException(this, e); 204 } 205 } 206 bridges.clear(); 207 try { 208 this.discoveryAgent.stop(); 209 } catch (Exception e) { 210 stopper.onException(this, e); 211 } 212 213 super.handleStop(stopper); 214 } 215 216 protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) { 217 class DiscoverNetworkBridgeListener extends MBeanNetworkListener { 218 219 public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) { 220 super(brokerService, connectorName); 221 } 222 223 public void bridgeFailed() { 224 if (!serviceSupport.isStopped()) { 225 try { 226 discoveryAgent.serviceFailed(event); 227 } catch (IOException e) { 228 } 229 } 230 231 } 232 } 233 NetworkBridgeListener listener = new DiscoverNetworkBridgeListener(getBrokerService(), getObjectName()); 234 235 DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener); 236 result.setBrokerService(getBrokerService()); 237 return configureBridge(result); 238 } 239 240 @Override 241 public String toString() { 242 return "DiscoveryNetworkConnector:" + getName() + ":" + getBrokerService(); 243 } 244 }