001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.File;
020import java.io.FileFilter;
021import java.io.IOException;
022import java.nio.charset.Charset;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.CopyOnWriteArrayList;
030
031import javax.transaction.xa.Xid;
032
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.BrokerServiceAware;
035import org.apache.activemq.broker.ConnectionContext;
036import org.apache.activemq.broker.Lockable;
037import org.apache.activemq.broker.LockableServiceSupport;
038import org.apache.activemq.broker.Locker;
039import org.apache.activemq.broker.scheduler.JobSchedulerStore;
040import org.apache.activemq.command.ActiveMQDestination;
041import org.apache.activemq.command.ActiveMQQueue;
042import org.apache.activemq.command.ActiveMQTopic;
043import org.apache.activemq.command.LocalTransactionId;
044import org.apache.activemq.command.ProducerId;
045import org.apache.activemq.command.TransactionId;
046import org.apache.activemq.command.XATransactionId;
047import org.apache.activemq.filter.AnyDestination;
048import org.apache.activemq.filter.DestinationMap;
049import org.apache.activemq.filter.DestinationMapEntry;
050import org.apache.activemq.store.MessageStore;
051import org.apache.activemq.store.NoLocalSubscriptionAware;
052import org.apache.activemq.store.PersistenceAdapter;
053import org.apache.activemq.store.SharedFileLocker;
054import org.apache.activemq.store.TopicMessageStore;
055import org.apache.activemq.store.TransactionIdTransformer;
056import org.apache.activemq.store.TransactionIdTransformerAware;
057import org.apache.activemq.store.TransactionStore;
058import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
059import org.apache.activemq.usage.StoreUsage;
060import org.apache.activemq.usage.SystemUsage;
061import org.apache.activemq.util.IOExceptionSupport;
062import org.apache.activemq.util.IOHelper;
063import org.apache.activemq.util.IntrospectionSupport;
064import org.apache.activemq.util.ServiceStopper;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068import static org.apache.activemq.store.kahadb.MessageDatabase.DEFAULT_DIRECTORY;
069
070/**
071 * An implementation of {@link org.apache.activemq.store.PersistenceAdapter}  that supports
072 * distribution of destinations across multiple kahaDB persistence adapters
073 *
074 * @org.apache.xbean.XBean element="mKahaDB"
075 */
076public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter,
077    BrokerServiceAware, NoLocalSubscriptionAware {
078
079    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class);
080
081    final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
082    final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
083
084    final class DelegateDestinationMap extends DestinationMap {
085        @Override
086        public void setEntries(List<DestinationMapEntry>  entries) {
087            super.setEntries(entries);
088        }
089    };
090    final DelegateDestinationMap destinationMap = new DelegateDestinationMap();
091
092    List<PersistenceAdapter> adapters = new CopyOnWriteArrayList<PersistenceAdapter>();
093    private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
094
095    MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
096
097    // all local store transactions are XA, 2pc if more than one adapter involved
098    TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() {
099        @Override
100        public TransactionId transform(TransactionId txid) {
101            if (txid == null) {
102                return null;
103            }
104            if (txid.isLocalTransaction()) {
105                final LocalTransactionId t = (LocalTransactionId) txid;
106                return new XATransactionId(new Xid() {
107                    @Override
108                    public int getFormatId() {
109                        return LOCAL_FORMAT_ID_MAGIC;
110                    }
111
112                    @Override
113                    public byte[] getGlobalTransactionId() {
114                        return t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"));
115                    }
116
117                    @Override
118                    public byte[] getBranchQualifier() {
119                        return Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"));
120                    }
121                });
122            } else {
123                return txid;
124            }
125        }
126    };
127
128    /**
129     * Sets the  FilteredKahaDBPersistenceAdapter entries
130     *
131     * @org.apache.xbean.ElementType class="org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter"
132     */
133    @SuppressWarnings({ "rawtypes", "unchecked" })
134    public void setFilteredPersistenceAdapters(List entries) {
135        for (Object entry : entries) {
136            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) entry;
137            PersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
138            if (filteredAdapter.getDestination() == null) {
139                filteredAdapter.setDestination(matchAll);
140            }
141
142            if (filteredAdapter.isPerDestination()) {
143                configureDirectory(adapter, null);
144                // per destination adapters will be created on demand or during recovery
145                continue;
146            } else {
147                configureDirectory(adapter, nameFromDestinationFilter(filteredAdapter.getDestination()));
148            }
149
150            configureAdapter(adapter);
151            adapters.add(adapter);
152        }
153        destinationMap.setEntries(entries);
154    }
155
156    public static String nameFromDestinationFilter(ActiveMQDestination destination) {
157        if (destination.getQualifiedName().length() > IOHelper.getMaxFileNameLength()) {
158            LOG.warn("Destination name is longer than 'MaximumFileNameLength' system property, " +
159                     "potential problem with recovery can result from name truncation.");
160        }
161
162        return IOHelper.toFileSystemSafeName(destination.getQualifiedName());
163    }
164
165    public boolean isLocalXid(TransactionId xid) {
166        return xid instanceof XATransactionId &&
167                ((XATransactionId)xid).getFormatId() == LOCAL_FORMAT_ID_MAGIC;
168    }
169
170    @Override
171    public void beginTransaction(ConnectionContext context) throws IOException {
172        throw new IllegalStateException();
173    }
174
175    @Override
176    public void checkpoint(final boolean sync) throws IOException {
177        for (PersistenceAdapter persistenceAdapter : adapters) {
178            persistenceAdapter.checkpoint(sync);
179        }
180    }
181
182    @Override
183    public void commitTransaction(ConnectionContext context) throws IOException {
184        throw new IllegalStateException();
185    }
186
187    @Override
188    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
189        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
190        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination));
191    }
192
193    private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) throws IOException {
194        Object result = destinationMap.chooseValue(destination);
195        if (result == null) {
196            throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
197        }
198        FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
199        if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
200            filteredAdapter = addAdapter(filteredAdapter, destination);
201            if (LOG.isTraceEnabled()) {
202                LOG.info("created per destination adapter for: " + destination  + ", " + result);
203            }
204        }
205        startAdapter(filteredAdapter.getPersistenceAdapter(), destination.getQualifiedName());
206        LOG.debug("destination {} matched persistence adapter {}", new Object[]{destination.getQualifiedName(), filteredAdapter.getPersistenceAdapter()});
207        return filteredAdapter.getPersistenceAdapter();
208    }
209
210    private void startAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination) {
211        try {
212            kahaDBPersistenceAdapter.start();
213        } catch (Exception e) {
214            RuntimeException detail = new RuntimeException("Failed to start per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
215            LOG.error(detail.toString(), e);
216            throw detail;
217        }
218    }
219
220    private void stopAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination) {
221        try {
222            kahaDBPersistenceAdapter.stop();
223        } catch (Exception e) {
224            RuntimeException detail = new RuntimeException("Failed to stop per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
225            LOG.error(detail.toString(), e);
226            throw detail;
227        }
228    }
229
230    @Override
231    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
232        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
233        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
234    }
235
236    @Override
237    public TransactionStore createTransactionStore() throws IOException {
238        return transactionStore;
239    }
240
241    @Override
242    public void deleteAllMessages() throws IOException {
243        for (PersistenceAdapter persistenceAdapter : adapters) {
244            persistenceAdapter.deleteAllMessages();
245        }
246        transactionStore.deleteAllMessages();
247        IOHelper.deleteChildren(getDirectory());
248        for (Object o : destinationMap.get(new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")}))) {
249            if (o instanceof FilteredKahaDBPersistenceAdapter) {
250                FilteredKahaDBPersistenceAdapter filteredKahaDBPersistenceAdapter = (FilteredKahaDBPersistenceAdapter) o;
251                if (filteredKahaDBPersistenceAdapter.getPersistenceAdapter().getDirectory() != DEFAULT_DIRECTORY) {
252                    IOHelper.deleteChildren(filteredKahaDBPersistenceAdapter.getPersistenceAdapter().getDirectory());
253                }
254                if (filteredKahaDBPersistenceAdapter.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
255                    KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) filteredKahaDBPersistenceAdapter.getPersistenceAdapter();
256                    if (kahaDBPersistenceAdapter.getIndexDirectory() != null) {
257                        IOHelper.deleteChildren(kahaDBPersistenceAdapter.getIndexDirectory());
258                    }
259                }
260            }
261        }
262    }
263
264    @Override
265    public Set<ActiveMQDestination> getDestinations() {
266        Set<ActiveMQDestination> results = new HashSet<ActiveMQDestination>();
267        for (PersistenceAdapter persistenceAdapter : adapters) {
268            results.addAll(persistenceAdapter.getDestinations());
269        }
270        return results;
271    }
272
273    @Override
274    public long getLastMessageBrokerSequenceId() throws IOException {
275        long maxId = -1;
276        for (PersistenceAdapter persistenceAdapter : adapters) {
277            maxId = Math.max(maxId, persistenceAdapter.getLastMessageBrokerSequenceId());
278        }
279        return maxId;
280    }
281
282    @Override
283    public long getLastProducerSequenceId(ProducerId id) throws IOException {
284        long maxId = -1;
285        for (PersistenceAdapter persistenceAdapter : adapters) {
286            maxId = Math.max(maxId, persistenceAdapter.getLastProducerSequenceId(id));
287        }
288        return maxId;
289    }
290
291    @Override
292    public void allowIOResumption() {
293        for (PersistenceAdapter persistenceAdapter : adapters) {
294            persistenceAdapter.allowIOResumption();
295        }
296    }
297
298    @Override
299    public void removeQueueMessageStore(ActiveMQQueue destination) {
300        PersistenceAdapter adapter = null;
301        try {
302            adapter = getMatchingPersistenceAdapter(destination);
303        } catch (IOException e) {
304            throw new RuntimeException(e);
305        }
306        if (adapter instanceof PersistenceAdapter && adapter.getDestinations().isEmpty()) {
307            adapter.removeQueueMessageStore(destination);
308            removeMessageStore(adapter, destination);
309            destinationMap.removeAll(destination);
310        }
311    }
312
313    @Override
314    public void removeTopicMessageStore(ActiveMQTopic destination) {
315        PersistenceAdapter adapter = null;
316        try {
317            adapter = getMatchingPersistenceAdapter(destination);
318        } catch (IOException e) {
319            throw new RuntimeException(e);
320        }
321        if (adapter instanceof PersistenceAdapter && adapter.getDestinations().isEmpty()) {
322            adapter.removeTopicMessageStore(destination);
323            removeMessageStore(adapter, destination);
324            destinationMap.removeAll(destination);
325        }
326    }
327
328    private void removeMessageStore(PersistenceAdapter adapter, ActiveMQDestination destination) {
329        stopAdapter(adapter, destination.toString());
330        File adapterDir = adapter.getDirectory();
331        if (adapterDir != null) {
332            if (IOHelper.deleteFile(adapterDir)) {
333                if (LOG.isTraceEnabled()) {
334                    LOG.info("deleted per destination adapter directory for: " + destination);
335                }
336            } else {
337                if (LOG.isTraceEnabled()) {
338                    LOG.info("failed to deleted per destination adapter directory for: " + destination);
339                }
340            }
341        }
342    }
343
344    @Override
345    public void rollbackTransaction(ConnectionContext context) throws IOException {
346        throw new IllegalStateException();
347    }
348
349    @Override
350    public void setBrokerName(String brokerName) {
351        for (PersistenceAdapter persistenceAdapter : adapters) {
352            persistenceAdapter.setBrokerName(brokerName);
353        }
354    }
355
356    @Override
357    public void setUsageManager(SystemUsage usageManager) {
358        for (PersistenceAdapter persistenceAdapter : adapters) {
359            persistenceAdapter.setUsageManager(usageManager);
360        }
361    }
362
363    @Override
364    public long size() {
365        long size = 0;
366        for (PersistenceAdapter persistenceAdapter : adapters) {
367            size += persistenceAdapter.size();
368        }
369        return size;
370    }
371
372    @Override
373    public void doStart() throws Exception {
374        Object result = destinationMap.chooseValue(matchAll);
375        if (result != null) {
376            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
377            if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
378                findAndRegisterExistingAdapters(filteredAdapter);
379            }
380        }
381        for (PersistenceAdapter persistenceAdapter : adapters) {
382            persistenceAdapter.start();
383        }
384    }
385
386    private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) throws IOException {
387        FileFilter destinationNames = new FileFilter() {
388            @Override
389            public boolean accept(File file) {
390                return file.getName().startsWith("queue#") || file.getName().startsWith("topic#");
391            }
392        };
393        File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
394        if (candidates != null) {
395            for (File candidate : candidates) {
396                registerExistingAdapter(template, candidate);
397            }
398        }
399    }
400
401    private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) throws IOException {
402        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter, candidate.getName());
403        startAdapter(adapter, candidate.getName());
404        Set<ActiveMQDestination> destinations = adapter.getDestinations();
405        if (destinations.size() != 0) {
406            registerAdapter(filteredAdapter, adapter, destinations.toArray(new ActiveMQDestination[]{})[0]);
407        } else {
408            stopAdapter(adapter, candidate.getName());
409        }
410    }
411
412    private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) throws IOException {
413        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter, nameFromDestinationFilter(destination));
414        return registerAdapter(filteredAdapter, adapter, destination);
415    }
416
417    private PersistenceAdapter adapterFromTemplate(FilteredKahaDBPersistenceAdapter template, String destinationName) throws IOException {
418        PersistenceAdapter adapter = kahaDBFromTemplate(template.getPersistenceAdapter());
419        configureAdapter(adapter);
420        configureDirectory(adapter, destinationName);
421        configureIndexDirectory(adapter, template.getPersistenceAdapter(), destinationName);
422        return adapter;
423    }
424
425    private void configureIndexDirectory(PersistenceAdapter adapter, PersistenceAdapter template, String destinationName) {
426        if (template instanceof KahaDBPersistenceAdapter) {
427            KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) template;
428            if (kahaDBPersistenceAdapter.getIndexDirectory() != null) {
429                if (adapter instanceof KahaDBPersistenceAdapter) {
430                    File directory = kahaDBPersistenceAdapter.getIndexDirectory();
431                    if (destinationName != null) {
432                        directory = new File(directory, destinationName);
433                    }
434                    ((KahaDBPersistenceAdapter)adapter).setIndexDirectory(directory);
435                }
436            }
437        }
438    }
439
440    private void configureDirectory(PersistenceAdapter adapter, String fileName) {
441        File directory = null;
442        File defaultDir = DEFAULT_DIRECTORY;
443        try {
444            defaultDir = adapter.getClass().newInstance().getDirectory();
445        } catch (Exception e) {
446        }
447        if (defaultDir.equals(adapter.getDirectory())) {
448            // not set so inherit from mkahadb
449            directory = getDirectory();
450        } else {
451            directory = adapter.getDirectory();
452        }
453
454        if (fileName != null) {
455            directory = new File(directory, fileName);
456        }
457        adapter.setDirectory(directory);
458    }
459
460    private FilteredKahaDBPersistenceAdapter registerAdapter(FilteredKahaDBPersistenceAdapter template, PersistenceAdapter adapter, ActiveMQDestination destination) {
461        adapters.add(adapter);
462        FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(template, destination, adapter);
463        destinationMap.put(destination, result);
464        return result;
465    }
466
467    private void configureAdapter(PersistenceAdapter adapter) {
468        // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
469        ((TransactionIdTransformerAware)adapter).setTransactionIdTransformer(transactionIdTransformer);
470        if (isUseLock()) {
471            if( adapter instanceof Lockable ) {
472                ((Lockable)adapter).setUseLock(false);
473            }
474        }
475        if( adapter instanceof BrokerServiceAware ) {
476            ((BrokerServiceAware)adapter).setBrokerService(getBrokerService());
477        }
478    }
479
480    private PersistenceAdapter kahaDBFromTemplate(PersistenceAdapter template) throws IOException {
481        try {
482            Map<String, Object> configuration = new HashMap<String, Object>();
483            IntrospectionSupport.getProperties(template, configuration, null);
484            PersistenceAdapter adapter = template.getClass().newInstance();
485            IntrospectionSupport.setProperties(adapter, configuration);
486            return adapter;
487        } catch (Exception e) {
488            throw IOExceptionSupport.create(e);
489        }
490    }
491
492    @Override
493    protected void doStop(ServiceStopper stopper) throws Exception {
494        for (PersistenceAdapter persistenceAdapter : adapters) {
495            stopper.stop(persistenceAdapter);
496        }
497    }
498
499    @Override
500    public File getDirectory() {
501        return this.directory;
502    }
503
504    @Override
505    public void setDirectory(File directory) {
506        this.directory = directory;
507    }
508
509    @Override
510    public void init() throws Exception {
511    }
512
513    @Override
514    public void setBrokerService(BrokerService brokerService) {
515        super.setBrokerService(brokerService);
516        for (PersistenceAdapter persistenceAdapter : adapters) {
517            if( persistenceAdapter instanceof BrokerServiceAware ) {
518                ((BrokerServiceAware)persistenceAdapter).setBrokerService(getBrokerService());
519            }
520        }
521    }
522
523    public void setTransactionStore(MultiKahaDBTransactionStore transactionStore) {
524        this.transactionStore = transactionStore;
525    }
526
527    /**
528     * Set the max file length of the transaction journal
529     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
530     * be used
531     *
532     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
533     */
534    public void setJournalMaxFileLength(int maxFileLength) {
535        transactionStore.setJournalMaxFileLength(maxFileLength);
536    }
537
538    public int getJournalMaxFileLength() {
539        return transactionStore.getJournalMaxFileLength();
540    }
541
542    /**
543     * Set the max write batch size of  the transaction journal
544     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
545     * be used
546     *
547     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
548     */
549    public void setJournalWriteBatchSize(int journalWriteBatchSize) {
550        transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
551    }
552
553    public int getJournalWriteBatchSize() {
554        return transactionStore.getJournalMaxWriteBatchSize();
555    }
556
557    public List<PersistenceAdapter> getAdapters() {
558        return Collections.unmodifiableList(adapters);
559    }
560
561    @Override
562    public String toString() {
563        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
564        return "MultiKahaDBPersistenceAdapter[" + path + "]" + adapters;
565    }
566
567    @Override
568    public Locker createDefaultLocker() throws IOException {
569        SharedFileLocker locker = new SharedFileLocker();
570        locker.configure(this);
571        return locker;
572    }
573
574    @Override
575    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
576        return new JobSchedulerStoreImpl();
577    }
578
579    /* (non-Javadoc)
580     * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal()
581     */
582    @Override
583    public boolean isPersistNoLocal() {
584        // Prior to v11 the broker did not store the noLocal value for durable subs.
585        return brokerService.getStoreOpenWireVersion() >= 11;
586    }
587}