/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network;

import java.net.URI;

import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.ServiceStopper;

/**
 * A network connector which uses some kind of multicast-like transport that
 * communicates with potentially many remote brokers over a single logical
 * {@link Transport} instance such as when using multicast.
 * <p>
 * This implementation does not depend on multicast at all; any other group
 * based transport could be used.
 * <p>
 * Both transports and the bridge may be injected through the setters before
 * the connector is started; anything left unset is created in
 * {@link #handleStart()}.
 *
 * @org.apache.xbean.XBean
 */
public class MulticastNetworkConnector extends NetworkConnector {

    private Transport localTransport;
    private Transport remoteTransport;
    private URI remoteURI;
    private DemandForwardingBridgeSupport bridge;

    /**
     * Creates a connector with no remote URI. Either a remote URI or a remote
     * transport must be supplied before the connector is started.
     */
    public MulticastNetworkConnector() {
    }

    /**
     * Creates a connector for the given remote URI.
     *
     * @param remoteURI the URI used to create the remote transport on start
     *                  when no remote transport has been set explicitly
     */
    public MulticastNetworkConnector(URI remoteURI) {
        this.remoteURI = remoteURI;
    }

    // Properties
    // -------------------------------------------------------------------------

    /**
     * @return the current bridge, or <code>null</code> if none has been set
     *         or created yet
     */
    public DemandForwardingBridgeSupport getBridge() {
        return bridge;
    }

    /**
     * Sets the bridge. Note that {@link #handleStart()} always assigns a newly
     * created bridge, replacing any value set here.
     */
    public void setBridge(DemandForwardingBridgeSupport bridge) {
        this.bridge = bridge;
    }

    /**
     * @return the local transport, or <code>null</code> if none has been set
     *         or created yet
     */
    public Transport getLocalTransport() {
        return localTransport;
    }

    /**
     * Sets the local transport implementation. If left unset, one is created
     * when the connector starts.
     */
    public void setLocalTransport(Transport localTransport) {
        this.localTransport = localTransport;
    }

    /**
     * @return the remote transport, or <code>null</code> if none has been set
     *         or created yet
     */
    public Transport getRemoteTransport() {
        return remoteTransport;
    }

    /**
     * Sets the remote transport implementation
     */
    public void setRemoteTransport(Transport remoteTransport) {
        this.remoteTransport = remoteTransport;
    }

    /**
     * @return the remote transport URI, or <code>null</code> if not configured
     */
    public URI getRemoteURI() {
        return remoteURI;
    }

    /**
     * Sets the remote transport URI to some group transport like
     * <code>multicast://address:port</code>
     */
    public void setRemoteURI(URI remoteURI) {
        this.remoteURI = remoteURI;
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /**
     * Creates any missing transports, builds and starts the bridge, starts
     * both transports and finally delegates to the superclass.
     *
     * @throws IllegalArgumentException if neither a remote transport nor a
     *                                  remote URI has been configured
     * @throws Exception                if creating or starting a transport or
     *                                  the bridge fails
     */
    protected void handleStart() throws Exception {
        if (remoteTransport == null) {
            if (remoteURI == null) {
                throw new IllegalArgumentException("You must specify the remoteURI property");
            }
            remoteTransport = TransportFactory.connect(remoteURI);
        }

        if (localTransport == null) {
            localTransport = createLocalTransport();
        }

        bridge = createBridge(localTransport, remoteTransport);
        configureBridge(bridge);
        bridge.start();

        // we need to start the transports after we've created the bridge
        remoteTransport.start();
        localTransport.start();

        super.handleStart();
    }

    /**
     * Stops the superclass first, then the bridge, the remote transport and
     * the local transport, in that order. A failure while stopping one of
     * them is reported to the given stopper instead of being propagated, so
     * the remaining ones are still stopped.
     *
     * @param stopper receives any exception raised while stopping the bridge
     *                or a transport
     * @throws Exception if the superclass fails to stop
     */
    protected void handleStop(ServiceStopper stopper) throws Exception {
        super.handleStop(stopper);
        if (bridge != null) {
            try {
                bridge.stop();
            } catch (Exception e) {
                stopper.onException(this, e);
            }
        }
        if (remoteTransport != null) {
            try {
                remoteTransport.stop();
            } catch (Exception e) {
                stopper.onException(this, e);
            }
        }
        if (localTransport != null) {
            try {
                localTransport.stop();
            } catch (Exception e) {
                stopper.onException(this, e);
            }
        }
    }

    /**
     * Describes this connector by class name, connector name and remote end
     * point. The remote transport is used when available; before one has been
     * set or created the configured remote URI is shown instead.
     */
    @Override
    public String toString() {
        // remoteTransport stays null until it is injected or the connector is
        // started, so it must not be dereferenced here.
        Object remote = remoteTransport;
        if (remote == null) {
            remote = remoteURI;
        }
        // String concatenation renders a null reference as "null".
        return getClass().getName() + ":" + getName() + "[" + remote + "]";
    }

    /**
     * Creates the bridge that connects the two transports.
     *
     * @param local  the transport to the local broker
     * @param remote the transport to the remote brokers
     * @return a new {@link CompositeDemandForwardingBridge} bound to this
     *         connector's broker service
     */
    protected DemandForwardingBridgeSupport createBridge(Transport local, Transport remote) {
        CompositeDemandForwardingBridge bridge = new CompositeDemandForwardingBridge(this, local, remote);
        bridge.setBrokerService(getBrokerService());
        return bridge;
    }

}