/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kahadb.page;

import org.apache.kahadb.page.PageFile.PageWrite;
import org.apache.kahadb.util.*;

import java.io.*;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.TreeMap;

/**
 * The class used to read/update a PageFile object. Using a transaction allows you to
 * do multiple update operations in a single unit of work.
 */
public class Transaction implements Iterable<Page> {

    // Spool file that receives page writes once the transaction has grown past
    // maxTransactionSize; null until the first spooled write.
    private RandomAccessFile tmpFile;
    // Location of the spool file; created lazily by getTempFile().
    private File txFile;
    // Offset in the spool file at which the next spooled page write is stored.
    private long nextLocation = 0;

    /**
     * The PageOverflowIOException occurs when a page write is requested
     * and its data is larger than what would fit into a single page.
     */
    public class PageOverflowIOException extends IOException {
        private static final long serialVersionUID = 1L;

        public PageOverflowIOException(String message) {
            super(message);
        }
    }

    /**
     * The InvalidPageIOException is thrown if you try to load/store a page
     * with an invalid page id.
     */
    public class InvalidPageIOException extends IOException {
        private static final long serialVersionUID = 1L;

        private final long page;

        public InvalidPageIOException(String message, long page) {
            super(message);
            this.page = page;
        }

        /**
         * @return the offending page id
         */
        public long getPage() {
            return page;
        }
    }

    /**
     * This closure interface is intended for the end user to implement callbacks for the
     * Transaction.execute() method.
     *
     * @param <T> The type of exceptions that operation will throw.
     */
    public interface Closure <T extends Throwable> {
        public void execute(Transaction tx) throws T;
    }

    /**
     * This closure interface is intended for the end user to implement callbacks for the
     * Transaction.execute() method.
     *
     * @param <R> The type of result that the closure produces.
     * @param <T> The type of exceptions that operation will throw.
     */
    public interface CallableClosure<R, T extends Throwable> {
        public R execute(Transaction tx) throws T;
    }


    // The page file that this Transaction operates against.
    private final PageFile pageFile;
    // The write transaction id, obtained lazily from the page file on the first update;
    // -1 while this transaction has not updated anything.
    private long writeTransactionId=-1;
    // List of pages that this transaction has modified.
    private final TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
    // List of pages allocated in this transaction
    private final SequenceSet allocateList = new SequenceSet();
    // List of pages freed in this transaction
    private final SequenceSet freeList = new SequenceSet();

    // Threshold in bytes (system property "maxKahaDBTxSize") above which further
    // page writes are spooled to a temporary file instead of being held in memory.
    private final long maxTransactionSize = Long.getLong("maxKahaDBTxSize", 10485760L);

    // Bytes accounted to the queued page writes, recomputed on every write().
    private long size = 0;

    Transaction(PageFile pageFile) {
        this.pageFile = pageFile;
    }

    /**
     * @return the page file that created this Transaction
     */
    public PageFile getPageFile() {
        return this.pageFile;
    }

