001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.memory; 018 019import java.io.File; 020import java.io.IOException; 021import java.util.HashSet; 022import java.util.Iterator; 023import java.util.Set; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.ConcurrentMap; 026 027import org.apache.activemq.broker.ConnectionContext; 028import org.apache.activemq.broker.scheduler.JobSchedulerStore; 029import org.apache.activemq.command.ActiveMQDestination; 030import org.apache.activemq.command.ActiveMQQueue; 031import org.apache.activemq.command.ActiveMQTopic; 032import org.apache.activemq.command.ProducerId; 033import org.apache.activemq.store.MessageStore; 034import org.apache.activemq.store.NoLocalSubscriptionAware; 035import org.apache.activemq.store.PersistenceAdapter; 036import org.apache.activemq.store.ProxyMessageStore; 037import org.apache.activemq.store.TopicMessageStore; 038import org.apache.activemq.store.TransactionStore; 039import org.apache.activemq.usage.SystemUsage; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * @org.apache.xbean.XBean 045 */ 046public class MemoryPersistenceAdapter implements PersistenceAdapter, NoLocalSubscriptionAware { 047 private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class); 048 049 MemoryTransactionStore transactionStore; 050 ConcurrentMap<ActiveMQDestination, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQDestination, TopicMessageStore>(); 051 ConcurrentMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>(); 052 private boolean useExternalMessageReferences; 053 054 @Override 055 public Set<ActiveMQDestination> getDestinations() { 056 Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(queues.size() + topics.size()); 057 for (Iterator<ActiveMQDestination> iter = queues.keySet().iterator(); iter.hasNext();) { 058 rc.add(iter.next()); 059 } 060 for (Iterator<ActiveMQDestination> iter = topics.keySet().iterator(); iter.hasNext();) { 061 rc.add(iter.next()); 062 } 063 return rc; 064 } 065 066 public static MemoryPersistenceAdapter newInstance(File file) { 067 return new MemoryPersistenceAdapter(); 068 } 069 070 @Override 071 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 072 MessageStore rc = queues.get(destination); 073 if (rc == null) { 074 rc = new MemoryMessageStore(destination); 075 if (transactionStore != null) { 076 rc = transactionStore.proxy(rc); 077 } 078 queues.put(destination, rc); 079 } 080 return rc; 081 } 082 083 @Override 084 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 085 TopicMessageStore rc = topics.get(destination); 086 if (rc == null) { 087 rc = new MemoryTopicMessageStore(destination); 088 if (transactionStore != null) { 089 rc = transactionStore.proxy(rc); 090 } 091 topics.put(destination, rc); 092 } 093 return rc; 094 } 095 096 /** 097 * Cleanup method to remove any state associated with the given destination 098 * 099 * @param destination 100 * Destination to forget 101 */ 102 @Override 103 public void removeQueueMessageStore(ActiveMQQueue destination) { 104 queues.remove(destination); 105 } 106 107 /** 108 * Cleanup method to remove any state associated with the given destination 109 * 110 * @param destination 111 * Destination to forget 112 */ 113 @Override 114 public void removeTopicMessageStore(ActiveMQTopic destination) { 115 topics.remove(destination); 116 } 117 118 @Override 119 public TransactionStore createTransactionStore() throws IOException { 120 if (transactionStore == null) { 121 transactionStore = new MemoryTransactionStore(this); 122 } 123 return transactionStore; 124 } 125 126 @Override 127 public void beginTransaction(ConnectionContext context) { 128 } 129 130 @Override 131 public void commitTransaction(ConnectionContext context) { 132 } 133 134 @Override 135 public void rollbackTransaction(ConnectionContext context) { 136 } 137 138 @Override 139 public void start() throws Exception { 140 } 141 142 @Override 143 public void stop() throws Exception { 144 } 145 146 @Override 147 public long getLastMessageBrokerSequenceId() throws IOException { 148 return 0; 149 } 150 151 @Override 152 public void deleteAllMessages() throws IOException { 153 for (Iterator<TopicMessageStore> iter = topics.values().iterator(); iter.hasNext();) { 154 MemoryMessageStore store = asMemoryMessageStore(iter.next()); 155 if (store != null) { 156 store.delete(); 157 } 158 } 159 for (Iterator<MessageStore> iter = queues.values().iterator(); iter.hasNext();) { 160 MemoryMessageStore store = asMemoryMessageStore(iter.next()); 161 if (store != null) { 162 store.delete(); 163 } 164 } 165 166 if (transactionStore != null) { 167 transactionStore.delete(); 168 } 169 } 170 171 public boolean isUseExternalMessageReferences() { 172 return useExternalMessageReferences; 173 } 174 175 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { 176 this.useExternalMessageReferences = useExternalMessageReferences; 177 } 178 179 protected MemoryMessageStore asMemoryMessageStore(Object value) { 180 if (value instanceof MemoryMessageStore) { 181 return (MemoryMessageStore) value; 182 } 183 if (value instanceof ProxyMessageStore) { 184 MessageStore delegate = ((ProxyMessageStore) value).getDelegate(); 185 if (delegate instanceof MemoryMessageStore) { 186 return (MemoryMessageStore) delegate; 187 } 188 } 189 LOG.warn("Expected an instance of MemoryMessageStore but was: " + value); 190 return null; 191 } 192 193 /** 194 * @param usageManager 195 * The UsageManager that is controlling the broker's memory usage. 196 */ 197 @Override 198 public void setUsageManager(SystemUsage usageManager) { 199 } 200 201 @Override 202 public String toString() { 203 return "MemoryPersistenceAdapter"; 204 } 205 206 @Override 207 public void setBrokerName(String brokerName) { 208 } 209 210 @Override 211 public void setDirectory(File dir) { 212 } 213 214 @Override 215 public File getDirectory() { 216 return null; 217 } 218 219 @Override 220 public void checkpoint(boolean sync) throws IOException { 221 } 222 223 @Override 224 public long size() { 225 return 0; 226 } 227 228 public void setCreateTransactionStore(boolean create) throws IOException { 229 if (create) { 230 createTransactionStore(); 231 } 232 } 233 234 @Override 235 public long getLastProducerSequenceId(ProducerId id) { 236 // memory map does duplicate suppression 237 return -1; 238 } 239 240 @Override 241 public void allowIOResumption() {} 242 243 @Override 244 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 245 // We could eventuall implement an in memory scheduler. 246 throw new UnsupportedOperationException(); 247 } 248 249 /* (non-Javadoc) 250 * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal() 251 */ 252 @Override 253 public boolean isPersistNoLocal() { 254 return true; 255 } 256}