/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.usage.SystemUsage;

import java.io.File;
import java.io.IOException;
import java.util.Set;

/**
 * An implementation of {@link PersistenceAdapter} designed for use with
 * KahaDB - Embedded Lightweight Non-Relational Database.
 * <p>
 * The adapter owns a single {@link KahaDBStore} and forwards every
 * persistence, lifecycle and configuration call to it; the only logic that
 * lives here is {@link #createTransactionInfo(TransactionId)} and
 * {@link #toString()}.
 *
 * @org.apache.xbean.XBean element="kahaDB"
 *
 */
public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {

    /** The wrapped store that actually services every call made on this adapter. */
    private final KahaDBStore letter = new KahaDBStore();

    // ------------------------------------------------------------------
    // PersistenceAdapter operations
    // ------------------------------------------------------------------

    /**
     * Forwards the begin-transaction request to the wrapped store.
     *
     * @param context the connection context handed on to the store
     * @throws IOException if the wrapped store reports a failure
     * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
     */
    public void beginTransaction(ConnectionContext context) throws IOException {
        letter.beginTransaction(context);
    }

    /**
     * Forwards the checkpoint request to the wrapped store.
     *
     * @param sync passed through unchanged to the store
     * @throws IOException if the wrapped store reports a failure
     * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
     */
    public void checkpoint(boolean sync) throws IOException {
        letter.checkpoint(sync);
    }

    /**
     * Forwards the commit-transaction request to the wrapped store.
     *
     * @param context the connection context handed on to the store
     * @throws IOException if the wrapped store reports a failure
     * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
     */
    public void commitTransaction(ConnectionContext context) throws IOException {
        letter.commitTransaction(context);
    }

    /**
     * Asks the wrapped store for the message store of a queue.
     *
     * @param destination the queue whose message store is requested
     * @return the MessageStore supplied by the wrapped store
     * @throws IOException if the wrapped store reports a failure
     * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
     */
    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
        return letter.createQueueMessageStore(destination);
    }

    /**
     * Asks the wrapped store for the message store of a topic.
     *
     * @param destination the topic whose message store is requested
     * @return the TopicMessageStore supplied by the wrapped store
     * @throws IOException if the wrapped store reports a failure
     * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
     */
    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
        return letter.createTopicMessageStore(destination);
    }

    /**
     * Asks the wrapped store for its transaction store.
     *
     * @return the TransactionStore supplied by the wrapped store
     * @throws IOException if the wrapped store reports a failure
     * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
     */
    public TransactionStore createTransactionStore() throws IOException {
        return letter.createTransactionStore();
    }

    /**
     * Forwards the delete-all-messages request to the wrapped store.
     *
     * @throws IOException if the wrapped store reports a failure
     * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
     */
    public void deleteAllMessages() throws IOException {
        letter.deleteAllMessages();
    }

    /**
     * Returns the destinations reported by the wrapped store.
     *
     * @return destinations
     * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
     */
    public Set<ActiveMQDestination> getDestinations() {
        return letter.getDestinations();
    }

    /**
     * Returns the last message broker sequence id reported by the wrapped store.
     *
     * @return lastMessageBrokerSequenceId
     * @throws IOException if the wrapped store reports a failure
     * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
     */
    public long getLastMessageBrokerSequenceId() throws IOException {
        return letter.getLastMessageBrokerSequenceId();
    }

    /**
     * Returns the last producer sequence id the wrapped store reports for a producer.
     *
     * @param id the producer to look up
     * @return the value returned by the wrapped store
     * @throws IOException if the wrapped store reports a failure
     */
    public long getLastProducerSequenceId(ProducerId id) throws IOException {
        return letter.getLastProducerSequenceId(id);
    }

    /**
     * Forwards removal of a queue's message store to the wrapped store.
     *
     * @param destination the queue whose message store is to be removed
     * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
     */
    public void removeQueueMessageStore(ActiveMQQueue destination) {
        letter.removeQueueMessageStore(destination);
    }

    /**
     * Forwards removal of a topic's message store to the wrapped store.
     *
     * @param destination the topic whose message store is to be removed
     * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
     */
    public void removeTopicMessageStore(ActiveMQTopic destination) {
        letter.removeTopicMessageStore(destination);
    }

    /**
     * Forwards the rollback-transaction request to the wrapped store.
     *
     * @param context the connection context handed on to the store
     * @throws IOException if the wrapped store reports a failure
     * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
     */
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        letter.rollbackTransaction(context);
    }

    /**
     * Hands the broker name to the wrapped store.
     *
     * @param brokerName the broker name to set on the store
     * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
     */
    public void setBrokerName(String brokerName) {
        letter.setBrokerName(brokerName);
    }

    /**
     * Hands the usage manager to the wrapped store.
     *
     * @param usageManager the usage manager to set on the store
     * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
     */
    public void setUsageManager(SystemUsage usageManager) {
        letter.setUsageManager(usageManager);
    }

    /**
     * Returns the size reported by the wrapped store.
     *
     * @return the size of the store
     * @see org.apache.activemq.store.PersistenceAdapter#size()
     */
    public long size() {
        return letter.size();
    }

    // ------------------------------------------------------------------
    // Service lifecycle
    // ------------------------------------------------------------------

    /**
     * Starts the wrapped store.
     *
     * @throws Exception whatever the wrapped store throws while starting
     * @see org.apache.activemq.Service#start()
     */
    public void start() throws Exception {
        letter.start();
    }

    /**
     * Stops the wrapped store.
     *
     * @throws Exception whatever the wrapped store throws while stopping
     * @see org.apache.activemq.Service#stop()
     */
    public void stop() throws Exception {
        letter.stop();
    }

    // ------------------------------------------------------------------
    // Configuration properties (all stored on the wrapped KahaDBStore)
    // ------------------------------------------------------------------

    /**
     * Get the journalMaxFileLength
     *
     * @return the journalMaxFileLength held by the wrapped store
     */
    public int getJournalMaxFileLength() {
        return letter.getJournalMaxFileLength();
    }

    /**
     * Set the journalMaxFileLength.
     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
     * be used
     *
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
     */
    public void setJournalMaxFileLength(int journalMaxFileLength) {
        letter.setJournalMaxFileLength(journalMaxFileLength);
    }

    /**
     * Set the max number of producers (LRU cache) to track for duplicate sends
     *
     * @param maxFailoverProducersToTrack the value to set on the wrapped store
     */
    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
        letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
    }

    /**
     * Get the maxFailoverProducersToTrack
     *
     * @return the maxFailoverProducersToTrack held by the wrapped store
     */
    public int getMaxFailoverProducersToTrack() {
        return letter.getMaxFailoverProducersToTrack();
    }

    /**
     * set the audit window depth for duplicate suppression (should exceed the max transaction
     * batch)
     *
     * @param failoverProducersAuditDepth the value to set on the wrapped store
     */
    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
        letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
    }

    /**
     * Get the failoverProducersAuditDepth
     *
     * @return the failoverProducersAuditDepth held by the wrapped store
     */
    public int getFailoverProducersAuditDepth() {
        return letter.getFailoverProducersAuditDepth();
    }

    /**
     * Get the checkpointInterval
     *
     * @return the checkpointInterval held by the wrapped store
     */
    public long getCheckpointInterval() {
        return letter.getCheckpointInterval();
    }

    /**
     * Set the checkpointInterval
     *
     * @param checkpointInterval
     *            the checkpointInterval to set on the wrapped store
     */
    public void setCheckpointInterval(long checkpointInterval) {
        letter.setCheckpointInterval(checkpointInterval);
    }

    /**
     * Get the cleanupInterval
     *
     * @return the cleanupInterval held by the wrapped store
     */
    public long getCleanupInterval() {
        return letter.getCleanupInterval();
    }

    /**
     * Set the cleanupInterval
     *
     * @param cleanupInterval
     *            the cleanupInterval to set on the wrapped store
     */
    public void setCleanupInterval(long cleanupInterval) {
        letter.setCleanupInterval(cleanupInterval);
    }

    /**
     * Get the indexWriteBatchSize
     *
     * @return the indexWriteBatchSize held by the wrapped store
     */
    public int getIndexWriteBatchSize() {
        return letter.getIndexWriteBatchSize();
    }

    /**
     * Set the indexWriteBatchSize
     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
     * @param indexWriteBatchSize
     *            the indexWriteBatchSize to set on the wrapped store
     */
    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
        letter.setIndexWriteBatchSize(indexWriteBatchSize);
    }

    /**
     * Get the journalMaxWriteBatchSize
     *
     * @return the journalMaxWriteBatchSize held by the wrapped store
     */
    public int getJournalMaxWriteBatchSize() {
        return letter.getJournalMaxWriteBatchSize();
    }

    /**
     * Set the journalMaxWriteBatchSize
     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
     * @param journalMaxWriteBatchSize
     *            the journalMaxWriteBatchSize to set on the wrapped store
     */
    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
        letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
    }

    /**
     * Get the enableIndexWriteAsync
     *
     * @return the enableIndexWriteAsync flag held by the wrapped store
     */
    public boolean isEnableIndexWriteAsync() {
        return letter.isEnableIndexWriteAsync();
    }

    /**
     * Set the enableIndexWriteAsync
     *
     * @param enableIndexWriteAsync
     *            the enableIndexWriteAsync flag to set on the wrapped store
     */
    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
        letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
    }

    /**
     * Get the directory
     *
     * @return the directory held by the wrapped store
     */
    public File getDirectory() {
        return letter.getDirectory();
    }

    /**
     * Set the directory on the wrapped store.
     *
     * @param dir the directory to set
     * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
     */
    public void setDirectory(File dir) {
        letter.setDirectory(dir);
    }

    /**
     * Get the enableJournalDiskSyncs
     *
     * @return the enableJournalDiskSyncs flag held by the wrapped store
     */
    public boolean isEnableJournalDiskSyncs() {
        return letter.isEnableJournalDiskSyncs();
    }

    /**
     * Set the enableJournalDiskSyncs
     *
     * @param enableJournalDiskSyncs
     *            the enableJournalDiskSyncs flag to set on the wrapped store
     */
    public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
        letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
    }

    /**
     * Get the indexCacheSize
     *
     * @return the indexCacheSize held by the wrapped store
     */
    public int getIndexCacheSize() {
        return letter.getIndexCacheSize();
    }

    /**
     * Set the indexCacheSize
     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
     * @param indexCacheSize
     *            the indexCacheSize to set on the wrapped store
     */
    public void setIndexCacheSize(int indexCacheSize) {
        letter.setIndexCacheSize(indexCacheSize);
    }

    /**
     * Get the ignoreMissingJournalfiles
     *
     * @return the ignoreMissingJournalfiles flag held by the wrapped store
     */
    public boolean isIgnoreMissingJournalfiles() {
        return letter.isIgnoreMissingJournalfiles();
    }

    /**
     * Set the ignoreMissingJournalfiles
     *
     * @param ignoreMissingJournalfiles
     *            the ignoreMissingJournalfiles flag to set on the wrapped store
     */
    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
        letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
    }

    /**
     * Get the checksumJournalFiles
     *
     * @return the checksumJournalFiles flag held by the wrapped store
     */
    public boolean isChecksumJournalFiles() {
        return letter.isChecksumJournalFiles();
    }

    /**
     * Get the checkForCorruptJournalFiles
     *
     * @return the checkForCorruptJournalFiles flag held by the wrapped store
     */
    public boolean isCheckForCorruptJournalFiles() {
        return letter.isCheckForCorruptJournalFiles();
    }

    /**
     * Set the checksumJournalFiles
     *
     * @param checksumJournalFiles the flag to set on the wrapped store
     */
    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
        letter.setChecksumJournalFiles(checksumJournalFiles);
    }

    /**
     * Set the checkForCorruptJournalFiles
     *
     * @param checkForCorruptJournalFiles the flag to set on the wrapped store
     */
    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
        letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
    }

    /**
     * Hands the owning broker service to the wrapped store
     * (the {@link BrokerServiceAware} callback).
     *
     * @param brokerService the broker service to set on the store
     */
    public void setBrokerService(BrokerService brokerService) {
        letter.setBrokerService(brokerService);
    }

    /**
     * Get the archiveDataLogs
     *
     * @return the archiveDataLogs flag held by the wrapped store
     */
    public boolean isArchiveDataLogs() {
        return letter.isArchiveDataLogs();
    }

    /**
     * Set the archiveDataLogs
     *
     * @param archiveDataLogs the flag to set on the wrapped store
     */
    public void setArchiveDataLogs(boolean archiveDataLogs) {
        letter.setArchiveDataLogs(archiveDataLogs);
    }

    /**
     * Get the directoryArchive
     *
     * @return the directoryArchive held by the wrapped store
     */
    public File getDirectoryArchive() {
        return letter.getDirectoryArchive();
    }

    /**
     * Set the directoryArchive
     *
     * @param directoryArchive the archive directory to set on the wrapped store
     */
    public void setDirectoryArchive(File directoryArchive) {
        letter.setDirectoryArchive(directoryArchive);
    }

    /**
     * Get the concurrentStoreAndDispatchQueues
     *
     * @return the concurrentStoreAndDispatchQueues flag held by the wrapped store
     */
    public boolean isConcurrentStoreAndDispatchQueues() {
        return letter.isConcurrentStoreAndDispatchQueues();
    }

    /**
     * Set the concurrentStoreAndDispatchQueues
     *
     * @param concurrentStoreAndDispatch the flag to set on the wrapped store
     */
    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
        letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
    }

    /**
     * Get the concurrentStoreAndDispatchTopics
     *
     * @return the concurrentStoreAndDispatchTopics flag held by the wrapped store
     */
    public boolean isConcurrentStoreAndDispatchTopics() {
        return letter.isConcurrentStoreAndDispatchTopics();
    }

    /**
     * Set the concurrentStoreAndDispatchTopics
     *
     * @param concurrentStoreAndDispatch the flag to set on the wrapped store
     */
    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
        letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
    }

    /**
     * Get the maxAsyncJobs
     *
     * @return the maxAsyncJobs held by the wrapped store
     */
    public int getMaxAsyncJobs() {
        return letter.getMaxAsyncJobs();
    }

    /**
     * Set the maxAsyncJobs
     *
     * @param maxAsyncJobs
     *            the maxAsyncJobs to set on the wrapped store
     */
    public void setMaxAsyncJobs(int maxAsyncJobs) {
        letter.setMaxAsyncJobs(maxAsyncJobs);
    }

    /**
     * Get the databaseLockedWaitDelay
     *
     * @return the databaseLockedWaitDelay held by the wrapped store
     */
    public int getDatabaseLockedWaitDelay() {
        return letter.getDatabaseLockedWaitDelay();
    }

    /**
     * Set the databaseLockedWaitDelay
     *
     * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set on the wrapped store
     */
    public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
        letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
    }

    /**
     * Get the forceRecoverIndex
     *
     * @return the forceRecoverIndex flag held by the wrapped store
     */
    public boolean getForceRecoverIndex() {
        return letter.getForceRecoverIndex();
    }

    /**
     * Set the forceRecoverIndex
     *
     * @param forceRecoverIndex the flag to set on the wrapped store
     */
    public void setForceRecoverIndex(boolean forceRecoverIndex) {
        letter.setForceRecoverIndex(forceRecoverIndex);
    }

    /**
     * Get the archiveCorruptedIndex
     *
     * @return the archiveCorruptedIndex flag held by the wrapped store
     */
    public boolean isArchiveCorruptedIndex() {
        return letter.isArchiveCorruptedIndex();
    }

    /**
     * Set the archiveCorruptedIndex
     *
     * @param archiveCorruptedIndex the flag to set on the wrapped store
     */
    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
        letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
    }

    /**
     * When true, persist the redelivery status such that the message redelivery flag can survive a broker failure
     * used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean)  true
     *
     * @param rewriteOnRedelivery the flag to set on the wrapped store
     */
    public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
        letter.setRewriteOnRedelivery(rewriteOnRedelivery);
    }

    /**
     * Get the rewriteOnRedelivery
     *
     * @return the rewriteOnRedelivery flag held by the wrapped store
     */
    public boolean isRewriteOnRedelivery() {
        return letter.isRewriteOnRedelivery();
    }

    /**
     * Get the indexLFUEvictionFactor
     *
     * @return the indexLFUEvictionFactor held by the wrapped store
     */
    public float getIndexLFUEvictionFactor() {
        return letter.getIndexLFUEvictionFactor();
    }

    /**
     * Set the indexLFUEvictionFactor
     *
     * @param indexLFUEvictionFactor the factor to set on the wrapped store
     */
    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
        letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor);
    }

    /**
     * Get the useIndexLFRUEviction
     *
     * @return the useIndexLFRUEviction flag held by the wrapped store
     */
    public boolean isUseIndexLFRUEviction() {
        return letter.isUseIndexLFRUEviction();
    }

    /**
     * Set the useIndexLFRUEviction
     *
     * @param useIndexLFRUEviction the flag to set on the wrapped store
     */
    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
        letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
    }

    /**
     * Set the enableIndexDiskSyncs
     *
     * @param diskSyncs the flag to set on the wrapped store
     */
    public void setEnableIndexDiskSyncs(boolean diskSyncs) {
        letter.setEnableIndexDiskSyncs(diskSyncs);
    }

    /**
     * Get the enableIndexDiskSyncs
     *
     * @return the enableIndexDiskSyncs flag held by the wrapped store
     */
    public boolean isEnableIndexDiskSyncs() {
        return letter.isEnableIndexDiskSyncs();
    }

    /**
     * Set the enableIndexRecoveryFile
     *
     * @param enable the flag to set on the wrapped store
     */
    public void setEnableIndexRecoveryFile(boolean enable) {
        letter.setEnableIndexRecoveryFile(enable);
    }

    /**
     * Get the enableIndexRecoveryFile
     *
     * @return the enableIndexRecoveryFile flag held by the wrapped store
     */
    public boolean isEnableIndexRecoveryFile() {
        return letter.isEnableIndexRecoveryFile();
    }

    /**
     * Set the enableIndexPageCaching
     *
     * @param enable the flag to set on the wrapped store
     */
    public void setEnableIndexPageCaching(boolean enable) {
        letter.setEnableIndexPageCaching(enable);
    }

    /**
     * Get the enableIndexPageCaching
     *
     * @return the enableIndexPageCaching flag held by the wrapped store
     */
    public boolean isEnableIndexPageCaching() {
        return letter.isEnableIndexPageCaching();
    }

    /**
     * Exposes the wrapped store itself.
     *
     * @return the {@link KahaDBStore} instance every call on this adapter is forwarded to
     */
    public KahaDBStore getStore() {
        return letter;
    }

    // ------------------------------------------------------------------
    // Helpers
    // ------------------------------------------------------------------

    /**
     * Builds the KahaDB representation of a broker transaction id.
     * <p>
     * A local transaction id is copied into a {@link KahaLocalTransactionId}
     * (connection id value plus transaction value); any other id is cast to
     * {@link XATransactionId} and copied into a {@link KahaXATransactionId}
     * (branch qualifier, global transaction id and format id).
     *
     * @param txid the transaction id to convert; may be {@code null}
     * @return a populated {@link KahaTransactionInfo}, or {@code null} when
     *         {@code txid} is {@code null}
     */
    public KahaTransactionInfo createTransactionInfo(TransactionId txid) {
        if (txid == null) {
            return null;
        }

        final KahaTransactionInfo info = new KahaTransactionInfo();

        if (!txid.isLocalTransaction()) {
            // Anything that is not a local transaction is treated as an XA transaction.
            final XATransactionId xaId = (XATransactionId) txid;
            final KahaXATransactionId kahaXaId = new KahaXATransactionId();
            kahaXaId.setBranchQualifier(new Buffer(xaId.getBranchQualifier()));
            kahaXaId.setGlobalTransactionId(new Buffer(xaId.getGlobalTransactionId()));
            kahaXaId.setFormatId(xaId.getFormatId());
            info.setXaTransacitonId(kahaXaId);
            return info;
        }

        final LocalTransactionId localId = (LocalTransactionId) txid;
        final KahaLocalTransactionId kahaLocalId = new KahaLocalTransactionId();
        kahaLocalId.setConnectionId(localId.getConnectionId().getValue());
        kahaLocalId.setTransacitonId(localId.getValue());
        info.setLocalTransacitonId(kahaLocalId);
        return info;
    }

    /**
     * Describes this adapter by its data directory.
     *
     * @return {@code KahaDBPersistenceAdapter[<absolute path>]}, with
     *         {@code DIRECTORY_NOT_SET} in place of the path when no directory
     *         has been configured
     */
    @Override
    public String toString() {
        final String path;
        if (getDirectory() == null) {
            path = "DIRECTORY_NOT_SET";
        } else {
            path = getDirectory().getAbsolutePath();
        }
        return "KahaDBPersistenceAdapter[" + path + "]";
    }

}