001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.memory; 018 019import java.io.IOException; 020import java.util.Collections; 021import java.util.LinkedHashMap; 022import java.util.Map; 023 024import org.apache.activemq.broker.ConnectionContext; 025import org.apache.activemq.command.ActiveMQDestination; 026import org.apache.activemq.command.Message; 027import org.apache.activemq.command.MessageAck; 028import org.apache.activemq.command.MessageId; 029import org.apache.activemq.store.AbstractMessageStore; 030import org.apache.activemq.store.IndexListener; 031import org.apache.activemq.store.MessageRecoveryListener; 032import org.apache.activemq.store.MessageStoreStatistics; 033 034/** 035 * An implementation of {@link org.apache.activemq.store.MessageStore} 036 */ 037public class MemoryMessageStore extends AbstractMessageStore { 038 039 protected final Map<MessageId, Message> messageTable; 040 protected MessageId lastBatchId; 041 protected long sequenceId; 042 043 public MemoryMessageStore(ActiveMQDestination destination) { 044 this(destination, new LinkedHashMap<MessageId, Message>()); 045 } 046 047 public MemoryMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable) { 048 super(destination); 049 this.messageTable = Collections.synchronizedMap(messageTable); 050 } 051 052 @Override 053 public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { 054 synchronized (messageTable) { 055 messageTable.put(message.getMessageId(), message); 056 incMessageStoreStatistics(getMessageStoreStatistics(), message); 057 message.incrementReferenceCount(); 058 message.getMessageId().setFutureOrSequenceLong(sequenceId++); 059 if (indexListener != null) { 060 indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); 061 } 062 } 063 } 064 065 @Override 066 public Message getMessage(MessageId identity) throws IOException { 067 return messageTable.get(identity); 068 } 069 070 @Override 071 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 072 removeMessage(ack.getLastMessageId()); 073 } 074 075 public void removeMessage(MessageId msgId) throws IOException { 076 synchronized (messageTable) { 077 Message removed = messageTable.remove(msgId); 078 if (removed != null) { 079 removed.decrementReferenceCount(); 080 decMessageStoreStatistics(getMessageStoreStatistics(), removed); 081 } 082 if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) { 083 lastBatchId = null; 084 } 085 } 086 } 087 088 @Override 089 public void recover(MessageRecoveryListener listener) throws Exception { 090 // the message table is a synchronizedMap - so just have to synchronize here 091 synchronized (messageTable) { 092 for (Message message : messageTable.values()) { 093 listener.recoverMessage(message); 094 } 095 } 096 } 097 098 @Override 099 public void removeAllMessages(ConnectionContext context) throws IOException { 100 synchronized (messageTable) { 101 messageTable.clear(); 102 getMessageStoreStatistics().reset(); 103 } 104 } 105 106 public void delete() { 107 synchronized (messageTable) { 108 messageTable.clear(); 109 getMessageStoreStatistics().reset(); 110 } 111 } 112 113 @Override 114 public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { 115 synchronized (messageTable) { 116 boolean pastLackBatch = lastBatchId == null; 117 for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) { 118 if (pastLackBatch) { 119 Object msg = entry.getValue(); 120 lastBatchId = entry.getKey(); 121 if (msg.getClass() == MessageId.class) { 122 listener.recoverMessageReference((MessageId) msg); 123 } else { 124 listener.recoverMessage((Message) msg); 125 } 126 } else { 127 pastLackBatch = entry.getKey().equals(lastBatchId); 128 } 129 } 130 } 131 } 132 133 @Override 134 public void resetBatching() { 135 lastBatchId = null; 136 } 137 138 @Override 139 public void setBatch(MessageId messageId) { 140 lastBatchId = messageId; 141 } 142 143 @Override 144 public void updateMessage(Message message) { 145 synchronized (messageTable) { 146 Message original = messageTable.get(message.getMessageId()); 147 148 // if can't be found then increment count, else remove old size 149 if (original == null) { 150 getMessageStoreStatistics().getMessageCount().increment(); 151 } else { 152 getMessageStoreStatistics().getMessageSize().addSize(-original.getSize()); 153 } 154 messageTable.put(message.getMessageId(), message); 155 getMessageStoreStatistics().getMessageSize().addSize(message.getSize()); 156 } 157 } 158 159 @Override 160 public void recoverMessageStoreStatistics() throws IOException { 161 synchronized (messageTable) { 162 long size = 0; 163 int count = 0; 164 for (Message message : messageTable.values()) { 165 size += message.getSize(); 166 } 167 168 getMessageStoreStatistics().reset(); 169 getMessageStoreStatistics().getMessageCount().setCount(count); 170 getMessageStoreStatistics().getMessageSize().setTotalSize(size); 171 } 172 } 173 174 protected static final void incMessageStoreStatistics(final MessageStoreStatistics stats, final Message message) { 175 if (stats != null && message != null) { 176 stats.getMessageCount().increment(); 177 stats.getMessageSize().addSize(message.getSize()); 178 } 179 } 180 181 protected static final void decMessageStoreStatistics(final MessageStoreStatistics stats, final Message message) { 182 if (stats != null && message != null) { 183 stats.getMessageCount().decrement(); 184 stats.getMessageSize().addSize(-message.getSize()); 185 } 186 } 187}