/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.page;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LFUCache;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A PageFile provides random access to fixed-size disk pages. This object is not thread safe and therefore access to it should
 * be externally synchronized.
 * <p/>
 * The store has 3 parts:
 * Metadata Space: 4k : Reserved metadata area. Used to store persistent config about the file.
 * Recovery Buffer Space: Page Size * 1000 : A redo log (kept in a separate .redo file) used to prevent partial page writes from making the file inconsistent.
 * Page Space: The pages in the page file.
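 * <p/>
 * Pages live in the main .data file: with the 4k metadata header and the default
 * 4k page size, page N starts at byte offset 4096 + N * 4096 (see toOffset).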
 */
public class PageFile {

    private static final String PAGEFILE_SUFFIX = ".data";
    private static final String RECOVERY_FILE_SUFFIX = ".redo";
    private static final String FREE_FILE_SUFFIX = ".free";

    // 4k default page size.
    public static final int DEFAULT_PAGE_SIZE = Integer.getInteger("defaultPageSize", 1024*4);
    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.getInteger("defaultWriteBatchSize", 1000);
    public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.getInteger("defaultPageCacheSize", 100);

    private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
    private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;

    // Recovery header is (long offset)
    private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);
    // A PageFile will use a couple of files in this directory
    private final File directory;
    // And the file names in that directory will be based on this name.
    private final String name;

    // File handle used for reading pages..
    private RecoverableRandomAccessFile readFile;
    // File handle used for writing pages..
    private RecoverableRandomAccessFile writeFile;
    // File handle used for writing to the recovery file..
    private RecoverableRandomAccessFile recoveryFile;

    // The size of pages
    private int pageSize = DEFAULT_PAGE_SIZE;

    // The minimum amount of space allocated to the recovery file, in pages.
    private int recoveryFileMinPageCount = 1000;
    // The max size that we let the recovery file grow to. It may exceed this max, but the file
    // will be resized back down to this max size as soon as possible.
    private int recoveryFileMaxPageCount = 10000;
    // The number of pages in the current recovery buffer
    private int recoveryPageCount;
    private final AtomicBoolean loaded = new AtomicBoolean();
    // The number of pages we are aiming to write every time we
    // write to disk.
    int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;

    // The cache of recently used pages.
    private Map<Long, Page> pageCache;
    // Is read page caching enabled?
    private boolean enablePageCaching = true;
    // How many pages will we keep in the cache?
    private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;

    // Should we first log the page write to the recovery buffer? Avoids partial
    // page write failures..
    private boolean enableRecoveryFile = true;
    // Will we sync writes to disk? Ensures that data will not be lost after a checkpoint().
    private boolean enableDiskSyncs = true;
    // Will writes be done in an async thread?
    private boolean enabledWriteThread = false;

    // These are used if enabledWriteThread==true
    private final AtomicBoolean stopWriter = new AtomicBoolean();
    private Thread writerThread;
    private CountDownLatch checkpointLatch;

    // Keeps track of writes that are being written to disk.
    private final TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();

    // Keeps track of free pages.
    private final AtomicLong nextFreePageId = new AtomicLong();
    private SequenceSet freeList = new SequenceSet();

    private final AtomicLong nextTxid = new AtomicLong();

    // Persistent settings stored in the page file.
    private MetaData metaData;

    private final ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();

    private boolean useLFRUEviction = false;
    private float LFUEvictionFactor = 0.2f;
    /**
     * Used to keep track of updated pages which have not yet been committed.
     */
    static class PageWrite {
        Page page;
        byte[] current;
        byte[] diskBound;
        long currentLocation = -1;
        long diskBoundLocation = -1;
        File tmpFile;
        int length;

        public PageWrite(Page page, byte[] data) {
            this.page = page;
            current = data;
        }

        public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
            this.page = page;
            this.currentLocation = currentLocation;
            this.tmpFile = tmpFile;
            this.length = length;
        }

        public void setCurrent(Page page, byte[] data) {
            this.page = page;
            current = data;
            currentLocation = -1;
            diskBoundLocation = -1;
        }

        public void setCurrentLocation(Page page, long location, int length) {
            this.page = page;
            this.currentLocation = location;
            this.length = length;
            this.current = null;
        }

        @Override
        public String toString() {
            return "[PageWrite:" + page.getPageId() + "-" + page.getType() + "]";
        }

        @SuppressWarnings("unchecked")
        public Page getPage() {
            return page;
        }

        public byte[] getDiskBound() throws IOException {
            if (diskBound == null && diskBoundLocation != -1) {
                diskBound = new byte[length];
                try(RandomAccessFile file = new RandomAccessFile(tmpFile, "r")) {
                    file.seek(diskBoundLocation);
                    file.readFully(diskBound);
                }
                diskBoundLocation = -1;
            }
            return diskBound;
        }

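        // Moves the pending update into the disk-bound slot so that a new
        // update to the same page can accumulate while this one is written out.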
        void begin() {
            if (currentLocation != -1) {
                diskBoundLocation = currentLocation;
            } else {
                diskBound = current;
            }
            current = null;
            currentLocation = -1;
        }

        /**
         * @return true if there are no pending writes to do.
         */
        boolean done() {
            diskBoundLocation = -1;
            diskBound = null;
            return current == null || currentLocation == -1;
        }

        boolean isDone() {
            return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
        }
    }

    /**
     * The MetaData object holds the persistent data associated with a PageFile object.
     */
    public static class MetaData {

        String fileType;
        String fileTypeVersion;

        long metaDataTxId = -1;
        int pageSize;
        boolean cleanShutdown;
        long lastTxId;
        long freePages;

        public String getFileType() {
            return fileType;
        }

        public void setFileType(String fileType) {
            this.fileType = fileType;
        }

        public String getFileTypeVersion() {
            return fileTypeVersion;
        }

        public void setFileTypeVersion(String version) {
            this.fileTypeVersion = version;
        }

        public long getMetaDataTxId() {
            return metaDataTxId;
        }

        public void setMetaDataTxId(long metaDataTxId) {
            this.metaDataTxId = metaDataTxId;
        }

        public int getPageSize() {
            return pageSize;
        }

        public void setPageSize(int pageSize) {
            this.pageSize = pageSize;
        }

        public boolean isCleanShutdown() {
            return cleanShutdown;
        }

        public void setCleanShutdown(boolean cleanShutdown) {
            this.cleanShutdown = cleanShutdown;
        }

        public long getLastTxId() {
            return lastTxId;
        }

        public void setLastTxId(long lastTxId) {
            this.lastTxId = lastTxId;
        }

        public long getFreePages() {
            return freePages;
        }

        public void setFreePages(long value) {
            this.freePages = value;
        }
    }

    public Transaction tx() {
        assertLoaded();
        return new Transaction(this);
    }
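
    // Illustrative lifecycle sketch (not part of this class; the directory and
    // name below are made-up example values): a PageFile is loaded once,
    // mutated through transactions, flushed, and finally unloaded.
    //
    //   PageFile pf = new PageFile(new File("target/test-data"), "example");
    //   pf.load();
    //   Transaction t = pf.tx();
    //   // ... allocate/load/store pages via the transaction ...
    //   pf.flush();
    //   pf.unload();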

    /**
     * Creates a PageFile in the specified directory whose data files are named by the given name.
     */
    public PageFile(File directory, String name) {
        this.directory = directory;
        this.name = name;
    }

    /**
     * Deletes the files used by the PageFile object.  This method can only be used when this object is not loaded.
     *
     * @throws IOException           if the files cannot be deleted.
     * @throws IllegalStateException if this PageFile is loaded
     */
    public void delete() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
        }
        delete(getMainPageFile());
        delete(getFreeFile());
        delete(getRecoveryFile());
    }

    public void archive() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot archive page file data when the page file is loaded");
        }
        long timestamp = System.currentTimeMillis();
        archive(getMainPageFile(), String.valueOf(timestamp));
        archive(getFreeFile(), String.valueOf(timestamp));
        archive(getRecoveryFile(), String.valueOf(timestamp));
    }

    /**
     * Deletes the given file if it exists.
     *
     * @param file the file to delete
     * @throws IOException if the file exists but cannot be deleted
     */
    private void delete(File file) throws IOException {
        if (file.exists() && !file.delete()) {
            throw new IOException("Could not delete: " + file.getPath());
        }
    }

    private void archive(File file, String suffix) throws IOException {
        if (file.exists()) {
            File archive = new File(file.getPath() + "-" + suffix);
            if (!file.renameTo(archive)) {
                throw new IOException("Could not archive: " + file.getPath() + " to " + archive.getPath());
            }
        }
    }

    /**
     * Loads the page file so that it can be accessed for read/write purposes.  This allocates OS resources.  If this is the
     * first time the page file is loaded, then this creates the page file in the file system.
     *
     * @throws IOException           If the page file cannot be loaded. This could be because the existing page file is corrupt,
     *                               is a bad version, or because there was a disk error.
     * @throws IllegalStateException If the page file was already loaded.
     */
    public void load() throws IOException, IllegalStateException {
        if (loaded.compareAndSet(false, true)) {

            if (enablePageCaching) {
                if (isUseLFRUEviction()) {
                    pageCache = Collections.synchronizedMap(new LFUCache<Long, Page>(pageCacheSize, getLFUEvictionFactor()));
                } else {
                    pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
                }
            }

            File file = getMainPageFile();
            IOHelper.mkdirs(file.getParentFile());
            writeFile = new RecoverableRandomAccessFile(file, "rw", false);
            readFile = new RecoverableRandomAccessFile(file, "r");

            if (readFile.length() > 0) {
                // Load the page size setting because it can't change once the file is created.
                loadMetaData();
                pageSize = metaData.getPageSize();
            } else {
                // Store the page size setting because it can't change once the file is created.
                metaData = new MetaData();
                metaData.setFileType(PageFile.class.getName());
                metaData.setFileTypeVersion("1");
                metaData.setPageSize(getPageSize());
                metaData.setCleanShutdown(true);
                metaData.setFreePages(-1);
                metaData.setLastTxId(0);
                storeMetaData();
            }

            if (enableRecoveryFile) {
                recoveryFile = new RecoverableRandomAccessFile(getRecoveryFile(), "rw");
            }

            boolean needsFreePageRecovery = false;

            if (metaData.isCleanShutdown()) {
                nextTxid.set(metaData.getLastTxId() + 1);
                if (metaData.getFreePages() > 0) {
                    loadFreeList();
                }
            } else {
                LOG.debug(toString() + ", Recovering page file...");
                nextTxid.set(redoRecoveryUpdates());
                needsFreePageRecovery = true;
            }

            if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
                writeFile.setLength(PAGE_FILE_HEADER_SIZE);
            }
            nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);

            if (needsFreePageRecovery) {
                // Scan all pages to find the free ones, after nextFreePageId is set.
                freeList = new SequenceSet();
                for (Iterator<Page> i = tx().iterator(true); i.hasNext(); ) {
                    Page page = i.next();
                    if (page.getType() == Page.PAGE_FREE_TYPE) {
                        freeList.add(page.getPageId());
                    }
                }
            }

            metaData.setCleanShutdown(false);
            storeMetaData();
            getFreeFile().delete();
            startWriter();
        } else {
            throw new IllegalStateException("Cannot load the page file when it is already loaded.");
        }
    }


    /**
     * Unloads a previously loaded PageFile.  This deallocates OS related resources like file handles.
     * Once unloaded, you can no longer use the page file to read or write Pages.
     *
     * @throws IOException           if a disk error occurred while closing down the page file.
     * @throws IllegalStateException if the PageFile is not loaded
     */
    public void unload() throws IOException {
        if (loaded.compareAndSet(true, false)) {
            flush();
            try {
                stopWriter();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }

            if (freeList.isEmpty()) {
                metaData.setFreePages(0);
            } else {
                storeFreeList();
                metaData.setFreePages(freeList.size());
            }

            metaData.setLastTxId(nextTxid.get() - 1);
            metaData.setCleanShutdown(true);
            storeMetaData();

            if (readFile != null) {
                readFile.close();
                readFile = null;
                writeFile.close();
                writeFile = null;
                if (enableRecoveryFile) {
                    recoveryFile.close();
                    recoveryFile = null;
                }
                freeList.clear();
                if (pageCache != null) {
                    pageCache = null;
                }
                synchronized (writes) {
                    writes.clear();
                }
            }
        } else {
            throw new IllegalStateException("Cannot unload the page file when it is not loaded");
        }
    }

    public boolean isLoaded() {
        return loaded.get();
    }

    public void allowIOResumption() {
        loaded.set(true);
    }

    /**
     * Flush and sync all write buffers to disk.
     *
     * @throws IOException If a disk error occurred.
     */
    public void flush() throws IOException {

        if (enabledWriteThread && stopWriter.get()) {
            throw new IOException("Page file already stopped: checkpointing is not allowed");
        }

        // Set up a latch that gets notified when all buffered writes hit the disk.
        CountDownLatch checkpointLatch;
        synchronized (writes) {
            if (writes.isEmpty()) {
                return;
            }
            if (enabledWriteThread) {
                if (this.checkpointLatch == null) {
                    this.checkpointLatch = new CountDownLatch(1);
                }
                checkpointLatch = this.checkpointLatch;
                writes.notify();
            } else {
                writeBatch();
                return;
            }
        }
        try {
            checkpointLatch.await();
        } catch (InterruptedException e) {
            InterruptedIOException ioe = new InterruptedIOException();
            ioe.initCause(e);
            throw ioe;
        }
    }


    @Override
    public String toString() {
        return "Page File: " + getMainPageFile();
    }

    ///////////////////////////////////////////////////////////////////
    // Private Implementation Methods
    ///////////////////////////////////////////////////////////////////
    private File getMainPageFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + PAGEFILE_SUFFIX);
    }

    public File getFreeFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + FREE_FILE_SUFFIX);
    }

    public File getRecoveryFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + RECOVERY_FILE_SUFFIX);
    }

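    /**
     * Maps a page id to its byte offset in the main page file. For example,
     * with the default 4k page size, page 3 starts at 4096 + 3 * 4096 = 16384.
     */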
    public long toOffset(long pageId) {
        return PAGE_FILE_HEADER_SIZE + (pageId * pageSize);
    }

    private void loadMetaData() throws IOException {

        ByteArrayInputStream is;
        MetaData v1 = new MetaData();
        MetaData v2 = new MetaData();
        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(0);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v1, p);
        } catch (IOException e) {
            v1 = null;
        }

        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(PAGE_FILE_HEADER_SIZE / 2);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v2, p);
        } catch (IOException e) {
            v2 = null;
        }

        if (v1 == null && v2 == null) {
            throw new IOException("Could not load page file meta data");
        }

        if (v1 == null || v1.metaDataTxId < 0) {
            metaData = v2;
        } else if (v2 == null || v2.metaDataTxId < 0) {
            metaData = v1;
        } else if (v1.metaDataTxId == v2.metaDataTxId) {
            metaData = v1; // use the first since the 2nd could be a partial..
        } else {
            metaData = v2; // use the second because the first is probably a partial.
        }
    }

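    /**
     * Stores the metadata twice, once in each 2k half of the 4k header, so a
     * crash mid-write leaves at least one intact copy; loadMetaData() compares
     * the metaDataTxId of the two copies to pick a consistent one.
     */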
    private void storeMetaData() throws IOException {
        // Convert the metadata into a property format
        metaData.metaDataTxId++;
        Properties p = new Properties();
        IntrospectionSupport.getProperties(metaData, p, null);

        ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
        p.store(os, "");
        if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
            throw new IOException("Configuration is larger than: " + PAGE_FILE_HEADER_SIZE / 2);
        }
        // Fill the rest with space...
        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
        Arrays.fill(filler, (byte) ' ');
        os.write(filler);
        os.flush();

        byte[] d = os.toByteArray();

        // So we don't lose it, write it 2 times...
        writeFile.seek(0);
        writeFile.write(d);
        writeFile.sync();
        writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
        writeFile.write(d);
        writeFile.sync();
    }

    private void storeFreeList() throws IOException {
        try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(getFreeFile()))) {
            SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
        }
    }

    private void loadFreeList() throws IOException {
        freeList.clear();
        try (DataInputStream dis = new DataInputStream(new FileInputStream(getFreeFile()))) {
            freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Property Accessors
    ///////////////////////////////////////////////////////////////////

    /**
     * Is the recovery buffer used to double buffer page writes?  Enabled by default.
     *
     * @return true if the recovery buffer is enabled.
     */
    public boolean isEnableRecoveryFile() {
        return enableRecoveryFile;
    }

    /**
     * Sets whether the recovery buffer is used to double buffer page writes.  Enabled by default.  Disabling this
     * may potentially cause partial page writes which can lead to page file corruption.
     */
    public void setEnableRecoveryFile(boolean doubleBuffer) {
        assertNotLoaded();
        this.enableRecoveryFile = doubleBuffer;
    }

    /**
     * @return Are page writes synced to disk?
     */
    public boolean isEnableDiskSyncs() {
        return enableDiskSyncs;
    }

    /**
     * Allows you to enable syncing writes to disk.
     */
    public void setEnableDiskSyncs(boolean syncWrites) {
        assertNotLoaded();
        this.enableDiskSyncs = syncWrites;
    }

    /**
     * @return the page size
     */
    public int getPageSize() {
        return this.pageSize;
    }

    /**
     * @return the amount of content data that a page can hold.
     */
    public int getPageContentSize() {
        return this.pageSize - Page.PAGE_HEADER_SIZE;
    }

    /**
     * Configures the page size used by the page file.  By default it is 4k.  Once a page file is created on disk,
     * subsequent loads of that file will use the original pageSize.  Once the PageFile is loaded, this setting
     * can no longer be changed.
     *
     * @param pageSize the pageSize to set
     * @throws IllegalStateException once the page file is loaded.
     */
    public void setPageSize(int pageSize) throws IllegalStateException {
        assertNotLoaded();
        this.pageSize = pageSize;
    }

    /**
     * @return true if read page caching is enabled
     */
    public boolean isEnablePageCaching() {
        return this.enablePageCaching;
    }

    /**
     * @param enablePageCaching allows you to enable read page caching
     */
    public void setEnablePageCaching(boolean enablePageCaching) {
        assertNotLoaded();
        this.enablePageCaching = enablePageCaching;
    }

    /**
     * @return the maximum number of pages that will get stored in the read page cache.
     */
    public int getPageCacheSize() {
        return this.pageCacheSize;
    }

    /**
     * @param pageCacheSize Sets the maximum number of pages that will get stored in the read page cache.
     */
    public void setPageCacheSize(int pageCacheSize) {
        assertNotLoaded();
        this.pageCacheSize = pageCacheSize;
    }

    public boolean isEnabledWriteThread() {
        return enabledWriteThread;
    }

    public void setEnableWriteThread(boolean enableAsyncWrites) {
        assertNotLoaded();
        this.enabledWriteThread = enableAsyncWrites;
    }

    public long getDiskSize() throws IOException {
        return toOffset(nextFreePageId.get());
    }

    /**
     * @return the number of pages allocated in the PageFile
     */
    public long getPageCount() {
        return nextFreePageId.get();
    }

    public int getRecoveryFileMinPageCount() {
        return recoveryFileMinPageCount;
    }

    public long getFreePageCount() {
        assertLoaded();
        return freeList.rangeSize();
    }

    public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
        assertNotLoaded();
        this.recoveryFileMinPageCount = recoveryFileMinPageCount;
    }

    public int getRecoveryFileMaxPageCount() {
        return recoveryFileMaxPageCount;
    }

    public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
        assertNotLoaded();
        this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public float getLFUEvictionFactor() {
        return LFUEvictionFactor;
    }

    public void setLFUEvictionFactor(float LFUEvictionFactor) {
        this.LFUEvictionFactor = LFUEvictionFactor;
    }

    public boolean isUseLFRUEviction() {
        return useLFRUEviction;
    }

    public void setUseLFRUEviction(boolean useLFRUEviction) {
        this.useLFRUEviction = useLFRUEviction;
    }

    ///////////////////////////////////////////////////////////////////
    // Package Protected Methods exposed to Transaction
    ///////////////////////////////////////////////////////////////////

    /**
     * @throws IllegalStateException if the page file is not loaded.
     */
    void assertLoaded() throws IllegalStateException {
        if (!loaded.get()) {
            throw new IllegalStateException("PageFile is not loaded");
        }
    }

    void assertNotLoaded() throws IllegalStateException {
        if (loaded.get()) {
            throw new IllegalStateException("PageFile is loaded");
        }
    }

    /**
     * Allocates a block of free pages that you can write data to.
     *
     * @param count the number of sequential pages to allocate
     * @return the first page of the sequential set.
     * @throws IOException           If a disk error occurred.
     * @throws IllegalStateException if the PageFile is not loaded
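     *
     * <p>An illustrative call (callers normally go through a Transaction; the
     * variable name here is made up for the example):
     * <pre>
     *   Page&lt;String&gt; first = pageFile.allocate(3); // first of 3 sequential pages
     * </pre>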
     */
    <T> Page<T> allocate(int count) throws IOException {
        assertLoaded();
        if (count <= 0) {
            throw new IllegalArgumentException("The allocation count must be larger than zero");
        }

        Sequence seq = freeList.removeFirstSequence(count);

        // We may need to create new free pages...
        if (seq == null) {

            Page<T> first = null;
            int c = count;

            // Reserve the page ids and tx ids only once....
            long pageId = nextFreePageId.getAndAdd(count);
            long writeTxnId = nextTxid.getAndAdd(count);

            while (c-- > 0) {
                Page<T> page = new Page<T>(pageId++);
                page.makeFree(writeTxnId++);

                if (first == null) {
                    first = page;
                }

                addToCache(page);
                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
                page.write(out);
                write(page, out.getData());

                // LOG.debug("allocate writing: "+page.getPageId());
            }

            return first;
        }

        Page<T> page = new Page<T>(seq.getFirst());
        page.makeFree(0);
        // LOG.debug("allocated: "+page.getPageId());
        return page;
    }

    long getNextWriteTransactionId() {
        return nextTxid.incrementAndGet();
    }

    synchronized void readPage(long pageId, byte[] data) throws IOException {
        readFile.seek(toOffset(pageId));
        readFile.readFully(data);
    }

    public void freePage(long pageId) {
        freeList.add(pageId);
        removeFromCache(pageId);
    }

    @SuppressWarnings("unchecked")
    private <T> void write(Page<T> page, byte[] data) throws IOException {
        final PageWrite write = new PageWrite(page, data);
        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
            @Override
            public Long getKey() {
                return write.getPage().getPageId();
            }

            @Override
            public PageWrite getValue() {
                return write;
            }

            @Override
            public PageWrite setValue(PageWrite value) {
                return null;
            }
        };
        Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
        write(Arrays.asList(entries));
    }

    void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
        synchronized (writes) {
            if (enabledWriteThread) {
                while (writes.size() >= writeBatchSize && !stopWriter.get()) {
                    try {
                        writes.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new InterruptedIOException();
                    }
                }
            }

            boolean longTx = false;

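            // Merge each new update into any write already queued for the same
            // page, so that only the latest version of a page goes to disk.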
            for (Map.Entry<Long, PageWrite> entry : updates) {
                Long key = entry.getKey();
                PageWrite value = entry.getValue();
                PageWrite write = writes.get(key);
                if (write == null) {
                    writes.put(key, value);
                } else {
                    if (value.currentLocation != -1) {
                        write.setCurrentLocation(value.page, value.currentLocation, value.length);
                        write.tmpFile = value.tmpFile;
                        longTx = true;
                    } else {
                        write.setCurrent(value.page, value.current);
                    }
                }
            }

            // Once we start approaching capacity, notify the writer to start writing.
            // Sync immediately for long txs.
            if (longTx || canStartWriteBatch()) {

                if (enabledWriteThread) {
                    writes.notify();
                } else {
                    writeBatch();
                }
            }
        }
    }

    private boolean canStartWriteBatch() {
        int capacityUsed = ((writes.size() * 100) / writeBatchSize);
        if (enabledWriteThread) {
            // The constant 10 here controls how soon write batches start going to disk.
            // It would be nice to figure out how to auto tune that value.  Make it too small and
            // we reduce throughput because we lock the write mutex too often doing writes.
            return capacityUsed >= 10 || checkpointLatch != null;
        } else {
            return capacityUsed >= 80 || checkpointLatch != null;
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Cache Related operations
    ///////////////////////////////////////////////////////////////////
    @SuppressWarnings("unchecked")
    <T> Page<T> getFromCache(long pageId) {
        synchronized (writes) {
            PageWrite pageWrite = writes.get(pageId);
            if (pageWrite != null) {
                return pageWrite.page;
            }
        }

        Page<T> result = null;
        if (enablePageCaching) {
            result = pageCache.get(pageId);
        }
        return result;
    }

    void addToCache(Page page) {
        if (enablePageCaching) {
            pageCache.put(page.getPageId(), page);
        }
    }

    void removeFromCache(long pageId) {
        if (enablePageCaching) {
            pageCache.remove(pageId);
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Internal Double write implementation follows...
    ///////////////////////////////////////////////////////////////////

    private void pollWrites() {
        try {
            while (!stopWriter.get()) {
                // Wait for a notification...
                synchronized (writes) {
                    writes.notifyAll();

                    // If there is not enough to write, wait for a notification...
                    while (writes.isEmpty() && checkpointLatch == null && !stopWriter.get()) {
                        writes.wait(100);
                    }

                    if (writes.isEmpty()) {
                        releaseCheckpointWaiter();
                    }
                }
                writeBatch();
            }
        } catch (Throwable e) {
            LOG.info("An exception was raised while performing poll writes", e);
        } finally {
            releaseCheckpointWaiter();
        }
    }

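    /*
     * Write-batch sequence: each batch is first appended to the recovery file
     * along with a checksum over the whole batch and synced, and only then
     * written to the final page locations in the main file. If the process
     * dies between the two phases, redoRecoveryUpdates() replays the batch
     * from the recovery file on the next load().
     */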
    private void writeBatch() throws IOException {

        CountDownLatch checkpointLatch;
        ArrayList<PageWrite> batch;
        synchronized (writes) {
            batch = new ArrayList<PageWrite>(writes.size());
            // build a write batch from the current write cache.
            for (PageWrite write : writes.values()) {
                batch.add(write);
                // Move the current write to the diskBound write, this lets folks update the
                // page again without blocking for this write.
                write.begin();
                if (write.diskBound == null && write.diskBoundLocation == -1) {
                    batch.remove(write);
                }
            }

            // Grab on to the existing checkpoint latch because once we do this write we can
            // release the folks that were waiting for those writes to hit disk.
            checkpointLatch = this.checkpointLatch;
            this.checkpointLatch = null;
        }

        try {

            // First land the writes in the recovery file
            if (enableRecoveryFile) {
                Checksum checksum = new Adler32();

                recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);

                for (PageWrite w : batch) {
                    try {
                        checksum.update(w.getDiskBound(), 0, pageSize);
                    } catch (Throwable t) {
                        throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
                    }
                    recoveryFile.writeLong(w.page.getPageId());
                    recoveryFile.write(w.getDiskBound(), 0, pageSize);
                }

                // Can we shrink the recovery buffer??
                if (recoveryPageCount > recoveryFileMaxPageCount) {
                    int t = Math.max(recoveryFileMinPageCount, batch.size());
                    recoveryFile.setLength(recoveryFileSizeForPages(t));
                }

                // Record the page writes in the recovery buffer.
                recoveryFile.seek(0);
                // Store the next tx id...
                recoveryFile.writeLong(nextTxid.get());
                // Store the checksum for the write batch so that on recovery we
                // know if we have a consistent write batch on disk.
                recoveryFile.writeLong(checksum.getValue());
                // Write the # of pages that will follow
                recoveryFile.writeInt(batch.size());

                if (enableDiskSyncs) {
                    recoveryFile.sync();
                }
            }

            for (PageWrite w : batch) {
                writeFile.seek(toOffset(w.page.getPageId()));
                writeFile.write(w.getDiskBound(), 0, pageSize);
                w.done();
            }

            if (enableDiskSyncs) {
                writeFile.sync();
            }

        } catch (IOException ioError) {
            LOG.info("Unexpected io error on pagefile write of " + batch.size() + " pages.", ioError);
            // any subsequent write needs to be prefaced with a considered call to redoRecoveryUpdates
            // to ensure the disk image is self consistent
            loaded.set(false);
            throw ioError;
        } finally {
            synchronized (writes) {
                for (PageWrite w : batch) {
                    // If there are no more pending writes, then remove it from
                    // the write cache.
                    if (w.isDone()) {
                        writes.remove(w.page.getPageId());
                        if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
                            if (!w.tmpFile.delete()) {
                                throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile);
                            }
                            tmpFilesForRemoval.remove(w.tmpFile);
                        }
                    }
                }
            }

            if (checkpointLatch != null) {
                checkpointLatch.countDown();
            }
        }
    }

    public void removeTmpFile(File file) {
        tmpFilesForRemoval.add(file);
    }

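    /**
     * Size of a recovery file holding pageCount entries: the 4k header plus,
     * per page, an 8 byte page id and pageSize bytes of data. With 4k pages,
     * 1000 pages need 4096 + (4096 + 8) * 1000 = 4,108,096 bytes.
     */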
    private long recoveryFileSizeForPages(int pageCount) {
        return RECOVERY_FILE_HEADER_SIZE + ((pageSize + 8) * pageCount);
    }

    private void releaseCheckpointWaiter() {
        if (checkpointLatch != null) {
            checkpointLatch.countDown();
            checkpointLatch = null;
        }
    }

    /**
     * Inspects the recovery buffer and re-applies any
     * partially applied page writes.
     *
     * @return the next transaction id that can be used.
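     *
     * Recovery file layout: a header holding the next tx id (long), an Adler32
     * checksum over the batch (long) and the page count (int), followed by one
     * record per page: the page id (long) and pageSize bytes of page data.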
     */
    private long redoRecoveryUpdates() throws IOException {
        if (!enableRecoveryFile) {
            return 0;
        }
        recoveryPageCount = 0;

        // Are we initializing the recovery file?
        if (recoveryFile.length() == 0) {
            // Write an empty header..
            recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
            // Preallocate the minimum size for better performance.
            recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
            return 0;
        }

        // How many recovery pages do we have in the recovery buffer?
        recoveryFile.seek(0);
        long nextTxId = recoveryFile.readLong();
        long expectedChecksum = recoveryFile.readLong();
        int pageCounter = recoveryFile.readInt();

        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
        Checksum checksum = new Adler32();
        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
        try {
            for (int i = 0; i < pageCounter; i++) {
                long pageId = recoveryFile.readLong();
                byte[] data = new byte[pageSize];
                if (recoveryFile.read(data, 0, pageSize) != pageSize) {
                    // Invalid recovery record: could not fully read the data. Probably due to a partial write to the recovery buffer.
                    return nextTxId;
                }
                checksum.update(data, 0, pageSize);
                batch.put(pageId, data);
            }
        } catch (Exception e) {
            // If an error occurred it was because the redo buffer was not fully written out correctly, so don't redo it
            // as the pages should still be consistent.
            LOG.debug("Redo buffer was not fully intact: ", e);
            return nextTxId;
        }

        recoveryPageCount = pageCounter;

        // If the checksum is not valid then the recovery buffer was partially written to disk.
        if (checksum.getValue() != expectedChecksum) {
            return nextTxId;
        }

        // Re-apply all the writes in the recovery buffer.
        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
            writeFile.seek(toOffset(e.getKey()));
            writeFile.write(e.getValue());
        }

        // And sync it to disk
        writeFile.sync();
        return nextTxId;
    }

    private void startWriter() {
        synchronized (writes) {
            if (enabledWriteThread) {
                stopWriter.set(false);
                writerThread = new Thread("KahaDB Page Writer") {
                    @Override
                    public void run() {
                        pollWrites();
                    }
                };
                writerThread.setPriority(Thread.MAX_PRIORITY);
                writerThread.setDaemon(true);
                writerThread.start();
            }
        }
    }

    private void stopWriter() throws InterruptedException {
        if (enabledWriteThread) {
            stopWriter.set(true);
            writerThread.join();
        }
    }

    public File getFile() {
        return getMainPageFile();
    }

    public File getDirectory() {
        return directory;
    }
}