/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb;

import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName;

import java.io.File;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.Callable;

import javax.management.ObjectName;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.PersistenceAdapterView;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.protobuf.Buffer;
043import org.apache.activemq.store.JournaledStore; 044import org.apache.activemq.store.MessageStore; 045import org.apache.activemq.store.NoLocalSubscriptionAware; 046import org.apache.activemq.store.PersistenceAdapter; 047import org.apache.activemq.store.SharedFileLocker; 048import org.apache.activemq.store.TopicMessageStore; 049import org.apache.activemq.store.TransactionIdTransformer; 050import org.apache.activemq.store.TransactionIdTransformerAware; 051import org.apache.activemq.store.TransactionStore; 052import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; 053import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 054import org.apache.activemq.store.kahadb.data.KahaXATransactionId; 055import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; 056import org.apache.activemq.usage.SystemUsage; 057import org.apache.activemq.util.ServiceStopper; 058 059/** 060 * An implementation of {@link PersistenceAdapter} designed for use with 061 * KahaDB - Embedded Lightweight Non-Relational Database 062 * 063 * @org.apache.xbean.XBean element="kahaDB" 064 * 065 */ 066public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, 067 JournaledStore, TransactionIdTransformerAware, NoLocalSubscriptionAware { 068 069 private final KahaDBStore letter = new KahaDBStore(); 070 071 /** 072 * @param context 073 * @throws IOException 074 * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext) 075 */ 076 @Override 077 public void beginTransaction(ConnectionContext context) throws IOException { 078 this.letter.beginTransaction(context); 079 } 080 081 /** 082 * @param sync 083 * @throws IOException 084 * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean) 085 */ 086 @Override 087 public void checkpoint(boolean sync) throws IOException { 088 this.letter.checkpoint(sync); 089 } 090 091 /** 092 * @param context 093 * @throws 
IOException 094 * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext) 095 */ 096 @Override 097 public void commitTransaction(ConnectionContext context) throws IOException { 098 this.letter.commitTransaction(context); 099 } 100 101 /** 102 * @param destination 103 * @return MessageStore 104 * @throws IOException 105 * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) 106 */ 107 @Override 108 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 109 return this.letter.createQueueMessageStore(destination); 110 } 111 112 /** 113 * @param destination 114 * @return TopicMessageStore 115 * @throws IOException 116 * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) 117 */ 118 @Override 119 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 120 return this.letter.createTopicMessageStore(destination); 121 } 122 123 /** 124 * @return TransactionStore 125 * @throws IOException 126 * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore() 127 */ 128 @Override 129 public TransactionStore createTransactionStore() throws IOException { 130 return this.letter.createTransactionStore(); 131 } 132 133 /** 134 * @throws IOException 135 * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages() 136 */ 137 @Override 138 public void deleteAllMessages() throws IOException { 139 this.letter.deleteAllMessages(); 140 } 141 142 /** 143 * @return destinations 144 * @see org.apache.activemq.store.PersistenceAdapter#getDestinations() 145 */ 146 @Override 147 public Set<ActiveMQDestination> getDestinations() { 148 return this.letter.getDestinations(); 149 } 150 151 /** 152 * @return lastMessageBrokerSequenceId 153 * @throws IOException 154 * @see 
org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId() 155 */ 156 @Override 157 public long getLastMessageBrokerSequenceId() throws IOException { 158 return this.letter.getLastMessageBrokerSequenceId(); 159 } 160 161 @Override 162 public long getLastProducerSequenceId(ProducerId id) throws IOException { 163 return this.letter.getLastProducerSequenceId(id); 164 } 165 166 @Override 167 public void allowIOResumption() { 168 this.letter.allowIOResumption(); 169 } 170 171 /** 172 * @param destination 173 * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) 174 */ 175 @Override 176 public void removeQueueMessageStore(ActiveMQQueue destination) { 177 this.letter.removeQueueMessageStore(destination); 178 } 179 180 /** 181 * @param destination 182 * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) 183 */ 184 @Override 185 public void removeTopicMessageStore(ActiveMQTopic destination) { 186 this.letter.removeTopicMessageStore(destination); 187 } 188 189 /** 190 * @param context 191 * @throws IOException 192 * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext) 193 */ 194 @Override 195 public void rollbackTransaction(ConnectionContext context) throws IOException { 196 this.letter.rollbackTransaction(context); 197 } 198 199 /** 200 * @param brokerName 201 * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String) 202 */ 203 @Override 204 public void setBrokerName(String brokerName) { 205 this.letter.setBrokerName(brokerName); 206 } 207 208 /** 209 * @param usageManager 210 * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage) 211 */ 212 @Override 213 public void setUsageManager(SystemUsage usageManager) { 214 this.letter.setUsageManager(usageManager); 215 } 216 217 /** 218 * @return 
the size of the store 219 * @see org.apache.activemq.store.PersistenceAdapter#size() 220 */ 221 @Override 222 public long size() { 223 return this.letter.isStarted() ? this.letter.size() : 0l; 224 } 225 226 /** 227 * @throws Exception 228 * @see org.apache.activemq.Service#start() 229 */ 230 @Override 231 public void doStart() throws Exception { 232 this.letter.start(); 233 234 if (brokerService != null && brokerService.isUseJmx()) { 235 PersistenceAdapterView view = new PersistenceAdapterView(this); 236 view.setInflightTransactionViewCallable(new Callable<String>() { 237 @Override 238 public String call() throws Exception { 239 return letter.getTransactions(); 240 } 241 }); 242 view.setDataViewCallable(new Callable<String>() { 243 @Override 244 public String call() throws Exception { 245 return letter.getJournal().getFileMap().keySet().toString(); 246 } 247 }); 248 AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view, 249 createPersistenceAdapterName(brokerService.getBrokerObjectName().toString(), toString())); 250 } 251 } 252 253 /** 254 * @throws Exception 255 * @see org.apache.activemq.Service#stop() 256 */ 257 @Override 258 public void doStop(ServiceStopper stopper) throws Exception { 259 this.letter.stop(); 260 261 if (brokerService != null && brokerService.isUseJmx()) { 262 ObjectName brokerObjectName = brokerService.getBrokerObjectName(); 263 brokerService.getManagementContext().unregisterMBean(createPersistenceAdapterName(brokerObjectName.toString(), toString())); 264 } 265 } 266 267 /** 268 * Get the journalMaxFileLength 269 * 270 * @return the journalMaxFileLength 271 */ 272 @Override 273 public int getJournalMaxFileLength() { 274 return this.letter.getJournalMaxFileLength(); 275 } 276 277 /** 278 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can 279 * be used 280 * 281 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" 282 */ 283 public void 
setJournalMaxFileLength(int journalMaxFileLength) { 284 this.letter.setJournalMaxFileLength(journalMaxFileLength); 285 } 286 287 /** 288 * Set the max number of producers (LRU cache) to track for duplicate sends 289 */ 290 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { 291 this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack); 292 } 293 294 public int getMaxFailoverProducersToTrack() { 295 return this.letter.getMaxFailoverProducersToTrack(); 296 } 297 298 /** 299 * set the audit window depth for duplicate suppression (should exceed the max transaction 300 * batch) 301 */ 302 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { 303 this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth); 304 } 305 306 public int getFailoverProducersAuditDepth() { 307 return this.letter.getFailoverProducersAuditDepth(); 308 } 309 310 /** 311 * Get the checkpointInterval 312 * 313 * @return the checkpointInterval 314 */ 315 public long getCheckpointInterval() { 316 return this.letter.getCheckpointInterval(); 317 } 318 319 /** 320 * Set the checkpointInterval 321 * 322 * @param checkpointInterval 323 * the checkpointInterval to set 324 */ 325 public void setCheckpointInterval(long checkpointInterval) { 326 this.letter.setCheckpointInterval(checkpointInterval); 327 } 328 329 /** 330 * Get the cleanupInterval 331 * 332 * @return the cleanupInterval 333 */ 334 public long getCleanupInterval() { 335 return this.letter.getCleanupInterval(); 336 } 337 338 /** 339 * Set the cleanupInterval 340 * 341 * @param cleanupInterval 342 * the cleanupInterval to set 343 */ 344 public void setCleanupInterval(long cleanupInterval) { 345 this.letter.setCleanupInterval(cleanupInterval); 346 } 347 348 /** 349 * Get the indexWriteBatchSize 350 * 351 * @return the indexWriteBatchSize 352 */ 353 public int getIndexWriteBatchSize() { 354 return this.letter.getIndexWriteBatchSize(); 355 } 356 357 /** 358 * Set the 
indexWriteBatchSize 359 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 360 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 361 * @param indexWriteBatchSize 362 * the indexWriteBatchSize to set 363 */ 364 public void setIndexWriteBatchSize(int indexWriteBatchSize) { 365 this.letter.setIndexWriteBatchSize(indexWriteBatchSize); 366 } 367 368 /** 369 * Get the journalMaxWriteBatchSize 370 * 371 * @return the journalMaxWriteBatchSize 372 */ 373 public int getJournalMaxWriteBatchSize() { 374 return this.letter.getJournalMaxWriteBatchSize(); 375 } 376 377 /** 378 * Set the journalMaxWriteBatchSize 379 * * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 380 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 381 * @param journalMaxWriteBatchSize 382 * the journalMaxWriteBatchSize to set 383 */ 384 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 385 this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize); 386 } 387 388 /** 389 * Get the enableIndexWriteAsync 390 * 391 * @return the enableIndexWriteAsync 392 */ 393 public boolean isEnableIndexWriteAsync() { 394 return this.letter.isEnableIndexWriteAsync(); 395 } 396 397 /** 398 * Set the enableIndexWriteAsync 399 * 400 * @param enableIndexWriteAsync 401 * the enableIndexWriteAsync to set 402 */ 403 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 404 this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync); 405 } 406 407 /** 408 * Get the directory 409 * 410 * @return the directory 411 */ 412 @Override 413 public File getDirectory() { 414 return this.letter.getDirectory(); 415 } 416 417 /** 418 * @param dir 419 * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File) 420 */ 421 @Override 422 public void setDirectory(File dir) { 423 this.letter.setDirectory(dir); 424 } 425 426 /** 427 * @return 
the currently configured location of the KahaDB index files. 428 */ 429 public File getIndexDirectory() { 430 return this.letter.getIndexDirectory(); 431 } 432 433 /** 434 * Sets the directory where KahaDB index files should be written. 435 * 436 * @param indexDirectory 437 * the directory where the KahaDB store index files should be written. 438 */ 439 public void setIndexDirectory(File indexDirectory) { 440 this.letter.setIndexDirectory(indexDirectory); 441 } 442 443 /** 444 * Get the enableJournalDiskSyncs 445 * @deprecated use {@link #getJournalDiskSyncStrategy} instead 446 * @return the enableJournalDiskSyncs 447 */ 448 public boolean isEnableJournalDiskSyncs() { 449 return this.letter.isEnableJournalDiskSyncs(); 450 } 451 452 /** 453 * Set the enableJournalDiskSyncs 454 * 455 * @deprecated use {@link #setJournalDiskSyncStrategy} instead 456 * @param enableJournalDiskSyncs 457 * the enableJournalDiskSyncs to set 458 */ 459 public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) { 460 this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs); 461 } 462 463 /** 464 * @return 465 */ 466 public String getJournalDiskSyncStrategy() { 467 return letter.getJournalDiskSyncStrategy(); 468 } 469 470 public JournalDiskSyncStrategy getJournalDiskSyncStrategyEnum() { 471 return letter.getJournalDiskSyncStrategyEnum(); 472 } 473 474 /** 475 * @param journalDiskSyncStrategy 476 */ 477 public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) { 478 letter.setJournalDiskSyncStrategy(journalDiskSyncStrategy); 479 } 480 481 /** 482 * @return 483 */ 484 public long getJournalDiskSyncInterval() { 485 return letter.getJournalDiskSyncInterval(); 486 } 487 488 /** 489 * @param journalDiskSyncInterval 490 */ 491 public void setJournalDiskSyncInterval(long journalDiskSyncInterval) { 492 letter.setJournalDiskSyncInterval(journalDiskSyncInterval); 493 } 494 495 /** 496 * Get the indexCacheSize 497 * 498 * @return the indexCacheSize 499 */ 500 public int 
getIndexCacheSize() { 501 return this.letter.getIndexCacheSize(); 502 } 503 504 /** 505 * Set the indexCacheSize 506 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 507 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 508 * @param indexCacheSize 509 * the indexCacheSize to set 510 */ 511 public void setIndexCacheSize(int indexCacheSize) { 512 this.letter.setIndexCacheSize(indexCacheSize); 513 } 514 515 /** 516 * Get the ignoreMissingJournalfiles 517 * 518 * @return the ignoreMissingJournalfiles 519 */ 520 public boolean isIgnoreMissingJournalfiles() { 521 return this.letter.isIgnoreMissingJournalfiles(); 522 } 523 524 /** 525 * Set the ignoreMissingJournalfiles 526 * 527 * @param ignoreMissingJournalfiles 528 * the ignoreMissingJournalfiles to set 529 */ 530 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 531 this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles); 532 } 533 534 public boolean isChecksumJournalFiles() { 535 return letter.isChecksumJournalFiles(); 536 } 537 538 public boolean isCheckForCorruptJournalFiles() { 539 return letter.isCheckForCorruptJournalFiles(); 540 } 541 542 public void setChecksumJournalFiles(boolean checksumJournalFiles) { 543 letter.setChecksumJournalFiles(checksumJournalFiles); 544 } 545 546 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { 547 letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles); 548 } 549 550 @Override 551 public void setBrokerService(BrokerService brokerService) { 552 super.setBrokerService(brokerService); 553 letter.setBrokerService(brokerService); 554 } 555 556 public String getPreallocationScope() { 557 return letter.getPreallocationScope(); 558 } 559 560 public void setPreallocationScope(String preallocationScope) { 561 this.letter.setPreallocationScope(preallocationScope); 562 } 563 564 public String getPreallocationStrategy() { 565 return 
letter.getPreallocationStrategy(); 566 } 567 568 public void setPreallocationStrategy(String preallocationStrategy) { 569 this.letter.setPreallocationStrategy(preallocationStrategy); 570 } 571 572 public boolean isArchiveDataLogs() { 573 return letter.isArchiveDataLogs(); 574 } 575 576 public void setArchiveDataLogs(boolean archiveDataLogs) { 577 letter.setArchiveDataLogs(archiveDataLogs); 578 } 579 580 public File getDirectoryArchive() { 581 return letter.getDirectoryArchive(); 582 } 583 584 public void setDirectoryArchive(File directoryArchive) { 585 letter.setDirectoryArchive(directoryArchive); 586 } 587 588 public boolean isConcurrentStoreAndDispatchQueues() { 589 return letter.isConcurrentStoreAndDispatchQueues(); 590 } 591 592 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { 593 letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch); 594 } 595 596 public boolean isConcurrentStoreAndDispatchTopics() { 597 return letter.isConcurrentStoreAndDispatchTopics(); 598 } 599 600 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 601 letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch); 602 } 603 604 public int getMaxAsyncJobs() { 605 return letter.getMaxAsyncJobs(); 606 } 607 /** 608 * @param maxAsyncJobs 609 * the maxAsyncJobs to set 610 */ 611 public void setMaxAsyncJobs(int maxAsyncJobs) { 612 letter.setMaxAsyncJobs(maxAsyncJobs); 613 } 614 615 /** 616 * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead 617 * 618 * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set 619 */ 620 @Deprecated 621 public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException { 622 getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay); 623 } 624 625 public boolean getForceRecoverIndex() { 626 return letter.getForceRecoverIndex(); 627 } 628 629 public void setForceRecoverIndex(boolean forceRecoverIndex) { 630 
letter.setForceRecoverIndex(forceRecoverIndex); 631 } 632 633 public boolean isArchiveCorruptedIndex() { 634 return letter.isArchiveCorruptedIndex(); 635 } 636 637 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { 638 letter.setArchiveCorruptedIndex(archiveCorruptedIndex); 639 } 640 641 public float getIndexLFUEvictionFactor() { 642 return letter.getIndexLFUEvictionFactor(); 643 } 644 645 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) { 646 letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor); 647 } 648 649 public boolean isUseIndexLFRUEviction() { 650 return letter.isUseIndexLFRUEviction(); 651 } 652 653 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) { 654 letter.setUseIndexLFRUEviction(useIndexLFRUEviction); 655 } 656 657 public void setEnableIndexDiskSyncs(boolean diskSyncs) { 658 letter.setEnableIndexDiskSyncs(diskSyncs); 659 } 660 661 public boolean isEnableIndexDiskSyncs() { 662 return letter.isEnableIndexDiskSyncs(); 663 } 664 665 public void setEnableIndexRecoveryFile(boolean enable) { 666 letter.setEnableIndexRecoveryFile(enable); 667 } 668 669 public boolean isEnableIndexRecoveryFile() { 670 return letter.isEnableIndexRecoveryFile(); 671 } 672 673 public void setEnableIndexPageCaching(boolean enable) { 674 letter.setEnableIndexPageCaching(enable); 675 } 676 677 public boolean isEnableIndexPageCaching() { 678 return letter.isEnableIndexPageCaching(); 679 } 680 681 public int getCompactAcksAfterNoGC() { 682 return letter.getCompactAcksAfterNoGC(); 683 } 684 685 /** 686 * Sets the number of GC cycles where no journal logs were removed before an attempt to 687 * move forward all the acks in the last log that contains them and is otherwise unreferenced. 688 * <p> 689 * A value of -1 will disable this feature. 690 * 691 * @param compactAcksAfterNoGC 692 * Number of empty GC cycles before we rewrite old ACKS. 
693 */ 694 public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) { 695 this.letter.setCompactAcksAfterNoGC(compactAcksAfterNoGC); 696 } 697 698 public boolean isCompactAcksIgnoresStoreGrowth() { 699 return this.letter.isCompactAcksIgnoresStoreGrowth(); 700 } 701 702 /** 703 * Configure if Ack compaction will occur regardless of continued growth of the 704 * journal logs meaning that the store has not run out of space yet. Because the 705 * compaction operation can be costly this value is defaulted to off and the Ack 706 * compaction is only done when it seems that the store cannot grow and larger. 707 * 708 * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set 709 */ 710 public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) { 711 this.letter.setCompactAcksIgnoresStoreGrowth(compactAcksIgnoresStoreGrowth); 712 } 713 714 /** 715 * Returns whether Ack compaction is enabled 716 * 717 * @return enableAckCompaction 718 */ 719 public boolean isEnableAckCompaction() { 720 return letter.isEnableAckCompaction(); 721 } 722 723 /** 724 * Configure if the Ack compaction task should be enabled to run 725 * 726 * @param enableAckCompaction 727 */ 728 public void setEnableAckCompaction(boolean enableAckCompaction) { 729 letter.setEnableAckCompaction(enableAckCompaction); 730 } 731 732 /** 733 * Whether non-blocking subscription statistics have been enabled 734 * 735 * @return 736 */ 737 public boolean isEnableSubscriptionStatistics() { 738 return letter.isEnableSubscriptionStatistics(); 739 } 740 741 /** 742 * Enable caching statistics for each subscription to allow non-blocking 743 * retrieval of metrics. This could incur some overhead to compute if there are a lot 744 * of subscriptions. 
745 * 746 * @param enableSubscriptionStatistics 747 */ 748 public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics) { 749 letter.setEnableSubscriptionStatistics(enableSubscriptionStatistics); 750 } 751 752 public KahaDBStore getStore() { 753 return letter; 754 } 755 756 public KahaTransactionInfo createTransactionInfo(TransactionId txid) { 757 if (txid == null) { 758 return null; 759 } 760 KahaTransactionInfo rc = new KahaTransactionInfo(); 761 762 if (txid.isLocalTransaction()) { 763 LocalTransactionId t = (LocalTransactionId) txid; 764 KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId(); 765 kahaTxId.setConnectionId(t.getConnectionId().getValue()); 766 kahaTxId.setTransactionId(t.getValue()); 767 rc.setLocalTransactionId(kahaTxId); 768 } else { 769 XATransactionId t = (XATransactionId) txid; 770 KahaXATransactionId kahaTxId = new KahaXATransactionId(); 771 kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier())); 772 kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId())); 773 kahaTxId.setFormatId(t.getFormatId()); 774 rc.setXaTransactionId(kahaTxId); 775 } 776 return rc; 777 } 778 779 @Override 780 public Locker createDefaultLocker() throws IOException { 781 SharedFileLocker locker = new SharedFileLocker(); 782 locker.configure(this); 783 return locker; 784 } 785 786 @Override 787 public void init() throws Exception {} 788 789 @Override 790 public String toString() { 791 String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET"; 792 return "KahaDBPersistenceAdapter[" + path + (getIndexDirectory() != null ? 
",Index:" + getIndexDirectory().getAbsolutePath() : "") + "]"; 793 } 794 795 @Override 796 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { 797 getStore().setTransactionIdTransformer(transactionIdTransformer); 798 } 799 800 @Override 801 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 802 return this.letter.createJobSchedulerStore(); 803 } 804 805 /* (non-Javadoc) 806 * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal() 807 */ 808 @Override 809 public boolean isPersistNoLocal() { 810 return this.letter.isPersistNoLocal(); 811 } 812}