001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Map.Entry;
030import java.util.Set;
031import java.util.concurrent.BlockingQueue;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.FutureTask;
034import java.util.concurrent.LinkedBlockingQueue;
035import java.util.concurrent.Semaphore;
036import java.util.concurrent.ThreadFactory;
037import java.util.concurrent.ThreadPoolExecutor;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.TimeoutException;
040import java.util.concurrent.atomic.AtomicBoolean;
041import java.util.concurrent.atomic.AtomicInteger;
042import java.util.concurrent.atomic.AtomicReference;
043
044import org.apache.activemq.broker.ConnectionContext;
045import org.apache.activemq.broker.region.BaseDestination;
046import org.apache.activemq.broker.scheduler.JobSchedulerStore;
047import org.apache.activemq.command.ActiveMQDestination;
048import org.apache.activemq.command.ActiveMQQueue;
049import org.apache.activemq.command.ActiveMQTempQueue;
050import org.apache.activemq.command.ActiveMQTempTopic;
051import org.apache.activemq.command.ActiveMQTopic;
052import org.apache.activemq.command.Message;
053import org.apache.activemq.command.MessageAck;
054import org.apache.activemq.command.MessageId;
055import org.apache.activemq.command.ProducerId;
056import org.apache.activemq.command.SubscriptionInfo;
057import org.apache.activemq.command.TransactionId;
058import org.apache.activemq.openwire.OpenWireFormat;
059import org.apache.activemq.protobuf.Buffer;
060import org.apache.activemq.store.AbstractMessageStore;
061import org.apache.activemq.store.IndexListener;
062import org.apache.activemq.store.ListenableFuture;
063import org.apache.activemq.store.MessageRecoveryListener;
064import org.apache.activemq.store.MessageStore;
065import org.apache.activemq.store.MessageStoreStatistics;
066import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
067import org.apache.activemq.store.NoLocalSubscriptionAware;
068import org.apache.activemq.store.PersistenceAdapter;
069import org.apache.activemq.store.TopicMessageStore;
070import org.apache.activemq.store.TransactionIdTransformer;
071import org.apache.activemq.store.TransactionStore;
072import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
073import org.apache.activemq.store.kahadb.data.KahaDestination;
074import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
075import org.apache.activemq.store.kahadb.data.KahaLocation;
076import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
077import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
078import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
079import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
080import org.apache.activemq.store.kahadb.disk.journal.Location;
081import org.apache.activemq.store.kahadb.disk.page.Transaction;
082import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
083import org.apache.activemq.usage.MemoryUsage;
084import org.apache.activemq.usage.SystemUsage;
085import org.apache.activemq.util.IOExceptionSupport;
086import org.apache.activemq.util.ServiceStopper;
087import org.apache.activemq.util.ThreadPoolUtils;
088import org.apache.activemq.wireformat.WireFormat;
089import org.slf4j.Logger;
090import org.slf4j.LoggerFactory;
091
092public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, NoLocalSubscriptionAware {
    /** Logger shared by this store and the nested store classes declared in this file. */
    static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
    /** Default async job limit; the initial value of {@link #maxAsyncJobs}. */
    private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH;

    /** Name of the system property from which {@link #cancelledTaskModMetric} is read. */
    public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
    /** Base-10 value of {@link #PROPERTY_CANCELED_TASK_MOD_METRIC}; 0 when the property is not set. */
    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
            PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
    /** Name of the system property from which the async executor maximum pool size is read. */
    public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
    /** Maximum pool size passed to both async executors in doStart(); 1 when the property is not set. */
    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
            PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;

    // Executors that run async queue / topic store tasks; created in doStart(), shut down in doStop().
    protected ExecutorService queueExecutor;
    protected ExecutorService topicExecutor;
    // Lists of per-destination async task maps; doStop() cancels every task found in them and clears them.
    // NOTE(review): presumably each destination store registers its asyncTaskMap here - not visible in this section.
    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
    // Wire format used to marshal messages and acks into journal commands; its version is set in
    // configureMetadata() and possibly re-set in doStart().
    final WireFormat wireFormat = new OpenWireFormat();
    // Set through setUsageManager(); not otherwise used in this section.
    private SystemUsage usageManager;
    // Bounded work queues (capacity getMaxAsyncJobs()) backing queueExecutor / topicExecutor.
    private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
    private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
    // Created in doStart() with getMaxAsyncJobs() permits; doStop() tries to take all permits, then drains them.
    Semaphore globalQueueSemaphore;
    Semaphore globalTopicSemaphore;
    // when true, asyncAddQueueMessage() schedules a StoreQueueTask instead of delegating to the superclass
    private boolean concurrentStoreAndDispatchQueues = true;
    // when true, message order may be compromised when cache is exhausted if store is out
    // of order w.r.t cache
    private boolean concurrentStoreAndDispatchTopics = false;
    // fixed to false; only exposed through isConcurrentStoreAndDispatchTransactions()
    private final boolean concurrentStoreAndDispatchTransactions = false;
    // Sizes the semaphores and job queues created in doStart() and the per-destination semaphores.
    private int maxAsyncJobs = MAX_ASYNC_JOBS;
    // Created in the constructor and returned by createTransactionStore().
    private final KahaDBTransactionStore transactionStore;
    // Applied to transaction ids before they are converted into journal commands; identity by default.
    private TransactionIdTransformer transactionIdTransformer;
121
    /**
     * Creates the store together with its transaction store and installs a
     * transaction id transformer that returns ids unchanged.
     */
    public KahaDBStore() {
        // the transaction store receives this instance before the transformer below is assigned
        this.transactionStore = new KahaDBTransactionStore(this);
        this.transactionIdTransformer = new TransactionIdTransformer() {
            @Override
            public TransactionId transform(TransactionId txid) {
                // identity mapping
                return txid;
            }
        };
    }
131
132    @Override
133    public String toString() {
134        return "KahaDB:[" + directory.getAbsolutePath() + "]";
135    }
136
    /**
     * No-op: the supplied broker name is not used by this implementation.
     *
     * @param brokerName
     *            ignored
     */
    @Override
    public void setBrokerName(String brokerName) {
    }
140
    /**
     * Stores the usage manager so it can later be read through
     * {@link #getUsageManager()}.
     *
     * @param usageManager
     *            the usage manager to remember
     */
    @Override
    public void setUsageManager(SystemUsage usageManager) {
        this.usageManager = usageManager;
    }
145
146    public SystemUsage getUsageManager() {
147        return this.usageManager;
148    }
149
150    /**
151     * @return the concurrentStoreAndDispatch
152     */
153    public boolean isConcurrentStoreAndDispatchQueues() {
154        return this.concurrentStoreAndDispatchQueues;
155    }
156
    /**
     * Enables or disables concurrent store and dispatch for queues.
     *
     * @param concurrentStoreAndDispatch
     *            the new value of the queue concurrentStoreAndDispatch flag
     */
    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
        this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
    }
