001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.ByteArrayInputStream;
020    import java.io.ByteArrayOutputStream;
021    import java.io.DataInput;
022    import java.io.DataOutput;
023    import java.io.EOFException;
024    import java.io.File;
025    import java.io.IOException;
026    import java.io.InputStream;
027    import java.io.InterruptedIOException;
028    import java.io.ObjectInputStream;
029    import java.io.ObjectOutputStream;
030    import java.io.OutputStream;
031    import java.util.ArrayList;
032    import java.util.Collection;
033    import java.util.Collections;
034    import java.util.Date;
035    import java.util.HashMap;
036    import java.util.HashSet;
037    import java.util.Iterator;
038    import java.util.LinkedHashMap;
039    import java.util.LinkedHashSet;
040    import java.util.List;
041    import java.util.Map;
042    import java.util.Map.Entry;
043    import java.util.Set;
044    import java.util.SortedSet;
045    import java.util.Stack;
046    import java.util.TreeMap;
047    import java.util.TreeSet;
048    import java.util.concurrent.atomic.AtomicBoolean;
049    import java.util.concurrent.atomic.AtomicLong;
050    import java.util.concurrent.locks.ReentrantReadWriteLock;
051    
052    import org.apache.activemq.ActiveMQMessageAuditNoSync;
053    import org.apache.activemq.broker.BrokerService;
054    import org.apache.activemq.broker.BrokerServiceAware;
055    import org.apache.activemq.command.MessageAck;
056    import org.apache.activemq.command.SubscriptionInfo;
057    import org.apache.activemq.command.TransactionId;
058    import org.apache.activemq.protobuf.Buffer;
059    import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
060    import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
061    import org.apache.activemq.store.kahadb.data.KahaDestination;
062    import org.apache.activemq.store.kahadb.data.KahaEntryType;
063    import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
064    import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
065    import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
066    import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
067    import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
068    import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
069    import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
070    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
071    import org.apache.activemq.util.Callback;
072    import org.apache.activemq.util.IOHelper;
073    import org.apache.activemq.util.ServiceStopper;
074    import org.apache.activemq.util.ServiceSupport;
075    import org.apache.kahadb.index.BTreeIndex;
076    import org.apache.kahadb.index.BTreeVisitor;
077    import org.apache.kahadb.index.ListIndex;
078    import org.apache.kahadb.journal.DataFile;
079    import org.apache.kahadb.journal.Journal;
080    import org.apache.kahadb.journal.Location;
081    import org.apache.kahadb.page.Page;
082    import org.apache.kahadb.page.PageFile;
083    import org.apache.kahadb.page.Transaction;
084    import org.apache.kahadb.util.ByteSequence;
085    import org.apache.kahadb.util.DataByteArrayInputStream;
086    import org.apache.kahadb.util.DataByteArrayOutputStream;
087    import org.apache.kahadb.util.LocationMarshaller;
088    import org.apache.kahadb.util.LockFile;
089    import org.apache.kahadb.util.LongMarshaller;
090    import org.apache.kahadb.util.Marshaller;
091    import org.apache.kahadb.util.Sequence;
092    import org.apache.kahadb.util.SequenceSet;
093    import org.apache.kahadb.util.StringMarshaller;
094    import org.apache.kahadb.util.VariableMarshaller;
095    import org.slf4j.Logger;
096    import org.slf4j.LoggerFactory;
097    
098    public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
099    
100        protected BrokerService brokerService;
101    
102        public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
103        public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
104        public static final File DEFAULT_DIRECTORY = new File("KahaDB");
105        protected static final Buffer UNMATCHED;
106        static {
107            UNMATCHED = new Buffer(new byte[]{});
108        }
109        private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
110        private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
111    
112        static final int CLOSED_STATE = 1;
113        static final int OPEN_STATE = 2;
114        static final long NOT_ACKED = -1;
115    
116        static final int VERSION = 4;
117    
118        protected class Metadata {
119            protected Page<Metadata> page;
120            protected int state;
121            protected BTreeIndex<String, StoredDestination> destinations;
122            protected Location lastUpdate;
123            protected Location firstInProgressTransactionLocation;
124            protected Location producerSequenceIdTrackerLocation = null;
125            protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
126            protected int version = VERSION;
127            public void read(DataInput is) throws IOException {
128                state = is.readInt();
129                destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
130                if (is.readBoolean()) {
131                    lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
132                } else {
133                    lastUpdate = null;
134                }
135                if (is.readBoolean()) {
136                    firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
137                } else {
138                    firstInProgressTransactionLocation = null;
139                }
140                try {
141                    if (is.readBoolean()) {
142                        producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
143                    } else {
144                        producerSequenceIdTrackerLocation = null;
145                    }
146                } catch (EOFException expectedOnUpgrade) {
147                }
148                try {
149                   version = is.readInt();
150                } catch (EOFException expectedOnUpgrade) {
151                    version=1;
152                }
153                LOG.info("KahaDB is version " + version);
154            }
155    
156            public void write(DataOutput os) throws IOException {
157                os.writeInt(state);
158                os.writeLong(destinations.getPageId());
159    
160                if (lastUpdate != null) {
161                    os.writeBoolean(true);
162                    LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
163                } else {
164                    os.writeBoolean(false);
165                }
166    
167                if (firstInProgressTransactionLocation != null) {
168                    os.writeBoolean(true);
169                    LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
170                } else {
171                    os.writeBoolean(false);
172                }
173    
174                if (producerSequenceIdTrackerLocation != null) {
175                    os.writeBoolean(true);
176                    LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
177                } else {
178                    os.writeBoolean(false);
179                }
180                os.writeInt(VERSION);
181            }
182        }
183    
184        class MetadataMarshaller extends VariableMarshaller<Metadata> {
185            public Metadata readPayload(DataInput dataIn) throws IOException {
186                Metadata rc = new Metadata();
187                rc.read(dataIn);
188                return rc;
189            }
190    
191            public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
192                object.write(dataOut);
193            }
194        }
195    
196        protected PageFile pageFile;
197        protected Journal journal;
198        protected Metadata metadata = new Metadata();
199    
200        protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
201    
202        protected boolean failIfDatabaseIsLocked;
203    
204        protected boolean deleteAllMessages;
205        protected File directory = DEFAULT_DIRECTORY;
206        protected Thread checkpointThread;
207        protected boolean enableJournalDiskSyncs=true;
208        protected boolean archiveDataLogs;
209        protected File directoryArchive;
210        protected AtomicLong storeSize = new AtomicLong(0);
211        long checkpointInterval = 5*1000;
212        long cleanupInterval = 30*1000;
213        int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
214        int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
215        boolean enableIndexWriteAsync = false;
216        int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
217    
218        protected AtomicBoolean opened = new AtomicBoolean();
219        private LockFile lockFile;
220        private boolean ignoreMissingJournalfiles = false;
221        private int indexCacheSize = 10000;
222        private boolean checkForCorruptJournalFiles = false;
223        private boolean checksumJournalFiles = false;
224        private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
225        protected boolean forceRecoverIndex = false;
226        private final Object checkpointThreadLock = new Object();
227        private boolean rewriteOnRedelivery = false;
228        private boolean archiveCorruptedIndex = false;
229        private boolean useIndexLFRUEviction = false;
230        private float indexLFUEvictionFactor = 0.2f;
231        private boolean enableIndexDiskSyncs = true;
232        private boolean enableIndexRecoveryFile = true;
233        private boolean enableIndexPageCaching = true;
234    
235        public MessageDatabase() {
236        }
237    
238        @Override
239        public void doStart() throws Exception {
240            load();
241        }
242    
243        @Override
244        public void doStop(ServiceStopper stopper) throws Exception {
245            unload();
246        }
247    
248        private void loadPageFile() throws IOException {
249            this.indexLock.writeLock().lock();
250            try {
251                final PageFile pageFile = getPageFile();
252                pageFile.load();
253                pageFile.tx().execute(new Transaction.Closure<IOException>() {
254                    public void execute(Transaction tx) throws IOException {
255                        if (pageFile.getPageCount() == 0) {
256                            // First time this is created.. Initialize the metadata
257                            Page<Metadata> page = tx.allocate();
258                            assert page.getPageId() == 0;
259                            page.set(metadata);
260                            metadata.page = page;
261                            metadata.state = CLOSED_STATE;
262                            metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
263    
264                            tx.store(metadata.page, metadataMarshaller, true);
265                        } else {
266                            Page<Metadata> page = tx.load(0, metadataMarshaller);
267                            metadata = page.get();
268                            metadata.page = page;
269                        }
270                        metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
271                        metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
272                        metadata.destinations.load(tx);
273                    }
274                });
275                // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
276                // Perhaps we should just keep an index of file
277                storedDestinations.clear();
278                pageFile.tx().execute(new Transaction.Closure<IOException>() {
279                    public void execute(Transaction tx) throws IOException {
280                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
281                            Entry<String, StoredDestination> entry = iterator.next();
282                            StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
283                            storedDestinations.put(entry.getKey(), sd);
284                        }
285                    }
286                });
287                pageFile.flush();
288            } finally {
289                this.indexLock.writeLock().unlock();
290            }
291        }
292    
293        private void startCheckpoint() {
294            if (checkpointInterval == 0 &&  cleanupInterval == 0) {
295                LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
296                return;
297            }
298            synchronized (checkpointThreadLock) {
299                boolean start = false;
300                if (checkpointThread == null) {
301                    start = true;
302                } else if (!checkpointThread.isAlive()) {
303                    start = true;
304                    LOG.info("KahaDB: Recovering checkpoint thread after death");
305                }
306                if (start) {
307                    checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
308                        @Override
309                        public void run() {
310                            try {
311                                long lastCleanup = System.currentTimeMillis();
312                                long lastCheckpoint = System.currentTimeMillis();
313                                // Sleep for a short time so we can periodically check
314                                // to see if we need to exit this thread.
315                                long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
316                                while (opened.get()) {
317                                    Thread.sleep(sleepTime);
318                                    long now = System.currentTimeMillis();
319                                    if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
320                                        checkpointCleanup(true);
321                                        lastCleanup = now;
322                                        lastCheckpoint = now;
323                                    } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
324                                        checkpointCleanup(false);
325                                        lastCheckpoint = now;
326                                    }
327                                }
328                            } catch (InterruptedException e) {
329                                // Looks like someone really wants us to exit this thread...
330                            } catch (IOException ioe) {
331                                LOG.error("Checkpoint failed", ioe);
332                                brokerService.handleIOException(ioe);
333                            }
334                        }
335                    };
336    
337                    checkpointThread.setDaemon(true);
338                    checkpointThread.start();
339                }
340            }
341        }
342    
343        public void open() throws IOException {
344            if( opened.compareAndSet(false, true) ) {
345                getJournal().start();
346                try {
347                    loadPageFile();
348                } catch (Throwable t) {
349                    LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
350                    if (LOG.isDebugEnabled()) {
351                        LOG.debug("Index load failure", t);
352                    }
353                    // try to recover index
354                    try {
355                        pageFile.unload();
356                    } catch (Exception ignore) {}
357                    if (archiveCorruptedIndex) {
358                        pageFile.archive();
359                    } else {
360                        pageFile.delete();
361                    }
362                    metadata = new Metadata();
363                    pageFile = null;
364                    loadPageFile();
365                }
366                startCheckpoint();
367                recover();
368            }
369        }
370    
371        private void lock() throws IOException {
372    
373            if (lockFile == null) {
374                File lockFileName = new File(directory, "lock");
375                lockFile = new LockFile(lockFileName, true);
376                if (failIfDatabaseIsLocked) {
377                    lockFile.lock();
378                } else {
379                    boolean locked = false;
380                    while ((!isStopped()) && (!isStopping())) {
381                        try {
382                            lockFile.lock();
383                            locked = true;
384                            break;
385                        } catch (IOException e) {
386                            LOG.info("Database "
387                                    + lockFileName
388                                    + " is locked... waiting "
389                                    + (getDatabaseLockedWaitDelay() / 1000)
390                                    + " seconds for the database to be unlocked. Reason: "
391                                    + e);
392                            try {
393                                Thread.sleep(getDatabaseLockedWaitDelay());
394                            } catch (InterruptedException e1) {
395                            }
396                        }
397                    }
398                    if (!locked) {
399                        throw new IOException("attempt to obtain lock aborted due to shutdown");
400                    }
401                }
402            }
403        }
404    
405        // for testing
406        public LockFile getLockFile() {
407            return lockFile;
408        }
409    
410        public void load() throws IOException {
411            this.indexLock.writeLock().lock();
412            try {
413                lock();
414                if (deleteAllMessages) {
415                    getJournal().start();
416                    getJournal().delete();
417                    getJournal().close();
418                    journal = null;
419                    getPageFile().delete();
420                    LOG.info("Persistence store purged.");
421                    deleteAllMessages = false;
422                }
423    
424                open();
425                store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
426            } finally {
427                this.indexLock.writeLock().unlock();
428            }
429        }
430    
431        public void close() throws IOException, InterruptedException {
432            if( opened.compareAndSet(true, false)) {
433                try {
434                    this.indexLock.writeLock().lock();
435                    try {
436                        if (metadata.page != null) {
437                            pageFile.tx().execute(new Transaction.Closure<IOException>() {
438                                public void execute(Transaction tx) throws IOException {
439                                    checkpointUpdate(tx, true);
440                                }
441                            });
442                        }
443                        pageFile.unload();
444                        metadata = new Metadata();
445                    } finally {
446                        this.indexLock.writeLock().unlock();
447                    }
448                    journal.close();
449                    synchronized (checkpointThreadLock) {
450                        if (checkpointThread != null) {
451                            checkpointThread.join();
452                        }
453                    }
454                } finally {
455                    lockFile.unlock();
456                    lockFile=null;
457                }
458            }
459        }
460    
461        public void unload() throws IOException, InterruptedException {
462            this.indexLock.writeLock().lock();
463            try {
464                if( pageFile != null && pageFile.isLoaded() ) {
465                    metadata.state = CLOSED_STATE;
466                    metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
467    
468                    if (metadata.page != null) {
469                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
470                            public void execute(Transaction tx) throws IOException {
471                                tx.store(metadata.page, metadataMarshaller, true);
472                            }
473                        });
474                    }
475                }
476            } finally {
477                this.indexLock.writeLock().unlock();
478            }
479            close();
480        }
481    
482        // public for testing
483        @SuppressWarnings("rawtypes")
484        public Location getFirstInProgressTxLocation() {
485            Location l = null;
486            synchronized (inflightTransactions) {
487                if (!inflightTransactions.isEmpty()) {
488                    for (List<Operation> ops : inflightTransactions.values()) {
489                        if (!ops.isEmpty()) {
490                            l = ops.get(0).getLocation();
491                            break;
492                        }
493                    }
494                }
495                if (!preparedTransactions.isEmpty()) {
496                    for (List<Operation> ops : preparedTransactions.values()) {
497                        if (!ops.isEmpty()) {
498                            Location t = ops.get(0).getLocation();
499                            if (l==null || t.compareTo(l) <= 0) {
500                                l = t;
501                            }
502                            break;
503                        }
504                    }
505                }
506            }
507            return l;
508        }
509    
510        /**
511         * Move all the messages that were in the journal into long term storage. We
512         * just replay and do a checkpoint.
513         *
514         * @throws IOException
515         * @throws IOException
516         * @throws IllegalStateException
517         */
518        private void recover() throws IllegalStateException, IOException {
519            this.indexLock.writeLock().lock();
520            try {
521    
522                long start = System.currentTimeMillis();
523                Location producerAuditPosition = recoverProducerAudit();
524                Location lastIndoubtPosition = getRecoveryPosition();
525    
526                Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
527    
528                if (recoveryPosition != null) {
529                    int redoCounter = 0;
530                    LOG.info("Recovering from the journal ...");
531                    while (recoveryPosition != null) {
532                        JournalCommand<?> message = load(recoveryPosition);
533                        metadata.lastUpdate = recoveryPosition;
534                        process(message, recoveryPosition, lastIndoubtPosition);
535                        redoCounter++;
536                        recoveryPosition = journal.getNextLocation(recoveryPosition);
537                         if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
538                             LOG.info("@" + recoveryPosition +  ", "  + redoCounter + " entries recovered ..");
539                         }
540                    }
541                    if (LOG.isInfoEnabled()) {
542                        long end = System.currentTimeMillis();
543                        LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
544                    }
545                }
546    
547                // We may have to undo some index updates.
548                pageFile.tx().execute(new Transaction.Closure<IOException>() {
549                    public void execute(Transaction tx) throws IOException {
550                        recoverIndex(tx);
551                    }
552                });
553    
554                // rollback any recovered inflight local transactions
555                Set<TransactionId> toRollback = new HashSet<TransactionId>();
556                synchronized (inflightTransactions) {
557                    for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
558                        TransactionId id = it.next();
559                        if (id.isLocalTransaction()) {
560                            toRollback.add(id);
561                        }
562                    }
563                    for (TransactionId tx: toRollback) {
564                        if (LOG.isDebugEnabled()) {
565                            LOG.debug("rolling back recovered indoubt local transaction " + tx);
566                        }
567                        store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
568                    }
569                }
570            } finally {
571                this.indexLock.writeLock().unlock();
572            }
573        }
574    
575        @SuppressWarnings("unused")
576        private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
577            return TransactionIdConversion.convertToLocal(tx);
578        }
579    
580        private Location minimum(Location producerAuditPosition,
581                Location lastIndoubtPosition) {
582            Location min = null;
583            if (producerAuditPosition != null) {
584                min = producerAuditPosition;
585                if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
586                    min = lastIndoubtPosition;
587                }
588            } else {
589                min = lastIndoubtPosition;
590            }
591            return min;
592        }
593    
594        private Location recoverProducerAudit() throws IOException {
595            if (metadata.producerSequenceIdTrackerLocation != null) {
596                KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
597                try {
598                    ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
599                    metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
600                    return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
601                } catch (Exception e) {
602                    LOG.warn("Cannot recover message audit", e);
603                    return journal.getNextLocation(null);
604                }
605            } else {
606                // got no audit stored so got to recreate via replay from start of the journal
607                return journal.getNextLocation(null);
608            }
609        }
610    
611        protected void recoverIndex(Transaction tx) throws IOException {
612            long start = System.currentTimeMillis();
613            // It is possible index updates got applied before the journal updates..
614            // in that case we need to removed references to messages that are not in the journal
615            final Location lastAppendLocation = journal.getLastAppendLocation();
616            long undoCounter=0;
617    
618            // Go through all the destinations to see if they have messages past the lastAppendLocation
619            for (StoredDestination sd : storedDestinations.values()) {
620    
621                final ArrayList<Long> matches = new ArrayList<Long>();
622                // Find all the Locations that are >= than the last Append Location.
623                sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
624                    @Override
625                    protected void matched(Location key, Long value) {
626                        matches.add(value);
627                    }
628                });
629    
630                for (Long sequenceId : matches) {
631                    MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
632                    sd.locationIndex.remove(tx, keys.location);
633                    sd.messageIdIndex.remove(tx, keys.messageId);
634                    metadata.producerSequenceIdTracker.rollback(keys.messageId);
635                    undoCounter++;
636                    // TODO: do we need to modify the ack positions for the pub sub case?
637                }
638            }
639    
640            if( undoCounter > 0 ) {
641                // The rolledback operations are basically in flight journal writes.  To avoid getting
642                // these the end user should do sync writes to the journal.
643                if (LOG.isInfoEnabled()) {
644                    long end = System.currentTimeMillis();
645                    LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
646                }
647            }
648    
649            undoCounter = 0;
650            start = System.currentTimeMillis();
651    
652            // Lets be extra paranoid here and verify that all the datafiles being referenced
653            // by the indexes still exists.
654    
655            final SequenceSet ss = new SequenceSet();
656            for (StoredDestination sd : storedDestinations.values()) {
657                // Use a visitor to cut down the number of pages that we load
658                sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
659                    int last=-1;
660    
661                    public boolean isInterestedInKeysBetween(Location first, Location second) {
662                        if( first==null ) {
663                            return !ss.contains(0, second.getDataFileId());
664                        } else if( second==null ) {
665                            return true;
666                        } else {
667                            return !ss.contains(first.getDataFileId(), second.getDataFileId());
668                        }
669                    }
670    
671                    public void visit(List<Location> keys, List<Long> values) {
672                        for (Location l : keys) {
673                            int fileId = l.getDataFileId();
674                            if( last != fileId ) {
675                                ss.add(fileId);
676                                last = fileId;
677                            }
678                        }
679                    }
680    
681                });
682            }
683            HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
684            while (!ss.isEmpty()) {
685                missingJournalFiles.add((int) ss.removeFirst());
686            }
687            missingJournalFiles.removeAll(journal.getFileMap().keySet());
688    
689            if (!missingJournalFiles.isEmpty()) {
690                if (LOG.isInfoEnabled()) {
691                    LOG.info("Some journal files are missing: " + missingJournalFiles);
692                }
693            }
694    
695            ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
696            for (Integer missing : missingJournalFiles) {
697                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
698            }
699    
700            if (checkForCorruptJournalFiles) {
701                Collection<DataFile> dataFiles = journal.getFileMap().values();
702                for (DataFile dataFile : dataFiles) {
703                    int id = dataFile.getDataFileId();
704                    missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
705                    Sequence seq = dataFile.getCorruptedBlocks().getHead();
706                    while (seq != null) {
707                        missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)));
708                        seq = seq.getNext();
709                    }
710                }
711            }
712    
713            if (!missingPredicates.isEmpty()) {
714                for (StoredDestination sd : storedDestinations.values()) {
715    
716                    final ArrayList<Long> matches = new ArrayList<Long>();
717                    sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
718                        @Override
719                        protected void matched(Location key, Long value) {
720                            matches.add(value);
721                        }
722                    });
723    
724                    // If somes message references are affected by the missing data files...
725                    if (!matches.isEmpty()) {
726    
727                        // We either 'gracefully' recover dropping the missing messages or
728                        // we error out.
729                        if( ignoreMissingJournalfiles ) {
730                            // Update the index to remove the references to the missing data
731                            for (Long sequenceId : matches) {
732                                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
733                                sd.locationIndex.remove(tx, keys.location);
734                                sd.messageIdIndex.remove(tx, keys.messageId);
735                                undoCounter++;
736                                // TODO: do we need to modify the ack positions for the pub sub case?
737                            }
738    
739                        } else {
740                            throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
741                        }
742                    }
743                }
744            }
745    
746            if( undoCounter > 0 ) {
747                // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
748                // should do sync writes to the journal.
749                if (LOG.isInfoEnabled()) {
750                    long end = System.currentTimeMillis();
751                    LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
752                }
753            }
754        }
755    
756        private Location nextRecoveryPosition;
757        private Location lastRecoveryPosition;
758    
759        public void incrementalRecover() throws IOException {
760            this.indexLock.writeLock().lock();
761            try {
762                if( nextRecoveryPosition == null ) {
763                    if( lastRecoveryPosition==null ) {
764                        nextRecoveryPosition = getRecoveryPosition();
765                    } else {
766                        nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
767                    }
768                }
769                while (nextRecoveryPosition != null) {
770                    lastRecoveryPosition = nextRecoveryPosition;
771                    metadata.lastUpdate = lastRecoveryPosition;
772                    JournalCommand<?> message = load(lastRecoveryPosition);
773                    process(message, lastRecoveryPosition, (Runnable)null);
774                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
775                }
776            } finally {
777                this.indexLock.writeLock().unlock();
778            }
779        }
780    
781        public Location getLastUpdatePosition() throws IOException {
782            return metadata.lastUpdate;
783        }
784    
785        private Location getRecoveryPosition() throws IOException {
786    
787            if (!this.forceRecoverIndex) {
788    
789                // If we need to recover the transactions..
790                if (metadata.firstInProgressTransactionLocation != null) {
791                    return metadata.firstInProgressTransactionLocation;
792                }
793    
794                // Perhaps there were no transactions...
795                if( metadata.lastUpdate!=null) {
796                    // Start replay at the record after the last one recorded in the index file.
797                    return journal.getNextLocation(metadata.lastUpdate);
798                }
799            }
800            // This loads the first position.
801            return journal.getNextLocation(null);
802        }
803    
804        protected void checkpointCleanup(final boolean cleanup) throws IOException {
805            long start;
806            this.indexLock.writeLock().lock();
807            try {
808                start = System.currentTimeMillis();
809                if( !opened.get() ) {
810                    return;
811                }
812                pageFile.tx().execute(new Transaction.Closure<IOException>() {
813                    public void execute(Transaction tx) throws IOException {
814                        checkpointUpdate(tx, cleanup);
815                    }
816                });
817            } finally {
818                this.indexLock.writeLock().unlock();
819            }
820    
821            long end = System.currentTimeMillis();
822            if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
823                if (LOG.isInfoEnabled()) {
824                    LOG.info("Slow KahaDB access: cleanup took " + (end - start));
825                }
826            }
827        }
828    
829        public void checkpoint(Callback closure) throws Exception {
830            this.indexLock.writeLock().lock();
831            try {
832                pageFile.tx().execute(new Transaction.Closure<IOException>() {
833                    public void execute(Transaction tx) throws IOException {
834                        checkpointUpdate(tx, false);
835                    }
836                });
837                closure.execute();
838            } finally {
839                this.indexLock.writeLock().unlock();
840            }
841        }
842    
843        public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
844            int size = data.serializedSizeFramed();
845            DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
846            os.writeByte(data.type().getNumber());
847            data.writeFramed(os);
848            return os.toByteSequence();
849        }
850    
851        // /////////////////////////////////////////////////////////////////
852        // Methods call by the broker to update and query the store.
853        // /////////////////////////////////////////////////////////////////
854        public Location store(JournalCommand<?> data) throws IOException {
855            return store(data, false, null,null);
856        }
857    
858        public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
859            return store(data, false, null,null, onJournalStoreComplete);
860        }
861    
862        public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
863            return store(data, sync, before, after, null);
864        }
865    
866        /**
867         * All updated are are funneled through this method. The updates are converted
868         * to a JournalMessage which is logged to the journal and then the data from
869         * the JournalMessage is used to update the index just like it would be done
870         * during a recovery process.
871         */
872        public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after, Runnable onJournalStoreComplete) throws IOException {
873            if (before != null) {
874                before.run();
875            }
876            try {
877                ByteSequence sequence = toByteSequence(data);
878                long start = System.currentTimeMillis();
879                Location location = onJournalStoreComplete == null ? journal.write(sequence, sync) :  journal.write(sequence, onJournalStoreComplete) ;
880                long start2 = System.currentTimeMillis();
881                process(data, location, after);
882                long end = System.currentTimeMillis();
883                if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
884                    if (LOG.isInfoEnabled()) {
885                        LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
886                    }
887                }
888    
889                if (after != null) {
890                    Runnable afterCompletion = null;
891                    synchronized (orderedTransactionAfters) {
892                        if (!orderedTransactionAfters.empty()) {
893                            afterCompletion = orderedTransactionAfters.pop();
894                        }
895                    }
896                    if (afterCompletion != null) {
897                        afterCompletion.run();
898                    } else {
899                        // non persistent message case
900                        after.run();
901                    }
902                }
903    
904                if (checkpointThread != null && !checkpointThread.isAlive()) {
905                    startCheckpoint();
906                }
907                return location;
908            } catch (IOException ioe) {
909                LOG.error("KahaDB failed to store to Journal", ioe);
910                brokerService.handleIOException(ioe);
911                throw ioe;
912            }
913        }
914    
915        /**
916         * Loads a previously stored JournalMessage
917         *
918         * @param location
919         * @return
920         * @throws IOException
921         */
922        public JournalCommand<?> load(Location location) throws IOException {
923            long start = System.currentTimeMillis();
924            ByteSequence data = journal.read(location);
925            long end = System.currentTimeMillis();
926            if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
927                if (LOG.isInfoEnabled()) {
928                    LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
929                }
930            }
931            DataByteArrayInputStream is = new DataByteArrayInputStream(data);
932            byte readByte = is.readByte();
933            KahaEntryType type = KahaEntryType.valueOf(readByte);
934            if( type == null ) {
935                throw new IOException("Could not load journal record. Invalid location: "+location);
936            }
937            JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
938            message.mergeFramed(is);
939            return message;
940        }
941    
942        /**
943         * do minimal recovery till we reach the last inDoubtLocation
944         * @param data
945         * @param location
946         * @param inDoubtlocation
947         * @throws IOException
948         */
949        void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
950            if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
951                process(data, location, (Runnable) null);
952            } else {
953                // just recover producer audit
954                data.visit(new Visitor() {
955                    public void visit(KahaAddMessageCommand command) throws IOException {
956                        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
957                    }
958                });
959            }
960        }
961    
962        // /////////////////////////////////////////////////////////////////
963        // Journaled record processing methods. Once the record is journaled,
964        // these methods handle applying the index updates. These may be called
965        // from the recovery method too so they need to be idempotent
966        // /////////////////////////////////////////////////////////////////
967    
968        void process(JournalCommand<?> data, final Location location, final Runnable after) throws IOException {
969            data.visit(new Visitor() {
970                @Override
971                public void visit(KahaAddMessageCommand command) throws IOException {
972                    process(command, location);
973                }
974    
975                @Override
976                public void visit(KahaRemoveMessageCommand command) throws IOException {
977                    process(command, location);
978                }
979    
980                @Override
981                public void visit(KahaPrepareCommand command) throws IOException {
982                    process(command, location);
983                }
984    
985                @Override
986                public void visit(KahaCommitCommand command) throws IOException {
987                    process(command, location, after);
988                }
989    
990                @Override
991                public void visit(KahaRollbackCommand command) throws IOException {
992                    process(command, location);
993                }
994    
995                @Override
996                public void visit(KahaRemoveDestinationCommand command) throws IOException {
997                    process(command, location);
998                }
999    
1000                @Override
1001                public void visit(KahaSubscriptionCommand command) throws IOException {
1002                    process(command, location);
1003                }
1004    
1005                @Override
1006                public void visit(KahaProducerAuditCommand command) throws IOException {
1007                    processLocation(location);
1008                }
1009    
1010                @Override
1011                public void visit(KahaTraceCommand command) {
1012                    processLocation(location);
1013                }
1014            });
1015        }
1016    
1017        @SuppressWarnings("rawtypes")
1018        protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
1019            if (command.hasTransactionInfo()) {
1020                List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
1021                inflightTx.add(new AddOpperation(command, location));
1022            } else {
1023                this.indexLock.writeLock().lock();
1024                try {
1025                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1026                        public void execute(Transaction tx) throws IOException {
1027                            upadateIndex(tx, command, location);
1028                        }
1029                    });
1030                } finally {
1031                    this.indexLock.writeLock().unlock();
1032                }
1033            }
1034        }
1035    
1036        @SuppressWarnings("rawtypes")
1037        protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
1038            if (command.hasTransactionInfo()) {
1039               List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
1040               inflightTx.add(new RemoveOpperation(command, location));
1041            } else {
1042                this.indexLock.writeLock().lock();
1043                try {
1044                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1045                        public void execute(Transaction tx) throws IOException {
1046                            updateIndex(tx, command, location);
1047                        }
1048                    });
1049                } finally {
1050                    this.indexLock.writeLock().unlock();
1051                }
1052            }
1053        }
1054    
1055        protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1056            this.indexLock.writeLock().lock();
1057            try {
1058                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1059                    public void execute(Transaction tx) throws IOException {
1060                        updateIndex(tx, command, location);
1061                    }
1062                });
1063            } finally {
1064                this.indexLock.writeLock().unlock();
1065            }
1066        }
1067    
1068        protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1069            this.indexLock.writeLock().lock();
1070            try {
1071                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1072                    public void execute(Transaction tx) throws IOException {
1073                        updateIndex(tx, command, location);
1074                    }
1075                });
1076            } finally {
1077                this.indexLock.writeLock().unlock();
1078            }
1079        }
1080    
1081        protected void processLocation(final Location location) {
1082            this.indexLock.writeLock().lock();
1083            try {
1084                metadata.lastUpdate = location;
1085            } finally {
1086                this.indexLock.writeLock().unlock();
1087            }
1088        }
1089    
1090        private final Stack<Runnable> orderedTransactionAfters = new Stack<Runnable>();
1091        private void push(Runnable after) {
1092            if (after != null) {
1093                synchronized (orderedTransactionAfters) {
1094                    orderedTransactionAfters.push(after);
1095                }
1096            }
1097        }
1098    
1099        @SuppressWarnings("rawtypes")
1100        protected void process(KahaCommitCommand command, Location location, final Runnable after) throws IOException {
1101            TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1102            List<Operation> inflightTx;
1103            synchronized (inflightTransactions) {
1104                inflightTx = inflightTransactions.remove(key);
1105                if (inflightTx == null) {
1106                    inflightTx = preparedTransactions.remove(key);
1107                }
1108            }
1109            if (inflightTx == null) {
1110                if (after != null) {
1111                    // since we don't push this after and we may find another, lets run it now
1112                    after.run();
1113                }
1114                return;
1115            }
1116    
1117            final List<Operation> messagingTx = inflightTx;
1118            this.indexLock.writeLock().lock();
1119            try {
1120                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1121                    public void execute(Transaction tx) throws IOException {
1122                        for (Operation op : messagingTx) {
1123                            op.execute(tx);
1124                        }
1125                    }
1126                });
1127                metadata.lastUpdate = location;
1128                push(after);
1129            } finally {
1130                this.indexLock.writeLock().unlock();
1131            }
1132        }
1133    
1134        @SuppressWarnings("rawtypes")
1135        protected void process(KahaPrepareCommand command, Location location) {
1136            TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1137            synchronized (inflightTransactions) {
1138                List<Operation> tx = inflightTransactions.remove(key);
1139                if (tx != null) {
1140                    preparedTransactions.put(key, tx);
1141                }
1142            }
1143        }
1144    
1145        @SuppressWarnings("rawtypes")
1146        protected void process(KahaRollbackCommand command, Location location)  throws IOException {
1147            TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1148            List<Operation> updates = null;
1149            synchronized (inflightTransactions) {
1150                updates = inflightTransactions.remove(key);
1151                if (updates == null) {
1152                    updates = preparedTransactions.remove(key);
1153                }
1154            }
1155            if (isRewriteOnRedelivery()) {
1156                persistRedeliveryCount(updates);
1157            }
1158        }
1159    
1160        @SuppressWarnings("rawtypes")
1161        private void persistRedeliveryCount(List<Operation> updates)  throws IOException {
1162            if (updates != null) {
1163                for (Operation operation : updates) {
1164                    operation.getCommand().visit(new Visitor() {
1165                        @Override
1166                        public void visit(KahaRemoveMessageCommand command) throws IOException {
1167                            incrementRedeliveryAndReWrite(command.getMessageId(), command.getDestination());
1168                        }
1169                    });
1170                }
1171            }
1172        }
1173    
1174       abstract void incrementRedeliveryAndReWrite(String key, KahaDestination destination) throws IOException;
1175    
1176        // /////////////////////////////////////////////////////////////////
1177        // These methods do the actual index updates.
1178        // /////////////////////////////////////////////////////////////////
1179    
1180        protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
1181        private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1182    
1183        void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1184            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1185    
1186            // Skip adding the message to the index if this is a topic and there are
1187            // no subscriptions.
1188            if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1189                return;
1190            }
1191    
1192            // Add the message.
1193            int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1194            long id = sd.orderIndex.getNextMessageId(priority);
1195            Long previous = sd.locationIndex.put(tx, location, id);
1196            if (previous == null) {
1197                previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1198                if (previous == null) {
1199                    sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1200                    if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1201                        addAckLocationForNewMessage(tx, sd, id);
1202                    }
1203                } else {
1204                    // If the message ID as indexed, then the broker asked us to
1205                    // store a DUP
1206                    // message. Bad BOY! Don't do it, and log a warning.
1207                    LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId());
1208                    sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1209                    sd.locationIndex.remove(tx, location);
1210                    rollbackStatsOnDuplicate(command.getDestination());
1211                }
1212            } else {
1213                // restore the previous value.. Looks like this was a redo of a
1214                // previously
1215                // added message. We don't want to assign it a new id as the other
1216                // indexes would
1217                // be wrong..
1218                //
1219                sd.locationIndex.put(tx, location, previous);
1220            }
1221            // record this id in any event, initial send or recovery
1222            metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1223            metadata.lastUpdate = location;
1224        }
1225    
1226        abstract void rollbackStatsOnDuplicate(KahaDestination commandDestination);
1227    
1228        void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1229            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1230            if (!command.hasSubscriptionKey()) {
1231    
1232                // In the queue case we just remove the message from the index..
1233                Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1234                if (sequenceId != null) {
1235                    MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1236                    if (keys != null) {
1237                        sd.locationIndex.remove(tx, keys.location);
1238                        recordAckMessageReferenceLocation(ackLocation, keys.location);
1239                    }  else if (LOG.isDebugEnabled()) {
1240                        LOG.debug("message not found in order index: " + sequenceId  + " for: " + command.getMessageId());
1241                    }
1242                } else if (LOG.isDebugEnabled()) {
1243                    LOG.debug("message not found in sequence id index: " + command.getMessageId());
1244                }
1245            } else {
1246                // In the topic case we need remove the message once it's been acked
1247                // by all the subs
1248                Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1249    
1250                // Make sure it's a valid message id...
1251                if (sequence != null) {
1252                    String subscriptionKey = command.getSubscriptionKey();
1253                    if (command.getAck() != UNMATCHED) {
1254                        sd.orderIndex.get(tx, sequence);
1255                        byte priority = sd.orderIndex.lastGetPriority();
1256                        sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1257                    }
1258                    // The following method handles deleting un-referenced messages.
1259                    removeAckLocation(tx, sd, subscriptionKey, sequence);
1260                } else if (LOG.isDebugEnabled()) {
1261                    LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
1262                }
1263    
1264            }
1265            metadata.lastUpdate = ackLocation;
1266        }
1267    
1268        Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
1269        private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1270            Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1271            if (referenceFileIds == null) {
1272                referenceFileIds = new HashSet<Integer>();
1273                referenceFileIds.add(messageLocation.getDataFileId());
1274                ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1275            } else {
1276                Integer id = Integer.valueOf(messageLocation.getDataFileId());
1277                if (!referenceFileIds.contains(id)) {
1278                    referenceFileIds.add(id);
1279                }
1280            }
1281        }
1282    
1283        void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1284            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1285            sd.orderIndex.remove(tx);
1286    
1287            sd.locationIndex.clear(tx);
1288            sd.locationIndex.unload(tx);
1289            tx.free(sd.locationIndex.getPageId());
1290    
1291            sd.messageIdIndex.clear(tx);
1292            sd.messageIdIndex.unload(tx);
1293            tx.free(sd.messageIdIndex.getPageId());
1294    
1295            if (sd.subscriptions != null) {
1296                sd.subscriptions.clear(tx);
1297                sd.subscriptions.unload(tx);
1298                tx.free(sd.subscriptions.getPageId());
1299    
1300                sd.subscriptionAcks.clear(tx);
1301                sd.subscriptionAcks.unload(tx);
1302                tx.free(sd.subscriptionAcks.getPageId());
1303    
1304                sd.ackPositions.clear(tx);
1305                sd.ackPositions.unload(tx);
1306                tx.free(sd.ackPositions.getHeadPageId());
1307            }
1308    
1309            String key = key(command.getDestination());
1310            storedDestinations.remove(key);
1311            metadata.destinations.remove(tx, key);
1312        }
1313    
1314        void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1315            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1316            final String subscriptionKey = command.getSubscriptionKey();
1317    
1318            // If set then we are creating it.. otherwise we are destroying the sub
1319            if (command.hasSubscriptionInfo()) {
1320                sd.subscriptions.put(tx, subscriptionKey, command);
1321                long ackLocation=NOT_ACKED;
1322                if (!command.getRetroactive()) {
1323                    ackLocation = sd.orderIndex.nextMessageId-1;
1324                } else {
1325                    addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey);
1326                }
1327                sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1328                sd.subscriptionCache.add(subscriptionKey);
1329            } else {
1330                // delete the sub...
1331                sd.subscriptions.remove(tx, subscriptionKey);
1332                sd.subscriptionAcks.remove(tx, subscriptionKey);
1333                sd.subscriptionCache.remove(subscriptionKey);
1334                removeAckLocationsForSub(tx, sd, subscriptionKey);
1335    
1336                if (sd.subscriptions.isEmpty(tx)) {
1337                    sd.messageIdIndex.clear(tx);
1338                    sd.locationIndex.clear(tx);
1339                    sd.orderIndex.clear(tx);
1340                }
1341            }
1342        }
1343    
1344        /**
1345         * @param tx
1346         * @throws IOException
1347         */
1348        void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1349            LOG.debug("Checkpoint started.");
1350    
1351            // reflect last update exclusive of current checkpoint
1352            Location firstTxLocation = metadata.lastUpdate;
1353    
1354            metadata.state = OPEN_STATE;
1355            metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1356            metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
1357            tx.store(metadata.page, metadataMarshaller, true);
1358            pageFile.flush();
1359    
1360            if( cleanup ) {
1361    
1362                final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1363                final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
1364    
1365                if (LOG.isTraceEnabled()) {
1366                    LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet);
1367                }
1368    
1369                // Don't GC files under replication
1370                if( journalFilesBeingReplicated!=null ) {
1371                    gcCandidateSet.removeAll(journalFilesBeingReplicated);
1372                }
1373    
1374                if (metadata.producerSequenceIdTrackerLocation != null) {
1375                    gcCandidateSet.remove(metadata.producerSequenceIdTrackerLocation.getDataFileId());
1376                }
1377    
1378                // Don't GC files after the first in progress tx
1379                if( metadata.firstInProgressTransactionLocation!=null ) {
1380                    if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) {
1381                        firstTxLocation = metadata.firstInProgressTransactionLocation;
1382                    }
1383                }
1384    
1385                if( firstTxLocation!=null ) {
1386                    while( !gcCandidateSet.isEmpty() ) {
1387                        Integer last = gcCandidateSet.last();
1388                        if( last >= firstTxLocation.getDataFileId() ) {
1389                            gcCandidateSet.remove(last);
1390                        } else {
1391                            break;
1392                        }
1393                    }
1394                    if (LOG.isTraceEnabled()) {
1395                        LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet);
1396                    }
1397                }
1398    
1399                // Go through all the destinations to see if any of them can remove GC candidates.
1400                for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1401                    if( gcCandidateSet.isEmpty() ) {
1402                        break;
1403                    }
1404    
1405                    // Use a visitor to cut down the number of pages that we load
1406                    entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1407                        int last=-1;
1408                        public boolean isInterestedInKeysBetween(Location first, Location second) {
1409                            if( first==null ) {
1410                                SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1411                                if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1412                                    subset.remove(second.getDataFileId());
1413                                }
1414                                return !subset.isEmpty();
1415                            } else if( second==null ) {
1416                                SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1417                                if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1418                                    subset.remove(first.getDataFileId());
1419                                }
1420                                return !subset.isEmpty();
1421                            } else {
1422                                SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1423                                if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1424                                    subset.remove(first.getDataFileId());
1425                                }
1426                                if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1427                                    subset.remove(second.getDataFileId());
1428                                }
1429                                return !subset.isEmpty();
1430                            }
1431                        }
1432    
1433                        public void visit(List<Location> keys, List<Long> values) {
1434                            for (Location l : keys) {
1435                                int fileId = l.getDataFileId();
1436                                if( last != fileId ) {
1437                                    gcCandidateSet.remove(fileId);
1438                                    last = fileId;
1439                                }
1440                            }
1441                        }
1442                    });
1443                    if (LOG.isTraceEnabled()) {
1444                        LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1445                    }
1446                }
1447    
1448                // check we are not deleting file with ack for in-use journal files
1449                if (LOG.isTraceEnabled()) {
1450                    LOG.trace("gc candidates: " + gcCandidateSet);
1451                }
1452                final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
1453                Iterator<Integer> candidates = gcCandidateSet.iterator();
1454                while (candidates.hasNext()) {
1455                    Integer candidate = candidates.next();
1456                    Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate);
1457                    if (referencedFileIds != null) {
1458                        for (Integer referencedFileId : referencedFileIds) {
1459                            if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) {
1460                                // active file that is not targeted for deletion is referenced so don't delete
1461                                candidates.remove();
1462                                break;
1463                            }
1464                        }
1465                        if (gcCandidateSet.contains(candidate)) {
1466                            ackMessageFileMap.remove(candidate);
1467                        } else {
1468                            if (LOG.isTraceEnabled()) {
1469                                LOG.trace("not removing data file: " + candidate
1470                                        + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1471                            }
1472                        }
1473                    }
1474                }
1475    
1476                if (!gcCandidateSet.isEmpty()) {
1477                    if (LOG.isDebugEnabled()) {
1478                        LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
1479                    }
1480                    journal.removeDataFiles(gcCandidateSet);
1481                }
1482            }
1483    
1484            LOG.debug("Checkpoint done.");
1485        }
1486    
1487        final Runnable nullCompletionCallback = new Runnable() {
1488            @Override
1489            public void run() {
1490            }
1491        };
1492        private Location checkpointProducerAudit() throws IOException {
1493            ByteArrayOutputStream baos = new ByteArrayOutputStream();
1494            ObjectOutputStream oout = new ObjectOutputStream(baos);
1495            oout.writeObject(metadata.producerSequenceIdTracker);
1496            oout.flush();
1497            oout.close();
1498            // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
1499            Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
1500            try {
1501                location.getLatch().await();
1502            } catch (InterruptedException e) {
1503                throw new InterruptedIOException(e.toString());
1504            }
1505            return location;
1506        }
1507    
1508        public HashSet<Integer> getJournalFilesBeingReplicated() {
1509            return journalFilesBeingReplicated;
1510        }
1511    
1512        // /////////////////////////////////////////////////////////////////
1513        // StoredDestination related implementation methods.
1514        // /////////////////////////////////////////////////////////////////
1515    
1516        private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
1517    
1518        class StoredSubscription {
1519            SubscriptionInfo subscriptionInfo;
1520            String lastAckId;
1521            Location lastAckLocation;
1522            Location cursor;
1523        }
1524    
1525        static class MessageKeys {
1526            final String messageId;
1527            final Location location;
1528    
1529            public MessageKeys(String messageId, Location location) {
1530                this.messageId=messageId;
1531                this.location=location;
1532            }
1533    
1534            @Override
1535            public String toString() {
1536                return "["+messageId+","+location+"]";
1537            }
1538        }
1539    
1540        static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
1541            static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
1542    
1543            public MessageKeys readPayload(DataInput dataIn) throws IOException {
1544                return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
1545            }
1546    
1547            public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
1548                dataOut.writeUTF(object.messageId);
1549                LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
1550            }
1551        }
1552    
1553        class LastAck {
1554            long lastAckedSequence;
1555            byte priority;
1556    
1557            public LastAck(LastAck source) {
1558                this.lastAckedSequence = source.lastAckedSequence;
1559                this.priority = source.priority;
1560            }
1561    
1562            public LastAck() {
1563                this.priority = MessageOrderIndex.HI;
1564            }
1565    
1566            public LastAck(long ackLocation) {
1567                this.lastAckedSequence = ackLocation;
1568                this.priority = MessageOrderIndex.LO;
1569            }
1570    
1571            public LastAck(long ackLocation, byte priority) {
1572                this.lastAckedSequence = ackLocation;
1573                this.priority = priority;
1574            }
1575    
1576            public String toString() {
1577                return "[" + lastAckedSequence + ":" + priority + "]";
1578            }
1579        }
1580    
1581        protected class LastAckMarshaller implements Marshaller<LastAck> {
1582    
1583            public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
1584                dataOut.writeLong(object.lastAckedSequence);
1585                dataOut.writeByte(object.priority);
1586            }
1587    
1588            public LastAck readPayload(DataInput dataIn) throws IOException {
1589                LastAck lastAcked = new LastAck();
1590                lastAcked.lastAckedSequence = dataIn.readLong();
1591                if (metadata.version >= 3) {
1592                    lastAcked.priority = dataIn.readByte();
1593                }
1594                return lastAcked;
1595            }
1596    
1597            public int getFixedSize() {
1598                return 9;
1599            }
1600    
1601            public LastAck deepCopy(LastAck source) {
1602                return new LastAck(source);
1603            }
1604    
1605            public boolean isDeepCopySupported() {
1606                return true;
1607            }
1608        }
1609    
1610        class StoredDestination {
1611    
1612            MessageOrderIndex orderIndex = new MessageOrderIndex();
1613            BTreeIndex<Location, Long> locationIndex;
1614            BTreeIndex<String, Long> messageIdIndex;
1615    
1616            // These bits are only set for Topics
1617            BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
1618            BTreeIndex<String, LastAck> subscriptionAcks;
1619            HashMap<String, MessageOrderCursor> subscriptionCursors;
1620            ListIndex<String, SequenceSet> ackPositions;
1621    
1622            // Transient data used to track which Messages are no longer needed.
1623            final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
1624            final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
1625        }
1626    
1627        protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
1628    
1629            public StoredDestination readPayload(final DataInput dataIn) throws IOException {
1630                final StoredDestination value = new StoredDestination();
1631                value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1632                value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
1633                value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
1634    
1635                if (dataIn.readBoolean()) {
1636                    value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
1637                    value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
1638                    if (metadata.version >= 4) {
1639                        value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
1640                    } else {
1641                        // upgrade
1642                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
1643                            public void execute(Transaction tx) throws IOException {
1644                                BTreeIndex<Long, HashSet<String>> oldAckPositions =
1645                                    new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
1646                                oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
1647                                oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
1648                                oldAckPositions.load(tx);
1649    
1650                                LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
1651    
1652                                // Do the initial build of the data in memory before writing into the store
1653                                // based Ack Positions List to avoid a lot of disk thrashing.
1654                                Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
1655                                while (iterator.hasNext()) {
1656                                    Entry<Long, HashSet<String>> entry = iterator.next();
1657    
1658                                    for(String subKey : entry.getValue()) {
1659                                        SequenceSet pendingAcks = temp.get(subKey);
1660                                        if (pendingAcks == null) {
1661                                            pendingAcks = new SequenceSet();
1662                                            temp.put(subKey, pendingAcks);
1663                                        }
1664    
1665                                        pendingAcks.add(entry.getKey());
1666                                    }
1667                                }
1668    
1669                                // Now move the pending messages to ack data into the store backed
1670                                // structure.
1671                                value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
1672                                for(String subscriptionKey : temp.keySet()) {
1673                                    value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
1674                                }
1675    
1676                            }
1677                        });
1678                    }
1679                }
1680                if (metadata.version >= 2) {
1681                    value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1682                    value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1683                } else {
1684                        // upgrade
1685                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
1686                            public void execute(Transaction tx) throws IOException {
1687                                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
1688                                value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
1689                                value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
1690                                value.orderIndex.lowPriorityIndex.load(tx);
1691    
1692                                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
1693                                value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
1694                                value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
1695                                value.orderIndex.highPriorityIndex.load(tx);
1696                            }
1697                        });
1698                }
1699    
1700                return value;
1701            }
1702    
1703            public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
1704                dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
1705                dataOut.writeLong(value.locationIndex.getPageId());
1706                dataOut.writeLong(value.messageIdIndex.getPageId());
1707                if (value.subscriptions != null) {
1708                    dataOut.writeBoolean(true);
1709                    dataOut.writeLong(value.subscriptions.getPageId());
1710                    dataOut.writeLong(value.subscriptionAcks.getPageId());
1711                    dataOut.writeLong(value.ackPositions.getHeadPageId());
1712                } else {
1713                    dataOut.writeBoolean(false);
1714                }
1715                dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
1716                dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
1717            }
1718        }
1719    
1720        static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
1721            final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
1722    
1723            public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
1724                KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
1725                rc.mergeFramed((InputStream)dataIn);
1726                return rc;
1727            }
1728    
1729            public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
1730                object.writeFramed((OutputStream)dataOut);
1731            }
1732        }
1733    
1734        protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1735            String key = key(destination);
1736            StoredDestination rc = storedDestinations.get(key);
1737            if (rc == null) {
1738                boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
1739                rc = loadStoredDestination(tx, key, topic);
1740                // Cache it. We may want to remove/unload destinations from the
1741                // cache that are not used for a while
1742                // to reduce memory usage.
1743                storedDestinations.put(key, rc);
1744            }
1745            return rc;
1746        }
1747    
1748        protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1749            String key = key(destination);
1750            StoredDestination rc = storedDestinations.get(key);
1751            if (rc == null && metadata.destinations.containsKey(tx, key)) {
1752                rc = getStoredDestination(destination, tx);
1753            }
1754            return rc;
1755        }
1756    
1757        /**
1758         * @param tx
1759         * @param key
1760         * @param topic
1761         * @return
1762         * @throws IOException
1763         */
1764        private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
1765            // Try to load the existing indexes..
1766            StoredDestination rc = metadata.destinations.get(tx, key);
1767            if (rc == null) {
1768                // Brand new destination.. allocate indexes for it.
1769                rc = new StoredDestination();
1770                rc.orderIndex.allocate(tx);
1771                rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
1772                rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
1773    
1774                if (topic) {
1775                    rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
1776                    rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
1777                    rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
1778                }
1779                metadata.destinations.put(tx, key, rc);
1780            }
1781    
1782            // Configure the marshalers and load.
1783            rc.orderIndex.load(tx);
1784    
1785            // Figure out the next key using the last entry in the destination.
1786            rc.orderIndex.configureLast(tx);
1787    
1788            rc.locationIndex.setKeyMarshaller(org.apache.kahadb.util.LocationMarshaller.INSTANCE);
1789            rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1790            rc.locationIndex.load(tx);
1791    
1792            rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
1793            rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1794            rc.messageIdIndex.load(tx);
1795    
1796            // If it was a topic...
1797            if (topic) {
1798    
1799                rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
1800                rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
1801                rc.subscriptions.load(tx);
1802    
1803                rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
1804                rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
1805                rc.subscriptionAcks.load(tx);
1806    
1807                rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
1808                rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
1809                rc.ackPositions.load(tx);
1810    
1811                rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
1812    
1813                if (metadata.version < 3) {
1814    
1815                    // on upgrade need to fill ackLocation with available messages past last ack
1816                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
1817                        Entry<String, LastAck> entry = iterator.next();
1818                        for (Iterator<Entry<Long, MessageKeys>> orderIterator =
1819                                rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
1820                            Long sequence = orderIterator.next().getKey();
1821                            addAckLocation(tx, rc, sequence, entry.getKey());
1822                        }
1823                        // modify so it is upgraded
1824                        rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
1825                    }
1826                }
1827    
1828                // Configure the message references index
1829                Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
1830                while (subscriptions.hasNext()) {
1831                    Entry<String, SequenceSet> subscription = subscriptions.next();
1832                    SequenceSet pendingAcks = subscription.getValue();
1833                    if (pendingAcks != null && !pendingAcks.isEmpty()) {
1834                        Long lastPendingAck = pendingAcks.getTail().getLast();
1835                        for(Long sequenceId : pendingAcks) {
1836                            Long current = rc.messageReferences.get(sequenceId);
1837                            if (current == null) {
1838                                current = new Long(0);
1839                            }
1840    
1841                            // We always add a trailing empty entry for the next position to start from
1842                            // so we need to ensure we don't count that as a message reference on reload.
1843                            if (!sequenceId.equals(lastPendingAck)) {
1844                                current = current.longValue() + 1;
1845                            }
1846    
1847                            rc.messageReferences.put(sequenceId, current);
1848                        }
1849                    }
1850                }
1851    
1852                // Configure the subscription cache
1853                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
1854                    Entry<String, LastAck> entry = iterator.next();
1855                    rc.subscriptionCache.add(entry.getKey());
1856                }
1857    
1858                if (rc.orderIndex.nextMessageId == 0) {
1859                    // check for existing durable sub all acked out - pull next seq from acks as messages are gone
1860                    if (!rc.subscriptionAcks.isEmpty(tx)) {
1861                        for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
1862                            Entry<String, LastAck> entry = iterator.next();
1863                            rc.orderIndex.nextMessageId =
1864                                    Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
1865                        }
1866                    }
1867                } else {
1868                    // update based on ackPositions for unmatched, last entry is always the next
1869                    if (!rc.messageReferences.isEmpty()) {
1870                        Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
1871                        rc.orderIndex.nextMessageId =
1872                                Math.max(rc.orderIndex.nextMessageId, nextMessageId);
1873                    }
1874                }
1875            }
1876    
1877            if (metadata.version < VERSION) {
1878                // store again after upgrade
1879                metadata.destinations.put(tx, key, rc);
1880            }
1881            return rc;
1882        }
1883    
1884        private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
1885            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
1886            if (sequences == null) {
1887                sequences = new SequenceSet();
1888                sequences.add(messageSequence);
1889                sd.ackPositions.add(tx, subscriptionKey, sequences);
1890            } else {
1891                sequences.add(messageSequence);
1892                sd.ackPositions.put(tx, subscriptionKey, sequences);
1893            }
1894    
1895            Long count = sd.messageReferences.get(messageSequence);
1896            if (count == null) {
1897                count = Long.valueOf(0L);
1898            }
1899            count = count.longValue() + 1;
1900            sd.messageReferences.put(messageSequence, count);
1901        }
1902    
1903        // new sub is interested in potentially all existing messages
1904        private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
1905            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
1906            if (sequences == null) {
1907                sequences = new SequenceSet();
1908                sequences.add(messageSequence);
1909                sd.ackPositions.add(tx, subscriptionKey, sequences);
1910            } else {
1911                sequences.add(messageSequence);
1912                sd.ackPositions.put(tx, subscriptionKey, sequences);
1913            }
1914    
1915            Long count = sd.messageReferences.get(messageSequence);
1916            if (count == null) {
1917                count = Long.valueOf(0L);
1918            }
1919            count = count.longValue() + 1;
1920            sd.messageReferences.put(messageSequence, count);
1921        }
1922    
1923        // on a new message add, all existing subs are interested in this message
1924        private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
1925            for(String subscriptionKey : sd.subscriptionCache) {
1926                SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
1927                if (sequences == null) {
1928                    sequences = new SequenceSet();
1929                    sequences.add(new Sequence(messageSequence, messageSequence + 1));
1930                    sd.ackPositions.add(tx, subscriptionKey, sequences);
1931                } else {
1932                    sequences.add(new Sequence(messageSequence, messageSequence + 1));
1933                    sd.ackPositions.put(tx, subscriptionKey, sequences);
1934                }
1935    
1936                Long count = sd.messageReferences.get(messageSequence);
1937                if (count == null) {
1938                    count = Long.valueOf(0L);
1939                }
1940                count = count.longValue() + 1;
1941                sd.messageReferences.put(messageSequence, count);
1942                sd.messageReferences.put(messageSequence+1, Long.valueOf(0L));
1943            }
1944        }
1945    
1946        private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
1947            if (!sd.ackPositions.isEmpty(tx)) {
1948                SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
1949                if (sequences == null || sequences.isEmpty()) {
1950                    return;
1951                }
1952    
1953                ArrayList<Long> unreferenced = new ArrayList<Long>();
1954    
1955                for(Long sequenceId : sequences) {
1956                    Long references = sd.messageReferences.get(sequenceId);
1957                    if (references != null) {
1958                        references = references.longValue() - 1;
1959    
1960                        if (references.longValue() > 0) {
1961                            sd.messageReferences.put(sequenceId, references);
1962                        } else {
1963                            sd.messageReferences.remove(sequenceId);
1964                            unreferenced.add(sequenceId);
1965                        }
1966                    }
1967                }
1968    
1969                for(Long sequenceId : unreferenced) {
1970                    // Find all the entries that need to get deleted.
1971                    ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
1972                    sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
1973    
1974                    // Do the actual deletes.
1975                    for (Entry<Long, MessageKeys> entry : deletes) {
1976                        sd.locationIndex.remove(tx, entry.getValue().location);
1977                        sd.messageIdIndex.remove(tx, entry.getValue().messageId);
1978                        sd.orderIndex.remove(tx, entry.getKey());
1979                    }
1980                }
1981            }
1982        }
1983    
1984        /**
1985         * @param tx
1986         * @param sd
1987         * @param subscriptionKey
1988         * @param messageSequence
1989         * @throws IOException
1990         */
1991        private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
1992            // Remove the sub from the previous location set..
1993            if (messageSequence != null) {
1994                SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
1995                if (range != null && !range.isEmpty()) {
1996                    range.remove(messageSequence);
1997                    if (!range.isEmpty()) {
1998                        sd.ackPositions.put(tx, subscriptionKey, range);
1999                    } else {
2000                        sd.ackPositions.remove(tx, subscriptionKey);
2001                    }
2002    
2003                    // Check if the message is reference by any other subscription.
2004                    Long count = sd.messageReferences.get(messageSequence);
2005                    if (count != null){
2006                    long references = count.longValue() - 1;
2007                        if (references > 0) {
2008                            sd.messageReferences.put(messageSequence, Long.valueOf(references));
2009                            return;
2010                        } else {
2011                            sd.messageReferences.remove(messageSequence);
2012                        }
2013                    }
2014    
2015                    // Find all the entries that need to get deleted.
2016                    ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2017                    sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
2018    
2019                    // Do the actual deletes.
2020                    for (Entry<Long, MessageKeys> entry : deletes) {
2021                        sd.locationIndex.remove(tx, entry.getValue().location);
2022                        sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2023                        sd.orderIndex.remove(tx, entry.getKey());
2024                    }
2025                }
2026            }
2027        }
2028    
2029        public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2030            return sd.subscriptionAcks.get(tx, subscriptionKey);
2031        }
2032    
2033        public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2034            SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2035            if (messageSequences != null) {
2036                long result = messageSequences.rangeSize();
2037                // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
2038                return result > 0 ? result - 1 : 0;
2039            }
2040    
2041            return 0;
2042        }
2043    
2044        private String key(KahaDestination destination) {
2045            return destination.getType().getNumber() + ":" + destination.getName();
2046        }
2047    
2048        // /////////////////////////////////////////////////////////////////
2049        // Transaction related implementation methods.
2050        // /////////////////////////////////////////////////////////////////
2051        @SuppressWarnings("rawtypes")
2052        private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
2053        @SuppressWarnings("rawtypes")
2054        protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
2055        protected final Set<String> ackedAndPrepared = new HashSet<String>();
2056    
2057        // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
2058        // till then they are skipped by the store.
2059        // 'at most once' XA guarantee
2060        public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
2061            this.indexLock.writeLock().lock();
2062            try {
2063                for (MessageAck ack : acks) {
2064                    ackedAndPrepared.add(ack.getLastMessageId().toString());
2065                }
2066            } finally {
2067                this.indexLock.writeLock().unlock();
2068            }
2069        }
2070    
2071        public void forgetRecoveredAcks(ArrayList<MessageAck> acks) throws IOException {
2072            if (acks != null) {
2073                this.indexLock.writeLock().lock();
2074                try {
2075                    for (MessageAck ack : acks) {
2076                        ackedAndPrepared.remove(ack.getLastMessageId().toString());
2077                    }
2078                } finally {
2079                    this.indexLock.writeLock().unlock();
2080                }
2081            }
2082        }
2083    
2084        @SuppressWarnings("rawtypes")
2085        private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
2086            TransactionId key = TransactionIdConversion.convert(info);
2087            List<Operation> tx;
2088            synchronized (inflightTransactions) {
2089                tx = inflightTransactions.get(key);
2090                if (tx == null) {
2091                    tx = Collections.synchronizedList(new ArrayList<Operation>());
2092                    inflightTransactions.put(key, tx);
2093                }
2094            }
2095            return tx;
2096        }
2097    
2098        @SuppressWarnings("unused")
2099        private TransactionId key(KahaTransactionInfo transactionInfo) {
2100            return TransactionIdConversion.convert(transactionInfo);
2101        }
2102    
2103        abstract class Operation <T extends JournalCommand<T>> {
2104            final T command;
2105            final Location location;
2106    
2107            public Operation(T command, Location location) {
2108                this.command = command;
2109                this.location = location;
2110            }
2111    
2112            public Location getLocation() {
2113                return location;
2114            }
2115    
2116            public T getCommand() {
2117                return command;
2118            }
2119    
2120            abstract public void execute(Transaction tx) throws IOException;
2121        }
2122    
2123        class AddOpperation extends Operation<KahaAddMessageCommand> {
2124    
2125            public AddOpperation(KahaAddMessageCommand command, Location location) {
2126                super(command, location);
2127            }
2128    
2129            @Override
2130            public void execute(Transaction tx) throws IOException {
2131                upadateIndex(tx, command, location);
2132            }
2133    
2134        }
2135    
2136        class RemoveOpperation extends Operation<KahaRemoveMessageCommand> {
2137    
2138            public RemoveOpperation(KahaRemoveMessageCommand command, Location location) {
2139                super(command, location);
2140            }
2141    
2142            @Override
2143            public void execute(Transaction tx) throws IOException {
2144                updateIndex(tx, command, location);
2145            }
2146        }
2147    
2148        // /////////////////////////////////////////////////////////////////
2149        // Initialization related implementation methods.
2150        // /////////////////////////////////////////////////////////////////
2151    
2152        private PageFile createPageFile() {
2153            PageFile index = new PageFile(directory, "db");
2154            index.setEnableWriteThread(isEnableIndexWriteAsync());
2155            index.setWriteBatchSize(getIndexWriteBatchSize());
2156            index.setPageCacheSize(indexCacheSize);
2157            index.setUseLFRUEviction(isUseIndexLFRUEviction());
2158            index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
2159            index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
2160            index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
2161            index.setEnablePageCaching(isEnableIndexPageCaching());
2162            return index;
2163        }
2164    
2165        private Journal createJournal() throws IOException {
2166            Journal manager = new Journal();
2167            manager.setDirectory(directory);
2168            manager.setMaxFileLength(getJournalMaxFileLength());
2169            manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
2170            manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
2171            manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
2172            manager.setArchiveDataLogs(isArchiveDataLogs());
2173            manager.setSizeAccumulator(storeSize);
2174            manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
2175            if (getDirectoryArchive() != null) {
2176                IOHelper.mkdirs(getDirectoryArchive());
2177                manager.setDirectoryArchive(getDirectoryArchive());
2178            }
2179            return manager;
2180        }
2181    
2182        public int getJournalMaxWriteBatchSize() {
2183            return journalMaxWriteBatchSize;
2184        }
2185    
2186        public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
2187            this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
2188        }
2189    
2190        public File getDirectory() {
2191            return directory;
2192        }
2193    
2194        public void setDirectory(File directory) {
2195            this.directory = directory;
2196        }
2197    
2198        public boolean isDeleteAllMessages() {
2199            return deleteAllMessages;
2200        }
2201    
2202        public void setDeleteAllMessages(boolean deleteAllMessages) {
2203            this.deleteAllMessages = deleteAllMessages;
2204        }
2205    
2206        public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
2207            this.setIndexWriteBatchSize = setIndexWriteBatchSize;
2208        }
2209    
2210        public int getIndexWriteBatchSize() {
2211            return setIndexWriteBatchSize;
2212        }
2213    
2214        public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
2215            this.enableIndexWriteAsync = enableIndexWriteAsync;
2216        }
2217    
2218        boolean isEnableIndexWriteAsync() {
2219            return enableIndexWriteAsync;
2220        }
2221    
2222        public boolean isEnableJournalDiskSyncs() {
2223            return enableJournalDiskSyncs;
2224        }
2225    
2226        public void setEnableJournalDiskSyncs(boolean syncWrites) {
2227            this.enableJournalDiskSyncs = syncWrites;
2228        }
2229    
2230        public long getCheckpointInterval() {
2231            return checkpointInterval;
2232        }
2233    
2234        public void setCheckpointInterval(long checkpointInterval) {
2235            this.checkpointInterval = checkpointInterval;
2236        }
2237    
2238        public long getCleanupInterval() {
2239            return cleanupInterval;
2240        }
2241    
2242        public void setCleanupInterval(long cleanupInterval) {
2243            this.cleanupInterval = cleanupInterval;
2244        }
2245    
2246        public void setJournalMaxFileLength(int journalMaxFileLength) {
2247            this.journalMaxFileLength = journalMaxFileLength;
2248        }
2249    
2250        public int getJournalMaxFileLength() {
2251            return journalMaxFileLength;
2252        }
2253    
2254        public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
2255            this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
2256        }
2257    
2258        public int getMaxFailoverProducersToTrack() {
2259            return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
2260        }
2261    
2262        public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
2263            this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
2264        }
2265    
2266        public int getFailoverProducersAuditDepth() {
2267            return this.metadata.producerSequenceIdTracker.getAuditDepth();
2268        }
2269    
2270        public PageFile getPageFile() {
2271            if (pageFile == null) {
2272                pageFile = createPageFile();
2273            }
2274            return pageFile;
2275        }
2276    
2277        public Journal getJournal() throws IOException {
2278            if (journal == null) {
2279                journal = createJournal();
2280            }
2281            return journal;
2282        }
2283    
2284        public boolean isFailIfDatabaseIsLocked() {
2285            return failIfDatabaseIsLocked;
2286        }
2287    
2288        public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
2289            this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
2290        }
2291    
2292        public boolean isIgnoreMissingJournalfiles() {
2293            return ignoreMissingJournalfiles;
2294        }
2295    
2296        public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
2297            this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
2298        }
2299    
2300        public int getIndexCacheSize() {
2301            return indexCacheSize;
2302        }
2303    
2304        public void setIndexCacheSize(int indexCacheSize) {
2305            this.indexCacheSize = indexCacheSize;
2306        }
2307    
2308        public boolean isCheckForCorruptJournalFiles() {
2309            return checkForCorruptJournalFiles;
2310        }
2311    
2312        public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
2313            this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
2314        }
2315    
2316        public boolean isChecksumJournalFiles() {
2317            return checksumJournalFiles;
2318        }
2319    
2320        public void setChecksumJournalFiles(boolean checksumJournalFiles) {
2321            this.checksumJournalFiles = checksumJournalFiles;
2322        }
2323    
2324        public void setBrokerService(BrokerService brokerService) {
2325            this.brokerService = brokerService;
2326        }
2327    
2328        /**
2329         * @return the archiveDataLogs
2330         */
2331        public boolean isArchiveDataLogs() {
2332            return this.archiveDataLogs;
2333        }
2334    
2335        /**
2336         * @param archiveDataLogs the archiveDataLogs to set
2337         */
2338        public void setArchiveDataLogs(boolean archiveDataLogs) {
2339            this.archiveDataLogs = archiveDataLogs;
2340        }
2341    
2342        /**
2343         * @return the directoryArchive
2344         */
2345        public File getDirectoryArchive() {
2346            return this.directoryArchive;
2347        }
2348    
2349        /**
2350         * @param directoryArchive the directoryArchive to set
2351         */
2352        public void setDirectoryArchive(File directoryArchive) {
2353            this.directoryArchive = directoryArchive;
2354        }
2355    
2356        /**
2357         * @return the databaseLockedWaitDelay
2358         */
2359        public int getDatabaseLockedWaitDelay() {
2360            return this.databaseLockedWaitDelay;
2361        }
2362    
2363        /**
2364         * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
2365         */
2366        public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
2367            this.databaseLockedWaitDelay = databaseLockedWaitDelay;
2368        }
2369    
2370        public boolean isRewriteOnRedelivery() {
2371            return rewriteOnRedelivery;
2372        }
2373    
2374        public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
2375            this.rewriteOnRedelivery = rewriteOnRedelivery;
2376        }
2377    
2378        public boolean isArchiveCorruptedIndex() {
2379            return archiveCorruptedIndex;
2380        }
2381    
2382        public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
2383            this.archiveCorruptedIndex = archiveCorruptedIndex;
2384        }
2385    
2386        public float getIndexLFUEvictionFactor() {
2387            return indexLFUEvictionFactor;
2388        }
2389    
2390        public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
2391            this.indexLFUEvictionFactor = indexLFUEvictionFactor;
2392        }
2393    
2394        public boolean isUseIndexLFRUEviction() {
2395            return useIndexLFRUEviction;
2396        }
2397    
2398        public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
2399            this.useIndexLFRUEviction = useIndexLFRUEviction;
2400        }
2401    
2402        public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
2403            this.enableIndexDiskSyncs = enableIndexDiskSyncs;
2404        }
2405    
2406        public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
2407            this.enableIndexRecoveryFile = enableIndexRecoveryFile;
2408        }
2409    
2410        public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
2411            this.enableIndexPageCaching = enableIndexPageCaching;
2412        }
2413    
2414        public boolean isEnableIndexDiskSyncs() {
2415            return enableIndexDiskSyncs;
2416        }
2417    
2418        public boolean isEnableIndexRecoveryFile() {
2419            return enableIndexRecoveryFile;
2420        }
2421    
2422        public boolean isEnableIndexPageCaching() {
2423            return enableIndexPageCaching;
2424        }
2425    
2426        // /////////////////////////////////////////////////////////////////
2427        // Internal conversion methods.
2428        // /////////////////////////////////////////////////////////////////
2429    
2430        class MessageOrderCursor{
2431            long defaultCursorPosition;
2432            long lowPriorityCursorPosition;
2433            long highPriorityCursorPosition;
2434            MessageOrderCursor(){
2435            }
2436    
2437            MessageOrderCursor(long position){
2438                this.defaultCursorPosition=position;
2439                this.lowPriorityCursorPosition=position;
2440                this.highPriorityCursorPosition=position;
2441            }
2442    
2443            MessageOrderCursor(MessageOrderCursor other){
2444                this.defaultCursorPosition=other.defaultCursorPosition;
2445                this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2446                this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2447            }
2448    
2449            MessageOrderCursor copy() {
2450                return new MessageOrderCursor(this);
2451            }
2452    
2453            void reset() {
2454                this.defaultCursorPosition=0;
2455                this.highPriorityCursorPosition=0;
2456                this.lowPriorityCursorPosition=0;
2457            }
2458    
2459            void increment() {
2460                if (defaultCursorPosition!=0) {
2461                    defaultCursorPosition++;
2462                }
2463                if (highPriorityCursorPosition!=0) {
2464                    highPriorityCursorPosition++;
2465                }
2466                if (lowPriorityCursorPosition!=0) {
2467                    lowPriorityCursorPosition++;
2468                }
2469            }
2470    
2471            public String toString() {
2472               return "MessageOrderCursor:[def:" + defaultCursorPosition
2473                       + ", low:" + lowPriorityCursorPosition
2474                       + ", high:" +  highPriorityCursorPosition + "]";
2475            }
2476    
2477            public void sync(MessageOrderCursor other) {
2478                this.defaultCursorPosition=other.defaultCursorPosition;
2479                this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2480                this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2481            }
2482        }
2483    
2484        class MessageOrderIndex {
2485            static final byte HI = 9;
2486            static final byte LO = 0;
2487            static final byte DEF = 4;
2488    
2489            long nextMessageId;
2490            BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
2491            BTreeIndex<Long, MessageKeys> lowPriorityIndex;
2492            BTreeIndex<Long, MessageKeys> highPriorityIndex;
2493            MessageOrderCursor cursor = new MessageOrderCursor();
2494            Long lastDefaultKey;
2495            Long lastHighKey;
2496            Long lastLowKey;
2497            byte lastGetPriority;
2498    
2499            MessageKeys remove(Transaction tx, Long key) throws IOException {
2500                MessageKeys result = defaultPriorityIndex.remove(tx, key);
2501                if (result == null && highPriorityIndex!=null) {
2502                    result = highPriorityIndex.remove(tx, key);
2503                    if (result ==null && lowPriorityIndex!=null) {
2504                        result = lowPriorityIndex.remove(tx, key);
2505                    }
2506                }
2507                return result;
2508            }
2509    
2510            void load(Transaction tx) throws IOException {
2511                defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2512                defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2513                defaultPriorityIndex.load(tx);
2514                lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2515                lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2516                lowPriorityIndex.load(tx);
2517                highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2518                highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2519                highPriorityIndex.load(tx);
2520            }
2521    
2522            void allocate(Transaction tx) throws IOException {
2523                defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2524                if (metadata.version >= 2) {
2525                    lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2526                    highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2527                }
2528            }
2529    
2530            void configureLast(Transaction tx) throws IOException {
2531                // Figure out the next key using the last entry in the destination.
2532                if (highPriorityIndex != null) {
2533                    Entry<Long, MessageKeys> lastEntry = highPriorityIndex.getLast(tx);
2534                    if (lastEntry != null) {
2535                        nextMessageId = lastEntry.getKey() + 1;
2536                    } else {
2537                        lastEntry = defaultPriorityIndex.getLast(tx);
2538                        if (lastEntry != null) {
2539                            nextMessageId = lastEntry.getKey() + 1;
2540                        } else {
2541                            lastEntry = lowPriorityIndex.getLast(tx);
2542                            if (lastEntry != null) {
2543                                nextMessageId = lastEntry.getKey() + 1;
2544                            }
2545                        }
2546                    }
2547                } else {
2548                    Entry<Long, MessageKeys> lastEntry = defaultPriorityIndex.getLast(tx);
2549                    if (lastEntry != null) {
2550                        nextMessageId = lastEntry.getKey() + 1;
2551                    }
2552                }
2553            }
2554    
2555            void clear(Transaction tx) throws IOException {
2556                this.remove(tx);
2557                this.resetCursorPosition();
2558                this.allocate(tx);
2559                this.load(tx);
2560                this.configureLast(tx);
2561            }
2562    
2563            void remove(Transaction tx) throws IOException {
2564                defaultPriorityIndex.clear(tx);
2565                defaultPriorityIndex.unload(tx);
2566                tx.free(defaultPriorityIndex.getPageId());
2567                if (lowPriorityIndex != null) {
2568                    lowPriorityIndex.clear(tx);
2569                    lowPriorityIndex.unload(tx);
2570    
2571                    tx.free(lowPriorityIndex.getPageId());
2572                }
2573                if (highPriorityIndex != null) {
2574                    highPriorityIndex.clear(tx);
2575                    highPriorityIndex.unload(tx);
2576                    tx.free(highPriorityIndex.getPageId());
2577                }
2578            }
2579    
2580            void resetCursorPosition() {
2581                this.cursor.reset();
2582                lastDefaultKey = null;
2583                lastHighKey = null;
2584                lastLowKey = null;
2585            }
2586    
2587            void setBatch(Transaction tx, Long sequence) throws IOException {
2588                if (sequence != null) {
2589                    Long nextPosition = new Long(sequence.longValue() + 1);
2590                    if (defaultPriorityIndex.containsKey(tx, sequence)) {
2591                        lastDefaultKey = sequence;
2592                        cursor.defaultCursorPosition = nextPosition.longValue();
2593                    } else if (highPriorityIndex != null) {
2594                        if (highPriorityIndex.containsKey(tx, sequence)) {
2595                            lastHighKey = sequence;
2596                            cursor.highPriorityCursorPosition = nextPosition.longValue();
2597                        } else if (lowPriorityIndex.containsKey(tx, sequence)) {
2598                            lastLowKey = sequence;
2599                            cursor.lowPriorityCursorPosition = nextPosition.longValue();
2600                        }
2601                    } else {
2602                        LOG.warn("setBatch: sequence " + sequence + " not found in orderindex:" + this);
2603                        lastDefaultKey = sequence;
2604                        cursor.defaultCursorPosition = nextPosition.longValue();
2605                    }
2606                }
2607            }
2608    
2609            void setBatch(Transaction tx, LastAck last) throws IOException {
2610                setBatch(tx, last.lastAckedSequence);
2611                if (cursor.defaultCursorPosition == 0
2612                        && cursor.highPriorityCursorPosition == 0
2613                        && cursor.lowPriorityCursorPosition == 0) {
2614                    long next = last.lastAckedSequence + 1;
2615                    switch (last.priority) {
2616                        case DEF:
2617                            cursor.defaultCursorPosition = next;
2618                            cursor.highPriorityCursorPosition = next;
2619                            break;
2620                        case HI:
2621                            cursor.highPriorityCursorPosition = next;
2622                            break;
2623                        case LO:
2624                            cursor.lowPriorityCursorPosition = next;
2625                            cursor.defaultCursorPosition = next;
2626                            cursor.highPriorityCursorPosition = next;
2627                            break;
2628                    }
2629                }
2630            }
2631    
2632            void stoppedIterating() {
2633                if (lastDefaultKey!=null) {
2634                    cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
2635                }
2636                if (lastHighKey!=null) {
2637                    cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
2638                }
2639                if (lastLowKey!=null) {
2640                    cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
2641                }
2642                lastDefaultKey = null;
2643                lastHighKey = null;
2644                lastLowKey = null;
2645            }
2646    
2647            void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
2648                    throws IOException {
2649                if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
2650                    getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
2651                } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
2652                    getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
2653                } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
2654                    getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
2655                }
2656            }
2657    
2658            void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
2659                    BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
2660    
2661                Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId);
2662                deletes.add(iterator.next());
2663            }
2664    
2665            long getNextMessageId(int priority) {
2666                return nextMessageId++;
2667            }
2668    
2669            MessageKeys get(Transaction tx, Long key) throws IOException {
2670                MessageKeys result = defaultPriorityIndex.get(tx, key);
2671                if (result == null) {
2672                    result = highPriorityIndex.get(tx, key);
2673                    if (result == null) {
2674                        result = lowPriorityIndex.get(tx, key);
2675                        lastGetPriority = LO;
2676                    } else {
2677                        lastGetPriority = HI;
2678                    }
2679                } else {
2680                    lastGetPriority = DEF;
2681                }
2682                return result;
2683            }
2684    
2685            MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
2686                if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
2687                    return defaultPriorityIndex.put(tx, key, value);
2688                } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
2689                    return highPriorityIndex.put(tx, key, value);
2690                } else {
2691                    return lowPriorityIndex.put(tx, key, value);
2692                }
2693            }
2694    
2695            Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
2696                return new MessageOrderIterator(tx,cursor);
2697            }
2698    
2699            Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
2700                return new MessageOrderIterator(tx,m);
2701            }
2702    
2703            public byte lastGetPriority() {
2704                return lastGetPriority;
2705            }
2706    
2707            class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
2708                Iterator<Entry<Long, MessageKeys>>currentIterator;
2709                final Iterator<Entry<Long, MessageKeys>>highIterator;
2710                final Iterator<Entry<Long, MessageKeys>>defaultIterator;
2711                final Iterator<Entry<Long, MessageKeys>>lowIterator;
2712    
2713                MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {
2714                    this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition);
2715                    if (highPriorityIndex != null) {
2716                        this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition);
2717                    } else {
2718                        this.highIterator = null;
2719                    }
2720                    if (lowPriorityIndex != null) {
2721                        this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition);
2722                    } else {
2723                        this.lowIterator = null;
2724                    }
2725                }
2726    
2727                public boolean hasNext() {
2728                    if (currentIterator == null) {
2729                        if (highIterator != null) {
2730                            if (highIterator.hasNext()) {
2731                                currentIterator = highIterator;
2732                                return currentIterator.hasNext();
2733                            }
2734                            if (defaultIterator.hasNext()) {
2735                                currentIterator = defaultIterator;
2736                                return currentIterator.hasNext();
2737                            }
2738                            if (lowIterator.hasNext()) {
2739                                currentIterator = lowIterator;
2740                                return currentIterator.hasNext();
2741                            }
2742                            return false;
2743                        } else {
2744                            currentIterator = defaultIterator;
2745                            return currentIterator.hasNext();
2746                        }
2747                    }
2748                    if (highIterator != null) {
2749                        if (currentIterator.hasNext()) {
2750                            return true;
2751                        }
2752                        if (currentIterator == highIterator) {
2753                            if (defaultIterator.hasNext()) {
2754                                currentIterator = defaultIterator;
2755                                return currentIterator.hasNext();
2756                            }
2757                            if (lowIterator.hasNext()) {
2758                                currentIterator = lowIterator;
2759                                return currentIterator.hasNext();
2760                            }
2761                            return false;
2762                        }
2763    
2764                        if (currentIterator == defaultIterator) {
2765                            if (lowIterator.hasNext()) {
2766                                currentIterator = lowIterator;
2767                                return currentIterator.hasNext();
2768                            }
2769                            return false;
2770                        }
2771                    }
2772                    return currentIterator.hasNext();
2773                }
2774    
2775                public Entry<Long, MessageKeys> next() {
2776                    Entry<Long, MessageKeys> result = currentIterator.next();
2777                    if (result != null) {
2778                        Long key = result.getKey();
2779                        if (highIterator != null) {
2780                            if (currentIterator == defaultIterator) {
2781                                lastDefaultKey = key;
2782                            } else if (currentIterator == highIterator) {
2783                                lastHighKey = key;
2784                            } else {
2785                                lastLowKey = key;
2786                            }
2787                        } else {
2788                            lastDefaultKey = key;
2789                        }
2790                    }
2791                    return result;
2792                }
2793    
2794                public void remove() {
2795                    throw new UnsupportedOperationException();
2796                }
2797    
2798            }
2799        }
2800    
2801        private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
2802            final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
2803    
2804            public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
2805                ByteArrayOutputStream baos = new ByteArrayOutputStream();
2806                ObjectOutputStream oout = new ObjectOutputStream(baos);
2807                oout.writeObject(object);
2808                oout.flush();
2809                oout.close();
2810                byte[] data = baos.toByteArray();
2811                dataOut.writeInt(data.length);
2812                dataOut.write(data);
2813            }
2814    
2815            @SuppressWarnings("unchecked")
2816            public HashSet<String> readPayload(DataInput dataIn) throws IOException {
2817                int dataLen = dataIn.readInt();
2818                byte[] data = new byte[dataLen];
2819                dataIn.readFully(data);
2820                ByteArrayInputStream bais = new ByteArrayInputStream(data);
2821                ObjectInputStream oin = new ObjectInputStream(bais);
2822                try {
2823                    return (HashSet<String>) oin.readObject();
2824                } catch (ClassNotFoundException cfe) {
2825                    IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
2826                    ioe.initCause(cfe);
2827                    throw ioe;
2828                }
2829            }
2830        }
2831    }