001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.Date;
022import java.util.HashSet;
023import java.util.Set;
024import java.util.TreeSet;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.atomic.AtomicBoolean;
028
029import org.apache.activemq.broker.Broker;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.broker.region.BaseDestination;
032import org.apache.activemq.command.Message;
033import org.apache.activemq.command.MessageAck;
034import org.apache.activemq.command.MessageId;
035import org.apache.activemq.command.TransactionId;
036import org.apache.activemq.command.XATransactionId;
037import org.apache.activemq.store.AbstractMessageStore;
038import org.apache.activemq.store.IndexListener;
039import org.apache.activemq.store.ListenableFuture;
040import org.apache.activemq.store.MessageStore;
041import org.apache.activemq.store.PersistenceAdapter;
042import org.apache.activemq.store.ProxyMessageStore;
043import org.apache.activemq.store.ProxyTopicMessageStore;
044import org.apache.activemq.store.TopicMessageStore;
045import org.apache.activemq.store.TransactionRecoveryListener;
046import org.apache.activemq.store.TransactionStore;
047import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
048import org.apache.activemq.store.kahadb.data.KahaEntryType;
049import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
050import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
051import org.apache.activemq.store.kahadb.disk.journal.Journal;
052import org.apache.activemq.store.kahadb.disk.journal.Location;
053import org.apache.activemq.usage.StoreUsage;
054import org.apache.activemq.util.DataByteArrayInputStream;
055import org.apache.activemq.util.DataByteArrayOutputStream;
056import org.apache.activemq.util.IOHelper;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060public class MultiKahaDBTransactionStore implements TransactionStore {
061    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
062    final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
063    final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
064    final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
065    private Journal journal;
066    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
067    private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
068    private final AtomicBoolean started = new AtomicBoolean(false);
069
070    public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
071        this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
072    }
073
074    public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) {
075        return new ProxyMessageStore(messageStore) {
076            @Override
077            public void addMessage(ConnectionContext context, final Message send) throws IOException {
078                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
079            }
080
081            @Override
082            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
083                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
084            }
085
086            @Override
087            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
088                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
089            }
090
091            @Override
092            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
093                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
094            }
095
096            @Override
097            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
098                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
099            }
100
101            @Override
102            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
103                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
104            }
105
106            @Override
107            public void registerIndexListener(IndexListener indexListener) {
108                getDelegate().registerIndexListener(indexListener);
109                try {
110                    if (indexListener instanceof BaseDestination) {
111                        // update queue storeUsage
112                        Object matchingPersistenceAdapter = multiKahaDBPersistenceAdapter.destinationMap.chooseValue(getDelegate().getDestination());
113                        if (matchingPersistenceAdapter instanceof FilteredKahaDBPersistenceAdapter) {
114                            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) matchingPersistenceAdapter;
115                            if (filteredAdapter.getUsage() != null && filteredAdapter.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
116                                StoreUsage storeUsage = filteredAdapter.getUsage();
117                                storeUsage.setStore(filteredAdapter.getPersistenceAdapter());
118                                storeUsage.setParent(multiKahaDBPersistenceAdapter.getBrokerService().getSystemUsage().getStoreUsage());
119                                ((BaseDestination) indexListener).getSystemUsage().setStoreUsage(storeUsage);
120                            }
121                        }
122                    }
123                } catch (Exception ignored) {
124                    LOG.warn("Failed to set mKahaDB destination store usage", ignored);
125                }
126            }
127        };
128    }
129
130    public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) {
131        return new ProxyTopicMessageStore(messageStore) {
132            @Override
133            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
134                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
135            }
136
137            @Override
138            public void addMessage(ConnectionContext context, final Message send) throws IOException {
139                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
140            }
141
142            @Override
143            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
144                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
145            }
146
147            @Override
148            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
149                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
150            }
151
152            @Override
153            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
154                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
155            }
156
157            @Override
158            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
159                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
160            }
161
162            @Override
163            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
164                                    MessageId messageId, MessageAck ack) throws IOException {
165                MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId,
166                        subscriptionName, messageId, ack);
167            }
168        };
169    }
170
171    public void deleteAllMessages() {
172        IOHelper.deleteChildren(getDirectory());
173    }
174
175    public int getJournalMaxFileLength() {
176        return journalMaxFileLength;
177    }
178
179    public void setJournalMaxFileLength(int journalMaxFileLength) {
180        this.journalMaxFileLength = journalMaxFileLength;
181    }
182
183    public int getJournalMaxWriteBatchSize() {
184        return journalWriteBatchSize;
185    }
186
187    public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) {
188        this.journalWriteBatchSize = journalWriteBatchSize;
189    }
190
191    public class Tx {
192        private final Set<TransactionStore> stores = new HashSet<TransactionStore>();
193        private int prepareLocationId = 0;
194
195        public void trackStore(TransactionStore store) {
196            stores.add(store);
197        }
198
199        public Set<TransactionStore> getStores() {
200            return stores;
201        }
202
203        public void trackPrepareLocation(Location location) {
204            this.prepareLocationId = location.getDataFileId();
205        }
206
207        public int getPreparedLocationId() {
208            return prepareLocationId;
209        }
210    }
211
212    public Tx getTx(TransactionId txid) {
213        Tx tx = inflightTransactions.get(txid);
214        if (tx == null) {
215            tx = new Tx();
216            inflightTransactions.put(txid, tx);
217        }
218        return tx;
219    }
220
221    public Tx removeTx(TransactionId txid) {
222        return inflightTransactions.remove(txid);
223    }
224
225    @Override
226    public void prepare(TransactionId txid) throws IOException {
227        Tx tx = getTx(txid);
228        for (TransactionStore store : tx.getStores()) {
229            store.prepare(txid);
230        }
231    }
232
233    @Override
234    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
235            throws IOException {
236
237        if (preCommit != null) {
238            preCommit.run();
239        }
240
241        Tx tx = getTx(txid);
242        if (wasPrepared) {
243            for (TransactionStore store : tx.getStores()) {
244                store.commit(txid, true, null, null);
245            }
246        } else {
247            // can only do 1pc on a single store
248            if (tx.getStores().size() == 1) {
249                for (TransactionStore store : tx.getStores()) {
250                    store.commit(txid, false, null, null);
251                }
252            } else {
253                // need to do local 2pc
254                for (TransactionStore store : tx.getStores()) {
255                    store.prepare(txid);
256                }
257                persistOutcome(tx, txid);
258                for (TransactionStore store : tx.getStores()) {
259                    store.commit(txid, true, null, null);
260                }
261                persistCompletion(txid);
262            }
263        }
264        removeTx(txid);
265        if (postCommit != null) {
266            postCommit.run();
267        }
268    }
269
270    public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
271        tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))));
272    }
273
274    public void persistCompletion(TransactionId txid) throws IOException {
275        store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
276    }
277
278    private Location store(JournalCommand<?> data) throws IOException {
279        int size = data.serializedSizeFramed();
280        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
281        os.writeByte(data.type().getNumber());
282        data.writeFramed(os);
283        Location location = journal.write(os.toByteSequence(), true);
284        journal.setLastAppendLocation(location);
285        return location;
286    }
287
288    @Override
289    public void rollback(TransactionId txid) throws IOException {
290        Tx tx = removeTx(txid);
291        if (tx != null) {
292            for (TransactionStore store : tx.getStores()) {
293                store.rollback(txid);
294            }
295        }
296    }
297
298    @Override
299    public void start() throws Exception {
300        if (started.compareAndSet(false, true)) {
301            journal = new Journal() {
302                @Override
303                public void cleanup() {
304                    super.cleanup();
305                    txStoreCleanup();
306                }
307            };
308            journal.setDirectory(getDirectory());
309            journal.setMaxFileLength(journalMaxFileLength);
310            journal.setWriteBatchSize(journalWriteBatchSize);
311            IOHelper.mkdirs(journal.getDirectory());
312            journal.start();
313            recoverPendingLocalTransactions();
314            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
315        }
316    }
317
318    private void txStoreCleanup() {
319        Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
320        for (Tx tx : inflightTransactions.values()) {
321            knownDataFileIds.remove(tx.getPreparedLocationId());
322        }
323        try {
324            journal.removeDataFiles(knownDataFileIds);
325        } catch (Exception e) {
326            LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds);
327        }
328    }
329
330    private File getDirectory() {
331        return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
332    }
333
334    @Override
335    public void stop() throws Exception {
336        if (started.compareAndSet(true, false) && journal != null) {
337            journal.close();
338            journal = null;
339        }
340    }
341
342    private void recoverPendingLocalTransactions() throws IOException {
343        Location location = journal.getNextLocation(null);
344        while (location != null) {
345            process(load(location));
346            location = journal.getNextLocation(location);
347        }
348        recoveredPendingCommit.addAll(inflightTransactions.keySet());
349        LOG.info("pending local transactions: " + recoveredPendingCommit);
350    }
351
352    public JournalCommand<?> load(Location location) throws IOException {
353        DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location));
354        byte readByte = is.readByte();
355        KahaEntryType type = KahaEntryType.valueOf(readByte);
356        if (type == null) {
357            throw new IOException("Could not load journal record. Invalid location: " + location);
358        }
359        JournalCommand<?> message = (JournalCommand<?>) type.createMessage();
360        message.mergeFramed(is);
361        return message;
362    }
363
364    public void process(JournalCommand<?> command) throws IOException {
365        switch (command.type()) {
366            case KAHA_PREPARE_COMMAND:
367                KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
368                getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo()));
369                break;
370            case KAHA_COMMIT_COMMAND:
371                KahaCommitCommand commitCommand = (KahaCommitCommand) command;
372                removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo()));
373                break;
374            case KAHA_TRACE_COMMAND:
375                break;
376            default:
377                throw new IOException("Unexpected command in transaction journal: " + command);
378        }
379    }
380
381
382    @Override
383    public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {
384
385        for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
386            adapter.createTransactionStore().recover(new TransactionRecoveryListener() {
387                @Override
388                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) {
389                    try {
390                        getTx(xid).trackStore(adapter.createTransactionStore());
391                    } catch (IOException e) {
392                        LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e);
393                    }
394                    listener.recover(xid, addedMessages, acks);
395                }
396            });
397        }
398
399        try {
400            Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
401            // force completion of local xa
402            for (TransactionId txid : broker.getPreparedTransactions(null)) {
403                if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
404                    try {
405                        if (recoveredPendingCommit.contains(txid)) {
406                            LOG.info("delivering pending commit outcome for tid: " + txid);
407                            broker.commitTransaction(null, txid, false);
408
409                        } else {
410                            LOG.info("delivering rollback outcome to store for tid: " + txid);
411                            broker.forgetTransaction(null, txid);
412                        }
413                        persistCompletion(txid);
414                    } catch (Exception ex) {
415                        LOG.error("failed to deliver pending outcome for tid: " + txid, ex);
416                    }
417                }
418            }
419        } catch (Exception e) {
420            LOG.error("failed to resolve pending local transactions", e);
421        }
422    }
423
424    void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
425            throws IOException {
426        if (message.getTransactionId() != null) {
427            getTx(message.getTransactionId()).trackStore(transactionStore);
428        }
429        destination.addMessage(context, message);
430    }
431
432    ListenableFuture<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
433            throws IOException {
434        if (message.getTransactionId() != null) {
435            getTx(message.getTransactionId()).trackStore(transactionStore);
436            destination.addMessage(context, message);
437            return AbstractMessageStore.FUTURE;
438        } else {
439            return destination.asyncAddQueueMessage(context, message);
440        }
441    }
442
443    ListenableFuture<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
444            throws IOException {
445
446        if (message.getTransactionId() != null) {
447            getTx(message.getTransactionId()).trackStore(transactionStore);
448            destination.addMessage(context, message);
449            return AbstractMessageStore.FUTURE;
450        } else {
451            return destination.asyncAddTopicMessage(context, message);
452        }
453    }
454
455    final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
456            throws IOException {
457        if (ack.getTransactionId() != null) {
458            getTx(ack.getTransactionId()).trackStore(transactionStore);
459        }
460        destination.removeMessage(context, ack);
461    }
462
463    final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
464            throws IOException {
465        if (ack.getTransactionId() != null) {
466            getTx(ack.getTransactionId()).trackStore(transactionStore);
467        }
468        destination.removeAsyncMessage(context, ack);
469    }
470
471    final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination,
472                           final String clientId, final String subscriptionName,
473                           final MessageId messageId, final MessageAck ack) throws IOException {
474        if (ack.getTransactionId() != null) {
475            getTx(ack.getTransactionId()).trackStore(transactionStore);
476        }
477        destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
478    }
479
480}