    /**
     * Allocates a free page that you can write data to.
     *
     * @return a newly allocated page.
     * @throws IOException
     *         If a disk error occurred.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public <T> Page<T> allocate() throws IOException {
        return allocate(1);
    }

    /**
     * Allocates a block of free pages that you can write data to.
     *
     * @param count the number of sequential pages to allocate
     * @return the first page of the sequential set.
     * @throws IOException
     *         If a disk error occurred.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public <T> Page<T> allocate(int count) throws IOException {
        Page<T> rc = pageFile.allocate(count);
        // Remember the whole run so that rollback() can hand it back to the page file.
        allocateList.add(new Sequence(rc.getPageId(), rc.getPageId()+count-1));
        return rc;
    }

    /**
     * Frees up a previously allocated page so that it can be re-allocated again.
     *
     * @param pageId the page to free up
     * @throws IOException
     *         If a disk error occurred.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public void free(long pageId) throws IOException {
        free(load(pageId, null));
    }

    /**
     * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
     *
     * @param pageId the initial page of the sequence that will be getting freed
     * @param count the number of pages in the sequence
     *
     * @throws IOException
     *         If a disk error occurred.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public void free(long pageId, int count) throws IOException {
        free(load(pageId, null), count);
    }

    /**
     * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
     *
     * @param page the initial page of the sequence that will be getting freed
     * @param count the number of pages in the sequence
     *
     * @throws IOException
     *         If a disk error occurred.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public <T> void free(Page<T> page, int count) throws IOException {
        pageFile.assertLoaded();
        long offsetPage = page.getPageId();
        while (count-- > 0) {
            // Only the first page is supplied by the caller; the rest are loaded by id.
            if (page == null) {
                page = load(offsetPage, null);
            }
            free(page);
            page = null;
            // Increment the offsetPage value since using it depends on the current count.
            offsetPage++;
        }
    }

    /**
     * Frees up a previously allocated page so that it can be re-allocated again. If the page is
     * the start of a chain of part pages, the following pages of the chain are freed as well.
     *
     * @param page the page to free up
     * @throws IOException
     *         If a disk error occurred.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public <T> void free(Page<T> page) throws IOException {
        pageFile.assertLoaded();

        // We may need loop to free up a page chain.
        while (page != null) {

            // Is it already free??
            if (page.getType() == Page.PAGE_FREE_TYPE) {
                return;
            }

            // Look up the successor before makeFree() changes this page's header.
            Page<T> next = null;
            if (page.getType() == Page.PAGE_PART_TYPE) {
                next = load(page.getNext(), null);
            }

            page.makeFree(getWriteTransactionId());
            // ensure free page is visible while write is pending
            pageFile.addToCache(page.copy());

            DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize());
            page.write(out);
            write(page, out.getData());

            freeList.add(page.getPageId());
            page = next;
        }
    }

    /**
     * Marshals a page and queues it to be written when the transaction commits.
     *
     * @param page
     *        the page to write. The Page object must be fully populated with a valid pageId, type, and data.
     * @param marshaller
     *        the marshaller to use to write the data portion of the Page, may be null if you do not wish to write the data.
     * @param overflow
     *        If true, then if the page data marshalls to a bigger size than can fit in one page, then additional
     *        overflow pages are automatically allocated and chained to this page to store all the data. If false,
     *        and the overflow condition would occur, then the PageOverflowIOException is thrown.
     * @throws IOException
     *         If a disk error occurred.
     * @throws PageOverflowIOException
     *         If the page data marshalls to size larger than maximum page size and overflow was false.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public <T> void store(Page<T> page, Marshaller<T> marshaller, final boolean overflow) throws IOException {
        DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow);
        if (marshaller != null) {
            marshaller.writePayload(page.get(), out);
        }
        // close() is what queues the final page, so it is deliberately skipped
        // when the marshaller throws.
        out.close();
    }

    /**
     * Opens a stream whose content becomes the data of the given page. Full pages are queued
     * while data is being written; the last page is queued when the stream is closed.
     *
     * @param page
     *        the page to write; a copy is taken so later changes to the instance do not affect the write
     * @param overflow
     *        if true, data that does not fit in one page is continued on chained pages; if false a
     *        PageOverflowIOException is thrown as soon as the data no longer fits
     * @return the stream to write the page data to
     * @throws IOException
     *         If a disk error occurred.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
        pageFile.assertLoaded();

        // Copy to protect against the end user changing
        // the page instance while we are doing a write.
        final Page copy = page.copy();
        pageFile.addToCache(copy);

        //
        // To support writing VERY large data, we override the output stream so
        // that we do the page writes incrementally while the data is being
        // marshalled.
        DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize() * 2) {
            // The page of the chain that is currently being filled.
            Page current = copy;

            @SuppressWarnings("unchecked")
            @Override
            protected void onWrite() throws IOException {

                // Are we at an overflow condition?
                final int pageSize = pageFile.getPageSize();
                if (pos >= pageSize) {
                    // If overflow is allowed
                    if (overflow) {

                        do {
                            // Re-use the existing chain where there is one, otherwise extend it.
                            Page next;
                            if (current.getType() == Page.PAGE_PART_TYPE) {
                                next = load(current.getNext(), null);
                            } else {
                                next = allocate();
                            }

                            next.txId = current.txId;

                            // Write the page header
                            int oldPos = pos;
                            pos = 0;

                            current.makePagePart(next.getPageId(), getWriteTransactionId());
                            current.write(this);

                            // Do the page write..
                            byte[] data = new byte[pageSize];
                            System.arraycopy(buf, 0, data, 0, pageSize);
                            Transaction.this.write(current, data);

                            // Reset for the next page chunk
                            pos = 0;
                            // The page header marshalled after the data is written.
                            skip(Page.PAGE_HEADER_SIZE);
                            // Move the overflow data after the header.
                            System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
                            pos += oldPos - pageSize;
                            current = next;

                        } while (pos > pageSize);
                    } else {
                        throw new PageOverflowIOException("Page overflow.");
                    }
                }

            }

            @Override
            public void close() throws IOException {
                super.close();

                // We need to free up the rest of the page chain..
                if (current.getType() == Page.PAGE_PART_TYPE) {
                    free(current.getNext());
                }

                current.makePageEnd(pos, getWriteTransactionId());

                // Write the header..
                pos = 0;
                current.write(this);

                Transaction.this.write(current, buf);
            }
        };

        // The page header marshaled after the data is written.
        out.skip(Page.PAGE_HEADER_SIZE);
        return out;
    }

    /**
     * Loads a page from disk.
     *
     * @param pageId
     *        the id of the page to load
     * @param marshaller
     *        the marshaller to use to load the data portion of the Page, may be null if you do not wish to load the data.
     * @return The page with the given id
     * @throws IOException
     *         If a disk error occurred.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException {
        pageFile.assertLoaded();
        Page<T> page = new Page<T>(pageId);
        load(page, marshaller);
        return page;
    }

    /**
     * Loads a page from disk. Pages already modified by this transaction, or present in the
     * page file cache, are served from there instead of being read.
     *
     * @param page - The pageId field must be properly set
     * @param marshaller
     *        the marshaller to use to load the data portion of the Page, may be null if you do not wish to load the data.
     * @throws IOException
     *         If a disk error occurred.
     * @throws InvalidPageIOException
     *         If the page id is not valid.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    @SuppressWarnings("unchecked")
    public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException {
        pageFile.assertLoaded();

        // Can't load invalid offsets...
        long pageId = page.getPageId();
        if (pageId < 0) {
            throw new InvalidPageIOException("Page id is not valid", pageId);
        }

        // It might be a page this transaction has modified...
        PageWrite update = writes.get(pageId);
        if (update != null) {
            page.copy(update.getPage());
            return;
        }

        // We may be able to get it from the cache...
        Page<T> t = pageFile.getFromCache(pageId);
        if (t != null) {
            page.copy(t);
            return;
        }

        if (marshaller != null) {
            // Full page read..
            InputStream is = openInputStream(page);
            try {
                DataInputStream dataIn = new DataInputStream(is);
                page.set(marshaller.readPayload(dataIn));
            } finally {
                is.close();
            }

            // Cache it. Only fully loaded pages are cached.
            pageFile.addToCache(page);
        } else {
            // Page header read.
            DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
            pageFile.readPage(pageId, in.getRawData());
            page.read(in);
            page.set(null);
        }
    }

    /**
     * Opens a stream over the data of the given page, following the chain of part pages
     * until the end page is reached.
     *
     * @param p the page to read; its pageId must be set, and its header is populated by this call
     * @return a stream over the page data
     * @throws IOException
     *         If a disk error occurred, or the page is marked free.
     * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page,
     *      org.apache.kahadb.util.Marshaller)
     */
    public InputStream openInputStream(final Page p) throws IOException {

        return new InputStream() {

            // Declaration order matters: chunk must exist before readPage(p) runs.
            private ByteSequence chunk = new ByteSequence(new byte[pageFile.getPageSize()]);
            private Page page = readPage(p);
            private int pageCount = 1;

            private Page markPage;
            private ByteSequence markChunk;

            // Reads the given page into chunk and positions chunk on the page's data.
            private Page readPage(Page page) throws IOException {
                // Read the page data

                pageFile.readPage(page.getPageId(), chunk.getData());

                chunk.setOffset(0);
                chunk.setLength(pageFile.getPageSize());

                DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
                page.read(in);

                chunk.setOffset(Page.PAGE_HEADER_SIZE);
                if (page.getType() == Page.PAGE_END_TYPE) {
                    // For an end page the 'next' field is used as the end of the data in the chunk.
                    chunk.setLength((int)(page.getNext()));
                }

                if (page.getType() == Page.PAGE_FREE_TYPE) {
                    throw new EOFException("Chunk stream does not exist, page: " + page.getPageId() + " is marked free");
                }

                return page;
            }

            @Override
            public int read() throws IOException {
                if (!atEOF()) {
                    return chunk.data[chunk.offset++] & 0xff;
                } else {
                    return -1;
                }
            }

            // Returns true when no data is left; loads the next page of the chain when the
            // current chunk is exhausted and the current page is not the end page.
            private boolean atEOF() throws IOException {
                if (chunk.offset < chunk.length) {
                    return false;
                }
                if (page.getType() == Page.PAGE_END_TYPE) {
                    return true;
                }
                fill();
                return chunk.offset >= chunk.length;
            }

            private void fill() throws IOException {
                page = readPage(new Page(page.getNext()));
                pageCount++;
            }

            @Override
            public int read(byte[] b) throws IOException {
                return read(b, 0, b.length);
            }

            @Override
            public int read(byte b[], int off, int len) throws IOException {
                if (atEOF()) {
                    return -1;
                }
                // Copy chunk by chunk, keeping the request size and the destination
                // offset separate from the running count so reads can span pages.
                int rc = 0;
                while (rc < len && !atEOF()) {
                    int count = Math.min(len - rc, chunk.length - chunk.offset);
                    System.arraycopy(chunk.data, chunk.offset, b, off + rc, count);
                    chunk.offset += count;
                    rc += count;
                }
                return rc;
            }

            @Override
            public long skip(long len) throws IOException {
                // Returns the number of bytes actually skipped, which is less than
                // requested only when the end of the data is reached.
                long skipped = 0;
                while (skipped < len && !atEOF()) {
                    int count = (int) Math.min(len - skipped, (long) (chunk.length - chunk.offset));
                    chunk.offset += count;
                    skipped += count;
                }
                return skipped;
            }

            @Override
            public int available() {
                return chunk.length - chunk.offset;
            }

            @Override
            public boolean markSupported() {
                return true;
            }

            @Override
            public void mark(int markpos) {
                markPage = page;
                markChunk = snapshot(chunk);
            }

            @Override
            public void reset() throws IOException {
                if (markChunk == null) {
                    throw new IOException("Mark has not been set");
                }
                page = markPage;
                // Restore from a copy so that reading on does not disturb the mark
                // and reset() can be called again.
                chunk = snapshot(markChunk);
            }

            // Copies the buffer, offset and length of the given chunk.
            private ByteSequence snapshot(ByteSequence source) {
                byte data[] = new byte[pageFile.getPageSize()];
                System.arraycopy(source.getData(), 0, data, 0, pageFile.getPageSize());
                return new ByteSequence(data, source.getOffset(), source.getLength());
            }

        };
    }

