001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadb; 018 019 import java.io.ByteArrayInputStream; 020 import java.io.ByteArrayOutputStream; 021 import java.io.DataInput; 022 import java.io.DataOutput; 023 import java.io.EOFException; 024 import java.io.File; 025 import java.io.IOException; 026 import java.io.InputStream; 027 import java.io.InterruptedIOException; 028 import java.io.ObjectInputStream; 029 import java.io.ObjectOutputStream; 030 import java.io.OutputStream; 031 import java.util.ArrayList; 032 import java.util.Collection; 033 import java.util.Collections; 034 import java.util.Date; 035 import java.util.HashMap; 036 import java.util.HashSet; 037 import java.util.Iterator; 038 import java.util.LinkedHashMap; 039 import java.util.LinkedHashSet; 040 import java.util.List; 041 import java.util.Map; 042 import java.util.Map.Entry; 043 import java.util.Set; 044 import java.util.SortedSet; 045 import java.util.Stack; 046 import java.util.TreeMap; 047 import java.util.TreeSet; 048 import java.util.concurrent.atomic.AtomicBoolean; 049 import java.util.concurrent.atomic.AtomicLong; 050 import java.util.concurrent.locks.ReentrantReadWriteLock; 051 052 import org.apache.activemq.ActiveMQMessageAuditNoSync; 053 import org.apache.activemq.broker.BrokerService; 054 import org.apache.activemq.broker.BrokerServiceAware; 055 import org.apache.activemq.command.MessageAck; 056 import org.apache.activemq.command.SubscriptionInfo; 057 import org.apache.activemq.command.TransactionId; 058 import org.apache.activemq.protobuf.Buffer; 059 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 060 import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 061 import org.apache.activemq.store.kahadb.data.KahaDestination; 062 import org.apache.activemq.store.kahadb.data.KahaEntryType; 063 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 064 import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; 065 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 066 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 067 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; 068 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 069 import org.apache.activemq.store.kahadb.data.KahaTraceCommand; 070 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 071 import org.apache.activemq.util.Callback; 072 import org.apache.activemq.util.IOHelper; 073 import org.apache.activemq.util.ServiceStopper; 074 import org.apache.activemq.util.ServiceSupport; 075 import org.apache.kahadb.index.BTreeIndex; 076 import org.apache.kahadb.index.BTreeVisitor; 077 import org.apache.kahadb.index.ListIndex; 078 import org.apache.kahadb.journal.DataFile; 079 import org.apache.kahadb.journal.Journal; 080 import org.apache.kahadb.journal.Location; 081 import org.apache.kahadb.page.Page; 082 import org.apache.kahadb.page.PageFile; 083 import org.apache.kahadb.page.Transaction; 084 import org.apache.kahadb.util.ByteSequence; 085 import org.apache.kahadb.util.DataByteArrayInputStream; 086 import org.apache.kahadb.util.DataByteArrayOutputStream; 087 import org.apache.kahadb.util.LocationMarshaller; 088 import org.apache.kahadb.util.LockFile; 089 import org.apache.kahadb.util.LongMarshaller; 090 import org.apache.kahadb.util.Marshaller; 091 import org.apache.kahadb.util.Sequence; 092 import org.apache.kahadb.util.SequenceSet; 093 import org.apache.kahadb.util.StringMarshaller; 094 import org.apache.kahadb.util.VariableMarshaller; 095 import org.slf4j.Logger; 096 import org.slf4j.LoggerFactory; 097 098 public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware { 099 100 protected BrokerService brokerService; 101 102 public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; 103 public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0); 104 public static final File DEFAULT_DIRECTORY = new File("KahaDB"); 105 protected static final Buffer UNMATCHED; 106 static { 107 UNMATCHED = new Buffer(new byte[]{}); 108 } 109 private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class); 110 private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; 111 112 static final int CLOSED_STATE = 1; 113 static final int OPEN_STATE = 2; 114 static final long NOT_ACKED = -1; 115 116 static final int VERSION = 4; 117 118 protected class Metadata { 119 protected Page<Metadata> page; 120 protected int state; 121 protected BTreeIndex<String, StoredDestination> destinations; 122 protected Location lastUpdate; 123 protected Location firstInProgressTransactionLocation; 124 protected Location producerSequenceIdTrackerLocation = null; 125 protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync(); 126 protected int version = VERSION; 127 public void read(DataInput is) throws IOException { 128 state = is.readInt(); 129 destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong()); 130 if (is.readBoolean()) { 131 lastUpdate = LocationMarshaller.INSTANCE.readPayload(is); 132 } else { 133 lastUpdate = null; 134 } 135 if (is.readBoolean()) { 136 firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is); 137 } else { 138 firstInProgressTransactionLocation = null; 139 } 140 try { 141 if (is.readBoolean()) { 142 producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is); 143 } else { 144 producerSequenceIdTrackerLocation = null; 145 } 146 } catch (EOFException expectedOnUpgrade) { 147 } 148 try { 149 version = is.readInt(); 150 } catch (EOFException expectedOnUpgrade) { 151 version=1; 152 } 153 LOG.info("KahaDB is version " + version); 154 } 155 156 public void write(DataOutput os) throws IOException { 157 os.writeInt(state); 158 os.writeLong(destinations.getPageId()); 159 160 if (lastUpdate != null) { 161 os.writeBoolean(true); 162 LocationMarshaller.INSTANCE.writePayload(lastUpdate, os); 163 } else { 164 os.writeBoolean(false); 165 } 166 167 if (firstInProgressTransactionLocation != null) { 168 os.writeBoolean(true); 169 LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os); 170 } else { 171 os.writeBoolean(false); 172 } 173 174 if (producerSequenceIdTrackerLocation != null) { 175 os.writeBoolean(true); 176 LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os); 177 } else { 178 os.writeBoolean(false); 179 } 180 os.writeInt(VERSION); 181 } 182 } 183 184 class MetadataMarshaller extends VariableMarshaller<Metadata> { 185 public Metadata readPayload(DataInput dataIn) throws IOException { 186 Metadata rc = new Metadata(); 187 rc.read(dataIn); 188 return rc; 189 } 190 191 public void writePayload(Metadata object, DataOutput dataOut) throws IOException { 192 object.write(dataOut); 193 } 194 } 195 196 protected PageFile pageFile; 197 protected Journal journal; 198 protected Metadata metadata = new Metadata(); 199 200 protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); 201 202 protected boolean failIfDatabaseIsLocked; 203 204 protected boolean deleteAllMessages; 205 protected File directory = DEFAULT_DIRECTORY; 206 protected Thread checkpointThread; 207 protected boolean enableJournalDiskSyncs=true; 208 protected boolean archiveDataLogs; 209 protected File directoryArchive; 210 protected AtomicLong storeSize = new AtomicLong(0); 211 long checkpointInterval = 5*1000; 212 long cleanupInterval = 30*1000; 213 int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 214 int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 215 boolean enableIndexWriteAsync = false; 216 int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 217 218 protected AtomicBoolean opened = new AtomicBoolean(); 219 private LockFile lockFile; 220 private boolean ignoreMissingJournalfiles = false; 221 private int indexCacheSize = 10000; 222 private boolean checkForCorruptJournalFiles = false; 223 private boolean checksumJournalFiles = false; 224 private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY; 225 protected boolean forceRecoverIndex = false; 226 private final Object checkpointThreadLock = new Object(); 227 private boolean rewriteOnRedelivery = false; 228 private boolean archiveCorruptedIndex = false; 229 private boolean useIndexLFRUEviction = false; 230 private float indexLFUEvictionFactor = 0.2f; 231 private boolean enableIndexDiskSyncs = true; 232 private boolean enableIndexRecoveryFile = true; 233 private boolean enableIndexPageCaching = true; 234 235 public MessageDatabase() { 236 } 237 238 @Override 239 public void doStart() throws Exception { 240 load(); 241 } 242 243 @Override 244 public void doStop(ServiceStopper stopper) throws Exception { 245 unload(); 246 } 247 248 private void loadPageFile() throws IOException { 249 this.indexLock.writeLock().lock(); 250 try { 251 final PageFile pageFile = getPageFile(); 252 pageFile.load(); 253 pageFile.tx().execute(new Transaction.Closure<IOException>() { 254 public void execute(Transaction tx) throws IOException { 255 if (pageFile.getPageCount() == 0) { 256 // First time this is created.. Initialize the metadata 257 Page<Metadata> page = tx.allocate(); 258 assert page.getPageId() == 0; 259 page.set(metadata); 260 metadata.page = page; 261 metadata.state = CLOSED_STATE; 262 metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId()); 263 264 tx.store(metadata.page, metadataMarshaller, true); 265 } else { 266 Page<Metadata> page = tx.load(0, metadataMarshaller); 267 metadata = page.get(); 268 metadata.page = page; 269 } 270 metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE); 271 metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller()); 272 metadata.destinations.load(tx); 273 } 274 }); 275 // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted. 276 // Perhaps we should just keep an index of file 277 storedDestinations.clear(); 278 pageFile.tx().execute(new Transaction.Closure<IOException>() { 279 public void execute(Transaction tx) throws IOException { 280 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { 281 Entry<String, StoredDestination> entry = iterator.next(); 282 StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null); 283 storedDestinations.put(entry.getKey(), sd); 284 } 285 } 286 }); 287 pageFile.flush(); 288 } finally { 289 this.indexLock.writeLock().unlock(); 290 } 291 } 292 293 private void startCheckpoint() { 294 if (checkpointInterval == 0 && cleanupInterval == 0) { 295 LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart"); 296 return; 297 } 298 synchronized (checkpointThreadLock) { 299 boolean start = false; 300 if (checkpointThread == null) { 301 start = true; 302 } else if (!checkpointThread.isAlive()) { 303 start = true; 304 LOG.info("KahaDB: Recovering checkpoint thread after death"); 305 } 306 if (start) { 307 checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") { 308 @Override 309 public void run() { 310 try { 311 long lastCleanup = System.currentTimeMillis(); 312 long lastCheckpoint = System.currentTimeMillis(); 313 // Sleep for a short time so we can periodically check 314 // to see if we need to exit this thread. 315 long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500); 316 while (opened.get()) { 317 Thread.sleep(sleepTime); 318 long now = System.currentTimeMillis(); 319 if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) { 320 checkpointCleanup(true); 321 lastCleanup = now; 322 lastCheckpoint = now; 323 } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) { 324 checkpointCleanup(false); 325 lastCheckpoint = now; 326 } 327 } 328 } catch (InterruptedException e) { 329 // Looks like someone really wants us to exit this thread... 330 } catch (IOException ioe) { 331 LOG.error("Checkpoint failed", ioe); 332 brokerService.handleIOException(ioe); 333 } 334 } 335 }; 336 337 checkpointThread.setDaemon(true); 338 checkpointThread.start(); 339 } 340 } 341 } 342 343 public void open() throws IOException { 344 if( opened.compareAndSet(false, true) ) { 345 getJournal().start(); 346 try { 347 loadPageFile(); 348 } catch (Throwable t) { 349 LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t); 350 if (LOG.isDebugEnabled()) { 351 LOG.debug("Index load failure", t); 352 } 353 // try to recover index 354 try { 355 pageFile.unload(); 356 } catch (Exception ignore) {} 357 if (archiveCorruptedIndex) { 358 pageFile.archive(); 359 } else { 360 pageFile.delete(); 361 } 362 metadata = new Metadata(); 363 pageFile = null; 364 loadPageFile(); 365 } 366 startCheckpoint(); 367 recover(); 368 } 369 } 370 371 private void lock() throws IOException { 372 373 if (lockFile == null) { 374 File lockFileName = new File(directory, "lock"); 375 lockFile = new LockFile(lockFileName, true); 376 if (failIfDatabaseIsLocked) { 377 lockFile.lock(); 378 } else { 379 boolean locked = false; 380 while ((!isStopped()) && (!isStopping())) { 381 try { 382 lockFile.lock(); 383 locked = true; 384 break; 385 } catch (IOException e) { 386 LOG.info("Database " 387 + lockFileName 388 + " is locked... waiting " 389 + (getDatabaseLockedWaitDelay() / 1000) 390 + " seconds for the database to be unlocked. Reason: " 391 + e); 392 try { 393 Thread.sleep(getDatabaseLockedWaitDelay()); 394 } catch (InterruptedException e1) { 395 } 396 } 397 } 398 if (!locked) { 399 throw new IOException("attempt to obtain lock aborted due to shutdown"); 400 } 401 } 402 } 403 } 404 405 // for testing 406 public LockFile getLockFile() { 407 return lockFile; 408 } 409 410 public void load() throws IOException { 411 this.indexLock.writeLock().lock(); 412 try { 413 lock(); 414 if (deleteAllMessages) { 415 getJournal().start(); 416 getJournal().delete(); 417 getJournal().close(); 418 journal = null; 419 getPageFile().delete(); 420 LOG.info("Persistence store purged."); 421 deleteAllMessages = false; 422 } 423 424 open(); 425 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 426 } finally { 427 this.indexLock.writeLock().unlock(); 428 } 429 } 430 431 public void close() throws IOException, InterruptedException { 432 if( opened.compareAndSet(true, false)) { 433 try { 434 this.indexLock.writeLock().lock(); 435 try { 436 if (metadata.page != null) { 437 pageFile.tx().execute(new Transaction.Closure<IOException>() { 438 public void execute(Transaction tx) throws IOException { 439 checkpointUpdate(tx, true); 440 } 441 }); 442 } 443 pageFile.unload(); 444 metadata = new Metadata(); 445 } finally { 446 this.indexLock.writeLock().unlock(); 447 } 448 journal.close(); 449 synchronized (checkpointThreadLock) { 450 if (checkpointThread != null) { 451 checkpointThread.join(); 452 } 453 } 454 } finally { 455 lockFile.unlock(); 456 lockFile=null; 457 } 458 } 459 } 460 461 public void unload() throws IOException, InterruptedException { 462 this.indexLock.writeLock().lock(); 463 try { 464 if( pageFile != null && pageFile.isLoaded() ) { 465 metadata.state = CLOSED_STATE; 466 metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); 467 468 if (metadata.page != null) { 469 pageFile.tx().execute(new Transaction.Closure<IOException>() { 470 public void execute(Transaction tx) throws IOException { 471 tx.store(metadata.page, metadataMarshaller, true); 472 } 473 }); 474 } 475 } 476 } finally { 477 this.indexLock.writeLock().unlock(); 478 } 479 close(); 480 } 481 482 // public for testing 483 @SuppressWarnings("rawtypes") 484 public Location getFirstInProgressTxLocation() { 485 Location l = null; 486 synchronized (inflightTransactions) { 487 if (!inflightTransactions.isEmpty()) { 488 for (List<Operation> ops : inflightTransactions.values()) { 489 if (!ops.isEmpty()) { 490 l = ops.get(0).getLocation(); 491 break; 492 } 493 } 494 } 495 if (!preparedTransactions.isEmpty()) { 496 for (List<Operation> ops : preparedTransactions.values()) { 497 if (!ops.isEmpty()) { 498 Location t = ops.get(0).getLocation(); 499 if (l==null || t.compareTo(l) <= 0) { 500 l = t; 501 } 502 break; 503 } 504 } 505 } 506 } 507 return l; 508 } 509 510 /** 511 * Move all the messages that were in the journal into long term storage. We 512 * just replay and do a checkpoint. 513 * 514 * @throws IOException 515 * @throws IOException 516 * @throws IllegalStateException 517 */ 518 private void recover() throws IllegalStateException, IOException { 519 this.indexLock.writeLock().lock(); 520 try { 521 522 long start = System.currentTimeMillis(); 523 Location producerAuditPosition = recoverProducerAudit(); 524 Location lastIndoubtPosition = getRecoveryPosition(); 525 526 Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition); 527 528 if (recoveryPosition != null) { 529 int redoCounter = 0; 530 LOG.info("Recovering from the journal ..."); 531 while (recoveryPosition != null) { 532 JournalCommand<?> message = load(recoveryPosition); 533 metadata.lastUpdate = recoveryPosition; 534 process(message, recoveryPosition, lastIndoubtPosition); 535 redoCounter++; 536 recoveryPosition = journal.getNextLocation(recoveryPosition); 537 if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) { 538 LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered .."); 539 } 540 } 541 if (LOG.isInfoEnabled()) { 542 long end = System.currentTimeMillis(); 543 LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); 544 } 545 } 546 547 // We may have to undo some index updates. 548 pageFile.tx().execute(new Transaction.Closure<IOException>() { 549 public void execute(Transaction tx) throws IOException { 550 recoverIndex(tx); 551 } 552 }); 553 554 // rollback any recovered inflight local transactions 555 Set<TransactionId> toRollback = new HashSet<TransactionId>(); 556 synchronized (inflightTransactions) { 557 for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) { 558 TransactionId id = it.next(); 559 if (id.isLocalTransaction()) { 560 toRollback.add(id); 561 } 562 } 563 for (TransactionId tx: toRollback) { 564 if (LOG.isDebugEnabled()) { 565 LOG.debug("rolling back recovered indoubt local transaction " + tx); 566 } 567 store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null); 568 } 569 } 570 } finally { 571 this.indexLock.writeLock().unlock(); 572 } 573 } 574 575 @SuppressWarnings("unused") 576 private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) { 577 return TransactionIdConversion.convertToLocal(tx); 578 } 579 580 private Location minimum(Location producerAuditPosition, 581 Location lastIndoubtPosition) { 582 Location min = null; 583 if (producerAuditPosition != null) { 584 min = producerAuditPosition; 585 if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) { 586 min = lastIndoubtPosition; 587 } 588 } else { 589 min = lastIndoubtPosition; 590 } 591 return min; 592 } 593 594 private Location recoverProducerAudit() throws IOException { 595 if (metadata.producerSequenceIdTrackerLocation != null) { 596 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); 597 try { 598 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput()); 599 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); 600 return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation); 601 } catch (Exception e) { 602 LOG.warn("Cannot recover message audit", e); 603 return journal.getNextLocation(null); 604 } 605 } else { 606 // got no audit stored so got to recreate via replay from start of the journal 607 return journal.getNextLocation(null); 608 } 609 } 610 611 protected void recoverIndex(Transaction tx) throws IOException { 612 long start = System.currentTimeMillis(); 613 // It is possible index updates got applied before the journal updates.. 614 // in that case we need to removed references to messages that are not in the journal 615 final Location lastAppendLocation = journal.getLastAppendLocation(); 616 long undoCounter=0; 617 618 // Go through all the destinations to see if they have messages past the lastAppendLocation 619 for (StoredDestination sd : storedDestinations.values()) { 620 621 final ArrayList<Long> matches = new ArrayList<Long>(); 622 // Find all the Locations that are >= than the last Append Location. 623 sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) { 624 @Override 625 protected void matched(Location key, Long value) { 626 matches.add(value); 627 } 628 }); 629 630 for (Long sequenceId : matches) { 631 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 632 sd.locationIndex.remove(tx, keys.location); 633 sd.messageIdIndex.remove(tx, keys.messageId); 634 metadata.producerSequenceIdTracker.rollback(keys.messageId); 635 undoCounter++; 636 // TODO: do we need to modify the ack positions for the pub sub case? 637 } 638 } 639 640 if( undoCounter > 0 ) { 641 // The rolledback operations are basically in flight journal writes. To avoid getting 642 // these the end user should do sync writes to the journal. 643 if (LOG.isInfoEnabled()) { 644 long end = System.currentTimeMillis(); 645 LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 646 } 647 } 648 649 undoCounter = 0; 650 start = System.currentTimeMillis(); 651 652 // Lets be extra paranoid here and verify that all the datafiles being referenced 653 // by the indexes still exists. 654 655 final SequenceSet ss = new SequenceSet(); 656 for (StoredDestination sd : storedDestinations.values()) { 657 // Use a visitor to cut down the number of pages that we load 658 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 659 int last=-1; 660 661 public boolean isInterestedInKeysBetween(Location first, Location second) { 662 if( first==null ) { 663 return !ss.contains(0, second.getDataFileId()); 664 } else if( second==null ) { 665 return true; 666 } else { 667 return !ss.contains(first.getDataFileId(), second.getDataFileId()); 668 } 669 } 670 671 public void visit(List<Location> keys, List<Long> values) { 672 for (Location l : keys) { 673 int fileId = l.getDataFileId(); 674 if( last != fileId ) { 675 ss.add(fileId); 676 last = fileId; 677 } 678 } 679 } 680 681 }); 682 } 683 HashSet<Integer> missingJournalFiles = new HashSet<Integer>(); 684 while (!ss.isEmpty()) { 685 missingJournalFiles.add((int) ss.removeFirst()); 686 } 687 missingJournalFiles.removeAll(journal.getFileMap().keySet()); 688 689 if (!missingJournalFiles.isEmpty()) { 690 if (LOG.isInfoEnabled()) { 691 LOG.info("Some journal files are missing: " + missingJournalFiles); 692 } 693 } 694 695 ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>(); 696 for (Integer missing : missingJournalFiles) { 697 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0))); 698 } 699 700 if (checkForCorruptJournalFiles) { 701 Collection<DataFile> dataFiles = journal.getFileMap().values(); 702 for (DataFile dataFile : dataFiles) { 703 int id = dataFile.getDataFileId(); 704 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0))); 705 Sequence seq = dataFile.getCorruptedBlocks().getHead(); 706 while (seq != null) { 707 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1))); 708 seq = seq.getNext(); 709 } 710 } 711 } 712 713 if (!missingPredicates.isEmpty()) { 714 for (StoredDestination sd : storedDestinations.values()) { 715 716 final ArrayList<Long> matches = new ArrayList<Long>(); 717 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) { 718 @Override 719 protected void matched(Location key, Long value) { 720 matches.add(value); 721 } 722 }); 723 724 // If somes message references are affected by the missing data files... 725 if (!matches.isEmpty()) { 726 727 // We either 'gracefully' recover dropping the missing messages or 728 // we error out. 729 if( ignoreMissingJournalfiles ) { 730 // Update the index to remove the references to the missing data 731 for (Long sequenceId : matches) { 732 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 733 sd.locationIndex.remove(tx, keys.location); 734 sd.messageIdIndex.remove(tx, keys.messageId); 735 undoCounter++; 736 // TODO: do we need to modify the ack positions for the pub sub case? 737 } 738 739 } else { 740 throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected."); 741 } 742 } 743 } 744 } 745 746 if( undoCounter > 0 ) { 747 // The rolledback operations are basically in flight journal writes. To avoid getting these the end user 748 // should do sync writes to the journal. 749 if (LOG.isInfoEnabled()) { 750 long end = System.currentTimeMillis(); 751 LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 752 } 753 } 754 } 755 756 private Location nextRecoveryPosition; 757 private Location lastRecoveryPosition; 758 759 public void incrementalRecover() throws IOException { 760 this.indexLock.writeLock().lock(); 761 try { 762 if( nextRecoveryPosition == null ) { 763 if( lastRecoveryPosition==null ) { 764 nextRecoveryPosition = getRecoveryPosition(); 765 } else { 766 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 767 } 768 } 769 while (nextRecoveryPosition != null) { 770 lastRecoveryPosition = nextRecoveryPosition; 771 metadata.lastUpdate = lastRecoveryPosition; 772 JournalCommand<?> message = load(lastRecoveryPosition); 773 process(message, lastRecoveryPosition, (Runnable)null); 774 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 775 } 776 } finally { 777 this.indexLock.writeLock().unlock(); 778 } 779 } 780 781 public Location getLastUpdatePosition() throws IOException { 782 return metadata.lastUpdate; 783 } 784 785 private Location getRecoveryPosition() throws IOException { 786 787 if (!this.forceRecoverIndex) { 788 789 // If we need to recover the transactions.. 790 if (metadata.firstInProgressTransactionLocation != null) { 791 return metadata.firstInProgressTransactionLocation; 792 } 793 794 // Perhaps there were no transactions... 795 if( metadata.lastUpdate!=null) { 796 // Start replay at the record after the last one recorded in the index file. 797 return journal.getNextLocation(metadata.lastUpdate); 798 } 799 } 800 // This loads the first position. 801 return journal.getNextLocation(null); 802 } 803 804 protected void checkpointCleanup(final boolean cleanup) throws IOException { 805 long start; 806 this.indexLock.writeLock().lock(); 807 try { 808 start = System.currentTimeMillis(); 809 if( !opened.get() ) { 810 return; 811 } 812 pageFile.tx().execute(new Transaction.Closure<IOException>() { 813 public void execute(Transaction tx) throws IOException { 814 checkpointUpdate(tx, cleanup); 815 } 816 }); 817 } finally { 818 this.indexLock.writeLock().unlock(); 819 } 820 821 long end = System.currentTimeMillis(); 822 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 823 if (LOG.isInfoEnabled()) { 824 LOG.info("Slow KahaDB access: cleanup took " + (end - start)); 825 } 826 } 827 } 828 829 public void checkpoint(Callback closure) throws Exception { 830 this.indexLock.writeLock().lock(); 831 try { 832 pageFile.tx().execute(new Transaction.Closure<IOException>() { 833 public void execute(Transaction tx) throws IOException { 834 checkpointUpdate(tx, false); 835 } 836 }); 837 closure.execute(); 838 } finally { 839 this.indexLock.writeLock().unlock(); 840 } 841 } 842 843 public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException { 844 int size = data.serializedSizeFramed(); 845 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); 846 os.writeByte(data.type().getNumber()); 847 data.writeFramed(os); 848 return os.toByteSequence(); 849 } 850 851 // ///////////////////////////////////////////////////////////////// 852 // Methods call by the broker to update and query the store. 853 // ///////////////////////////////////////////////////////////////// 854 public Location store(JournalCommand<?> data) throws IOException { 855 return store(data, false, null,null); 856 } 857 858 public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException { 859 return store(data, false, null,null, onJournalStoreComplete); 860 } 861 862 public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException { 863 return store(data, sync, before, after, null); 864 } 865 866 /** 867 * All updated are are funneled through this method. The updates are converted 868 * to a JournalMessage which is logged to the journal and then the data from 869 * the JournalMessage is used to update the index just like it would be done 870 * during a recovery process. 871 */ 872 public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after, Runnable onJournalStoreComplete) throws IOException { 873 if (before != null) { 874 before.run(); 875 } 876 try { 877 ByteSequence sequence = toByteSequence(data); 878 long start = System.currentTimeMillis(); 879 Location location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ; 880 long start2 = System.currentTimeMillis(); 881 process(data, location, after); 882 long end = System.currentTimeMillis(); 883 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { 884 if (LOG.isInfoEnabled()) { 885 LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); 886 } 887 } 888 889 if (after != null) { 890 Runnable afterCompletion = null; 891 synchronized (orderedTransactionAfters) { 892 if (!orderedTransactionAfters.empty()) { 893 afterCompletion = orderedTransactionAfters.pop(); 894 } 895 } 896 if (afterCompletion != null) { 897 afterCompletion.run(); 898 } else { 899 // non persistent message case 900 after.run(); 901 } 902 } 903 904 if (checkpointThread != null && !checkpointThread.isAlive()) { 905 startCheckpoint(); 906 } 907 return location; 908 } catch (IOException ioe) { 909 LOG.error("KahaDB failed to store to Journal", ioe); 910 brokerService.handleIOException(ioe); 911 throw ioe; 912 } 913 } 914 915 /** 916 * Loads a previously stored JournalMessage 917 * 918 * @param location 919 * @return 920 * @throws IOException 921 */ 922 public JournalCommand<?> load(Location location) throws IOException { 923 long start = System.currentTimeMillis(); 924 ByteSequence data = journal.read(location); 925 long end = System.currentTimeMillis(); 926 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { 927 if (LOG.isInfoEnabled()) { 928 LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms"); 929 } 930 } 931 DataByteArrayInputStream is = new DataByteArrayInputStream(data); 932 byte readByte = is.readByte(); 933 KahaEntryType type = KahaEntryType.valueOf(readByte); 934 if( type == null ) { 935 throw new IOException("Could not load journal record. Invalid location: "+location); 936 } 937 JournalCommand<?> message = (JournalCommand<?>)type.createMessage(); 938 message.mergeFramed(is); 939 return message; 940 } 941 942 /** 943 * do minimal recovery till we reach the last inDoubtLocation 944 * @param data 945 * @param location 946 * @param inDoubtlocation 947 * @throws IOException 948 */ 949 void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException { 950 if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) { 951 process(data, location, (Runnable) null); 952 } else { 953 // just recover producer audit 954 data.visit(new Visitor() { 955 public void visit(KahaAddMessageCommand command) throws IOException { 956 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 957 } 958 }); 959 } 960 } 961 962 // ///////////////////////////////////////////////////////////////// 963 // Journaled record processing methods. Once the record is journaled, 964 // these methods handle applying the index updates. These may be called 965 // from the recovery method too so they need to be idempotent 966 // ///////////////////////////////////////////////////////////////// 967 968 void process(JournalCommand<?> data, final Location location, final Runnable after) throws IOException { 969 data.visit(new Visitor() { 970 @Override 971 public void visit(KahaAddMessageCommand command) throws IOException { 972 process(command, location); 973 } 974 975 @Override 976 public void visit(KahaRemoveMessageCommand command) throws IOException { 977 process(command, location); 978 } 979 980 @Override 981 public void visit(KahaPrepareCommand command) throws IOException { 982 process(command, location); 983 } 984 985 @Override 986 public void visit(KahaCommitCommand command) throws IOException { 987 process(command, location, after); 988 } 989 990 @Override 991 public void visit(KahaRollbackCommand command) throws IOException { 992 process(command, location); 993 } 994 995 @Override 996 public void visit(KahaRemoveDestinationCommand command) throws IOException { 997 process(command, location); 998 } 999 1000 @Override 1001 public void visit(KahaSubscriptionCommand command) throws IOException { 1002 process(command, location); 1003 } 1004 1005 @Override 1006 public void visit(KahaProducerAuditCommand command) throws IOException { 1007 processLocation(location); 1008 } 1009 1010 @Override 1011 public void visit(KahaTraceCommand command) { 1012 processLocation(location); 1013 } 1014 }); 1015 } 1016 1017 @SuppressWarnings("rawtypes") 1018 protected void process(final KahaAddMessageCommand command, final Location location) throws IOException { 1019 if (command.hasTransactionInfo()) { 1020 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location); 1021 inflightTx.add(new AddOpperation(command, location)); 1022 } else { 1023 this.indexLock.writeLock().lock(); 1024 try { 1025 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1026 public void execute(Transaction tx) throws IOException { 1027 upadateIndex(tx, command, location); 1028 } 1029 }); 1030 } finally { 1031 this.indexLock.writeLock().unlock(); 1032 } 1033 } 1034 } 1035 1036 @SuppressWarnings("rawtypes") 1037 protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException { 1038 if (command.hasTransactionInfo()) { 1039 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location); 1040 inflightTx.add(new RemoveOpperation(command, location)); 1041 } else { 1042 this.indexLock.writeLock().lock(); 1043 try { 1044 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1045 public void execute(Transaction tx) throws IOException { 1046 updateIndex(tx, command, location); 1047 } 1048 }); 1049 } finally { 1050 this.indexLock.writeLock().unlock(); 1051 } 1052 } 1053 } 1054 1055 protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException { 1056 this.indexLock.writeLock().lock(); 1057 try { 1058 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1059 public void execute(Transaction tx) throws IOException { 1060 updateIndex(tx, command, location); 1061 } 1062 }); 1063 } finally { 1064 this.indexLock.writeLock().unlock(); 1065 } 1066 } 1067 1068 protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException { 1069 this.indexLock.writeLock().lock(); 1070 try { 1071 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1072 public void execute(Transaction tx) throws IOException { 1073 updateIndex(tx, command, location); 1074 } 1075 }); 1076 } finally { 1077 this.indexLock.writeLock().unlock(); 1078 } 1079 } 1080 1081 protected void processLocation(final Location location) { 1082 this.indexLock.writeLock().lock(); 1083 try { 1084 metadata.lastUpdate = location; 1085 } finally { 1086 this.indexLock.writeLock().unlock(); 1087 } 1088 } 1089 1090 private final Stack<Runnable> orderedTransactionAfters = new Stack<Runnable>(); 1091 private void push(Runnable after) { 1092 if (after != null) { 1093 synchronized (orderedTransactionAfters) { 1094 orderedTransactionAfters.push(after); 1095 } 1096 } 1097 } 1098 1099 @SuppressWarnings("rawtypes") 1100 protected void process(KahaCommitCommand command, Location location, final Runnable after) throws IOException { 1101 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1102 List<Operation> inflightTx; 1103 synchronized (inflightTransactions) { 1104 inflightTx = inflightTransactions.remove(key); 1105 if (inflightTx == null) { 1106 inflightTx = preparedTransactions.remove(key); 1107 } 1108 } 1109 if (inflightTx == null) { 1110 if (after != null) { 1111 // since we don't push this after and we may find another, lets run it now 1112 after.run(); 1113 } 1114 return; 1115 } 1116 1117 final List<Operation> messagingTx = inflightTx; 1118 this.indexLock.writeLock().lock(); 1119 try { 1120 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1121 public void execute(Transaction tx) throws IOException { 1122 for (Operation op : messagingTx) { 1123 op.execute(tx); 1124 } 1125 } 1126 }); 1127 metadata.lastUpdate = location; 1128 push(after); 1129 } finally { 1130 this.indexLock.writeLock().unlock(); 1131 } 1132 } 1133 1134 @SuppressWarnings("rawtypes") 1135 protected void process(KahaPrepareCommand command, Location location) { 1136 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1137 synchronized (inflightTransactions) { 1138 List<Operation> tx = inflightTransactions.remove(key); 1139 if (tx != null) { 1140 preparedTransactions.put(key, tx); 1141 } 1142 } 1143 } 1144 1145 @SuppressWarnings("rawtypes") 1146 protected void process(KahaRollbackCommand command, Location location) throws IOException { 1147 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1148 List<Operation> updates = null; 1149 synchronized (inflightTransactions) { 1150 updates = inflightTransactions.remove(key); 1151 if (updates == null) { 1152 updates = preparedTransactions.remove(key); 1153 } 1154 } 1155 if (isRewriteOnRedelivery()) { 1156 persistRedeliveryCount(updates); 1157 } 1158 } 1159 1160 @SuppressWarnings("rawtypes") 1161 private void persistRedeliveryCount(List<Operation> updates) throws IOException { 1162 if (updates != null) { 1163 for (Operation operation : updates) { 1164 operation.getCommand().visit(new Visitor() { 1165 @Override 1166 public void visit(KahaRemoveMessageCommand command) throws IOException { 1167 incrementRedeliveryAndReWrite(command.getMessageId(), command.getDestination()); 1168 } 1169 }); 1170 } 1171 } 1172 } 1173 1174 abstract void incrementRedeliveryAndReWrite(String key, KahaDestination destination) throws IOException; 1175 1176 // ///////////////////////////////////////////////////////////////// 1177 // These methods do the actual index updates. 1178 // ///////////////////////////////////////////////////////////////// 1179 1180 protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); 1181 private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>(); 1182 1183 void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { 1184 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1185 1186 // Skip adding the message to the index if this is a topic and there are 1187 // no subscriptions. 1188 if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) { 1189 return; 1190 } 1191 1192 // Add the message. 1193 int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY; 1194 long id = sd.orderIndex.getNextMessageId(priority); 1195 Long previous = sd.locationIndex.put(tx, location, id); 1196 if (previous == null) { 1197 previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); 1198 if (previous == null) { 1199 sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location)); 1200 if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) { 1201 addAckLocationForNewMessage(tx, sd, id); 1202 } 1203 } else { 1204 // If the message ID as indexed, then the broker asked us to 1205 // store a DUP 1206 // message. Bad BOY! Don't do it, and log a warning. 1207 LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId()); 1208 sd.messageIdIndex.put(tx, command.getMessageId(), previous); 1209 sd.locationIndex.remove(tx, location); 1210 rollbackStatsOnDuplicate(command.getDestination()); 1211 } 1212 } else { 1213 // restore the previous value.. Looks like this was a redo of a 1214 // previously 1215 // added message. We don't want to assign it a new id as the other 1216 // indexes would 1217 // be wrong.. 1218 // 1219 sd.locationIndex.put(tx, location, previous); 1220 } 1221 // record this id in any event, initial send or recovery 1222 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 1223 metadata.lastUpdate = location; 1224 } 1225 1226 abstract void rollbackStatsOnDuplicate(KahaDestination commandDestination); 1227 1228 void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { 1229 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1230 if (!command.hasSubscriptionKey()) { 1231 1232 // In the queue case we just remove the message from the index.. 1233 Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId()); 1234 if (sequenceId != null) { 1235 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 1236 if (keys != null) { 1237 sd.locationIndex.remove(tx, keys.location); 1238 recordAckMessageReferenceLocation(ackLocation, keys.location); 1239 } else if (LOG.isDebugEnabled()) { 1240 LOG.debug("message not found in order index: " + sequenceId + " for: " + command.getMessageId()); 1241 } 1242 } else if (LOG.isDebugEnabled()) { 1243 LOG.debug("message not found in sequence id index: " + command.getMessageId()); 1244 } 1245 } else { 1246 // In the topic case we need remove the message once it's been acked 1247 // by all the subs 1248 Long sequence = sd.messageIdIndex.get(tx, command.getMessageId()); 1249 1250 // Make sure it's a valid message id... 1251 if (sequence != null) { 1252 String subscriptionKey = command.getSubscriptionKey(); 1253 if (command.getAck() != UNMATCHED) { 1254 sd.orderIndex.get(tx, sequence); 1255 byte priority = sd.orderIndex.lastGetPriority(); 1256 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority)); 1257 } 1258 // The following method handles deleting un-referenced messages. 1259 removeAckLocation(tx, sd, subscriptionKey, sequence); 1260 } else if (LOG.isDebugEnabled()) { 1261 LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey()); 1262 } 1263 1264 } 1265 metadata.lastUpdate = ackLocation; 1266 } 1267 1268 Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>(); 1269 private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) { 1270 Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId())); 1271 if (referenceFileIds == null) { 1272 referenceFileIds = new HashSet<Integer>(); 1273 referenceFileIds.add(messageLocation.getDataFileId()); 1274 ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds); 1275 } else { 1276 Integer id = Integer.valueOf(messageLocation.getDataFileId()); 1277 if (!referenceFileIds.contains(id)) { 1278 referenceFileIds.add(id); 1279 } 1280 } 1281 } 1282 1283 void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { 1284 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1285 sd.orderIndex.remove(tx); 1286 1287 sd.locationIndex.clear(tx); 1288 sd.locationIndex.unload(tx); 1289 tx.free(sd.locationIndex.getPageId()); 1290 1291 sd.messageIdIndex.clear(tx); 1292 sd.messageIdIndex.unload(tx); 1293 tx.free(sd.messageIdIndex.getPageId()); 1294 1295 if (sd.subscriptions != null) { 1296 sd.subscriptions.clear(tx); 1297 sd.subscriptions.unload(tx); 1298 tx.free(sd.subscriptions.getPageId()); 1299 1300 sd.subscriptionAcks.clear(tx); 1301 sd.subscriptionAcks.unload(tx); 1302 tx.free(sd.subscriptionAcks.getPageId()); 1303 1304 sd.ackPositions.clear(tx); 1305 sd.ackPositions.unload(tx); 1306 tx.free(sd.ackPositions.getHeadPageId()); 1307 } 1308 1309 String key = key(command.getDestination()); 1310 storedDestinations.remove(key); 1311 metadata.destinations.remove(tx, key); 1312 } 1313 1314 void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { 1315 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1316 final String subscriptionKey = command.getSubscriptionKey(); 1317 1318 // If set then we are creating it.. otherwise we are destroying the sub 1319 if (command.hasSubscriptionInfo()) { 1320 sd.subscriptions.put(tx, subscriptionKey, command); 1321 long ackLocation=NOT_ACKED; 1322 if (!command.getRetroactive()) { 1323 ackLocation = sd.orderIndex.nextMessageId-1; 1324 } else { 1325 addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey); 1326 } 1327 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation)); 1328 sd.subscriptionCache.add(subscriptionKey); 1329 } else { 1330 // delete the sub... 1331 sd.subscriptions.remove(tx, subscriptionKey); 1332 sd.subscriptionAcks.remove(tx, subscriptionKey); 1333 sd.subscriptionCache.remove(subscriptionKey); 1334 removeAckLocationsForSub(tx, sd, subscriptionKey); 1335 1336 if (sd.subscriptions.isEmpty(tx)) { 1337 sd.messageIdIndex.clear(tx); 1338 sd.locationIndex.clear(tx); 1339 sd.orderIndex.clear(tx); 1340 } 1341 } 1342 } 1343 1344 /** 1345 * @param tx 1346 * @throws IOException 1347 */ 1348 void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { 1349 LOG.debug("Checkpoint started."); 1350 1351 // reflect last update exclusive of current checkpoint 1352 Location firstTxLocation = metadata.lastUpdate; 1353 1354 metadata.state = OPEN_STATE; 1355 metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(); 1356 metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); 1357 tx.store(metadata.page, metadataMarshaller, true); 1358 pageFile.flush(); 1359 1360 if( cleanup ) { 1361 1362 final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet()); 1363 final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet); 1364 1365 if (LOG.isTraceEnabled()) { 1366 LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet); 1367 } 1368 1369 // Don't GC files under replication 1370 if( journalFilesBeingReplicated!=null ) { 1371 gcCandidateSet.removeAll(journalFilesBeingReplicated); 1372 } 1373 1374 if (metadata.producerSequenceIdTrackerLocation != null) { 1375 gcCandidateSet.remove(metadata.producerSequenceIdTrackerLocation.getDataFileId()); 1376 } 1377 1378 // Don't GC files after the first in progress tx 1379 if( metadata.firstInProgressTransactionLocation!=null ) { 1380 if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) { 1381 firstTxLocation = metadata.firstInProgressTransactionLocation; 1382 } 1383 } 1384 1385 if( firstTxLocation!=null ) { 1386 while( !gcCandidateSet.isEmpty() ) { 1387 Integer last = gcCandidateSet.last(); 1388 if( last >= firstTxLocation.getDataFileId() ) { 1389 gcCandidateSet.remove(last); 1390 } else { 1391 break; 1392 } 1393 } 1394 if (LOG.isTraceEnabled()) { 1395 LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet); 1396 } 1397 } 1398 1399 // Go through all the destinations to see if any of them can remove GC candidates. 1400 for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) { 1401 if( gcCandidateSet.isEmpty() ) { 1402 break; 1403 } 1404 1405 // Use a visitor to cut down the number of pages that we load 1406 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 1407 int last=-1; 1408 public boolean isInterestedInKeysBetween(Location first, Location second) { 1409 if( first==null ) { 1410 SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1); 1411 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1412 subset.remove(second.getDataFileId()); 1413 } 1414 return !subset.isEmpty(); 1415 } else if( second==null ) { 1416 SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId()); 1417 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1418 subset.remove(first.getDataFileId()); 1419 } 1420 return !subset.isEmpty(); 1421 } else { 1422 SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1); 1423 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1424 subset.remove(first.getDataFileId()); 1425 } 1426 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1427 subset.remove(second.getDataFileId()); 1428 } 1429 return !subset.isEmpty(); 1430 } 1431 } 1432 1433 public void visit(List<Location> keys, List<Long> values) { 1434 for (Location l : keys) { 1435 int fileId = l.getDataFileId(); 1436 if( last != fileId ) { 1437 gcCandidateSet.remove(fileId); 1438 last = fileId; 1439 } 1440 } 1441 } 1442 }); 1443 if (LOG.isTraceEnabled()) { 1444 LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet); 1445 } 1446 } 1447 1448 // check we are not deleting file with ack for in-use journal files 1449 if (LOG.isTraceEnabled()) { 1450 LOG.trace("gc candidates: " + gcCandidateSet); 1451 } 1452 final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet); 1453 Iterator<Integer> candidates = gcCandidateSet.iterator(); 1454 while (candidates.hasNext()) { 1455 Integer candidate = candidates.next(); 1456 Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate); 1457 if (referencedFileIds != null) { 1458 for (Integer referencedFileId : referencedFileIds) { 1459 if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) { 1460 // active file that is not targeted for deletion is referenced so don't delete 1461 candidates.remove(); 1462 break; 1463 } 1464 } 1465 if (gcCandidateSet.contains(candidate)) { 1466 ackMessageFileMap.remove(candidate); 1467 } else { 1468 if (LOG.isTraceEnabled()) { 1469 LOG.trace("not removing data file: " + candidate 1470 + " as contained ack(s) refer to referenced file: " + referencedFileIds); 1471 } 1472 } 1473 } 1474 } 1475 1476 if (!gcCandidateSet.isEmpty()) { 1477 if (LOG.isDebugEnabled()) { 1478 LOG.debug("Cleanup removing the data files: " + gcCandidateSet); 1479 } 1480 journal.removeDataFiles(gcCandidateSet); 1481 } 1482 } 1483 1484 LOG.debug("Checkpoint done."); 1485 } 1486 1487 final Runnable nullCompletionCallback = new Runnable() { 1488 @Override 1489 public void run() { 1490 } 1491 }; 1492 private Location checkpointProducerAudit() throws IOException { 1493 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 1494 ObjectOutputStream oout = new ObjectOutputStream(baos); 1495 oout.writeObject(metadata.producerSequenceIdTracker); 1496 oout.flush(); 1497 oout.close(); 1498 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false 1499 Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback); 1500 try { 1501 location.getLatch().await(); 1502 } catch (InterruptedException e) { 1503 throw new InterruptedIOException(e.toString()); 1504 } 1505 return location; 1506 } 1507 1508 public HashSet<Integer> getJournalFilesBeingReplicated() { 1509 return journalFilesBeingReplicated; 1510 } 1511 1512 // ///////////////////////////////////////////////////////////////// 1513 // StoredDestination related implementation methods. 1514 // ///////////////////////////////////////////////////////////////// 1515 1516 private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>(); 1517 1518 class StoredSubscription { 1519 SubscriptionInfo subscriptionInfo; 1520 String lastAckId; 1521 Location lastAckLocation; 1522 Location cursor; 1523 } 1524 1525 static class MessageKeys { 1526 final String messageId; 1527 final Location location; 1528 1529 public MessageKeys(String messageId, Location location) { 1530 this.messageId=messageId; 1531 this.location=location; 1532 } 1533 1534 @Override 1535 public String toString() { 1536 return "["+messageId+","+location+"]"; 1537 } 1538 } 1539 1540 static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> { 1541 static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller(); 1542 1543 public MessageKeys readPayload(DataInput dataIn) throws IOException { 1544 return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn)); 1545 } 1546 1547 public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException { 1548 dataOut.writeUTF(object.messageId); 1549 LocationMarshaller.INSTANCE.writePayload(object.location, dataOut); 1550 } 1551 } 1552 1553 class LastAck { 1554 long lastAckedSequence; 1555 byte priority; 1556 1557 public LastAck(LastAck source) { 1558 this.lastAckedSequence = source.lastAckedSequence; 1559 this.priority = source.priority; 1560 } 1561 1562 public LastAck() { 1563 this.priority = MessageOrderIndex.HI; 1564 } 1565 1566 public LastAck(long ackLocation) { 1567 this.lastAckedSequence = ackLocation; 1568 this.priority = MessageOrderIndex.LO; 1569 } 1570 1571 public LastAck(long ackLocation, byte priority) { 1572 this.lastAckedSequence = ackLocation; 1573 this.priority = priority; 1574 } 1575 1576 public String toString() { 1577 return "[" + lastAckedSequence + ":" + priority + "]"; 1578 } 1579 } 1580 1581 protected class LastAckMarshaller implements Marshaller<LastAck> { 1582 1583 public void writePayload(LastAck object, DataOutput dataOut) throws IOException { 1584 dataOut.writeLong(object.lastAckedSequence); 1585 dataOut.writeByte(object.priority); 1586 } 1587 1588 public LastAck readPayload(DataInput dataIn) throws IOException { 1589 LastAck lastAcked = new LastAck(); 1590 lastAcked.lastAckedSequence = dataIn.readLong(); 1591 if (metadata.version >= 3) { 1592 lastAcked.priority = dataIn.readByte(); 1593 } 1594 return lastAcked; 1595 } 1596 1597 public int getFixedSize() { 1598 return 9; 1599 } 1600 1601 public LastAck deepCopy(LastAck source) { 1602 return new LastAck(source); 1603 } 1604 1605 public boolean isDeepCopySupported() { 1606 return true; 1607 } 1608 } 1609 1610 class StoredDestination { 1611 1612 MessageOrderIndex orderIndex = new MessageOrderIndex(); 1613 BTreeIndex<Location, Long> locationIndex; 1614 BTreeIndex<String, Long> messageIdIndex; 1615 1616 // These bits are only set for Topics 1617 BTreeIndex<String, KahaSubscriptionCommand> subscriptions; 1618 BTreeIndex<String, LastAck> subscriptionAcks; 1619 HashMap<String, MessageOrderCursor> subscriptionCursors; 1620 ListIndex<String, SequenceSet> ackPositions; 1621 1622 // Transient data used to track which Messages are no longer needed. 1623 final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>(); 1624 final HashSet<String> subscriptionCache = new LinkedHashSet<String>(); 1625 } 1626 1627 protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> { 1628 1629 public StoredDestination readPayload(final DataInput dataIn) throws IOException { 1630 final StoredDestination value = new StoredDestination(); 1631 value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 1632 value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong()); 1633 value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong()); 1634 1635 if (dataIn.readBoolean()) { 1636 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong()); 1637 value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong()); 1638 if (metadata.version >= 4) { 1639 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong()); 1640 } else { 1641 // upgrade 1642 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1643 public void execute(Transaction tx) throws IOException { 1644 BTreeIndex<Long, HashSet<String>> oldAckPositions = 1645 new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong()); 1646 oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE); 1647 oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); 1648 oldAckPositions.load(tx); 1649 1650 LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>(); 1651 1652 // Do the initial build of the data in memory before writing into the store 1653 // based Ack Positions List to avoid a lot of disk thrashing. 1654 Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx); 1655 while (iterator.hasNext()) { 1656 Entry<Long, HashSet<String>> entry = iterator.next(); 1657 1658 for(String subKey : entry.getValue()) { 1659 SequenceSet pendingAcks = temp.get(subKey); 1660 if (pendingAcks == null) { 1661 pendingAcks = new SequenceSet(); 1662 temp.put(subKey, pendingAcks); 1663 } 1664 1665 pendingAcks.add(entry.getKey()); 1666 } 1667 } 1668 1669 // Now move the pending messages to ack data into the store backed 1670 // structure. 1671 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate()); 1672 for(String subscriptionKey : temp.keySet()) { 1673 value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey)); 1674 } 1675 1676 } 1677 }); 1678 } 1679 } 1680 if (metadata.version >= 2) { 1681 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 1682 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 1683 } else { 1684 // upgrade 1685 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1686 public void execute(Transaction tx) throws IOException { 1687 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 1688 value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 1689 value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 1690 value.orderIndex.lowPriorityIndex.load(tx); 1691 1692 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 1693 value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 1694 value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 1695 value.orderIndex.highPriorityIndex.load(tx); 1696 } 1697 }); 1698 } 1699 1700 return value; 1701 } 1702 1703 public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException { 1704 dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId()); 1705 dataOut.writeLong(value.locationIndex.getPageId()); 1706 dataOut.writeLong(value.messageIdIndex.getPageId()); 1707 if (value.subscriptions != null) { 1708 dataOut.writeBoolean(true); 1709 dataOut.writeLong(value.subscriptions.getPageId()); 1710 dataOut.writeLong(value.subscriptionAcks.getPageId()); 1711 dataOut.writeLong(value.ackPositions.getHeadPageId()); 1712 } else { 1713 dataOut.writeBoolean(false); 1714 } 1715 dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId()); 1716 dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId()); 1717 } 1718 } 1719 1720 static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> { 1721 final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller(); 1722 1723 public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException { 1724 KahaSubscriptionCommand rc = new KahaSubscriptionCommand(); 1725 rc.mergeFramed((InputStream)dataIn); 1726 return rc; 1727 } 1728 1729 public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException { 1730 object.writeFramed((OutputStream)dataOut); 1731 } 1732 } 1733 1734 protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 1735 String key = key(destination); 1736 StoredDestination rc = storedDestinations.get(key); 1737 if (rc == null) { 1738 boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC; 1739 rc = loadStoredDestination(tx, key, topic); 1740 // Cache it. We may want to remove/unload destinations from the 1741 // cache that are not used for a while 1742 // to reduce memory usage. 1743 storedDestinations.put(key, rc); 1744 } 1745 return rc; 1746 } 1747 1748 protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 1749 String key = key(destination); 1750 StoredDestination rc = storedDestinations.get(key); 1751 if (rc == null && metadata.destinations.containsKey(tx, key)) { 1752 rc = getStoredDestination(destination, tx); 1753 } 1754 return rc; 1755 } 1756 1757 /** 1758 * @param tx 1759 * @param key 1760 * @param topic 1761 * @return 1762 * @throws IOException 1763 */ 1764 private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException { 1765 // Try to load the existing indexes.. 1766 StoredDestination rc = metadata.destinations.get(tx, key); 1767 if (rc == null) { 1768 // Brand new destination.. allocate indexes for it. 1769 rc = new StoredDestination(); 1770 rc.orderIndex.allocate(tx); 1771 rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate()); 1772 rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate()); 1773 1774 if (topic) { 1775 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate()); 1776 rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate()); 1777 rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate()); 1778 } 1779 metadata.destinations.put(tx, key, rc); 1780 } 1781 1782 // Configure the marshalers and load. 1783 rc.orderIndex.load(tx); 1784 1785 // Figure out the next key using the last entry in the destination. 1786 rc.orderIndex.configureLast(tx); 1787 1788 rc.locationIndex.setKeyMarshaller(org.apache.kahadb.util.LocationMarshaller.INSTANCE); 1789 rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE); 1790 rc.locationIndex.load(tx); 1791 1792 rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE); 1793 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE); 1794 rc.messageIdIndex.load(tx); 1795 1796 // If it was a topic... 1797 if (topic) { 1798 1799 rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE); 1800 rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE); 1801 rc.subscriptions.load(tx); 1802 1803 rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE); 1804 rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller()); 1805 rc.subscriptionAcks.load(tx); 1806 1807 rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); 1808 rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); 1809 rc.ackPositions.load(tx); 1810 1811 rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>(); 1812 1813 if (metadata.version < 3) { 1814 1815 // on upgrade need to fill ackLocation with available messages past last ack 1816 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { 1817 Entry<String, LastAck> entry = iterator.next(); 1818 for (Iterator<Entry<Long, MessageKeys>> orderIterator = 1819 rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) { 1820 Long sequence = orderIterator.next().getKey(); 1821 addAckLocation(tx, rc, sequence, entry.getKey()); 1822 } 1823 // modify so it is upgraded 1824 rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue()); 1825 } 1826 } 1827 1828 // Configure the message references index 1829 Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx); 1830 while (subscriptions.hasNext()) { 1831 Entry<String, SequenceSet> subscription = subscriptions.next(); 1832 SequenceSet pendingAcks = subscription.getValue(); 1833 if (pendingAcks != null && !pendingAcks.isEmpty()) { 1834 Long lastPendingAck = pendingAcks.getTail().getLast(); 1835 for(Long sequenceId : pendingAcks) { 1836 Long current = rc.messageReferences.get(sequenceId); 1837 if (current == null) { 1838 current = new Long(0); 1839 } 1840 1841 // We always add a trailing empty entry for the next position to start from 1842 // so we need to ensure we don't count that as a message reference on reload. 1843 if (!sequenceId.equals(lastPendingAck)) { 1844 current = current.longValue() + 1; 1845 } 1846 1847 rc.messageReferences.put(sequenceId, current); 1848 } 1849 } 1850 } 1851 1852 // Configure the subscription cache 1853 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { 1854 Entry<String, LastAck> entry = iterator.next(); 1855 rc.subscriptionCache.add(entry.getKey()); 1856 } 1857 1858 if (rc.orderIndex.nextMessageId == 0) { 1859 // check for existing durable sub all acked out - pull next seq from acks as messages are gone 1860 if (!rc.subscriptionAcks.isEmpty(tx)) { 1861 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) { 1862 Entry<String, LastAck> entry = iterator.next(); 1863 rc.orderIndex.nextMessageId = 1864 Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1); 1865 } 1866 } 1867 } else { 1868 // update based on ackPositions for unmatched, last entry is always the next 1869 if (!rc.messageReferences.isEmpty()) { 1870 Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1]; 1871 rc.orderIndex.nextMessageId = 1872 Math.max(rc.orderIndex.nextMessageId, nextMessageId); 1873 } 1874 } 1875 } 1876 1877 if (metadata.version < VERSION) { 1878 // store again after upgrade 1879 metadata.destinations.put(tx, key, rc); 1880 } 1881 return rc; 1882 } 1883 1884 private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { 1885 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); 1886 if (sequences == null) { 1887 sequences = new SequenceSet(); 1888 sequences.add(messageSequence); 1889 sd.ackPositions.add(tx, subscriptionKey, sequences); 1890 } else { 1891 sequences.add(messageSequence); 1892 sd.ackPositions.put(tx, subscriptionKey, sequences); 1893 } 1894 1895 Long count = sd.messageReferences.get(messageSequence); 1896 if (count == null) { 1897 count = Long.valueOf(0L); 1898 } 1899 count = count.longValue() + 1; 1900 sd.messageReferences.put(messageSequence, count); 1901 } 1902 1903 // new sub is interested in potentially all existing messages 1904 private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { 1905 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); 1906 if (sequences == null) { 1907 sequences = new SequenceSet(); 1908 sequences.add(messageSequence); 1909 sd.ackPositions.add(tx, subscriptionKey, sequences); 1910 } else { 1911 sequences.add(messageSequence); 1912 sd.ackPositions.put(tx, subscriptionKey, sequences); 1913 } 1914 1915 Long count = sd.messageReferences.get(messageSequence); 1916 if (count == null) { 1917 count = Long.valueOf(0L); 1918 } 1919 count = count.longValue() + 1; 1920 sd.messageReferences.put(messageSequence, count); 1921 } 1922 1923 // on a new message add, all existing subs are interested in this message 1924 private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException { 1925 for(String subscriptionKey : sd.subscriptionCache) { 1926 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); 1927 if (sequences == null) { 1928 sequences = new SequenceSet(); 1929 sequences.add(new Sequence(messageSequence, messageSequence + 1)); 1930 sd.ackPositions.add(tx, subscriptionKey, sequences); 1931 } else { 1932 sequences.add(new Sequence(messageSequence, messageSequence + 1)); 1933 sd.ackPositions.put(tx, subscriptionKey, sequences); 1934 } 1935 1936 Long count = sd.messageReferences.get(messageSequence); 1937 if (count == null) { 1938 count = Long.valueOf(0L); 1939 } 1940 count = count.longValue() + 1; 1941 sd.messageReferences.put(messageSequence, count); 1942 sd.messageReferences.put(messageSequence+1, Long.valueOf(0L)); 1943 } 1944 } 1945 1946 private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 1947 if (!sd.ackPositions.isEmpty(tx)) { 1948 SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey); 1949 if (sequences == null || sequences.isEmpty()) { 1950 return; 1951 } 1952 1953 ArrayList<Long> unreferenced = new ArrayList<Long>(); 1954 1955 for(Long sequenceId : sequences) { 1956 Long references = sd.messageReferences.get(sequenceId); 1957 if (references != null) { 1958 references = references.longValue() - 1; 1959 1960 if (references.longValue() > 0) { 1961 sd.messageReferences.put(sequenceId, references); 1962 } else { 1963 sd.messageReferences.remove(sequenceId); 1964 unreferenced.add(sequenceId); 1965 } 1966 } 1967 } 1968 1969 for(Long sequenceId : unreferenced) { 1970 // Find all the entries that need to get deleted. 1971 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 1972 sd.orderIndex.getDeleteList(tx, deletes, sequenceId); 1973 1974 // Do the actual deletes. 1975 for (Entry<Long, MessageKeys> entry : deletes) { 1976 sd.locationIndex.remove(tx, entry.getValue().location); 1977 sd.messageIdIndex.remove(tx, entry.getValue().messageId); 1978 sd.orderIndex.remove(tx, entry.getKey()); 1979 } 1980 } 1981 } 1982 } 1983 1984 /** 1985 * @param tx 1986 * @param sd 1987 * @param subscriptionKey 1988 * @param messageSequence 1989 * @throws IOException 1990 */ 1991 private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException { 1992 // Remove the sub from the previous location set.. 1993 if (messageSequence != null) { 1994 SequenceSet range = sd.ackPositions.get(tx, subscriptionKey); 1995 if (range != null && !range.isEmpty()) { 1996 range.remove(messageSequence); 1997 if (!range.isEmpty()) { 1998 sd.ackPositions.put(tx, subscriptionKey, range); 1999 } else { 2000 sd.ackPositions.remove(tx, subscriptionKey); 2001 } 2002 2003 // Check if the message is reference by any other subscription. 2004 Long count = sd.messageReferences.get(messageSequence); 2005 if (count != null){ 2006 long references = count.longValue() - 1; 2007 if (references > 0) { 2008 sd.messageReferences.put(messageSequence, Long.valueOf(references)); 2009 return; 2010 } else { 2011 sd.messageReferences.remove(messageSequence); 2012 } 2013 } 2014 2015 // Find all the entries that need to get deleted. 2016 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 2017 sd.orderIndex.getDeleteList(tx, deletes, messageSequence); 2018 2019 // Do the actual deletes. 2020 for (Entry<Long, MessageKeys> entry : deletes) { 2021 sd.locationIndex.remove(tx, entry.getValue().location); 2022 sd.messageIdIndex.remove(tx, entry.getValue().messageId); 2023 sd.orderIndex.remove(tx, entry.getKey()); 2024 } 2025 } 2026 } 2027 } 2028 2029 public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2030 return sd.subscriptionAcks.get(tx, subscriptionKey); 2031 } 2032 2033 public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2034 SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); 2035 if (messageSequences != null) { 2036 long result = messageSequences.rangeSize(); 2037 // if there's anything in the range the last value is always the nextMessage marker, so remove 1. 2038 return result > 0 ? result - 1 : 0; 2039 } 2040 2041 return 0; 2042 } 2043 2044 private String key(KahaDestination destination) { 2045 return destination.getType().getNumber() + ":" + destination.getName(); 2046 } 2047 2048 // ///////////////////////////////////////////////////////////////// 2049 // Transaction related implementation methods. 2050 // ///////////////////////////////////////////////////////////////// 2051 @SuppressWarnings("rawtypes") 2052 private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 2053 @SuppressWarnings("rawtypes") 2054 protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 2055 protected final Set<String> ackedAndPrepared = new HashSet<String>(); 2056 2057 // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback, 2058 // till then they are skipped by the store. 2059 // 'at most once' XA guarantee 2060 public void trackRecoveredAcks(ArrayList<MessageAck> acks) { 2061 this.indexLock.writeLock().lock(); 2062 try { 2063 for (MessageAck ack : acks) { 2064 ackedAndPrepared.add(ack.getLastMessageId().toString()); 2065 } 2066 } finally { 2067 this.indexLock.writeLock().unlock(); 2068 } 2069 } 2070 2071 public void forgetRecoveredAcks(ArrayList<MessageAck> acks) throws IOException { 2072 if (acks != null) { 2073 this.indexLock.writeLock().lock(); 2074 try { 2075 for (MessageAck ack : acks) { 2076 ackedAndPrepared.remove(ack.getLastMessageId().toString()); 2077 } 2078 } finally { 2079 this.indexLock.writeLock().unlock(); 2080 } 2081 } 2082 } 2083 2084 @SuppressWarnings("rawtypes") 2085 private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) { 2086 TransactionId key = TransactionIdConversion.convert(info); 2087 List<Operation> tx; 2088 synchronized (inflightTransactions) { 2089 tx = inflightTransactions.get(key); 2090 if (tx == null) { 2091 tx = Collections.synchronizedList(new ArrayList<Operation>()); 2092 inflightTransactions.put(key, tx); 2093 } 2094 } 2095 return tx; 2096 } 2097 2098 @SuppressWarnings("unused") 2099 private TransactionId key(KahaTransactionInfo transactionInfo) { 2100 return TransactionIdConversion.convert(transactionInfo); 2101 } 2102 2103 abstract class Operation <T extends JournalCommand<T>> { 2104 final T command; 2105 final Location location; 2106 2107 public Operation(T command, Location location) { 2108 this.command = command; 2109 this.location = location; 2110 } 2111 2112 public Location getLocation() { 2113 return location; 2114 } 2115 2116 public T getCommand() { 2117 return command; 2118 } 2119 2120 abstract public void execute(Transaction tx) throws IOException; 2121 } 2122 2123 class AddOpperation extends Operation<KahaAddMessageCommand> { 2124 2125 public AddOpperation(KahaAddMessageCommand command, Location location) { 2126 super(command, location); 2127 } 2128 2129 @Override 2130 public void execute(Transaction tx) throws IOException { 2131 upadateIndex(tx, command, location); 2132 } 2133 2134 } 2135 2136 class RemoveOpperation extends Operation<KahaRemoveMessageCommand> { 2137 2138 public RemoveOpperation(KahaRemoveMessageCommand command, Location location) { 2139 super(command, location); 2140 } 2141 2142 @Override 2143 public void execute(Transaction tx) throws IOException { 2144 updateIndex(tx, command, location); 2145 } 2146 } 2147 2148 // ///////////////////////////////////////////////////////////////// 2149 // Initialization related implementation methods. 2150 // ///////////////////////////////////////////////////////////////// 2151 2152 private PageFile createPageFile() { 2153 PageFile index = new PageFile(directory, "db"); 2154 index.setEnableWriteThread(isEnableIndexWriteAsync()); 2155 index.setWriteBatchSize(getIndexWriteBatchSize()); 2156 index.setPageCacheSize(indexCacheSize); 2157 index.setUseLFRUEviction(isUseIndexLFRUEviction()); 2158 index.setLFUEvictionFactor(getIndexLFUEvictionFactor()); 2159 index.setEnableDiskSyncs(isEnableIndexDiskSyncs()); 2160 index.setEnableRecoveryFile(isEnableIndexRecoveryFile()); 2161 index.setEnablePageCaching(isEnableIndexPageCaching()); 2162 return index; 2163 } 2164 2165 private Journal createJournal() throws IOException { 2166 Journal manager = new Journal(); 2167 manager.setDirectory(directory); 2168 manager.setMaxFileLength(getJournalMaxFileLength()); 2169 manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles); 2170 manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles); 2171 manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); 2172 manager.setArchiveDataLogs(isArchiveDataLogs()); 2173 manager.setSizeAccumulator(storeSize); 2174 manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs()); 2175 if (getDirectoryArchive() != null) { 2176 IOHelper.mkdirs(getDirectoryArchive()); 2177 manager.setDirectoryArchive(getDirectoryArchive()); 2178 } 2179 return manager; 2180 } 2181 2182 public int getJournalMaxWriteBatchSize() { 2183 return journalMaxWriteBatchSize; 2184 } 2185 2186 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 2187 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 2188 } 2189 2190 public File getDirectory() { 2191 return directory; 2192 } 2193 2194 public void setDirectory(File directory) { 2195 this.directory = directory; 2196 } 2197 2198 public boolean isDeleteAllMessages() { 2199 return deleteAllMessages; 2200 } 2201 2202 public void setDeleteAllMessages(boolean deleteAllMessages) { 2203 this.deleteAllMessages = deleteAllMessages; 2204 } 2205 2206 public void setIndexWriteBatchSize(int setIndexWriteBatchSize) { 2207 this.setIndexWriteBatchSize = setIndexWriteBatchSize; 2208 } 2209 2210 public int getIndexWriteBatchSize() { 2211 return setIndexWriteBatchSize; 2212 } 2213 2214 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 2215 this.enableIndexWriteAsync = enableIndexWriteAsync; 2216 } 2217 2218 boolean isEnableIndexWriteAsync() { 2219 return enableIndexWriteAsync; 2220 } 2221 2222 public boolean isEnableJournalDiskSyncs() { 2223 return enableJournalDiskSyncs; 2224 } 2225 2226 public void setEnableJournalDiskSyncs(boolean syncWrites) { 2227 this.enableJournalDiskSyncs = syncWrites; 2228 } 2229 2230 public long getCheckpointInterval() { 2231 return checkpointInterval; 2232 } 2233 2234 public void setCheckpointInterval(long checkpointInterval) { 2235 this.checkpointInterval = checkpointInterval; 2236 } 2237 2238 public long getCleanupInterval() { 2239 return cleanupInterval; 2240 } 2241 2242 public void setCleanupInterval(long cleanupInterval) { 2243 this.cleanupInterval = cleanupInterval; 2244 } 2245 2246 public void setJournalMaxFileLength(int journalMaxFileLength) { 2247 this.journalMaxFileLength = journalMaxFileLength; 2248 } 2249 2250 public int getJournalMaxFileLength() { 2251 return journalMaxFileLength; 2252 } 2253 2254 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { 2255 this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack); 2256 } 2257 2258 public int getMaxFailoverProducersToTrack() { 2259 return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack(); 2260 } 2261 2262 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { 2263 this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth); 2264 } 2265 2266 public int getFailoverProducersAuditDepth() { 2267 return this.metadata.producerSequenceIdTracker.getAuditDepth(); 2268 } 2269 2270 public PageFile getPageFile() { 2271 if (pageFile == null) { 2272 pageFile = createPageFile(); 2273 } 2274 return pageFile; 2275 } 2276 2277 public Journal getJournal() throws IOException { 2278 if (journal == null) { 2279 journal = createJournal(); 2280 } 2281 return journal; 2282 } 2283 2284 public boolean isFailIfDatabaseIsLocked() { 2285 return failIfDatabaseIsLocked; 2286 } 2287 2288 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 2289 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 2290 } 2291 2292 public boolean isIgnoreMissingJournalfiles() { 2293 return ignoreMissingJournalfiles; 2294 } 2295 2296 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 2297 this.ignoreMissingJournalfiles = ignoreMissingJournalfiles; 2298 } 2299 2300 public int getIndexCacheSize() { 2301 return indexCacheSize; 2302 } 2303 2304 public void setIndexCacheSize(int indexCacheSize) { 2305 this.indexCacheSize = indexCacheSize; 2306 } 2307 2308 public boolean isCheckForCorruptJournalFiles() { 2309 return checkForCorruptJournalFiles; 2310 } 2311 2312 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { 2313 this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; 2314 } 2315 2316 public boolean isChecksumJournalFiles() { 2317 return checksumJournalFiles; 2318 } 2319 2320 public void setChecksumJournalFiles(boolean checksumJournalFiles) { 2321 this.checksumJournalFiles = checksumJournalFiles; 2322 } 2323 2324 public void setBrokerService(BrokerService brokerService) { 2325 this.brokerService = brokerService; 2326 } 2327 2328 /** 2329 * @return the archiveDataLogs 2330 */ 2331 public boolean isArchiveDataLogs() { 2332 return this.archiveDataLogs; 2333 } 2334 2335 /** 2336 * @param archiveDataLogs the archiveDataLogs to set 2337 */ 2338 public void setArchiveDataLogs(boolean archiveDataLogs) { 2339 this.archiveDataLogs = archiveDataLogs; 2340 } 2341 2342 /** 2343 * @return the directoryArchive 2344 */ 2345 public File getDirectoryArchive() { 2346 return this.directoryArchive; 2347 } 2348 2349 /** 2350 * @param directoryArchive the directoryArchive to set 2351 */ 2352 public void setDirectoryArchive(File directoryArchive) { 2353 this.directoryArchive = directoryArchive; 2354 } 2355 2356 /** 2357 * @return the databaseLockedWaitDelay 2358 */ 2359 public int getDatabaseLockedWaitDelay() { 2360 return this.databaseLockedWaitDelay; 2361 } 2362 2363 /** 2364 * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set 2365 */ 2366 public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) { 2367 this.databaseLockedWaitDelay = databaseLockedWaitDelay; 2368 } 2369 2370 public boolean isRewriteOnRedelivery() { 2371 return rewriteOnRedelivery; 2372 } 2373 2374 public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) { 2375 this.rewriteOnRedelivery = rewriteOnRedelivery; 2376 } 2377 2378 public boolean isArchiveCorruptedIndex() { 2379 return archiveCorruptedIndex; 2380 } 2381 2382 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { 2383 this.archiveCorruptedIndex = archiveCorruptedIndex; 2384 } 2385 2386 public float getIndexLFUEvictionFactor() { 2387 return indexLFUEvictionFactor; 2388 } 2389 2390 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) { 2391 this.indexLFUEvictionFactor = indexLFUEvictionFactor; 2392 } 2393 2394 public boolean isUseIndexLFRUEviction() { 2395 return useIndexLFRUEviction; 2396 } 2397 2398 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) { 2399 this.useIndexLFRUEviction = useIndexLFRUEviction; 2400 } 2401 2402 public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) { 2403 this.enableIndexDiskSyncs = enableIndexDiskSyncs; 2404 } 2405 2406 public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) { 2407 this.enableIndexRecoveryFile = enableIndexRecoveryFile; 2408 } 2409 2410 public void setEnableIndexPageCaching(boolean enableIndexPageCaching) { 2411 this.enableIndexPageCaching = enableIndexPageCaching; 2412 } 2413 2414 public boolean isEnableIndexDiskSyncs() { 2415 return enableIndexDiskSyncs; 2416 } 2417 2418 public boolean isEnableIndexRecoveryFile() { 2419 return enableIndexRecoveryFile; 2420 } 2421 2422 public boolean isEnableIndexPageCaching() { 2423 return enableIndexPageCaching; 2424 } 2425 2426 // ///////////////////////////////////////////////////////////////// 2427 // Internal conversion methods. 2428 // ///////////////////////////////////////////////////////////////// 2429 2430 class MessageOrderCursor{ 2431 long defaultCursorPosition; 2432 long lowPriorityCursorPosition; 2433 long highPriorityCursorPosition; 2434 MessageOrderCursor(){ 2435 } 2436 2437 MessageOrderCursor(long position){ 2438 this.defaultCursorPosition=position; 2439 this.lowPriorityCursorPosition=position; 2440 this.highPriorityCursorPosition=position; 2441 } 2442 2443 MessageOrderCursor(MessageOrderCursor other){ 2444 this.defaultCursorPosition=other.defaultCursorPosition; 2445 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; 2446 this.highPriorityCursorPosition=other.highPriorityCursorPosition; 2447 } 2448 2449 MessageOrderCursor copy() { 2450 return new MessageOrderCursor(this); 2451 } 2452 2453 void reset() { 2454 this.defaultCursorPosition=0; 2455 this.highPriorityCursorPosition=0; 2456 this.lowPriorityCursorPosition=0; 2457 } 2458 2459 void increment() { 2460 if (defaultCursorPosition!=0) { 2461 defaultCursorPosition++; 2462 } 2463 if (highPriorityCursorPosition!=0) { 2464 highPriorityCursorPosition++; 2465 } 2466 if (lowPriorityCursorPosition!=0) { 2467 lowPriorityCursorPosition++; 2468 } 2469 } 2470 2471 public String toString() { 2472 return "MessageOrderCursor:[def:" + defaultCursorPosition 2473 + ", low:" + lowPriorityCursorPosition 2474 + ", high:" + highPriorityCursorPosition + "]"; 2475 } 2476 2477 public void sync(MessageOrderCursor other) { 2478 this.defaultCursorPosition=other.defaultCursorPosition; 2479 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; 2480 this.highPriorityCursorPosition=other.highPriorityCursorPosition; 2481 } 2482 } 2483 2484 class MessageOrderIndex { 2485 static final byte HI = 9; 2486 static final byte LO = 0; 2487 static final byte DEF = 4; 2488 2489 long nextMessageId; 2490 BTreeIndex<Long, MessageKeys> defaultPriorityIndex; 2491 BTreeIndex<Long, MessageKeys> lowPriorityIndex; 2492 BTreeIndex<Long, MessageKeys> highPriorityIndex; 2493 MessageOrderCursor cursor = new MessageOrderCursor(); 2494 Long lastDefaultKey; 2495 Long lastHighKey; 2496 Long lastLowKey; 2497 byte lastGetPriority; 2498 2499 MessageKeys remove(Transaction tx, Long key) throws IOException { 2500 MessageKeys result = defaultPriorityIndex.remove(tx, key); 2501 if (result == null && highPriorityIndex!=null) { 2502 result = highPriorityIndex.remove(tx, key); 2503 if (result ==null && lowPriorityIndex!=null) { 2504 result = lowPriorityIndex.remove(tx, key); 2505 } 2506 } 2507 return result; 2508 } 2509 2510 void load(Transaction tx) throws IOException { 2511 defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2512 defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 2513 defaultPriorityIndex.load(tx); 2514 lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2515 lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 2516 lowPriorityIndex.load(tx); 2517 highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2518 highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 2519 highPriorityIndex.load(tx); 2520 } 2521 2522 void allocate(Transaction tx) throws IOException { 2523 defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2524 if (metadata.version >= 2) { 2525 lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2526 highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2527 } 2528 } 2529 2530 void configureLast(Transaction tx) throws IOException { 2531 // Figure out the next key using the last entry in the destination. 2532 if (highPriorityIndex != null) { 2533 Entry<Long, MessageKeys> lastEntry = highPriorityIndex.getLast(tx); 2534 if (lastEntry != null) { 2535 nextMessageId = lastEntry.getKey() + 1; 2536 } else { 2537 lastEntry = defaultPriorityIndex.getLast(tx); 2538 if (lastEntry != null) { 2539 nextMessageId = lastEntry.getKey() + 1; 2540 } else { 2541 lastEntry = lowPriorityIndex.getLast(tx); 2542 if (lastEntry != null) { 2543 nextMessageId = lastEntry.getKey() + 1; 2544 } 2545 } 2546 } 2547 } else { 2548 Entry<Long, MessageKeys> lastEntry = defaultPriorityIndex.getLast(tx); 2549 if (lastEntry != null) { 2550 nextMessageId = lastEntry.getKey() + 1; 2551 } 2552 } 2553 } 2554 2555 void clear(Transaction tx) throws IOException { 2556 this.remove(tx); 2557 this.resetCursorPosition(); 2558 this.allocate(tx); 2559 this.load(tx); 2560 this.configureLast(tx); 2561 } 2562 2563 void remove(Transaction tx) throws IOException { 2564 defaultPriorityIndex.clear(tx); 2565 defaultPriorityIndex.unload(tx); 2566 tx.free(defaultPriorityIndex.getPageId()); 2567 if (lowPriorityIndex != null) { 2568 lowPriorityIndex.clear(tx); 2569 lowPriorityIndex.unload(tx); 2570 2571 tx.free(lowPriorityIndex.getPageId()); 2572 } 2573 if (highPriorityIndex != null) { 2574 highPriorityIndex.clear(tx); 2575 highPriorityIndex.unload(tx); 2576 tx.free(highPriorityIndex.getPageId()); 2577 } 2578 } 2579 2580 void resetCursorPosition() { 2581 this.cursor.reset(); 2582 lastDefaultKey = null; 2583 lastHighKey = null; 2584 lastLowKey = null; 2585 } 2586 2587 void setBatch(Transaction tx, Long sequence) throws IOException { 2588 if (sequence != null) { 2589 Long nextPosition = new Long(sequence.longValue() + 1); 2590 if (defaultPriorityIndex.containsKey(tx, sequence)) { 2591 lastDefaultKey = sequence; 2592 cursor.defaultCursorPosition = nextPosition.longValue(); 2593 } else if (highPriorityIndex != null) { 2594 if (highPriorityIndex.containsKey(tx, sequence)) { 2595 lastHighKey = sequence; 2596 cursor.highPriorityCursorPosition = nextPosition.longValue(); 2597 } else if (lowPriorityIndex.containsKey(tx, sequence)) { 2598 lastLowKey = sequence; 2599 cursor.lowPriorityCursorPosition = nextPosition.longValue(); 2600 } 2601 } else { 2602 LOG.warn("setBatch: sequence " + sequence + " not found in orderindex:" + this); 2603 lastDefaultKey = sequence; 2604 cursor.defaultCursorPosition = nextPosition.longValue(); 2605 } 2606 } 2607 } 2608 2609 void setBatch(Transaction tx, LastAck last) throws IOException { 2610 setBatch(tx, last.lastAckedSequence); 2611 if (cursor.defaultCursorPosition == 0 2612 && cursor.highPriorityCursorPosition == 0 2613 && cursor.lowPriorityCursorPosition == 0) { 2614 long next = last.lastAckedSequence + 1; 2615 switch (last.priority) { 2616 case DEF: 2617 cursor.defaultCursorPosition = next; 2618 cursor.highPriorityCursorPosition = next; 2619 break; 2620 case HI: 2621 cursor.highPriorityCursorPosition = next; 2622 break; 2623 case LO: 2624 cursor.lowPriorityCursorPosition = next; 2625 cursor.defaultCursorPosition = next; 2626 cursor.highPriorityCursorPosition = next; 2627 break; 2628 } 2629 } 2630 } 2631 2632 void stoppedIterating() { 2633 if (lastDefaultKey!=null) { 2634 cursor.defaultCursorPosition=lastDefaultKey.longValue()+1; 2635 } 2636 if (lastHighKey!=null) { 2637 cursor.highPriorityCursorPosition=lastHighKey.longValue()+1; 2638 } 2639 if (lastLowKey!=null) { 2640 cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1; 2641 } 2642 lastDefaultKey = null; 2643 lastHighKey = null; 2644 lastLowKey = null; 2645 } 2646 2647 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId) 2648 throws IOException { 2649 if (defaultPriorityIndex.containsKey(tx, sequenceId)) { 2650 getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId); 2651 } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) { 2652 getDeleteList(tx, deletes, highPriorityIndex, sequenceId); 2653 } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) { 2654 getDeleteList(tx, deletes, lowPriorityIndex, sequenceId); 2655 } 2656 } 2657 2658 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, 2659 BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException { 2660 2661 Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId); 2662 deletes.add(iterator.next()); 2663 } 2664 2665 long getNextMessageId(int priority) { 2666 return nextMessageId++; 2667 } 2668 2669 MessageKeys get(Transaction tx, Long key) throws IOException { 2670 MessageKeys result = defaultPriorityIndex.get(tx, key); 2671 if (result == null) { 2672 result = highPriorityIndex.get(tx, key); 2673 if (result == null) { 2674 result = lowPriorityIndex.get(tx, key); 2675 lastGetPriority = LO; 2676 } else { 2677 lastGetPriority = HI; 2678 } 2679 } else { 2680 lastGetPriority = DEF; 2681 } 2682 return result; 2683 } 2684 2685 MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException { 2686 if (priority == javax.jms.Message.DEFAULT_PRIORITY) { 2687 return defaultPriorityIndex.put(tx, key, value); 2688 } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) { 2689 return highPriorityIndex.put(tx, key, value); 2690 } else { 2691 return lowPriorityIndex.put(tx, key, value); 2692 } 2693 } 2694 2695 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{ 2696 return new MessageOrderIterator(tx,cursor); 2697 } 2698 2699 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{ 2700 return new MessageOrderIterator(tx,m); 2701 } 2702 2703 public byte lastGetPriority() { 2704 return lastGetPriority; 2705 } 2706 2707 class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{ 2708 Iterator<Entry<Long, MessageKeys>>currentIterator; 2709 final Iterator<Entry<Long, MessageKeys>>highIterator; 2710 final Iterator<Entry<Long, MessageKeys>>defaultIterator; 2711 final Iterator<Entry<Long, MessageKeys>>lowIterator; 2712 2713 MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException { 2714 this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition); 2715 if (highPriorityIndex != null) { 2716 this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition); 2717 } else { 2718 this.highIterator = null; 2719 } 2720 if (lowPriorityIndex != null) { 2721 this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition); 2722 } else { 2723 this.lowIterator = null; 2724 } 2725 } 2726 2727 public boolean hasNext() { 2728 if (currentIterator == null) { 2729 if (highIterator != null) { 2730 if (highIterator.hasNext()) { 2731 currentIterator = highIterator; 2732 return currentIterator.hasNext(); 2733 } 2734 if (defaultIterator.hasNext()) { 2735 currentIterator = defaultIterator; 2736 return currentIterator.hasNext(); 2737 } 2738 if (lowIterator.hasNext()) { 2739 currentIterator = lowIterator; 2740 return currentIterator.hasNext(); 2741 } 2742 return false; 2743 } else { 2744 currentIterator = defaultIterator; 2745 return currentIterator.hasNext(); 2746 } 2747 } 2748 if (highIterator != null) { 2749 if (currentIterator.hasNext()) { 2750 return true; 2751 } 2752 if (currentIterator == highIterator) { 2753 if (defaultIterator.hasNext()) { 2754 currentIterator = defaultIterator; 2755 return currentIterator.hasNext(); 2756 } 2757 if (lowIterator.hasNext()) { 2758 currentIterator = lowIterator; 2759 return currentIterator.hasNext(); 2760 } 2761 return false; 2762 } 2763 2764 if (currentIterator == defaultIterator) { 2765 if (lowIterator.hasNext()) { 2766 currentIterator = lowIterator; 2767 return currentIterator.hasNext(); 2768 } 2769 return false; 2770 } 2771 } 2772 return currentIterator.hasNext(); 2773 } 2774 2775 public Entry<Long, MessageKeys> next() { 2776 Entry<Long, MessageKeys> result = currentIterator.next(); 2777 if (result != null) { 2778 Long key = result.getKey(); 2779 if (highIterator != null) { 2780 if (currentIterator == defaultIterator) { 2781 lastDefaultKey = key; 2782 } else if (currentIterator == highIterator) { 2783 lastHighKey = key; 2784 } else { 2785 lastLowKey = key; 2786 } 2787 } else { 2788 lastDefaultKey = key; 2789 } 2790 } 2791 return result; 2792 } 2793 2794 public void remove() { 2795 throw new UnsupportedOperationException(); 2796 } 2797 2798 } 2799 } 2800 2801 private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> { 2802 final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller(); 2803 2804 public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException { 2805 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 2806 ObjectOutputStream oout = new ObjectOutputStream(baos); 2807 oout.writeObject(object); 2808 oout.flush(); 2809 oout.close(); 2810 byte[] data = baos.toByteArray(); 2811 dataOut.writeInt(data.length); 2812 dataOut.write(data); 2813 } 2814 2815 @SuppressWarnings("unchecked") 2816 public HashSet<String> readPayload(DataInput dataIn) throws IOException { 2817 int dataLen = dataIn.readInt(); 2818 byte[] data = new byte[dataLen]; 2819 dataIn.readFully(data); 2820 ByteArrayInputStream bais = new ByteArrayInputStream(data); 2821 ObjectInputStream oin = new ObjectInputStream(bais); 2822 try { 2823 return (HashSet<String>) oin.readObject(); 2824 } catch (ClassNotFoundException cfe) { 2825 IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe); 2826 ioe.initCause(cfe); 2827 throw ioe; 2828 } 2829 } 2830 } 2831 }