001package org.apache.activemq.transport.auto.nio;
002
003import java.io.IOException;
004import java.net.Socket;
005import java.net.URI;
006import java.net.URISyntaxException;
007import java.nio.ByteBuffer;
008import java.util.HashMap;
009import java.util.Set;
010import java.util.concurrent.Future;
011
012import javax.net.ServerSocketFactory;
013import javax.net.ssl.SSLContext;
014import javax.net.ssl.SSLEngine;
015
016import org.apache.activemq.broker.BrokerService;
017import org.apache.activemq.broker.BrokerServiceAware;
018import org.apache.activemq.transport.Transport;
019import org.apache.activemq.transport.auto.AutoTcpTransportServer;
020import org.apache.activemq.transport.nio.AutoInitNioSSLTransport;
021import org.apache.activemq.transport.nio.NIOSSLTransport;
022import org.apache.activemq.transport.tcp.TcpTransport;
023import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
024import org.apache.activemq.transport.tcp.TcpTransportFactory;
025import org.apache.activemq.transport.tcp.TcpTransportServer;
026import org.apache.activemq.util.IntrospectionSupport;
027import org.apache.activemq.wireformat.WireFormat;
028
029/**
030 * Licensed to the Apache Software Foundation (ASF) under one or more
031 * contributor license agreements.  See the NOTICE file distributed with
032 * this work for additional information regarding copyright ownership.
033 * The ASF licenses this file to You under the Apache License, Version 2.0
034 * (the "License"); you may not use this file except in compliance with
035 * the License.  You may obtain a copy of the License at
036 *
037 *      http://www.apache.org/licenses/LICENSE-2.0
038 *
039 * Unless required by applicable law or agreed to in writing, software
040 * distributed under the License is distributed on an "AS IS" BASIS,
041 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
042 * See the License for the specific language governing permissions and
043 * limitations under the License.
044 */
045public class AutoNIOSSLTransportServer extends AutoTcpTransportServer {
046
047    private SSLContext context;
048
049    public AutoNIOSSLTransportServer(SSLContext context, TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory,
050            BrokerService brokerService, Set<String> enabledProtocols) throws IOException, URISyntaxException {
051        super(transportFactory, location, serverSocketFactory, brokerService, enabledProtocols);
052
053        this.context = context;
054    }
055
056    private boolean needClientAuth;
057    private boolean wantClientAuth;
058
059    protected Transport createTransport(Socket socket, WireFormat format, SSLEngine engine,
060            InitBuffer initBuffer, ByteBuffer inputBuffer, TcpTransportFactory detectedFactory) throws IOException {
061        NIOSSLTransport transport = new NIOSSLTransport(format, socket, engine, initBuffer, inputBuffer);
062        if (context != null) {
063            transport.setSslContext(context);
064        }
065
066        transport.setNeedClientAuth(needClientAuth);
067        transport.setWantClientAuth(wantClientAuth);
068
069
070        return transport;
071    }
072
073    @Override
074    protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException {
075        throw new UnsupportedOperationException("method not supported");
076    }
077
078    @Override
079    public boolean isSslServer() {
080        return true;
081    }
082
083    public boolean isNeedClientAuth() {
084        return this.needClientAuth;
085    }
086
087    public void setNeedClientAuth(boolean value) {
088        this.needClientAuth = value;
089    }
090
091    public boolean isWantClientAuth() {
092        return this.wantClientAuth;
093    }
094
095    public void setWantClientAuth(boolean value) {
096        this.wantClientAuth = value;
097    }
098
099
100    @Override
101    protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
102        //The SSLEngine needs to be initialized and handshake done to get the first command and detect the format
103        //The wireformat doesn't need properties set here because we aren't using this format during the SSL handshake
104        final AutoInitNioSSLTransport in = new AutoInitNioSSLTransport(wireFormatFactory.createWireFormat(), socket);
105        if (context != null) {
106            in.setSslContext(context);
107        }
108        //We need to set the transport options on the init transport so that the SSL options are set
109        if (transportOptions != null) {
110            //Clone the map because we will need to set the options later on the actual transport
111            IntrospectionSupport.setProperties(in, new HashMap<>(transportOptions));
112        }
113
114        //Attempt to read enough bytes to detect the protocol until the timeout period
115        //is reached
116        Future<?> future = protocolDetectionExecutor.submit(new Runnable() {
117            @Override
118            public void run() {
119                try {
120                    in.start();
121                } catch (Exception e) {
122                    throw new IllegalStateException("Could not complete Transport start", e);
123                }
124
125                int attempts = 0;
126                do {
127                    if(attempts > 0) {
128                        try {
129                            //increase sleep period each attempt to prevent high cpu usage
130                            //if the client is hung and not sending bytes
131                            int sleep = attempts >= 1024 ? 1024 : 4 * attempts;
132                            Thread.sleep(sleep);
133                        } catch (InterruptedException e) {
134                            break;
135                        }
136                    }
137                    //In the future it might be better to register a nonblocking selector
138                    //to be told when bytes are ready
139                    in.serviceRead();
140                    attempts++;
141                } while(in.getReadSize().get() < 8 && !Thread.interrupted());
142            }
143        });
144
145        try {
146            //If this fails and throws an exception and the socket will be closed
147            waitForProtocolDetectionFinish(future, in.getReadSize());
148        } finally {
149            //call cancel in case task didn't complete which will interrupt the task
150            future.cancel(true);
151        }
152        in.stop();
153
154        InitBuffer initBuffer = new InitBuffer(in.getReadSize().get(), ByteBuffer.allocate(in.getReadData().length));
155        initBuffer.buffer.put(in.getReadData());
156
157        ProtocolInfo protocolInfo = detectProtocol(in.getReadData());
158
159        if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) {
160            ((BrokerServiceAware) protocolInfo.detectedTransportFactory).setBrokerService(brokerService);
161        }
162
163        WireFormat format = protocolInfo.detectedWireFormatFactory.createWireFormat();
164        Transport transport = createTransport(socket, format, in.getSslSession(), initBuffer, in.getInputBuffer(), protocolInfo.detectedTransportFactory);
165
166        return new TransportInfo(format, transport, protocolInfo.detectedTransportFactory);
167    }
168
169}
170
171