    /**
     * Allows you to iterate through all active Pages in this object. Pages with type Page.FREE_TYPE are
     * not included in this iteration.
     *
     * Pages removed with Iterator.remove() will not actually get removed until the transaction commits.
     *
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public Iterator<Page> iterator() {
        return iterator(false);
    }

    /**
     * Allows you to iterate through all active Pages in this object. You can optionally include free pages in the pages
     * iterated.
     *
     * @param includeFreePages - if true, free pages are included in the iteration
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public Iterator<Page> iterator(final boolean includeFreePages) {

        pageFile.assertLoaded();

        return new Iterator<Page>() {

            // Id of the next page to examine.
            long nextId;
            // Page found by findNextPage() but not yet returned by next().
            Page nextPage;
            // Page most recently returned by next(); the target of remove().
            Page lastPage;

            private void findNextPage() {
                if (!pageFile.isLoaded()) {
                    throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
                }

                if (nextPage != null) {
                    return;
                }

                try {
                    while (nextId < pageFile.getPageCount()) {

                        Page page = load(nextId, null);

                        if (includeFreePages || page.getType() != Page.PAGE_FREE_TYPE) {
                            nextPage = page;
                            return;
                        } else {
                            nextId++;
                        }
                    }
                } catch (IOException ignored) {
                    // NOTE(review): a failed page load ends the iteration silently, as
                    // hasNext() then reports false. Behavior kept as found; confirm
                    // whether callers rely on it before surfacing the error.
                }
            }

            public boolean hasNext() {
                findNextPage();
                return nextPage != null;
            }

            public Page next() {
                findNextPage();
                if (nextPage != null) {
                    lastPage = nextPage;
                    nextPage = null;
                    nextId++;
                    return lastPage;
                } else {
                    throw new NoSuchElementException();
                }
            }

            @SuppressWarnings("unchecked")
            public void remove() {
                if (lastPage == null) {
                    throw new IllegalStateException();
                }
                try {
                    free(lastPage);
                    lastPage = null;
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    ///////////////////////////////////////////////////////////////////
    // Commit / Rollback related methods..
    ///////////////////////////////////////////////////////////////////

    /**
     * Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated
     * with the transaction are written to disk or none will.
     *
     * @throws IOException
     *         If a disk error occurred.
     */
    public void commit() throws IOException {
        if( writeTransactionId!=-1 ) {
            // The temp file is released before the writes are submitted, as it always was;
            // NOTE(review): presumably PageFile defers the actual removal - confirm.
            releaseTempFile();
            // Actually do the page writes...
            pageFile.write(writes.entrySet());
            // Release the pages that were freed up in the transaction..
            freePages(freeList);
        }
        // Always forget the bookkeeping of this unit of work, so that a later rollback()
        // on the same instance cannot release pages that were allocated before this commit.
        resetState();
    }