164
165    /**
166     * @return the concurrentStoreAndDispatch
167     */
168    public boolean isConcurrentStoreAndDispatchTopics() {
169        return this.concurrentStoreAndDispatchTopics;
170    }
171
    /**
     * Enables or disables concurrent store and dispatch for topics.
     *
     * @param concurrentStoreAndDispatch
     *            the new value of the topic concurrentStoreAndDispatch flag
     */
    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
        this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
    }
179
180    public boolean isConcurrentStoreAndDispatchTransactions() {
181        return this.concurrentStoreAndDispatchTransactions;
182    }
183
184    /**
185     * @return the maxAsyncJobs
186     */
187    public int getMaxAsyncJobs() {
188        return this.maxAsyncJobs;
189    }
190
    /**
     * Sets the maximum number of async jobs. The value is read when doStart()
     * creates the global semaphores and job queues, and when a destination
     * store is constructed.
     *
     * @param maxAsyncJobs
     *            the maxAsyncJobs to set
     */
    public void setMaxAsyncJobs(int maxAsyncJobs) {
        this.maxAsyncJobs = maxAsyncJobs;
    }
198
199
200    @Override
201    protected void configureMetadata() {
202        if (brokerService != null) {
203            metadata.openwireVersion = brokerService.getStoreOpenWireVersion();
204            wireFormat.setVersion(metadata.openwireVersion);
205
206            if (LOG.isDebugEnabled()) {
207                LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion);
208            }
209
210        }
211    }
212
213    @Override
214    public void doStart() throws Exception {
215        //configure the metadata before start, right now
216        //this is just the open wire version
217        configureMetadata();
218
219        super.doStart();
220
221        if (brokerService != null) {
222            // In case the recovered store used a different OpenWire version log a warning
223            // to assist in determining why journal reads fail.
224            if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) {
225                LOG.warn("Existing Store uses a different OpenWire version[{}] " +
226                         "than the version configured[{}] reverting to the version " +
227                         "used by this store, some newer broker features may not work" +
228                         "as expected.",
229                         metadata.openwireVersion, brokerService.getStoreOpenWireVersion());
230
231                // Update the broker service instance to the actual version in use.
232                wireFormat.setVersion(metadata.openwireVersion);
233                brokerService.setStoreOpenWireVersion(metadata.openwireVersion);
234            }
235        }
236
237        this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
238        this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
239        this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
240        this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
241        this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
242            asyncQueueJobQueue, new ThreadFactory() {
243                @Override
244                public Thread newThread(Runnable runnable) {
245                    Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
246                    thread.setDaemon(true);
247                    return thread;
248                }
249            });
250        this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
251            asyncTopicJobQueue, new ThreadFactory() {
252                @Override
253                public Thread newThread(Runnable runnable) {
254                    Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
255                    thread.setDaemon(true);
256                    return thread;
257                }
258            });
259    }
260
261    @Override
262    public void doStop(ServiceStopper stopper) throws Exception {
263        // drain down async jobs
264        LOG.info("Stopping async queue tasks");
265        if (this.globalQueueSemaphore != null) {
266            this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
267        }
268        synchronized (this.asyncQueueMaps) {
269            for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
270                synchronized (m) {
271                    for (StoreTask task : m.values()) {
272                        task.cancel();
273                    }
274                }
275            }
276            this.asyncQueueMaps.clear();
277        }
278        LOG.info("Stopping async topic tasks");
279        if (this.globalTopicSemaphore != null) {
280            this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
281        }
282        synchronized (this.asyncTopicMaps) {
283            for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
284                synchronized (m) {
285                    for (StoreTask task : m.values()) {
286                        task.cancel();
287                    }
288                }
289            }
290            this.asyncTopicMaps.clear();
291        }
292        if (this.globalQueueSemaphore != null) {
293            this.globalQueueSemaphore.drainPermits();
294        }
295        if (this.globalTopicSemaphore != null) {
296            this.globalTopicSemaphore.drainPermits();
297        }
298        if (this.queueExecutor != null) {
299            ThreadPoolUtils.shutdownNow(queueExecutor);
300            queueExecutor = null;
301        }
302        if (this.topicExecutor != null) {
303            ThreadPoolUtils.shutdownNow(topicExecutor);
304            topicExecutor = null;
305        }
306        LOG.info("Stopped KahaDB");
307        super.doStop(stopper);
308    }
309
310    private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
311        return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
312            @Override
313            public Location execute(Transaction tx) throws IOException {
314                StoredDestination sd = getStoredDestination(destination, tx);
315                Long sequence = sd.messageIdIndex.get(tx, key);
316                if (sequence == null) {
317                    return null;
318                }
319                return sd.orderIndex.get(tx, sequence).location;
320            }
321        });
322    }
323
324    protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
325        StoreQueueTask task = null;
326        synchronized (store.asyncTaskMap) {
327            task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
328        }
329        return task;
330    }
331
332    // with asyncTaskMap locked
333    protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
334        store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
335        this.queueExecutor.execute(task);
336    }
337
338    protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
339        StoreTopicTask task = null;
340        synchronized (store.asyncTaskMap) {
341            task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
342        }
343        return task;
344    }
345
346    protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
347        synchronized (store.asyncTaskMap) {
348            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
349        }
350        this.topicExecutor.execute(task);
351    }
352
353    @Override
354    public TransactionStore createTransactionStore() throws IOException {
355        return this.transactionStore;
356    }
357
358    public boolean getForceRecoverIndex() {
359        return this.forceRecoverIndex;
360    }
361
    /**
     * @param forceRecoverIndex
     *            the new value of the forceRecoverIndex flag
     */
    public void setForceRecoverIndex(boolean forceRecoverIndex) {
        this.forceRecoverIndex = forceRecoverIndex;
    }
