/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the set of append-only data files that make up the KahaDB journal.
 */
public class Journal {
    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));

    private static final int PREALLOC_CHUNK_SIZE = 1024 * 1024;

    // RECORD_HEAD_SPACE = 4 byte record length + 1 byte record type
    public static final int RECORD_HEAD_SPACE = 4 + 1;

    public static final byte USER_RECORD_TYPE = 1;
    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // Batch control records hold a 4 byte size of the batch and an 8 byte checksum of the batch.
    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
    public static final byte[] EMPTY_BATCH_CONTROL_RECORD = createEmptyBatchControlRecordHeader();
    public static final int EOF_INT = ByteBuffer.wrap(new byte[]{'-', 'q', 'M', 'a'}).getInt();
    public static final byte EOF_EOT = '4';
    public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();
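
    /*
     * On-disk record layout (illustrative, derived from the constants above).
     * A batch control record is:
     *
     *    4 bytes  record length                (part of RECORD_HEAD_SPACE)
     *    1 byte   record type                  (BATCH_CONTROL_RECORD_TYPE)
     *   11 bytes  magic                        ("WRITE BATCH")
     *    4 bytes  size of the batch payload
     *    8 bytes  Adler32 checksum of the payload
     *
     * giving BATCH_CONTROL_RECORD_SIZE = 5 + 11 + 4 + 8 = 28 bytes. The EOF
     * record is the 4 byte EOF_INT marker followed by the single EOF_EOT
     * byte, 5 bytes in all.
     */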

    private ScheduledExecutorService scheduler;

    // recover when corruption is found (e.g. checksums disabled, or records overwritten
    // with zeros): skip ahead to the next batch record or EOF to minimize data loss
    public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
        DataFile dataFile = getDataFile(recoveryPosition);
        // with corruption on recovery we have no faith in the content - slip to the next batch record or eof
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
            randomAccessFile.seek(recoveryPosition.getOffset() + 1);
            byte[] data = new byte[getWriteBatchSize()];
            ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
            int nextOffset = 0;
            if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
                nextOffset = Math.toIntExact(randomAccessFile.getFilePointer() - bs.remaining());
            } else {
                nextOffset = Math.toIntExact(randomAccessFile.length());
            }
            Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset - 1);
            LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);

            // skip corruption on getNextLocation
            recoveryPosition.setOffset(nextOffset);
            recoveryPosition.setSize(-1);

            dataFile.corruptedBlocks.add(sequence);
        } catch (IOException e) {
            // ignore - the scan for the next batch record is best effort
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
    }

    public DataFileAccessorPool getAccessorPool() {
        return accessorPool;
    }

    public void allowIOResumption() {
        if (appender instanceof DataFileAppender) {
            DataFileAppender dataFileAppender = (DataFileAppender) appender;
            dataFileAppender.shutdown = false;
        }
    }

    public enum PreallocationStrategy {
        SPARSE_FILE,     // write EOF markers at the head and tail only, leaving a sparse file
        OS_KERNEL_COPY,  // kernel-level copy from a preallocated template file
        ZEROS,           // write the full file of zeros in a single pass
        CHUNKED_ZEROS;   // write zeros in PREALLOC_CHUNK_SIZE chunks
    }

    public enum PreallocationScope {
        ENTIRE_JOURNAL,        // preallocate each data file before it is used
        ENTIRE_JOURNAL_ASYNC,  // preallocate the next data file ahead of time on the scheduler
        NONE;                  // no preallocation
    }

    public enum JournalDiskSyncStrategy {
        ALWAYS,    // sync to disk on every write
        PERIODIC,  // sync on a timer
        NEVER;     // leave flushing to the OS
    }

    private static byte[] createBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create batch control record header.", e);
        }
    }

    private static byte[] createEmptyBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            os.writeInt(0);
            os.writeLong(0L);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create empty batch control record header.", e);
        }
    }

    private static byte[] createEofBatchAndLocationRecord() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(EOF_INT);
            os.writeByte(EOF_EOT);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create EOF record.", e);
        }
    }

    public static final String DEFAULT_DIRECTORY = ".";
    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
    public static final String DEFAULT_FILE_PREFIX = "db-";
    public static final String DEFAULT_FILE_SUFFIX = ".log";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;

    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);

    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

    protected File directory = new File(DEFAULT_DIRECTORY);
    protected File directoryArchive;
    private boolean directoryArchiveOverridden = false;

    protected String filePrefix = DEFAULT_FILE_PREFIX;
    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
    protected boolean started;

    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;

    protected FileAppender appender;
    protected DataFileAccessorPool accessorPool;

    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();

    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
    protected ScheduledFuture<?> cleanupTask;
    protected AtomicLong totalLength = new AtomicLong();
    protected boolean archiveDataLogs;
    private ReplicationTarget replicationTarget;
    protected boolean checksum;
    protected boolean checkForCorruptionOnStartup;
    protected boolean enableAsyncDiskSync = true;
    private int nextDataFileId = 1;
    private final Object dataFileIdLock = new Object();
    private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
    private volatile DataFile nextDataFile;

    protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
    protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
    private File osKernelCopyTemplateFile = null;
    private ByteBuffer preAllocateDirectBuffer = null;

    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;

    public interface DataFileRemovedListener {
        void fileRemoved(DataFile datafile);
    }

    private DataFileRemovedListener dataFileRemovedListener;

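    /**
     * Starts the journal: scans {@link #directory} for existing data files,
     * optionally checks each for corruption, prepares the configured
     * preallocation strategy, initialises the current write file and
     * schedules the periodic cleanup task. Calling start() on an already
     * started journal is a no-op.
     */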
    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        long start = System.currentTimeMillis();
        accessorPool = new DataFileAccessorPool(this);
        started = true;

        appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);

        File[] files = directory.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        if (files != null) {
            for (File file : files) {
                try {
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length() - fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore files that do not match the pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            LinkedList<DataFile> l = new LinkedList<>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                if (df.getLength() == 0) {
                    // possibly the result of a previous failed write
                    LOG.info("Ignoring zero length, partially initialised journal data file: {}", df);
                    continue;
                } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) {
                    // skip a preallocated but still unused last file
                    continue;
                }
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if (isCheckForCorruptionOnStartup()) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        if (preallocationScope != PreallocationScope.NONE) {
            switch (preallocationStrategy) {
                case SPARSE_FILE:
                    break;
                case OS_KERNEL_COPY: {
                    osKernelCopyTemplateFile = createJournalTemplateFile();
                }
                break;
                case CHUNKED_ZEROS: {
                    preAllocateDirectBuffer = allocateDirectBuffer(PREALLOC_CHUNK_SIZE);
                }
                break;
                case ZEROS: {
                    preAllocateDirectBuffer = allocateDirectBuffer(getMaxFileLength());
                }
                break;
            }
        }
        scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread schedulerThread = new Thread(r);
                schedulerThread.setName("ActiveMQ Journal Scheduled executor");
                schedulerThread.setDaemon(true);
                return schedulerThread;
            }
        });

        // init current write file
        if (dataFiles.isEmpty()) {
            nextDataFileId = 1;
            rotateWriteFile();
        } else {
            currentDataFile.set(dataFiles.getTail());
            nextDataFileId = currentDataFile.get().dataFileId + 1;
        }

        if (lastAppendLocation.get() == null) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        // ensure we don't report unused space of last journal file in size metric
        int lastFileLength = dataFiles.getTail().getLength();
        if (totalLength.get() > lastFileLength && lastAppendLocation.get().getOffset() > 0) {
            totalLength.addAndGet(lastAppendLocation.get().getOffset() - lastFileLength);
        }

        cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                cleanup();
            }
        }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);

        long end = System.currentTimeMillis();
        LOG.trace("Startup took: {} ms", end - start);
    }

    private ByteBuffer allocateDirectBuffer(int size) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(size);
        buffer.put(EOF_RECORD);
        return buffer;
    }

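    /**
     * Preallocates a newly created journal data file using the configured
     * {@link PreallocationStrategy}. Failure here is non-fatal: the journal
     * continues without preallocation.
     *
     * Configuration sketch (illustrative only; the directory name is
     * hypothetical):
     * <pre>
     *   Journal journal = new Journal();
     *   journal.setDirectory(new File("activemq-data"));
     *   journal.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL);
     *   journal.setPreallocationStrategy(Journal.PreallocationStrategy.CHUNKED_ZEROS);
     *   journal.start();
     * </pre>
     */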
    public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {
        if (PreallocationScope.NONE != preallocationScope) {
            try {
                if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
                    doPreallocationKernelCopy(file);
                } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
                    doPreallocationZeros(file);
                } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
                    doPreallocationChunkedZeros(file);
                } else {
                    doPreallocationSparseFile(file);
                }
            } catch (Throwable continueWithNoPrealloc) {
                // error on preallocation is non fatal, and we don't want to leak the journal handle
                LOG.error("Could not preallocate journal data file", continueWithNoPrealloc);
            }
        }
    }

    private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
        final ByteBuffer journalEof = ByteBuffer.wrap(EOF_RECORD);
        try {
            FileChannel channel = file.getChannel();
            channel.position(0);
            channel.write(journalEof);
            channel.position(maxFileLength - EOF_RECORD.length);
            journalEof.rewind();
            channel.write(journalEof);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with sparse file", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with sparse file", e);
        }
    }

    private void doPreallocationZeros(RecoverableRandomAccessFile file) {
        preAllocateDirectBuffer.rewind();
        try {
            FileChannel channel = file.getChannel();
            channel.write(preAllocateDirectBuffer);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros", e);
        }
    }

    private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
        try (RandomAccessFile templateRaf = new RandomAccessFile(osKernelCopyTemplateFile, "rw")) {
            templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with kernel copy", ignored);
        } catch (FileNotFoundException e) {
            LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
        }
    }

    private File createJournalTemplateFile() {
        String fileName = "db-log.template";
        File rc = new File(directory, fileName);
        try (RandomAccessFile templateRaf = new RandomAccessFile(rc, "rw")) {
            templateRaf.getChannel().write(ByteBuffer.wrap(EOF_RECORD));
            templateRaf.setLength(maxFileLength);
            templateRaf.getChannel().force(true);
        } catch (FileNotFoundException e) {
            // use rc here: osKernelCopyTemplateFile is not assigned until this method returns
            LOG.error("Could not open the template file on disk at " + rc.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not initialise the template file at " + rc.getAbsolutePath(), e);
        }
        return rc;
    }

    private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {
        preAllocateDirectBuffer.limit(preAllocateDirectBuffer.capacity());
        preAllocateDirectBuffer.rewind();
        try {
            FileChannel channel = file.getChannel();

            int remLen = maxFileLength;
            while (remLen > 0) {
                if (remLen < preAllocateDirectBuffer.remaining()) {
                    preAllocateDirectBuffer.limit(remLen);
                }
                int writeLen = channel.write(preAllocateDirectBuffer);
                remLen -= writeLen;
                preAllocateDirectBuffer.rewind();
            }

            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with chunked zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with chunked zeros! Will continue without preallocation", e);
        }
    }

    private static byte[] bytes(String string) {
        return string.getBytes(StandardCharsets.UTF_8);
    }

    public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
        if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                byte[] firstFewBytes = new byte[BATCH_CONTROL_RECORD_HEADER.length];
                reader.readFully(0, firstFewBytes);
                ByteSequence bs = new ByteSequence(firstFewBytes);
                return bs.startsWith(EOF_RECORD);
            } catch (Exception ignored) {
                // a file we cannot read is not treated as an unused preallocated file
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }
        }
        return false;
    }

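    /**
     * Scans a data file batch by batch, validating each batch control record
     * (and its checksum when checksums are enabled). Corrupt ranges are
     * recorded in {@code dataFile.corruptedBlocks} and skipped. The returned
     * location marks the end of the last valid batch, i.e. the safe point to
     * resume appending.
     */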
    protected Location recoveryCheck(DataFile dataFile) throws IOException {
        Location location = new Location();
        location.setDataFileId(dataFile.getDataFileId());
        location.setOffset(0);

        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
            randomAccessFile.seek(0);
            final long totalFileLength = randomAccessFile.length();
            byte[] data = new byte[getWriteBatchSize()];
            ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));

            while (true) {
                int size = checkBatchRecord(bs, randomAccessFile);
                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= totalFileLength) {
                    if (size == 0) {
                        // eof batch record
                        break;
                    }
                    location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
                } else {
                    // Perhaps it's just some corruption... scan through the
                    // file to find the next valid batch record. We
                    // may have subsequent valid batch records.
                    if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
                        int nextOffset = Math.toIntExact(randomAccessFile.getFilePointer() - bs.remaining());
                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                        LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);
                        dataFile.corruptedBlocks.add(sequence);
                        location.setOffset(nextOffset);
                    } else {
                        break;
                    }
                }
            }
        } catch (IOException e) {
            // ignore - the scan stops at the last readable batch
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }

        int existingLen = dataFile.getLength();
        dataFile.setLength(location.getOffset());
        if (existingLen > dataFile.getLength()) {
            totalLength.addAndGet(dataFile.getLength() - existingLen);
        }

        if (!dataFile.corruptedBlocks.isEmpty()) {
            // Is the end of the data file corrupted?
            if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) {
                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
            }
        }

        return location;
    }

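    /**
     * Searches forward for the next batch control record header, refilling
     * the buffer from the file as needed and carrying over a header-sized
     * tail so a header split across two reads is still found. Returns a
     * non-negative position (with the sequence repositioned at the header)
     * on success, or -1 once the end of the file is reached.
     */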
    private int findNextBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
        final ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
        int pos = 0;
        while (true) {
            pos = bs.indexOf(header, 0);
            if (pos >= 0) {
                bs.setOffset(bs.offset + pos);
                return pos;
            } else {
                // need to load the next data chunk in..
                if (bs.length != bs.data.length) {
                    // If we had a short read then we were at EOF
                    return -1;
                }
                bs.setOffset(bs.length - BATCH_CONTROL_RECORD_HEADER.length);
                bs.reset();
                bs.setLength(bs.length + reader.read(bs.data, bs.length, bs.data.length - BATCH_CONTROL_RECORD_HEADER.length));
            }
        }
    }

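    /**
     * Validates the batch record at the current position of the sequence.
     * Returns the batch payload size on success, 0 for an EOF record, or a
     * negative code on failure: -1 bad header, -2 implausible size, -3 buffer
     * exhausted before the batch ended, -4 checksum mismatch.
     */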
    private int checkBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
        ensureAvailable(bs, reader, EOF_RECORD.length);
        if (bs.startsWith(EOF_RECORD)) {
            return 0; // eof
        }
        ensureAvailable(bs, reader, BATCH_CONTROL_RECORD_SIZE);
        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(bs)) {

            // Assert that it's a batch record.
            for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
                if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
                    return -1; // not a batch control record header
                }
            }

            int size = controlIs.readInt();
            if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
                return -2; // implausible batch size
            }

            long expectedChecksum = controlIs.readLong();
            Checksum checksum = null;
            if (isChecksum() && expectedChecksum > 0) {
                checksum = new Adler32();
            }

            // switch back to bs to consume the batch payload
            bs.setOffset(controlIs.position());
            int toRead = size;
            while (toRead > 0) {
                if (bs.remaining() >= toRead) {
                    if (checksum != null) {
                        checksum.update(bs.getData(), bs.getOffset(), toRead);
                    }
                    bs.setOffset(bs.offset + toRead);
                    toRead = 0;
                } else {
                    if (bs.length != bs.data.length) {
                        // buffer exhausted before the batch ended
                        return -3;
                    }

                    toRead -= bs.remaining();
                    if (checksum != null) {
                        checksum.update(bs.getData(), bs.getOffset(), bs.remaining());
                    }
                    bs.setLength(reader.read(bs.data));
                    bs.setOffset(0);
                }
            }
            if (checksum != null && expectedChecksum != checksum.getValue()) {
                return -4; // checksum mismatch
            }

            return size;
        }
    }

    private void ensureAvailable(ByteSequence bs, RandomAccessFile reader, int required) throws IOException {
        if (bs.remaining() < required) {
            bs.reset();
            int read = reader.read(bs.data, bs.length, bs.data.length - bs.length);
            if (read < 0) {
                if (bs.remaining() == 0) {
                    throw new EOFException("request for " + required + " bytes reached EOF");
                }
                // at EOF but buffered bytes remain - don't shrink the sequence
                return;
            }
            bs.setLength(bs.length + read);
        }
    }

    void addToTotalLength(int size) {
        totalLength.addAndGet(size);
    }

    public long length() {
        return totalLength.get();
    }

    private void rotateWriteFile() throws IOException {
        synchronized (dataFileIdLock) {
            DataFile dataFile = nextDataFile;
            if (dataFile == null) {
                dataFile = newDataFile();
            }
            synchronized (currentDataFile) {
                fileMap.put(dataFile.getDataFileId(), dataFile);
                fileByFileMap.put(dataFile.getFile(), dataFile);
                dataFiles.addLast(dataFile);
                currentDataFile.set(dataFile);
            }
            nextDataFile = null;
        }
        if (PreallocationScope.ENTIRE_JOURNAL_ASYNC == preallocationScope) {
            preAllocateNextDataFileFuture = scheduler.submit(preAllocateNextDataFileTask);
        }
    }

    private Runnable preAllocateNextDataFileTask = new Runnable() {
        @Override
        public void run() {
            if (nextDataFile == null) {
                synchronized (dataFileIdLock) {
                    try {
                        nextDataFile = newDataFile();
                    } catch (IOException e) {
                        LOG.warn("Failed to proactively allocate data file", e);
                    }
                }
            }
        }
    };

    private volatile Future<?> preAllocateNextDataFileFuture;

    private DataFile newDataFile() throws IOException {
        int nextNum = nextDataFileId++;
        File file = getFile(nextNum);
        DataFile nextWriteFile = new DataFile(file, nextNum);
        preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile());
        return nextWriteFile;
    }

    public DataFile reserveDataFile() {
        synchronized (dataFileIdLock) {
            int nextNum = nextDataFileId++;
            File file = getFile(nextNum);
            DataFile reservedDataFile = new DataFile(file, nextNum);
            synchronized (currentDataFile) {
                fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
                fileByFileMap.put(file, reservedDataFile);
                if (dataFiles.isEmpty()) {
                    dataFiles.addLast(reservedDataFile);
                } else {
                    dataFiles.getTail().linkBefore(reservedDataFile);
                }
            }
            return reservedDataFile;
        }
    }

    public File getFile(int nextNum) {
        String fileName = filePrefix + nextNum + fileSuffix;
        return new File(directory, fileName);
    }

    DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = null;
        synchronized (currentDataFile) {
            dataFile = fileMap.get(key);
        }
        if (dataFile == null) {
            LOG.error("Looking for key {} but not found in fileMap: {}", key, fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile;
    }

    public void close() throws IOException {
        synchronized (this) {
            if (!started) {
                return;
            }
            cleanupTask.cancel(true);
            if (preAllocateNextDataFileFuture != null) {
                preAllocateNextDataFileFuture.cancel(true);
            }
            ThreadPoolUtils.shutdownGraceful(scheduler, 4000);
            accessorPool.close();
        }
        // the appender can be calling back to the journal, blocking a close - AMQ-5620
        appender.close();
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            dataFiles.clear();
            lastAppendLocation.set(null);
            started = false;
        }
    }

    public synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }

    public synchronized boolean delete() throws IOException {

        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (DataFile dataFile : fileMap.values()) {
            result &= dataFile.delete();
        }

        if (preAllocateNextDataFileFuture != null) {
            preAllocateNextDataFileFuture.cancel(true);
        }
        synchronized (dataFileIdLock) {
            if (nextDataFile != null) {
                nextDataFile.delete();
                nextDataFile = null;
            }
        }

        totalLength.set(0);
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            lastAppendLocation.set(null);
            dataFiles = new LinkedNodeList<DataFile>();
        }
        // Reopen the file handles...
        accessorPool = new DataFileAccessorPool(this);
        appender = new DataFileAppender(this);
        return result;
    }

    public void removeDataFiles(Set<Integer> files) throws IOException {
        for (Integer key : files) {
            // Can't remove the data file (or subsequent files) that is currently being written to.
            if (key >= lastAppendLocation.get().getDataFileId()) {
                continue;
            }
            DataFile dataFile = null;
            synchronized (currentDataFile) {
                dataFile = fileMap.remove(key);
                if (dataFile != null) {
                    fileByFileMap.remove(dataFile.getFile());
                    dataFile.unlink();
                }
            }
            if (dataFile != null) {
                forceRemoveDataFile(dataFile);
            }
        }
    }

    private void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        totalLength.addAndGet(-dataFile.getLength());
        if (archiveDataLogs) {
            File directoryArchive = getDirectoryArchive();
            if (directoryArchive.exists()) {
                LOG.debug("Archive directory exists: {}", directoryArchive);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Archive directory [{}] does not exist - creating it now",
                            directoryArchive.getAbsolutePath());
                }
                IOHelper.mkdirs(directoryArchive);
            }
            LOG.debug("Moving data file {} to {}", dataFile, directoryArchive.getCanonicalPath());
            dataFile.move(directoryArchive);
            LOG.debug("Successfully moved data file");
        } else {
            LOG.debug("Deleting data file: {}", dataFile);
            if (dataFile.delete()) {
                LOG.debug("Discarded data file: {}", dataFile);
            } else {
                LOG.warn("Failed to discard data file: {}", dataFile.getFile());
            }
        }
        if (dataFileRemovedListener != null) {
            dataFileRemovedListener.fileRemoved(dataFile);
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    @Override
    public String toString() {
        return directory.toString();
    }

    public Location getNextLocation(Location location) throws IOException, IllegalStateException {
        return getNextLocation(location, null);
    }

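    /**
     * Advances to the next user record after {@code location}, skipping over
     * corrupt ranges and EOF markers and crossing data file boundaries. Pass
     * a {@code null} location to start from the head of the journal. Returns
     * {@code null} when the end of the journal, or {@code limit}, is reached.
     */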
    public Location getNextLocation(Location location, Location limit) throws IOException, IllegalStateException {
        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    DataFile head = null;
                    synchronized (currentDataFile) {
                        head = dataFiles.getHead();
                    }
                    if (head == null) {
                        return null;
                    }
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    cur = new Location(location);
                    if (location.getSize() != -1) {
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it go into the next file?
            if (dataFile.getLength() <= cur.getOffset()) {
                synchronized (currentDataFile) {
                    dataFile = dataFile.getNext();
                }
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                    if (limit != null && cur.compareTo(limit) >= 0) {
                        LOG.trace("reached limit: {} at: {}", limit, cur);
                        return null;
                    }
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } catch (EOFException eof) {
                LOG.trace("EOF on next: {}, cur: {}", location, cur);
                throw eof;
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            Sequence corruptedRange = dataFile.corruptedBlocks.get(cur.getOffset());
            if (corruptedRange != null) {
                // skip corruption
                cur.setSize((int) corruptedRange.range());
            } else if ((cur.getSize() == EOF_INT && cur.getType() == EOF_EOT) ||
                    (cur.getType() == 0 && cur.getSize() == 0)) {
                // eof - jump to next datafile
                // EOF_INT and EOF_EOT replace 0,0 - we need to react to both for
                // replay of existing journals
                // possibly journal is larger than maxFileLength after config change
                cur.setSize(EOF_RECORD.length);
                cur.setOffset(Math.max(maxFileLength, dataFile.getLength()));
            } else if (cur.getType() == USER_RECORD_TYPE) {
                // Only return user records.
                return cur;
            }
        }
    }

    public ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        ByteSequence rc = null;
        try {
            rc = reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        return rc;
    }

    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        return appender.storeItem(data, Location.USER_TYPE, sync);
    }

    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        return appender.storeItem(data, Location.USER_TYPE, onComplete);
    }
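
    /*
     * Usage sketch (illustrative; assumes a started journal and a
     * hypothetical payload):
     *
     *   Location loc = journal.write(new ByteSequence("hello".getBytes()), true); // sync write
     *   ByteSequence data = journal.read(loc);
     */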

    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }

    public PreallocationStrategy getPreallocationStrategy() {
        return preallocationStrategy;
    }

    public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) {
        this.preallocationStrategy = preallocationStrategy;
    }

    public PreallocationScope getPreallocationScope() {
        return preallocationScope;
    }

    public void setPreallocationScope(PreallocationScope preallocationScope) {
        this.preallocationScope = preallocationScope;
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    public void setFilePrefix(String filePrefix) {
        this.filePrefix = filePrefix;
    }

    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public File getDirectoryArchive() {
        if (!directoryArchiveOverridden && (directoryArchive == null)) {
            // create the directoryArchive relative to the journal location
            directoryArchive = new File(directory.getAbsolutePath() +
                    File.separator + DEFAULT_ARCHIVE_DIRECTORY);
        }
        return directoryArchive;
    }

    public void setDirectoryArchive(File directoryArchive) {
        directoryArchiveOverridden = true;
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    public DataFile getDataFileById(int dataFileId) {
        synchronized (currentDataFile) {
            return fileMap.get(Integer.valueOf(dataFileId));
        }
    }

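    /**
     * Returns the data file to append to, first rotating to a fresh file if
     * writing {@code capacity} more bytes would exceed {@code maxFileLength}.
     * The lock order (dataFileIdLock, then currentDataFile) matches
     * rotateWriteFile() to avoid the AMQ-6545 deadlock.
     */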
    public DataFile getCurrentDataFile(int capacity) throws IOException {
        // First just acquire the currentDataFile lock and return if no rotation needed
        synchronized (currentDataFile) {
            if (currentDataFile.get().getLength() + capacity < maxFileLength) {
                return currentDataFile.get();
            }
        }

        // AMQ-6545 - if rotation needed, acquire dataFileIdLock first to prevent deadlocks,
        // then re-check if rotation is needed
        synchronized (dataFileIdLock) {
            synchronized (currentDataFile) {
                if (currentDataFile.get().getLength() + capacity >= maxFileLength) {
                    rotateWriteFile();
                }
                return currentDataFile.get();
            }
        }
    }

    public Integer getCurrentDataFileId() {
        synchronized (currentDataFile) {
            return currentDataFile.get().getDataFileId();
        }
    }

    /**
     * Get a set of files - only valid after start()
     *
     * @return files currently being used
     */
    public Set<File> getFiles() {
        synchronized (currentDataFile) {
            return fileByFileMap.keySet();
        }
    }

    public Map<Integer, DataFile> getFileMap() {
        synchronized (currentDataFile) {
            return new TreeMap<Integer, DataFile>(fileMap);
        }
    }

    public long getDiskSize() {
        return totalLength.get();
    }

    public void setReplicationTarget(ReplicationTarget replicationTarget) {
        this.replicationTarget = replicationTarget;
    }

    public ReplicationTarget getReplicationTarget() {
        return replicationTarget;
    }

    public String getFileSuffix() {
        return fileSuffix;
    }

    public void setFileSuffix(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    public boolean isChecksum() {
        return checksum;
    }

    public void setChecksum(boolean checksumWrites) {
        this.checksum = checksumWrites;
    }

    public boolean isCheckForCorruptionOnStartup() {
        return checkForCorruptionOnStartup;
    }

    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
        this.totalLength = storeSizeAccumulator;
    }

    public void setEnableAsyncDiskSync(boolean val) {
        this.enableAsyncDiskSync = val;
    }

    public boolean isEnableAsyncDiskSync() {
        return enableAsyncDiskSync;
    }

    public JournalDiskSyncStrategy getJournalDiskSyncStrategy() {
        return journalDiskSyncStrategy;
    }

    public void setJournalDiskSyncStrategy(JournalDiskSyncStrategy journalDiskSyncStrategy) {
        this.journalDiskSyncStrategy = journalDiskSyncStrategy;
    }

    public boolean isJournalDiskSyncPeriodic() {
        return JournalDiskSyncStrategy.PERIODIC.equals(journalDiskSyncStrategy);
    }

    public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
        this.dataFileRemovedListener = dataFileRemovedListener;
    }

    public static class WriteCommand extends LinkedNode<WriteCommand> {
        public final Location location;
        public final ByteSequence data;
        final boolean sync;
        public final Runnable onComplete;

        public WriteCommand(Location location, ByteSequence data, boolean sync) {
            this.location = location;
            this.data = data;
            this.sync = sync;
            this.onComplete = null;
        }

        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
            this.location = location;
            this.data = data;
            this.onComplete = onComplete;
            this.sync = false;
        }
    }

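    /**
     * Identity of an in-flight write: the (data file id, offset) pair used as
     * the key into {@link Journal#inflightWrites}.
     */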
    public static class WriteKey {
        private final int file;
        private final long offset;
        private final int hash;

        public WriteKey(Location item) {
            file = item.getDataFileId();
            offset = item.getOffset();
            // TODO: see if we can build a better hash
            hash = (int) (file ^ offset);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof WriteKey) {
                WriteKey di = (WriteKey) obj;
                return di.file == file && di.offset == offset;
            }
            return false;
        }
    }
}