    /**
     * Rolls back the transaction. The queued page writes are discarded and every page that was
     * allocated through this transaction is released, whether or not any page was written.
     *
     * @throws IOException
     *         If a disk error occurred.
     */
    public void rollback() throws IOException {
        if( writeTransactionId!=-1 ) {
            releaseTempFile();
        }
        // Release the pages that were allocated in the transaction...
        freePages(allocateList);
        resetState();
    }

    // Closes the spool file, if one was opened, and hands it to the page file for removal.
    private void releaseTempFile() throws IOException {
        if (tmpFile != null) {
            tmpFile.close();
            pageFile.removeTmpFile(getTempFile());
            tmpFile = null;
            txFile = null;
            // The next spool file is a new file, so start writing it from the beginning.
            nextLocation = 0;
        }
    }

    // Returns this transaction to its initial, read-only state.
    private void resetState() {
        freeList.clear();
        allocateList.clear();
        writes.clear();
        writeTransactionId = -1;
        size = 0;
    }

    // Returns the write transaction id, obtaining one from the page file on first use.
    private long getWriteTransactionId() {
        if( writeTransactionId==-1 ) {
            writeTransactionId = pageFile.getNextWriteTransactionId();
        }
        return writeTransactionId;
    }


    /**
     * @return the file used to spool page writes of a large transaction. It lives in the page file's
     *         directory and is named after the write transaction id and the current time.
     */
    protected File getTempFile() {
        if (txFile == null) {
            txFile = new File(getPageFile().getDirectory(), IOHelper.toFileSystemSafeName("tx-"+ Long.toString(getWriteTransactionId()) + "-" + Long.toString(System.currentTimeMillis()) + ".tmp"));
        }
        return txFile;
    }