365
366    public class KahaDBMessageStore extends AbstractMessageStore {
        // Pending async store tasks of this destination, keyed by message id + destination;
        // guarded by synchronizing on the map itself.
        protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
        // Journal form of the destination, produced by convert(destination) in the constructor.
        protected KahaDestination dest;
        // Value of the enclosing store's getMaxAsyncJobs() at construction time.
        private final int maxAsyncJobs;
        // Per-destination semaphore created with maxAsyncJobs permits.
        private final Semaphore localDestinationSemaphore;

        // NOTE(review): presumably counters of completed / cancelled async tasks - they are not
        // updated in this section; confirm against the task classes.
        double doneTasks, canceledTasks = 0;
373
374        public KahaDBMessageStore(ActiveMQDestination destination) {
375            super(destination);
376            this.dest = convert(destination);
377            this.maxAsyncJobs = getMaxAsyncJobs();
378            this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
379        }
380
381        @Override
382        public ActiveMQDestination getDestination() {
383            return destination;
384        }
385
386        @Override
387        public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
388                throws IOException {
389            if (isConcurrentStoreAndDispatchQueues()) {
390                message.beforeMarshall(wireFormat);
391                StoreQueueTask result = new StoreQueueTask(this, context, message);
392                ListenableFuture<Object> future = result.getFuture();
393                message.getMessageId().setFutureOrSequenceLong(future);
394                message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch
395                result.aquireLocks();
396                synchronized (asyncTaskMap) {
397                    addQueueTask(this, result);
398                    if (indexListener != null) {
399                        indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
400                    }
401                }
402                return future;
403            } else {
404                return super.asyncAddQueueMessage(context, message);
405            }
406        }
407
408        @Override
409        public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
410            if (isConcurrentStoreAndDispatchQueues()) {
411                AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
412                StoreQueueTask task = null;
413                synchronized (asyncTaskMap) {
414                    task = (StoreQueueTask) asyncTaskMap.get(key);
415                }
416                if (task != null) {
417                    if (ack.isInTransaction() || !task.cancel()) {
418                        try {
419                            task.future.get();
420                        } catch (InterruptedException e) {
421                            throw new InterruptedIOException(e.toString());
422                        } catch (Exception ignored) {
423                            LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
424                        }
425                        removeMessage(context, ack);
426                    } else {
427                        indexLock.writeLock().lock();
428                        try {
429                            metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId());
430                        } finally {
431                            indexLock.writeLock().unlock();
432                        }
433                        synchronized (asyncTaskMap) {
434                            asyncTaskMap.remove(key);
435                        }
436                    }
437                } else {
438                    removeMessage(context, ack);
439                }
440            } else {
441                removeMessage(context, ack);
442            }
443        }
444
445        @Override
446        public void addMessage(final ConnectionContext context, final Message message) throws IOException {
447            final KahaAddMessageCommand command = new KahaAddMessageCommand();
448            command.setDestination(dest);
449            command.setMessageId(message.getMessageId().toProducerKey());
450            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId())));
451            command.setPriority(message.getPriority());
452            command.setPrioritySupported(isPrioritizedMessages());
453            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
454            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
455            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() {
456                // sync add? (for async, future present from getFutureOrSequenceLong)
457                Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
458
459                @Override
460                public void sequenceAssignedWithIndexLocked(final long sequence) {
461                    message.getMessageId().setFutureOrSequenceLong(sequence);
462                    if (indexListener != null) {
463                        if (possibleFuture == null) {
464                            trackPendingAdd(dest, sequence);
465                            indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
466                                @Override
467                                public void run() {
468                                    trackPendingAddComplete(dest, sequence);
469                                }
470                            }));
471                        }
472                    }
473                }
474            }, null);
475        }
476
477        @Override
478        public void updateMessage(Message message) throws IOException {
479            if (LOG.isTraceEnabled()) {
480                LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
481            }
482            KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand();
483            KahaAddMessageCommand command = new KahaAddMessageCommand();
484            command.setDestination(dest);
485            command.setMessageId(message.getMessageId().toProducerKey());
486            command.setPriority(message.getPriority());
487            command.setPrioritySupported(prioritizedMessages);
488            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
489            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
490            updateMessageCommand.setMessage(command);
491            store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null);
492        }
493
494        @Override
495        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
496            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
497            command.setDestination(dest);
498            command.setMessageId(ack.getLastMessageId().toProducerKey());
499            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())));
500
501            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
502            command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
503            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
504        }
505
506        @Override
507        public void removeAllMessages(ConnectionContext context) throws IOException {
508            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
509            command.setDestination(dest);
510            store(command, true, null, null);
511        }
512
513        @Override
514        public Message getMessage(MessageId identity) throws IOException {
515            final String key = identity.toProducerKey();
516
517            // Hopefully one day the page file supports concurrent read
518            // operations... but for now we must
519            // externally synchronize...
520            Location location;
521            indexLock.writeLock().lock();
522            try {
523                location = findMessageLocation(key, dest);
524            } finally {
525                indexLock.writeLock().unlock();
526            }
527            if (location == null) {
528                return null;
529            }
530
531            return loadMessage(location);
532        }
533
534        @Override
535        public boolean isEmpty() throws IOException {
536            indexLock.writeLock().lock();
537            try {
538                return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
539                    @Override
540                    public Boolean execute(Transaction tx) throws IOException {
541                        // Iterate through all index entries to get a count of
542                        // messages in the destination.
543                        StoredDestination sd = getStoredDestination(dest, tx);
544                        return sd.locationIndex.isEmpty(tx);
545                    }
546                });
547            } finally {
548                indexLock.writeLock().unlock();
549            }
550        }
551
552        @Override
553        public void recover(final MessageRecoveryListener listener) throws Exception {
554            // recovery may involve expiry which will modify
555            indexLock.writeLock().lock();
556            try {
557                pageFile.tx().execute(new Transaction.Closure<Exception>() {
558                    @Override
559                    public void execute(Transaction tx) throws Exception {
560                        StoredDestination sd = getStoredDestination(dest, tx);
561                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
562                        sd.orderIndex.resetCursorPosition();
563                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
564                                .hasNext(); ) {
565                            Entry<Long, MessageKeys> entry = iterator.next();
566                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
567                                continue;
568                            }
569                            Message msg = loadMessage(entry.getValue().location);
570                            listener.recoverMessage(msg);
571                        }
572                    }
573                });
574            } finally {
575                indexLock.writeLock().unlock();
576            }
577        }
578
579        @Override
580        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
581            indexLock.writeLock().lock();
582            try {
583                pageFile.tx().execute(new Transaction.Closure<Exception>() {
584                    @Override
585                    public void execute(Transaction tx) throws Exception {
586                        StoredDestination sd = getStoredDestination(dest, tx);
587                        Entry<Long, MessageKeys> entry = null;
588                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
589                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
590                            entry = iterator.next();
591                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
592                                continue;
593                            }
594                            Message msg = loadMessage(entry.getValue().location);
595                            msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
596                            listener.recoverMessage(msg);
597                            counter++;
598                            if (counter >= maxReturned) {
599                                break;
600                            }
601                        }
602                        sd.orderIndex.stoppedIterating();
603                    }
604                });
605            } finally {
606                indexLock.writeLock().unlock();
607            }
608        }
609
610        protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
611            int counter = 0;
612            String id;
613            for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) {
614                id = iterator.next();
615                iterator.remove();
616                Long sequence = sd.messageIdIndex.get(tx, id);
617                if (sequence != null) {
618                    if (sd.orderIndex.alreadyDispatched(sequence)) {
619                        listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location));
620                        counter++;
621                        if (counter >= maxReturned) {
622                            break;
623                        }
624                    } else {
625                        LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
626                    }
627                } else {
628                    LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd);
629                }
630            }
631            return counter;
632        }
633
634
635        @Override
636        public void resetBatching() {
637            if (pageFile.isLoaded()) {
638                indexLock.writeLock().lock();
639                try {
640                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
641                        @Override
642                        public void execute(Transaction tx) throws Exception {
643                            StoredDestination sd = getExistingStoredDestination(dest, tx);
644                            if (sd != null) {
645                                sd.orderIndex.resetCursorPosition();}
646                            }
647                        });
648                } catch (Exception e) {
649                    LOG.error("Failed to reset batching",e);
650                } finally {
651                    indexLock.writeLock().unlock();
652                }
653            }
654        }
655
656        @Override
657        public void setBatch(final MessageId identity) throws IOException {
658            indexLock.writeLock().lock();
659            try {
660                pageFile.tx().execute(new Transaction.Closure<IOException>() {
661                    @Override
662                    public void execute(Transaction tx) throws IOException {
663                        StoredDestination sd = getStoredDestination(dest, tx);
664                        Long location = (Long) identity.getFutureOrSequenceLong();
665                        Long pending = sd.orderIndex.minPendingAdd();
666                        if (pending != null) {
667                            location = Math.min(location, pending-1);
668                        }
669                        sd.orderIndex.setBatch(tx, location);
670                    }
671                });
672            } finally {
673                indexLock.writeLock().unlock();
674            }
675        }
676
        /**
         * No-op: the supplied memory usage is not used by this store.
         */
        @Override
        public void setMemoryUsage(MemoryUsage memoryUsage) {
        }
        /**
         * Delegates to the superclass start; no additional work is done here.
         */
        @Override
        public void start() throws Exception {
            super.start();
        }
        /**
         * Stops this store by delegating to the superclass implementation.
         *
         * @throws Exception whatever the superclass stop raises
         */
        @Override
        public void stop() throws Exception {
            super.stop();
        }
