001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.EOFException; 020import java.io.IOException; 021import java.net.SocketException; 022import java.net.URI; 023import java.util.Collection; 024import java.util.HashMap; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Properties; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.CopyOnWriteArrayList; 032import java.util.concurrent.CountDownLatch; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicBoolean; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.concurrent.atomic.AtomicReference; 037import java.util.concurrent.locks.ReentrantReadWriteLock; 038 039import javax.transaction.xa.XAResource; 040 041import org.apache.activemq.advisory.AdvisorySupport; 042import org.apache.activemq.broker.region.ConnectionStatistics; 043import org.apache.activemq.broker.region.RegionBroker; 044import org.apache.activemq.command.ActiveMQDestination; 045import org.apache.activemq.command.BrokerInfo; 046import org.apache.activemq.command.BrokerSubscriptionInfo; 047import org.apache.activemq.command.Command; 048import org.apache.activemq.command.CommandTypes; 049import org.apache.activemq.command.ConnectionControl; 050import org.apache.activemq.command.ConnectionError; 051import org.apache.activemq.command.ConnectionId; 052import org.apache.activemq.command.ConnectionInfo; 053import org.apache.activemq.command.ConsumerControl; 054import org.apache.activemq.command.ConsumerId; 055import org.apache.activemq.command.ConsumerInfo; 056import org.apache.activemq.command.ControlCommand; 057import org.apache.activemq.command.DataArrayResponse; 058import org.apache.activemq.command.DestinationInfo; 059import org.apache.activemq.command.ExceptionResponse; 060import org.apache.activemq.command.FlushCommand; 061import org.apache.activemq.command.IntegerResponse; 062import org.apache.activemq.command.KeepAliveInfo; 063import org.apache.activemq.command.Message; 064import org.apache.activemq.command.MessageAck; 065import org.apache.activemq.command.MessageDispatch; 066import org.apache.activemq.command.MessageDispatchNotification; 067import org.apache.activemq.command.MessagePull; 068import org.apache.activemq.command.ProducerAck; 069import org.apache.activemq.command.ProducerId; 070import org.apache.activemq.command.ProducerInfo; 071import org.apache.activemq.command.RemoveInfo; 072import org.apache.activemq.command.RemoveSubscriptionInfo; 073import org.apache.activemq.command.Response; 074import org.apache.activemq.command.SessionId; 075import org.apache.activemq.command.SessionInfo; 076import org.apache.activemq.command.ShutdownInfo; 077import org.apache.activemq.command.TransactionId; 078import org.apache.activemq.command.TransactionInfo; 079import org.apache.activemq.command.WireFormatInfo; 080import org.apache.activemq.network.DemandForwardingBridge; 081import org.apache.activemq.network.MBeanNetworkListener; 082import org.apache.activemq.network.NetworkBridgeConfiguration; 083import org.apache.activemq.network.NetworkBridgeFactory; 084import org.apache.activemq.network.NetworkConnector; 085import org.apache.activemq.security.MessageAuthorizationPolicy; 086import org.apache.activemq.state.CommandVisitor; 087import org.apache.activemq.state.ConnectionState; 088import org.apache.activemq.state.ConsumerState; 089import org.apache.activemq.state.ProducerState; 090import org.apache.activemq.state.SessionState; 091import org.apache.activemq.state.TransactionState; 092import org.apache.activemq.thread.Task; 093import org.apache.activemq.thread.TaskRunner; 094import org.apache.activemq.thread.TaskRunnerFactory; 095import org.apache.activemq.transaction.Transaction; 096import org.apache.activemq.transport.DefaultTransportListener; 097import org.apache.activemq.transport.ResponseCorrelator; 098import org.apache.activemq.transport.TransmitCallback; 099import org.apache.activemq.transport.Transport; 100import org.apache.activemq.transport.TransportDisposedIOException; 101import org.apache.activemq.util.IntrospectionSupport; 102import org.apache.activemq.util.MarshallingSupport; 103import org.apache.activemq.util.NetworkBridgeUtils; 104import org.apache.activemq.util.SubscriptionKey; 105import org.slf4j.Logger; 106import org.slf4j.LoggerFactory; 107import org.slf4j.MDC; 108 109public class TransportConnection implements Connection, Task, CommandVisitor { 110 private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class); 111 private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport"); 112 private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service"); 113 // Keeps track of the broker and connector that created this connection. 114 protected final Broker broker; 115 protected final BrokerService brokerService; 116 protected final TransportConnector connector; 117 // Keeps track of the state of the connections. 118 // protected final ConcurrentHashMap localConnectionStates=new 119 // ConcurrentHashMap(); 120 protected final Map<ConnectionId, ConnectionState> brokerConnectionStates; 121 // The broker and wireformat info that was exchanged. 122 protected BrokerInfo brokerInfo; 123 protected final List<Command> dispatchQueue = new LinkedList<>(); 124 protected TaskRunner taskRunner; 125 protected final AtomicReference<Throwable> transportException = new AtomicReference<>(); 126 protected AtomicBoolean dispatchStopped = new AtomicBoolean(false); 127 private final Transport transport; 128 private MessageAuthorizationPolicy messageAuthorizationPolicy; 129 private WireFormatInfo wireFormatInfo; 130 // Used to do async dispatch.. this should perhaps be pushed down into the 131 // transport layer.. 132 private boolean inServiceException; 133 private final ConnectionStatistics statistics = new ConnectionStatistics(); 134 private boolean manageable; 135 private boolean slow; 136 private boolean markedCandidate; 137 private boolean blockedCandidate; 138 private boolean blocked; 139 private boolean connected; 140 private boolean active; 141 private final AtomicBoolean starting = new AtomicBoolean(); 142 private final AtomicBoolean pendingStop = new AtomicBoolean(); 143 private long timeStamp; 144 private final AtomicBoolean stopping = new AtomicBoolean(false); 145 private final CountDownLatch stopped = new CountDownLatch(1); 146 private final AtomicBoolean asyncException = new AtomicBoolean(false); 147 private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<>(); 148 private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<>(); 149 private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); 150 private ConnectionContext context; 151 private boolean networkConnection; 152 private boolean faultTolerantConnection; 153 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); 154 private DemandForwardingBridge duplexBridge; 155 private final TaskRunnerFactory taskRunnerFactory; 156 private final TaskRunnerFactory stopTaskRunnerFactory; 157 private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); 158 private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); 159 private String duplexNetworkConnectorId; 160 161 /** 162 * @param taskRunnerFactory - can be null if you want direct dispatch to the transport 163 * else commands are sent async. 164 * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection. 165 */ 166 public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, 167 TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) { 168 this.connector = connector; 169 this.broker = broker; 170 this.brokerService = broker.getBrokerService(); 171 172 RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class); 173 brokerConnectionStates = rb.getConnectionStates(); 174 if (connector != null) { 175 this.statistics.setParent(connector.getStatistics()); 176 this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy(); 177 } 178 this.taskRunnerFactory = taskRunnerFactory; 179 this.stopTaskRunnerFactory = stopTaskRunnerFactory; 180 this.transport = transport; 181 if( this.transport instanceof BrokerServiceAware ) { 182 ((BrokerServiceAware)this.transport).setBrokerService(brokerService); 183 } 184 this.transport.setTransportListener(new DefaultTransportListener() { 185 @Override 186 public void onCommand(Object o) { 187 serviceLock.readLock().lock(); 188 try { 189 if (!(o instanceof Command)) { 190 throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString()); 191 } 192 Command command = (Command) o; 193 if (!brokerService.isStopping()) { 194 Response response = service(command); 195 if (response != null && !brokerService.isStopping()) { 196 dispatchSync(response); 197 } 198 } else { 199 throw new BrokerStoppedException("Broker " + brokerService + " is being stopped"); 200 } 201 } finally { 202 serviceLock.readLock().unlock(); 203 } 204 } 205 206 @Override 207 public void onException(IOException exception) { 208 serviceLock.readLock().lock(); 209 try { 210 serviceTransportException(exception); 211 } finally { 212 serviceLock.readLock().unlock(); 213 } 214 } 215 }); 216 connected = true; 217 } 218 219 /** 220 * Returns the number of messages to be dispatched to this connection 221 * 222 * @return size of dispatch queue 223 */ 224 @Override 225 public int getDispatchQueueSize() { 226 synchronized (dispatchQueue) { 227 return dispatchQueue.size(); 228 } 229 } 230 231 public void serviceTransportException(IOException e) { 232 if (!stopping.get() && !pendingStop.get()) { 233 transportException.set(e); 234 if (TRANSPORTLOG.isDebugEnabled()) { 235 TRANSPORTLOG.debug(this + " failed: " + e, e); 236 } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) { 237 TRANSPORTLOG.warn(this + " failed: " + e); 238 } 239 stopAsync(e); 240 } 241 } 242 243 private boolean expected(IOException e) { 244 return isStomp() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException); 245 } 246 247 private boolean isStomp() { 248 URI uri = connector.getUri(); 249 return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1; 250 } 251 252 /** 253 * Calls the serviceException method in an async thread. Since handling a 254 * service exception closes a socket, we should not tie up broker threads 255 * since client sockets may hang or cause deadlocks. 256 */ 257 @Override 258 public void serviceExceptionAsync(final IOException e) { 259 if (asyncException.compareAndSet(false, true)) { 260 new Thread("Async Exception Handler") { 261 @Override 262 public void run() { 263 serviceException(e); 264 } 265 }.start(); 266 } 267 } 268 269 /** 270 * Closes a clients connection due to a detected error. Errors are ignored 271 * if: the client is closing or broker is closing. Otherwise, the connection 272 * error transmitted to the client before stopping it's transport. 273 */ 274 @Override 275 public void serviceException(Throwable e) { 276 // are we a transport exception such as not being able to dispatch 277 // synchronously to a transport 278 if (e instanceof IOException) { 279 serviceTransportException((IOException) e); 280 } else if (e.getClass() == BrokerStoppedException.class) { 281 // Handle the case where the broker is stopped 282 // But the client is still connected. 283 if (!stopping.get()) { 284 SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection."); 285 ConnectionError ce = new ConnectionError(); 286 ce.setException(e); 287 dispatchSync(ce); 288 // Record the error that caused the transport to stop 289 transportException.set(e); 290 // Wait a little bit to try to get the output buffer to flush 291 // the exception notification to the client. 292 try { 293 Thread.sleep(500); 294 } catch (InterruptedException ie) { 295 Thread.currentThread().interrupt(); 296 } 297 // Worst case is we just kill the connection before the 298 // notification gets to him. 299 stopAsync(); 300 } 301 } else if (!stopping.get() && !inServiceException) { 302 inServiceException = true; 303 try { 304 if (SERVICELOG.isDebugEnabled()) { 305 SERVICELOG.debug("Async error occurred: " + e, e); 306 } else { 307 SERVICELOG.warn("Async error occurred: " + e); 308 } 309 ConnectionError ce = new ConnectionError(); 310 ce.setException(e); 311 if (pendingStop.get()) { 312 dispatchSync(ce); 313 } else { 314 dispatchAsync(ce); 315 } 316 } finally { 317 inServiceException = false; 318 } 319 } 320 } 321 322 @Override 323 public Response service(Command command) { 324 MDC.put("activemq.connector", connector.getUri().toString()); 325 Response response = null; 326 boolean responseRequired = command.isResponseRequired(); 327 int commandId = command.getCommandId(); 328 try { 329 if (!pendingStop.get()) { 330 response = command.visit(this); 331 } else { 332 response = new ExceptionResponse(transportException.get()); 333 } 334 } catch (Throwable e) { 335 if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { 336 SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async") 337 + " command: " + command + ", exception: " + e, e); 338 } 339 340 if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) { 341 LOG.info("Suppressing reply to: " + command + " on: " + e + ", cause: " + e.getCause()); 342 responseRequired = false; 343 } 344 345 if (responseRequired) { 346 if (e instanceof SecurityException || e.getCause() instanceof SecurityException) { 347 SERVICELOG.warn("Security Error occurred on connection to: {}, {}", 348 transport.getRemoteAddress(), e.getMessage()); 349 } 350 response = new ExceptionResponse(e); 351 } else { 352 forceRollbackOnlyOnFailedAsyncTransactionOp(e, command); 353 serviceException(e); 354 } 355 } 356 if (responseRequired) { 357 if (response == null) { 358 response = new Response(); 359 } 360 response.setCorrelationId(commandId); 361 } 362 // The context may have been flagged so that the response is not 363 // sent. 364 if (context != null) { 365 if (context.isDontSendReponse()) { 366 context.setDontSendReponse(false); 367 response = null; 368 } 369 context = null; 370 } 371 MDC.remove("activemq.connector"); 372 return response; 373 } 374 375 private void forceRollbackOnlyOnFailedAsyncTransactionOp(Throwable e, Command command) { 376 if (brokerService.isRollbackOnlyOnAsyncException() && !(e instanceof IOException) && isInTransaction(command)) { 377 Transaction transaction = getActiveTransaction(command); 378 if (transaction != null && !transaction.isRollbackOnly()) { 379 LOG.debug("on async exception, force rollback of transaction for: " + command, e); 380 transaction.setRollbackOnly(e); 381 } 382 } 383 } 384 385 private Transaction getActiveTransaction(Command command) { 386 Transaction transaction = null; 387 try { 388 if (command instanceof Message) { 389 Message messageSend = (Message) command; 390 ProducerId producerId = messageSend.getProducerId(); 391 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); 392 transaction = producerExchange.getConnectionContext().getTransactions().get(messageSend.getTransactionId()); 393 } else if (command instanceof MessageAck) { 394 MessageAck messageAck = (MessageAck) command; 395 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(messageAck.getConsumerId()); 396 if (consumerExchange != null) { 397 transaction = consumerExchange.getConnectionContext().getTransactions().get(messageAck.getTransactionId()); 398 } 399 } 400 } catch(Exception ignored){ 401 LOG.trace("failed to find active transaction for command: " + command, ignored); 402 } 403 return transaction; 404 } 405 406 private boolean isInTransaction(Command command) { 407 return command instanceof Message && ((Message)command).isInTransaction() 408 || command instanceof MessageAck && ((MessageAck)command).isInTransaction(); 409 } 410 411 @Override 412 public Response processKeepAlive(KeepAliveInfo info) throws Exception { 413 return null; 414 } 415 416 @Override 417 public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { 418 broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info); 419 return null; 420 } 421 422 @Override 423 public Response processWireFormat(WireFormatInfo info) throws Exception { 424 wireFormatInfo = info; 425 protocolVersion.set(info.getVersion()); 426 return null; 427 } 428 429 @Override 430 public Response processShutdown(ShutdownInfo info) throws Exception { 431 stopAsync(); 432 return null; 433 } 434 435 @Override 436 public Response processFlush(FlushCommand command) throws Exception { 437 return null; 438 } 439 440 @Override 441 public Response processBeginTransaction(TransactionInfo info) throws Exception { 442 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 443 context = null; 444 if (cs != null) { 445 context = cs.getContext(); 446 } 447 if (cs == null) { 448 throw new NullPointerException("Context is null"); 449 } 450 // Avoid replaying dup commands 451 if (cs.getTransactionState(info.getTransactionId()) == null) { 452 cs.addTransactionState(info.getTransactionId()); 453 broker.beginTransaction(context, info.getTransactionId()); 454 } 455 return null; 456 } 457 458 @Override 459 public int getActiveTransactionCount() { 460 int rc = 0; 461 for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) { 462 rc += cs.getTransactionStates().size(); 463 } 464 return rc; 465 } 466 467 @Override 468 public Long getOldestActiveTransactionDuration() { 469 TransactionState oldestTX = null; 470 for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) { 471 Collection<TransactionState> transactions = cs.getTransactionStates(); 472 for (TransactionState transaction : transactions) { 473 if( oldestTX ==null || oldestTX.getCreatedAt() < transaction.getCreatedAt() ) { 474 oldestTX = transaction; 475 } 476 } 477 } 478 if( oldestTX == null ) { 479 return null; 480 } 481 return System.currentTimeMillis() - oldestTX.getCreatedAt(); 482 } 483 484 @Override 485 public Response processEndTransaction(TransactionInfo info) throws Exception { 486 // No need to do anything. This packet is just sent by the client 487 // make sure he is synced with the server as commit command could 488 // come from a different connection. 489 return null; 490 } 491 492 @Override 493 public Response processPrepareTransaction(TransactionInfo info) throws Exception { 494 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 495 context = null; 496 if (cs != null) { 497 context = cs.getContext(); 498 } 499 if (cs == null) { 500 throw new NullPointerException("Context is null"); 501 } 502 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 503 if (transactionState == null) { 504 throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: " 505 + info.getTransactionId()); 506 } 507 // Avoid dups. 508 if (!transactionState.isPrepared()) { 509 transactionState.setPrepared(true); 510 int result = broker.prepareTransaction(context, info.getTransactionId()); 511 transactionState.setPreparedResult(result); 512 if (result == XAResource.XA_RDONLY) { 513 // we are done, no further rollback or commit from TM 514 cs.removeTransactionState(info.getTransactionId()); 515 } 516 IntegerResponse response = new IntegerResponse(result); 517 return response; 518 } else { 519 IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult()); 520 return response; 521 } 522 } 523 524 @Override 525 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { 526 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 527 context = cs.getContext(); 528 cs.removeTransactionState(info.getTransactionId()); 529 broker.commitTransaction(context, info.getTransactionId(), true); 530 return null; 531 } 532 533 @Override 534 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { 535 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 536 context = cs.getContext(); 537 cs.removeTransactionState(info.getTransactionId()); 538 broker.commitTransaction(context, info.getTransactionId(), false); 539 return null; 540 } 541 542 @Override 543 public Response processRollbackTransaction(TransactionInfo info) throws Exception { 544 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 545 context = cs.getContext(); 546 cs.removeTransactionState(info.getTransactionId()); 547 broker.rollbackTransaction(context, info.getTransactionId()); 548 return null; 549 } 550 551 @Override 552 public Response processForgetTransaction(TransactionInfo info) throws Exception { 553 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 554 context = cs.getContext(); 555 broker.forgetTransaction(context, info.getTransactionId()); 556 return null; 557 } 558 559 @Override 560 public Response processRecoverTransactions(TransactionInfo info) throws Exception { 561 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 562 context = cs.getContext(); 563 TransactionId[] preparedTransactions = broker.getPreparedTransactions(context); 564 return new DataArrayResponse(preparedTransactions); 565 } 566 567 @Override 568 public Response processMessage(Message messageSend) throws Exception { 569 ProducerId producerId = messageSend.getProducerId(); 570 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); 571 if (producerExchange.canDispatch(messageSend)) { 572 broker.send(producerExchange, messageSend); 573 } 574 return null; 575 } 576 577 @Override 578 public Response processMessageAck(MessageAck ack) throws Exception { 579 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId()); 580 if (consumerExchange != null) { 581 broker.acknowledge(consumerExchange, ack); 582 } else if (ack.isInTransaction()) { 583 LOG.warn("no matching consumer, ignoring ack {}", consumerExchange, ack); 584 } 585 return null; 586 } 587 588 @Override 589 public Response processMessagePull(MessagePull pull) throws Exception { 590 return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull); 591 } 592 593 @Override 594 public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception { 595 broker.processDispatchNotification(notification); 596 return null; 597 } 598 599 @Override 600 public Response processAddDestination(DestinationInfo info) throws Exception { 601 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 602 broker.addDestinationInfo(cs.getContext(), info); 603 if (info.getDestination().isTemporary()) { 604 cs.addTempDestination(info); 605 } 606 return null; 607 } 608 609 @Override 610 public Response processRemoveDestination(DestinationInfo info) throws Exception { 611 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 612 broker.removeDestinationInfo(cs.getContext(), info); 613 if (info.getDestination().isTemporary()) { 614 cs.removeTempDestination(info.getDestination()); 615 } 616 return null; 617 } 618 619 @Override 620 public Response processAddProducer(ProducerInfo info) throws Exception { 621 SessionId sessionId = info.getProducerId().getParentId(); 622 ConnectionId connectionId = sessionId.getParentId(); 623 TransportConnectionState cs = lookupConnectionState(connectionId); 624 if (cs == null) { 625 throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " 626 + connectionId); 627 } 628 SessionState ss = cs.getSessionState(sessionId); 629 if (ss == null) { 630 throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " 631 + sessionId); 632 } 633 // Avoid replaying dup commands 634 if (!ss.getProducerIds().contains(info.getProducerId())) { 635 ActiveMQDestination destination = info.getDestination(); 636 // Do not check for null here as it would cause the count of max producers to exclude 637 // anonymous producers. The isAdvisoryTopic method checks for null so it is safe to 638 // call it from here with a null Destination value. 639 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 640 if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){ 641 throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection()); 642 } 643 } 644 broker.addProducer(cs.getContext(), info); 645 try { 646 ss.addProducer(info); 647 } catch (IllegalStateException e) { 648 broker.removeProducer(cs.getContext(), info); 649 } 650 651 } 652 return null; 653 } 654 655 @Override 656 public Response processRemoveProducer(ProducerId id) throws Exception { 657 SessionId sessionId = id.getParentId(); 658 ConnectionId connectionId = sessionId.getParentId(); 659 TransportConnectionState cs = lookupConnectionState(connectionId); 660 SessionState ss = cs.getSessionState(sessionId); 661 if (ss == null) { 662 throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " 663 + sessionId); 664 } 665 ProducerState ps = ss.removeProducer(id); 666 if (ps == null) { 667 throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id); 668 } 669 removeProducerBrokerExchange(id); 670 broker.removeProducer(cs.getContext(), ps.getInfo()); 671 return null; 672 } 673 674 @Override 675 public Response processAddConsumer(ConsumerInfo info) throws Exception { 676 SessionId sessionId = info.getConsumerId().getParentId(); 677 ConnectionId connectionId = sessionId.getParentId(); 678 TransportConnectionState cs = lookupConnectionState(connectionId); 679 if (cs == null) { 680 throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " 681 + connectionId); 682 } 683 SessionState ss = cs.getSessionState(sessionId); 684 if (ss == null) { 685 throw new IllegalStateException(broker.getBrokerName() 686 + " Cannot add a consumer to a session that had not been registered: " + sessionId); 687 } 688 // Avoid replaying dup commands 689 if (!ss.getConsumerIds().contains(info.getConsumerId())) { 690 ActiveMQDestination destination = info.getDestination(); 691 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { 692 if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){ 693 throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection()); 694 } 695 } 696 697 broker.addConsumer(cs.getContext(), info); 698 try { 699 ss.addConsumer(info); 700 addConsumerBrokerExchange(cs, info.getConsumerId()); 701 } catch (IllegalStateException e) { 702 broker.removeConsumer(cs.getContext(), info); 703 } 704 705 } 706 return null; 707 } 708 709 @Override 710 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception { 711 SessionId sessionId = id.getParentId(); 712 ConnectionId connectionId = sessionId.getParentId(); 713 TransportConnectionState cs = lookupConnectionState(connectionId); 714 if (cs == null) { 715 throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: " 716 + connectionId); 717 } 718 SessionState ss = cs.getSessionState(sessionId); 719 if (ss == null) { 720 throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " 721 + sessionId); 722 } 723 ConsumerState consumerState = ss.removeConsumer(id); 724 if (consumerState == null) { 725 throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id); 726 } 727 ConsumerInfo info = consumerState.getInfo(); 728 info.setLastDeliveredSequenceId(lastDeliveredSequenceId); 729 broker.removeConsumer(cs.getContext(), consumerState.getInfo()); 730 removeConsumerBrokerExchange(id); 731 return null; 732 } 733 734 @Override 735 public Response processAddSession(SessionInfo info) throws Exception { 736 ConnectionId connectionId = info.getSessionId().getParentId(); 737 TransportConnectionState cs = lookupConnectionState(connectionId); 738 // Avoid replaying dup commands 739 if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) { 740 broker.addSession(cs.getContext(), info); 741 try { 742 cs.addSession(info); 743 } catch (IllegalStateException e) { 744 LOG.warn("Failed to add session: {}", info.getSessionId(), e); 745 broker.removeSession(cs.getContext(), info); 746 } 747 } 748 return null; 749 } 750 751 @Override 752 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception { 753 ConnectionId connectionId = id.getParentId(); 754 TransportConnectionState cs = lookupConnectionState(connectionId); 755 if (cs == null) { 756 throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId); 757 } 758 SessionState session = cs.getSessionState(id); 759 if (session == null) { 760 throw new IllegalStateException("Cannot remove session that had not been registered: " + id); 761 } 762 // Don't let new consumers or producers get added while we are closing 763 // this down. 764 session.shutdown(); 765 // Cascade the connection stop to the consumers and producers. 766 for (ConsumerId consumerId : session.getConsumerIds()) { 767 try { 768 processRemoveConsumer(consumerId, lastDeliveredSequenceId); 769 } catch (Throwable e) { 770 LOG.warn("Failed to remove consumer: {}", consumerId, e); 771 } 772 } 773 for (ProducerId producerId : session.getProducerIds()) { 774 try { 775 processRemoveProducer(producerId); 776 } catch (Throwable e) { 777 LOG.warn("Failed to remove producer: {}", producerId, e); 778 } 779 } 780 cs.removeSession(id); 781 broker.removeSession(cs.getContext(), session.getInfo()); 782 return null; 783 } 784 785 @Override 786 public Response processAddConnection(ConnectionInfo info) throws Exception { 787 // Older clients should have been defaulting this field to true.. but 788 // they were not. 789 if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) { 790 info.setClientMaster(true); 791 } 792 TransportConnectionState state; 793 // Make sure 2 concurrent connections by the same ID only generate 1 794 // TransportConnectionState object. 795 synchronized (brokerConnectionStates) { 796 state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId()); 797 if (state == null) { 798 state = new TransportConnectionState(info, this); 799 brokerConnectionStates.put(info.getConnectionId(), state); 800 } 801 state.incrementReference(); 802 } 803 // If there are 2 concurrent connections for the same connection id, 804 // then last one in wins, we need to sync here 805 // to figure out the winner. 806 synchronized (state.getConnectionMutex()) { 807 if (state.getConnection() != this) { 808 LOG.debug("Killing previous stale connection: {}", state.getConnection().getRemoteAddress()); 809 state.getConnection().stop(); 810 LOG.debug("Connection {} taking over previous connection: {}", getRemoteAddress(), state.getConnection().getRemoteAddress()); 811 state.setConnection(this); 812 state.reset(info); 813 } 814 } 815 registerConnectionState(info.getConnectionId(), state); 816 LOG.debug("Setting up new connection id: {}, address: {}, info: {}", new Object[]{ info.getConnectionId(), getRemoteAddress(), info }); 817 this.faultTolerantConnection = info.isFaultTolerant(); 818 // Setup the context. 819 String clientId = info.getClientId(); 820 context = new ConnectionContext(); 821 context.setBroker(broker); 822 context.setClientId(clientId); 823 context.setClientMaster(info.isClientMaster()); 824 context.setConnection(this); 825 context.setConnectionId(info.getConnectionId()); 826 context.setConnector(connector); 827 context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); 828 context.setNetworkConnection(networkConnection); 829 context.setFaultTolerant(faultTolerantConnection); 830 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>()); 831 context.setUserName(info.getUserName()); 832 context.setWireFormatInfo(wireFormatInfo); 833 context.setReconnect(info.isFailoverReconnect()); 834 this.manageable = info.isManageable(); 835 context.setConnectionState(state); 836 state.setContext(context); 837 state.setConnection(this); 838 if (info.getClientIp() == null) { 839 info.setClientIp(getRemoteAddress()); 840 } 841 842 try { 843 broker.addConnection(context, info); 844 } catch (Exception e) { 845 synchronized (brokerConnectionStates) { 846 brokerConnectionStates.remove(info.getConnectionId()); 847 } 848 unregisterConnectionState(info.getConnectionId()); 849 LOG.warn("Failed to add Connection id={}, clientId={} due to {}", info.getConnectionId(), clientId, e); 850 //AMQ-6561 - stop for all exceptions on addConnection 851 // close this down - in case the peer of this transport doesn't play nice 852 delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e); 853 throw e; 854 } 855 if (info.isManageable()) { 856 // send ConnectionCommand 857 ConnectionControl command = this.connector.getConnectionControl(); 858 command.setFaultTolerant(broker.isFaultTolerantConfiguration()); 859 if (info.isFailoverReconnect()) { 860 command.setRebalanceConnection(false); 861 } 862 dispatchAsync(command); 863 } 864 return null; 865 } 866 867 @Override 868 public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) 869 throws InterruptedException { 870 LOG.debug("remove connection id: {}", id); 871 TransportConnectionState cs = lookupConnectionState(id); 872 if (cs != null) { 873 // Don't allow things to be added to the connection state while we 874 // are shutting down. 875 cs.shutdown(); 876 // Cascade the connection stop to the sessions. 877 for (SessionId sessionId : cs.getSessionIds()) { 878 try { 879 processRemoveSession(sessionId, lastDeliveredSequenceId); 880 } catch (Throwable e) { 881 SERVICELOG.warn("Failed to remove session {}", sessionId, e); 882 } 883 } 884 // Cascade the connection stop to temp destinations. 885 for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) { 886 DestinationInfo di = iter.next(); 887 try { 888 broker.removeDestination(cs.getContext(), di.getDestination(), 0); 889 } catch (Throwable e) { 890 SERVICELOG.warn("Failed to remove tmp destination {}", di.getDestination(), e); 891 } 892 iter.remove(); 893 } 894 try { 895 broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get()); 896 } catch (Throwable e) { 897 SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e); 898 } 899 TransportConnectionState state = unregisterConnectionState(id); 900 if (state != null) { 901 synchronized (brokerConnectionStates) { 902 // If we are the last reference, we should remove the state 903 // from the broker. 904 if (state.decrementReference() == 0) { 905 brokerConnectionStates.remove(id); 906 } 907 } 908 } 909 } 910 return null; 911 } 912 913 @Override 914 public Response processProducerAck(ProducerAck ack) throws Exception { 915 // A broker should not get ProducerAck messages. 916 return null; 917 } 918 919 @Override 920 public Connector getConnector() { 921 return connector; 922 } 923 924 @Override 925 public void dispatchSync(Command message) { 926 try { 927 processDispatch(message); 928 } catch (IOException e) { 929 serviceExceptionAsync(e); 930 } 931 } 932 933 @Override 934 public void dispatchAsync(Command message) { 935 if (!stopping.get()) { 936 if (taskRunner == null) { 937 dispatchSync(message); 938 } else { 939 synchronized (dispatchQueue) { 940 dispatchQueue.add(message); 941 } 942 try { 943 taskRunner.wakeup(); 944 } catch (InterruptedException e) { 945 Thread.currentThread().interrupt(); 946 } 947 } 948 } else { 949 if (message.isMessageDispatch()) { 950 MessageDispatch md = (MessageDispatch) message; 951 TransmitCallback sub = md.getTransmitCallback(); 952 broker.postProcessDispatch(md); 953 if (sub != null) { 954 sub.onFailure(); 955 } 956 } 957 } 958 } 959 960 protected void processDispatch(Command command) throws IOException { 961 MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null); 962 try { 963 if (!stopping.get()) { 964 if (messageDispatch != null) { 965 try { 966 broker.preProcessDispatch(messageDispatch); 967 } catch (RuntimeException convertToIO) { 968 throw new IOException(convertToIO); 969 } 970 } 971 dispatch(command); 972 } 973 } catch (IOException e) { 974 if (messageDispatch != null) { 975 TransmitCallback sub = messageDispatch.getTransmitCallback(); 976 broker.postProcessDispatch(messageDispatch); 977 if (sub != null) { 978 sub.onFailure(); 979 } 980 messageDispatch = null; 981 throw e; 982 } else { 983 if (TRANSPORTLOG.isDebugEnabled()) { 984 TRANSPORTLOG.debug("Unexpected exception on asyncDispatch, command of type: " + command.getDataStructureType(), e); 985 } 986 } 987 } finally { 988 if (messageDispatch != null) { 989 TransmitCallback sub = messageDispatch.getTransmitCallback(); 990 broker.postProcessDispatch(messageDispatch); 991 if (sub != null) { 992 sub.onSuccess(); 993 } 994 } 995 } 996 } 997 998 @Override 999 public boolean iterate() { 1000 try { 1001 if (pendingStop.get() || stopping.get()) { 1002 if (dispatchStopped.compareAndSet(false, true)) { 1003 if (transportException.get() == null) { 1004 try { 1005 dispatch(new ShutdownInfo()); 1006 } catch (Throwable ignore) { 1007 } 1008 } 1009 dispatchStoppedLatch.countDown(); 1010 } 1011 return false; 1012 } 1013 if (!dispatchStopped.get()) { 1014 Command command = null; 1015 synchronized (dispatchQueue) { 1016 if (dispatchQueue.isEmpty()) { 1017 return false; 1018 } 1019 command = dispatchQueue.remove(0); 1020 } 1021 processDispatch(command); 1022 return true; 1023 } 1024 return false; 1025 } catch (IOException e) { 1026 if (dispatchStopped.compareAndSet(false, true)) { 1027 dispatchStoppedLatch.countDown(); 1028 } 1029 serviceExceptionAsync(e); 1030 return false; 1031 } 1032 } 1033 1034 /** 1035 * Returns the statistics for this connection 1036 */ 1037 @Override 1038 public ConnectionStatistics getStatistics() { 1039 return statistics; 1040 } 1041 1042 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 1043 return messageAuthorizationPolicy; 1044 } 1045 1046 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 1047 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 1048 } 1049 1050 @Override 1051 public boolean isManageable() { 1052 return manageable; 1053 } 1054 1055 @Override 1056 public void start() throws Exception { 1057 try { 1058 synchronized (this) { 1059 starting.set(true); 1060 if (taskRunnerFactory != null) { 1061 taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " 1062 + getRemoteAddress()); 1063 } else { 1064 taskRunner = null; 1065 } 1066 transport.start(); 1067 active = true; 1068 BrokerInfo info = connector.getBrokerInfo().copy(); 1069 if (connector.isUpdateClusterClients()) { 1070 info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos()); 1071 } else { 1072 info.setPeerBrokerInfos(null); 1073 } 1074 dispatchAsync(info); 1075 1076 connector.onStarted(this); 1077 } 1078 } catch (Exception e) { 1079 // Force clean up on an error starting up. 1080 pendingStop.set(true); 1081 throw e; 1082 } finally { 1083 // stop() can be called from within the above block, 1084 // but we want to be sure start() completes before 1085 // stop() runs, so queue the stop until right now: 1086 setStarting(false); 1087 if (isPendingStop()) { 1088 LOG.debug("Calling the delayed stop() after start() {}", this); 1089 stop(); 1090 } 1091 } 1092 } 1093 1094 @Override 1095 public void stop() throws Exception { 1096 // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory) 1097 // as their lifecycle is handled elsewhere 1098 1099 stopAsync(); 1100 while (!stopped.await(5, TimeUnit.SECONDS)) { 1101 LOG.info("The connection to '{}' is taking a long time to shutdown.", transport.getRemoteAddress()); 1102 } 1103 } 1104 1105 public void delayedStop(final int waitTime, final String reason, Throwable cause) { 1106 if (waitTime > 0) { 1107 synchronized (this) { 1108 pendingStop.set(true); 1109 transportException.set(cause); 1110 } 1111 try { 1112 stopTaskRunnerFactory.execute(new Runnable() { 1113 @Override 1114 public void run() { 1115 try { 1116 Thread.sleep(waitTime); 1117 stopAsync(); 1118 LOG.info("Stopping {} because {}", transport.getRemoteAddress(), reason); 1119 } catch (InterruptedException e) { 1120 } 1121 } 1122 }); 1123 } catch (Throwable t) { 1124 LOG.warn("Cannot create stopAsync. This exception will be ignored.", t); 1125 } 1126 } 1127 } 1128 1129 public void stopAsync(Throwable cause) { 1130 transportException.set(cause); 1131 stopAsync(); 1132 } 1133 1134 public void stopAsync() { 1135 // If we're in the middle of starting then go no further... for now. 1136 synchronized (this) { 1137 pendingStop.set(true); 1138 if (starting.get()) { 1139 LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes.."); 1140 return; 1141 } 1142 } 1143 if (stopping.compareAndSet(false, true)) { 1144 // Let all the connection contexts know we are shutting down 1145 // so that in progress operations can notice and unblock. 1146 List<TransportConnectionState> connectionStates = listConnectionStates(); 1147 for (TransportConnectionState cs : connectionStates) { 1148 ConnectionContext connectionContext = cs.getContext(); 1149 if (connectionContext != null) { 1150 connectionContext.getStopping().set(true); 1151 } 1152 } 1153 try { 1154 stopTaskRunnerFactory.execute(new Runnable() { 1155 @Override 1156 public void run() { 1157 serviceLock.writeLock().lock(); 1158 try { 1159 doStop(); 1160 } catch (Throwable e) { 1161 LOG.debug("Error occurred while shutting down a connection {}", this, e); 1162 } finally { 1163 stopped.countDown(); 1164 serviceLock.writeLock().unlock(); 1165 } 1166 } 1167 }); 1168 } catch (Throwable t) { 1169 LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t); 1170 stopped.countDown(); 1171 } 1172 } 1173 } 1174 1175 @Override 1176 public String toString() { 1177 return "Transport Connection to: " + transport.getRemoteAddress(); 1178 } 1179 1180 protected void doStop() throws Exception { 1181 LOG.debug("Stopping connection: {}", transport.getRemoteAddress()); 1182 connector.onStopped(this); 1183 try { 1184 synchronized (this) { 1185 if (duplexBridge != null) { 1186 duplexBridge.stop(); 1187 } 1188 } 1189 } catch (Exception ignore) { 1190 LOG.trace("Exception caught stopping. This exception is ignored.", ignore); 1191 } 1192 try { 1193 transport.stop(); 1194 LOG.debug("Stopped transport: {}", transport.getRemoteAddress()); 1195 } catch (Exception e) { 1196 LOG.debug("Could not stop transport to {}. This exception is ignored.", transport.getRemoteAddress(), e); 1197 } 1198 if (taskRunner != null) { 1199 taskRunner.shutdown(1); 1200 taskRunner = null; 1201 } 1202 active = false; 1203 // Run the MessageDispatch callbacks so that message references get 1204 // cleaned up. 1205 synchronized (dispatchQueue) { 1206 for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) { 1207 Command command = iter.next(); 1208 if (command.isMessageDispatch()) { 1209 MessageDispatch md = (MessageDispatch) command; 1210 TransmitCallback sub = md.getTransmitCallback(); 1211 broker.postProcessDispatch(md); 1212 if (sub != null) { 1213 sub.onFailure(); 1214 } 1215 } 1216 } 1217 dispatchQueue.clear(); 1218 } 1219 // 1220 // Remove all logical connection associated with this connection 1221 // from the broker. 1222 if (!broker.isStopped()) { 1223 List<TransportConnectionState> connectionStates = listConnectionStates(); 1224 connectionStates = listConnectionStates(); 1225 for (TransportConnectionState cs : connectionStates) { 1226 cs.getContext().getStopping().set(true); 1227 try { 1228 LOG.debug("Cleaning up connection resources: {}", getRemoteAddress()); 1229 processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN); 1230 } catch (Throwable ignore) { 1231 LOG.debug("Exception caught removing connection {}. This exception is ignored.", cs.getInfo().getConnectionId(), ignore); 1232 } 1233 } 1234 } 1235 LOG.debug("Connection Stopped: {}", getRemoteAddress()); 1236 } 1237 1238 /** 1239 * @return Returns the blockedCandidate. 1240 */ 1241 public boolean isBlockedCandidate() { 1242 return blockedCandidate; 1243 } 1244 1245 /** 1246 * @param blockedCandidate The blockedCandidate to set. 1247 */ 1248 public void setBlockedCandidate(boolean blockedCandidate) { 1249 this.blockedCandidate = blockedCandidate; 1250 } 1251 1252 /** 1253 * @return Returns the markedCandidate. 1254 */ 1255 public boolean isMarkedCandidate() { 1256 return markedCandidate; 1257 } 1258 1259 /** 1260 * @param markedCandidate The markedCandidate to set. 1261 */ 1262 public void setMarkedCandidate(boolean markedCandidate) { 1263 this.markedCandidate = markedCandidate; 1264 if (!markedCandidate) { 1265 timeStamp = 0; 1266 blockedCandidate = false; 1267 } 1268 } 1269 1270 /** 1271 * @param slow The slow to set. 1272 */ 1273 public void setSlow(boolean slow) { 1274 this.slow = slow; 1275 } 1276 1277 /** 1278 * @return true if the Connection is slow 1279 */ 1280 @Override 1281 public boolean isSlow() { 1282 return slow; 1283 } 1284 1285 /** 1286 * @return true if the Connection is potentially blocked 1287 */ 1288 public boolean isMarkedBlockedCandidate() { 1289 return markedCandidate; 1290 } 1291 1292 /** 1293 * Mark the Connection, so we can deem if it's collectable on the next sweep 1294 */ 1295 public void doMark() { 1296 if (timeStamp == 0) { 1297 timeStamp = System.currentTimeMillis(); 1298 } 1299 } 1300 1301 /** 1302 * @return if after being marked, the Connection is still writing 1303 */ 1304 @Override 1305 public boolean isBlocked() { 1306 return blocked; 1307 } 1308 1309 /** 1310 * @return true if the Connection is connected 1311 */ 1312 @Override 1313 public boolean isConnected() { 1314 return connected; 1315 } 1316 1317 /** 1318 * @param blocked The blocked to set. 1319 */ 1320 public void setBlocked(boolean blocked) { 1321 this.blocked = blocked; 1322 } 1323 1324 /** 1325 * @param connected The connected to set. 1326 */ 1327 public void setConnected(boolean connected) { 1328 this.connected = connected; 1329 } 1330 1331 /** 1332 * @return true if the Connection is active 1333 */ 1334 @Override 1335 public boolean isActive() { 1336 return active; 1337 } 1338 1339 /** 1340 * @param active The active to set. 1341 */ 1342 public void setActive(boolean active) { 1343 this.active = active; 1344 } 1345 1346 /** 1347 * @return true if the Connection is starting 1348 */ 1349 public boolean isStarting() { 1350 return starting.get(); 1351 } 1352 1353 @Override 1354 public synchronized boolean isNetworkConnection() { 1355 return networkConnection; 1356 } 1357 1358 @Override 1359 public boolean isFaultTolerantConnection() { 1360 return this.faultTolerantConnection; 1361 } 1362 1363 protected void setStarting(boolean starting) { 1364 this.starting.set(starting); 1365 } 1366 1367 /** 1368 * @return true if the Connection needs to stop 1369 */ 1370 public boolean isPendingStop() { 1371 return pendingStop.get(); 1372 } 1373 1374 protected void setPendingStop(boolean pendingStop) { 1375 this.pendingStop.set(pendingStop); 1376 } 1377 1378 private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException { 1379 Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties()); 1380 Map<String, String> props = createMap(properties); 1381 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); 1382 IntrospectionSupport.setProperties(config, props, ""); 1383 return config; 1384 } 1385 1386 @Override 1387 public Response processBrokerInfo(BrokerInfo info) { 1388 if (info.isSlaveBroker()) { 1389 LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName()); 1390 } else if (info.isNetworkConnection() && !info.isDuplexConnection()) { 1391 try { 1392 NetworkBridgeConfiguration config = getNetworkConfiguration(info); 1393 if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { 1394 LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); 1395 dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config)); 1396 } 1397 } catch (Exception e) { 1398 LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e); 1399 return null; 1400 } 1401 } else if (info.isNetworkConnection() && info.isDuplexConnection()) { 1402 // so this TransportConnection is the rear end of a network bridge 1403 // We have been requested to create a two way pipe ... 1404 try { 1405 NetworkBridgeConfiguration config = getNetworkConfiguration(info); 1406 config.setBrokerName(broker.getBrokerName()); 1407 1408 if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { 1409 LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); 1410 dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config)); 1411 } 1412 1413 // check for existing duplex connection hanging about 1414 1415 // We first look if existing network connection already exists for the same broker Id and network connector name 1416 // It's possible in case of brief network fault to have this transport connector side of the connection always active 1417 // and the duplex network connector side wanting to open a new one 1418 // In this case, the old connection must be broken 1419 String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); 1420 CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections(); 1421 synchronized (connections) { 1422 for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) { 1423 TransportConnection c = iter.next(); 1424 if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) { 1425 LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId); 1426 c.stopAsync(); 1427 // better to wait for a bit rather than get connection id already in use and failure to start new bridge 1428 c.getStopped().await(1, TimeUnit.SECONDS); 1429 } 1430 } 1431 setDuplexNetworkConnectorId(duplexNetworkConnectorId); 1432 } 1433 Transport localTransport = NetworkBridgeFactory.createLocalTransport(config, broker.getVmConnectorURI()); 1434 Transport remoteBridgeTransport = transport; 1435 if (! (remoteBridgeTransport instanceof ResponseCorrelator)) { 1436 // the vm transport case is already wrapped 1437 remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport); 1438 } 1439 String duplexName = localTransport.toString(); 1440 if (duplexName.contains("#")) { 1441 duplexName = duplexName.substring(duplexName.lastIndexOf("#")); 1442 } 1443 MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName)); 1444 listener.setCreatedByDuplex(true); 1445 duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener); 1446 duplexBridge.setBrokerService(brokerService); 1447 //Need to set durableDestinations to properly restart subs when dynamicOnly=false 1448 duplexBridge.setDurableDestinations(NetworkConnector.getDurableTopicDestinations( 1449 broker.getDurableDestinations())); 1450 1451 // now turn duplex off this side 1452 info.setDuplexConnection(false); 1453 duplexBridge.setCreatedByDuplex(true); 1454 duplexBridge.duplexStart(this, brokerInfo, info); 1455 LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId); 1456 return null; 1457 } catch (TransportDisposedIOException e) { 1458 LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId); 1459 return null; 1460 } catch (Exception e) { 1461 LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e); 1462 return null; 1463 } 1464 } 1465 // We only expect to get one broker info command per connection 1466 if (this.brokerInfo != null) { 1467 LOG.warn("Unexpected extra broker info command received: {}", info); 1468 } 1469 this.brokerInfo = info; 1470 networkConnection = true; 1471 List<TransportConnectionState> connectionStates = listConnectionStates(); 1472 for (TransportConnectionState cs : connectionStates) { 1473 cs.getContext().setNetworkConnection(true); 1474 } 1475 return null; 1476 } 1477 1478 @SuppressWarnings({"unchecked", "rawtypes"}) 1479 private HashMap<String, String> createMap(Properties properties) { 1480 return new HashMap(properties); 1481 } 1482 1483 protected void dispatch(Command command) throws IOException { 1484 try { 1485 setMarkedCandidate(true); 1486 transport.oneway(command); 1487 } finally { 1488 setMarkedCandidate(false); 1489 } 1490 } 1491 1492 @Override 1493 public String getRemoteAddress() { 1494 return transport.getRemoteAddress(); 1495 } 1496 1497 public Transport getTransport() { 1498 return transport; 1499 } 1500 1501 @Override 1502 public String getConnectionId() { 1503 List<TransportConnectionState> connectionStates = listConnectionStates(); 1504 for (TransportConnectionState cs : connectionStates) { 1505 if (cs.getInfo().getClientId() != null) { 1506 return cs.getInfo().getClientId(); 1507 } 1508 return cs.getInfo().getConnectionId().toString(); 1509 } 1510 return null; 1511 } 1512 1513 @Override 1514 public void updateClient(ConnectionControl control) { 1515 if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null 1516 && this.wireFormatInfo.getVersion() >= 6) { 1517 dispatchAsync(control); 1518 } 1519 } 1520 1521 public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo){ 1522 ProducerBrokerExchange result = null; 1523 if (producerInfo != null && producerInfo.getProducerId() != null){ 1524 synchronized (producerExchanges){ 1525 result = producerExchanges.get(producerInfo.getProducerId()); 1526 } 1527 } 1528 return result; 1529 } 1530 1531 private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException { 1532 ProducerBrokerExchange result = producerExchanges.get(id); 1533 if (result == null) { 1534 synchronized (producerExchanges) { 1535 result = new ProducerBrokerExchange(); 1536 TransportConnectionState state = lookupConnectionState(id); 1537 context = state.getContext(); 1538 result.setConnectionContext(context); 1539 if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) { 1540 result.setLastStoredSequenceId(brokerService.getPersistenceAdapter().getLastProducerSequenceId(id)); 1541 } 1542 SessionState ss = state.getSessionState(id.getParentId()); 1543 if (ss != null) { 1544 result.setProducerState(ss.getProducerState(id)); 1545 ProducerState producerState = ss.getProducerState(id); 1546 if (producerState != null && producerState.getInfo() != null) { 1547 ProducerInfo info = producerState.getInfo(); 1548 result.setMutable(info.getDestination() == null || info.getDestination().isComposite()); 1549 } 1550 } 1551 producerExchanges.put(id, result); 1552 } 1553 } else { 1554 context = result.getConnectionContext(); 1555 } 1556 return result; 1557 } 1558 1559 private void removeProducerBrokerExchange(ProducerId id) { 1560 synchronized (producerExchanges) { 1561 producerExchanges.remove(id); 1562 } 1563 } 1564 1565 private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) { 1566 ConsumerBrokerExchange result = consumerExchanges.get(id); 1567 return result; 1568 } 1569 1570 private ConsumerBrokerExchange addConsumerBrokerExchange(TransportConnectionState connectionState, ConsumerId id) { 1571 ConsumerBrokerExchange result = consumerExchanges.get(id); 1572 if (result == null) { 1573 synchronized (consumerExchanges) { 1574 result = new ConsumerBrokerExchange(); 1575 context = connectionState.getContext(); 1576 result.setConnectionContext(context); 1577 SessionState ss = connectionState.getSessionState(id.getParentId()); 1578 if (ss != null) { 1579 ConsumerState cs = ss.getConsumerState(id); 1580 if (cs != null) { 1581 ConsumerInfo info = cs.getInfo(); 1582 if (info != null) { 1583 if (info.getDestination() != null && info.getDestination().isPattern()) { 1584 result.setWildcard(true); 1585 } 1586 } 1587 } 1588 } 1589 consumerExchanges.put(id, result); 1590 } 1591 } 1592 return result; 1593 } 1594 1595 private void removeConsumerBrokerExchange(ConsumerId id) { 1596 synchronized (consumerExchanges) { 1597 consumerExchanges.remove(id); 1598 } 1599 } 1600 1601 public int getProtocolVersion() { 1602 return protocolVersion.get(); 1603 } 1604 1605 @Override 1606 public Response processControlCommand(ControlCommand command) throws Exception { 1607 return null; 1608 } 1609 1610 @Override 1611 public Response processMessageDispatch(MessageDispatch dispatch) throws Exception { 1612 return null; 1613 } 1614 1615 @Override 1616 public Response processConnectionControl(ConnectionControl control) throws Exception { 1617 if (control != null) { 1618 faultTolerantConnection = control.isFaultTolerant(); 1619 } 1620 return null; 1621 } 1622 1623 @Override 1624 public Response processConnectionError(ConnectionError error) throws Exception { 1625 return null; 1626 } 1627 1628 @Override 1629 public Response processConsumerControl(ConsumerControl control) throws Exception { 1630 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId()); 1631 broker.processConsumerControl(consumerExchange, control); 1632 return null; 1633 } 1634 1635 protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId, 1636 TransportConnectionState state) { 1637 TransportConnectionState cs = null; 1638 if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) { 1639 // swap implementations 1640 TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister(); 1641 newRegister.intialize(connectionStateRegister); 1642 connectionStateRegister = newRegister; 1643 } 1644 cs = connectionStateRegister.registerConnectionState(connectionId, state); 1645 return cs; 1646 } 1647 1648 protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { 1649 return connectionStateRegister.unregisterConnectionState(connectionId); 1650 } 1651 1652 protected synchronized List<TransportConnectionState> listConnectionStates() { 1653 return connectionStateRegister.listConnectionStates(); 1654 } 1655 1656 protected synchronized TransportConnectionState lookupConnectionState(String connectionId) { 1657 return connectionStateRegister.lookupConnectionState(connectionId); 1658 } 1659 1660 protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) { 1661 return connectionStateRegister.lookupConnectionState(id); 1662 } 1663 1664 protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) { 1665 return connectionStateRegister.lookupConnectionState(id); 1666 } 1667 1668 protected synchronized TransportConnectionState lookupConnectionState(SessionId id) { 1669 return connectionStateRegister.lookupConnectionState(id); 1670 } 1671 1672 // public only for testing 1673 public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { 1674 return connectionStateRegister.lookupConnectionState(connectionId); 1675 } 1676 1677 protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) { 1678 this.duplexNetworkConnectorId = duplexNetworkConnectorId; 1679 } 1680 1681 protected synchronized String getDuplexNetworkConnectorId() { 1682 return this.duplexNetworkConnectorId; 1683 } 1684 1685 public boolean isStopping() { 1686 return stopping.get(); 1687 } 1688 1689 protected CountDownLatch getStopped() { 1690 return stopped; 1691 } 1692 1693 private int getProducerCount(ConnectionId connectionId) { 1694 int result = 0; 1695 TransportConnectionState cs = lookupConnectionState(connectionId); 1696 if (cs != null) { 1697 for (SessionId sessionId : cs.getSessionIds()) { 1698 SessionState sessionState = cs.getSessionState(sessionId); 1699 if (sessionState != null) { 1700 result += sessionState.getProducerIds().size(); 1701 } 1702 } 1703 } 1704 return result; 1705 } 1706 1707 private int getConsumerCount(ConnectionId connectionId) { 1708 int result = 0; 1709 TransportConnectionState cs = lookupConnectionState(connectionId); 1710 if (cs != null) { 1711 for (SessionId sessionId : cs.getSessionIds()) { 1712 SessionState sessionState = cs.getSessionState(sessionId); 1713 if (sessionState != null) { 1714 result += sessionState.getConsumerIds().size(); 1715 } 1716 } 1717 } 1718 return result; 1719 } 1720 1721 public WireFormatInfo getRemoteWireFormatInfo() { 1722 return wireFormatInfo; 1723 } 1724 1725 /* (non-Javadoc) 1726 * @see org.apache.activemq.state.CommandVisitor#processBrokerSubscriptionInfo(org.apache.activemq.command.BrokerSubscriptionInfo) 1727 */ 1728 @Override 1729 public Response processBrokerSubscriptionInfo(BrokerSubscriptionInfo info) throws Exception { 1730 return null; 1731 } 1732}