001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.vm;
018
019import java.io.IOException;
020import java.net.InetSocketAddress;
021import java.net.URI;
022import java.util.concurrent.atomic.AtomicInteger;
023
024import org.apache.activemq.command.BrokerInfo;
025import org.apache.activemq.transport.MutexTransport;
026import org.apache.activemq.transport.ResponseCorrelator;
027import org.apache.activemq.transport.Transport;
028import org.apache.activemq.transport.TransportAcceptListener;
029import org.apache.activemq.transport.TransportServer;
030
031/**
032 * Broker side of the VMTransport
033 */
034public class VMTransportServer implements TransportServer {
035
036    private TransportAcceptListener acceptListener;
037    private final URI location;
038    private boolean disposed;
039
040    private final AtomicInteger connectionCount = new AtomicInteger(0);
041    private final boolean disposeOnDisconnect;
042    private boolean allowLinkStealing;
043
044    /**
045     * @param location
046     * @param disposeOnDisconnect
047     */
048    public VMTransportServer(URI location, boolean disposeOnDisconnect) {
049        this.location = location;
050        this.disposeOnDisconnect = disposeOnDisconnect;
051    }
052
053    /**
054     * @return a pretty print of this
055     */
056    public String toString() {
057        return "VMTransportServer(" + location + ")";
058    }
059
060    /**
061     * @return new VMTransport
062     * @throws IOException
063     */
064    public VMTransport connect() throws IOException {
065        TransportAcceptListener al;
066        synchronized (this) {
067            if (disposed) {
068                throw new IOException("Server has been disposed.");
069            }
070            al = acceptListener;
071        }
072        if (al == null) {
073            throw new IOException("Server TransportAcceptListener is null.");
074        }
075
076        connectionCount.incrementAndGet();
077        VMTransport client = new VMTransport(location) {
078            public void stop() throws Exception {
079                if (!disposed.get()) {
080                    super.stop();
081                    if (connectionCount.decrementAndGet() == 0 && disposeOnDisconnect) {
082                        VMTransportServer.this.stop();
083                    }
084                }
085            };
086        };
087
088        VMTransport server = new VMTransport(location);
089        client.setPeer(server);
090        server.setPeer(client);
091        al.onAccept(configure(server));
092        return client;
093    }
094
095    /**
096     * Configure transport
097     *
098     * @param transport
099     * @return the Transport
100     */
101    public static Transport configure(Transport transport) {
102        transport = new MutexTransport(transport);
103        transport = new ResponseCorrelator(transport);
104        return transport;
105    }
106
107    /**
108     * Set the Transport accept listener for new Connections
109     *
110     * @param acceptListener
111     */
112    public synchronized void setAcceptListener(TransportAcceptListener acceptListener) {
113        this.acceptListener = acceptListener;
114    }
115
116    public void start() throws IOException {
117    }
118
119    public void stop() throws IOException {
120        VMTransportFactory.stopped(this);
121    }
122
123    public URI getConnectURI() {
124        return location;
125    }
126
127    public URI getBindURI() {
128        return location;
129    }
130
131    public void setBrokerInfo(BrokerInfo brokerInfo) {
132    }
133
134    public InetSocketAddress getSocketAddress() {
135        return null;
136    }
137
138    public int getConnectionCount() {
139        return connectionCount.intValue();
140    }
141
142    @Override
143    public boolean isSslServer() {
144        return false;
145    }
146
147    @Override
148    public boolean isAllowLinkStealing() {
149        return allowLinkStealing;
150    }
151
152    public void setAllowLinkStealing(boolean allowLinkStealing) {
153        this.allowLinkStealing = allowLinkStealing;
154    }
155}