688
689        protected void lockAsyncJobQueue() {
690            try {
691                if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) {
692                    throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore);
693                }
694            } catch (Exception e) {
695                LOG.error("Failed to lock async jobs for " + this.destination, e);
696            }
697        }
698
        /**
         * Releases {@code maxAsyncJobs} permits back to the per-destination semaphore.
         * The release is unconditional; it does not check whether a matching
         * {@code lockAsyncJobQueue()} call actually obtained the permits.
         */
        protected void unlockAsyncJobQueue() {
            this.localDestinationSemaphore.release(this.maxAsyncJobs);
        }
702
703        protected void acquireLocalAsyncLock() {
704            try {
705                this.localDestinationSemaphore.acquire();
706            } catch (InterruptedException e) {
707                LOG.error("Failed to aquire async lock for " + this.destination, e);
708            }
709        }
710
        /**
         * Returns a single permit to the per-destination semaphore.
         */
        protected void releaseLocalAsyncLock() {
            this.localDestinationSemaphore.release();
        }
714
715        @Override
716        public String toString(){
717            return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
718        }
719
720        @Override
721        protected void recoverMessageStoreStatistics() throws IOException {
722            try {
723                MessageStoreStatistics recoveredStatistics;
724                lockAsyncJobQueue();
725                indexLock.writeLock().lock();
726                try {
727                    recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() {
728                        @Override
729                        public MessageStoreStatistics execute(Transaction tx) throws IOException {
730                            MessageStoreStatistics statistics = new MessageStoreStatistics();
731
732                            // Iterate through all index entries to get the size of each message
733                            StoredDestination sd = getStoredDestination(dest, tx);
734                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
735                                int locationSize = iterator.next().getKey().getSize();
736                                statistics.getMessageCount().increment();
737                                statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
738                            }
739                           return statistics;
740                        }
741                    });
742                    getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount());
743                    getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize());
744                } finally {
745                    indexLock.writeLock().unlock();
746                }
747            } finally {
748                unlockAsyncJobQueue();
749            }
750        }
751    }
752
753    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
754        private final AtomicInteger subscriptionCount = new AtomicInteger();
755        protected final MessageStoreSubscriptionStatistics messageStoreSubStats =
756                new MessageStoreSubscriptionStatistics(isEnableSubscriptionStatistics());
757
758        public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
759            super(destination);
760            this.subscriptionCount.set(getAllSubscriptions().length);
761            if (isConcurrentStoreAndDispatchTopics()) {
762                asyncTopicMaps.add(asyncTaskMap);
763            }
764        }
765
        /**
         * Recovers the destination-level statistics via the superclass and then
         * the per-subscription metrics via {@code recoverMessageStoreSubMetrics()}.
         *
         * @throws IOException if either recovery step fails
         */
        @Override
        protected void recoverMessageStoreStatistics() throws IOException {
            super.recoverMessageStoreStatistics();
            this.recoverMessageStoreSubMetrics();
        }
