001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.amq; 018 019 import java.io.IOException; 020 021 import org.apache.activemq.broker.ConnectionContext; 022 import org.apache.activemq.command.ActiveMQTopic; 023 import org.apache.activemq.command.JournalTopicAck; 024 import org.apache.activemq.command.Message; 025 import org.apache.activemq.command.MessageAck; 026 import org.apache.activemq.command.MessageId; 027 import org.apache.activemq.command.SubscriptionInfo; 028 import org.apache.activemq.filter.BooleanExpression; 029 import org.apache.activemq.filter.MessageEvaluationContext; 030 import org.apache.activemq.kaha.impl.async.Location; 031 import org.apache.activemq.selector.SelectorParser; 032 import org.apache.activemq.store.MessageRecoveryListener; 033 import org.apache.activemq.store.TopicMessageStore; 034 import org.apache.activemq.store.TopicReferenceStore; 035 import org.apache.activemq.transaction.Synchronization; 036 import org.apache.activemq.util.IOExceptionSupport; 037 import org.apache.activemq.util.SubscriptionKey; 038 import org.slf4j.Logger; 039 import org.slf4j.LoggerFactory; 040 041 /** 042 * A MessageStore that uses a Journal to store it's messages. 043 * 044 * 045 */ 046 public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessageStore { 047 048 private static final Logger LOG = LoggerFactory.getLogger(AMQTopicMessageStore.class); 049 private TopicReferenceStore topicReferenceStore; 050 public AMQTopicMessageStore(AMQPersistenceAdapter adapter,TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) { 051 super(adapter, topicReferenceStore, destinationName); 052 this.topicReferenceStore = topicReferenceStore; 053 } 054 055 public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception { 056 flush(); 057 topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener)); 058 } 059 060 public void recoverNextMessages(String clientId, String subscriptionName, 061 int maxReturned, final MessageRecoveryListener listener) 062 throws Exception { 063 RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener); 064 topicReferenceStore.recoverNextMessages(clientId, subscriptionName,maxReturned, recoveryListener); 065 if (recoveryListener.size() == 0) { 066 flush(); 067 topicReferenceStore.recoverNextMessages(clientId,subscriptionName, maxReturned, recoveryListener); 068 } 069 } 070 071 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 072 return topicReferenceStore.lookupSubscription(clientId, subscriptionName); 073 } 074 075 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 076 peristenceAdapter.writeCommand(subscriptionInfo, false); 077 topicReferenceStore.addSubsciption(subscriptionInfo, retroactive); 078 } 079 080 /** 081 */ 082 public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName, 083 final MessageId messageId, final MessageAck originalAck) throws IOException { 084 final boolean debug = LOG.isDebugEnabled(); 085 JournalTopicAck ack = new JournalTopicAck(); 086 ack.setDestination(destination); 087 ack.setMessageId(messageId); 088 ack.setMessageSequenceId(messageId.getBrokerSequenceId()); 089 ack.setSubscritionName(subscriptionName); 090 ack.setClientId(clientId); 091 ack.setTransactionId(context.getTransaction() != null ? context.getTransaction().getTransactionId() : null); 092 final Location location = peristenceAdapter.writeCommand(ack, false); 093 final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 094 if (!context.isInTransaction()) { 095 if (debug) { 096 LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location); 097 } 098 acknowledge(context,messageId, location, clientId,subscriptionName); 099 } else { 100 if (debug) { 101 LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location); 102 } 103 lock.lock(); 104 try { 105 inFlightTxLocations.add(location); 106 }finally { 107 lock.unlock(); 108 } 109 transactionStore.acknowledge(this, ack, location); 110 context.getTransaction().addSynchronization(new Synchronization() { 111 112 public void afterCommit() throws Exception { 113 if (debug) { 114 LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location); 115 } 116 lock.lock(); 117 try { 118 inFlightTxLocations.remove(location); 119 acknowledge(context,messageId, location, clientId,subscriptionName); 120 }finally { 121 lock.unlock(); 122 } 123 } 124 125 public void afterRollback() throws Exception { 126 if (debug) { 127 LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location); 128 } 129 lock.lock(); 130 try{ 131 inFlightTxLocations.remove(location); 132 }finally { 133 lock.unlock(); 134 } 135 } 136 }); 137 } 138 } 139 140 public boolean replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) { 141 try { 142 SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName); 143 if (sub != null) { 144 topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId, null); 145 return true; 146 } 147 } catch (Throwable e) { 148 LOG.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e); 149 } 150 return false; 151 } 152 153 /** 154 * @param messageId 155 * @param location 156 * @param key 157 * @throws IOException 158 */ 159 protected void acknowledge(final ConnectionContext context, MessageId messageId, 160 Location location, String clientId, String subscriptionName) 161 throws IOException { 162 MessageAck ack = null; 163 lock.lock(); 164 try { 165 lastLocation = location; 166 }finally { 167 lock.unlock(); 168 } 169 170 if (topicReferenceStore.acknowledgeReference(context, clientId, 171 subscriptionName, messageId)) { 172 ack = new MessageAck(); 173 ack.setLastMessageId(messageId); 174 175 } 176 177 if (ack != null) { 178 removeMessage(context, ack); 179 } 180 } 181 182 /** 183 * @return Returns the longTermStore. 184 */ 185 public TopicReferenceStore getTopicReferenceStore() { 186 return topicReferenceStore; 187 } 188 189 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 190 topicReferenceStore.deleteSubscription(clientId, subscriptionName); 191 } 192 193 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 194 return topicReferenceStore.getAllSubscriptions(); 195 } 196 197 public int getMessageCount(String clientId, String subscriberName) throws IOException { 198 flush(); 199 SubscriptionInfo info = lookupSubscription(clientId, subscriberName); 200 try { 201 MessageCounter counter = new MessageCounter(info, this); 202 topicReferenceStore.recoverSubscription(clientId, subscriberName, counter); 203 return counter.count; 204 } catch (Exception e) { 205 throw IOExceptionSupport.create(e); 206 } 207 } 208 209 private class MessageCounter implements MessageRecoveryListener { 210 211 int count = 0; 212 SubscriptionInfo info; 213 BooleanExpression selectorExpression; 214 TopicMessageStore store; 215 216 public MessageCounter(SubscriptionInfo info, TopicMessageStore store) throws Exception { 217 this.info = info; 218 if (info != null) { 219 String selector = info.getSelector(); 220 if (selector != null) { 221 this.selectorExpression = SelectorParser.parse(selector); 222 } 223 } 224 this.store = store; 225 } 226 227 public boolean recoverMessageReference(MessageId ref) throws Exception { 228 if (selectorExpression != null) { 229 MessageEvaluationContext ctx = new MessageEvaluationContext(); 230 ctx.setMessageReference(store.getMessage(ref)); 231 if (selectorExpression.matches(ctx)) { 232 count++; 233 } 234 } else { 235 count ++; 236 } 237 return true; 238 } 239 240 public boolean recoverMessage(Message message) throws Exception { 241 if (selectorExpression != null) { 242 MessageEvaluationContext ctx = new MessageEvaluationContext(); 243 ctx.setMessageReference(store.getMessage(message.getMessageId())); 244 if (selectorExpression.matches(ctx)) { 245 count++; 246 } 247 } else { 248 count++; 249 } 250 return true; 251 } 252 253 public boolean isDuplicate(MessageId ref) { 254 return false; 255 } 256 257 public boolean hasSpace() { 258 return true; 259 } 260 } 261 262 public void resetBatching(String clientId, String subscriptionName) { 263 topicReferenceStore.resetBatching(clientId, subscriptionName); 264 } 265 }