001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.memory; 018 019 import java.io.IOException; 020 import java.util.Collections; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.Map; 024 import java.util.Map.Entry; 025 import org.apache.activemq.broker.ConnectionContext; 026 import org.apache.activemq.command.ActiveMQDestination; 027 import org.apache.activemq.command.Message; 028 import org.apache.activemq.command.MessageAck; 029 import org.apache.activemq.command.MessageId; 030 import org.apache.activemq.command.SubscriptionInfo; 031 import org.apache.activemq.store.MessageRecoveryListener; 032 import org.apache.activemq.store.TopicMessageStore; 033 import org.apache.activemq.util.LRUCache; 034 import org.apache.activemq.util.SubscriptionKey; 035 036 /** 037 * 038 */ 039 public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore { 040 041 private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase; 042 private Map<SubscriptionKey, MemoryTopicSub> topicSubMap; 043 044 public MemoryTopicMessageStore(ActiveMQDestination destination) { 045 this(destination, new LRUCache<MessageId, Message>(100, 100, 0.75f, false), makeSubscriptionInfoMap()); 046 } 047 048 public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable, Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) { 049 super(destination, messageTable); 050 this.subscriberDatabase = subscriberDatabase; 051 this.topicSubMap = makeSubMap(); 052 } 053 054 protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() { 055 return Collections.synchronizedMap(new HashMap<SubscriptionKey, SubscriptionInfo>()); 056 } 057 058 protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() { 059 return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>()); 060 } 061 062 public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { 063 super.addMessage(context, message); 064 for (Iterator<MemoryTopicSub> i = topicSubMap.values().iterator(); i.hasNext();) { 065 MemoryTopicSub sub = i.next(); 066 sub.addMessage(message.getMessageId(), message); 067 } 068 } 069 070 public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 071 MessageId messageId, MessageAck ack) throws IOException { 072 SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 073 MemoryTopicSub sub = topicSubMap.get(key); 074 if (sub != null) { 075 sub.removeMessage(messageId); 076 } 077 } 078 079 public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 080 return subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName)); 081 } 082 083 public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException { 084 SubscriptionKey key = new SubscriptionKey(info); 085 MemoryTopicSub sub = new MemoryTopicSub(); 086 topicSubMap.put(key, sub); 087 if (retroactive) { 088 for (Iterator i = messageTable.entrySet().iterator(); i.hasNext();) { 089 Map.Entry entry = (Entry)i.next(); 090 sub.addMessage((MessageId)entry.getKey(), (Message)entry.getValue()); 091 } 092 } 093 subscriberDatabase.put(key, info); 094 } 095 096 public synchronized void deleteSubscription(String clientId, String subscriptionName) { 097 org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 098 subscriberDatabase.remove(key); 099 topicSubMap.remove(key); 100 } 101 102 public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception { 103 MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName)); 104 if (sub != null) { 105 sub.recoverSubscription(listener); 106 } 107 } 108 109 public synchronized void delete() { 110 super.delete(); 111 subscriberDatabase.clear(); 112 topicSubMap.clear(); 113 } 114 115 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 116 return subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]); 117 } 118 119 public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException { 120 int result = 0; 121 MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName)); 122 if (sub != null) { 123 result = sub.size(); 124 } 125 return result; 126 } 127 128 public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception { 129 MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName)); 130 if (sub != null) { 131 sub.recoverNextMessages(maxReturned, listener); 132 } 133 } 134 135 public void resetBatching(String clientId, String subscriptionName) { 136 MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName)); 137 if (sub != null) { 138 sub.resetBatching(); 139 } 140 } 141 }