001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.DataInput;
024import java.io.DataOutput;
025import java.io.EOFException;
026import java.io.File;
027import java.io.IOException;
028import java.io.InputStream;
029import java.io.InterruptedIOException;
030import java.io.ObjectInputStream;
031import java.io.ObjectOutputStream;
032import java.io.OutputStream;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.Collections;
037import java.util.Date;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.Iterator;
041import java.util.LinkedHashMap;
042import java.util.LinkedHashSet;
043import java.util.LinkedList;
044import java.util.List;
045import java.util.Map;
046import java.util.Map.Entry;
047import java.util.Set;
048import java.util.SortedSet;
049import java.util.TreeMap;
050import java.util.TreeSet;
051import java.util.concurrent.ConcurrentHashMap;
052import java.util.concurrent.ConcurrentMap;
053import java.util.concurrent.Executors;
054import java.util.concurrent.ScheduledExecutorService;
055import java.util.concurrent.ThreadFactory;
056import java.util.concurrent.TimeUnit;
057import java.util.concurrent.atomic.AtomicBoolean;
058import java.util.concurrent.atomic.AtomicLong;
059import java.util.concurrent.atomic.AtomicReference;
060import java.util.concurrent.locks.ReentrantReadWriteLock;
061
062import org.apache.activemq.ActiveMQMessageAuditNoSync;
063import org.apache.activemq.broker.BrokerService;
064import org.apache.activemq.broker.BrokerServiceAware;
065import org.apache.activemq.broker.region.Destination;
066import org.apache.activemq.broker.region.Queue;
067import org.apache.activemq.broker.region.Topic;
068import org.apache.activemq.command.MessageAck;
069import org.apache.activemq.command.TransactionId;
070import org.apache.activemq.openwire.OpenWireFormat;
071import org.apache.activemq.protobuf.Buffer;
072import org.apache.activemq.store.MessageStore;
073import org.apache.activemq.store.MessageStoreStatistics;
074import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
075import org.apache.activemq.store.TopicMessageStore;
076import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
077import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
078import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
079import org.apache.activemq.store.kahadb.data.KahaDestination;
080import org.apache.activemq.store.kahadb.data.KahaEntryType;
081import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
082import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
083import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
084import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
085import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;
086import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
087import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
088import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
089import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
090import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
091import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
092import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
093import org.apache.activemq.store.kahadb.disk.index.ListIndex;
094import org.apache.activemq.store.kahadb.disk.journal.DataFile;
095import org.apache.activemq.store.kahadb.disk.journal.Journal;
096import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
097import org.apache.activemq.store.kahadb.disk.journal.Location;
098import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
099import org.apache.activemq.store.kahadb.disk.page.Page;
100import org.apache.activemq.store.kahadb.disk.page.PageFile;
101import org.apache.activemq.store.kahadb.disk.page.Transaction;
102import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
103import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
104import org.apache.activemq.store.kahadb.disk.util.Marshaller;
105import org.apache.activemq.store.kahadb.disk.util.Sequence;
106import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
107import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
108import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
109import org.apache.activemq.util.ByteSequence;
110import org.apache.activemq.util.DataByteArrayInputStream;
111import org.apache.activemq.util.DataByteArrayOutputStream;
112import org.apache.activemq.util.IOExceptionSupport;
113import org.apache.activemq.util.IOHelper;
114import org.apache.activemq.util.ServiceStopper;
115import org.apache.activemq.util.ServiceSupport;
116import org.apache.activemq.util.ThreadPoolUtils;
117import org.slf4j.Logger;
118import org.slf4j.LoggerFactory;
119import org.slf4j.MDC;
120
121public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
122
123    protected BrokerService brokerService;
124
125    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
126    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
127    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
128    protected static final Buffer UNMATCHED;
129    static {
130        UNMATCHED = new Buffer(new byte[]{});
131    }
132    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
133
134    static final int CLOSED_STATE = 1;
135    static final int OPEN_STATE = 2;
136    static final long NOT_ACKED = -1;
137
138    static final int VERSION = 6;
139
140    static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;
141
142    protected class Metadata {
143        protected Page<Metadata> page;
144        protected int state;
145        protected BTreeIndex<String, StoredDestination> destinations;
146        protected Location lastUpdate;
147        protected Location firstInProgressTransactionLocation;
148        protected Location producerSequenceIdTrackerLocation = null;
149        protected Location ackMessageFileMapLocation = null;
150        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
151        protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<>();
152        protected int version = VERSION;
153        protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
154
155        public void read(DataInput is) throws IOException {
156            state = is.readInt();
157            destinations = new BTreeIndex<>(pageFile, is.readLong());
158            if (is.readBoolean()) {
159                lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
160            } else {
161                lastUpdate = null;
162            }
163            if (is.readBoolean()) {
164                firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
165            } else {
166                firstInProgressTransactionLocation = null;
167            }
168            try {
169                if (is.readBoolean()) {
170                    producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
171                } else {
172                    producerSequenceIdTrackerLocation = null;
173                }
174            } catch (EOFException expectedOnUpgrade) {
175            }
176            try {
177                version = is.readInt();
178            } catch (EOFException expectedOnUpgrade) {
179                version = 1;
180            }
181            if (version >= 5 && is.readBoolean()) {
182                ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is);
183            } else {
184                ackMessageFileMapLocation = null;
185            }
186            try {
187                openwireVersion = is.readInt();
188            } catch (EOFException expectedOnUpgrade) {
189                openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION;
190            }
191            LOG.info("KahaDB is version " + version);
192        }
193
194        public void write(DataOutput os) throws IOException {
195            os.writeInt(state);
196            os.writeLong(destinations.getPageId());
197
198            if (lastUpdate != null) {
199                os.writeBoolean(true);
200                LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
201            } else {
202                os.writeBoolean(false);
203            }
204
205            if (firstInProgressTransactionLocation != null) {
206                os.writeBoolean(true);
207                LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
208            } else {
209                os.writeBoolean(false);
210            }
211
212            if (producerSequenceIdTrackerLocation != null) {
213                os.writeBoolean(true);
214                LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
215            } else {
216                os.writeBoolean(false);
217            }
218            os.writeInt(VERSION);
219            if (ackMessageFileMapLocation != null) {
220                os.writeBoolean(true);
221                LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os);
222            } else {
223                os.writeBoolean(false);
224            }
225            os.writeInt(this.openwireVersion);
226        }
227    }
228
229    class MetadataMarshaller extends VariableMarshaller<Metadata> {
230        @Override
231        public Metadata readPayload(DataInput dataIn) throws IOException {
232            Metadata rc = createMetadata();
233            rc.read(dataIn);
234            return rc;
235        }
236
237        @Override
238        public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
239            object.write(dataOut);
240        }
241    }
242
243    protected PageFile pageFile;
244    protected Journal journal;
245    protected Metadata metadata = new Metadata();
246
247    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
248
249    protected boolean failIfDatabaseIsLocked;
250
251    protected boolean deleteAllMessages;
252    protected File directory = DEFAULT_DIRECTORY;
253    protected File indexDirectory = null;
254    protected ScheduledExecutorService scheduler;
255    private final Object schedulerLock = new Object();
256
257    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
258    protected boolean archiveDataLogs;
259    protected File directoryArchive;
260    protected AtomicLong journalSize = new AtomicLong(0);
261    long journalDiskSyncInterval = 1000;
262    long checkpointInterval = 5*1000;
263    long cleanupInterval = 30*1000;
264    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
265    int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
266    boolean enableIndexWriteAsync = false;
267    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
268    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
269    private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();
270
271    protected AtomicBoolean opened = new AtomicBoolean();
272    private boolean ignoreMissingJournalfiles = false;
273    private int indexCacheSize = 10000;
274    private boolean checkForCorruptJournalFiles = false;
275    private boolean checksumJournalFiles = true;
276    protected boolean forceRecoverIndex = false;
277    private boolean archiveCorruptedIndex = false;
278    private boolean useIndexLFRUEviction = false;
279    private float indexLFUEvictionFactor = 0.2f;
280    private boolean enableIndexDiskSyncs = true;
281    private boolean enableIndexRecoveryFile = true;
282    private boolean enableIndexPageCaching = true;
283    ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
284
285    private boolean enableAckCompaction = true;
286    private int compactAcksAfterNoGC = 10;
287    private boolean compactAcksIgnoresStoreGrowth = false;
288    private int checkPointCyclesWithNoGC;
289    private int journalLogOnLastCompactionCheck;
290    private boolean enableSubscriptionStatistics = false;
291
292    //only set when using JournalDiskSyncStrategy.PERIODIC
293    protected final AtomicReference<Location> lastAsyncJournalUpdate = new AtomicReference<>();
294
295    @Override
296    public void doStart() throws Exception {
297        load();
298    }
299
300    @Override
301    public void doStop(ServiceStopper stopper) throws Exception {
302        unload();
303    }
304
305    public void allowIOResumption() {
306        if (pageFile != null) {
307            pageFile.allowIOResumption();
308        }
309        if (journal != null) {
310            journal.allowIOResumption();
311        }
312    }
313
314    private void loadPageFile() throws IOException {
315        this.indexLock.writeLock().lock();
316        try {
317            final PageFile pageFile = getPageFile();
318            pageFile.load();
319            pageFile.tx().execute(new Transaction.Closure<IOException>() {
320                @Override
321                public void execute(Transaction tx) throws IOException {
322                    if (pageFile.getPageCount() == 0) {
323                        // First time this is created.. Initialize the metadata
324                        Page<Metadata> page = tx.allocate();
325                        assert page.getPageId() == 0;
326                        page.set(metadata);
327                        metadata.page = page;
328                        metadata.state = CLOSED_STATE;
329                        metadata.destinations = new BTreeIndex<>(pageFile, tx.allocate().getPageId());
330
331                        tx.store(metadata.page, metadataMarshaller, true);
332                    } else {
333                        Page<Metadata> page = tx.load(0, metadataMarshaller);
334                        metadata = page.get();
335                        metadata.page = page;
336                    }
337                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
338                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
339                    metadata.destinations.load(tx);
340                }
341            });
342            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
343            // Perhaps we should just keep an index of file
344            storedDestinations.clear();
345            pageFile.tx().execute(new Transaction.Closure<IOException>() {
346                @Override
347                public void execute(Transaction tx) throws IOException {
348                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
349                        Entry<String, StoredDestination> entry = iterator.next();
350                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
351                        storedDestinations.put(entry.getKey(), sd);
352
353                        if (checkForCorruptJournalFiles) {
354                            // sanity check the index also
355                            if (!entry.getValue().locationIndex.isEmpty(tx)) {
356                                if (entry.getValue().orderIndex.nextMessageId <= 0) {
357                                    throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey());
358                                }
359                            }
360                        }
361                    }
362                }
363            });
364            pageFile.flush();
365        } finally {
366            this.indexLock.writeLock().unlock();
367        }
368    }
369
370    private void startCheckpoint() {
371        if (checkpointInterval == 0 && cleanupInterval == 0) {
372            LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
373            return;
374        }
375        synchronized (schedulerLock) {
376            if (scheduler == null || scheduler.isShutdown()) {
377                scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
378
379                    @Override
380                    public Thread newThread(Runnable r) {
381                        Thread schedulerThread = new Thread(r);
382
383                        schedulerThread.setName("ActiveMQ Journal Checkpoint Worker");
384                        schedulerThread.setDaemon(true);
385
386                        return schedulerThread;
387                    }
388                });
389
390                // Short intervals for check-point and cleanups
391                long delay;
392                if (journal.isJournalDiskSyncPeriodic()) {
393                    delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500);
394                } else {
395                    delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
396                }
397
398                scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS);
399            }
400        }
401    }
402
403    private final class CheckpointRunner implements Runnable {
404
405        private long lastCheckpoint = System.currentTimeMillis();
406        private long lastCleanup = System.currentTimeMillis();
407        private long lastSync = System.currentTimeMillis();
408        private Location lastAsyncUpdate = null;
409
410        @Override
411        public void run() {
412            try {
413                // Decide on cleanup vs full checkpoint here.
414                if (opened.get()) {
415                    long now = System.currentTimeMillis();
416                    if (journal.isJournalDiskSyncPeriodic() &&
417                            journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) {
418                        Location currentUpdate = lastAsyncJournalUpdate.get();
419                        if (currentUpdate != null && !currentUpdate.equals(lastAsyncUpdate)) {
420                            lastAsyncUpdate = currentUpdate;
421                            if (LOG.isTraceEnabled()) {
422                                LOG.trace("Writing trace command to trigger journal sync");
423                            }
424                            store(new KahaTraceCommand(), true, null, null);
425                        }
426                        lastSync = now;
427                    }
428                    if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
429                        checkpointCleanup(true);
430                        lastCleanup = now;
431                        lastCheckpoint = now;
432                    } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) {
433                        checkpointCleanup(false);
434                        lastCheckpoint = now;
435                    }
436                }
437            } catch (IOException ioe) {
438                LOG.error("Checkpoint failed", ioe);
439                brokerService.handleIOException(ioe);
440            } catch (Throwable e) {
441                LOG.error("Checkpoint failed", e);
442                brokerService.handleIOException(IOExceptionSupport.create(e));
443            }
444        }
445    }
446
447    public void open() throws IOException {
448        if( opened.compareAndSet(false, true) ) {
449            getJournal().start();
450            try {
451                loadPageFile();
452            } catch (Throwable t) {
453                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
454                if (LOG.isDebugEnabled()) {
455                    LOG.debug("Index load failure", t);
456                }
457                // try to recover index
458                try {
459                    pageFile.unload();
460                } catch (Exception ignore) {}
461                if (archiveCorruptedIndex) {
462                    pageFile.archive();
463                } else {
464                    pageFile.delete();
465                }
466                metadata = createMetadata();
467                //The metadata was recreated after a detect corruption so we need to
468                //reconfigure anything that was configured on the old metadata on startup
469                configureMetadata();
470                pageFile = null;
471                loadPageFile();
472            }
473            recover();
474            startCheckpoint();
475        }
476    }
477
478    public void load() throws IOException {
479        this.indexLock.writeLock().lock();
480        try {
481            IOHelper.mkdirs(directory);
482            if (deleteAllMessages) {
483                getJournal().setCheckForCorruptionOnStartup(false);
484                getJournal().start();
485                getJournal().delete();
486                getJournal().close();
487                journal = null;
488                getPageFile().delete();
489                LOG.info("Persistence store purged.");
490                deleteAllMessages = false;
491            }
492
493            open();
494            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
495        } finally {
496            this.indexLock.writeLock().unlock();
497        }
498    }
499
500    public void close() throws IOException, InterruptedException {
501        if (opened.compareAndSet(true, false)) {
502            checkpointLock.writeLock().lock();
503            try {
504                if (metadata.page != null) {
505                    checkpointUpdate(true);
506                }
507                pageFile.unload();
508                metadata = createMetadata();
509            } finally {
510                checkpointLock.writeLock().unlock();
511            }
512            journal.close();
513            synchronized(schedulerLock) {
514                if (scheduler != null) {
515                    ThreadPoolUtils.shutdownGraceful(scheduler, -1);
516                    scheduler = null;
517                }
518            }
519            // clear the cache and journalSize on shutdown of the store
520            storeCache.clear();
521            journalSize.set(0);
522        }
523    }
524
525    public void unload() throws IOException, InterruptedException {
526        this.indexLock.writeLock().lock();
527        try {
528            if( pageFile != null && pageFile.isLoaded() ) {
529                metadata.state = CLOSED_STATE;
530                metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0];
531
532                if (metadata.page != null) {
533                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
534                        @Override
535                        public void execute(Transaction tx) throws IOException {
536                            tx.store(metadata.page, metadataMarshaller, true);
537                        }
538                    });
539                }
540            }
541        } finally {
542            this.indexLock.writeLock().unlock();
543        }
544        close();
545    }
546
547    // public for testing
548    @SuppressWarnings("rawtypes")
549    public Location[] getInProgressTxLocationRange() {
550        Location[] range = new Location[]{null, null};
551        synchronized (inflightTransactions) {
552            if (!inflightTransactions.isEmpty()) {
553                for (List<Operation> ops : inflightTransactions.values()) {
554                    if (!ops.isEmpty()) {
555                        trackMaxAndMin(range, ops);
556                    }
557                }
558            }
559            if (!preparedTransactions.isEmpty()) {
560                for (List<Operation> ops : preparedTransactions.values()) {
561                    if (!ops.isEmpty()) {
562                        trackMaxAndMin(range, ops);
563                    }
564                }
565            }
566        }
567        return range;
568    }
569
570    @SuppressWarnings("rawtypes")
571    private void trackMaxAndMin(Location[] range, List<Operation> ops) {
572        Location t = ops.get(0).getLocation();
573        if (range[0] == null || t.compareTo(range[0]) <= 0) {
574            range[0] = t;
575        }
576        t = ops.get(ops.size() -1).getLocation();
577        if (range[1] == null || t.compareTo(range[1]) >= 0) {
578            range[1] = t;
579        }
580    }
581
582    class TranInfo {
583        TransactionId id;
584        Location location;
585
586        class opCount {
587            int add;
588            int remove;
589        }
590        HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<>();
591
592        @SuppressWarnings("rawtypes")
593        public void track(Operation operation) {
594            if (location == null ) {
595                location = operation.getLocation();
596            }
597            KahaDestination destination;
598            boolean isAdd = false;
599            if (operation instanceof AddOperation) {
600                AddOperation add = (AddOperation) operation;
601                destination = add.getCommand().getDestination();
602                isAdd = true;
603            } else {
604                RemoveOperation removeOpperation = (RemoveOperation) operation;
605                destination = removeOpperation.getCommand().getDestination();
606            }
607            opCount opCount = destinationOpCount.get(destination);
608            if (opCount == null) {
609                opCount = new opCount();
610                destinationOpCount.put(destination, opCount);
611            }
612            if (isAdd) {
613                opCount.add++;
614            } else {
615                opCount.remove++;
616            }
617        }
618
619        @Override
620        public String toString() {
621           StringBuffer buffer = new StringBuffer();
622           buffer.append(location).append(";").append(id).append(";\n");
623           for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) {
624               buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';');
625           }
626           return buffer.toString();
627        }
628    }
629
630    @SuppressWarnings("rawtypes")
631    public String getTransactions() {
632
633        ArrayList<TranInfo> infos = new ArrayList<>();
634        synchronized (inflightTransactions) {
635            if (!inflightTransactions.isEmpty()) {
636                for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) {
637                    TranInfo info = new TranInfo();
638                    info.id = entry.getKey();
639                    for (Operation operation : entry.getValue()) {
640                        info.track(operation);
641                    }
642                    infos.add(info);
643                }
644            }
645        }
646        synchronized (preparedTransactions) {
647            if (!preparedTransactions.isEmpty()) {
648                for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) {
649                    TranInfo info = new TranInfo();
650                    info.id = entry.getKey();
651                    for (Operation operation : entry.getValue()) {
652                        info.track(operation);
653                    }
654                    infos.add(info);
655                }
656            }
657        }
658        return infos.toString();
659    }
660
661    /**
662     * Move all the messages that were in the journal into long term storage. We
663     * just replay and do a checkpoint.
664     *
665     * @throws IOException
666     * @throws IOException
667     * @throws IllegalStateException
668     */
669    private void recover() throws IllegalStateException, IOException {
670        this.indexLock.writeLock().lock();
671        try {
672
673            long start = System.currentTimeMillis();
674            boolean requiresJournalReplay = recoverProducerAudit();
675            requiresJournalReplay |= recoverAckMessageFileMap();
676            Location lastIndoubtPosition = getRecoveryPosition();
677            Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition;
678            if (recoveryPosition != null) {
679                int redoCounter = 0;
680                int dataFileRotationTracker = recoveryPosition.getDataFileId();
681                LOG.info("Recovering from the journal @" + recoveryPosition);
682                while (recoveryPosition != null) {
683                    try {
684                        JournalCommand<?> message = load(recoveryPosition);
685                        metadata.lastUpdate = recoveryPosition;
686                        process(message, recoveryPosition, lastIndoubtPosition);
687                        redoCounter++;
688                    } catch (IOException failedRecovery) {
689                        if (isIgnoreMissingJournalfiles()) {
690                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
691                            // track this dud location
692                            journal.corruptRecoveryLocation(recoveryPosition);
693                        } else {
694                            throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
695                        }
696                    }
697                    recoveryPosition = journal.getNextLocation(recoveryPosition);
698                    // hold on to the minimum number of open files during recovery
699                    if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) {
700                        dataFileRotationTracker = recoveryPosition.getDataFileId();
701                        journal.cleanup();
702                    }
703                    if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
704                        LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
705                    }
706                }
707                if (LOG.isInfoEnabled()) {
708                    long end = System.currentTimeMillis();
709                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
710                }
711            }
712
713            // We may have to undo some index updates.
714            pageFile.tx().execute(new Transaction.Closure<IOException>() {
715                @Override
716                public void execute(Transaction tx) throws IOException {
717                    recoverIndex(tx);
718                }
719            });
720
721            // rollback any recovered inflight local transactions, and discard any inflight XA transactions.
722            Set<TransactionId> toRollback = new HashSet<>();
723            Set<TransactionId> toDiscard = new HashSet<>();
724            synchronized (inflightTransactions) {
725                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
726                    TransactionId id = it.next();
727                    if (id.isLocalTransaction()) {
728                        toRollback.add(id);
729                    } else {
730                        toDiscard.add(id);
731                    }
732                }
733                for (TransactionId tx: toRollback) {
734                    if (LOG.isDebugEnabled()) {
735                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
736                    }
737                    store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
738                }
739                for (TransactionId tx: toDiscard) {
740                    if (LOG.isDebugEnabled()) {
741                        LOG.debug("discarding recovered in-flight XA transaction " + tx);
742                    }
743                    inflightTransactions.remove(tx);
744                }
745            }
746
747            synchronized (preparedTransactions) {
748                for (TransactionId txId : preparedTransactions.keySet()) {
749                    LOG.warn("Recovered prepared XA TX: [{}]", txId);
750                }
751            }
752
753        } finally {
754            this.indexLock.writeLock().unlock();
755        }
756    }
757
758    @SuppressWarnings("unused")
759    private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
760        return TransactionIdConversion.convertToLocal(tx);
761    }
762
763    private Location minimum(Location x,
764                             Location y) {
765        Location min = null;
766        if (x != null) {
767            min = x;
768            if (y != null) {
769                int compare = y.compareTo(x);
770                if (compare < 0) {
771                    min = y;
772                }
773            }
774        } else {
775            min = y;
776        }
777        return min;
778    }
779
780    private boolean recoverProducerAudit() throws IOException {
781        boolean requiresReplay = true;
782        if (metadata.producerSequenceIdTrackerLocation != null) {
783            try {
784                KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
785                ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
786                int maxNumProducers = getMaxFailoverProducersToTrack();
787                int maxAuditDepth = getFailoverProducersAuditDepth();
788                metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
789                metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
790                metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
791                requiresReplay = false;
792            } catch (Exception e) {
793                LOG.warn("Cannot recover message audit", e);
794            }
795        }
796        // got no audit stored so got to recreate via replay from start of the journal
797        return requiresReplay;
798    }
799
800    @SuppressWarnings("unchecked")
801    private boolean recoverAckMessageFileMap() throws IOException {
802        boolean requiresReplay = true;
803        if (metadata.ackMessageFileMapLocation != null) {
804            try {
805                KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
806                ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
807                metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
808                requiresReplay = false;
809            } catch (Exception e) {
810                LOG.warn("Cannot recover ackMessageFileMap", e);
811            }
812        }
813        // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
814        return requiresReplay;
815    }
816
817    protected void recoverIndex(Transaction tx) throws IOException {
818        long start = System.currentTimeMillis();
819        // It is possible index updates got applied before the journal updates..
820        // in that case we need to removed references to messages that are not in the journal
821        final Location lastAppendLocation = journal.getLastAppendLocation();
822        long undoCounter=0;
823
824        // Go through all the destinations to see if they have messages past the lastAppendLocation
825        for (String key : storedDestinations.keySet()) {
826            StoredDestination sd = storedDestinations.get(key);
827
828            final ArrayList<Long> matches = new ArrayList<>();
829            // Find all the Locations that are >= than the last Append Location.
830            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
831                @Override
832                protected void matched(Location key, Long value) {
833                    matches.add(value);
834                }
835            });
836
837            for (Long sequenceId : matches) {
838                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
839                if (keys != null) {
840                    sd.locationIndex.remove(tx, keys.location);
841                    sd.messageIdIndex.remove(tx, keys.messageId);
842                    metadata.producerSequenceIdTracker.rollback(keys.messageId);
843                    undoCounter++;
844                    decrementAndSubSizeToStoreStat(key, keys.location.getSize());
845                    // TODO: do we need to modify the ack positions for the pub sub case?
846                }
847            }
848        }
849
850        if (undoCounter > 0) {
851            // The rolledback operations are basically in flight journal writes.  To avoid getting
852            // these the end user should do sync writes to the journal.
853            if (LOG.isInfoEnabled()) {
854                long end = System.currentTimeMillis();
855                LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
856            }
857        }
858
859        undoCounter = 0;
860        start = System.currentTimeMillis();
861
862        // Lets be extra paranoid here and verify that all the datafiles being referenced
863        // by the indexes still exists.
864
865        final SequenceSet ss = new SequenceSet();
866        for (StoredDestination sd : storedDestinations.values()) {
867            // Use a visitor to cut down the number of pages that we load
868            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
869                int last=-1;
870
871                @Override
872                public boolean isInterestedInKeysBetween(Location first, Location second) {
873                    if( first==null ) {
874                        return !ss.contains(0, second.getDataFileId());
875                    } else if( second==null ) {
876                        return true;
877                    } else {
878                        return !ss.contains(first.getDataFileId(), second.getDataFileId());
879                    }
880                }
881
882                @Override
883                public void visit(List<Location> keys, List<Long> values) {
884                    for (Location l : keys) {
885                        int fileId = l.getDataFileId();
886                        if( last != fileId ) {
887                            ss.add(fileId);
888                            last = fileId;
889                        }
890                    }
891                }
892
893            });
894        }
895        HashSet<Integer> missingJournalFiles = new HashSet<>();
896        while (!ss.isEmpty()) {
897            missingJournalFiles.add((int) ss.removeFirst());
898        }
899
900        for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) {
901            missingJournalFiles.add(entry.getKey());
902            for (Integer i : entry.getValue()) {
903                missingJournalFiles.add(i);
904            }
905        }
906
907        missingJournalFiles.removeAll(journal.getFileMap().keySet());
908
909        if (!missingJournalFiles.isEmpty()) {
910            LOG.warn("Some journal files are missing: " + missingJournalFiles);
911        }
912
913        ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<>();
914        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<>();
915        for (Integer missing : missingJournalFiles) {
916            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
917        }
918
919        if (checkForCorruptJournalFiles) {
920            Collection<DataFile> dataFiles = journal.getFileMap().values();
921            for (DataFile dataFile : dataFiles) {
922                int id = dataFile.getDataFileId();
923                // eof to next file id
924                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
925                Sequence seq = dataFile.getCorruptedBlocks().getHead();
926                while (seq != null) {
927                    BTreeVisitor.BetweenVisitor<Location, Long> visitor =
928                        new BTreeVisitor.BetweenVisitor<>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
929                    missingPredicates.add(visitor);
930                    knownCorruption.add(visitor);
931                    seq = seq.getNext();
932                }
933            }
934        }
935
936        if (!missingPredicates.isEmpty()) {
937            for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
938                final StoredDestination sd = sdEntry.getValue();
939                final LinkedHashMap<Long, Location> matches = new LinkedHashMap<>();
940                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
941                    @Override
942                    protected void matched(Location key, Long value) {
943                        matches.put(value, key);
944                    }
945                });
946
947                // If some message references are affected by the missing data files...
948                if (!matches.isEmpty()) {
949
950                    // We either 'gracefully' recover dropping the missing messages or
951                    // we error out.
952                    if( ignoreMissingJournalfiles ) {
953                        // Update the index to remove the references to the missing data
954                        for (Long sequenceId : matches.keySet()) {
955                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
956                            sd.locationIndex.remove(tx, keys.location);
957                            sd.messageIdIndex.remove(tx, keys.messageId);
958                            LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
959                            undoCounter++;
960                            decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize());
961                            // TODO: do we need to modify the ack positions for the pub sub case?
962                        }
963                    } else {
964                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches);
965                        throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
966                    }
967                }
968            }
969        }
970
971        if (!ignoreMissingJournalfiles) {
972            if (!knownCorruption.isEmpty()) {
973                LOG.error("Detected corrupt journal files. " + knownCorruption);
974                throw new IOException("Detected corrupt journal files. " + knownCorruption);
975            }
976
977            if (!missingJournalFiles.isEmpty()) {
978                LOG.error("Detected missing journal files. " + missingJournalFiles);
979                throw new IOException("Detected missing journal files. " + missingJournalFiles);
980            }
981        }
982
983        if (undoCounter > 0) {
984            // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
985            // should do sync writes to the journal.
986            if (LOG.isInfoEnabled()) {
987                long end = System.currentTimeMillis();
988                LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
989            }
990        }
991    }
992
993    private Location nextRecoveryPosition;
994    private Location lastRecoveryPosition;
995
996    public void incrementalRecover() throws IOException {
997        this.indexLock.writeLock().lock();
998        try {
999            if( nextRecoveryPosition == null ) {
1000                if( lastRecoveryPosition==null ) {
1001                    nextRecoveryPosition = getRecoveryPosition();
1002                } else {
1003                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
1004                }
1005            }
1006            while (nextRecoveryPosition != null) {
1007                lastRecoveryPosition = nextRecoveryPosition;
1008                metadata.lastUpdate = lastRecoveryPosition;
1009                JournalCommand<?> message = load(lastRecoveryPosition);
1010                process(message, lastRecoveryPosition, (IndexAware) null);
1011                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
1012            }
1013        } finally {
1014            this.indexLock.writeLock().unlock();
1015        }
1016    }
1017
1018    public Location getLastUpdatePosition() throws IOException {
1019        return metadata.lastUpdate;
1020    }
1021
1022    private Location getRecoveryPosition() throws IOException {
1023
1024        if (!this.forceRecoverIndex) {
1025
1026            // If we need to recover the transactions..
1027            if (metadata.firstInProgressTransactionLocation != null) {
1028                return metadata.firstInProgressTransactionLocation;
1029            }
1030
1031            // Perhaps there were no transactions...
1032            if( metadata.lastUpdate!=null) {
1033                // Start replay at the record after the last one recorded in the index file.
1034                return getNextInitializedLocation(metadata.lastUpdate);
1035            }
1036        }
1037        // This loads the first position.
1038        return journal.getNextLocation(null);
1039    }
1040
1041    private Location getNextInitializedLocation(Location location) throws IOException {
1042        Location mayNotBeInitialized = journal.getNextLocation(location);
1043        if (location.getSize() == NOT_SET && mayNotBeInitialized != null && mayNotBeInitialized.getSize() != NOT_SET) {
1044            // need to init size and type to skip
1045            return journal.getNextLocation(mayNotBeInitialized);
1046        } else {
1047            return mayNotBeInitialized;
1048        }
1049    }
1050
1051    protected void checkpointCleanup(final boolean cleanup) throws IOException {
1052        long start;
1053        this.indexLock.writeLock().lock();
1054        try {
1055            start = System.currentTimeMillis();
1056            if( !opened.get() ) {
1057                return;
1058            }
1059        } finally {
1060            this.indexLock.writeLock().unlock();
1061        }
1062        checkpointUpdate(cleanup);
1063        long end = System.currentTimeMillis();
1064        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
1065            if (LOG.isInfoEnabled()) {
1066                LOG.info("Slow KahaDB access: cleanup took " + (end - start));
1067            }
1068        }
1069    }
1070
1071    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
1072        int size = data.serializedSizeFramed();
1073        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
1074        os.writeByte(data.type().getNumber());
1075        data.writeFramed(os);
1076        return os.toByteSequence();
1077    }
1078
1079    // /////////////////////////////////////////////////////////////////
1080    // Methods call by the broker to update and query the store.
1081    // /////////////////////////////////////////////////////////////////
1082    public Location store(JournalCommand<?> data) throws IOException {
1083        return store(data, false, null,null);
1084    }
1085
1086    public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
1087        return store(data, false, null, null, onJournalStoreComplete);
1088    }
1089
1090    public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException {
1091        return store(data, sync, before, after, null);
1092    }
1093
1094    /**
1095     * All updated are are funneled through this method. The updates are converted
1096     * to a JournalMessage which is logged to the journal and then the data from
1097     * the JournalMessage is used to update the index just like it would be done
1098     * during a recovery process.
1099     */
1100    public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
1101        try {
1102            ByteSequence sequence = toByteSequence(data);
1103            Location location;
1104
1105            checkpointLock.readLock().lock();
1106            try {
1107
1108                long start = System.currentTimeMillis();
1109                location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
1110                long start2 = System.currentTimeMillis();
1111                //Track the last async update so we know if we need to sync at the next checkpoint
1112                if (!sync && journal.isJournalDiskSyncPeriodic()) {
1113                    lastAsyncJournalUpdate.set(location);
1114                }
1115                process(data, location, before);
1116
1117                long end = System.currentTimeMillis();
1118                if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
1119                    if (LOG.isInfoEnabled()) {
1120                        LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
1121                    }
1122                }
1123            } finally {
1124                checkpointLock.readLock().unlock();
1125            }
1126
1127            if (after != null) {
1128                after.run();
1129            }
1130
1131            if (scheduler == null && opened.get()) {
1132                startCheckpoint();
1133            }
1134            return location;
1135        } catch (IOException ioe) {
1136            LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe);
1137            brokerService.handleIOException(ioe);
1138            throw ioe;
1139        }
1140    }
1141
1142    /**
1143     * Loads a previously stored JournalMessage
1144     *
1145     * @param location
1146     * @return
1147     * @throws IOException
1148     */
1149    public JournalCommand<?> load(Location location) throws IOException {
1150        long start = System.currentTimeMillis();
1151        ByteSequence data = journal.read(location);
1152        long end = System.currentTimeMillis();
1153        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
1154            if (LOG.isInfoEnabled()) {
1155                LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
1156            }
1157        }
1158        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
1159        byte readByte = is.readByte();
1160        KahaEntryType type = KahaEntryType.valueOf(readByte);
1161        if( type == null ) {
1162            try {
1163                is.close();
1164            } catch (IOException e) {}
1165            throw new IOException("Could not load journal record, null type information from: " + readByte + " at location: "+location);
1166        }
1167        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
1168        message.mergeFramed(is);
1169        return message;
1170    }
1171
1172    /**
1173     * do minimal recovery till we reach the last inDoubtLocation
1174     * @param data
1175     * @param location
1176     * @param inDoubtlocation
1177     * @throws IOException
1178     */
1179    void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
1180        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
1181            process(data, location, (IndexAware) null);
1182        } else {
1183            // just recover producer audit
1184            data.visit(new Visitor() {
1185                @Override
1186                public void visit(KahaAddMessageCommand command) throws IOException {
1187                    metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1188                }
1189            });
1190        }
1191    }
1192
1193    // /////////////////////////////////////////////////////////////////
1194    // Journaled record processing methods. Once the record is journaled,
1195    // these methods handle applying the index updates. These may be called
1196    // from the recovery method too so they need to be idempotent
1197    // /////////////////////////////////////////////////////////////////
1198
1199    void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException {
1200        data.visit(new Visitor() {
1201            @Override
1202            public void visit(KahaAddMessageCommand command) throws IOException {
1203                process(command, location, onSequenceAssignedCallback);
1204            }
1205
1206            @Override
1207            public void visit(KahaRemoveMessageCommand command) throws IOException {
1208                process(command, location);
1209            }
1210
1211            @Override
1212            public void visit(KahaPrepareCommand command) throws IOException {
1213                process(command, location);
1214            }
1215
1216            @Override
1217            public void visit(KahaCommitCommand command) throws IOException {
1218                process(command, location, onSequenceAssignedCallback);
1219            }
1220
1221            @Override
1222            public void visit(KahaRollbackCommand command) throws IOException {
1223                process(command, location);
1224            }
1225
1226            @Override
1227            public void visit(KahaRemoveDestinationCommand command) throws IOException {
1228                process(command, location);
1229            }
1230
1231            @Override
1232            public void visit(KahaSubscriptionCommand command) throws IOException {
1233                process(command, location);
1234            }
1235
1236            @Override
1237            public void visit(KahaProducerAuditCommand command) throws IOException {
1238                processLocation(location);
1239            }
1240
1241            @Override
1242            public void visit(KahaAckMessageFileMapCommand command) throws IOException {
1243                processLocation(location);
1244            }
1245
1246            @Override
1247            public void visit(KahaTraceCommand command) {
1248                processLocation(location);
1249            }
1250
1251            @Override
1252            public void visit(KahaUpdateMessageCommand command) throws IOException {
1253                process(command, location);
1254            }
1255
1256            @Override
1257            public void visit(KahaRewrittenDataFileCommand command) throws IOException {
1258                process(command, location);
1259            }
1260        });
1261    }
1262
1263    @SuppressWarnings("rawtypes")
1264    protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException {
1265        if (command.hasTransactionInfo()) {
1266            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1267            inflightTx.add(new AddOperation(command, location, runWithIndexLock));
1268        } else {
1269            this.indexLock.writeLock().lock();
1270            try {
1271                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1272                    @Override
1273                    public void execute(Transaction tx) throws IOException {
1274                        long assignedIndex = updateIndex(tx, command, location);
1275                        if (runWithIndexLock != null) {
1276                            runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex);
1277                        }
1278                    }
1279                });
1280
1281            } finally {
1282                this.indexLock.writeLock().unlock();
1283            }
1284        }
1285    }
1286
1287    protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
1288        this.indexLock.writeLock().lock();
1289        try {
1290            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1291                @Override
1292                public void execute(Transaction tx) throws IOException {
1293                    updateIndex(tx, command, location);
1294                }
1295            });
1296        } finally {
1297            this.indexLock.writeLock().unlock();
1298        }
1299    }
1300
1301    @SuppressWarnings("rawtypes")
1302    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
1303        if (command.hasTransactionInfo()) {
1304           List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1305           inflightTx.add(new RemoveOperation(command, location));
1306        } else {
1307            this.indexLock.writeLock().lock();
1308            try {
1309                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1310                    @Override
1311                    public void execute(Transaction tx) throws IOException {
1312                        updateIndex(tx, command, location);
1313                    }
1314                });
1315            } finally {
1316                this.indexLock.writeLock().unlock();
1317            }
1318        }
1319    }
1320
1321    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1322        this.indexLock.writeLock().lock();
1323        try {
1324            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1325                @Override
1326                public void execute(Transaction tx) throws IOException {
1327                    updateIndex(tx, command, location);
1328                }
1329            });
1330        } finally {
1331            this.indexLock.writeLock().unlock();
1332        }
1333    }
1334
1335    protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1336        this.indexLock.writeLock().lock();
1337        try {
1338            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1339                @Override
1340                public void execute(Transaction tx) throws IOException {
1341                    updateIndex(tx, command, location);
1342                }
1343            });
1344        } finally {
1345            this.indexLock.writeLock().unlock();
1346        }
1347    }
1348
1349    protected void processLocation(final Location location) {
1350        this.indexLock.writeLock().lock();
1351        try {
1352            metadata.lastUpdate = location;
1353        } finally {
1354            this.indexLock.writeLock().unlock();
1355        }
1356    }
1357
1358    @SuppressWarnings("rawtypes")
1359    protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException {
1360        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1361        List<Operation> inflightTx;
1362        synchronized (inflightTransactions) {
1363            inflightTx = inflightTransactions.remove(key);
1364            if (inflightTx == null) {
1365                inflightTx = preparedTransactions.remove(key);
1366            }
1367        }
1368        if (inflightTx == null) {
1369            // only non persistent messages in this tx
1370            if (before != null) {
1371                before.sequenceAssignedWithIndexLocked(-1);
1372            }
1373            return;
1374        }
1375
1376        final List<Operation> messagingTx = inflightTx;
1377        indexLock.writeLock().lock();
1378        try {
1379            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1380                @Override
1381                public void execute(Transaction tx) throws IOException {
1382                    for (Operation op : messagingTx) {
1383                        op.execute(tx);
1384                    }
1385                }
1386            });
1387            metadata.lastUpdate = location;
1388        } finally {
1389            indexLock.writeLock().unlock();
1390        }
1391    }
1392
1393    @SuppressWarnings("rawtypes")
1394    protected void process(KahaPrepareCommand command, Location location) {
1395        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1396        synchronized (inflightTransactions) {
1397            List<Operation> tx = inflightTransactions.remove(key);
1398            if (tx != null) {
1399                preparedTransactions.put(key, tx);
1400            }
1401        }
1402    }
1403
1404    @SuppressWarnings("rawtypes")
1405    protected void process(KahaRollbackCommand command, Location location)  throws IOException {
1406        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1407        List<Operation> updates = null;
1408        synchronized (inflightTransactions) {
1409            updates = inflightTransactions.remove(key);
1410            if (updates == null) {
1411                updates = preparedTransactions.remove(key);
1412            }
1413        }
1414    }
1415
1416    protected void process(KahaRewrittenDataFileCommand command, Location location)  throws IOException {
1417        final TreeSet<Integer> completeFileSet = new TreeSet<>(journal.getFileMap().keySet());
1418
1419        // Mark the current journal file as a compacted file so that gc checks can skip
1420        // over logs that are smaller compaction type logs.
1421        DataFile current = journal.getDataFileById(location.getDataFileId());
1422        current.setTypeCode(command.getRewriteType());
1423
1424        if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
1425            // Move offset so that next location read jumps to next file.
1426            location.setOffset(journalMaxFileLength);
1427        }
1428    }
1429
1430    // /////////////////////////////////////////////////////////////////
1431    // These methods do the actual index updates.
1432    // /////////////////////////////////////////////////////////////////
1433
1434    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
1435    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<>();
1436
1437    long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1438        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1439
1440        // Skip adding the message to the index if this is a topic and there are
1441        // no subscriptions.
1442        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1443            return -1;
1444        }
1445
1446        // Add the message.
1447        int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1448        long id = sd.orderIndex.getNextMessageId();
1449        Long previous = sd.locationIndex.put(tx, location, id);
1450        if (previous == null) {
1451            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1452            if (previous == null) {
1453                incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
1454                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1455                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1456                    addAckLocationForNewMessage(tx, command.getDestination(), sd, id);
1457                }
1458                metadata.lastUpdate = location;
1459            } else {
1460
1461                MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
1462                if (messageKeys != null && messageKeys.location.compareTo(location) < 0) {
1463                    // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt
1464                    LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
1465                }
1466                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1467                sd.locationIndex.remove(tx, location);
1468                id = -1;
1469            }
1470        } else {
1471            // restore the previous value.. Looks like this was a redo of a previously
1472            // added message. We don't want to assign it a new id as the other indexes would
1473            // be wrong..
1474            sd.locationIndex.put(tx, location, previous);
1475            // ensure sequence is not broken
1476            sd.orderIndex.revertNextMessageId();
1477            metadata.lastUpdate = location;
1478        }
1479        // record this id in any event, initial send or recovery
1480        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1481
1482       return id;
1483    }
1484
1485    void trackPendingAdd(KahaDestination destination, Long seq) {
1486        StoredDestination sd = storedDestinations.get(key(destination));
1487        if (sd != null) {
1488            sd.trackPendingAdd(seq);
1489        }
1490    }
1491
1492    void trackPendingAddComplete(KahaDestination destination, Long seq) {
1493        StoredDestination sd = storedDestinations.get(key(destination));
1494        if (sd != null) {
1495            sd.trackPendingAddComplete(seq);
1496        }
1497    }
1498
1499    void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException {
1500        KahaAddMessageCommand command = updateMessageCommand.getMessage();
1501        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1502
1503        Long id = sd.messageIdIndex.get(tx, command.getMessageId());
1504        if (id != null) {
1505            MessageKeys previousKeys = sd.orderIndex.put(
1506                    tx,
1507                    command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
1508                    id,
1509                    new MessageKeys(command.getMessageId(), location)
1510            );
1511            sd.locationIndex.put(tx, location, id);
1512            incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
1513
1514            if (previousKeys != null) {
1515                //Remove the existing from the size
1516                decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
1517
1518                //update all the subscription metrics
1519                if (enableSubscriptionStatistics && sd.ackPositions != null && location.getSize() != previousKeys.location.getSize()) {
1520                    Iterator<Entry<String, SequenceSet>> iter = sd.ackPositions.iterator(tx);
1521                    while (iter.hasNext()) {
1522                        Entry<String, SequenceSet> e = iter.next();
1523                        if (e.getValue().contains(id)) {
1524                            incrementAndAddSizeToStoreStat(key(command.getDestination()), e.getKey(), location.getSize());
1525                            decrementAndSubSizeToStoreStat(key(command.getDestination()), e.getKey(), previousKeys.location.getSize());
1526                        }
1527                    }
1528                }
1529
1530                // on first update previous is original location, on recovery/replay it may be the updated location
1531                if(!previousKeys.location.equals(location)) {
1532                    sd.locationIndex.remove(tx, previousKeys.location);
1533                }
1534            }
1535            metadata.lastUpdate = location;
1536        } else {
1537            //Add the message if it can't be found
1538            this.updateIndex(tx, command, location);
1539        }
1540    }
1541
1542    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1543        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1544        if (!command.hasSubscriptionKey()) {
1545
1546            // In the queue case we just remove the message from the index..
1547            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1548            if (sequenceId != null) {
1549                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1550                if (keys != null) {
1551                    sd.locationIndex.remove(tx, keys.location);
1552                    decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize());
1553                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1554                    metadata.lastUpdate = ackLocation;
1555                }  else if (LOG.isDebugEnabled()) {
1556                    LOG.debug("message not found in order index: " + sequenceId  + " for: " + command.getMessageId());
1557                }
1558            } else if (LOG.isDebugEnabled()) {
1559                LOG.debug("message not found in sequence id index: " + command.getMessageId());
1560            }
1561        } else {
1562            // In the topic case we need remove the message once it's been acked
1563            // by all the subs
1564            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1565
1566            // Make sure it's a valid message id...
1567            if (sequence != null) {
1568                String subscriptionKey = command.getSubscriptionKey();
1569                if (command.getAck() != UNMATCHED) {
1570                    sd.orderIndex.get(tx, sequence);
1571                    byte priority = sd.orderIndex.lastGetPriority();
1572                    sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1573                }
1574
1575                MessageKeys keys = sd.orderIndex.get(tx, sequence);
1576                if (keys != null) {
1577                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1578                }
1579                // The following method handles deleting un-referenced messages.
1580                removeAckLocation(command, tx, sd, subscriptionKey, sequence);
1581                metadata.lastUpdate = ackLocation;
1582            } else if (LOG.isDebugEnabled()) {
1583                LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
1584            }
1585
1586        }
1587    }
1588
1589    private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1590        Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1591        if (referenceFileIds == null) {
1592            referenceFileIds = new HashSet<>();
1593            referenceFileIds.add(messageLocation.getDataFileId());
1594            metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1595        } else {
1596            Integer id = Integer.valueOf(messageLocation.getDataFileId());
1597            if (!referenceFileIds.contains(id)) {
1598                referenceFileIds.add(id);
1599            }
1600        }
1601    }
1602
1603    void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1604        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1605        sd.orderIndex.remove(tx);
1606
1607        sd.locationIndex.clear(tx);
1608        sd.locationIndex.unload(tx);
1609        tx.free(sd.locationIndex.getPageId());
1610
1611        sd.messageIdIndex.clear(tx);
1612        sd.messageIdIndex.unload(tx);
1613        tx.free(sd.messageIdIndex.getPageId());
1614
1615        if (sd.subscriptions != null) {
1616            sd.subscriptions.clear(tx);
1617            sd.subscriptions.unload(tx);
1618            tx.free(sd.subscriptions.getPageId());
1619
1620            sd.subscriptionAcks.clear(tx);
1621            sd.subscriptionAcks.unload(tx);
1622            tx.free(sd.subscriptionAcks.getPageId());
1623
1624            sd.ackPositions.clear(tx);
1625            sd.ackPositions.unload(tx);
1626            tx.free(sd.ackPositions.getHeadPageId());
1627
1628            sd.subLocations.clear(tx);
1629            sd.subLocations.unload(tx);
1630            tx.free(sd.subLocations.getHeadPageId());
1631        }
1632
1633        String key = key(command.getDestination());
1634        storedDestinations.remove(key);
1635        metadata.destinations.remove(tx, key);
1636        clearStoreStats(command.getDestination());
1637        storeCache.remove(key(command.getDestination()));
1638    }
1639
1640    void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1641        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1642        final String subscriptionKey = command.getSubscriptionKey();
1643
1644        // If set then we are creating it.. otherwise we are destroying the sub
1645        if (command.hasSubscriptionInfo()) {
1646            Location existing = sd.subLocations.get(tx, subscriptionKey);
1647            if (existing != null && existing.compareTo(location) == 0) {
1648                // replay on recovery, ignore
1649                LOG.trace("ignoring journal replay of replay of sub from: " + location);
1650                return;
1651            }
1652
1653            sd.subscriptions.put(tx, subscriptionKey, command);
1654            sd.subLocations.put(tx, subscriptionKey, location);
1655            long ackLocation=NOT_ACKED;
1656            if (!command.getRetroactive()) {
1657                ackLocation = sd.orderIndex.nextMessageId-1;
1658            } else {
1659                addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
1660            }
1661            sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1662            sd.subscriptionCache.add(subscriptionKey);
1663        } else {
1664            // delete the sub...
1665            sd.subscriptions.remove(tx, subscriptionKey);
1666            sd.subLocations.remove(tx, subscriptionKey);
1667            sd.subscriptionAcks.remove(tx, subscriptionKey);
1668            sd.subscriptionCache.remove(subscriptionKey);
1669            removeAckLocationsForSub(command, tx, sd, subscriptionKey);
1670            MessageStoreSubscriptionStatistics subStats = getSubStats(key(command.getDestination()));
1671            if (subStats != null) {
1672                subStats.removeSubscription(subscriptionKey);
1673            }
1674
1675            if (sd.subscriptions.isEmpty(tx)) {
1676                // remove the stored destination
1677                KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
1678                removeDestinationCommand.setDestination(command.getDestination());
1679                updateIndex(tx, removeDestinationCommand, null);
1680                clearStoreStats(command.getDestination());
1681            }
1682        }
1683    }
1684
1685    private void checkpointUpdate(final boolean cleanup) throws IOException {
1686        checkpointLock.writeLock().lock();
1687        try {
1688            this.indexLock.writeLock().lock();
1689            try {
1690                Set<Integer> filesToGc = pageFile.tx().execute(new Transaction.CallableClosure<Set<Integer>, IOException>() {
1691                    @Override
1692                    public Set<Integer> execute(Transaction tx) throws IOException {
1693                        return checkpointUpdate(tx, cleanup);
1694                    }
1695                });
1696                pageFile.flush();
1697                // after the index update such that partial removal does not leave dangling references in the index.
1698                journal.removeDataFiles(filesToGc);
1699            } finally {
1700                this.indexLock.writeLock().unlock();
1701            }
1702
1703        } finally {
1704            checkpointLock.writeLock().unlock();
1705        }
1706    }
1707
1708    /**
1709     * @param tx
1710     * @throws IOException
1711     */
1712    Set<Integer> checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1713        MDC.put("activemq.persistenceDir", getDirectory().getName());
1714        LOG.debug("Checkpoint started.");
1715
1716        // reflect last update exclusive of current checkpoint
1717        Location lastUpdate = metadata.lastUpdate;
1718
1719        metadata.state = OPEN_STATE;
1720        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1721        metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
1722        Location[] inProgressTxRange = getInProgressTxLocationRange();
1723        metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
1724        tx.store(metadata.page, metadataMarshaller, true);
1725
1726        final TreeSet<Integer> gcCandidateSet = new TreeSet<>();
1727        if (cleanup) {
1728
1729            final TreeSet<Integer> completeFileSet = new TreeSet<>(journal.getFileMap().keySet());
1730            gcCandidateSet.addAll(completeFileSet);
1731
1732            if (LOG.isTraceEnabled()) {
1733                LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
1734            }
1735
1736            if (lastUpdate != null) {
1737                // we won't delete past the last update, ackCompaction journal can be a candidate in error
1738                gcCandidateSet.removeAll(new TreeSet<Integer>(gcCandidateSet.tailSet(lastUpdate.getDataFileId())));
1739            }
1740
1741            // Don't GC files under replication
1742            if( journalFilesBeingReplicated!=null ) {
1743                gcCandidateSet.removeAll(journalFilesBeingReplicated);
1744            }
1745
1746            if (metadata.producerSequenceIdTrackerLocation != null) {
1747                int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId();
1748                if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) {
1749                    // rewrite so we don't prevent gc
1750                    metadata.producerSequenceIdTracker.setModified(true);
1751                    if (LOG.isTraceEnabled()) {
1752                        LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation);
1753                    }
1754                }
1755                gcCandidateSet.remove(dataFileId);
1756                if (LOG.isTraceEnabled()) {
1757                    LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + metadata.producerSequenceIdTrackerLocation + ", " + gcCandidateSet);
1758                }
1759            }
1760
1761            if (metadata.ackMessageFileMapLocation != null) {
1762                int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
1763                gcCandidateSet.remove(dataFileId);
1764                if (LOG.isTraceEnabled()) {
1765                    LOG.trace("gc candidates after ackMessageFileMapLocation:" + metadata.ackMessageFileMapLocation + ", " + gcCandidateSet);
1766                }
1767            }
1768
1769            // Don't GC files referenced by in-progress tx
1770            if (inProgressTxRange[0] != null) {
1771                for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
1772                    gcCandidateSet.remove(pendingTx);
1773                }
1774            }
1775            if (LOG.isTraceEnabled()) {
1776                LOG.trace("gc candidates after in progress tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
1777            }
1778
1779            // Go through all the destinations to see if any of them can remove GC candidates.
1780            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1781                if( gcCandidateSet.isEmpty() ) {
1782                    break;
1783                }
1784
1785                // Use a visitor to cut down the number of pages that we load
1786                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1787                    int last=-1;
1788                    @Override
1789                    public boolean isInterestedInKeysBetween(Location first, Location second) {
1790                        if( first==null ) {
1791                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1792                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1793                                subset.remove(second.getDataFileId());
1794                            }
1795                            return !subset.isEmpty();
1796                        } else if( second==null ) {
1797                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1798                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1799                                subset.remove(first.getDataFileId());
1800                            }
1801                            return !subset.isEmpty();
1802                        } else {
1803                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1804                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1805                                subset.remove(first.getDataFileId());
1806                            }
1807                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1808                                subset.remove(second.getDataFileId());
1809                            }
1810                            return !subset.isEmpty();
1811                        }
1812                    }
1813
1814                    @Override
1815                    public void visit(List<Location> keys, List<Long> values) {
1816                        for (Location l : keys) {
1817                            int fileId = l.getDataFileId();
1818                            if( last != fileId ) {
1819                                gcCandidateSet.remove(fileId);
1820                                last = fileId;
1821                            }
1822                        }
1823                    }
1824                });
1825
1826                // Durable Subscription
1827                if (entry.getValue().subLocations != null) {
1828                    Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
1829                    while (iter.hasNext()) {
1830                        Entry<String, Location> subscription = iter.next();
1831                        int dataFileId = subscription.getValue().getDataFileId();
1832
1833                        // Move subscription along if it has no outstanding messages that need ack'd
1834                        // and its in the last log file in the journal.
1835                        if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
1836                            final StoredDestination destination = entry.getValue();
1837                            final String subscriptionKey = subscription.getKey();
1838                            SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
1839
1840                            // When pending is size one that is the next message Id meaning there
1841                            // are no pending messages currently.
1842                            if (pendingAcks == null || pendingAcks.isEmpty() ||
1843                                (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) {
1844
1845                                if (LOG.isTraceEnabled()) {
1846                                    LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
1847                                }
1848
1849                                final KahaSubscriptionCommand kahaSub =
1850                                    destination.subscriptions.get(tx, subscriptionKey);
1851                                destination.subLocations.put(
1852                                    tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));
1853
1854                                // Skips the remove from candidates if we rewrote the subscription
1855                                // in order to prevent duplicate subscription commands on recover.
1856                                // If another subscription is on the same file and isn't rewritten
1857                                // than it will remove the file from the set.
1858                                continue;
1859                            }
1860                        }
1861
1862                        gcCandidateSet.remove(dataFileId);
1863                    }
1864                }
1865
1866                if (LOG.isTraceEnabled()) {
1867                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1868                }
1869            }
1870
1871            // check we are not deleting file with ack for in-use journal files
1872            if (LOG.isTraceEnabled()) {
1873                LOG.trace("gc candidates: " + gcCandidateSet);
1874                LOG.trace("ackMessageFileMap: " +  metadata.ackMessageFileMap);
1875            }
1876
1877            boolean ackMessageFileMapMod = false;
1878            Iterator<Integer> candidates = gcCandidateSet.iterator();
1879            while (candidates.hasNext()) {
1880                Integer candidate = candidates.next();
1881                Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
1882                if (referencedFileIds != null) {
1883                    for (Integer referencedFileId : referencedFileIds) {
1884                        if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
1885                            // active file that is not targeted for deletion is referenced so don't delete
1886                            candidates.remove();
1887                            break;
1888                        }
1889                    }
1890                    if (gcCandidateSet.contains(candidate)) {
1891                        ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null);
1892                    } else {
1893                        if (LOG.isTraceEnabled()) {
1894                            LOG.trace("not removing data file: " + candidate
1895                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1896                        }
1897                    }
1898                }
1899            }
1900
1901            if (!gcCandidateSet.isEmpty()) {
1902                LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
1903                for (Integer candidate : gcCandidateSet) {
1904                    for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
1905                        ackMessageFileMapMod |= ackFiles.remove(candidate);
1906                    }
1907                }
1908                if (ackMessageFileMapMod) {
1909                    checkpointUpdate(tx, false);
1910                }
1911            } else if (isEnableAckCompaction()) {
1912                if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) {
1913                    // First check length of journal to make sure it makes sense to even try.
1914                    //
1915                    // If there is only one journal file with Acks in it we don't need to move
1916                    // it since it won't be chained to any later logs.
1917                    //
1918                    // If the logs haven't grown since the last time then we need to compact
1919                    // otherwise there seems to still be room for growth and we don't need to incur
1920                    // the overhead.  Depending on configuration this check can be avoided and
1921                    // Ack compaction will run any time the store has not GC'd a journal file in
1922                    // the configured amount of cycles.
1923                    if (metadata.ackMessageFileMap.size() > 1 &&
1924                        (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) {
1925
1926                        LOG.trace("No files GC'd checking if threshold to ACK compaction has been met.");
1927                        try {
1928                            scheduler.execute(new AckCompactionRunner());
1929                        } catch (Exception ex) {
1930                            LOG.warn("Error on queueing the Ack Compactor", ex);
1931                        }
1932                    } else {
1933                        LOG.trace("Journal activity detected, no Ack compaction scheduled.");
1934                    }
1935
1936                    checkPointCyclesWithNoGC = 0;
1937                } else {
1938                    LOG.trace("Not yet time to check for compaction: {} of {} cycles",
1939                              checkPointCyclesWithNoGC, getCompactAcksAfterNoGC());
1940                }
1941
1942                journalLogOnLastCompactionCheck = journal.getCurrentDataFileId();
1943            }
1944        }
1945        MDC.remove("activemq.persistenceDir");
1946
1947        LOG.debug("Checkpoint done.");
1948        return gcCandidateSet;
1949    }
1950
1951    private final class AckCompactionRunner implements Runnable {
1952
1953        @Override
1954        public void run() {
1955
1956            int journalToAdvance = -1;
1957            Set<Integer> journalLogsReferenced = new HashSet<>();
1958
1959            //flag to know whether the ack forwarding completed without an exception
1960            boolean forwarded = false;
1961
1962            try {
1963                //acquire the checkpoint lock to prevent other threads from
1964                //running a checkpoint while this is running
1965                //
1966                //Normally this task runs on the same executor as the checkpoint task
1967                //so this ack compaction runner wouldn't run at the same time as the checkpoint task.
1968                //
1969                //However, there are two cases where this isn't always true.
1970                //First, the checkpoint() method is public and can be called through the
1971                //PersistenceAdapter interface by someone at the same time this is running.
1972                //Second, a checkpoint is called during shutdown without using the executor.
1973                //
1974                //In the future it might be better to just remove the checkpointLock entirely
1975                //and only use the executor but this would need to be examined for any unintended
1976                //consequences
1977                checkpointLock.readLock().lock();
1978
1979                try {
1980
1981                    // Lock index to capture the ackMessageFileMap data
1982                    indexLock.writeLock().lock();
1983
1984                    // Map keys might not be sorted, find the earliest log file to forward acks
1985                    // from and move only those, future cycles can chip away at more as needed.
1986                    // We won't move files that are themselves rewritten on a previous compaction.
1987                    List<Integer> journalFileIds = new ArrayList<>(metadata.ackMessageFileMap.keySet());
1988                    Collections.sort(journalFileIds);
1989                    for (Integer journalFileId : journalFileIds) {
1990                        DataFile current = journal.getDataFileById(journalFileId);
1991                        if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
1992                            journalToAdvance = journalFileId;
1993                            break;
1994                        }
1995                    }
1996
1997                    // Check if we found one, or if we only found the current file being written to.
1998                    if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
1999                        return;
2000                    }
2001
2002                    journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
2003
2004                } finally {
2005                    indexLock.writeLock().unlock();
2006                }
2007
2008                try {
2009                    // Background rewrite of the old acks
2010                    forwardAllAcks(journalToAdvance, journalLogsReferenced);
2011                    forwarded = true;
2012                } catch (IOException ioe) {
2013                    LOG.error("Forwarding of acks failed", ioe);
2014                    brokerService.handleIOException(ioe);
2015                } catch (Throwable e) {
2016                    LOG.error("Forwarding of acks failed", e);
2017                    brokerService.handleIOException(IOExceptionSupport.create(e));
2018                }
2019            } finally {
2020                checkpointLock.readLock().unlock();
2021            }
2022
2023            try {
2024                if (forwarded) {
2025                    // Checkpoint with changes from the ackMessageFileMap
2026                    checkpointUpdate(false);
2027                }
2028            } catch (IOException ioe) {
2029                LOG.error("Checkpoint failed", ioe);
2030                brokerService.handleIOException(ioe);
2031            } catch (Throwable e) {
2032                LOG.error("Checkpoint failed", e);
2033                brokerService.handleIOException(IOExceptionSupport.create(e));
2034            }
2035        }
2036    }
2037
2038    private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException {
2039        LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead);
2040
2041        DataFile forwardsFile = journal.reserveDataFile();
2042        forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE);
2043        LOG.trace("Reserved file for forwarded acks: {}", forwardsFile);
2044
2045        Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<>();
2046
2047        try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) {
2048            KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
2049            compactionMarker.setSourceDataFileId(journalToRead);
2050            compactionMarker.setRewriteType(forwardsFile.getTypeCode());
2051
2052            ByteSequence payload = toByteSequence(compactionMarker);
2053            appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2054            LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
2055
2056            final Location limit = new Location(journalToRead + 1, 0);
2057            Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0), limit);
2058            while (nextLocation != null) {
2059                JournalCommand<?> command = null;
2060                try {
2061                    command = load(nextLocation);
2062                } catch (IOException ex) {
2063                    LOG.trace("Error loading command during ack forward: {}", nextLocation);
2064                }
2065
2066                if (command != null && command instanceof KahaRemoveMessageCommand) {
2067                    payload = toByteSequence(command);
2068                    Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2069                    updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
2070                }
2071
2072                nextLocation = getNextLocationForAckForward(nextLocation, limit);
2073            }
2074        }
2075
2076        LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations);
2077
2078        // Lock index while we update the ackMessageFileMap.
2079        indexLock.writeLock().lock();
2080
2081        // Update the ack map with the new locations of the acks
2082        for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
2083            Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
2084            if (referenceFileIds == null) {
2085                referenceFileIds = new HashSet<>();
2086                referenceFileIds.addAll(entry.getValue());
2087                metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
2088            } else {
2089                referenceFileIds.addAll(entry.getValue());
2090            }
2091        }
2092
2093        // remove the old location data from the ack map so that the old journal log file can
2094        // be removed on next GC.
2095        metadata.ackMessageFileMap.remove(journalToRead);
2096
2097        indexLock.writeLock().unlock();
2098
2099        LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
2100    }
2101
2102    private Location getNextLocationForAckForward(final Location nextLocation, final Location limit) {
2103        //getNextLocation() can throw an IOException, we should handle it and set
2104        //nextLocation to null and abort gracefully
2105        //Should not happen in the normal case
2106        Location location = null;
2107        try {
2108            location = journal.getNextLocation(nextLocation, limit);
2109        } catch (IOException e) {
2110            LOG.warn("Failed to load next journal location after: {}, reason: {}", nextLocation, e);
2111            if (LOG.isDebugEnabled()) {
2112                LOG.debug("Failed to load next journal location after: {}", nextLocation, e);
2113            }
2114        }
2115        return location;
2116    }
2117
2118    final Runnable nullCompletionCallback = new Runnable() {
2119        @Override
2120        public void run() {
2121        }
2122    };
2123
2124    private Location checkpointProducerAudit() throws IOException {
2125        if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
2126            ByteArrayOutputStream baos = new ByteArrayOutputStream();
2127            ObjectOutputStream oout = new ObjectOutputStream(baos);
2128            oout.writeObject(metadata.producerSequenceIdTracker);
2129            oout.flush();
2130            oout.close();
2131            // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2132            Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
2133            try {
2134                location.getLatch().await();
2135                if (location.getException().get() != null) {
2136                    throw location.getException().get();
2137                }
2138            } catch (InterruptedException e) {
2139                throw new InterruptedIOException(e.toString());
2140            }
2141            return location;
2142        }
2143        return metadata.producerSequenceIdTrackerLocation;
2144    }
2145
2146    private Location checkpointAckMessageFileMap() throws IOException {
2147        ByteArrayOutputStream baos = new ByteArrayOutputStream();
2148        ObjectOutputStream oout = new ObjectOutputStream(baos);
2149        oout.writeObject(metadata.ackMessageFileMap);
2150        oout.flush();
2151        oout.close();
2152        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2153        Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback);
2154        try {
2155            location.getLatch().await();
2156        } catch (InterruptedException e) {
2157            throw new InterruptedIOException(e.toString());
2158        }
2159        return location;
2160    }
2161
2162    private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException {
2163
2164        ByteSequence sequence = toByteSequence(subscription);
2165        Location location = journal.write(sequence, nullCompletionCallback) ;
2166
2167        try {
2168            location.getLatch().await();
2169        } catch (InterruptedException e) {
2170            throw new InterruptedIOException(e.toString());
2171        }
2172        return location;
2173    }
2174
2175    public HashSet<Integer> getJournalFilesBeingReplicated() {
2176        return journalFilesBeingReplicated;
2177    }
2178
2179    // /////////////////////////////////////////////////////////////////
2180    // StoredDestination related implementation methods.
2181    // /////////////////////////////////////////////////////////////////
2182
2183    protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<>();
2184
2185    static class MessageKeys {
2186        final String messageId;
2187        final Location location;
2188
2189        public MessageKeys(String messageId, Location location) {
2190            this.messageId=messageId;
2191            this.location=location;
2192        }
2193
2194        @Override
2195        public String toString() {
2196            return "["+messageId+","+location+"]";
2197        }
2198    }
2199
2200    protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
2201        final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller();
2202
2203        @Override
2204        public MessageKeys readPayload(DataInput dataIn) throws IOException {
2205            return new MessageKeys(dataIn.readUTF(), locationSizeMarshaller.readPayload(dataIn));
2206        }
2207
2208        @Override
2209        public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
2210            dataOut.writeUTF(object.messageId);
2211            locationSizeMarshaller.writePayload(object.location, dataOut);
2212        }
2213    }
2214
2215    class LastAck {
2216        long lastAckedSequence;
2217        byte priority;
2218
2219        public LastAck(LastAck source) {
2220            this.lastAckedSequence = source.lastAckedSequence;
2221            this.priority = source.priority;
2222        }
2223
2224        public LastAck() {
2225            this.priority = MessageOrderIndex.HI;
2226        }
2227
2228        public LastAck(long ackLocation) {
2229            this.lastAckedSequence = ackLocation;
2230            this.priority = MessageOrderIndex.LO;
2231        }
2232
2233        public LastAck(long ackLocation, byte priority) {
2234            this.lastAckedSequence = ackLocation;
2235            this.priority = priority;
2236        }
2237
2238        @Override
2239        public String toString() {
2240            return "[" + lastAckedSequence + ":" + priority + "]";
2241        }
2242    }
2243
2244    protected class LastAckMarshaller implements Marshaller<LastAck> {
2245
2246        @Override
2247        public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
2248            dataOut.writeLong(object.lastAckedSequence);
2249            dataOut.writeByte(object.priority);
2250        }
2251
2252        @Override
2253        public LastAck readPayload(DataInput dataIn) throws IOException {
2254            LastAck lastAcked = new LastAck();
2255            lastAcked.lastAckedSequence = dataIn.readLong();
2256            if (metadata.version >= 3) {
2257                lastAcked.priority = dataIn.readByte();
2258            }
2259            return lastAcked;
2260        }
2261
2262        @Override
2263        public int getFixedSize() {
2264            return 9;
2265        }
2266
2267        @Override
2268        public LastAck deepCopy(LastAck source) {
2269            return new LastAck(source);
2270        }
2271
2272        @Override
2273        public boolean isDeepCopySupported() {
2274            return true;
2275        }
2276    }
2277
2278    class StoredDestination {
2279
2280        MessageOrderIndex orderIndex = new MessageOrderIndex();
2281        BTreeIndex<Location, Long> locationIndex;
2282        BTreeIndex<String, Long> messageIdIndex;
2283
2284        // These bits are only set for Topics
2285        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
2286        BTreeIndex<String, LastAck> subscriptionAcks;
2287        HashMap<String, MessageOrderCursor> subscriptionCursors;
2288        ListIndex<String, SequenceSet> ackPositions;
2289        ListIndex<String, Location> subLocations;
2290
2291        // Transient data used to track which Messages are no longer needed.
2292        final TreeMap<Long, Long> messageReferences = new TreeMap<>();
2293        final HashSet<String> subscriptionCache = new LinkedHashSet<>();
2294
2295        public void trackPendingAdd(Long seq) {
2296            orderIndex.trackPendingAdd(seq);
2297        }
2298
2299        public void trackPendingAddComplete(Long seq) {
2300            orderIndex.trackPendingAddComplete(seq);
2301        }
2302
2303        @Override
2304        public String toString() {
2305            return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size();
2306        }
2307    }
2308
2309    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
2310
2311        final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
2312
2313        @Override
2314        public StoredDestination readPayload(final DataInput dataIn) throws IOException {
2315            final StoredDestination value = new StoredDestination();
2316            value.orderIndex.defaultPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
2317            value.locationIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
2318            value.messageIdIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
2319
2320            if (dataIn.readBoolean()) {
2321                value.subscriptions = new BTreeIndex<>(pageFile, dataIn.readLong());
2322                value.subscriptionAcks = new BTreeIndex<>(pageFile, dataIn.readLong());
2323                if (metadata.version >= 4) {
2324                    value.ackPositions = new ListIndex<>(pageFile, dataIn.readLong());
2325                } else {
2326                    // upgrade
2327                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
2328                        @Override
2329                        public void execute(Transaction tx) throws IOException {
2330                            LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<>();
2331
2332                            if (metadata.version >= 3) {
2333                                // migrate
2334                                BTreeIndex<Long, HashSet<String>> oldAckPositions =
2335                                        new BTreeIndex<>(pageFile, dataIn.readLong());
2336                                oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
2337                                oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
2338                                oldAckPositions.load(tx);
2339
2340
2341                                // Do the initial build of the data in memory before writing into the store
2342                                // based Ack Positions List to avoid a lot of disk thrashing.
2343                                Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
2344                                while (iterator.hasNext()) {
2345                                    Entry<Long, HashSet<String>> entry = iterator.next();
2346
2347                                    for(String subKey : entry.getValue()) {
2348                                        SequenceSet pendingAcks = temp.get(subKey);
2349                                        if (pendingAcks == null) {
2350                                            pendingAcks = new SequenceSet();
2351                                            temp.put(subKey, pendingAcks);
2352                                        }
2353
2354                                        pendingAcks.add(entry.getKey());
2355                                    }
2356                                }
2357                            }
2358                            // Now move the pending messages to ack data into the store backed
2359                            // structure.
2360                            value.ackPositions = new ListIndex<>(pageFile, tx.allocate());
2361                            value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2362                            value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2363                            value.ackPositions.load(tx);
2364                            for(String subscriptionKey : temp.keySet()) {
2365                                value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
2366                            }
2367
2368                        }
2369                    });
2370                }
2371
2372                if (metadata.version >= 5) {
2373                    value.subLocations = new ListIndex<>(pageFile, dataIn.readLong());
2374                } else {
2375                    // upgrade
2376                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
2377                        @Override
2378                        public void execute(Transaction tx) throws IOException {
2379                            value.subLocations = new ListIndex<>(pageFile, tx.allocate());
2380                            value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2381                            value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2382                            value.subLocations.load(tx);
2383                        }
2384                    });
2385                }
2386            }
2387            if (metadata.version >= 2) {
2388                value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
2389                value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
2390            } else {
2391                // upgrade
2392                pageFile.tx().execute(new Transaction.Closure<IOException>() {
2393                    @Override
2394                    public void execute(Transaction tx) throws IOException {
2395                        value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
2396                        value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2397                        value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
2398                        value.orderIndex.lowPriorityIndex.load(tx);
2399
2400                        value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
2401                        value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2402                        value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
2403                        value.orderIndex.highPriorityIndex.load(tx);
2404                    }
2405                });
2406            }
2407
2408            return value;
2409        }
2410
2411        @Override
2412        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
2413            dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
2414            dataOut.writeLong(value.locationIndex.getPageId());
2415            dataOut.writeLong(value.messageIdIndex.getPageId());
2416            if (value.subscriptions != null) {
2417                dataOut.writeBoolean(true);
2418                dataOut.writeLong(value.subscriptions.getPageId());
2419                dataOut.writeLong(value.subscriptionAcks.getPageId());
2420                dataOut.writeLong(value.ackPositions.getHeadPageId());
2421                dataOut.writeLong(value.subLocations.getHeadPageId());
2422            } else {
2423                dataOut.writeBoolean(false);
2424            }
2425            dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
2426            dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
2427        }
2428    }
2429
2430    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
2431        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
2432
2433        @Override
2434        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
2435            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
2436            rc.mergeFramed((InputStream)dataIn);
2437            return rc;
2438        }
2439
2440        @Override
2441        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
2442            object.writeFramed((OutputStream)dataOut);
2443        }
2444    }
2445
2446    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2447        String key = key(destination);
2448        StoredDestination rc = storedDestinations.get(key);
2449        if (rc == null) {
2450            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
2451            rc = loadStoredDestination(tx, key, topic);
2452            // Cache it. We may want to remove/unload destinations from the
2453            // cache that are not used for a while
2454            // to reduce memory usage.
2455            storedDestinations.put(key, rc);
2456        }
2457        return rc;
2458    }
2459
2460    protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2461        String key = key(destination);
2462        StoredDestination rc = storedDestinations.get(key);
2463        if (rc == null && metadata.destinations.containsKey(tx, key)) {
2464            rc = getStoredDestination(destination, tx);
2465        }
2466        return rc;
2467    }
2468
2469    /**
2470     * @param tx
2471     * @param key
2472     * @param topic
2473     * @return
2474     * @throws IOException
2475     */
2476    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
2477        // Try to load the existing indexes..
2478        StoredDestination rc = metadata.destinations.get(tx, key);
2479        if (rc == null) {
2480            // Brand new destination.. allocate indexes for it.
2481            rc = new StoredDestination();
2482            rc.orderIndex.allocate(tx);
2483            rc.locationIndex = new BTreeIndex<>(pageFile, tx.allocate());
2484            rc.messageIdIndex = new BTreeIndex<>(pageFile, tx.allocate());
2485
2486            if (topic) {
2487                rc.subscriptions = new BTreeIndex<>(pageFile, tx.allocate());
2488                rc.subscriptionAcks = new BTreeIndex<>(pageFile, tx.allocate());
2489                rc.ackPositions = new ListIndex<>(pageFile, tx.allocate());
2490                rc.subLocations = new ListIndex<>(pageFile, tx.allocate());
2491            }
2492            metadata.destinations.put(tx, key, rc);
2493        }
2494
2495        // Configure the marshalers and load.
2496        rc.orderIndex.load(tx);
2497
2498        // Figure out the next key using the last entry in the destination.
2499        rc.orderIndex.configureLast(tx);
2500
2501        rc.locationIndex.setKeyMarshaller(new LocationSizeMarshaller());
2502        rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2503        rc.locationIndex.load(tx);
2504
2505        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
2506        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2507        rc.messageIdIndex.load(tx);
2508
2509        //go through an upgrade old index if older than version 6
2510        if (metadata.version < 6) {
2511            for (Iterator<Entry<Location, Long>> iterator = rc.locationIndex.iterator(tx); iterator.hasNext(); ) {
2512                Entry<Location, Long> entry = iterator.next();
2513                // modify so it is upgraded
2514                rc.locationIndex.put(tx, entry.getKey(), entry.getValue());
2515            }
2516            //upgrade the order index
2517            for (Iterator<Entry<Long, MessageKeys>> iterator = rc.orderIndex.iterator(tx); iterator.hasNext(); ) {
2518                Entry<Long, MessageKeys> entry = iterator.next();
2519                //call get so that the last priority is updated
2520                rc.orderIndex.get(tx, entry.getKey());
2521                rc.orderIndex.put(tx, rc.orderIndex.lastGetPriority(), entry.getKey(), entry.getValue());
2522            }
2523        }
2524
2525        // If it was a topic...
2526        if (topic) {
2527
2528            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
2529            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
2530            rc.subscriptions.load(tx);
2531
2532            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
2533            rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
2534            rc.subscriptionAcks.load(tx);
2535
2536            rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2537            rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2538            rc.ackPositions.load(tx);
2539
2540            rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2541            rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2542            rc.subLocations.load(tx);
2543
2544            rc.subscriptionCursors = new HashMap<>();
2545
2546            if (metadata.version < 3) {
2547
2548                // on upgrade need to fill ackLocation with available messages past last ack
2549                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2550                    Entry<String, LastAck> entry = iterator.next();
2551                    for (Iterator<Entry<Long, MessageKeys>> orderIterator =
2552                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
2553                        Long sequence = orderIterator.next().getKey();
2554                        addAckLocation(tx, rc, sequence, entry.getKey());
2555                    }
2556                    // modify so it is upgraded
2557                    rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
2558                }
2559            }
2560
2561            // Configure the message references index
2562            Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
2563            while (subscriptions.hasNext()) {
2564                Entry<String, SequenceSet> subscription = subscriptions.next();
2565                SequenceSet pendingAcks = subscription.getValue();
2566                if (pendingAcks != null && !pendingAcks.isEmpty()) {
2567                    Long lastPendingAck = pendingAcks.getTail().getLast();
2568                    for (Long sequenceId : pendingAcks) {
2569                        Long current = rc.messageReferences.get(sequenceId);
2570                        if (current == null) {
2571                            current = new Long(0);
2572                        }
2573
2574                        // We always add a trailing empty entry for the next position to start from
2575                        // so we need to ensure we don't count that as a message reference on reload.
2576                        if (!sequenceId.equals(lastPendingAck)) {
2577                            current = current.longValue() + 1;
2578                        } else {
2579                            current = Long.valueOf(0L);
2580                        }
2581
2582                        rc.messageReferences.put(sequenceId, current);
2583                    }
2584                }
2585            }
2586
2587            // Configure the subscription cache
2588            for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2589                Entry<String, LastAck> entry = iterator.next();
2590                rc.subscriptionCache.add(entry.getKey());
2591            }
2592
2593            if (rc.orderIndex.nextMessageId == 0) {
2594                // check for existing durable sub all acked out - pull next seq from acks as messages are gone
2595                if (!rc.subscriptionAcks.isEmpty(tx)) {
2596                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
2597                        Entry<String, LastAck> entry = iterator.next();
2598                        rc.orderIndex.nextMessageId =
2599                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
2600                    }
2601                }
2602            } else {
2603                // update based on ackPositions for unmatched, last entry is always the next
2604                if (!rc.messageReferences.isEmpty()) {
2605                    Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
2606                    rc.orderIndex.nextMessageId =
2607                            Math.max(rc.orderIndex.nextMessageId, nextMessageId);
2608                }
2609            }
2610        }
2611
2612        if (metadata.version < VERSION) {
2613            // store again after upgrade
2614            metadata.destinations.put(tx, key, rc);
2615        }
2616        return rc;
2617    }
2618
2619    /**
2620     * Clear the counter for the destination, if one exists.
2621     *
2622     * @param kahaDestination
2623     */
2624    protected void clearStoreStats(KahaDestination kahaDestination) {
2625        String key = key(kahaDestination);
2626        MessageStoreStatistics storeStats = getStoreStats(key);
2627        MessageStoreSubscriptionStatistics subStats = getSubStats(key);
2628        if (storeStats != null) {
2629            storeStats.reset();
2630        }
2631        if (subStats != null) {
2632            subStats.reset();
2633        }
2634    }
2635
2636    /**
2637     * Update MessageStoreStatistics
2638     *
2639     * @param kahaDestination
2640     * @param size
2641     */
2642    protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) {
2643        incrementAndAddSizeToStoreStat(key(kahaDestination), size);
2644    }
2645
2646    protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) {
2647        MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
2648        if (storeStats != null) {
2649            storeStats.getMessageCount().increment();
2650            if (size > 0) {
2651                storeStats.getMessageSize().addSize(size);
2652            }
2653        }
2654    }
2655
2656    protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) {
2657        decrementAndSubSizeToStoreStat(key(kahaDestination), size);
2658    }
2659
2660    protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) {
2661        MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
2662        if (storeStats != null) {
2663            storeStats.getMessageCount().decrement();
2664            if (size > 0) {
2665                storeStats.getMessageSize().addSize(-size);
2666            }
2667        }
2668    }
2669
2670    protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size) {
2671        incrementAndAddSizeToStoreStat(key(kahaDestination), subKey, size);
2672    }
2673
2674    protected void incrementAndAddSizeToStoreStat(String kahaDestKey, String subKey, long size) {
2675        if (enableSubscriptionStatistics) {
2676            MessageStoreSubscriptionStatistics subStats = getSubStats(kahaDestKey);
2677            if (subStats != null && subKey != null) {
2678                subStats.getMessageCount(subKey).increment();
2679                if (size > 0) {
2680                    subStats.getMessageSize(subKey).addSize(size);
2681                }
2682            }
2683        }
2684    }
2685
2686
2687    protected void decrementAndSubSizeToStoreStat(String kahaDestKey, String subKey, long size) {
2688        if (enableSubscriptionStatistics) {
2689            MessageStoreSubscriptionStatistics subStats = getSubStats(kahaDestKey);
2690            if (subStats != null && subKey != null) {
2691                subStats.getMessageCount(subKey).decrement();
2692                if (size > 0) {
2693                    subStats.getMessageSize(subKey).addSize(-size);
2694                }
2695            }
2696        }
2697    }
2698
2699    protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size) {
2700        decrementAndSubSizeToStoreStat(key(kahaDestination), subKey, size);
2701    }
2702
2703    /**
2704     * This is a map to cache MessageStores for a specific
2705     * KahaDestination key
2706     */
2707    protected final ConcurrentMap<String, MessageStore> storeCache =
2708            new ConcurrentHashMap<>();
2709
2710    /**
2711     * Locate the storeMessageSize counter for this KahaDestination
2712     */
2713    protected MessageStoreStatistics getStoreStats(String kahaDestKey) {
2714        MessageStoreStatistics storeStats = null;
2715        try {
2716            MessageStore messageStore = storeCache.get(kahaDestKey);
2717            if (messageStore != null) {
2718                storeStats = messageStore.getMessageStoreStatistics();
2719            }
2720        } catch (Exception e1) {
2721             LOG.error("Getting size counter of destination failed", e1);
2722        }
2723
2724        return storeStats;
2725    }
2726
2727    protected MessageStoreSubscriptionStatistics getSubStats(String kahaDestKey) {
2728        MessageStoreSubscriptionStatistics subStats = null;
2729        try {
2730            MessageStore messageStore = storeCache.get(kahaDestKey);
2731            if (messageStore instanceof TopicMessageStore) {
2732                subStats = ((TopicMessageStore)messageStore).getMessageStoreSubStatistics();
2733            }
2734        } catch (Exception e1) {
2735             LOG.error("Getting size counter of destination failed", e1);
2736        }
2737
2738        return subStats;
2739    }
2740
2741    /**
2742     * Determine whether this Destination matches the DestinationType
2743     *
2744     * @param destination
2745     * @param type
2746     * @return
2747     */
2748    protected boolean matchType(Destination destination,
2749            KahaDestination.DestinationType type) {
2750        if (destination instanceof Topic
2751                && type.equals(KahaDestination.DestinationType.TOPIC)) {
2752            return true;
2753        } else if (destination instanceof Queue
2754                && type.equals(KahaDestination.DestinationType.QUEUE)) {
2755            return true;
2756        }
2757        return false;
2758    }
2759
2760    class LocationSizeMarshaller implements Marshaller<Location> {
2761
2762        public LocationSizeMarshaller() {
2763
2764        }
2765
2766        @Override
2767        public Location readPayload(DataInput dataIn) throws IOException {
2768            Location rc = new Location();
2769            rc.setDataFileId(dataIn.readInt());
2770            rc.setOffset(dataIn.readInt());
2771            if (metadata.version >= 6) {
2772                rc.setSize(dataIn.readInt());
2773            }
2774            return rc;
2775        }
2776
2777        @Override
2778        public void writePayload(Location object, DataOutput dataOut)
2779                throws IOException {
2780            dataOut.writeInt(object.getDataFileId());
2781            dataOut.writeInt(object.getOffset());
2782            dataOut.writeInt(object.getSize());
2783        }
2784
2785        @Override
2786        public int getFixedSize() {
2787            return 12;
2788        }
2789
2790        @Override
2791        public Location deepCopy(Location source) {
2792            return new Location(source);
2793        }
2794
2795        @Override
2796        public boolean isDeepCopySupported() {
2797            return true;
2798        }
2799    }
2800
2801    private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
2802        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2803        if (sequences == null) {
2804            sequences = new SequenceSet();
2805            sequences.add(messageSequence);
2806            sd.ackPositions.add(tx, subscriptionKey, sequences);
2807        } else {
2808            sequences.add(messageSequence);
2809            sd.ackPositions.put(tx, subscriptionKey, sequences);
2810        }
2811
2812        Long count = sd.messageReferences.get(messageSequence);
2813        if (count == null) {
2814            count = Long.valueOf(0L);
2815        }
2816        count = count.longValue() + 1;
2817        sd.messageReferences.put(messageSequence, count);
2818    }
2819
2820    // new sub is interested in potentially all existing messages
2821    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2822        SequenceSet allOutstanding = new SequenceSet();
2823        Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
2824        while (iterator.hasNext()) {
2825            SequenceSet set = iterator.next().getValue();
2826            for (Long entry : set) {
2827                allOutstanding.add(entry);
2828            }
2829        }
2830        sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
2831
2832        for (Long ackPosition : allOutstanding) {
2833            Long count = sd.messageReferences.get(ackPosition);
2834
2835            // There might not be a reference if the ackLocation was the last
2836            // one which is a placeholder for the next incoming message and
2837            // no value was added to the message references table.
2838            if (count != null) {
2839                count = count.longValue() + 1;
2840                sd.messageReferences.put(ackPosition, count);
2841            }
2842        }
2843    }
2844
2845    // on a new message add, all existing subs are interested in this message
2846    private void addAckLocationForNewMessage(Transaction tx, KahaDestination kahaDest,
2847            StoredDestination sd, Long messageSequence) throws IOException {
2848        for(String subscriptionKey : sd.subscriptionCache) {
2849            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2850            if (sequences == null) {
2851                sequences = new SequenceSet();
2852                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2853                sd.ackPositions.add(tx, subscriptionKey, sequences);
2854            } else {
2855                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2856                sd.ackPositions.put(tx, subscriptionKey, sequences);
2857            }
2858
2859            MessageKeys key = sd.orderIndex.get(tx, messageSequence);
2860            incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey,
2861                    key.location.getSize());
2862
2863            Long count = sd.messageReferences.get(messageSequence);
2864            if (count == null) {
2865                count = Long.valueOf(0L);
2866            }
2867            count = count.longValue() + 1;
2868            sd.messageReferences.put(messageSequence, count);
2869            sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L));
2870        }
2871    }
2872
2873    private void removeAckLocationsForSub(KahaSubscriptionCommand command,
2874            Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2875        if (!sd.ackPositions.isEmpty(tx)) {
2876            SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
2877            if (sequences == null || sequences.isEmpty()) {
2878                return;
2879            }
2880
2881            ArrayList<Long> unreferenced = new ArrayList<>();
2882
2883            for(Long sequenceId : sequences) {
2884                Long references = sd.messageReferences.get(sequenceId);
2885                if (references != null) {
2886                    references = references.longValue() - 1;
2887
2888                    if (references.longValue() > 0) {
2889                        sd.messageReferences.put(sequenceId, references);
2890                    } else {
2891                        sd.messageReferences.remove(sequenceId);
2892                        unreferenced.add(sequenceId);
2893                    }
2894                }
2895            }
2896
2897            for(Long sequenceId : unreferenced) {
2898                // Find all the entries that need to get deleted.
2899                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<>();
2900                sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
2901
2902                // Do the actual deletes.
2903                for (Entry<Long, MessageKeys> entry : deletes) {
2904                    sd.locationIndex.remove(tx, entry.getValue().location);
2905                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2906                    sd.orderIndex.remove(tx, entry.getKey());
2907                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
2908                }
2909            }
2910        }
2911    }
2912
2913    /**
2914     * @param tx
2915     * @param sd
2916     * @param subscriptionKey
2917     * @param messageSequence
2918     * @throws IOException
2919     */
2920    private void removeAckLocation(KahaRemoveMessageCommand command,
2921            Transaction tx, StoredDestination sd, String subscriptionKey,
2922            Long messageSequence) throws IOException {
2923        // Remove the sub from the previous location set..
2924        if (messageSequence != null) {
2925            SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
2926            if (range != null && !range.isEmpty()) {
2927                range.remove(messageSequence);
2928                if (!range.isEmpty()) {
2929                    sd.ackPositions.put(tx, subscriptionKey, range);
2930                } else {
2931                    sd.ackPositions.remove(tx, subscriptionKey);
2932                }
2933
2934                MessageKeys key = sd.orderIndex.get(tx, messageSequence);
2935                decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey,
2936                        key.location.getSize());
2937
2938                // Check if the message is reference by any other subscription.
2939                Long count = sd.messageReferences.get(messageSequence);
2940                if (count != null) {
2941                    long references = count.longValue() - 1;
2942                    if (references > 0) {
2943                        sd.messageReferences.put(messageSequence, Long.valueOf(references));
2944                        return;
2945                    } else {
2946                        sd.messageReferences.remove(messageSequence);
2947                    }
2948                }
2949
2950                // Find all the entries that need to get deleted.
2951                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<>();
2952                sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
2953
2954                // Do the actual deletes.
2955                for (Entry<Long, MessageKeys> entry : deletes) {
2956                    sd.locationIndex.remove(tx, entry.getValue().location);
2957                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2958                    sd.orderIndex.remove(tx, entry.getKey());
2959                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
2960                }
2961            }
2962        }
2963    }
2964
2965    public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2966        return sd.subscriptionAcks.get(tx, subscriptionKey);
2967    }
2968
2969    protected long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2970        if (sd.ackPositions != null) {
2971            SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2972            if (messageSequences != null) {
2973                long result = messageSequences.rangeSize();
2974                // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
2975                return result > 0 ? result - 1 : 0;
2976            }
2977        }
2978
2979        return 0;
2980    }
2981
2982    protected long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2983        long locationSize = 0;
2984
2985        if (sd.ackPositions != null) {
2986            //grab the messages attached to this subscription
2987            SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2988
2989            if (messageSequences != null) {
2990                Sequence head = messageSequences.getHead();
2991                if (head != null) {
2992                    //get an iterator over the order index starting at the first unacked message
2993                    //and go over each message to add up the size
2994                    Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
2995                            new MessageOrderCursor(head.getFirst()));
2996
2997                    while (iterator.hasNext()) {
2998                        Entry<Long, MessageKeys> entry = iterator.next();
2999                        locationSize += entry.getValue().location.getSize();
3000                    }
3001                }
3002            }
3003        }
3004
3005        return locationSize;
3006    }
3007
3008    protected String key(KahaDestination destination) {
3009        return destination.getType().getNumber() + ":" + destination.getName();
3010    }
3011
3012    // /////////////////////////////////////////////////////////////////
3013    // Transaction related implementation methods.
3014    // /////////////////////////////////////////////////////////////////
3015    @SuppressWarnings("rawtypes")
3016    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<>();
3017    @SuppressWarnings("rawtypes")
3018    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<>();
3019    protected final Set<String> ackedAndPrepared = new HashSet<>();
3020    protected final Set<String> rolledBackAcks = new HashSet<>();
3021
3022    // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
3023    // till then they are skipped by the store.
3024    // 'at most once' XA guarantee
3025    public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
3026        this.indexLock.writeLock().lock();
3027        try {
3028            for (MessageAck ack : acks) {
3029                ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
3030            }
3031        } finally {
3032            this.indexLock.writeLock().unlock();
3033        }
3034    }
3035
3036    public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
3037        if (acks != null) {
3038            this.indexLock.writeLock().lock();
3039            try {
3040                for (MessageAck ack : acks) {
3041                    final String id = ack.getLastMessageId().toProducerKey();
3042                    ackedAndPrepared.remove(id);
3043                    if (rollback) {
3044                        rolledBackAcks.add(id);
3045                    }
3046                }
3047            } finally {
3048                this.indexLock.writeLock().unlock();
3049            }
3050        }
3051    }
3052
3053    @SuppressWarnings("rawtypes")
3054    private List<Operation> getInflightTx(KahaTransactionInfo info) {
3055        TransactionId key = TransactionIdConversion.convert(info);
3056        List<Operation> tx;
3057        synchronized (inflightTransactions) {
3058            tx = inflightTransactions.get(key);
3059            if (tx == null) {
3060                tx = Collections.synchronizedList(new ArrayList<Operation>());
3061                inflightTransactions.put(key, tx);
3062            }
3063        }
3064        return tx;
3065    }
3066
3067    @SuppressWarnings("unused")
3068    private TransactionId key(KahaTransactionInfo transactionInfo) {
3069        return TransactionIdConversion.convert(transactionInfo);
3070    }
3071
3072    abstract class Operation <T extends JournalCommand<T>> {
3073        final T command;
3074        final Location location;
3075
3076        public Operation(T command, Location location) {
3077            this.command = command;
3078            this.location = location;
3079        }
3080
3081        public Location getLocation() {
3082            return location;
3083        }
3084
3085        public T getCommand() {
3086            return command;
3087        }
3088
3089        abstract public void execute(Transaction tx) throws IOException;
3090    }
3091
3092    class AddOperation extends Operation<KahaAddMessageCommand> {
3093        final IndexAware runWithIndexLock;
3094        public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) {
3095            super(command, location);
3096            this.runWithIndexLock = runWithIndexLock;
3097        }
3098
3099        @Override
3100        public void execute(Transaction tx) throws IOException {
3101            long seq = updateIndex(tx, command, location);
3102            if (runWithIndexLock != null) {
3103                runWithIndexLock.sequenceAssignedWithIndexLocked(seq);
3104            }
3105        }
3106    }
3107
3108    class RemoveOperation extends Operation<KahaRemoveMessageCommand> {
3109
3110        public RemoveOperation(KahaRemoveMessageCommand command, Location location) {
3111            super(command, location);
3112        }
3113
3114        @Override
3115        public void execute(Transaction tx) throws IOException {
3116            updateIndex(tx, command, location);
3117        }
3118    }
3119
3120    // /////////////////////////////////////////////////////////////////
3121    // Initialization related implementation methods.
3122    // /////////////////////////////////////////////////////////////////
3123
3124    private PageFile createPageFile() throws IOException {
3125        if (indexDirectory == null) {
3126            indexDirectory = directory;
3127        }
3128        IOHelper.mkdirs(indexDirectory);
3129        PageFile index = new PageFile(indexDirectory, "db");
3130        index.setEnableWriteThread(isEnableIndexWriteAsync());
3131        index.setWriteBatchSize(getIndexWriteBatchSize());
3132        index.setPageCacheSize(indexCacheSize);
3133        index.setUseLFRUEviction(isUseIndexLFRUEviction());
3134        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
3135        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
3136        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
3137        index.setEnablePageCaching(isEnableIndexPageCaching());
3138        return index;
3139    }
3140
3141    protected Journal createJournal() throws IOException {
3142        Journal manager = new Journal();
3143        manager.setDirectory(directory);
3144        manager.setMaxFileLength(getJournalMaxFileLength());
3145        manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
3146        manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
3147        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
3148        manager.setArchiveDataLogs(isArchiveDataLogs());
3149        manager.setSizeAccumulator(journalSize);
3150        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
3151        manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
3152        manager.setPreallocationStrategy(
3153                Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
3154        manager.setJournalDiskSyncStrategy(journalDiskSyncStrategy);
3155        if (getDirectoryArchive() != null) {
3156            IOHelper.mkdirs(getDirectoryArchive());
3157            manager.setDirectoryArchive(getDirectoryArchive());
3158        }
3159        return manager;
3160    }
3161
3162    private Metadata createMetadata() {
3163        Metadata md = new Metadata();
3164        md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth());
3165        md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack());
3166        return md;
3167    }
3168
3169    protected abstract void configureMetadata();
3170
3171    public int getJournalMaxWriteBatchSize() {
3172        return journalMaxWriteBatchSize;
3173    }
3174
3175    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
3176        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
3177    }
3178
3179    public File getDirectory() {
3180        return directory;
3181    }
3182
3183    public void setDirectory(File directory) {
3184        this.directory = directory;
3185    }
3186
3187    public boolean isDeleteAllMessages() {
3188        return deleteAllMessages;
3189    }
3190
3191    public void setDeleteAllMessages(boolean deleteAllMessages) {
3192        this.deleteAllMessages = deleteAllMessages;
3193    }
3194
3195    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
3196        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
3197    }
3198
3199    public int getIndexWriteBatchSize() {
3200        return setIndexWriteBatchSize;
3201    }
3202
3203    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
3204        this.enableIndexWriteAsync = enableIndexWriteAsync;
3205    }
3206
3207    boolean isEnableIndexWriteAsync() {
3208        return enableIndexWriteAsync;
3209    }
3210
3211    /**
3212     * @deprecated use {@link #getJournalDiskSyncStrategyEnum} or {@link #getJournalDiskSyncStrategy} instead
3213     * @return
3214     */
3215    @Deprecated
3216    public boolean isEnableJournalDiskSyncs() {
3217        return journalDiskSyncStrategy == JournalDiskSyncStrategy.ALWAYS;
3218    }
3219
3220    /**
3221     * @deprecated use {@link #setEnableJournalDiskSyncs} instead
3222     * @param syncWrites
3223     */
3224    @Deprecated
3225    public void setEnableJournalDiskSyncs(boolean syncWrites) {
3226        if (syncWrites) {
3227            journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
3228        } else {
3229            journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER;
3230        }
3231    }
3232
3233    public JournalDiskSyncStrategy getJournalDiskSyncStrategyEnum() {
3234        return journalDiskSyncStrategy;
3235    }
3236
3237    public String getJournalDiskSyncStrategy() {
3238        return journalDiskSyncStrategy.name();
3239    }
3240
3241    public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) {
3242        this.journalDiskSyncStrategy = JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase());
3243    }
3244
3245    public long getJournalDiskSyncInterval() {
3246        return journalDiskSyncInterval;
3247    }
3248
3249    public void setJournalDiskSyncInterval(long journalDiskSyncInterval) {
3250        this.journalDiskSyncInterval = journalDiskSyncInterval;
3251    }
3252
3253    public long getCheckpointInterval() {
3254        return checkpointInterval;
3255    }
3256
3257    public void setCheckpointInterval(long checkpointInterval) {
3258        this.checkpointInterval = checkpointInterval;
3259    }
3260
3261    public long getCleanupInterval() {
3262        return cleanupInterval;
3263    }
3264
3265    public void setCleanupInterval(long cleanupInterval) {
3266        this.cleanupInterval = cleanupInterval;
3267    }
3268
3269    public void setJournalMaxFileLength(int journalMaxFileLength) {
3270        this.journalMaxFileLength = journalMaxFileLength;
3271    }
3272
3273    public int getJournalMaxFileLength() {
3274        return journalMaxFileLength;
3275    }
3276
3277    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
3278        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
3279    }
3280
3281    public int getMaxFailoverProducersToTrack() {
3282        return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
3283    }
3284
3285    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
3286        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
3287    }
3288
3289    public int getFailoverProducersAuditDepth() {
3290        return this.metadata.producerSequenceIdTracker.getAuditDepth();
3291    }
3292
3293    public PageFile getPageFile() throws IOException {
3294        if (pageFile == null) {
3295            pageFile = createPageFile();
3296        }
3297        return pageFile;
3298    }
3299
3300    public Journal getJournal() throws IOException {
3301        if (journal == null) {
3302            journal = createJournal();
3303        }
3304        return journal;
3305    }
3306
3307    protected Metadata getMetadata() {
3308        return metadata;
3309    }
3310
3311    public boolean isFailIfDatabaseIsLocked() {
3312        return failIfDatabaseIsLocked;
3313    }
3314
3315    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
3316        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
3317    }
3318
3319    public boolean isIgnoreMissingJournalfiles() {
3320        return ignoreMissingJournalfiles;
3321    }
3322
3323    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
3324        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
3325    }
3326
3327    public int getIndexCacheSize() {
3328        return indexCacheSize;
3329    }
3330
3331    public void setIndexCacheSize(int indexCacheSize) {
3332        this.indexCacheSize = indexCacheSize;
3333    }
3334
3335    public boolean isCheckForCorruptJournalFiles() {
3336        return checkForCorruptJournalFiles;
3337    }
3338
3339    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
3340        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
3341    }
3342
3343    public boolean isChecksumJournalFiles() {
3344        return checksumJournalFiles;
3345    }
3346
3347    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
3348        this.checksumJournalFiles = checksumJournalFiles;
3349    }
3350
3351    @Override
3352    public void setBrokerService(BrokerService brokerService) {
3353        this.brokerService = brokerService;
3354    }
3355
3356    /**
3357     * @return the archiveDataLogs
3358     */
3359    public boolean isArchiveDataLogs() {
3360        return this.archiveDataLogs;
3361    }
3362
3363    /**
3364     * @param archiveDataLogs the archiveDataLogs to set
3365     */
3366    public void setArchiveDataLogs(boolean archiveDataLogs) {
3367        this.archiveDataLogs = archiveDataLogs;
3368    }
3369
3370    /**
3371     * @return the directoryArchive
3372     */
3373    public File getDirectoryArchive() {
3374        return this.directoryArchive;
3375    }
3376
3377    /**
3378     * @param directoryArchive the directoryArchive to set
3379     */
3380    public void setDirectoryArchive(File directoryArchive) {
3381        this.directoryArchive = directoryArchive;
3382    }
3383
3384    public boolean isArchiveCorruptedIndex() {
3385        return archiveCorruptedIndex;
3386    }
3387
3388    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
3389        this.archiveCorruptedIndex = archiveCorruptedIndex;
3390    }
3391
3392    public float getIndexLFUEvictionFactor() {
3393        return indexLFUEvictionFactor;
3394    }
3395
3396    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
3397        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
3398    }
3399
3400    public boolean isUseIndexLFRUEviction() {
3401        return useIndexLFRUEviction;
3402    }
3403
3404    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
3405        this.useIndexLFRUEviction = useIndexLFRUEviction;
3406    }
3407
3408    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
3409        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
3410    }
3411
3412    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
3413        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
3414    }
3415
3416    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
3417        this.enableIndexPageCaching = enableIndexPageCaching;
3418    }
3419
3420    public boolean isEnableIndexDiskSyncs() {
3421        return enableIndexDiskSyncs;
3422    }
3423
3424    public boolean isEnableIndexRecoveryFile() {
3425        return enableIndexRecoveryFile;
3426    }
3427
3428    public boolean isEnableIndexPageCaching() {
3429        return enableIndexPageCaching;
3430    }
3431
3432    // /////////////////////////////////////////////////////////////////
3433    // Internal conversion methods.
3434    // /////////////////////////////////////////////////////////////////
3435
3436    class MessageOrderCursor{
3437        long defaultCursorPosition;
3438        long lowPriorityCursorPosition;
3439        long highPriorityCursorPosition;
3440        MessageOrderCursor(){
3441        }
3442
3443        MessageOrderCursor(long position){
3444            this.defaultCursorPosition=position;
3445            this.lowPriorityCursorPosition=position;
3446            this.highPriorityCursorPosition=position;
3447        }
3448
3449        MessageOrderCursor(MessageOrderCursor other){
3450            this.defaultCursorPosition=other.defaultCursorPosition;
3451            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3452            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3453        }
3454
3455        MessageOrderCursor copy() {
3456            return new MessageOrderCursor(this);
3457        }
3458
3459        void reset() {
3460            this.defaultCursorPosition=0;
3461            this.highPriorityCursorPosition=0;
3462            this.lowPriorityCursorPosition=0;
3463        }
3464
3465        void increment() {
3466            if (defaultCursorPosition!=0) {
3467                defaultCursorPosition++;
3468            }
3469            if (highPriorityCursorPosition!=0) {
3470                highPriorityCursorPosition++;
3471            }
3472            if (lowPriorityCursorPosition!=0) {
3473                lowPriorityCursorPosition++;
3474            }
3475        }
3476
3477        @Override
3478        public String toString() {
3479           return "MessageOrderCursor:[def:" + defaultCursorPosition
3480                   + ", low:" + lowPriorityCursorPosition
3481                   + ", high:" +  highPriorityCursorPosition + "]";
3482        }
3483
3484        public void sync(MessageOrderCursor other) {
3485            this.defaultCursorPosition=other.defaultCursorPosition;
3486            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3487            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3488        }
3489    }
3490
3491    class MessageOrderIndex {
3492        static final byte HI = 9;
3493        static final byte LO = 0;
3494        static final byte DEF = 4;
3495
3496        long nextMessageId;
3497        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
3498        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
3499        BTreeIndex<Long, MessageKeys> highPriorityIndex;
3500        final MessageOrderCursor cursor = new MessageOrderCursor();
3501        Long lastDefaultKey;
3502        Long lastHighKey;
3503        Long lastLowKey;
3504        byte lastGetPriority;
3505        final List<Long> pendingAdditions = new LinkedList<>();
3506        final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
3507
3508        MessageKeys remove(Transaction tx, Long key) throws IOException {
3509            MessageKeys result = defaultPriorityIndex.remove(tx, key);
3510            if (result == null && highPriorityIndex!=null) {
3511                result = highPriorityIndex.remove(tx, key);
3512                if (result ==null && lowPriorityIndex!=null) {
3513                    result = lowPriorityIndex.remove(tx, key);
3514                }
3515            }
3516            return result;
3517        }
3518
3519        void load(Transaction tx) throws IOException {
3520            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3521            defaultPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3522            defaultPriorityIndex.load(tx);
3523            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3524            lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3525            lowPriorityIndex.load(tx);
3526            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3527            highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
3528            highPriorityIndex.load(tx);
3529        }
3530
3531        void allocate(Transaction tx) throws IOException {
3532            defaultPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
3533            if (metadata.version >= 2) {
3534                lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
3535                highPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate());
3536            }
3537        }
3538
3539        void configureLast(Transaction tx) throws IOException {
3540            // Figure out the next key using the last entry in the destination.
3541            TreeSet<Long> orderedSet = new TreeSet<>();
3542
3543            addLast(orderedSet, highPriorityIndex, tx);
3544            addLast(orderedSet, defaultPriorityIndex, tx);
3545            addLast(orderedSet, lowPriorityIndex, tx);
3546
3547            if (!orderedSet.isEmpty()) {
3548                nextMessageId = orderedSet.last() + 1;
3549            }
3550        }
3551
3552        private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException {
3553            if (index != null) {
3554                Entry<Long, MessageKeys> lastEntry = index.getLast(tx);
3555                if (lastEntry != null) {
3556                    orderedSet.add(lastEntry.getKey());
3557                }
3558            }
3559        }
3560
3561        void clear(Transaction tx) throws IOException {
3562            this.remove(tx);
3563            this.resetCursorPosition();
3564            this.allocate(tx);
3565            this.load(tx);
3566            this.configureLast(tx);
3567        }
3568
3569        void remove(Transaction tx) throws IOException {
3570            defaultPriorityIndex.clear(tx);
3571            defaultPriorityIndex.unload(tx);
3572            tx.free(defaultPriorityIndex.getPageId());
3573            if (lowPriorityIndex != null) {
3574                lowPriorityIndex.clear(tx);
3575                lowPriorityIndex.unload(tx);
3576
3577                tx.free(lowPriorityIndex.getPageId());
3578            }
3579            if (highPriorityIndex != null) {
3580                highPriorityIndex.clear(tx);
3581                highPriorityIndex.unload(tx);
3582                tx.free(highPriorityIndex.getPageId());
3583            }
3584        }
3585
3586        void resetCursorPosition() {
3587            this.cursor.reset();
3588            lastDefaultKey = null;
3589            lastHighKey = null;
3590            lastLowKey = null;
3591        }
3592
3593        void setBatch(Transaction tx, Long sequence) throws IOException {
3594            if (sequence != null) {
3595                Long nextPosition = new Long(sequence.longValue() + 1);
3596                lastDefaultKey = sequence;
3597                cursor.defaultCursorPosition = nextPosition.longValue();
3598                lastHighKey = sequence;
3599                cursor.highPriorityCursorPosition = nextPosition.longValue();
3600                lastLowKey = sequence;
3601                cursor.lowPriorityCursorPosition = nextPosition.longValue();
3602            }
3603        }
3604
3605        void setBatch(Transaction tx, LastAck last) throws IOException {
3606            setBatch(tx, last.lastAckedSequence);
3607            if (cursor.defaultCursorPosition == 0
3608                    && cursor.highPriorityCursorPosition == 0
3609                    && cursor.lowPriorityCursorPosition == 0) {
3610                long next = last.lastAckedSequence + 1;
3611                switch (last.priority) {
3612                    case DEF:
3613                        cursor.defaultCursorPosition = next;
3614                        cursor.highPriorityCursorPosition = next;
3615                        break;
3616                    case HI:
3617                        cursor.highPriorityCursorPosition = next;
3618                        break;
3619                    case LO:
3620                        cursor.lowPriorityCursorPosition = next;
3621                        cursor.defaultCursorPosition = next;
3622                        cursor.highPriorityCursorPosition = next;
3623                        break;
3624                }
3625            }
3626        }
3627
3628        void stoppedIterating() {
3629            if (lastDefaultKey!=null) {
3630                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
3631            }
3632            if (lastHighKey!=null) {
3633                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
3634            }
3635            if (lastLowKey!=null) {
3636                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
3637            }
3638            lastDefaultKey = null;
3639            lastHighKey = null;
3640            lastLowKey = null;
3641        }
3642
3643        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
3644                throws IOException {
3645            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
3646                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
3647            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
3648                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
3649            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
3650                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
3651            }
3652        }
3653
3654        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
3655                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
3656
3657            Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null);
3658            deletes.add(iterator.next());
3659        }
3660
3661        long getNextMessageId() {
3662            return nextMessageId++;
3663        }
3664
3665        void revertNextMessageId() {
3666            nextMessageId--;
3667        }
3668
3669        MessageKeys get(Transaction tx, Long key) throws IOException {
3670            MessageKeys result = defaultPriorityIndex.get(tx, key);
3671            if (result == null) {
3672                result = highPriorityIndex.get(tx, key);
3673                if (result == null) {
3674                    result = lowPriorityIndex.get(tx, key);
3675                    lastGetPriority = LO;
3676                } else {
3677                    lastGetPriority = HI;
3678                }
3679            } else {
3680                lastGetPriority = DEF;
3681            }
3682            return result;
3683        }
3684
3685        MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
3686            if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
3687                return defaultPriorityIndex.put(tx, key, value);
3688            } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
3689                return highPriorityIndex.put(tx, key, value);
3690            } else {
3691                return lowPriorityIndex.put(tx, key, value);
3692            }
3693        }
3694
3695        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
3696            return new MessageOrderIterator(tx,cursor,this);
3697        }
3698
3699        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
3700            return new MessageOrderIterator(tx,m,this);
3701        }
3702
3703        public byte lastGetPriority() {
3704            return lastGetPriority;
3705        }
3706
3707        public boolean alreadyDispatched(Long sequence) {
3708            return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) ||
3709                    (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) ||
3710                    (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence);
3711        }
3712
3713        public void trackPendingAdd(Long seq) {
3714            synchronized (pendingAdditions) {
3715                pendingAdditions.add(seq);
3716            }
3717        }
3718
3719        public void trackPendingAddComplete(Long seq) {
3720            synchronized (pendingAdditions) {
3721                pendingAdditions.remove(seq);
3722            }
3723        }
3724
3725        public Long minPendingAdd() {
3726            synchronized (pendingAdditions) {
3727                if (!pendingAdditions.isEmpty()) {
3728                    return pendingAdditions.get(0);
3729                } else {
3730                    return null;
3731                }
3732            }
3733        }
3734
3735        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
3736            Iterator<Entry<Long, MessageKeys>>currentIterator;
3737            final Iterator<Entry<Long, MessageKeys>>highIterator;
3738            final Iterator<Entry<Long, MessageKeys>>defaultIterator;
3739            final Iterator<Entry<Long, MessageKeys>>lowIterator;
3740
3741            MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException {
3742                Long pendingAddLimiter = messageOrderIndex.minPendingAdd();
3743                this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter);
3744                if (highPriorityIndex != null) {
3745                    this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter);
3746                } else {
3747                    this.highIterator = null;
3748                }
3749                if (lowPriorityIndex != null) {
3750                    this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter);
3751                } else {
3752                    this.lowIterator = null;
3753                }
3754            }
3755
3756            @Override
3757            public boolean hasNext() {
3758                if (currentIterator == null) {
3759                    if (highIterator != null) {
3760                        if (highIterator.hasNext()) {
3761                            currentIterator = highIterator;
3762                            return currentIterator.hasNext();
3763                        }
3764                        if (defaultIterator.hasNext()) {
3765                            currentIterator = defaultIterator;
3766                            return currentIterator.hasNext();
3767                        }
3768                        if (lowIterator.hasNext()) {
3769                            currentIterator = lowIterator;
3770                            return currentIterator.hasNext();
3771                        }
3772                        return false;
3773                    } else {
3774                        currentIterator = defaultIterator;
3775                        return currentIterator.hasNext();
3776                    }
3777                }
3778                if (highIterator != null) {
3779                    if (currentIterator.hasNext()) {
3780                        return true;
3781                    }
3782                    if (currentIterator == highIterator) {
3783                        if (defaultIterator.hasNext()) {
3784                            currentIterator = defaultIterator;
3785                            return currentIterator.hasNext();
3786                        }
3787                        if (lowIterator.hasNext()) {
3788                            currentIterator = lowIterator;
3789                            return currentIterator.hasNext();
3790                        }
3791                        return false;
3792                    }
3793
3794                    if (currentIterator == defaultIterator) {
3795                        if (lowIterator.hasNext()) {
3796                            currentIterator = lowIterator;
3797                            return currentIterator.hasNext();
3798                        }
3799                        return false;
3800                    }
3801                }
3802                return currentIterator.hasNext();
3803            }
3804
3805            @Override
3806            public Entry<Long, MessageKeys> next() {
3807                Entry<Long, MessageKeys> result = currentIterator.next();
3808                if (result != null) {
3809                    Long key = result.getKey();
3810                    if (highIterator != null) {
3811                        if (currentIterator == defaultIterator) {
3812                            lastDefaultKey = key;
3813                        } else if (currentIterator == highIterator) {
3814                            lastHighKey = key;
3815                        } else {
3816                            lastLowKey = key;
3817                        }
3818                    } else {
3819                        lastDefaultKey = key;
3820                    }
3821                }
3822                return result;
3823            }
3824
3825            @Override
3826            public void remove() {
3827                throw new UnsupportedOperationException();
3828            }
3829        }
3830    }
3831
3832    private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
3833        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
3834
3835        @Override
3836        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
3837            ByteArrayOutputStream baos = new ByteArrayOutputStream();
3838            ObjectOutputStream oout = new ObjectOutputStream(baos);
3839            oout.writeObject(object);
3840            oout.flush();
3841            oout.close();
3842            byte[] data = baos.toByteArray();
3843            dataOut.writeInt(data.length);
3844            dataOut.write(data);
3845        }
3846
3847        @Override
3848        @SuppressWarnings("unchecked")
3849        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
3850            int dataLen = dataIn.readInt();
3851            byte[] data = new byte[dataLen];
3852            dataIn.readFully(data);
3853            ByteArrayInputStream bais = new ByteArrayInputStream(data);
3854            ObjectInputStream oin = new ObjectInputStream(bais);
3855            try {
3856                return (HashSet<String>) oin.readObject();
3857            } catch (ClassNotFoundException cfe) {
3858                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
3859                ioe.initCause(cfe);
3860                throw ioe;
3861            }
3862        }
3863    }
3864
3865    public File getIndexDirectory() {
3866        return indexDirectory;
3867    }
3868
3869    public void setIndexDirectory(File indexDirectory) {
3870        this.indexDirectory = indexDirectory;
3871    }
3872
3873    interface IndexAware {
3874        public void sequenceAssignedWithIndexLocked(long index);
3875    }
3876
3877    public String getPreallocationScope() {
3878        return preallocationScope;
3879    }
3880
3881    public void setPreallocationScope(String preallocationScope) {
3882        this.preallocationScope = preallocationScope;
3883    }
3884
3885    public String getPreallocationStrategy() {
3886        return preallocationStrategy;
3887    }
3888
3889    public void setPreallocationStrategy(String preallocationStrategy) {
3890        this.preallocationStrategy = preallocationStrategy;
3891    }
3892
3893    public int getCompactAcksAfterNoGC() {
3894        return compactAcksAfterNoGC;
3895    }
3896
3897    /**
3898     * Sets the number of GC cycles where no journal logs were removed before an attempt to
3899     * move forward all the acks in the last log that contains them and is otherwise unreferenced.
3900     * <p>
3901     * A value of -1 will disable this feature.
3902     *
3903     * @param compactAcksAfterNoGC
3904     *      Number of empty GC cycles before we rewrite old ACKS.
3905     */
3906    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
3907        this.compactAcksAfterNoGC = compactAcksAfterNoGC;
3908    }
3909
3910    /**
3911     * Returns whether Ack compaction will ignore that the store is still growing
3912     * and run more often.
3913     *
3914     * @return the compactAcksIgnoresStoreGrowth current value.
3915     */
3916    public boolean isCompactAcksIgnoresStoreGrowth() {
3917        return compactAcksIgnoresStoreGrowth;
3918    }
3919
3920    /**
3921     * Configure if Ack compaction will occur regardless of continued growth of the
3922     * journal logs meaning that the store has not run out of space yet.  Because the
3923     * compaction operation can be costly this value is defaulted to off and the Ack
3924     * compaction is only done when it seems that the store cannot grow and larger.
3925     *
3926     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
3927     */
3928    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
3929        this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth;
3930    }
3931
3932    /**
3933     * Returns whether Ack compaction is enabled
3934     *
3935     * @return enableAckCompaction
3936     */
3937    public boolean isEnableAckCompaction() {
3938        return enableAckCompaction;
3939    }
3940
3941    /**
3942     * Configure if the Ack compaction task should be enabled to run
3943     *
3944     * @param enableAckCompaction
3945     */
3946    public void setEnableAckCompaction(boolean enableAckCompaction) {
3947        this.enableAckCompaction = enableAckCompaction;
3948    }
3949
3950    /**
3951     * @return
3952     */
3953    public boolean isEnableSubscriptionStatistics() {
3954        return enableSubscriptionStatistics;
3955    }
3956
3957    /**
3958     * Enable caching statistics for each subscription to allow non-blocking
3959     * retrieval of metrics.  This could incur some overhead to compute if there are a lot
3960     * of subscriptions.
3961     *
3962     * @param enableSubscriptionStatistics
3963     */
3964    public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics) {
3965        this.enableSubscriptionStatistics = enableSubscriptionStatistics;
3966    }
3967}