001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.net.URI;
020import java.util.HashMap;
021import java.util.LinkedHashSet;
022import java.util.ServiceLoader;
023import java.util.Set;
024
025import org.apache.activemq.transport.Transport;
026import org.apache.activemq.transport.TransportFactory;
027import org.apache.activemq.util.URISupport;
028
029/**
030 * Factory for network bridges
031 * 
032 * 
033 */
034public final class NetworkBridgeFactory implements BridgeFactory {
035
036    public final static BridgeFactory INSTANCE = new NetworkBridgeFactory();
037
038    private NetworkBridgeFactory() {
039
040    }
041
042    @Override
043    public DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, NetworkBridgeListener listener) {
044        if (configuration.isConduitSubscriptions()) {
045            // dynamicOnly determines whether durables are auto bridged
046            return attachListener(new DurableConduitBridge(configuration, localTransport, remoteTransport), listener);
047        }
048        return attachListener(new DemandForwardingBridge(configuration, localTransport, remoteTransport), listener);
049    }
050
051    private DemandForwardingBridge attachListener(DemandForwardingBridge bridge, NetworkBridgeListener listener) {
052        if (listener != null) {
053            bridge.setNetworkBridgeListener(listener);
054        }
055        return bridge;
056    }
057
058    /**
059     * Create a network bridge
060     * 
061     * @param configuration
062     * @param localTransport
063     * @param remoteTransport
064     * @param listener
065     * @return the NetworkBridge
066     */
067    @Deprecated
068    public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration configuration,
069                                                      Transport localTransport, Transport remoteTransport,
070                                                      final NetworkBridgeListener listener) {
071        return INSTANCE.createNetworkBridge(configuration, localTransport, remoteTransport, listener);
072    }
073
074    public static Transport createLocalTransport(NetworkBridgeConfiguration configuration, URI uri) throws Exception {
075        // one end of the localbroker<->bridge transport needs to be async to allow concurrent forwards and acks
076        return createLocalTransport(uri, !configuration.isDispatchAsync());
077    }
078
079    public static Transport createLocalAsyncTransport(URI uri) throws Exception {
080        return createLocalTransport(uri, true);
081    }
082
083    private static Transport createLocalTransport(URI uri, boolean async) throws Exception {
084        HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
085        map.put("async", String.valueOf(async));
086        map.put("create", "false"); // we don't want a vm connect during shutdown to trigger a broker create
087        uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
088        return TransportFactory.connect(uri);
089    }
090
091}