001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Iterator; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.CancellationException; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028import java.util.concurrent.ExecutionException; 029import java.util.concurrent.Future; 030 031import org.apache.activemq.broker.ConnectionContext; 032import org.apache.activemq.command.Message; 033import org.apache.activemq.command.MessageAck; 034import org.apache.activemq.command.MessageId; 035import org.apache.activemq.command.TransactionId; 036import org.apache.activemq.command.XATransactionId; 037import org.apache.activemq.protobuf.Buffer; 038import org.apache.activemq.store.AbstractMessageStore; 039import org.apache.activemq.store.ListenableFuture; 040import org.apache.activemq.store.MessageStore; 041import org.apache.activemq.store.ProxyMessageStore; 042import org.apache.activemq.store.ProxyTopicMessageStore; 043import org.apache.activemq.store.TopicMessageStore; 044import org.apache.activemq.store.TransactionRecoveryListener; 045import org.apache.activemq.store.TransactionStore; 046import org.apache.activemq.store.kahadb.MessageDatabase.Operation; 047import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 048import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 049import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; 050import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 051import org.apache.activemq.wireformat.WireFormat; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055/** 056 * Provides a TransactionStore implementation that can create transaction aware 057 * MessageStore objects from non transaction aware MessageStore objects. 058 * 059 * 060 */ 061public class KahaDBTransactionStore implements TransactionStore { 062 static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class); 063 ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>(); 064 private final KahaDBStore theStore; 065 066 public KahaDBTransactionStore(KahaDBStore theStore) { 067 this.theStore = theStore; 068 } 069 070 private WireFormat wireFormat(){ 071 return this.theStore.wireFormat; 072 } 073 074 public class Tx { 075 private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>(); 076 077 private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>(); 078 079 public void add(AddMessageCommand msg) { 080 messages.add(msg); 081 } 082 083 public void add(RemoveMessageCommand ack) { 084 acks.add(ack); 085 } 086 087 public Message[] getMessages() { 088 Message rc[] = new Message[messages.size()]; 089 int count = 0; 090 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 091 AddMessageCommand cmd = iter.next(); 092 rc[count++] = cmd.getMessage(); 093 } 094 return rc; 095 } 096 097 public MessageAck[] getAcks() { 098 MessageAck rc[] = new MessageAck[acks.size()]; 099 int count = 0; 100 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 101 RemoveMessageCommand cmd = iter.next(); 102 rc[count++] = cmd.getMessageAck(); 103 } 104 return rc; 105 } 106 107 /** 108 * @return true if something to commit 109 * @throws IOException 110 */ 111 public List<Future<Object>> commit() throws IOException { 112 List<Future<Object>> results = new ArrayList<Future<Object>>(); 113 // Do all the message adds. 114 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 115 AddMessageCommand cmd = iter.next(); 116 results.add(cmd.run()); 117 118 } 119 // And removes.. 120 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 121 RemoveMessageCommand cmd = iter.next(); 122 cmd.run(); 123 results.add(cmd.run()); 124 } 125 126 return results; 127 } 128 } 129 130 public abstract class AddMessageCommand { 131 private final ConnectionContext ctx; 132 AddMessageCommand(ConnectionContext ctx) { 133 this.ctx = ctx; 134 } 135 abstract Message getMessage(); 136 Future<Object> run() throws IOException { 137 return run(this.ctx); 138 } 139 abstract Future<Object> run(ConnectionContext ctx) throws IOException; 140 } 141 142 public abstract class RemoveMessageCommand { 143 144 private final ConnectionContext ctx; 145 RemoveMessageCommand(ConnectionContext ctx) { 146 this.ctx = ctx; 147 } 148 abstract MessageAck getMessageAck(); 149 Future<Object> run() throws IOException { 150 return run(this.ctx); 151 } 152 abstract Future<Object> run(ConnectionContext context) throws IOException; 153 } 154 155 public MessageStore proxy(MessageStore messageStore) { 156 return new ProxyMessageStore(messageStore) { 157 @Override 158 public void addMessage(ConnectionContext context, final Message send) throws IOException { 159 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send); 160 } 161 162 @Override 163 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException { 164 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send); 165 } 166 167 @Override 168 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { 169 return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message); 170 } 171 172 @Override 173 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException { 174 return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message); 175 } 176 177 @Override 178 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 179 KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack); 180 } 181 182 @Override 183 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 184 KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack); 185 } 186 }; 187 } 188 189 public TopicMessageStore proxy(TopicMessageStore messageStore) { 190 return new ProxyTopicMessageStore(messageStore) { 191 @Override 192 public void addMessage(ConnectionContext context, final Message send) throws IOException { 193 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send); 194 } 195 196 @Override 197 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException { 198 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send); 199 } 200 201 @Override 202 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { 203 return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message); 204 } 205 206 @Override 207 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException { 208 return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message); 209 } 210 211 @Override 212 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 213 KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack); 214 } 215 216 @Override 217 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 218 KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack); 219 } 220 221 @Override 222 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 223 MessageId messageId, MessageAck ack) throws IOException { 224 KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId, 225 subscriptionName, messageId, ack); 226 } 227 228 }; 229 } 230 231 /** 232 * @throws IOException 233 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 234 */ 235 @Override 236 public void prepare(TransactionId txid) throws IOException { 237 KahaTransactionInfo info = getTransactionInfo(txid); 238 if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { 239 theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null); 240 } else { 241 Tx tx = inflightTransactions.remove(txid); 242 if (tx != null) { 243 theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null); 244 } 245 } 246 } 247 248 public Tx getTx(Object txid) { 249 Tx tx = inflightTransactions.get(txid); 250 if (tx == null) { 251 tx = new Tx(); 252 inflightTransactions.put(txid, tx); 253 } 254 return tx; 255 } 256 257 @Override 258 public void commit(TransactionId txid, boolean wasPrepared, final Runnable preCommit, Runnable postCommit) 259 throws IOException { 260 if (txid != null) { 261 if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) { 262 if (preCommit != null) { 263 preCommit.run(); 264 } 265 Tx tx = inflightTransactions.remove(txid); 266 if (tx != null) { 267 List<Future<Object>> results = tx.commit(); 268 boolean doneSomething = false; 269 for (Future<Object> result : results) { 270 try { 271 result.get(); 272 } catch (InterruptedException e) { 273 theStore.brokerService.handleIOException(new IOException(e.getMessage())); 274 } catch (ExecutionException e) { 275 theStore.brokerService.handleIOException(new IOException(e.getMessage())); 276 }catch(CancellationException e) { 277 } 278 if (!result.isCancelled()) { 279 doneSomething = true; 280 } 281 } 282 if (postCommit != null) { 283 postCommit.run(); 284 } 285 if (doneSomething) { 286 KahaTransactionInfo info = getTransactionInfo(txid); 287 theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null); 288 } 289 }else { 290 //The Tx will be null for failed over clients - lets run their post commits 291 if (postCommit != null) { 292 postCommit.run(); 293 } 294 } 295 296 } else { 297 KahaTransactionInfo info = getTransactionInfo(txid); 298 if (preCommit != null) { 299 preCommit.run(); 300 } 301 theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, postCommit); 302 forgetRecoveredAcks(txid, false); 303 } 304 }else { 305 LOG.error("Null transaction passed on commit"); 306 } 307 } 308 309 /** 310 * @throws IOException 311 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 312 */ 313 @Override 314 public void rollback(TransactionId txid) throws IOException { 315 if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { 316 KahaTransactionInfo info = getTransactionInfo(txid); 317 theStore.store(new KahaRollbackCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null); 318 forgetRecoveredAcks(txid, true); 319 } else { 320 inflightTransactions.remove(txid); 321 } 322 } 323 324 protected void forgetRecoveredAcks(TransactionId txid, boolean isRollback) throws IOException { 325 if (txid.isXATransaction()) { 326 XATransactionId xaTid = ((XATransactionId) txid); 327 theStore.forgetRecoveredAcks(xaTid.getPreparedAcks(), isRollback); 328 } 329 } 330 331 @Override 332 public void start() throws Exception { 333 } 334 335 @Override 336 public void stop() throws Exception { 337 } 338 339 @Override 340 public synchronized void recover(TransactionRecoveryListener listener) throws IOException { 341 for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) { 342 XATransactionId xid = (XATransactionId) entry.getKey(); 343 ArrayList<Message> messageList = new ArrayList<Message>(); 344 ArrayList<MessageAck> ackList = new ArrayList<MessageAck>(); 345 346 for (Operation op : entry.getValue()) { 347 if (op.getClass() == MessageDatabase.AddOperation.class) { 348 MessageDatabase.AddOperation addOp = (MessageDatabase.AddOperation) op; 349 Message msg = (Message) wireFormat().unmarshal(new DataInputStream(addOp.getCommand().getMessage() 350 .newInput())); 351 messageList.add(msg); 352 } else { 353 MessageDatabase.RemoveOperation rmOp = (MessageDatabase.RemoveOperation) op; 354 Buffer ackb = rmOp.getCommand().getAck(); 355 MessageAck ack = (MessageAck) wireFormat().unmarshal(new DataInputStream(ackb.newInput())); 356 ackList.add(ack); 357 } 358 } 359 360 Message[] addedMessages = new Message[messageList.size()]; 361 MessageAck[] acks = new MessageAck[ackList.size()]; 362 messageList.toArray(addedMessages); 363 ackList.toArray(acks); 364 xid.setPreparedAcks(ackList); 365 theStore.trackRecoveredAcks(ackList); 366 listener.recover(xid, addedMessages, acks); 367 } 368 } 369 370 /** 371 * @param message 372 * @throws IOException 373 */ 374 void addMessage(ConnectionContext context, final MessageStore destination, final Message message) 375 throws IOException { 376 377 if (message.getTransactionId() != null) { 378 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { 379 destination.addMessage(context, message); 380 } else { 381 Tx tx = getTx(message.getTransactionId()); 382 tx.add(new AddMessageCommand(context) { 383 @Override 384 public Message getMessage() { 385 return message; 386 } 387 @Override 388 public Future<Object> run(ConnectionContext ctx) throws IOException { 389 destination.addMessage(ctx, message); 390 return AbstractMessageStore.FUTURE; 391 } 392 393 }); 394 } 395 } else { 396 destination.addMessage(context, message); 397 } 398 } 399 400 ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message) 401 throws IOException { 402 403 if (message.getTransactionId() != null) { 404 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { 405 destination.addMessage(context, message); 406 return AbstractMessageStore.FUTURE; 407 } else { 408 Tx tx = getTx(message.getTransactionId()); 409 tx.add(new AddMessageCommand(context) { 410 @Override 411 public Message getMessage() { 412 return message; 413 } 414 @Override 415 public Future<Object> run(ConnectionContext ctx) throws IOException { 416 return destination.asyncAddQueueMessage(ctx, message); 417 } 418 419 }); 420 return AbstractMessageStore.FUTURE; 421 } 422 } else { 423 return destination.asyncAddQueueMessage(context, message); 424 } 425 } 426 427 ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message) 428 throws IOException { 429 430 if (message.getTransactionId() != null) { 431 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) { 432 destination.addMessage(context, message); 433 return AbstractMessageStore.FUTURE; 434 } else { 435 Tx tx = getTx(message.getTransactionId()); 436 tx.add(new AddMessageCommand(context) { 437 @Override 438 public Message getMessage() { 439 return message; 440 } 441 @Override 442 public Future<Object> run(ConnectionContext ctx) throws IOException { 443 return destination.asyncAddTopicMessage(ctx, message); 444 } 445 446 }); 447 return AbstractMessageStore.FUTURE; 448 } 449 } else { 450 return destination.asyncAddTopicMessage(context, message); 451 } 452 } 453 454 /** 455 * @param ack 456 * @throws IOException 457 */ 458 final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack) 459 throws IOException { 460 461 if (ack.isInTransaction()) { 462 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) { 463 destination.removeMessage(context, ack); 464 } else { 465 Tx tx = getTx(ack.getTransactionId()); 466 tx.add(new RemoveMessageCommand(context) { 467 @Override 468 public MessageAck getMessageAck() { 469 return ack; 470 } 471 472 @Override 473 public Future<Object> run(ConnectionContext ctx) throws IOException { 474 destination.removeMessage(ctx, ack); 475 return AbstractMessageStore.FUTURE; 476 } 477 }); 478 } 479 } else { 480 destination.removeMessage(context, ack); 481 } 482 } 483 484 final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack) 485 throws IOException { 486 487 if (ack.isInTransaction()) { 488 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) { 489 destination.removeAsyncMessage(context, ack); 490 } else { 491 Tx tx = getTx(ack.getTransactionId()); 492 tx.add(new RemoveMessageCommand(context) { 493 @Override 494 public MessageAck getMessageAck() { 495 return ack; 496 } 497 498 @Override 499 public Future<Object> run(ConnectionContext ctx) throws IOException { 500 destination.removeMessage(ctx, ack); 501 return AbstractMessageStore.FUTURE; 502 } 503 }); 504 } 505 } else { 506 destination.removeAsyncMessage(context, ack); 507 } 508 } 509 510 final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName, 511 final MessageId messageId, final MessageAck ack) throws IOException { 512 513 if (ack.isInTransaction()) { 514 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) { 515 destination.acknowledge(context, clientId, subscriptionName, messageId, ack); 516 } else { 517 Tx tx = getTx(ack.getTransactionId()); 518 tx.add(new RemoveMessageCommand(context) { 519 @Override 520 public MessageAck getMessageAck() { 521 return ack; 522 } 523 524 @Override 525 public Future<Object> run(ConnectionContext ctx) throws IOException { 526 destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack); 527 return AbstractMessageStore.FUTURE; 528 } 529 }); 530 } 531 } else { 532 destination.acknowledge(context, clientId, subscriptionName, messageId, ack); 533 } 534 } 535 536 537 private KahaTransactionInfo getTransactionInfo(TransactionId txid) { 538 return TransactionIdConversion.convert(theStore.getTransactionIdTransformer().transform(txid)); 539 } 540}