001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.kahadb.page;
018    
019    import org.apache.kahadb.page.PageFile.PageWrite;
020    import org.apache.kahadb.util.*;
021    
022    import java.io.*;
023    import java.util.Iterator;
024    import java.util.NoSuchElementException;
025    import java.util.TreeMap;
026    
027    /**
028     * The class used to read/update a PageFile object.  Using a transaction allows you to
029     * do multiple update operations in a single unit of work.
030     */
031    public class Transaction implements Iterable<Page> {
032    
033        private RandomAccessFile tmpFile;
034        private File txFile;
035        private long nextLocation = 0;
036    
/**
 * Raised when a page write is requested and its data is larger than what
 * fits into a single page (and overflow pages were not permitted).
 */
public class PageOverflowIOException extends IOException {
    private static final long serialVersionUID = 1L;

    /**
     * @param message description of the overflow condition
     */
    public PageOverflowIOException(String message) {
        super(message);
    }
}
048    
/**
 * Raised when an attempt is made to load or store a page using an
 * invalid page id.
 */
public class InvalidPageIOException extends IOException {
    private static final long serialVersionUID = 1L;

    // The offending page id, kept so callers can report it.
    private final long page;

    /**
     * @param message description of the problem
     * @param page the page id that was rejected
     */
    public InvalidPageIOException(String message, long page) {
        super(message);
        this.page = page;
    }

    /**
     * @return the page id that was rejected
     */
    public long getPage() {
        return this.page;
    }
}
067    
068        /**
069         * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
070         *
071         * @param <T> The type of exceptions that operation will throw.
072         */
073        public interface Closure <T extends Throwable> {
074            public void execute(Transaction tx) throws T;
075        }
076    
077        /**
078         * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
079         *
080         * @param <R> The type of result that the closure produces.
081         * @param <T> The type of exceptions that operation will throw.
082         */
083        public interface CallableClosure<R, T extends Throwable> {
084            public R execute(Transaction tx) throws T;
085        }
086    
087    
088        // The page file that this Transaction operates against.
089        private final PageFile pageFile;
090        // If this transaction is updating stuff.. this is the tx of
091        private long writeTransactionId=-1;
092        // List of pages that this transaction has modified.
093        private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
094        // List of pages allocated in this transaction
095        private final SequenceSet allocateList = new SequenceSet();
096        // List of pages freed in this transaction
097        private final SequenceSet freeList = new SequenceSet();
098    
099        private long maxTransactionSize = Long.getLong("maxKahaDBTxSize", 10485760L);
100    
101        private long size = 0;
102    
/**
 * Creates a transaction that operates against the given page file.
 * The constructor is package-private.
 *
 * @param pageFile the page file this transaction reads and updates
 */
Transaction(PageFile pageFile) {
    this.pageFile = pageFile;
}
106    
107        /**
108         * @return the page file that created this Transaction
109         */
110        public PageFile getPageFile() {
111            return this.pageFile;
112        }
113    
114        /**
115         * Allocates a free page that you can write data to.
116         *
117         * @return a newly allocated page.
118         * @throws IOException
119         *         If an disk error occurred.
120         * @throws IllegalStateException
121         *         if the PageFile is not loaded
122         */
123        public <T> Page<T> allocate() throws IOException {
124            return allocate(1);
125        }
126    
127        /**
128         * Allocates a block of free pages that you can write data to.
129         *
130         * @param count the number of sequential pages to allocate
131         * @return the first page of the sequential set.
132         * @throws IOException
133         *         If an disk error occurred.
134         * @throws IllegalStateException
135         *         if the PageFile is not loaded
136         */
137        public <T> Page<T> allocate(int count) throws IOException {
138            Page<T> rc = pageFile.allocate(count);
139            allocateList.add(new Sequence(rc.getPageId(), rc.getPageId()+count-1));
140            return rc;
141        }
142    
143        /**
144         * Frees up a previously allocated page so that it can be re-allocated again.
145         *
146         * @param pageId the page to free up
147         * @throws IOException
148         *         If an disk error occurred.
149         * @throws IllegalStateException
150         *         if the PageFile is not loaded
151         */
152        public void free(long pageId) throws IOException {
153            free(load(pageId, null));
154        }
155    
156        /**
157         * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
158         *
159         * @param pageId the initial page of the sequence that will be getting freed
160         * @param count the number of pages in the sequence
161         *
162         * @throws IOException
163         *         If an disk error occurred.
164         * @throws IllegalStateException
165         *         if the PageFile is not loaded
166         */
167        public void free(long pageId, int count) throws IOException {
168            free(load(pageId, null), count);
169        }
170    
171        /**
172         * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
173         *
174         * @param page the initial page of the sequence that will be getting freed
175         * @param count the number of pages in the sequence
176         *
177         * @throws IOException
178         *         If an disk error occurred.
179         * @throws IllegalStateException
180         *         if the PageFile is not loaded
181         */
182        public <T> void free(Page<T> page, int count) throws IOException {
183            pageFile.assertLoaded();
184            long offsetPage = page.getPageId();
185            while (count-- > 0) {
186                if (page == null) {
187                    page = load(offsetPage, null);
188                }
189                free(page);
190                page = null;
191                // Increment the offsetPage value since using it depends on the current count.
192                offsetPage++;
193            }
194        }
195    
196        /**
197         * Frees up a previously allocated page so that it can be re-allocated again.
198         *
199         * @param page the page to free up
200         * @throws IOException
201         *         If an disk error occurred.
202         * @throws IllegalStateException
203         *         if the PageFile is not loaded
204         */
205        public <T> void free(Page<T> page) throws IOException {
206            pageFile.assertLoaded();
207    
208            // We may need loop to free up a page chain.
209            while (page != null) {
210    
211                // Is it already free??
212                if (page.getType() == Page.PAGE_FREE_TYPE) {
213                    return;
214                }
215    
216                Page<T> next = null;
217                if (page.getType() == Page.PAGE_PART_TYPE) {
218                    next = load(page.getNext(), null);
219                }
220    
221                page.makeFree(getWriteTransactionId());
222                // ensure free page is visible while write is pending
223                pageFile.addToCache(page.copy());
224    
225                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize());
226                page.write(out);
227                write(page, out.getData());
228    
229                freeList.add(page.getPageId());
230                page = next;
231            }
232        }
233    
234        /**
235         *
236         * @param page
237         *        the page to write. The Page object must be fully populated with a valid pageId, type, and data.
238         * @param marshaller
239         *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to write the data.
240         * @param overflow
241         *        If true, then if the page data marshalls to a bigger size than can fit in one page, then additional
242         *        overflow pages are automatically allocated and chained to this page to store all the data.  If false,
243         *        and the overflow condition would occur, then the PageOverflowIOException is thrown.
244         * @throws IOException
245         *         If an disk error occurred.
246         * @throws PageOverflowIOException
247         *         If the page data marshalls to size larger than maximum page size and overflow was false.
248         * @throws IllegalStateException
249         *         if the PageFile is not loaded
250         */
251        public <T> void store(Page<T> page, Marshaller<T> marshaller, final boolean overflow) throws IOException {
252            DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow);
253            if (marshaller != null) {
254                marshaller.writePayload(page.get(), out);
255            }
256            out.close();
257        }
258    
259        /**
260         * @throws IOException
261         */
262        public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
263            pageFile.assertLoaded();
264    
265            // Copy to protect against the end user changing
266            // the page instance while we are doing a write.
267            final Page copy = page.copy();
268            pageFile.addToCache(copy);
269    
270            //
271            // To support writing VERY large data, we override the output stream so
272            // that we
273            // we do the page writes incrementally while the data is being
274            // marshalled.
275            DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize() * 2) {
276                Page current = copy;
277    
278                @SuppressWarnings("unchecked")
279                @Override
280                protected void onWrite() throws IOException {
281    
282                    // Are we at an overflow condition?
283                    final int pageSize = pageFile.getPageSize();
284                    if (pos >= pageSize) {
285                        // If overflow is allowed
286                        if (overflow) {
287    
288                            do {
289                                Page next;
290                                if (current.getType() == Page.PAGE_PART_TYPE) {
291                                    next = load(current.getNext(), null);
292                                } else {
293                                    next = allocate();
294                                }
295    
296                                next.txId = current.txId;
297    
298                                // Write the page header
299                                int oldPos = pos;
300                                pos = 0;
301    
302                                current.makePagePart(next.getPageId(), getWriteTransactionId());
303                                current.write(this);
304    
305                                // Do the page write..
306                                byte[] data = new byte[pageSize];
307                                System.arraycopy(buf, 0, data, 0, pageSize);
308                                Transaction.this.write(current, data);
309    
310                                // Reset for the next page chunk
311                                pos = 0;
312                                // The page header marshalled after the data is written.
313                                skip(Page.PAGE_HEADER_SIZE);
314                                // Move the overflow data after the header.
315                                System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
316                                pos += oldPos - pageSize;
317                                current = next;
318    
319                            } while (pos > pageSize);
320                        } else {
321                            throw new PageOverflowIOException("Page overflow.");
322                        }
323                    }
324    
325                }
326    
327                @Override
328                public void close() throws IOException {
329                    super.close();
330    
331                    // We need to free up the rest of the page chain..
332                    if (current.getType() == Page.PAGE_PART_TYPE) {
333                        free(current.getNext());
334                    }
335    
336                    current.makePageEnd(pos, getWriteTransactionId());
337    
338                    // Write the header..
339                    pos = 0;
340                    current.write(this);
341    
342                    Transaction.this.write(current, buf);
343                }
344            };
345    
346            // The page header marshaled after the data is written.
347            out.skip(Page.PAGE_HEADER_SIZE);
348            return out;
349        }
350    
351        /**
352         * Loads a page from disk.
353         *
354         * @param pageId
355         *        the id of the page to load
356         * @param marshaller
357         *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
358         * @return The page with the given id
359         * @throws IOException
360         *         If an disk error occurred.
361         * @throws IllegalStateException
362         *         if the PageFile is not loaded
363         */
364        public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException {
365            pageFile.assertLoaded();
366            Page<T> page = new Page<T>(pageId);
367            load(page, marshaller);
368            return page;
369        }
370    
371        /**
372         * Loads a page from disk.
373         *
374         * @param page - The pageId field must be properly set
375         * @param marshaller
376         *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
377         * @throws IOException
378         *         If an disk error occurred.
379         * @throws InvalidPageIOException
380         *         If the page is is not valid.
381         * @throws IllegalStateException
382         *         if the PageFile is not loaded
383         */
384        @SuppressWarnings("unchecked")
385        public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException {
386            pageFile.assertLoaded();
387    
388            // Can't load invalid offsets...
389            long pageId = page.getPageId();
390            if (pageId < 0) {
391                throw new InvalidPageIOException("Page id is not valid", pageId);
392            }
393    
394            // It might be a page this transaction has modified...
395            PageWrite update = writes.get(pageId);
396            if (update != null) {
397                page.copy(update.getPage());
398                return;
399            }
400    
401            // We may be able to get it from the cache...
402            Page<T> t = pageFile.getFromCache(pageId);
403            if (t != null) {
404                page.copy(t);
405                return;
406            }
407    
408            if (marshaller != null) {
409                // Full page read..
410                InputStream is = openInputStream(page);
411                DataInputStream dataIn = new DataInputStream(is);
412                page.set(marshaller.readPayload(dataIn));
413                is.close();
414            } else {
415                // Page header read.
416                DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
417                pageFile.readPage(pageId, in.getRawData());
418                page.read(in);
419                page.set(null);
420            }
421    
422            // Cache it.
423            if (marshaller != null) {
424                pageFile.addToCache(page);
425            }
426        }
427    
428        /**
429         * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page,
430         *      org.apache.kahadb.util.Marshaller)
431         */
432        public InputStream openInputStream(final Page p) throws IOException {
433    
434            return new InputStream() {
435    
436                private ByteSequence chunk = new ByteSequence(new byte[pageFile.getPageSize()]);
437                private Page page = readPage(p);
438                private int pageCount = 1;
439    
440                private Page markPage;
441                private ByteSequence markChunk;
442    
443                private Page readPage(Page page) throws IOException {
444                    // Read the page data
445    
446                    pageFile.readPage(page.getPageId(), chunk.getData());
447    
448                    chunk.setOffset(0);
449                    chunk.setLength(pageFile.getPageSize());
450    
451                    DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
452                    page.read(in);
453    
454                    chunk.setOffset(Page.PAGE_HEADER_SIZE);
455                    if (page.getType() == Page.PAGE_END_TYPE) {
456                        chunk.setLength((int)(page.getNext()));
457                    }
458    
459                    if (page.getType() == Page.PAGE_FREE_TYPE) {
460                        throw new EOFException("Chunk stream does not exist, page: " + page.getPageId() + " is marked free");
461                    }
462    
463                    return page;
464                }
465    
466                public int read() throws IOException {
467                    if (!atEOF()) {
468                        return chunk.data[chunk.offset++] & 0xff;
469                    } else {
470                        return -1;
471                    }
472                }
473    
474                private boolean atEOF() throws IOException {
475                    if (chunk.offset < chunk.length) {
476                        return false;
477                    }
478                    if (page.getType() == Page.PAGE_END_TYPE) {
479                        return true;
480                    }
481                    fill();
482                    return chunk.offset >= chunk.length;
483                }
484    
485                private void fill() throws IOException {
486                    page = readPage(new Page(page.getNext()));
487                    pageCount++;
488                }
489    
490                public int read(byte[] b) throws IOException {
491                    return read(b, 0, b.length);
492                }
493    
494                public int read(byte b[], int off, int len) throws IOException {
495                    if (!atEOF()) {
496                        int rc = 0;
497                        while (!atEOF() && rc < len) {
498                            len = Math.min(len, chunk.length - chunk.offset);
499                            if (len > 0) {
500                                System.arraycopy(chunk.data, chunk.offset, b, off, len);
501                                chunk.offset += len;
502                            }
503                            rc += len;
504                        }
505                        return rc;
506                    } else {
507                        return -1;
508                    }
509                }
510    
511                public long skip(long len) throws IOException {
512                    if (atEOF()) {
513                        int rc = 0;
514                        while (!atEOF() && rc < len) {
515                            len = Math.min(len, chunk.length - chunk.offset);
516                            if (len > 0) {
517                                chunk.offset += len;
518                            }
519                            rc += len;
520                        }
521                        return rc;
522                    } else {
523                        return -1;
524                    }
525                }
526    
527                public int available() {
528                    return chunk.length - chunk.offset;
529                }
530    
531                public boolean markSupported() {
532                    return true;
533                }
534    
535                public void mark(int markpos) {
536                    markPage = page;
537                    byte data[] = new byte[pageFile.getPageSize()];
538                    System.arraycopy(chunk.getData(), 0, data, 0, pageFile.getPageSize());
539                    markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength());
540                }
541    
542                public void reset() {
543                    page = markPage;
544                    chunk = markChunk;
545                }
546    
547            };
548        }
549    
550        /**
551         * Allows you to iterate through all active Pages in this object.  Pages with type Page.FREE_TYPE are
552         * not included in this iteration.
553         *
554         * Pages removed with Iterator.remove() will not actually get removed until the transaction commits.
555         *
556         * @throws IllegalStateException
557         *         if the PageFile is not loaded
558         */
559        public Iterator<Page> iterator() {
560            return (Iterator<Page>)iterator(false);
561        }
562    
563        /**
564         * Allows you to iterate through all active Pages in this object.  You can optionally include free pages in the pages
565         * iterated.
566         *
567         * @param includeFreePages - if true, free pages are included in the iteration
568         * @throws IllegalStateException
569         *         if the PageFile is not loaded
570         */
571        public Iterator<Page> iterator(final boolean includeFreePages) {
572    
573            pageFile.assertLoaded();
574    
575            return new Iterator<Page>() {
576    
577                long nextId;
578                Page nextPage;
579                Page lastPage;
580    
581                private void findNextPage() {
582                    if (!pageFile.isLoaded()) {
583                        throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
584                    }
585    
586                    if (nextPage != null) {
587                        return;
588                    }
589    
590                    try {
591                        while (nextId < pageFile.getPageCount()) {
592    
593                            Page page = load(nextId, null);
594    
595                            if (includeFreePages || page.getType() != Page.PAGE_FREE_TYPE) {
596                                nextPage = page;
597                                return;
598                            } else {
599                                nextId++;
600                            }
601                        }
602                    } catch (IOException e) {
603                    }
604                }
605    
606                public boolean hasNext() {
607                    findNextPage();
608                    return nextPage != null;
609                }
610    
611                public Page next() {
612                    findNextPage();
613                    if (nextPage != null) {
614                        lastPage = nextPage;
615                        nextPage = null;
616                        nextId++;
617                        return lastPage;
618                    } else {
619                        throw new NoSuchElementException();
620                    }
621                }
622    
623                @SuppressWarnings("unchecked")
624                public void remove() {
625                    if (lastPage == null) {
626                        throw new IllegalStateException();
627                    }
628                    try {
629                        free(lastPage);
630                        lastPage = null;
631                    } catch (IOException e) {
632                        throw new RuntimeException(e);
633                    }
634                }
635            };
636        }
637    
638        ///////////////////////////////////////////////////////////////////
639        // Commit / Rollback related methods..
640        ///////////////////////////////////////////////////////////////////
641    
642        /**
643         * Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated
644         * with the transaction are written to disk or none will.
645         */
646        public void commit() throws IOException {
647            if( writeTransactionId!=-1 ) {
648                if (tmpFile != null) {
649                    tmpFile.close();
650                    pageFile.removeTmpFile(getTempFile());
651                    tmpFile = null;
652                    txFile = null;
653                }
654                // Actually do the page writes...
655                pageFile.write(writes.entrySet());
656                // Release the pages that were freed up in the transaction..
657                freePages(freeList);
658    
659                freeList.clear();
660                allocateList.clear();
661                writes.clear();
662                writeTransactionId = -1;
663            }
664            size = 0;
665        }
666    
667        /**
668         * Rolls back the transaction.
669         */
670        public void rollback() throws IOException {
671            if( writeTransactionId!=-1 ) {
672                if (tmpFile != null) {
673                    tmpFile.close();
674                    pageFile.removeTmpFile(getTempFile());
675                    tmpFile = null;
676                    txFile = null;
677                }
678                // Release the pages that were allocated in the transaction...
679                freePages(allocateList);
680    
681                freeList.clear();
682                allocateList.clear();
683                writes.clear();
684                writeTransactionId = -1;
685            }
686            size = 0;
687        }
688    
689        private long getWriteTransactionId() {
690            if( writeTransactionId==-1 ) {
691                writeTransactionId = pageFile.getNextWriteTransactionId();
692            }
693            return writeTransactionId;
694        }
695    
696    
697        protected File getTempFile() {
698            if (txFile == null) {
699                txFile = new File(getPageFile().getDirectory(), IOHelper.toFileSystemSafeName("tx-"+ Long.toString(getWriteTransactionId()) + "-" + Long.toString(System.currentTimeMillis()) + ".tmp"));
700            }
701           return txFile;
702        }
703    
704        /**
705         * Queues up a page write that should get done when commit() gets called.
706         */
707        private void write(final Page page, byte[] data) throws IOException {
708            Long key = page.getPageId();
709    
710            // how much pages we have for this transaction
711            size = writes.size() * pageFile.getPageSize();
712    
713            PageWrite write;
714    
715            if (size > maxTransactionSize) {
716                if (tmpFile == null) {
717                    tmpFile = new RandomAccessFile(getTempFile(), "rw");
718                }
719                long location = nextLocation;
720                tmpFile.seek(nextLocation);
721                tmpFile.write(data);
722                nextLocation = location + data.length;
723                write = new PageWrite(page, location, data.length, getTempFile());
724            } else {
725                write = new PageWrite(page, data);
726            }
727            writes.put(key, write);
728        }
729    
730        /**
731         * @param list
732         * @throws RuntimeException
733         */
734        private void freePages(SequenceSet list) throws RuntimeException {
735            Sequence seq = list.getHead();
736            while( seq!=null ) {
737                seq.each(new Sequence.Closure<RuntimeException>(){
738                    public void execute(long value) {
739                        pageFile.freePage(value);
740                    }
741                });
742                seq = seq.getNext();
743            }
744        }
745    
746        /**
747         * @return true if there are no uncommitted page file updates associated with this transaction.
748         */
749        public boolean isReadOnly() {
750            return writeTransactionId==-1;
751        }
752    
753        ///////////////////////////////////////////////////////////////////
754        // Transaction closure helpers...
755        ///////////////////////////////////////////////////////////////////
756    
757        /**
758         * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
759         * If the closure throws an Exception, then the transaction is rolled back.
760         *
761         * @param <T>
762         * @param closure - the work to get exectued.
763         * @throws T if the closure throws it
764         * @throws IOException If the commit fails.
765         */
766        public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
767            boolean success = false;
768            try {
769                closure.execute(this);
770                success = true;
771            } finally {
772                if (success) {
773                    commit();
774                } else {
775                    rollback();
776                }
777            }
778        }
779    
780        /**
781         * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
782         * If the closure throws an Exception, then the transaction is rolled back.
783         *
784         * @param <T>
785         * @param closure - the work to get exectued.
786         * @throws T if the closure throws it
787         * @throws IOException If the commit fails.
788         */
789        public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
790            boolean success = false;
791            try {
792                R rc = closure.execute(this);
793                success = true;
794                return rc;
795            } finally {
796                if (success) {
797                    commit();
798                } else {
799                    rollback();
800                }
801            }
802        }
803    }