771
772        @Override
773        public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
774                throws IOException {
775            if (isConcurrentStoreAndDispatchTopics()) {
776                message.beforeMarshall(wireFormat);
777                StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
778                result.aquireLocks();
779                addTopicTask(this, result);
780                return result.getFuture();
781            } else {
782                return super.asyncAddTopicMessage(context, message);
783            }
784        }
785
        /**
         * Acknowledges {@code messageId} for the durable subscription identified by
         * {@code clientId} and {@code subscriptionName}.
         * <p>
         * With concurrent store-and-dispatch enabled for topics, a still-pending
         * async store task for the message is looked up first. If one exists the
         * subscription key is recorded on the task and no ack is written to the
         * store by this call; otherwise, or when the feature is disabled, the ack
         * is stored through {@code doAcknowledge}.
         *
         * @param context connection context of the acknowledging consumer
         * @param clientId client id of the subscription
         * @param subscriptionName name of the subscription
         * @param messageId id of the acknowledged message
         * @param ack the acknowledgement, passed through to {@code doAcknowledge}
         * @throws IOException if storing the acknowledgement fails
         */
        @Override
        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
                                MessageId messageId, MessageAck ack) throws IOException {
            String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
            if (isConcurrentStoreAndDispatchTopics()) {
                AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
                StoreTopicTask task = null;
                // The task map is only read/modified while holding its monitor.
                synchronized (asyncTaskMap) {
                    task = (StoreTopicTask) asyncTaskMap.get(key);
                }
                if (task != null) {
                    // presumably addSubscriptionKey returns true once every expected
                    // subscription has acked - TODO confirm against StoreTopicTask
                    if (task.addSubscriptionKey(subscriptionKey)) {
                        removeTopicTask(this, messageId);
                        // Drop the map entry only if the pending store was actually cancelled.
                        if (task.cancel()) {
                            synchronized (asyncTaskMap) {
                                asyncTaskMap.remove(key);
                            }
                        }
                    }
                } else {
                    // No pending async store for this message: persist the ack directly.
                    doAcknowledge(context, subscriptionKey, messageId, ack);
                }
            } else {
                doAcknowledge(context, subscriptionKey, messageId, ack);
            }
        }
812
813        protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
814                throws IOException {
815            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
816            command.setDestination(dest);
817            command.setSubscriptionKey(subscriptionKey);
818            command.setMessageId(messageId.toProducerKey());
819            command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null);
820            if (ack != null && ack.isUnmatchedAck()) {
821                command.setAck(UNMATCHED);
822            } else {
823                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
824                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
825            }
826            store(command, false, null, null);
827        }
828
829        @Override
830        public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
831            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
832                    .getSubscriptionName());
833            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
834            command.setDestination(dest);
835            command.setSubscriptionKey(subscriptionKey.toString());
836            command.setRetroactive(retroactive);
837            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
838            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
839            store(command, isEnableJournalDiskSyncs() && true, null, null);
840            this.subscriptionCount.incrementAndGet();
841        }
842
843        @Override
844        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
845            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
846            command.setDestination(dest);
847            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
848            store(command, isEnableJournalDiskSyncs() && true, null, null);
849            this.subscriptionCount.decrementAndGet();
850        }
851
852        @Override
853        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
854
855            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
856            indexLock.writeLock().lock();
857            try {
858                pageFile.tx().execute(new Transaction.Closure<IOException>() {
859                    @Override
860                    public void execute(Transaction tx) throws IOException {
861                        StoredDestination sd = getStoredDestination(dest, tx);
862                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
863                                .hasNext();) {
864                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
865                            SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
866                                    .getValue().getSubscriptionInfo().newInput()));
867                            subscriptions.add(info);
868
869                        }
870                    }
871                });
872            } finally {
873                indexLock.writeLock().unlock();
874            }
875
876            SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
877            subscriptions.toArray(rc);
878            return rc;
879        }
880
881        @Override
882        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
883            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
884            indexLock.writeLock().lock();
885            try {
886                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
887                    @Override
888                    public SubscriptionInfo execute(Transaction tx) throws IOException {
889                        StoredDestination sd = getStoredDestination(dest, tx);
890                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
891                        if (command == null) {
892                            return null;
893                        }
894                        return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
895                                .getSubscriptionInfo().newInput()));
896                    }
897                });
898            } finally {
899                indexLock.writeLock().unlock();
900            }
901        }
902
903        @Override
904        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
905            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
906
907            if (isEnableSubscriptionStatistics()) {
908                return (int)this.messageStoreSubStats.getMessageCount(subscriptionKey).getCount();
909            } else {
910
911                indexLock.writeLock().lock();
912                try {
913                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
914                        @Override
915                        public Integer execute(Transaction tx) throws IOException {
916                            StoredDestination sd = getStoredDestination(dest, tx);
917                            LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
918                            if (cursorPos == null) {
919                                // The subscription might not exist.
920                                return 0;
921                            }
922
923                            return (int) getStoredMessageCount(tx, sd, subscriptionKey);
924                        }
925                    });
926                } finally {
927                    indexLock.writeLock().unlock();
928                }
929            }
930        }
931
932
933        @Override
934        public long getMessageSize(String clientId, String subscriptionName) throws IOException {
935            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
936            if (isEnableSubscriptionStatistics()) {
937                return this.messageStoreSubStats.getMessageSize(subscriptionKey).getTotalSize();
938            } else {
939                indexLock.writeLock().lock();
940                try {
941                    return pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
942                        @Override
943                        public Long execute(Transaction tx) throws IOException {
944                            StoredDestination sd = getStoredDestination(dest, tx);
945                            LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
946                            if (cursorPos == null) {
947                                // The subscription might not exist.
948                                return 0l;
949                            }
950
951                            return getStoredMessageSize(tx, sd, subscriptionKey);
952                        }
953                    });
954                } finally {
955                    indexLock.writeLock().unlock();
956                }
957            }
958        }
959
960        protected void recoverMessageStoreSubMetrics() throws IOException {
961            if (isEnableSubscriptionStatistics()) {
962
963                final MessageStoreSubscriptionStatistics statistics = getMessageStoreSubStatistics();
964                indexLock.writeLock().lock();
965                try {
966                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
967                        @Override
968                        public void execute(Transaction tx) throws IOException {
969                            StoredDestination sd = getStoredDestination(dest, tx);
970                            for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions
971                                    .iterator(tx); iterator.hasNext();) {
972                                Entry<String, KahaSubscriptionCommand> entry = iterator.next();
973
974                                String subscriptionKey = entry.getKey();
975                                LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
976                                if (cursorPos != null) {
977                                    long size = getStoredMessageSize(tx, sd, subscriptionKey);
978                                    statistics.getMessageCount(subscriptionKey)
979                                            .setCount(getStoredMessageCount(tx, sd, subscriptionKey));
980                                    statistics.getMessageSize(subscriptionKey).addSize(size > 0 ? size : 0);
981                                }
982                            }
983                        }
984                    });
985                } finally {
986                    indexLock.writeLock().unlock();
987                }
988            }
989        }
990
991        @Override
992        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
993                throws Exception {
994            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
995            @SuppressWarnings("unused")
996            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
997            indexLock.writeLock().lock();
998            try {
999                pageFile.tx().execute(new Transaction.Closure<Exception>() {
1000                    @Override
1001                    public void execute(Transaction tx) throws Exception {
1002                        StoredDestination sd = getStoredDestination(dest, tx);
1003                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
1004                        sd.orderIndex.setBatch(tx, cursorPos);
1005                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
1006                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
1007                                .hasNext();) {
1008                            Entry<Long, MessageKeys> entry = iterator.next();
1009                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
1010                                continue;
1011                            }
1012                            listener.recoverMessage(loadMessage(entry.getValue().location));
1013                        }
1014                        sd.orderIndex.resetCursorPosition();
1015                    }
1016                });
1017            } finally {
1018                indexLock.writeLock().unlock();
1019            }
1020        }
1021
        /**
         * Replays to {@code listener} the next batch of messages pending for the
         * given subscription, continuing from the cursor saved by a previous call
         * when there is one.
         * <p>
         * The batch ends when {@code maxReturned} messages have been counted or
         * the listener reports it has no more space. Messages recovered from
         * rolled-back acks count towards the limit; messages whose id is in
         * {@code ackedAndPrepared} are skipped.
         *
         * @param clientId client id of the subscription
         * @param subscriptionName name of the subscription
         * @param maxReturned maximum number of messages to count in this batch
         * @param listener receiver of the recovered messages
         * @throws Exception if the index transaction, message loading or the
         *             listener fails
         */
        @Override
        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
                final MessageRecoveryListener listener) throws Exception {
            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
            // The lookup result itself is not used below.
            @SuppressWarnings("unused")
            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
            indexLock.writeLock().lock();
            try {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    @Override
                    public void execute(Transaction tx) throws Exception {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        sd.orderIndex.resetCursorPosition();
                        // Resume from the cursor remembered for this subscription, if any.
                        MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
                        if (moc == null) {
                            // First batch: start from the subscription's last ack.
                            LastAck pos = getLastAck(tx, sd, subscriptionKey);
                            if (pos == null) {
                                // sub deleted
                                return;
                            }
                            sd.orderIndex.setBatch(tx, pos);
                            moc = sd.orderIndex.cursor;
                        } else {
                            sd.orderIndex.cursor.sync(moc);
                        }

                        Entry<Long, MessageKeys> entry = null;
                        // Rolled-back acks are replayed first and seed the batch counter.
                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
                                .hasNext();) {
                            entry = iterator.next();
                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
                                continue;
                            }
                            // Only messages the listener reports as recovered are counted.
                            if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
                                counter++;
                            }
                            if (counter >= maxReturned || listener.hasSpace() == false) {
                                break;
                            }
                        }
                        sd.orderIndex.stoppedIterating();
                        // Remember where this batch stopped, but only if an entry was visited.
                        if (entry != null) {
                            MessageOrderCursor copy = sd.orderIndex.cursor.copy();
                            sd.subscriptionCursors.put(subscriptionKey, copy);
                        }
                    }
                });
            } finally {
                indexLock.writeLock().unlock();
            }
        }
