001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.journal; 018 019 import java.io.IOException; 020 import java.util.HashMap; 021 import java.util.Iterator; 022 023 import org.apache.activeio.journal.RecordLocation; 024 import org.apache.activemq.broker.ConnectionContext; 025 import org.apache.activemq.command.ActiveMQTopic; 026 import org.apache.activemq.command.JournalTopicAck; 027 import org.apache.activemq.command.Message; 028 import org.apache.activemq.command.MessageAck; 029 import org.apache.activemq.command.MessageId; 030 import org.apache.activemq.command.SubscriptionInfo; 031 import org.apache.activemq.store.MessageRecoveryListener; 032 import org.apache.activemq.store.TopicMessageStore; 033 import org.apache.activemq.transaction.Synchronization; 034 import org.apache.activemq.util.Callback; 035 import org.apache.activemq.util.SubscriptionKey; 036 import org.slf4j.Logger; 037 import org.slf4j.LoggerFactory; 038 039 /** 040 * A MessageStore that uses a Journal to store it's messages. 041 * 042 * 043 */ 044 public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore { 045 046 private static final Logger LOG = LoggerFactory.getLogger(JournalTopicMessageStore.class); 047 048 private TopicMessageStore longTermStore; 049 private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>(); 050 051 public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore, 052 ActiveMQTopic destinationName) { 053 super(adapter, checkpointStore, destinationName); 054 this.longTermStore = checkpointStore; 055 } 056 057 public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) 058 throws Exception { 059 this.peristenceAdapter.checkpoint(true, true); 060 longTermStore.recoverSubscription(clientId, subscriptionName, listener); 061 } 062 063 public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, 064 MessageRecoveryListener listener) throws Exception { 065 this.peristenceAdapter.checkpoint(true, true); 066 longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned, listener); 067 068 } 069 070 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 071 return longTermStore.lookupSubscription(clientId, subscriptionName); 072 } 073 074 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 075 this.peristenceAdapter.checkpoint(true, true); 076 longTermStore.addSubsciption(subscriptionInfo, retroactive); 077 } 078 079 public void addMessage(ConnectionContext context, Message message) throws IOException { 080 super.addMessage(context, message); 081 } 082 083 /** 084 */ 085 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 086 final MessageId messageId, MessageAck originalAck) throws IOException { 087 final boolean debug = LOG.isDebugEnabled(); 088 089 JournalTopicAck ack = new JournalTopicAck(); 090 ack.setDestination(destination); 091 ack.setMessageId(messageId); 092 ack.setMessageSequenceId(messageId.getBrokerSequenceId()); 093 ack.setSubscritionName(subscriptionName); 094 ack.setClientId(clientId); 095 ack.setTransactionId(context.getTransaction() != null 096 ? context.getTransaction().getTransactionId() : null); 097 final RecordLocation location = peristenceAdapter.writeCommand(ack, false); 098 099 final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 100 if (!context.isInTransaction()) { 101 if (debug) { 102 LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location); 103 } 104 acknowledge(messageId, location, key); 105 } else { 106 if (debug) { 107 LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location); 108 } 109 synchronized (this) { 110 inFlightTxLocations.add(location); 111 } 112 transactionStore.acknowledge(this, ack, location); 113 context.getTransaction().addSynchronization(new Synchronization() { 114 public void afterCommit() throws Exception { 115 if (debug) { 116 LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location); 117 } 118 synchronized (JournalTopicMessageStore.this) { 119 inFlightTxLocations.remove(location); 120 acknowledge(messageId, location, key); 121 } 122 } 123 124 public void afterRollback() throws Exception { 125 if (debug) { 126 LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location); 127 } 128 synchronized (JournalTopicMessageStore.this) { 129 inFlightTxLocations.remove(location); 130 } 131 } 132 }); 133 } 134 135 } 136 137 public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, 138 MessageId messageId) { 139 try { 140 SubscriptionInfo sub = longTermStore.lookupSubscription(clientId, subscritionName); 141 if (sub != null) { 142 longTermStore.acknowledge(context, clientId, subscritionName, messageId, null); 143 } 144 } catch (Throwable e) { 145 LOG.debug("Could not replay acknowledge for message '" + messageId 146 + "'. Message may have already been acknowledged. reason: " + e); 147 } 148 } 149 150 /** 151 * @param messageId 152 * @param location 153 * @param key 154 */ 155 protected void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) { 156 synchronized (this) { 157 lastLocation = location; 158 ackedLastAckLocations.put(key, messageId); 159 } 160 } 161 162 public RecordLocation checkpoint() throws IOException { 163 164 final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations; 165 166 // swap out the hash maps.. 167 synchronized (this) { 168 cpAckedLastAckLocations = this.ackedLastAckLocations; 169 this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>(); 170 } 171 172 return super.checkpoint(new Callback() { 173 public void execute() throws Exception { 174 175 // Checkpoint the acknowledged messages. 176 Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator(); 177 while (iterator.hasNext()) { 178 SubscriptionKey subscriptionKey = iterator.next(); 179 MessageId identity = cpAckedLastAckLocations.get(subscriptionKey); 180 longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, 181 subscriptionKey.subscriptionName, identity, null); 182 } 183 184 } 185 }); 186 187 } 188 189 /** 190 * @return Returns the longTermStore. 191 */ 192 public TopicMessageStore getLongTermTopicMessageStore() { 193 return longTermStore; 194 } 195 196 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 197 longTermStore.deleteSubscription(clientId, subscriptionName); 198 } 199 200 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 201 return longTermStore.getAllSubscriptions(); 202 } 203 204 public int getMessageCount(String clientId, String subscriberName) throws IOException { 205 this.peristenceAdapter.checkpoint(true, true); 206 return longTermStore.getMessageCount(clientId, subscriberName); 207 } 208 209 public void resetBatching(String clientId, String subscriptionName) { 210 longTermStore.resetBatching(clientId, subscriptionName); 211 } 212 213 }