001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.File; 020import java.io.IOException; 021import java.util.Date; 022import java.util.HashSet; 023import java.util.Set; 024import java.util.TreeSet; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.atomic.AtomicBoolean; 028 029import org.apache.activemq.broker.Broker; 030import org.apache.activemq.broker.ConnectionContext; 031import org.apache.activemq.broker.region.BaseDestination; 032import org.apache.activemq.command.Message; 033import org.apache.activemq.command.MessageAck; 034import org.apache.activemq.command.MessageId; 035import org.apache.activemq.command.TransactionId; 036import org.apache.activemq.command.XATransactionId; 037import org.apache.activemq.store.AbstractMessageStore; 038import org.apache.activemq.store.IndexListener; 039import org.apache.activemq.store.ListenableFuture; 040import org.apache.activemq.store.MessageStore; 041import org.apache.activemq.store.PersistenceAdapter; 042import org.apache.activemq.store.ProxyMessageStore; 043import org.apache.activemq.store.ProxyTopicMessageStore; 044import org.apache.activemq.store.TopicMessageStore; 045import org.apache.activemq.store.TransactionRecoveryListener; 046import org.apache.activemq.store.TransactionStore; 047import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 048import org.apache.activemq.store.kahadb.data.KahaEntryType; 049import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 050import org.apache.activemq.store.kahadb.data.KahaTraceCommand; 051import org.apache.activemq.store.kahadb.disk.journal.Journal; 052import org.apache.activemq.store.kahadb.disk.journal.Location; 053import org.apache.activemq.usage.StoreUsage; 054import org.apache.activemq.util.DataByteArrayInputStream; 055import org.apache.activemq.util.DataByteArrayOutputStream; 056import org.apache.activemq.util.IOHelper; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060public class MultiKahaDBTransactionStore implements TransactionStore { 061 static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class); 062 final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter; 063 final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>(); 064 final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>(); 065 private Journal journal; 066 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 067 private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 068 private final AtomicBoolean started = new AtomicBoolean(false); 069 070 public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) { 071 this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter; 072 } 073 074 public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) { 075 return new ProxyMessageStore(messageStore) { 076 @Override 077 public void addMessage(ConnectionContext context, final Message send) throws IOException { 078 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 079 } 080 081 @Override 082 public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException { 083 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 084 } 085 086 @Override 087 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { 088 return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message); 089 } 090 091 @Override 092 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { 093 return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message); 094 } 095 096 @Override 097 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 098 MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack); 099 } 100 101 @Override 102 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 103 MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack); 104 } 105 106 @Override 107 public void registerIndexListener(IndexListener indexListener) { 108 getDelegate().registerIndexListener(indexListener); 109 try { 110 if (indexListener instanceof BaseDestination) { 111 // update queue storeUsage 112 Object matchingPersistenceAdapter = multiKahaDBPersistenceAdapter.destinationMap.chooseValue(getDelegate().getDestination()); 113 if (matchingPersistenceAdapter instanceof FilteredKahaDBPersistenceAdapter) { 114 FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) matchingPersistenceAdapter; 115 if (filteredAdapter.getUsage() != null && filteredAdapter.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { 116 StoreUsage storeUsage = filteredAdapter.getUsage(); 117 storeUsage.setStore(filteredAdapter.getPersistenceAdapter()); 118 storeUsage.setParent(multiKahaDBPersistenceAdapter.getBrokerService().getSystemUsage().getStoreUsage()); 119 ((BaseDestination) indexListener).getSystemUsage().setStoreUsage(storeUsage); 120 } 121 } 122 } 123 } catch (Exception ignored) { 124 LOG.warn("Failed to set mKahaDB destination store usage", ignored); 125 } 126 } 127 }; 128 } 129 130 public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) { 131 return new ProxyTopicMessageStore(messageStore) { 132 @Override 133 public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException { 134 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 135 } 136 137 @Override 138 public void addMessage(ConnectionContext context, final Message send) throws IOException { 139 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 140 } 141 142 @Override 143 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { 144 return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message); 145 } 146 147 @Override 148 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { 149 return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message); 150 } 151 152 @Override 153 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 154 MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack); 155 } 156 157 @Override 158 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 159 MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack); 160 } 161 162 @Override 163 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 164 MessageId messageId, MessageAck ack) throws IOException { 165 MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId, 166 subscriptionName, messageId, ack); 167 } 168 }; 169 } 170 171 public void deleteAllMessages() { 172 IOHelper.deleteChildren(getDirectory()); 173 } 174 175 public int getJournalMaxFileLength() { 176 return journalMaxFileLength; 177 } 178 179 public void setJournalMaxFileLength(int journalMaxFileLength) { 180 this.journalMaxFileLength = journalMaxFileLength; 181 } 182 183 public int getJournalMaxWriteBatchSize() { 184 return journalWriteBatchSize; 185 } 186 187 public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) { 188 this.journalWriteBatchSize = journalWriteBatchSize; 189 } 190 191 public class Tx { 192 private final Set<TransactionStore> stores = new HashSet<TransactionStore>(); 193 private int prepareLocationId = 0; 194 195 public void trackStore(TransactionStore store) { 196 stores.add(store); 197 } 198 199 public Set<TransactionStore> getStores() { 200 return stores; 201 } 202 203 public void trackPrepareLocation(Location location) { 204 this.prepareLocationId = location.getDataFileId(); 205 } 206 207 public int getPreparedLocationId() { 208 return prepareLocationId; 209 } 210 } 211 212 public Tx getTx(TransactionId txid) { 213 Tx tx = inflightTransactions.get(txid); 214 if (tx == null) { 215 tx = new Tx(); 216 inflightTransactions.put(txid, tx); 217 } 218 return tx; 219 } 220 221 public Tx removeTx(TransactionId txid) { 222 return inflightTransactions.remove(txid); 223 } 224 225 @Override 226 public void prepare(TransactionId txid) throws IOException { 227 Tx tx = getTx(txid); 228 for (TransactionStore store : tx.getStores()) { 229 store.prepare(txid); 230 } 231 } 232 233 @Override 234 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit) 235 throws IOException { 236 237 if (preCommit != null) { 238 preCommit.run(); 239 } 240 241 Tx tx = getTx(txid); 242 if (wasPrepared) { 243 for (TransactionStore store : tx.getStores()) { 244 store.commit(txid, true, null, null); 245 } 246 } else { 247 // can only do 1pc on a single store 248 if (tx.getStores().size() == 1) { 249 for (TransactionStore store : tx.getStores()) { 250 store.commit(txid, false, null, null); 251 } 252 } else { 253 // need to do local 2pc 254 for (TransactionStore store : tx.getStores()) { 255 store.prepare(txid); 256 } 257 persistOutcome(tx, txid); 258 for (TransactionStore store : tx.getStores()) { 259 store.commit(txid, true, null, null); 260 } 261 persistCompletion(txid); 262 } 263 } 264 removeTx(txid); 265 if (postCommit != null) { 266 postCommit.run(); 267 } 268 } 269 270 public void persistOutcome(Tx tx, TransactionId txid) throws IOException { 271 tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))))); 272 } 273 274 public void persistCompletion(TransactionId txid) throws IOException { 275 store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))); 276 } 277 278 private Location store(JournalCommand<?> data) throws IOException { 279 int size = data.serializedSizeFramed(); 280 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); 281 os.writeByte(data.type().getNumber()); 282 data.writeFramed(os); 283 Location location = journal.write(os.toByteSequence(), true); 284 journal.setLastAppendLocation(location); 285 return location; 286 } 287 288 @Override 289 public void rollback(TransactionId txid) throws IOException { 290 Tx tx = removeTx(txid); 291 if (tx != null) { 292 for (TransactionStore store : tx.getStores()) { 293 store.rollback(txid); 294 } 295 } 296 } 297 298 @Override 299 public void start() throws Exception { 300 if (started.compareAndSet(false, true)) { 301 journal = new Journal() { 302 @Override 303 public void cleanup() { 304 super.cleanup(); 305 txStoreCleanup(); 306 } 307 }; 308 journal.setDirectory(getDirectory()); 309 journal.setMaxFileLength(journalMaxFileLength); 310 journal.setWriteBatchSize(journalWriteBatchSize); 311 IOHelper.mkdirs(journal.getDirectory()); 312 journal.start(); 313 recoverPendingLocalTransactions(); 314 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 315 } 316 } 317 318 private void txStoreCleanup() { 319 Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet()); 320 for (Tx tx : inflightTransactions.values()) { 321 knownDataFileIds.remove(tx.getPreparedLocationId()); 322 } 323 try { 324 journal.removeDataFiles(knownDataFileIds); 325 } catch (Exception e) { 326 LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds); 327 } 328 } 329 330 private File getDirectory() { 331 return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore"); 332 } 333 334 @Override 335 public void stop() throws Exception { 336 if (started.compareAndSet(true, false) && journal != null) { 337 journal.close(); 338 journal = null; 339 } 340 } 341 342 private void recoverPendingLocalTransactions() throws IOException { 343 Location location = journal.getNextLocation(null); 344 while (location != null) { 345 process(load(location)); 346 location = journal.getNextLocation(location); 347 } 348 recoveredPendingCommit.addAll(inflightTransactions.keySet()); 349 LOG.info("pending local transactions: " + recoveredPendingCommit); 350 } 351 352 public JournalCommand<?> load(Location location) throws IOException { 353 DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location)); 354 byte readByte = is.readByte(); 355 KahaEntryType type = KahaEntryType.valueOf(readByte); 356 if (type == null) { 357 throw new IOException("Could not load journal record. Invalid location: " + location); 358 } 359 JournalCommand<?> message = (JournalCommand<?>) type.createMessage(); 360 message.mergeFramed(is); 361 return message; 362 } 363 364 public void process(JournalCommand<?> command) throws IOException { 365 switch (command.type()) { 366 case KAHA_PREPARE_COMMAND: 367 KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command; 368 getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo())); 369 break; 370 case KAHA_COMMIT_COMMAND: 371 KahaCommitCommand commitCommand = (KahaCommitCommand) command; 372 removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo())); 373 break; 374 case KAHA_TRACE_COMMAND: 375 break; 376 default: 377 throw new IOException("Unexpected command in transaction journal: " + command); 378 } 379 } 380 381 382 @Override 383 public synchronized void recover(final TransactionRecoveryListener listener) throws IOException { 384 385 for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) { 386 adapter.createTransactionStore().recover(new TransactionRecoveryListener() { 387 @Override 388 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) { 389 try { 390 getTx(xid).trackStore(adapter.createTransactionStore()); 391 } catch (IOException e) { 392 LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e); 393 } 394 listener.recover(xid, addedMessages, acks); 395 } 396 }); 397 } 398 399 try { 400 Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker(); 401 // force completion of local xa 402 for (TransactionId txid : broker.getPreparedTransactions(null)) { 403 if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) { 404 try { 405 if (recoveredPendingCommit.contains(txid)) { 406 LOG.info("delivering pending commit outcome for tid: " + txid); 407 broker.commitTransaction(null, txid, false); 408 409 } else { 410 LOG.info("delivering rollback outcome to store for tid: " + txid); 411 broker.forgetTransaction(null, txid); 412 } 413 persistCompletion(txid); 414 } catch (Exception ex) { 415 LOG.error("failed to deliver pending outcome for tid: " + txid, ex); 416 } 417 } 418 } 419 } catch (Exception e) { 420 LOG.error("failed to resolve pending local transactions", e); 421 } 422 } 423 424 void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message) 425 throws IOException { 426 if (message.getTransactionId() != null) { 427 getTx(message.getTransactionId()).trackStore(transactionStore); 428 } 429 destination.addMessage(context, message); 430 } 431 432 ListenableFuture<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message) 433 throws IOException { 434 if (message.getTransactionId() != null) { 435 getTx(message.getTransactionId()).trackStore(transactionStore); 436 destination.addMessage(context, message); 437 return AbstractMessageStore.FUTURE; 438 } else { 439 return destination.asyncAddQueueMessage(context, message); 440 } 441 } 442 443 ListenableFuture<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message) 444 throws IOException { 445 446 if (message.getTransactionId() != null) { 447 getTx(message.getTransactionId()).trackStore(transactionStore); 448 destination.addMessage(context, message); 449 return AbstractMessageStore.FUTURE; 450 } else { 451 return destination.asyncAddTopicMessage(context, message); 452 } 453 } 454 455 final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack) 456 throws IOException { 457 if (ack.getTransactionId() != null) { 458 getTx(ack.getTransactionId()).trackStore(transactionStore); 459 } 460 destination.removeMessage(context, ack); 461 } 462 463 final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack) 464 throws IOException { 465 if (ack.getTransactionId() != null) { 466 getTx(ack.getTransactionId()).trackStore(transactionStore); 467 } 468 destination.removeAsyncMessage(context, ack); 469 } 470 471 final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination, 472 final String clientId, final String subscriptionName, 473 final MessageId messageId, final MessageAck ack) throws IOException { 474 if (ack.getTransactionId() != null) { 475 getTx(ack.getTransactionId()).trackStore(transactionStore); 476 } 477 destination.acknowledge(context, clientId, subscriptionName, messageId, ack); 478 } 479 480}