1074
1075        @Override
1076        public void resetBatching(String clientId, String subscriptionName) {
1077            try {
1078                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
1079                indexLock.writeLock().lock();
1080                try {
1081                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1082                        @Override
1083                        public void execute(Transaction tx) throws IOException {
1084                            StoredDestination sd = getStoredDestination(dest, tx);
1085                            sd.subscriptionCursors.remove(subscriptionKey);
1086                        }
1087                    });
1088                }finally {
1089                    indexLock.writeLock().unlock();
1090                }
1091            } catch (IOException e) {
1092                throw new RuntimeException(e);
1093            }
1094        }
1095
        /**
         * @return the per-subscription statistics object held by this store
         */
        @Override
        public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
            return messageStoreSubStats;
        }
1100    }
1101
1102    String subscriptionKey(String clientId, String subscriptionName) {
1103        return clientId + ":" + subscriptionName;
1104    }
1105
1106    @Override
1107    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
1108        String key = key(convert(destination));
1109        MessageStore store = storeCache.get(key(convert(destination)));
1110        if (store == null) {
1111            final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination));
1112            store = storeCache.putIfAbsent(key, queueStore);
1113            if (store == null) {
1114                store = queueStore;
1115            }
1116        }
1117
1118        return store;
1119    }
1120
1121    @Override
1122    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
1123        String key = key(convert(destination));
1124        MessageStore store = storeCache.get(key(convert(destination)));
1125        if (store == null) {
1126            final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
1127            store = storeCache.putIfAbsent(key, topicStore);
1128            if (store == null) {
1129                store = topicStore;
1130            }
1131        }
1132
1133        return (TopicMessageStore) store;
1134    }
1135
1136    /**
1137     * Cleanup method to remove any state associated with the given destination.
1138     * This method does not stop the message store (it might not be cached).
1139     *
1140     * @param destination
1141     *            Destination to forget
1142     */
1143    @Override
1144    public void removeQueueMessageStore(ActiveMQQueue destination) {
1145    }
1146
1147    /**
1148     * Cleanup method to remove any state associated with the given destination
1149     * This method does not stop the message store (it might not be cached).
1150     *
1151     * @param destination
1152     *            Destination to forget
1153     */
1154    @Override
1155    public void removeTopicMessageStore(ActiveMQTopic destination) {
1156    }
1157
    /**
     * Sets the {@code deleteAllMessages} flag. Nothing is deleted by this call
     * itself; presumably the flag is acted upon elsewhere (e.g. when the store
     * is loaded) - TODO confirm.
     *
     * @throws IOException declared by the interface; not thrown here
     */
    @Override
    public void deleteAllMessages() throws IOException {
        deleteAllMessages = true;
    }
1162
1163    @Override
1164    public Set<ActiveMQDestination> getDestinations() {
1165        try {
1166            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
1167            indexLock.writeLock().lock();
1168            try {
1169                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1170                    @Override
1171                    public void execute(Transaction tx) throws IOException {
1172                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
1173                                .hasNext();) {
1174                            Entry<String, StoredDestination> entry = iterator.next();
1175                            //Removing isEmpty topic check - see AMQ-5875
1176                            rc.add(convert(entry.getKey()));
1177                        }
1178                    }
1179                });
1180            }finally {
1181                indexLock.writeLock().unlock();
1182            }
1183            return rc;
1184        } catch (IOException e) {
1185            throw new RuntimeException(e);
1186        }
1187    }
1188
    /**
     * Always returns {@code 0}; this store does not report a last broker
     * sequence id.
     *
     * @return {@code 0}
     * @throws IOException declared by the interface; not thrown here
     */
    @Override
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0;
    }
