001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.state; 018 019 import java.io.IOException; 020 import java.util.Iterator; 021 import java.util.LinkedHashMap; 022 import java.util.Map; 023 import java.util.Vector; 024 import java.util.Map.Entry; 025 import java.util.concurrent.ConcurrentHashMap; 026 027 import javax.jms.TransactionRolledBackException; 028 import javax.transaction.xa.XAResource; 029 030 import org.apache.activemq.command.Command; 031 import org.apache.activemq.command.ConnectionId; 032 import org.apache.activemq.command.ConnectionInfo; 033 import org.apache.activemq.command.ConsumerControl; 034 import org.apache.activemq.command.ConsumerId; 035 import org.apache.activemq.command.ConsumerInfo; 036 import org.apache.activemq.command.DestinationInfo; 037 import org.apache.activemq.command.ExceptionResponse; 038 import org.apache.activemq.command.IntegerResponse; 039 import org.apache.activemq.command.Message; 040 import org.apache.activemq.command.MessagePull; 041 import org.apache.activemq.command.ProducerId; 042 import org.apache.activemq.command.ProducerInfo; 043 import org.apache.activemq.command.Response; 044 import org.apache.activemq.command.SessionId; 045 import org.apache.activemq.command.SessionInfo; 046 import org.apache.activemq.command.TransactionInfo; 047 import org.apache.activemq.transport.Transport; 048 import org.apache.activemq.util.IOExceptionSupport; 049 import org.slf4j.Logger; 050 import org.slf4j.LoggerFactory; 051 052 /** 053 * Tracks the state of a connection so a newly established transport can be 054 * re-initialized to the state that was tracked. 055 * 056 * 057 */ 058 public class ConnectionStateTracker extends CommandVisitorAdapter { 059 private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class); 060 061 private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); 062 private static final int MESSAGE_PULL_SIZE = 400; 063 protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>(); 064 065 private boolean trackTransactions; 066 private boolean restoreSessions = true; 067 private boolean restoreConsumers = true; 068 private boolean restoreProducers = true; 069 private boolean restoreTransaction = true; 070 private boolean trackMessages = true; 071 private boolean trackTransactionProducers = true; 072 private int maxCacheSize = 128 * 1024; 073 private int currentCacheSize; 074 private Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){ 075 protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) { 076 boolean result = currentCacheSize > maxCacheSize; 077 if (result) { 078 if (eldest.getValue() instanceof Message) { 079 currentCacheSize -= ((Message)eldest.getValue()).getSize(); 080 } else if (eldest.getValue() instanceof MessagePull) { 081 currentCacheSize -= MESSAGE_PULL_SIZE; 082 } 083 if (LOG.isTraceEnabled()) { 084 LOG.trace("removing tracked message: " + eldest.getKey()); 085 } 086 } 087 return result; 088 } 089 }; 090 091 private class RemoveTransactionAction implements ResponseHandler { 092 private final TransactionInfo info; 093 094 public RemoveTransactionAction(TransactionInfo info) { 095 this.info = info; 096 } 097 098 public void onResponse(Command response) { 099 ConnectionId connectionId = info.getConnectionId(); 100 ConnectionState cs = connectionStates.get(connectionId); 101 cs.removeTransactionState(info.getTransactionId()); 102 } 103 } 104 105 private class PrepareReadonlyTransactionAction extends RemoveTransactionAction { 106 107 public PrepareReadonlyTransactionAction(TransactionInfo info) { 108 super(info); 109 } 110 111 public void onResponse(Command command) { 112 IntegerResponse response = (IntegerResponse) command; 113 if (XAResource.XA_RDONLY == response.getResult()) { 114 // all done, no commit or rollback from TM 115 super.onResponse(command); 116 } 117 } 118 } 119 120 /** 121 * 122 * 123 * @param command 124 * @return null if the command is not state tracked. 125 * @throws IOException 126 */ 127 public Tracked track(Command command) throws IOException { 128 try { 129 return (Tracked)command.visit(this); 130 } catch (IOException e) { 131 throw e; 132 } catch (Throwable e) { 133 throw IOExceptionSupport.create(e); 134 } 135 } 136 137 public void trackBack(Command command) { 138 if (command != null) { 139 if (trackMessages && command.isMessage()) { 140 Message message = (Message) command; 141 if (message.getTransactionId()==null) { 142 currentCacheSize = currentCacheSize + message.getSize(); 143 } 144 } else if (command instanceof MessagePull) { 145 // just needs to be a rough estimate of size, ~4 identifiers 146 currentCacheSize += MESSAGE_PULL_SIZE; 147 } 148 } 149 } 150 151 public void restore(Transport transport) throws IOException { 152 // Restore the connections. 153 for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) { 154 ConnectionState connectionState = iter.next(); 155 connectionState.getInfo().setFailoverReconnect(true); 156 if (LOG.isDebugEnabled()) { 157 LOG.debug("conn: " + connectionState.getInfo().getConnectionId()); 158 } 159 transport.oneway(connectionState.getInfo()); 160 restoreTempDestinations(transport, connectionState); 161 162 if (restoreSessions) { 163 restoreSessions(transport, connectionState); 164 } 165 166 if (restoreTransaction) { 167 restoreTransactions(transport, connectionState); 168 } 169 } 170 //now flush messages 171 for (Command msg:messageCache.values()) { 172 if (LOG.isDebugEnabled()) { 173 LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg)); 174 } 175 transport.oneway(msg); 176 } 177 } 178 179 private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException { 180 Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>(); 181 for (TransactionState transactionState : connectionState.getTransactionStates()) { 182 if (LOG.isDebugEnabled()) { 183 LOG.debug("tx: " + transactionState.getId()); 184 } 185 186 // rollback any completed transactions - no way to know if commit got there 187 // or if reply went missing 188 // 189 if (!transactionState.getCommands().isEmpty()) { 190 Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1); 191 if (lastCommand instanceof TransactionInfo) { 192 TransactionInfo transactionInfo = (TransactionInfo) lastCommand; 193 if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) { 194 if (LOG.isDebugEnabled()) { 195 LOG.debug("rolling back potentially completed tx: " + transactionState.getId()); 196 } 197 toRollback.add(transactionInfo); 198 continue; 199 } 200 } 201 } 202 203 // replay short lived producers that may have been involved in the transaction 204 for (ProducerState producerState : transactionState.getProducerStates().values()) { 205 if (LOG.isDebugEnabled()) { 206 LOG.debug("tx replay producer :" + producerState.getInfo()); 207 } 208 transport.oneway(producerState.getInfo()); 209 } 210 211 for (Command command : transactionState.getCommands()) { 212 if (LOG.isDebugEnabled()) { 213 LOG.debug("tx replay: " + command); 214 } 215 transport.oneway(command); 216 } 217 218 for (ProducerState producerState : transactionState.getProducerStates().values()) { 219 if (LOG.isDebugEnabled()) { 220 LOG.debug("tx remove replayed producer :" + producerState.getInfo()); 221 } 222 transport.oneway(producerState.getInfo().createRemoveCommand()); 223 } 224 } 225 226 for (TransactionInfo command: toRollback) { 227 // respond to the outstanding commit 228 ExceptionResponse response = new ExceptionResponse(); 229 response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId())); 230 response.setCorrelationId(command.getCommandId()); 231 transport.getTransportListener().onCommand(response); 232 } 233 } 234 235 /** 236 * @param transport 237 * @param connectionState 238 * @throws IOException 239 */ 240 protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException { 241 // Restore the connection's sessions 242 for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) { 243 SessionState sessionState = (SessionState)iter2.next(); 244 if (LOG.isDebugEnabled()) { 245 LOG.debug("session: " + sessionState.getInfo().getSessionId()); 246 } 247 transport.oneway(sessionState.getInfo()); 248 249 if (restoreProducers) { 250 restoreProducers(transport, sessionState); 251 } 252 253 if (restoreConsumers) { 254 restoreConsumers(transport, sessionState); 255 } 256 } 257 } 258 259 /** 260 * @param transport 261 * @param sessionState 262 * @throws IOException 263 */ 264 protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException { 265 // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete 266 final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId()); 267 final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete(); 268 for (ConsumerState consumerState : sessionState.getConsumerStates()) { 269 ConsumerInfo infoToSend = consumerState.getInfo(); 270 if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) { 271 infoToSend = consumerState.getInfo().copy(); 272 connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo()); 273 infoToSend.setPrefetchSize(0); 274 if (LOG.isDebugEnabled()) { 275 LOG.debug("restore consumer: " + infoToSend.getConsumerId() + " in pull mode pending recovery, overriding prefetch: " + consumerState.getInfo().getPrefetchSize()); 276 } 277 } 278 if (LOG.isDebugEnabled()) { 279 LOG.debug("restore consumer: " + infoToSend.getConsumerId()); 280 } 281 transport.oneway(infoToSend); 282 } 283 } 284 285 /** 286 * @param transport 287 * @param sessionState 288 * @throws IOException 289 */ 290 protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException { 291 // Restore the session's producers 292 for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) { 293 ProducerState producerState = (ProducerState)iter3.next(); 294 if (LOG.isDebugEnabled()) { 295 LOG.debug("producer: " + producerState.getInfo().getProducerId()); 296 } 297 transport.oneway(producerState.getInfo()); 298 } 299 } 300 301 /** 302 * @param transport 303 * @param connectionState 304 * @throws IOException 305 */ 306 protected void restoreTempDestinations(Transport transport, ConnectionState connectionState) 307 throws IOException { 308 // Restore the connection's temp destinations. 309 for (Iterator iter2 = connectionState.getTempDestinations().iterator(); iter2.hasNext();) { 310 transport.oneway((DestinationInfo)iter2.next()); 311 } 312 } 313 314 public Response processAddDestination(DestinationInfo info) { 315 if (info != null) { 316 ConnectionState cs = connectionStates.get(info.getConnectionId()); 317 if (cs != null && info.getDestination().isTemporary()) { 318 cs.addTempDestination(info); 319 } 320 } 321 return TRACKED_RESPONSE_MARKER; 322 } 323 324 public Response processRemoveDestination(DestinationInfo info) { 325 if (info != null) { 326 ConnectionState cs = connectionStates.get(info.getConnectionId()); 327 if (cs != null && info.getDestination().isTemporary()) { 328 cs.removeTempDestination(info.getDestination()); 329 } 330 } 331 return TRACKED_RESPONSE_MARKER; 332 } 333 334 public Response processAddProducer(ProducerInfo info) { 335 if (info != null && info.getProducerId() != null) { 336 SessionId sessionId = info.getProducerId().getParentId(); 337 if (sessionId != null) { 338 ConnectionId connectionId = sessionId.getParentId(); 339 if (connectionId != null) { 340 ConnectionState cs = connectionStates.get(connectionId); 341 if (cs != null) { 342 SessionState ss = cs.getSessionState(sessionId); 343 if (ss != null) { 344 ss.addProducer(info); 345 } 346 } 347 } 348 } 349 } 350 return TRACKED_RESPONSE_MARKER; 351 } 352 353 public Response processRemoveProducer(ProducerId id) { 354 if (id != null) { 355 SessionId sessionId = id.getParentId(); 356 if (sessionId != null) { 357 ConnectionId connectionId = sessionId.getParentId(); 358 if (connectionId != null) { 359 ConnectionState cs = connectionStates.get(connectionId); 360 if (cs != null) { 361 SessionState ss = cs.getSessionState(sessionId); 362 if (ss != null) { 363 ss.removeProducer(id); 364 } 365 } 366 } 367 } 368 } 369 return TRACKED_RESPONSE_MARKER; 370 } 371 372 public Response processAddConsumer(ConsumerInfo info) { 373 if (info != null) { 374 SessionId sessionId = info.getConsumerId().getParentId(); 375 if (sessionId != null) { 376 ConnectionId connectionId = sessionId.getParentId(); 377 if (connectionId != null) { 378 ConnectionState cs = connectionStates.get(connectionId); 379 if (cs != null) { 380 SessionState ss = cs.getSessionState(sessionId); 381 if (ss != null) { 382 ss.addConsumer(info); 383 } 384 } 385 } 386 } 387 } 388 return TRACKED_RESPONSE_MARKER; 389 } 390 391 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) { 392 if (id != null) { 393 SessionId sessionId = id.getParentId(); 394 if (sessionId != null) { 395 ConnectionId connectionId = sessionId.getParentId(); 396 if (connectionId != null) { 397 ConnectionState cs = connectionStates.get(connectionId); 398 if (cs != null) { 399 SessionState ss = cs.getSessionState(sessionId); 400 if (ss != null) { 401 ss.removeConsumer(id); 402 } 403 } 404 } 405 } 406 } 407 return TRACKED_RESPONSE_MARKER; 408 } 409 410 public Response processAddSession(SessionInfo info) { 411 if (info != null) { 412 ConnectionId connectionId = info.getSessionId().getParentId(); 413 if (connectionId != null) { 414 ConnectionState cs = connectionStates.get(connectionId); 415 if (cs != null) { 416 cs.addSession(info); 417 } 418 } 419 } 420 return TRACKED_RESPONSE_MARKER; 421 } 422 423 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) { 424 if (id != null) { 425 ConnectionId connectionId = id.getParentId(); 426 if (connectionId != null) { 427 ConnectionState cs = connectionStates.get(connectionId); 428 if (cs != null) { 429 cs.removeSession(id); 430 } 431 } 432 } 433 return TRACKED_RESPONSE_MARKER; 434 } 435 436 public Response processAddConnection(ConnectionInfo info) { 437 if (info != null) { 438 connectionStates.put(info.getConnectionId(), new ConnectionState(info)); 439 } 440 return TRACKED_RESPONSE_MARKER; 441 } 442 443 public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception { 444 if (id != null) { 445 connectionStates.remove(id); 446 } 447 return TRACKED_RESPONSE_MARKER; 448 } 449 450 public Response processMessage(Message send) throws Exception { 451 if (send != null) { 452 if (trackTransactions && send.getTransactionId() != null) { 453 ProducerId producerId = send.getProducerId(); 454 ConnectionId connectionId = producerId.getParentId().getParentId(); 455 if (connectionId != null) { 456 ConnectionState cs = connectionStates.get(connectionId); 457 if (cs != null) { 458 TransactionState transactionState = cs.getTransactionState(send.getTransactionId()); 459 if (transactionState != null) { 460 transactionState.addCommand(send); 461 462 if (trackTransactionProducers) { 463 // for jmstemplate, track the producer in case it is closed before commit 464 // and needs to be replayed 465 SessionState ss = cs.getSessionState(producerId.getParentId()); 466 ProducerState producerState = ss.getProducerState(producerId); 467 producerState.setTransactionState(transactionState); 468 } 469 } 470 } 471 } 472 return TRACKED_RESPONSE_MARKER; 473 }else if (trackMessages) { 474 messageCache.put(send.getMessageId(), send); 475 } 476 } 477 return null; 478 } 479 480 public Response processBeginTransaction(TransactionInfo info) { 481 if (trackTransactions && info != null && info.getTransactionId() != null) { 482 ConnectionId connectionId = info.getConnectionId(); 483 if (connectionId != null) { 484 ConnectionState cs = connectionStates.get(connectionId); 485 if (cs != null) { 486 cs.addTransactionState(info.getTransactionId()); 487 TransactionState state = cs.getTransactionState(info.getTransactionId()); 488 state.addCommand(info); 489 } 490 } 491 return TRACKED_RESPONSE_MARKER; 492 } 493 return null; 494 } 495 496 public Response processPrepareTransaction(TransactionInfo info) throws Exception { 497 if (trackTransactions && info != null) { 498 ConnectionId connectionId = info.getConnectionId(); 499 if (connectionId != null) { 500 ConnectionState cs = connectionStates.get(connectionId); 501 if (cs != null) { 502 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 503 if (transactionState != null) { 504 transactionState.addCommand(info); 505 return new Tracked(new PrepareReadonlyTransactionAction(info)); 506 } 507 } 508 } 509 } 510 return null; 511 } 512 513 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { 514 if (trackTransactions && info != null) { 515 ConnectionId connectionId = info.getConnectionId(); 516 if (connectionId != null) { 517 ConnectionState cs = connectionStates.get(connectionId); 518 if (cs != null) { 519 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 520 if (transactionState != null) { 521 transactionState.addCommand(info); 522 return new Tracked(new RemoveTransactionAction(info)); 523 } 524 } 525 } 526 } 527 return null; 528 } 529 530 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { 531 if (trackTransactions && info != null) { 532 ConnectionId connectionId = info.getConnectionId(); 533 if (connectionId != null) { 534 ConnectionState cs = connectionStates.get(connectionId); 535 if (cs != null) { 536 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 537 if (transactionState != null) { 538 transactionState.addCommand(info); 539 return new Tracked(new RemoveTransactionAction(info)); 540 } 541 } 542 } 543 } 544 return null; 545 } 546 547 public Response processRollbackTransaction(TransactionInfo info) throws Exception { 548 if (trackTransactions && info != null) { 549 ConnectionId connectionId = info.getConnectionId(); 550 if (connectionId != null) { 551 ConnectionState cs = connectionStates.get(connectionId); 552 if (cs != null) { 553 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 554 if (transactionState != null) { 555 transactionState.addCommand(info); 556 return new Tracked(new RemoveTransactionAction(info)); 557 } 558 } 559 } 560 } 561 return null; 562 } 563 564 public Response processEndTransaction(TransactionInfo info) throws Exception { 565 if (trackTransactions && info != null) { 566 ConnectionId connectionId = info.getConnectionId(); 567 if (connectionId != null) { 568 ConnectionState cs = connectionStates.get(connectionId); 569 if (cs != null) { 570 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 571 if (transactionState != null) { 572 transactionState.addCommand(info); 573 } 574 } 575 } 576 return TRACKED_RESPONSE_MARKER; 577 } 578 return null; 579 } 580 581 @Override 582 public Response processMessagePull(MessagePull pull) throws Exception { 583 if (pull != null) { 584 // leave a single instance in the cache 585 final String id = pull.getDestination() + "::" + pull.getConsumerId(); 586 messageCache.put(id.intern(), pull); 587 } 588 return null; 589 } 590 591 public boolean isRestoreConsumers() { 592 return restoreConsumers; 593 } 594 595 public void setRestoreConsumers(boolean restoreConsumers) { 596 this.restoreConsumers = restoreConsumers; 597 } 598 599 public boolean isRestoreProducers() { 600 return restoreProducers; 601 } 602 603 public void setRestoreProducers(boolean restoreProducers) { 604 this.restoreProducers = restoreProducers; 605 } 606 607 public boolean isRestoreSessions() { 608 return restoreSessions; 609 } 610 611 public void setRestoreSessions(boolean restoreSessions) { 612 this.restoreSessions = restoreSessions; 613 } 614 615 public boolean isTrackTransactions() { 616 return trackTransactions; 617 } 618 619 public void setTrackTransactions(boolean trackTransactions) { 620 this.trackTransactions = trackTransactions; 621 } 622 623 public boolean isTrackTransactionProducers() { 624 return this.trackTransactionProducers; 625 } 626 627 public void setTrackTransactionProducers(boolean trackTransactionProducers) { 628 this.trackTransactionProducers = trackTransactionProducers; 629 } 630 631 public boolean isRestoreTransaction() { 632 return restoreTransaction; 633 } 634 635 public void setRestoreTransaction(boolean restoreTransaction) { 636 this.restoreTransaction = restoreTransaction; 637 } 638 639 public boolean isTrackMessages() { 640 return trackMessages; 641 } 642 643 public void setTrackMessages(boolean trackMessages) { 644 this.trackMessages = trackMessages; 645 } 646 647 public int getMaxCacheSize() { 648 return maxCacheSize; 649 } 650 651 public void setMaxCacheSize(int maxCacheSize) { 652 this.maxCacheSize = maxCacheSize; 653 } 654 655 public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) { 656 ConnectionState connectionState = connectionStates.get(connectionId); 657 if (connectionState != null) { 658 connectionState.setConnectionInterruptProcessingComplete(true); 659 Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers(); 660 for (Entry<ConsumerId, ConsumerInfo> entry: stalledConsumers.entrySet()) { 661 ConsumerControl control = new ConsumerControl(); 662 control.setConsumerId(entry.getKey()); 663 control.setPrefetch(entry.getValue().getPrefetchSize()); 664 control.setDestination(entry.getValue().getDestination()); 665 try { 666 if (LOG.isDebugEnabled()) { 667 LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch()); 668 } 669 transport.oneway(control); 670 } catch (Exception ex) { 671 if (LOG.isDebugEnabled()) { 672 LOG.debug("Failed to submit control for consumer: " + control.getConsumerId() 673 + " with: " + control.getPrefetch(), ex); 674 } 675 } 676 } 677 stalledConsumers.clear(); 678 } 679 } 680 681 public void transportInterrupted(ConnectionId connectionId) { 682 ConnectionState connectionState = connectionStates.get(connectionId); 683 if (connectionState != null) { 684 connectionState.setConnectionInterruptProcessingComplete(false); 685 } 686 } 687 }