/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.page;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LFUCache;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A PageFile provides random access to fixed-size disk pages.  This object is not thread safe, so access to it
 * should be externally synchronized.
 * <p/>
 * The file has 3 parts:
 * Metadata Space: 4k : Reserved metadata area.  Used to store persistent config about the file.
 * Recovery Buffer Space: Page Size * 1000 : A redo log used to prevent partial page writes from making the file inconsistent.
 * Page Space: The pages in the page file.
 */
public class PageFile {

    private static final String PAGEFILE_SUFFIX = ".data";
    private static final String RECOVERY_FILE_SUFFIX = ".redo";
    private static final String FREE_FILE_SUFFIX = ".free";

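    /*
     * A minimal usage sketch (illustrative only; the Transaction calls are sketched under the
     * assumption of the usual KahaDB Transaction API, and all error handling is elided):
     *
     *   PageFile pf = new PageFile(new File("target/data"), "example");
     *   pf.load();                       // allocate OS resources; creates the files on first use
     *   Transaction tx = pf.tx();        // transactions front all page reads and writes
     *   Page<String> page = tx.allocate();
     *   // ... tx.store(page, marshaller, true); tx.commit(); ...
     *   pf.flush();                      // push buffered page writes to disk
     *   pf.unload();                     // close file handles and record a clean shutdown
     */
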
    // 4k default page size.
    public static final int DEFAULT_PAGE_SIZE = Integer.getInteger("defaultPageSize", 1024 * 4);
    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.getInteger("defaultWriteBatchSize", 1000);
    public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.getInteger("defaultPageCacheSize", 100);

    private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
    private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;

    // Recovery header is (long offset)
    private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);

    // A PageFile will use a couple of files in this directory
    private final File directory;
    // And the file names in that directory will be based on this name.
    private final String name;

    // File handle used for reading pages..
    private RecoverableRandomAccessFile readFile;
    // File handle used for writing pages..
    private RecoverableRandomAccessFile writeFile;
    // File handle used for writing to the recovery (redo) file..
    private RecoverableRandomAccessFile recoveryFile;

    // The size of pages
    private int pageSize = DEFAULT_PAGE_SIZE;

    // The minimum amount of space to allocate to the recovery file, in pages.
    private int recoveryFileMinPageCount = 1000;
    // The max size that we let the recovery file grow to.  It may temporarily exceed this max, but the
    // file will be resized back down to this max size as soon as possible.
    private int recoveryFileMaxPageCount = 10000;
    // The number of pages in the current recovery buffer
    private int recoveryPageCount;

    private final AtomicBoolean loaded = new AtomicBoolean();
    // The number of pages we are aiming to write every time we
    // write to disk.
    int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;

    // The cache of recently used pages.
    private Map<Long, Page> pageCache;
    // Is read page caching enabled?
    private boolean enablePageCaching = true;
    // How many pages will we keep in the cache?
    private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;

    // Should we first log the page write to the recovery buffer?  Avoids partial
    // page write failures..
    private boolean enableRecoveryFile = true;
    // Will we sync writes to disk?  Ensures that data will not be lost after a checkpoint().
    private boolean enableDiskSyncs = true;
    // Will writes be done in an async thread?
    private boolean enabledWriteThread = false;

    // These are used if enabledWriteThread==true
    private final AtomicBoolean stopWriter = new AtomicBoolean();
    private Thread writerThread;
    private CountDownLatch checkpointLatch;

    // Keeps track of writes that are being written to disk.
    private final TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();

    // Keeps track of free pages.
    private final AtomicLong nextFreePageId = new AtomicLong();
    private SequenceSet freeList = new SequenceSet();

    private final AtomicLong nextTxid = new AtomicLong();

    // Persistent settings stored in the page file.
    private MetaData metaData;

    private final ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();

    private boolean useLFRUEviction = false;
    private float LFUEvictionFactor = 0.2f;

    /**
     * Used to keep track of updated pages which have not yet been committed.
     */
    static class PageWrite {
        Page page;
        byte[] current;
        byte[] diskBound;
        long currentLocation = -1;
        long diskBoundLocation = -1;
        File tmpFile;
        int length;

        public PageWrite(Page page, byte[] data) {
            this.page = page;
            current = data;
        }

        public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
            this.page = page;
            this.currentLocation = currentLocation;
            this.tmpFile = tmpFile;
            this.length = length;
        }

        public void setCurrent(Page page, byte[] data) {
            this.page = page;
            current = data;
            currentLocation = -1;
            diskBoundLocation = -1;
        }

        public void setCurrentLocation(Page page, long location, int length) {
            this.page = page;
            this.currentLocation = location;
            this.length = length;
            this.current = null;
        }

        @Override
        public String toString() {
            return "[PageWrite:" + page.getPageId() + "-" + page.getType() + "]";
        }

        @SuppressWarnings("unchecked")
        public Page getPage() {
            return page;
        }

        public byte[] getDiskBound() throws IOException {
            if (diskBound == null && diskBoundLocation != -1) {
                diskBound = new byte[length];
                try (RandomAccessFile file = new RandomAccessFile(tmpFile, "r")) {
                    file.seek(diskBoundLocation);
                    // readFully so a short read cannot hand back a partially filled page image.
                    file.readFully(diskBound);
                }
                diskBoundLocation = -1;
            }
            return diskBound;
        }

        void begin() {
            if (currentLocation != -1) {
                diskBoundLocation = currentLocation;
            } else {
                diskBound = current;
            }
            current = null;
            currentLocation = -1;
        }

        /**
         * @return true if there are no pending writes to do.
         */
        boolean done() {
            diskBoundLocation = -1;
            diskBound = null;
            return current == null || currentLocation == -1;
        }

        boolean isDone() {
            return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
        }
    }

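    /*
     * A sketch of the PageWrite double-buffer lifecycle as used by writeBatch() below
     * (illustrative only; the byte arrays and variable names are made up):
     *
     *   PageWrite w = new PageWrite(page, bytes);   // "current" holds the latest un-flushed page image
     *   w.begin();                                  // writer: current -> diskBound, current cleared
     *   byte[] onDisk = w.getDiskBound();           // the image (or tmp-file region) that goes to disk
     *   w.setCurrent(page, newerBytes);             // a concurrent update does not disturb diskBound
     *   w.done();                                   // writer: discard the disk-bound image
     *   boolean removable = w.isDone();             // false here: the newer "current" image is still pending
     */
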
    /**
     * The MetaData object holds the persistent data associated with a PageFile object.
     */
    public static class MetaData {

        String fileType;
        String fileTypeVersion;

        long metaDataTxId = -1;
        int pageSize;
        boolean cleanShutdown;
        long lastTxId;
        long freePages;

        public String getFileType() {
            return fileType;
        }

        public void setFileType(String fileType) {
            this.fileType = fileType;
        }

        public String getFileTypeVersion() {
            return fileTypeVersion;
        }

        public void setFileTypeVersion(String version) {
            this.fileTypeVersion = version;
        }

        public long getMetaDataTxId() {
            return metaDataTxId;
        }

        public void setMetaDataTxId(long metaDataTxId) {
            this.metaDataTxId = metaDataTxId;
        }

        public int getPageSize() {
            return pageSize;
        }

        public void setPageSize(int pageSize) {
            this.pageSize = pageSize;
        }

        public boolean isCleanShutdown() {
            return cleanShutdown;
        }

        public void setCleanShutdown(boolean cleanShutdown) {
            this.cleanShutdown = cleanShutdown;
        }

        public long getLastTxId() {
            return lastTxId;
        }

        public void setLastTxId(long lastTxId) {
            this.lastTxId = lastTxId;
        }

        public long getFreePages() {
            return freePages;
        }

        public void setFreePages(long value) {
            this.freePages = value;
        }
    }

    public Transaction tx() {
        assertLoaded();
        return new Transaction(this);
    }

    /**
     * Creates a PageFile in the specified directory whose data files are named by the given name.
     */
    public PageFile(File directory, String name) {
        this.directory = directory;
        this.name = name;
    }

    /**
     * Deletes the files used by the PageFile object.  This method can only be used when this object is not loaded.
     *
     * @throws IOException if the files cannot be deleted.
     * @throws IllegalStateException if this PageFile is loaded
     */
    public void delete() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
        }
        delete(getMainPageFile());
        delete(getFreeFile());
        delete(getRecoveryFile());
    }

    public void archive() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot archive page file data when the page file is loaded");
        }
        long timestamp = System.currentTimeMillis();
        archive(getMainPageFile(), String.valueOf(timestamp));
        archive(getFreeFile(), String.valueOf(timestamp));
        archive(getRecoveryFile(), String.valueOf(timestamp));
    }

    /**
     * @param file the file to delete
     * @throws IOException if the file exists but cannot be deleted
     */
    private void delete(File file) throws IOException {
        if (file.exists() && !file.delete()) {
            throw new IOException("Could not delete: " + file.getPath());
        }
    }

    private void archive(File file, String suffix) throws IOException {
        if (file.exists()) {
            File archive = new File(file.getPath() + "-" + suffix);
            if (!file.renameTo(archive)) {
                throw new IOException("Could not archive: " + file.getPath() + " to " + archive.getPath());
            }
        }
    }

    /**
     * Loads the page file so that it can be accessed for read/write purposes.  This allocates OS resources.  If this
     * is the first time the page file is loaded, then this creates the page file in the file system.
     *
     * @throws IOException If the page file cannot be loaded.  This could be because the existing page file is
     *                     corrupt, is a bad version, or because there was a disk error.
     * @throws IllegalStateException If the page file was already loaded.
     */
    public void load() throws IOException, IllegalStateException {
        if (loaded.compareAndSet(false, true)) {

            if (enablePageCaching) {
                if (isUseLFRUEviction()) {
                    pageCache = Collections.synchronizedMap(new LFUCache<Long, Page>(pageCacheSize, getLFUEvictionFactor()));
                } else {
                    pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
                }
            }

            File file = getMainPageFile();
            IOHelper.mkdirs(file.getParentFile());
            writeFile = new RecoverableRandomAccessFile(file, "rw", false);
            readFile = new RecoverableRandomAccessFile(file, "r");

            if (readFile.length() > 0) {
                // Load the page size setting because that can't change once the file is created.
                loadMetaData();
                pageSize = metaData.getPageSize();
            } else {
                // Store the page size setting because that can't change once the file is created.
                metaData = new MetaData();
                metaData.setFileType(PageFile.class.getName());
                metaData.setFileTypeVersion("1");
                metaData.setPageSize(getPageSize());
                metaData.setCleanShutdown(true);
                metaData.setFreePages(-1);
                metaData.setLastTxId(0);
                storeMetaData();
            }

            if (enableRecoveryFile) {
                recoveryFile = new RecoverableRandomAccessFile(getRecoveryFile(), "rw");
            }

            boolean needsFreePageRecovery = false;

            if (metaData.isCleanShutdown()) {
                nextTxid.set(metaData.getLastTxId() + 1);
                if (metaData.getFreePages() > 0) {
                    loadFreeList();
                }
            } else {
                LOG.debug(toString() + ", Recovering page file...");
                nextTxid.set(redoRecoveryUpdates());
                needsFreePageRecovery = true;
            }

            if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
                writeFile.setLength(PAGE_FILE_HEADER_SIZE);
            }
            nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);

            if (needsFreePageRecovery) {
                // Scan all pages to find the free ones, after nextFreePageId is set.
                freeList = new SequenceSet();
                for (Iterator<Page> i = tx().iterator(true); i.hasNext(); ) {
                    Page page = i.next();
                    if (page.getType() == Page.PAGE_FREE_TYPE) {
                        freeList.add(page.getPageId());
                    }
                }
            }

            metaData.setCleanShutdown(false);
            storeMetaData();
            getFreeFile().delete();
            startWriter();
        } else {
            throw new IllegalStateException("Cannot load the page file when it is already loaded.");
        }
    }

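    // Note on the free list lifecycle (summarizing the logic in load() above and unload() below):
    // the ".free" file is only trusted after a clean shutdown.  load() marks the metadata as
    // not-cleanly-shut-down and deletes the free file; unload() rewrites it (when non-empty) and
    // flips the flag back.  If the process dies in between, the next load() falls back to a full
    // page scan to rebuild the free list.
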
    /**
     * Unloads a previously loaded PageFile.  This deallocates OS related resources like file handles.
     * Once unloaded, you can no longer use the page file to read or write Pages.
     *
     * @throws IOException if a disk error occurred while closing down the page file
     * @throws IllegalStateException if the PageFile is not loaded
     */
    public void unload() throws IOException {
        if (loaded.compareAndSet(true, false)) {
            flush();
            try {
                stopWriter();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }

            if (freeList.isEmpty()) {
                metaData.setFreePages(0);
            } else {
                storeFreeList();
                metaData.setFreePages(freeList.size());
            }

            metaData.setLastTxId(nextTxid.get() - 1);
            metaData.setCleanShutdown(true);
            storeMetaData();

            if (readFile != null) {
                readFile.close();
                readFile = null;
                writeFile.close();
                writeFile = null;
                if (enableRecoveryFile) {
                    recoveryFile.close();
                    recoveryFile = null;
                }
                freeList.clear();
                if (pageCache != null) {
                    pageCache = null;
                }
                synchronized (writes) {
                    writes.clear();
                }
            }
        } else {
            throw new IllegalStateException("Cannot unload the page file when it is not loaded");
        }
    }

    public boolean isLoaded() {
        return loaded.get();
    }

    public void allowIOResumption() {
        loaded.set(true);
    }

    /**
     * Flush and sync all write buffers to disk.
     *
     * @throws IOException If a disk error occurred.
     */
    public void flush() throws IOException {

        if (enabledWriteThread && stopWriter.get()) {
            throw new IOException("Page file already stopped: checkpointing is not allowed");
        }

        // Setup a latch that gets notified when all buffered writes hit the disk.
        CountDownLatch checkpointLatch;
        synchronized (writes) {
            if (writes.isEmpty()) {
                return;
            }
            if (enabledWriteThread) {
                if (this.checkpointLatch == null) {
                    this.checkpointLatch = new CountDownLatch(1);
                }
                checkpointLatch = this.checkpointLatch;
                writes.notify();
            } else {
                writeBatch();
                return;
            }
        }
        try {
            checkpointLatch.await();
        } catch (InterruptedException e) {
            InterruptedIOException ioe = new InterruptedIOException();
            ioe.initCause(e);
            throw ioe;
        }
    }

    @Override
    public String toString() {
        return "Page File: " + getMainPageFile();
    }

    ///////////////////////////////////////////////////////////////////
    // Private Implementation Methods
    ///////////////////////////////////////////////////////////////////
    private File getMainPageFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + PAGEFILE_SUFFIX);
    }

    public File getFreeFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + FREE_FILE_SUFFIX);
    }

    public File getRecoveryFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + RECOVERY_FILE_SUFFIX);
    }

    public long toOffset(long pageId) {
        return PAGE_FILE_HEADER_SIZE + (pageId * pageSize);
    }

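    // Worked example for toOffset(), assuming the 4 KiB defaults: the 4096-byte header is skipped,
    // so page 0 starts at offset 4096, page 1 at 8192, and page 3 at 4096 + 3 * 4096 = 16384.
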
    private void loadMetaData() throws IOException {

        ByteArrayInputStream is;
        MetaData v1 = new MetaData();
        MetaData v2 = new MetaData();
        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(0);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v1, p);
        } catch (IOException e) {
            v1 = null;
        }

        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(PAGE_FILE_HEADER_SIZE / 2);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v2, p);
        } catch (IOException e) {
            v2 = null;
        }

        if (v1 == null && v2 == null) {
            throw new IOException("Could not load page file meta data");
        }

        if (v1 == null || v1.metaDataTxId < 0) {
            metaData = v2;
        } else if (v2 == null || v2.metaDataTxId < 0) {
            metaData = v1;
        } else if (v1.metaDataTxId == v2.metaDataTxId) {
            metaData = v1; // use the first copy since the second could be a partial write..
        } else {
            metaData = v2; // use the second copy because the first is probably a partial write.
        }
    }

    private void storeMetaData() throws IOException {
        // Convert the metadata into a property format
        metaData.metaDataTxId++;
        Properties p = new Properties();
        IntrospectionSupport.getProperties(metaData, p, null);

        ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
        p.store(os, "");
        if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
            throw new IOException("Configuration is larger than: " + PAGE_FILE_HEADER_SIZE / 2);
        }
        // Fill the rest with space...
        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
        Arrays.fill(filler, (byte) ' ');
        os.write(filler);
        os.flush();

        byte[] d = os.toByteArray();

        // So we don't lose it, write it 2 times...
        writeFile.seek(0);
        writeFile.write(d);
        writeFile.sync();
        writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
        writeFile.write(d);
        writeFile.sync();
    }

    private void storeFreeList() throws IOException {
        FileOutputStream os = new FileOutputStream(getFreeFile());
        DataOutputStream dos = new DataOutputStream(os);
        SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
        dos.close();
    }

    private void loadFreeList() throws IOException {
        freeList.clear();
        FileInputStream is = new FileInputStream(getFreeFile());
        DataInputStream dis = new DataInputStream(is);
        freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
        dis.close();
    }

    ///////////////////////////////////////////////////////////////////
    // Property Accessors
    ///////////////////////////////////////////////////////////////////

    /**
     * Is the recovery buffer used to double buffer page writes?  Enabled by default.
     *
     * @return is the recovery buffer enabled.
     */
    public boolean isEnableRecoveryFile() {
        return enableRecoveryFile;
    }

    /**
     * Sets whether the recovery buffer is used to double buffer page writes.  Enabled by default.  Disabling this
     * may potentially cause partial page writes which can lead to page file corruption.
     */
    public void setEnableRecoveryFile(boolean doubleBuffer) {
        assertNotLoaded();
        this.enableRecoveryFile = doubleBuffer;
    }

    /**
     * @return Are page writes synced to disk?
     */
    public boolean isEnableDiskSyncs() {
        return enableDiskSyncs;
    }

    /**
     * Allows you to enable syncing writes to disk.
     */
    public void setEnableDiskSyncs(boolean syncWrites) {
        assertNotLoaded();
        this.enableDiskSyncs = syncWrites;
    }

    /**
     * @return the page size
     */
    public int getPageSize() {
        return this.pageSize;
    }

    /**
     * @return the amount of content data that a page can hold.
     */
    public int getPageContentSize() {
        return this.pageSize - Page.PAGE_HEADER_SIZE;
    }

    /**
     * Configures the page size used by the page file.  By default it is 4k.  Once a page file is created on disk,
     * subsequent loads of that file will use the original pageSize.  Once the PageFile is loaded, this setting
     * can no longer be changed.
     *
     * @param pageSize the pageSize to set
     * @throws IllegalStateException once the page file is loaded.
     */
    public void setPageSize(int pageSize) throws IllegalStateException {
        assertNotLoaded();
        this.pageSize = pageSize;
    }

    /**
     * @return true if read page caching is enabled
     */
    public boolean isEnablePageCaching() {
        return this.enablePageCaching;
    }

    /**
     * @param enablePageCaching allows you to enable read page caching
     */
    public void setEnablePageCaching(boolean enablePageCaching) {
        assertNotLoaded();
        this.enablePageCaching = enablePageCaching;
    }

    /**
     * @return the maximum number of pages that will get stored in the read page cache.
     */
    public int getPageCacheSize() {
        return this.pageCacheSize;
    }

    /**
     * @param pageCacheSize Sets the maximum number of pages that will get stored in the read page cache.
     */
    public void setPageCacheSize(int pageCacheSize) {
        assertNotLoaded();
        this.pageCacheSize = pageCacheSize;
    }

    public boolean isEnabledWriteThread() {
        return enabledWriteThread;
    }

    public void setEnableWriteThread(boolean enableAsyncWrites) {
        assertNotLoaded();
        this.enabledWriteThread = enableAsyncWrites;
    }

    public long getDiskSize() throws IOException {
        return toOffset(nextFreePageId.get());
    }

    /**
     * @return the number of pages allocated in the PageFile
     */
    public long getPageCount() {
        return nextFreePageId.get();
    }

    public int getRecoveryFileMinPageCount() {
        return recoveryFileMinPageCount;
    }

    public long getFreePageCount() {
        assertLoaded();
        return freeList.rangeSize();
    }

    public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
        assertNotLoaded();
        this.recoveryFileMinPageCount = recoveryFileMinPageCount;
    }

    public int getRecoveryFileMaxPageCount() {
        return recoveryFileMaxPageCount;
    }

    public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
        assertNotLoaded();
        this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public float getLFUEvictionFactor() {
        return LFUEvictionFactor;
    }

    public void setLFUEvictionFactor(float LFUEvictionFactor) {
        this.LFUEvictionFactor = LFUEvictionFactor;
    }

    public boolean isUseLFRUEviction() {
        return useLFRUEviction;
    }

    public void setUseLFRUEviction(boolean useLFRUEviction) {
        this.useLFRUEviction = useLFRUEviction;
    }

    ///////////////////////////////////////////////////////////////////
    // Package Protected Methods exposed to Transaction
    ///////////////////////////////////////////////////////////////////

    /**
     * @throws IllegalStateException if the page file is not loaded.
     */
    void assertLoaded() throws IllegalStateException {
        if (!loaded.get()) {
            throw new IllegalStateException("PageFile is not loaded");
        }
    }

    void assertNotLoaded() throws IllegalStateException {
        if (loaded.get()) {
            throw new IllegalStateException("PageFile is loaded");
        }
    }

    /**
     * Allocates a block of free pages that you can write data to.
     *
     * @param count the number of sequential pages to allocate
     * @return the first page of the sequential set.
     * @throws IOException If a disk error occurred.
     * @throws IllegalStateException if the PageFile is not loaded
     */
    <T> Page<T> allocate(int count) throws IOException {
        assertLoaded();
        if (count <= 0) {
            throw new IllegalArgumentException("The allocation count must be larger than zero");
        }

        Sequence seq = freeList.removeFirstSequence(count);

        // We may need to create new free pages...
        if (seq == null) {

            Page<T> first = null;
            int c = count;

            // Reserve the page ids and write transaction ids only once....
            long pageId = nextFreePageId.getAndAdd(count);
            long writeTxnId = nextTxid.getAndAdd(count);

            while (c-- > 0) {
                Page<T> page = new Page<T>(pageId++);
                page.makeFree(writeTxnId++);

                if (first == null) {
                    first = page;
                }

                addToCache(page);
                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
                page.write(out);
                write(page, out.getData());

                // LOG.debug("allocate writing: "+page.getPageId());
            }

            return first;
        }

        Page<T> page = new Page<T>(seq.getFirst());
        page.makeFree(0);
        // LOG.debug("allocated: "+page.getPageId());
        return page;
    }

    long getNextWriteTransactionId() {
        return nextTxid.incrementAndGet();
    }

    synchronized void readPage(long pageId, byte[] data) throws IOException {
        readFile.seek(toOffset(pageId));
        readFile.readFully(data);
    }

    public void freePage(long pageId) {
        freeList.add(pageId);
        removeFromCache(pageId);
    }

    @SuppressWarnings("unchecked")
    private <T> void write(Page<T> page, byte[] data) throws IOException {
        final PageWrite write = new PageWrite(page, data);
        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
            @Override
            public Long getKey() {
                return write.getPage().getPageId();
            }

            @Override
            public PageWrite getValue() {
                return write;
            }

            @Override
            public PageWrite setValue(PageWrite value) {
                return null;
            }
        };
        Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
        write(Arrays.asList(entries));
    }

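    // A worked example for the batching logic below, using the default writeBatchSize of 1000:
    // with the async writer thread enabled, a batch is nudged to disk once roughly 10% of the
    // batch (about 100 pending page writes) has accumulated; in synchronous mode writes are
    // flushed inline once about 80% (about 800 pending writes) has accumulated, a long
    // transaction spills to a tmp file, or a checkpoint is waiting.
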
    void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
        synchronized (writes) {
            if (enabledWriteThread) {
                while (writes.size() >= writeBatchSize && !stopWriter.get()) {
                    try {
                        writes.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new InterruptedIOException();
                    }
                }
            }

            boolean longTx = false;

            for (Map.Entry<Long, PageWrite> entry : updates) {
                Long key = entry.getKey();
                PageWrite value = entry.getValue();
                PageWrite write = writes.get(key);
                if (write == null) {
                    writes.put(key, value);
                } else {
                    if (value.currentLocation != -1) {
                        write.setCurrentLocation(value.page, value.currentLocation, value.length);
                        write.tmpFile = value.tmpFile;
                        longTx = true;
                    } else {
                        write.setCurrent(value.page, value.current);
                    }
                }
            }

            // Once we start approaching capacity, notify the writer to start writing.
            // Sync immediately for long txs.
            if (longTx || canStartWriteBatch()) {

                if (enabledWriteThread) {
                    writes.notify();
                } else {
                    writeBatch();
                }
            }
        }
    }

    private boolean canStartWriteBatch() {
        int capacityUsed = ((writes.size() * 100) / writeBatchSize);
        if (enabledWriteThread) {
            // The constant 10 here controls how soon write batches start going to disk..
            // It would be nice to figure out how to auto tune that value.  Make it too small and
            // we reduce throughput because we lock the write mutex too often doing writes.
            return capacityUsed >= 10 || checkpointLatch != null;
        } else {
            return capacityUsed >= 80 || checkpointLatch != null;
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Cache Related operations
    ///////////////////////////////////////////////////////////////////
    @SuppressWarnings("unchecked")
    <T> Page<T> getFromCache(long pageId) {
        synchronized (writes) {
            PageWrite pageWrite = writes.get(pageId);
            if (pageWrite != null) {
                return pageWrite.page;
            }
        }

        Page<T> result = null;
        if (enablePageCaching) {
            result = pageCache.get(pageId);
        }
        return result;
    }

    void addToCache(Page page) {
        if (enablePageCaching) {
            pageCache.put(page.getPageId(), page);
        }
    }

    void removeFromCache(long pageId) {
        if (enablePageCaching) {
            pageCache.remove(pageId);
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Internal Double write implementation follows...
    ///////////////////////////////////////////////////////////////////

    private void pollWrites() {
        try {
            while (!stopWriter.get()) {
                // Wait for a notification...
                synchronized (writes) {
                    writes.notifyAll();

                    // If there is not enough to write, wait for a notification...
                    while (writes.isEmpty() && checkpointLatch == null && !stopWriter.get()) {
                        writes.wait(100);
                    }

                    if (writes.isEmpty()) {
                        releaseCheckpointWaiter();
                    }
                }
                writeBatch();
            }
        } catch (Throwable e) {
            LOG.info("An exception was raised while performing poll writes", e);
        } finally {
            releaseCheckpointWaiter();
        }
    }

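    // On-disk layout of the recovery (".redo") file written by writeBatch() and replayed by
    // redoRecoveryUpdates() below (field sizes as used in the code):
    //
    //   header (RECOVERY_FILE_HEADER_SIZE bytes, mostly unused):
    //     long  nextTxid    - next transaction id at the time of the batch
    //     long  checksum    - Adler32 over the page images in the batch
    //     int   batch size  - number of page records that follow
    //   records, starting at RECOVERY_FILE_HEADER_SIZE:
    //     long  pageId
    //     byte[pageSize] page image
    //   ... repeated once per page in the batch.
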
    private void writeBatch() throws IOException {

        CountDownLatch checkpointLatch;
        ArrayList<PageWrite> batch;
        synchronized (writes) {

            batch = new ArrayList<PageWrite>(writes.size());
            // Build a write batch from the current write cache.
            for (PageWrite write : writes.values()) {
                batch.add(write);
                // Move the current write to the diskBound write, this lets folks update the
                // page again without blocking for this write.
                write.begin();
                if (write.diskBound == null && write.diskBoundLocation == -1) {
                    batch.remove(write);
                }
            }

            // Grab on to the existing checkpoint latch because once we do this write we can
            // release the folks that were waiting for those writes to hit disk.
            checkpointLatch = this.checkpointLatch;
            this.checkpointLatch = null;
        }

        try {

            // First land the writes in the recovery file
            if (enableRecoveryFile) {
                Checksum checksum = new Adler32();

                recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);

                for (PageWrite w : batch) {
                    try {
                        checksum.update(w.getDiskBound(), 0, pageSize);
                    } catch (Throwable t) {
                        throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
                    }
                    recoveryFile.writeLong(w.page.getPageId());
                    recoveryFile.write(w.getDiskBound(), 0, pageSize);
                }

                // Can we shrink the recovery buffer??
                if (recoveryPageCount > recoveryFileMaxPageCount) {
                    int t = Math.max(recoveryFileMinPageCount, batch.size());
                    recoveryFile.setLength(recoveryFileSizeForPages(t));
                }

                // Record the page writes in the recovery buffer.
                recoveryFile.seek(0);
                // Store the next tx id...
                recoveryFile.writeLong(nextTxid.get());
                // Store the checksum for the write batch so that on recovery we
                // know if we have a consistent write batch on disk.
                recoveryFile.writeLong(checksum.getValue());
                // Write the # of pages that will follow
                recoveryFile.writeInt(batch.size());

                if (enableDiskSyncs) {
                    recoveryFile.sync();
                }
            }

            for (PageWrite w : batch) {
                writeFile.seek(toOffset(w.page.getPageId()));
                writeFile.write(w.getDiskBound(), 0, pageSize);
                w.done();
            }

            if (enableDiskSyncs) {
                writeFile.sync();
            }

        } catch (IOException ioError) {
            LOG.info("Unexpected io error on pagefile write of " + batch.size() + " pages.", ioError);
            // Any subsequent write needs to be prefaced with a considered call to redoRecoveryUpdates
            // to ensure the disk image is self consistent.
            loaded.set(false);
            throw ioError;
        } finally {
            synchronized (writes) {
                for (PageWrite w : batch) {
                    // If there are no more pending writes, then remove it from
                    // the write cache.
                    if (w.isDone()) {
                        writes.remove(w.page.getPageId());
                        if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
                            if (!w.tmpFile.delete()) {
                                throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile);
                            }
                            tmpFilesForRemoval.remove(w.tmpFile);
                        }
                    }
                }
            }

            if (checkpointLatch != null) {
                checkpointLatch.countDown();
            }
        }
    }

    public void removeTmpFile(File file) {
        tmpFilesForRemoval.add(file);
    }

    private long recoveryFileSizeForPages(int pageCount) {
        return RECOVERY_FILE_HEADER_SIZE + ((pageSize + 8) * pageCount);
    }

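    // Worked example for recoveryFileSizeForPages(), assuming the 4 KiB defaults: each record is a
    // pageId (8 bytes) plus one page image, so preallocating the minimum of 1000 pages gives
    // 4096 + (4096 + 8) * 1000 = 4,108,096 bytes (about 4 MB).
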
    private void releaseCheckpointWaiter() {
        if (checkpointLatch != null) {
            checkpointLatch.countDown();
            checkpointLatch = null;
        }
    }

    /**
     * Inspects the recovery buffer and re-applies any
     * partially applied page writes.
     *
     * @return the next transaction id that can be used.
     */
    private long redoRecoveryUpdates() throws IOException {
        if (!enableRecoveryFile) {
            return 0;
        }
        recoveryPageCount = 0;

        // Are we initializing the recovery file?
        if (recoveryFile.length() == 0) {
            // Write an empty header..
            recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
            // Preallocate the minimum size for better performance.
            recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
            return 0;
        }

        // How many recovery pages do we have in the recovery buffer?
        recoveryFile.seek(0);
        long nextTxId = recoveryFile.readLong();
        long expectedChecksum = recoveryFile.readLong();
        int pageCounter = recoveryFile.readInt();

        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
        Checksum checksum = new Adler32();
        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
        try {
            for (int i = 0; i < pageCounter; i++) {
                long offset = recoveryFile.readLong();
                byte[] data = new byte[pageSize];
                if (recoveryFile.read(data, 0, pageSize) != pageSize) {
                    // Invalid recovery record: could not fully read the data.  Probably due to a
                    // partial write to the recovery buffer.
                    return nextTxId;
                }
                checksum.update(data, 0, pageSize);
                batch.put(offset, data);
            }
        } catch (Exception e) {
            // If an error occurred it was because the redo buffer was not fully written out correctly,
            // so don't redo it, as the pages should still be consistent.
            LOG.debug("Redo buffer was not fully intact: ", e);
            return nextTxId;
        }

        recoveryPageCount = pageCounter;

        // If the checksum is not valid then the recovery buffer was partially written to disk.
        if (checksum.getValue() != expectedChecksum) {
            return nextTxId;
        }

        // Re-apply all the writes in the recovery buffer.
        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
            writeFile.seek(toOffset(e.getKey()));
            writeFile.write(e.getValue());
        }

        // And sync it to disk
        writeFile.sync();
        return nextTxId;
    }

    private void startWriter() {
        synchronized (writes) {
            if (enabledWriteThread) {
                stopWriter.set(false);
                writerThread = new Thread("KahaDB Page Writer") {
                    @Override
                    public void run() {
                        pollWrites();
                    }
                };
                writerThread.setPriority(Thread.MAX_PRIORITY);
                writerThread.setDaemon(true);
                writerThread.start();
            }
        }
    }

    private void stopWriter() throws InterruptedException {
        if (enabledWriteThread) {
            stopWriter.set(true);
            writerThread.join();
        }
    }

    public File getFile() {
        return getMainPageFile();
    }

    public File getDirectory() {
        return directory;
    }
}