001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.tcp;
018
019import java.io.IOException;
020import java.net.InetAddress;
021import java.net.InetSocketAddress;
022import java.net.ServerSocket;
023import java.net.Socket;
024import java.net.SocketException;
025import java.net.SocketTimeoutException;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.UnknownHostException;
029import java.nio.channels.ClosedChannelException;
030import java.nio.channels.SelectionKey;
031import java.nio.channels.Selector;
032import java.nio.channels.ServerSocketChannel;
033import java.nio.channels.SocketChannel;
034import java.util.HashMap;
035import java.util.Iterator;
036import java.util.Set;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.LinkedBlockingQueue;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicInteger;
041
042import javax.net.ServerSocketFactory;
043import javax.net.ssl.SSLServerSocket;
044
045import org.apache.activemq.Service;
046import org.apache.activemq.ThreadPriorities;
047import org.apache.activemq.TransportLoggerSupport;
048import org.apache.activemq.command.BrokerInfo;
049import org.apache.activemq.openwire.OpenWireFormatFactory;
050import org.apache.activemq.transport.Transport;
051import org.apache.activemq.transport.TransportFactory;
052import org.apache.activemq.transport.TransportServer;
053import org.apache.activemq.transport.TransportServerThreadSupport;
054import org.apache.activemq.util.IOExceptionSupport;
055import org.apache.activemq.util.InetAddressUtil;
056import org.apache.activemq.util.IntrospectionSupport;
057import org.apache.activemq.util.ServiceListener;
058import org.apache.activemq.util.ServiceStopper;
059import org.apache.activemq.util.ServiceSupport;
060import org.apache.activemq.wireformat.WireFormat;
061import org.apache.activemq.wireformat.WireFormatFactory;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * A TCP based implementation of {@link TransportServer}
067 */
068public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
069
070    private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
071
072    protected volatile ServerSocket serverSocket;
073    protected volatile Selector selector;
074    protected int backlog = 5000;
075    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
076    protected final TcpTransportFactory transportFactory;
077    protected long maxInactivityDuration = 30000;
078    protected long maxInactivityDurationInitalDelay = 10000;
079    protected int minmumWireFormatVersion;
080    protected boolean useQueueForAccept = true;
081    protected boolean allowLinkStealing;
082
083    /**
084     * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
085     * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer,
086     * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or
087     * TransportConnector URIs.
088     */
089    protected boolean trace = false;
090
091    protected int soTimeout = 0;
092    protected int socketBufferSize = 64 * 1024;
093    protected int connectionTimeout = 30000;
094
095    /**
096     * Name of the LogWriter implementation to use. Names are mapped to classes in the
097     * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably
098     * set in Connection or TransportConnector URIs.
099     */
100    protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
101
102    /**
103     * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1
104     * TransportLogger which is manageable, a TransportLoggerControl MBean will me created.
105     */
106    protected boolean dynamicManagement = false;
107
108    /**
109     * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log.
110     * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the
111     * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or
112     * TransportConnector URIs.
113     */
114    protected boolean startLogging = true;
115    protected int jmxPort = TransportLoggerSupport.defaultJmxPort;
116    protected final ServerSocketFactory serverSocketFactory;
117    protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
118    protected Thread socketHandlerThread;
119
120    /**
121     * The maximum number of sockets allowed for this server
122     */
123    protected int maximumConnections = Integer.MAX_VALUE;
124    protected final AtomicInteger currentTransportCount = new AtomicInteger();
125
126    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
127        URISyntaxException {
128        super(location);
129        this.transportFactory = transportFactory;
130        this.serverSocketFactory = serverSocketFactory;
131    }
132
133    public void bind() throws IOException {
134        URI bind = getBindLocation();
135
136        String host = bind.getHost();
137        host = (host == null || host.length() == 0) ? "localhost" : host;
138        InetAddress addr = InetAddress.getByName(host);
139
140        try {
141            serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
142            configureServerSocket(serverSocket);
143        } catch (IOException e) {
144            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
145        }
146        try {
147            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
148                bind.getQuery(), bind.getFragment()));
149        } catch (URISyntaxException e) {
150            // it could be that the host name contains invalid characters such
151            // as _ on unix platforms so lets try use the IP address instead
152            try {
153                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
154                    bind.getQuery(), bind.getFragment()));
155            } catch (URISyntaxException e2) {
156                throw IOExceptionSupport.create(e2);
157            }
158        }
159    }
160
161    private void configureServerSocket(ServerSocket socket) throws SocketException {
162        socket.setSoTimeout(2000);
163        if (transportOptions != null) {
164
165            // If the enabledCipherSuites option is invalid we don't want to ignore it as the call
166            // to SSLServerSocket to configure it has a side effect on the socket rendering it
167            // useless as all suites are enabled many of which are considered as insecure.  We
168            // instead trap that option here and throw an exception.  We should really consider
169            // all invalid options as breaking and not start the transport but the current design
170            // doesn't really allow for this.
171            //
172            //  see: https://issues.apache.org/jira/browse/AMQ-4582
173            //
174            if (socket instanceof SSLServerSocket) {
175                if (transportOptions.containsKey("enabledCipherSuites")) {
176                    Object cipherSuites = transportOptions.remove("enabledCipherSuites");
177
178                    if (!IntrospectionSupport.setProperty(socket, "enabledCipherSuites", cipherSuites)) {
179                        throw new SocketException(String.format(
180                            "Invalid transport options {enabledCipherSuites=%s}", cipherSuites));
181                    }
182                }
183            }
184
185            //AMQ-6599 - don't strip out set properties on the socket as we need to set them
186            //on the Transport as well later
187            IntrospectionSupport.setProperties(socket, transportOptions, false);
188        }
189    }
190
191    /**
192     * @return Returns the wireFormatFactory.
193     */
194    public WireFormatFactory getWireFormatFactory() {
195        return wireFormatFactory;
196    }
197
198    /**
199     * @param wireFormatFactory
200     *            The wireFormatFactory to set.
201     */
202    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
203        this.wireFormatFactory = wireFormatFactory;
204    }
205
206    /**
207     * Associates a broker info with the transport server so that the transport can do discovery advertisements of the
208     * broker.
209     *
210     * @param brokerInfo
211     */
212    @Override
213    public void setBrokerInfo(BrokerInfo brokerInfo) {
214    }
215
216    public long getMaxInactivityDuration() {
217        return maxInactivityDuration;
218    }
219
220    public void setMaxInactivityDuration(long maxInactivityDuration) {
221        this.maxInactivityDuration = maxInactivityDuration;
222    }
223
224    public long getMaxInactivityDurationInitalDelay() {
225        return this.maxInactivityDurationInitalDelay;
226    }
227
228    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
229        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
230    }
231
232    public int getMinmumWireFormatVersion() {
233        return minmumWireFormatVersion;
234    }
235
236    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
237        this.minmumWireFormatVersion = minmumWireFormatVersion;
238    }
239
240    public boolean isTrace() {
241        return trace;
242    }
243
244    public void setTrace(boolean trace) {
245        this.trace = trace;
246    }
247
248    public String getLogWriterName() {
249        return logWriterName;
250    }
251
252    public void setLogWriterName(String logFormat) {
253        this.logWriterName = logFormat;
254    }
255
256    public boolean isDynamicManagement() {
257        return dynamicManagement;
258    }
259
260    public void setDynamicManagement(boolean useJmx) {
261        this.dynamicManagement = useJmx;
262    }
263
264    public void setJmxPort(int jmxPort) {
265        this.jmxPort = jmxPort;
266    }
267
268    public int getJmxPort() {
269        return jmxPort;
270    }
271
272    public boolean isStartLogging() {
273        return startLogging;
274    }
275
276    public void setStartLogging(boolean startLogging) {
277        this.startLogging = startLogging;
278    }
279
280    /**
281     * @return the backlog
282     */
283    public int getBacklog() {
284        return backlog;
285    }
286
287    /**
288     * @param backlog
289     *            the backlog to set
290     */
291    public void setBacklog(int backlog) {
292        this.backlog = backlog;
293    }
294
295    /**
296     * @return the useQueueForAccept
297     */
298    public boolean isUseQueueForAccept() {
299        return useQueueForAccept;
300    }
301
302    /**
303     * @param useQueueForAccept
304     *            the useQueueForAccept to set
305     */
306    public void setUseQueueForAccept(boolean useQueueForAccept) {
307        this.useQueueForAccept = useQueueForAccept;
308    }
309
310    /**
311     * pull Sockets from the ServerSocket
312     */
313    @Override
314    public void run() {
315        if (!isStopped() && !isStopping()) {
316            final ServerSocket serverSocket = this.serverSocket;
317            if (serverSocket == null) {
318                onAcceptError(new IOException("Server started without a valid ServerSocket"));
319            }
320
321            final ServerSocketChannel channel = serverSocket.getChannel();
322            if (channel != null) {
323                doRunWithServerSocketChannel(channel);
324            } else {
325                doRunWithServerSocket(serverSocket);
326            }
327        }
328    }
329
330    private void doRunWithServerSocketChannel(final ServerSocketChannel channel) {
331        try {
332            channel.configureBlocking(false);
333            final Selector selector = Selector.open();
334
335            try {
336                channel.register(selector, SelectionKey.OP_ACCEPT);
337            } catch (ClosedChannelException ex) {
338                try {
339                    selector.close();
340                } catch (IOException ignore) {}
341
342                throw ex;
343            }
344
345            // Update object instance for later cleanup.
346            this.selector = selector;
347
348            while (!isStopped()) {
349                int count = selector.select(10);
350
351                if (count == 0) {
352                    continue;
353                }
354
355                Set<SelectionKey> keys = selector.selectedKeys();
356
357                for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
358                    final SelectionKey key = i.next();
359                    if (key.isAcceptable()) {
360                        try {
361                            SocketChannel sc = channel.accept();
362                            if (sc != null) {
363                                if (isStopped() || getAcceptListener() == null) {
364                                    sc.close();
365                                } else {
366                                    if (useQueueForAccept) {
367                                        socketQueue.put(sc.socket());
368                                    } else {
369                                        handleSocket(sc.socket());
370                                    }
371                                }
372                            }
373
374                        } catch (SocketTimeoutException ste) {
375                            // expect this to happen
376                        } catch (Exception e) {
377                            e.printStackTrace();
378                            if (!isStopping()) {
379                                onAcceptError(e);
380                            } else if (!isStopped()) {
381                                LOG.warn("run()", e);
382                                onAcceptError(e);
383                            }
384                        }
385                    }
386                    i.remove();
387                }
388            }
389        } catch (IOException ex) {
390            if (!isStopping()) {
391                onAcceptError(ex);
392            } else if (!isStopped()) {
393                LOG.warn("run()", ex);
394                onAcceptError(ex);
395            }
396        }
397    }
398
399    private void doRunWithServerSocket(final ServerSocket serverSocket) {
400        while (!isStopped()) {
401            Socket socket = null;
402            try {
403                socket = serverSocket.accept();
404                if (socket != null) {
405                    if (isStopped() || getAcceptListener() == null) {
406                        socket.close();
407                    } else {
408                        if (useQueueForAccept) {
409                            socketQueue.put(socket);
410                        } else {
411                            handleSocket(socket);
412                        }
413                    }
414                }
415            } catch (SocketTimeoutException ste) {
416                // expect this to happen
417            } catch (Exception e) {
418                if (!isStopping()) {
419                    onAcceptError(e);
420                } else if (!isStopped()) {
421                    LOG.warn("run()", e);
422                    onAcceptError(e);
423                }
424            }
425        }
426    }
427
428    /**
429     * Allow derived classes to override the Transport implementation that this transport server creates.
430     *
431     * @param socket
432     * @param format
433     *
434     * @return a new Transport instance.
435     *
436     * @throws IOException
437     */
438    protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
439        return new TcpTransport(format, socket);
440    }
441
442    /**
443     * @return pretty print of this
444     */
445    @Override
446    public String toString() {
447        return "" + getBindLocation();
448    }
449
450    /**
451     * @param socket
452     * @param bindAddress
453     * @return real hostName
454     * @throws UnknownHostException
455     */
456    protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
457        String result = null;
458        if (socket.isBound()) {
459            if (socket.getInetAddress().isAnyLocalAddress()) {
460                // make it more human readable and useful, an alternative to 0.0.0.0
461                result = InetAddressUtil.getLocalHostName();
462            } else {
463                result = socket.getInetAddress().getCanonicalHostName();
464            }
465        } else {
466            result = bindAddress.getCanonicalHostName();
467        }
468        return result;
469    }
470
471    @Override
472    protected void doStart() throws Exception {
473        if (useQueueForAccept) {
474            Runnable run = new Runnable() {
475                @Override
476                public void run() {
477                    try {
478                        while (!isStopped() && !isStopping()) {
479                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
480                            if (sock != null) {
481                                try {
482                                    handleSocket(sock);
483                                } catch (Throwable thrown) {
484                                    if (!isStopping()) {
485                                        onAcceptError(new Exception(thrown));
486                                    } else if (!isStopped()) {
487                                        LOG.warn("Unexpected error thrown during accept handling: ", thrown);
488                                        onAcceptError(new Exception(thrown));
489                                    }
490                                }
491                            }
492                        }
493
494                    } catch (InterruptedException e) {
495                        if (!isStopped() || !isStopping()) {
496                            LOG.info("socketQueue interrupted - stopping");
497                            onAcceptError(e);
498                        }
499                    }
500                }
501            };
502            socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
503            socketHandlerThread.setDaemon(true);
504            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
505            socketHandlerThread.start();
506        }
507        super.doStart();
508    }
509
510    @Override
511    protected void doStop(ServiceStopper stopper) throws Exception {
512        Exception firstFailure = null;
513
514        try {
515            if (selector != null) {
516                selector.close();
517                selector = null;
518            }
519        } catch (Exception error) {
520        }
521
522        try {
523            final ServerSocket serverSocket = this.serverSocket;
524            if (serverSocket != null) {
525                this.serverSocket = null;
526                serverSocket.close();
527            }
528        } catch (Exception error) {
529            firstFailure = error;
530        }
531
532        if (socketHandlerThread != null) {
533            socketHandlerThread.interrupt();
534            socketHandlerThread = null;
535        }
536
537        try {
538            super.doStop(stopper);
539        } catch (Exception error) {
540            if (firstFailure != null) {
541                firstFailure = error;
542            }
543        }
544
545        if (firstFailure != null) {
546            throw firstFailure;
547        }
548    }
549
550    @Override
551    public InetSocketAddress getSocketAddress() {
552        return (InetSocketAddress) serverSocket.getLocalSocketAddress();
553    }
554
555    protected void handleSocket(Socket socket) {
556        doHandleSocket(socket);
557    }
558
559    final protected void doHandleSocket(Socket socket) {
560        boolean closeSocket = true;
561        boolean countIncremented = false;
562        try {
563            int currentCount;
564            do {
565                currentCount = currentTransportCount.get();
566                if (currentCount >= this.maximumConnections) {
567                     throw new ExceededMaximumConnectionsException(
568                         "Exceeded the maximum number of allowed client connections. See the '" +
569                         "maximumConnections' property on the TCP transport configuration URI " +
570                         "in the ActiveMQ configuration file (e.g., activemq.xml)");
571                 }
572
573            //Increment this value before configuring the transport
574            //This is necessary because some of the transport servers must read from the
575            //socket during configureTransport() so we want to make sure this value is
576            //accurate as the transport server could pause here waiting for data to be sent from a client
577            } while(!currentTransportCount.compareAndSet(currentCount, currentCount + 1));
578            countIncremented = true;
579
580            HashMap<String, Object> options = new HashMap<String, Object>();
581            options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
582            options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
583            options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
584            options.put("trace", Boolean.valueOf(trace));
585            options.put("soTimeout", Integer.valueOf(soTimeout));
586            options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
587            options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
588            options.put("logWriterName", logWriterName);
589            options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
590            options.put("startLogging", Boolean.valueOf(startLogging));
591            options.put("jmxPort", Integer.valueOf(jmxPort));
592            options.putAll(transportOptions);
593
594            TransportInfo transportInfo = configureTransport(this, socket);
595            closeSocket = false;
596
597            if (transportInfo.transport instanceof ServiceSupport) {
598                ((ServiceSupport) transportInfo.transport).addServiceListener(this);
599            }
600
601            Transport configuredTransport = transportInfo.transportFactory.serverConfigure(
602                    transportInfo.transport, transportInfo.format, options);
603
604            getAcceptListener().onAccept(configuredTransport);
605
606        } catch (SocketTimeoutException ste) {
607            // expect this to happen
608        } catch (Exception e) {
609            if (closeSocket) {
610                try {
611                    //if closing the socket, only decrement the count it was actually incremented
612                    //where it was incremented
613                    if (countIncremented) {
614                        currentTransportCount.decrementAndGet();
615                    }
616                    socket.close();
617                } catch (Exception ignore) {
618                }
619            }
620
621            if (!isStopping()) {
622                onAcceptError(e);
623            } else if (!isStopped()) {
624                LOG.warn("run()", e);
625                onAcceptError(e);
626            }
627        }
628    }
629
630    protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
631        WireFormat format = wireFormatFactory.createWireFormat();
632        Transport transport = createTransport(socket, format);
633        return new TransportInfo(format, transport, transportFactory);
634    }
635
636    protected class TransportInfo {
637        final WireFormat format;
638        final Transport transport;
639        final TransportFactory transportFactory;
640
641        public TransportInfo(WireFormat format, Transport transport, TransportFactory transportFactory) {
642            this.format = format;
643            this.transport = transport;
644            this.transportFactory = transportFactory;
645        }
646    }
647
648    public int getSoTimeout() {
649        return soTimeout;
650    }
651
652    public void setSoTimeout(int soTimeout) {
653        this.soTimeout = soTimeout;
654    }
655
656    public int getSocketBufferSize() {
657        return socketBufferSize;
658    }
659
660    public void setSocketBufferSize(int socketBufferSize) {
661        this.socketBufferSize = socketBufferSize;
662    }
663
664    public int getConnectionTimeout() {
665        return connectionTimeout;
666    }
667
668    public void setConnectionTimeout(int connectionTimeout) {
669        this.connectionTimeout = connectionTimeout;
670    }
671
672    /**
673     * @return the maximumConnections
674     */
675    public int getMaximumConnections() {
676        return maximumConnections;
677    }
678
679    /**
680     * @param maximumConnections
681     *            the maximumConnections to set
682     */
683    public void setMaximumConnections(int maximumConnections) {
684        this.maximumConnections = maximumConnections;
685    }
686
687    public AtomicInteger getCurrentTransportCount() {
688        return currentTransportCount;
689    }
690
691    @Override
692    public void started(Service service) {
693    }
694
695    @Override
696    public void stopped(Service service) {
697        this.currentTransportCount.decrementAndGet();
698    }
699
700    @Override
701    public boolean isSslServer() {
702        return false;
703    }
704
705    @Override
706    public boolean isAllowLinkStealing() {
707        return allowLinkStealing;
708    }
709
710    @Override
711    public void setAllowLinkStealing(boolean allowLinkStealing) {
712        this.allowLinkStealing = allowLinkStealing;
713    }
714}