001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.tcp; 018 019import java.io.IOException; 020import java.net.InetAddress; 021import java.net.InetSocketAddress; 022import java.net.ServerSocket; 023import java.net.Socket; 024import java.net.SocketException; 025import java.net.SocketTimeoutException; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.net.UnknownHostException; 029import java.nio.channels.ClosedChannelException; 030import java.nio.channels.SelectionKey; 031import java.nio.channels.Selector; 032import java.nio.channels.ServerSocketChannel; 033import java.nio.channels.SocketChannel; 034import java.util.HashMap; 035import java.util.Iterator; 036import java.util.Set; 037import java.util.concurrent.BlockingQueue; 038import java.util.concurrent.LinkedBlockingQueue; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicInteger; 041 042import javax.net.ServerSocketFactory; 043import javax.net.ssl.SSLServerSocket; 044 045import org.apache.activemq.Service; 046import org.apache.activemq.ThreadPriorities; 047import org.apache.activemq.TransportLoggerSupport; 048import org.apache.activemq.command.BrokerInfo; 049import org.apache.activemq.openwire.OpenWireFormatFactory; 050import org.apache.activemq.transport.Transport; 051import org.apache.activemq.transport.TransportFactory; 052import org.apache.activemq.transport.TransportServer; 053import org.apache.activemq.transport.TransportServerThreadSupport; 054import org.apache.activemq.util.IOExceptionSupport; 055import org.apache.activemq.util.InetAddressUtil; 056import org.apache.activemq.util.IntrospectionSupport; 057import org.apache.activemq.util.ServiceListener; 058import org.apache.activemq.util.ServiceStopper; 059import org.apache.activemq.util.ServiceSupport; 060import org.apache.activemq.wireformat.WireFormat; 061import org.apache.activemq.wireformat.WireFormatFactory; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065/** 066 * A TCP based implementation of {@link TransportServer} 067 */ 068public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener { 069 070 private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class); 071 072 protected volatile ServerSocket serverSocket; 073 protected volatile Selector selector; 074 protected int backlog = 5000; 075 protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); 076 protected final TcpTransportFactory transportFactory; 077 protected long maxInactivityDuration = 30000; 078 protected long maxInactivityDurationInitalDelay = 10000; 079 protected int minmumWireFormatVersion; 080 protected boolean useQueueForAccept = true; 081 protected boolean allowLinkStealing; 082 083 /** 084 * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer 085 * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer, 086 * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or 087 * TransportConnector URIs. 088 */ 089 protected boolean trace = false; 090 091 protected int soTimeout = 0; 092 protected int socketBufferSize = 64 * 1024; 093 protected int connectionTimeout = 30000; 094 095 /** 096 * Name of the LogWriter implementation to use. Names are mapped to classes in the 097 * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably 098 * set in Connection or TransportConnector URIs. 099 */ 100 protected String logWriterName = TransportLoggerSupport.defaultLogWriterName; 101 102 /** 103 * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1 104 * TransportLogger which is manageable, a TransportLoggerControl MBean will me created. 105 */ 106 protected boolean dynamicManagement = false; 107 108 /** 109 * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log. 110 * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the 111 * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or 112 * TransportConnector URIs. 113 */ 114 protected boolean startLogging = true; 115 protected int jmxPort = TransportLoggerSupport.defaultJmxPort; 116 protected final ServerSocketFactory serverSocketFactory; 117 protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>(); 118 protected Thread socketHandlerThread; 119 120 /** 121 * The maximum number of sockets allowed for this server 122 */ 123 protected int maximumConnections = Integer.MAX_VALUE; 124 protected final AtomicInteger currentTransportCount = new AtomicInteger(); 125 126 public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, 127 URISyntaxException { 128 super(location); 129 this.transportFactory = transportFactory; 130 this.serverSocketFactory = serverSocketFactory; 131 } 132 133 public void bind() throws IOException { 134 URI bind = getBindLocation(); 135 136 String host = bind.getHost(); 137 host = (host == null || host.length() == 0) ? "localhost" : host; 138 InetAddress addr = InetAddress.getByName(host); 139 140 try { 141 serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); 142 configureServerSocket(serverSocket); 143 } catch (IOException e) { 144 throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e); 145 } 146 try { 147 setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), 148 bind.getQuery(), bind.getFragment())); 149 } catch (URISyntaxException e) { 150 // it could be that the host name contains invalid characters such 151 // as _ on unix platforms so lets try use the IP address instead 152 try { 153 setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), 154 bind.getQuery(), bind.getFragment())); 155 } catch (URISyntaxException e2) { 156 throw IOExceptionSupport.create(e2); 157 } 158 } 159 } 160 161 private void configureServerSocket(ServerSocket socket) throws SocketException { 162 socket.setSoTimeout(2000); 163 if (transportOptions != null) { 164 165 // If the enabledCipherSuites option is invalid we don't want to ignore it as the call 166 // to SSLServerSocket to configure it has a side effect on the socket rendering it 167 // useless as all suites are enabled many of which are considered as insecure. We 168 // instead trap that option here and throw an exception. We should really consider 169 // all invalid options as breaking and not start the transport but the current design 170 // doesn't really allow for this. 171 // 172 // see: https://issues.apache.org/jira/browse/AMQ-4582 173 // 174 if (socket instanceof SSLServerSocket) { 175 if (transportOptions.containsKey("enabledCipherSuites")) { 176 Object cipherSuites = transportOptions.remove("enabledCipherSuites"); 177 178 if (!IntrospectionSupport.setProperty(socket, "enabledCipherSuites", cipherSuites)) { 179 throw new SocketException(String.format( 180 "Invalid transport options {enabledCipherSuites=%s}", cipherSuites)); 181 } 182 } 183 } 184 185 //AMQ-6599 - don't strip out set properties on the socket as we need to set them 186 //on the Transport as well later 187 IntrospectionSupport.setProperties(socket, transportOptions, false); 188 } 189 } 190 191 /** 192 * @return Returns the wireFormatFactory. 193 */ 194 public WireFormatFactory getWireFormatFactory() { 195 return wireFormatFactory; 196 } 197 198 /** 199 * @param wireFormatFactory 200 * The wireFormatFactory to set. 201 */ 202 public void setWireFormatFactory(WireFormatFactory wireFormatFactory) { 203 this.wireFormatFactory = wireFormatFactory; 204 } 205 206 /** 207 * Associates a broker info with the transport server so that the transport can do discovery advertisements of the 208 * broker. 209 * 210 * @param brokerInfo 211 */ 212 @Override 213 public void setBrokerInfo(BrokerInfo brokerInfo) { 214 } 215 216 public long getMaxInactivityDuration() { 217 return maxInactivityDuration; 218 } 219 220 public void setMaxInactivityDuration(long maxInactivityDuration) { 221 this.maxInactivityDuration = maxInactivityDuration; 222 } 223 224 public long getMaxInactivityDurationInitalDelay() { 225 return this.maxInactivityDurationInitalDelay; 226 } 227 228 public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) { 229 this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay; 230 } 231 232 public int getMinmumWireFormatVersion() { 233 return minmumWireFormatVersion; 234 } 235 236 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { 237 this.minmumWireFormatVersion = minmumWireFormatVersion; 238 } 239 240 public boolean isTrace() { 241 return trace; 242 } 243 244 public void setTrace(boolean trace) { 245 this.trace = trace; 246 } 247 248 public String getLogWriterName() { 249 return logWriterName; 250 } 251 252 public void setLogWriterName(String logFormat) { 253 this.logWriterName = logFormat; 254 } 255 256 public boolean isDynamicManagement() { 257 return dynamicManagement; 258 } 259 260 public void setDynamicManagement(boolean useJmx) { 261 this.dynamicManagement = useJmx; 262 } 263 264 public void setJmxPort(int jmxPort) { 265 this.jmxPort = jmxPort; 266 } 267 268 public int getJmxPort() { 269 return jmxPort; 270 } 271 272 public boolean isStartLogging() { 273 return startLogging; 274 } 275 276 public void setStartLogging(boolean startLogging) { 277 this.startLogging = startLogging; 278 } 279 280 /** 281 * @return the backlog 282 */ 283 public int getBacklog() { 284 return backlog; 285 } 286 287 /** 288 * @param backlog 289 * the backlog to set 290 */ 291 public void setBacklog(int backlog) { 292 this.backlog = backlog; 293 } 294 295 /** 296 * @return the useQueueForAccept 297 */ 298 public boolean isUseQueueForAccept() { 299 return useQueueForAccept; 300 } 301 302 /** 303 * @param useQueueForAccept 304 * the useQueueForAccept to set 305 */ 306 public void setUseQueueForAccept(boolean useQueueForAccept) { 307 this.useQueueForAccept = useQueueForAccept; 308 } 309 310 /** 311 * pull Sockets from the ServerSocket 312 */ 313 @Override 314 public void run() { 315 if (!isStopped() && !isStopping()) { 316 final ServerSocket serverSocket = this.serverSocket; 317 if (serverSocket == null) { 318 onAcceptError(new IOException("Server started without a valid ServerSocket")); 319 } 320 321 final ServerSocketChannel channel = serverSocket.getChannel(); 322 if (channel != null) { 323 doRunWithServerSocketChannel(channel); 324 } else { 325 doRunWithServerSocket(serverSocket); 326 } 327 } 328 } 329 330 private void doRunWithServerSocketChannel(final ServerSocketChannel channel) { 331 try { 332 channel.configureBlocking(false); 333 final Selector selector = Selector.open(); 334 335 try { 336 channel.register(selector, SelectionKey.OP_ACCEPT); 337 } catch (ClosedChannelException ex) { 338 try { 339 selector.close(); 340 } catch (IOException ignore) {} 341 342 throw ex; 343 } 344 345 // Update object instance for later cleanup. 346 this.selector = selector; 347 348 while (!isStopped()) { 349 int count = selector.select(10); 350 351 if (count == 0) { 352 continue; 353 } 354 355 Set<SelectionKey> keys = selector.selectedKeys(); 356 357 for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) { 358 final SelectionKey key = i.next(); 359 if (key.isAcceptable()) { 360 try { 361 SocketChannel sc = channel.accept(); 362 if (sc != null) { 363 if (isStopped() || getAcceptListener() == null) { 364 sc.close(); 365 } else { 366 if (useQueueForAccept) { 367 socketQueue.put(sc.socket()); 368 } else { 369 handleSocket(sc.socket()); 370 } 371 } 372 } 373 374 } catch (SocketTimeoutException ste) { 375 // expect this to happen 376 } catch (Exception e) { 377 e.printStackTrace(); 378 if (!isStopping()) { 379 onAcceptError(e); 380 } else if (!isStopped()) { 381 LOG.warn("run()", e); 382 onAcceptError(e); 383 } 384 } 385 } 386 i.remove(); 387 } 388 } 389 } catch (IOException ex) { 390 if (!isStopping()) { 391 onAcceptError(ex); 392 } else if (!isStopped()) { 393 LOG.warn("run()", ex); 394 onAcceptError(ex); 395 } 396 } 397 } 398 399 private void doRunWithServerSocket(final ServerSocket serverSocket) { 400 while (!isStopped()) { 401 Socket socket = null; 402 try { 403 socket = serverSocket.accept(); 404 if (socket != null) { 405 if (isStopped() || getAcceptListener() == null) { 406 socket.close(); 407 } else { 408 if (useQueueForAccept) { 409 socketQueue.put(socket); 410 } else { 411 handleSocket(socket); 412 } 413 } 414 } 415 } catch (SocketTimeoutException ste) { 416 // expect this to happen 417 } catch (Exception e) { 418 if (!isStopping()) { 419 onAcceptError(e); 420 } else if (!isStopped()) { 421 LOG.warn("run()", e); 422 onAcceptError(e); 423 } 424 } 425 } 426 } 427 428 /** 429 * Allow derived classes to override the Transport implementation that this transport server creates. 430 * 431 * @param socket 432 * @param format 433 * 434 * @return a new Transport instance. 435 * 436 * @throws IOException 437 */ 438 protected Transport createTransport(Socket socket, WireFormat format) throws IOException { 439 return new TcpTransport(format, socket); 440 } 441 442 /** 443 * @return pretty print of this 444 */ 445 @Override 446 public String toString() { 447 return "" + getBindLocation(); 448 } 449 450 /** 451 * @param socket 452 * @param bindAddress 453 * @return real hostName 454 * @throws UnknownHostException 455 */ 456 protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException { 457 String result = null; 458 if (socket.isBound()) { 459 if (socket.getInetAddress().isAnyLocalAddress()) { 460 // make it more human readable and useful, an alternative to 0.0.0.0 461 result = InetAddressUtil.getLocalHostName(); 462 } else { 463 result = socket.getInetAddress().getCanonicalHostName(); 464 } 465 } else { 466 result = bindAddress.getCanonicalHostName(); 467 } 468 return result; 469 } 470 471 @Override 472 protected void doStart() throws Exception { 473 if (useQueueForAccept) { 474 Runnable run = new Runnable() { 475 @Override 476 public void run() { 477 try { 478 while (!isStopped() && !isStopping()) { 479 Socket sock = socketQueue.poll(1, TimeUnit.SECONDS); 480 if (sock != null) { 481 try { 482 handleSocket(sock); 483 } catch (Throwable thrown) { 484 if (!isStopping()) { 485 onAcceptError(new Exception(thrown)); 486 } else if (!isStopped()) { 487 LOG.warn("Unexpected error thrown during accept handling: ", thrown); 488 onAcceptError(new Exception(thrown)); 489 } 490 } 491 } 492 } 493 494 } catch (InterruptedException e) { 495 if (!isStopped() || !isStopping()) { 496 LOG.info("socketQueue interrupted - stopping"); 497 onAcceptError(e); 498 } 499 } 500 } 501 }; 502 socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize()); 503 socketHandlerThread.setDaemon(true); 504 socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1); 505 socketHandlerThread.start(); 506 } 507 super.doStart(); 508 } 509 510 @Override 511 protected void doStop(ServiceStopper stopper) throws Exception { 512 Exception firstFailure = null; 513 514 try { 515 if (selector != null) { 516 selector.close(); 517 selector = null; 518 } 519 } catch (Exception error) { 520 } 521 522 try { 523 final ServerSocket serverSocket = this.serverSocket; 524 if (serverSocket != null) { 525 this.serverSocket = null; 526 serverSocket.close(); 527 } 528 } catch (Exception error) { 529 firstFailure = error; 530 } 531 532 if (socketHandlerThread != null) { 533 socketHandlerThread.interrupt(); 534 socketHandlerThread = null; 535 } 536 537 try { 538 super.doStop(stopper); 539 } catch (Exception error) { 540 if (firstFailure != null) { 541 firstFailure = error; 542 } 543 } 544 545 if (firstFailure != null) { 546 throw firstFailure; 547 } 548 } 549 550 @Override 551 public InetSocketAddress getSocketAddress() { 552 return (InetSocketAddress) serverSocket.getLocalSocketAddress(); 553 } 554 555 protected void handleSocket(Socket socket) { 556 doHandleSocket(socket); 557 } 558 559 final protected void doHandleSocket(Socket socket) { 560 boolean closeSocket = true; 561 boolean countIncremented = false; 562 try { 563 int currentCount; 564 do { 565 currentCount = currentTransportCount.get(); 566 if (currentCount >= this.maximumConnections) { 567 throw new ExceededMaximumConnectionsException( 568 "Exceeded the maximum number of allowed client connections. See the '" + 569 "maximumConnections' property on the TCP transport configuration URI " + 570 "in the ActiveMQ configuration file (e.g., activemq.xml)"); 571 } 572 573 //Increment this value before configuring the transport 574 //This is necessary because some of the transport servers must read from the 575 //socket during configureTransport() so we want to make sure this value is 576 //accurate as the transport server could pause here waiting for data to be sent from a client 577 } while(!currentTransportCount.compareAndSet(currentCount, currentCount + 1)); 578 countIncremented = true; 579 580 HashMap<String, Object> options = new HashMap<String, Object>(); 581 options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration)); 582 options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay)); 583 options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion)); 584 options.put("trace", Boolean.valueOf(trace)); 585 options.put("soTimeout", Integer.valueOf(soTimeout)); 586 options.put("socketBufferSize", Integer.valueOf(socketBufferSize)); 587 options.put("connectionTimeout", Integer.valueOf(connectionTimeout)); 588 options.put("logWriterName", logWriterName); 589 options.put("dynamicManagement", Boolean.valueOf(dynamicManagement)); 590 options.put("startLogging", Boolean.valueOf(startLogging)); 591 options.put("jmxPort", Integer.valueOf(jmxPort)); 592 options.putAll(transportOptions); 593 594 TransportInfo transportInfo = configureTransport(this, socket); 595 closeSocket = false; 596 597 if (transportInfo.transport instanceof ServiceSupport) { 598 ((ServiceSupport) transportInfo.transport).addServiceListener(this); 599 } 600 601 Transport configuredTransport = transportInfo.transportFactory.serverConfigure( 602 transportInfo.transport, transportInfo.format, options); 603 604 getAcceptListener().onAccept(configuredTransport); 605 606 } catch (SocketTimeoutException ste) { 607 // expect this to happen 608 } catch (Exception e) { 609 if (closeSocket) { 610 try { 611 //if closing the socket, only decrement the count it was actually incremented 612 //where it was incremented 613 if (countIncremented) { 614 currentTransportCount.decrementAndGet(); 615 } 616 socket.close(); 617 } catch (Exception ignore) { 618 } 619 } 620 621 if (!isStopping()) { 622 onAcceptError(e); 623 } else if (!isStopped()) { 624 LOG.warn("run()", e); 625 onAcceptError(e); 626 } 627 } 628 } 629 630 protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception { 631 WireFormat format = wireFormatFactory.createWireFormat(); 632 Transport transport = createTransport(socket, format); 633 return new TransportInfo(format, transport, transportFactory); 634 } 635 636 protected class TransportInfo { 637 final WireFormat format; 638 final Transport transport; 639 final TransportFactory transportFactory; 640 641 public TransportInfo(WireFormat format, Transport transport, TransportFactory transportFactory) { 642 this.format = format; 643 this.transport = transport; 644 this.transportFactory = transportFactory; 645 } 646 } 647 648 public int getSoTimeout() { 649 return soTimeout; 650 } 651 652 public void setSoTimeout(int soTimeout) { 653 this.soTimeout = soTimeout; 654 } 655 656 public int getSocketBufferSize() { 657 return socketBufferSize; 658 } 659 660 public void setSocketBufferSize(int socketBufferSize) { 661 this.socketBufferSize = socketBufferSize; 662 } 663 664 public int getConnectionTimeout() { 665 return connectionTimeout; 666 } 667 668 public void setConnectionTimeout(int connectionTimeout) { 669 this.connectionTimeout = connectionTimeout; 670 } 671 672 /** 673 * @return the maximumConnections 674 */ 675 public int getMaximumConnections() { 676 return maximumConnections; 677 } 678 679 /** 680 * @param maximumConnections 681 * the maximumConnections to set 682 */ 683 public void setMaximumConnections(int maximumConnections) { 684 this.maximumConnections = maximumConnections; 685 } 686 687 public AtomicInteger getCurrentTransportCount() { 688 return currentTransportCount; 689 } 690 691 @Override 692 public void started(Service service) { 693 } 694 695 @Override 696 public void stopped(Service service) { 697 this.currentTransportCount.decrementAndGet(); 698 } 699 700 @Override 701 public boolean isSslServer() { 702 return false; 703 } 704 705 @Override 706 public boolean isAllowLinkStealing() { 707 return allowLinkStealing; 708 } 709 710 @Override 711 public void setAllowLinkStealing(boolean allowLinkStealing) { 712 this.allowLinkStealing = allowLinkStealing; 713 } 714}