1193
    /**
     * Returns the last sequence id recorded for the given producer by the
     * metadata's producer sequence id tracker. The lookup is performed while
     * holding the index write lock.
     *
     * @param id producer to look up
     * @return whatever the tracker reports as the producer's last sequence id
     */
    @Override
    public long getLastProducerSequenceId(ProducerId id) {
        indexLock.writeLock().lock();
        try {
            return metadata.producerSequenceIdTracker.getLastSeqId(id);
        } finally {
            indexLock.writeLock().unlock();
        }
    }
1203
    /**
     * Returns the size of the store: the tracked journal size plus the disk
     * size reported by the page file.
     *
     * @return combined journal and page file size
     * @throws RuntimeException wrapping any {@link IOException} raised while
     *             obtaining the page file or its disk size
     */
    @Override
    public long size() {
        try {
            return journalSize.get() + getPageFile().getDiskSize();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
1212
    /**
     * Not supported by this store.
     *
     * @param context ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void beginTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not supported by this store.
     *
     * @param context ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void commitTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not supported by this store.
     *
     * @param context ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
1225
    /**
     * Runs a checkpoint by delegating to the superclass
     * {@code checkpointCleanup}, passing {@code sync} through unchanged.
     *
     * @param sync forwarded to {@code checkpointCleanup}
     * @throws IOException if the checkpoint fails
     */
    @Override
    public void checkpoint(boolean sync) throws IOException {
        super.checkpointCleanup(sync);
    }
1230
1231    // /////////////////////////////////////////////////////////////////
1232    // Internal helper methods.
1233    // /////////////////////////////////////////////////////////////////
1234
1235    /**
1236     * @param location
1237     * @return
1238     * @throws IOException
1239     */
1240    Message loadMessage(Location location) throws IOException {
1241        try {
1242            JournalCommand<?> command = load(location);
1243            KahaAddMessageCommand addMessage = null;
1244            switch (command.type()) {
1245                case KAHA_UPDATE_MESSAGE_COMMAND:
1246                    addMessage = ((KahaUpdateMessageCommand) command).getMessage();
1247                    break;
1248                case KAHA_ADD_MESSAGE_COMMAND:
1249                    addMessage = (KahaAddMessageCommand) command;
1250                    break;
1251                default:
1252                    throw new IOException("Could not load journal record, unexpected command type: " + command.type() + " at location: " + location);
1253            }
1254            if (!addMessage.hasMessage()) {
1255                throw new IOException("Could not load journal record, null message content at location: " + location);
1256            }
1257            Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
1258            return msg;
1259        } catch (Throwable t) {
1260            IOException ioe = IOExceptionSupport.create("Unexpected error on journal read at: " + location , t);
1261            LOG.error("Failed to load message at: {}", location , ioe);
1262            brokerService.handleIOException(ioe);
1263            throw ioe;
1264        }
1265    }
1266
1267    // /////////////////////////////////////////////////////////////////
1268    // Internal conversion methods.
1269    // /////////////////////////////////////////////////////////////////
1270
    /**
     * Converts a journal location into its {@link KahaLocation} form. Only the
     * data file id (as log id) and the offset are copied.
     *
     * @param location journal location to convert
     * @return a new {@link KahaLocation} with log id and offset set
     */
    KahaLocation convert(Location location) {
        KahaLocation rc = new KahaLocation();
        rc.setLogId(location.getDataFileId());
        rc.setOffset(location.getOffset());
        return rc;
    }
1277
1278    KahaDestination convert(ActiveMQDestination dest) {
1279        KahaDestination rc = new KahaDestination();
1280        rc.setName(dest.getPhysicalName());
1281        switch (dest.getDestinationType()) {
1282        case ActiveMQDestination.QUEUE_TYPE:
1283            rc.setType(DestinationType.QUEUE);
1284            return rc;
1285        case ActiveMQDestination.TOPIC_TYPE:
1286            rc.setType(DestinationType.TOPIC);
1287            return rc;
1288        case ActiveMQDestination.TEMP_QUEUE_TYPE:
1289            rc.setType(DestinationType.TEMP_QUEUE);
1290            return rc;
1291        case ActiveMQDestination.TEMP_TOPIC_TYPE:
1292            rc.setType(DestinationType.TEMP_TOPIC);
1293            return rc;
1294        default:
1295            return null;
1296        }
1297    }
1298
1299    ActiveMQDestination convert(String dest) {
1300        int p = dest.indexOf(":");
1301        if (p < 0) {
1302            throw new IllegalArgumentException("Not in the valid destination format");
1303        }
1304        int type = Integer.parseInt(dest.substring(0, p));
1305        String name = dest.substring(p + 1);
1306        return convert(type, name);
1307    }
1308
    /**
     * Converts a {@link KahaDestination} back into a broker destination using
     * its type number and name.
     *
     * @param commandDestination destination as carried by a journal command
     * @return the matching broker destination
     */
    private ActiveMQDestination convert(KahaDestination commandDestination) {
        return convert(commandDestination.getType().getNumber(), commandDestination.getName());
    }
1312
1313    private ActiveMQDestination convert(int type, String name) {
1314        switch (KahaDestination.DestinationType.valueOf(type)) {
1315        case QUEUE:
1316            return new ActiveMQQueue(name);
1317        case TOPIC:
1318            return new ActiveMQTopic(name);
1319        case TEMP_QUEUE:
1320            return new ActiveMQTempQueue(name);
1321        case TEMP_TOPIC:
1322            return new ActiveMQTempTopic(name);
1323        default:
1324            throw new IllegalArgumentException("Not in the valid destination format");
1325        }
1326    }
1327
    /**
     * @return the transformer currently applied to transaction ids by this store
     */
    public TransactionIdTransformer getTransactionIdTransformer() {
        return transactionIdTransformer;
    }
1331
    /**
     * Replaces the transformer applied to transaction ids by this store.
     *
     * @param transactionIdTransformer the transformer to use from now on
     */
    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
        this.transactionIdTransformer = transactionIdTransformer;
    }
1335
1336    static class AsyncJobKey {
1337        MessageId id;
1338        ActiveMQDestination destination;
1339
1340        AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1341            this.id = id;
1342            this.destination = destination;
1343        }
1344
1345        @Override
1346        public boolean equals(Object obj) {
1347            if (obj == this) {
1348                return true;
1349            }
1350            return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1351                    && destination.equals(((AsyncJobKey) obj).destination);
1352        }
1353
1354        @Override
1355        public int hashCode() {
1356            return id.hashCode() + destination.hashCode();
1357        }
1358
1359        @Override
1360        public String toString() {
1361            return destination.getPhysicalName() + "-" + id;
1362        }
1363    }
1364
1365    public interface StoreTask {
1366        public boolean cancel();
1367
1368        public void aquireLocks();
1369
1370        public void releaseLocks();
1371    }
1372
1373    class StoreQueueTask implements Runnable, StoreTask {
1374        protected final Message message;
1375        protected final ConnectionContext context;
1376        protected final KahaDBMessageStore store;
1377        protected final InnerFutureTask future;
1378        protected final AtomicBoolean done = new AtomicBoolean();
1379        protected final AtomicBoolean locked = new AtomicBoolean();
1380
1381        public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1382            this.store = store;
1383            this.context = context;
1384            this.message = message;
1385            this.future = new InnerFutureTask(this);
1386        }
1387
1388        public ListenableFuture<Object> getFuture() {
1389            return this.future;
1390        }
1391
1392        @Override
1393        public boolean cancel() {
1394            if (this.done.compareAndSet(false, true)) {
1395                return this.future.cancel(false);
1396            }
1397            return false;
1398        }
1399
1400        @Override
1401        public void aquireLocks() {
1402            if (this.locked.compareAndSet(false, true)) {
1403                try {
1404                    globalQueueSemaphore.acquire();
1405                    store.acquireLocalAsyncLock();
1406                    message.incrementReferenceCount();
1407                } catch (InterruptedException e) {
1408                    LOG.warn("Failed to aquire lock", e);
1409                }
1410            }
1411
1412        }
1413
1414        @Override
1415        public void releaseLocks() {
1416            if (this.locked.compareAndSet(true, false)) {
1417                store.releaseLocalAsyncLock();
1418                globalQueueSemaphore.release();
1419                message.decrementReferenceCount();
1420            }
1421        }
1422
1423        @Override
1424        public void run() {
1425            this.store.doneTasks++;
1426            try {
1427                if (this.done.compareAndSet(false, true)) {
1428                    this.store.addMessage(context, message);
1429                    removeQueueTask(this.store, this.message.getMessageId());
1430                    this.future.complete();
1431                } else if (cancelledTaskModMetric > 0 && (++this.store.canceledTasks) % cancelledTaskModMetric == 0) {
1432                    System.err.println(this.store.dest.getName() + " cancelled: "
1433                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1434                    this.store.canceledTasks = this.store.doneTasks = 0;
1435                }
1436            } catch (Throwable t) {
1437                this.future.setException(t);
1438                removeQueueTask(this.store, this.message.getMessageId());
1439            }
1440        }
1441
1442        protected Message getMessage() {
1443            return this.message;
1444        }
1445
1446        private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object>  {
1447
1448            private final AtomicReference<Runnable> listenerRef = new AtomicReference<>();
1449
1450            public InnerFutureTask(Runnable runnable) {
1451                super(runnable, null);
1452            }
1453
1454            public void setException(final Throwable e) {
1455                super.setException(e);
1456            }
1457
1458            public void complete() {
1459                super.set(null);
1460            }
1461
1462            @Override
1463            public void done() {
1464                fireListener();
1465            }
1466
1467            @Override
1468            public void addListener(Runnable listener) {
1469                this.listenerRef.set(listener);
1470                if (isDone()) {
1471                    fireListener();
1472                }
1473            }
1474
1475            private void fireListener() {
1476                Runnable listener = listenerRef.getAndSet(null);
1477                if (listener != null) {
1478                    try {
1479                        listener.run();
1480                    } catch (Exception ignored) {
1481                        LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored);
1482                    }
1483                }
1484            }
1485        }
1486    }
1487
1488    class StoreTopicTask extends StoreQueueTask {
1489        private final int subscriptionCount;
1490        private final List<String> subscriptionKeys = new ArrayList<String>(1);
1491        private final KahaDBTopicMessageStore topicStore;
1492        public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1493                int subscriptionCount) {
1494            super(store, context, message);
1495            this.topicStore = store;
1496            this.subscriptionCount = subscriptionCount;
1497
1498        }
1499
1500        @Override
1501        public void aquireLocks() {
1502            if (this.locked.compareAndSet(false, true)) {
1503                try {
1504                    globalTopicSemaphore.acquire();
1505                    store.acquireLocalAsyncLock();
1506                    message.incrementReferenceCount();
1507                } catch (InterruptedException e) {
1508                    LOG.warn("Failed to aquire lock", e);
1509                }
1510            }
1511        }
1512
1513        @Override
1514        public void releaseLocks() {
1515            if (this.locked.compareAndSet(true, false)) {
1516                message.decrementReferenceCount();
1517                store.releaseLocalAsyncLock();
1518                globalTopicSemaphore.release();
1519            }
1520        }
1521
1522        /**
1523         * add a key
1524         *
1525         * @param key
1526         * @return true if all acknowledgements received
1527         */
1528        public boolean addSubscriptionKey(String key) {
1529            synchronized (this.subscriptionKeys) {
1530                this.subscriptionKeys.add(key);
1531            }
1532            return this.subscriptionKeys.size() >= this.subscriptionCount;
1533        }
1534
1535        @Override
1536        public void run() {
1537            this.store.doneTasks++;
1538            try {
1539                if (this.done.compareAndSet(false, true)) {
1540                    this.topicStore.addMessage(context, message);
1541                    // apply any acks we have
1542                    synchronized (this.subscriptionKeys) {
1543                        for (String key : this.subscriptionKeys) {
1544                            this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1545
1546                        }
1547                    }
1548                    removeTopicTask(this.topicStore, this.message.getMessageId());
1549                    this.future.complete();
1550                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1551                    System.err.println(this.store.dest.getName() + " cancelled: "
1552                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1553                    this.store.canceledTasks = this.store.doneTasks = 0;
1554                }
1555            } catch (Throwable t) {
1556                this.future.setException(t);
1557                removeTopicTask(this.topicStore, this.message.getMessageId());
1558            }
1559        }
1560    }
1561
1562    public class StoreTaskExecutor extends ThreadPoolExecutor {
1563
1564        public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
1565            super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
1566        }
1567
1568        @Override
1569        protected void afterExecute(Runnable runnable, Throwable throwable) {
1570            super.afterExecute(runnable, throwable);
1571
1572            if (runnable instanceof StoreTask) {
1573               ((StoreTask)runnable).releaseLocks();
1574            }
1575        }
1576    }
1577
1578    @Override
1579    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
1580        return new JobSchedulerStoreImpl();
1581    }
1582
1583    /* (non-Javadoc)
1584     * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal()
1585     */
1586    @Override
1587    public boolean isPersistNoLocal() {
1588        // Prior to v11 the broker did not store the noLocal value for durable subs.
1589        return brokerService.getStoreOpenWireVersion() >= 11;
1590    }
1591}