    /**
     * Queues up a page write that should get done when commit() gets called. Once the queued
     * writes exceed maxTransactionSize the page data is stored in the temp file rather than kept
     * in memory.
     *
     * @param page the page being written
     * @param data the marshalled page
     * @throws IOException
     *         If the temp file could not be written.
     */
    private void write(final Page page, byte[] data) throws IOException {
        Long key = page.getPageId();

        // How many bytes are queued for this transaction. Widen to long before multiplying:
        // an int product overflows for very large transactions and would switch the
        // writes back to being held in memory.
        size = (long) writes.size() * pageFile.getPageSize();

        PageWrite write;

        if (size > maxTransactionSize) {
            if (tmpFile == null) {
                tmpFile = new RandomAccessFile(getTempFile(), "rw");
            }
            long location = nextLocation;
            tmpFile.seek(nextLocation);
            tmpFile.write(data);
            nextLocation = location + data.length;
            write = new PageWrite(page, location, data.length, getTempFile());
        } else {
            write = new PageWrite(page, data);
        }
        // A later write of the same page replaces the earlier one.
        writes.put(key, write);
    }

    /**
     * Hands every page in the given set back to the page file.
     *
     * @param list the pages to free
     * @throws RuntimeException
     */
    private void freePages(SequenceSet list) throws RuntimeException {
        Sequence seq = list.getHead();
        while( seq!=null ) {
            seq.each(new Sequence.Closure<RuntimeException>(){
                public void execute(long value) {
                    pageFile.freePage(value);
                }
            });
            seq = seq.getNext();
        }
    }

    /**
     * @return true if there are no uncommitted page file updates associated with this transaction.
     */
    public boolean isReadOnly() {
        return writeTransactionId==-1;
    }

    ///////////////////////////////////////////////////////////////////
    // Transaction closure helpers...
    ///////////////////////////////////////////////////////////////////

    /**
     * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
     * If the closure throws an Exception, then the transaction is rolled back.
     *
     * @param <T> the type of exception the closure may throw
     * @param closure - the work to get executed.
     * @throws T if the closure throws it
     * @throws IOException If the commit fails.
     */
    public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
        boolean success = false;
        try {
            closure.execute(this);
            success = true;
        } finally {
            if (success) {
                commit();
            } else {
                rollback();
            }
        }
    }

    /**
     * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
     * If the closure throws an Exception, then the transaction is rolled back.
     *
     * @param <R> the type of result the closure produces
     * @param <T> the type of exception the closure may throw
     * @param closure - the work to get executed.
     * @return the result produced by the closure
     * @throws T if the closure throws it
     * @throws IOException If the commit fails.
     */
    public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
        boolean success = false;
        try {
            R rc = closure.execute(this);
            success = true;
            return rc;
        } finally {
            if (success) {
                commit();
            } else {
                rollback();
            }
        }
    }
}