001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadb; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.util.Date; 022 import java.util.HashSet; 023 import java.util.Set; 024 import java.util.TreeSet; 025 import java.util.concurrent.ConcurrentHashMap; 026 import java.util.concurrent.Future; 027 028 import org.apache.activemq.broker.Broker; 029 import org.apache.activemq.broker.ConnectionContext; 030 import org.apache.activemq.command.Message; 031 import org.apache.activemq.command.MessageAck; 032 import org.apache.activemq.command.MessageId; 033 import org.apache.activemq.command.TransactionId; 034 import org.apache.activemq.command.XATransactionId; 035 import org.apache.activemq.store.AbstractMessageStore; 036 import org.apache.activemq.store.MessageStore; 037 import org.apache.activemq.store.ProxyMessageStore; 038 import org.apache.activemq.store.ProxyTopicMessageStore; 039 import org.apache.activemq.store.TopicMessageStore; 040 import org.apache.activemq.store.TransactionRecoveryListener; 041 import org.apache.activemq.store.TransactionStore; 042 import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 043 import org.apache.activemq.store.kahadb.data.KahaEntryType; 044 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 045 import org.apache.activemq.store.kahadb.data.KahaTraceCommand; 046 import org.apache.activemq.util.IOHelper; 047 import org.apache.kahadb.journal.Journal; 048 import org.apache.kahadb.journal.Location; 049 import org.apache.kahadb.util.DataByteArrayInputStream; 050 import org.apache.kahadb.util.DataByteArrayOutputStream; 051 import org.slf4j.Logger; 052 import org.slf4j.LoggerFactory; 053 054 public class MultiKahaDBTransactionStore implements TransactionStore { 055 static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class); 056 final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter; 057 final ConcurrentHashMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>(); 058 final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>(); 059 private Journal journal; 060 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 061 private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 062 063 public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) { 064 this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter; 065 } 066 067 public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) { 068 return new ProxyMessageStore(messageStore) { 069 @Override 070 public void addMessage(ConnectionContext context, final Message send) throws IOException { 071 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 072 } 073 074 @Override 075 public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException { 076 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 077 } 078 079 @Override 080 public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { 081 return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message); 082 } 083 084 @Override 085 public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { 086 return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message); 087 } 088 089 @Override 090 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 091 MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack); 092 } 093 094 @Override 095 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 096 MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack); 097 } 098 }; 099 } 100 101 public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) { 102 return new ProxyTopicMessageStore(messageStore) { 103 @Override 104 public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException { 105 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 106 } 107 108 @Override 109 public void addMessage(ConnectionContext context, final Message send) throws IOException { 110 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 111 } 112 113 @Override 114 public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { 115 return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message); 116 } 117 118 @Override 119 public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { 120 return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message); 121 } 122 123 @Override 124 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 125 MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack); 126 } 127 128 @Override 129 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 130 MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack); 131 } 132 133 @Override 134 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 135 MessageId messageId, MessageAck ack) throws IOException { 136 MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId, 137 subscriptionName, messageId, ack); 138 } 139 }; 140 } 141 142 public void deleteAllMessages() { 143 IOHelper.deleteChildren(getDirectory()); 144 } 145 146 public int getJournalMaxFileLength() { 147 return journalMaxFileLength; 148 } 149 150 public void setJournalMaxFileLength(int journalMaxFileLength) { 151 this.journalMaxFileLength = journalMaxFileLength; 152 } 153 154 public int getJournalMaxWriteBatchSize() { 155 return journalWriteBatchSize; 156 } 157 158 public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) { 159 this.journalWriteBatchSize = journalWriteBatchSize; 160 } 161 162 public class Tx { 163 private final Set<TransactionStore> stores = new HashSet<TransactionStore>(); 164 private int prepareLocationId = 0; 165 166 public void trackStore(TransactionStore store) { 167 stores.add(store); 168 } 169 170 public Set<TransactionStore> getStores() { 171 return stores; 172 } 173 174 public void trackPrepareLocation(Location location) { 175 this.prepareLocationId = location.getDataFileId(); 176 } 177 178 public int getPreparedLocationId() { 179 return prepareLocationId; 180 } 181 } 182 183 public Tx getTx(TransactionId txid) { 184 Tx tx = inflightTransactions.get(txid); 185 if (tx == null) { 186 tx = new Tx(); 187 inflightTransactions.put(txid, tx); 188 } 189 return tx; 190 } 191 192 public Tx removeTx(TransactionId txid) { 193 return inflightTransactions.remove(txid); 194 } 195 196 public void prepare(TransactionId txid) throws IOException { 197 Tx tx = getTx(txid); 198 for (TransactionStore store : tx.getStores()) { 199 store.prepare(txid); 200 } 201 } 202 203 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit) 204 throws IOException { 205 206 if (preCommit != null) { 207 preCommit.run(); 208 } 209 210 Tx tx = getTx(txid); 211 if (wasPrepared) { 212 for (TransactionStore store : tx.getStores()) { 213 store.commit(txid, true, null, null); 214 } 215 } else { 216 // can only do 1pc on a single store 217 if (tx.getStores().size() == 1) { 218 for (TransactionStore store : tx.getStores()) { 219 store.commit(txid, false, null, null); 220 } 221 } else { 222 // need to do local 2pc 223 for (TransactionStore store : tx.getStores()) { 224 store.prepare(txid); 225 } 226 persistOutcome(tx, txid); 227 for (TransactionStore store : tx.getStores()) { 228 store.commit(txid, true, null, null); 229 } 230 persistCompletion(txid); 231 } 232 } 233 removeTx(txid); 234 if (postCommit != null) { 235 postCommit.run(); 236 } 237 } 238 239 public void persistOutcome(Tx tx, TransactionId txid) throws IOException { 240 tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))); 241 } 242 243 public void persistCompletion(TransactionId txid) throws IOException { 244 store(new KahaCommitCommand().setTransactionInfo(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))); 245 } 246 247 private Location store(JournalCommand<?> data) throws IOException { 248 int size = data.serializedSizeFramed(); 249 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); 250 os.writeByte(data.type().getNumber()); 251 data.writeFramed(os); 252 Location location = journal.write(os.toByteSequence(), true); 253 journal.setLastAppendLocation(location); 254 return location; 255 } 256 257 public void rollback(TransactionId txid) throws IOException { 258 Tx tx = removeTx(txid); 259 if (tx != null) { 260 for (TransactionStore store : tx.getStores()) { 261 store.rollback(txid); 262 } 263 } 264 } 265 266 public void start() throws Exception { 267 journal = new Journal() { 268 @Override 269 protected void cleanup() { 270 super.cleanup(); 271 txStoreCleanup(); 272 } 273 }; 274 journal.setDirectory(getDirectory()); 275 journal.setMaxFileLength(journalMaxFileLength); 276 journal.setWriteBatchSize(journalWriteBatchSize); 277 IOHelper.mkdirs(journal.getDirectory()); 278 journal.start(); 279 recoverPendingLocalTransactions(); 280 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 281 } 282 283 private void txStoreCleanup() { 284 Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet()); 285 for (Tx tx : inflightTransactions.values()) { 286 knownDataFileIds.remove(tx.getPreparedLocationId()); 287 } 288 try { 289 journal.removeDataFiles(knownDataFileIds); 290 } catch (Exception e) { 291 LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds); 292 } 293 } 294 295 private File getDirectory() { 296 return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore"); 297 } 298 299 public void stop() throws Exception { 300 journal.close(); 301 journal = null; 302 } 303 304 private void recoverPendingLocalTransactions() throws IOException { 305 Location location = journal.getNextLocation(null); 306 while (location != null) { 307 process(load(location)); 308 location = journal.getNextLocation(location); 309 } 310 recoveredPendingCommit.addAll(inflightTransactions.keySet()); 311 LOG.info("pending local transactions: " + recoveredPendingCommit); 312 } 313 314 public JournalCommand<?> load(Location location) throws IOException { 315 DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location)); 316 byte readByte = is.readByte(); 317 KahaEntryType type = KahaEntryType.valueOf(readByte); 318 if (type == null) { 319 throw new IOException("Could not load journal record. Invalid location: " + location); 320 } 321 JournalCommand<?> message = (JournalCommand<?>) type.createMessage(); 322 message.mergeFramed(is); 323 return message; 324 } 325 326 public void process(JournalCommand<?> command) throws IOException { 327 switch (command.type()) { 328 case KAHA_PREPARE_COMMAND: 329 KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command; 330 getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo())); 331 break; 332 case KAHA_COMMIT_COMMAND: 333 KahaCommitCommand commitCommand = (KahaCommitCommand) command; 334 removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo())); 335 break; 336 case KAHA_TRACE_COMMAND: 337 break; 338 default: 339 throw new IOException("Unexpected command in transaction journal: " + command); 340 } 341 } 342 343 344 public synchronized void recover(final TransactionRecoveryListener listener) throws IOException { 345 346 for (final KahaDBPersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) { 347 adapter.createTransactionStore().recover(new TransactionRecoveryListener() { 348 @Override 349 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) { 350 try { 351 getTx(xid).trackStore(adapter.createTransactionStore()); 352 } catch (IOException e) { 353 LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e); 354 } 355 listener.recover(xid, addedMessages, acks); 356 } 357 }); 358 } 359 360 try { 361 Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker(); 362 // force completion of local xa 363 for (TransactionId txid : broker.getPreparedTransactions(null)) { 364 if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) { 365 try { 366 if (recoveredPendingCommit.contains(txid)) { 367 LOG.info("delivering pending commit outcome for tid: " + txid); 368 broker.commitTransaction(null, txid, false); 369 370 } else { 371 LOG.info("delivering rollback outcome to store for tid: " + txid); 372 broker.forgetTransaction(null, txid); 373 } 374 persistCompletion(txid); 375 } catch (Exception ex) { 376 LOG.error("failed to deliver pending outcome for tid: " + txid, ex); 377 } 378 } 379 } 380 } catch (Exception e) { 381 LOG.error("failed to resolve pending local transactions", e); 382 } 383 } 384 385 void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message) 386 throws IOException { 387 if (message.getTransactionId() != null) { 388 getTx(message.getTransactionId()).trackStore(transactionStore); 389 } 390 destination.addMessage(context, message); 391 } 392 393 Future<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message) 394 throws IOException { 395 if (message.getTransactionId() != null) { 396 getTx(message.getTransactionId()).trackStore(transactionStore); 397 destination.addMessage(context, message); 398 return AbstractMessageStore.FUTURE; 399 } else { 400 return destination.asyncAddQueueMessage(context, message); 401 } 402 } 403 404 Future<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message) 405 throws IOException { 406 407 if (message.getTransactionId() != null) { 408 getTx(message.getTransactionId()).trackStore(transactionStore); 409 destination.addMessage(context, message); 410 return AbstractMessageStore.FUTURE; 411 } else { 412 return destination.asyncAddTopicMessage(context, message); 413 } 414 } 415 416 final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack) 417 throws IOException { 418 if (ack.getTransactionId() != null) { 419 getTx(ack.getTransactionId()).trackStore(transactionStore); 420 } 421 destination.removeMessage(context, ack); 422 } 423 424 final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack) 425 throws IOException { 426 if (ack.getTransactionId() != null) { 427 getTx(ack.getTransactionId()).trackStore(transactionStore); 428 } 429 destination.removeAsyncMessage(context, ack); 430 } 431 432 final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination, 433 final String clientId, final String subscriptionName, 434 final MessageId messageId, final MessageAck ack) throws IOException { 435 if (ack.getTransactionId() != null) { 436 getTx(ack.getTransactionId()).trackStore(transactionStore); 437 } 438 destination.acknowledge(context, clientId, subscriptionName, messageId, ack); 439 } 440 }