001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.io.IOException; 020 import java.util.Iterator; 021 import java.util.concurrent.ConcurrentHashMap; 022 import java.util.concurrent.atomic.AtomicBoolean; 023 import java.util.concurrent.atomic.AtomicLong; 024 025 import javax.jms.InvalidSelectorException; 026 import javax.jms.JMSException; 027 028 import org.apache.activemq.broker.Broker; 029 import org.apache.activemq.broker.ConnectionContext; 030 import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 031 import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; 032 import org.apache.activemq.command.ActiveMQDestination; 033 import org.apache.activemq.command.ConsumerInfo; 034 import org.apache.activemq.command.Message; 035 import org.apache.activemq.command.MessageAck; 036 import org.apache.activemq.command.MessageDispatch; 037 import org.apache.activemq.command.MessageId; 038 import org.apache.activemq.store.TopicMessageStore; 039 import org.apache.activemq.usage.SystemUsage; 040 import org.apache.activemq.usage.Usage; 041 import org.apache.activemq.usage.UsageListener; 042 import org.apache.activemq.util.SubscriptionKey; 043 import org.slf4j.Logger; 044 import org.slf4j.LoggerFactory; 045 046 public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener { 047 048 private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class); 049 private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>(); 050 private final ConcurrentHashMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); 051 private final SubscriptionKey subscriptionKey; 052 private final boolean keepDurableSubsActive; 053 private AtomicBoolean active = new AtomicBoolean(); 054 private AtomicLong offlineTimestamp = new AtomicLong(-1); 055 056 public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) 057 throws JMSException { 058 super(broker,usageManager, context, info); 059 this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this); 060 this.pending.setSystemUsage(usageManager); 061 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 062 this.keepDurableSubsActive = keepDurableSubsActive; 063 subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 064 065 } 066 067 public final boolean isActive() { 068 return active.get(); 069 } 070 071 public final long getOfflineTimestamp() { 072 return offlineTimestamp.get(); 073 } 074 075 public boolean isFull() { 076 return !active.get() || super.isFull(); 077 } 078 079 public void gc() { 080 } 081 082 /** 083 * store will have a pending ack for all durables, irrespective of the selector 084 * so we need to ack if node is un-matched 085 */ 086 public void unmatched(MessageReference node) throws IOException { 087 MessageAck ack = new MessageAck(); 088 ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE); 089 ack.setMessageID(node.getMessageId()); 090 node.getRegionDestination().acknowledge(this.getContext(), this, ack, node); 091 } 092 093 @Override 094 protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) { 095 // statically configured via maxPageSize 096 } 097 098 public void add(ConnectionContext context, Destination destination) throws Exception { 099 if (!destinations.contains(destination)) { 100 super.add(context, destination); 101 } 102 // do it just once per destination 103 if (durableDestinations.containsKey(destination.getActiveMQDestination())) { 104 return; 105 } 106 durableDestinations.put(destination.getActiveMQDestination(), destination); 107 108 if (active.get() || keepDurableSubsActive) { 109 Topic topic = (Topic)destination; 110 topic.activate(context, this); 111 if (pending.isEmpty(topic)) { 112 topic.recoverRetroactiveMessages(context, this); 113 } 114 this.enqueueCounter+=pending.size(); 115 } else if (destination.getMessageStore() != null) { 116 TopicMessageStore store = (TopicMessageStore)destination.getMessageStore(); 117 try { 118 this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName()); 119 } catch (IOException e) { 120 JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store "+ e); 121 jmsEx.setLinkedException(e); 122 throw jmsEx; 123 } 124 } 125 dispatchPending(); 126 } 127 128 public void activate(SystemUsage memoryManager, ConnectionContext context, 129 ConsumerInfo info) throws Exception { 130 if (!active.get()) { 131 this.context = context; 132 this.info = info; 133 LOG.debug("Activating " + this); 134 if (!keepDurableSubsActive) { 135 for (Iterator<Destination> iter = durableDestinations.values() 136 .iterator(); iter.hasNext();) { 137 Topic topic = (Topic) iter.next(); 138 add(context, topic); 139 topic.activate(context, this); 140 } 141 } 142 synchronized (pendingLock) { 143 pending.setSystemUsage(memoryManager); 144 pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 145 pending.setMaxAuditDepth(getMaxAuditDepth()); 146 pending.setMaxProducersToAudit(getMaxProducersToAudit()); 147 pending.start(); 148 // If nothing was in the persistent store, then try to use the 149 // recovery policy. 150 if (pending.isEmpty()) { 151 for (Iterator<Destination> iter = durableDestinations.values() 152 .iterator(); iter.hasNext();) { 153 Topic topic = (Topic) iter.next(); 154 topic.recoverRetroactiveMessages(context, this); 155 } 156 } 157 } 158 this.active.set(true); 159 this.offlineTimestamp.set(-1); 160 dispatchPending(); 161 this.usageManager.getMemoryUsage().addUsageListener(this); 162 } 163 } 164 165 public void deactivate(boolean keepDurableSubsActive) throws Exception { 166 LOG.debug("Deactivating keepActive=" + keepDurableSubsActive + ", " + this); 167 active.set(false); 168 offlineTimestamp.set(System.currentTimeMillis()); 169 this.usageManager.getMemoryUsage().removeUsageListener(this); 170 synchronized (pendingLock) { 171 pending.stop(); 172 173 synchronized (dispatchLock) { 174 for (Iterator<Destination> iter = durableDestinations.values().iterator(); iter.hasNext();) { 175 Topic topic = (Topic)iter.next(); 176 if (!keepDurableSubsActive) { 177 topic.deactivate(context, this); 178 } else { 179 topic.getDestinationStatistics().getInflight().subtract(dispatched.size()); 180 } 181 } 182 183 for (final MessageReference node : dispatched) { 184 // Mark the dispatched messages as redelivered for next time. 185 Integer count = redeliveredMessages.get(node.getMessageId()); 186 if (count != null) { 187 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1)); 188 } else { 189 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1)); 190 } 191 if (keepDurableSubsActive && pending.isTransient()) { 192 pending.addMessageFirst(node); 193 pending.rollback(node.getMessageId()); 194 } else { 195 node.decrementReferenceCount(); 196 } 197 } 198 dispatched.clear(); 199 } 200 if (!keepDurableSubsActive && pending.isTransient()) { 201 try { 202 pending.reset(); 203 while (pending.hasNext()) { 204 MessageReference node = pending.next(); 205 node.decrementReferenceCount(); 206 pending.remove(); 207 } 208 } finally { 209 pending.release(); 210 } 211 } 212 } 213 prefetchExtension.set(0); 214 } 215 216 217 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { 218 MessageDispatch md = super.createMessageDispatch(node, message); 219 if (node != QueueMessageReference.NULL_MESSAGE) { 220 Integer count = redeliveredMessages.get(node.getMessageId()); 221 if (count != null) { 222 md.setRedeliveryCounter(count.intValue()); 223 } 224 } 225 return md; 226 } 227 228 public void add(MessageReference node) throws Exception { 229 if (!active.get() && !keepDurableSubsActive) { 230 return; 231 } 232 super.add(node); 233 } 234 235 protected void dispatchPending() throws IOException { 236 if (isActive()) { 237 super.dispatchPending(); 238 } 239 } 240 241 public void removePending(MessageReference node) throws IOException { 242 pending.remove(node); 243 } 244 245 protected void doAddRecoveredMessage(MessageReference message) throws Exception { 246 synchronized(pending) { 247 pending.addRecoveredMessage(message); 248 } 249 } 250 251 public int getPendingQueueSize() { 252 if (active.get() || keepDurableSubsActive) { 253 return super.getPendingQueueSize(); 254 } 255 // TODO: need to get from store 256 return 0; 257 } 258 259 public void setSelector(String selector) throws InvalidSelectorException { 260 throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions"); 261 } 262 263 protected boolean canDispatch(MessageReference node) { 264 return isActive(); 265 } 266 267 protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException { 268 node.getRegionDestination().acknowledge(context, this, ack, node); 269 redeliveredMessages.remove(node.getMessageId()); 270 node.decrementReferenceCount(); 271 } 272 273 public synchronized String toString() { 274 return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + enqueueCounter + ", pending=" 275 + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension(); 276 } 277 278 public SubscriptionKey getSubscriptionKey() { 279 return subscriptionKey; 280 } 281 282 /** 283 * Release any references that we are holding. 284 */ 285 public void destroy() { 286 synchronized (pendingLock) { 287 try { 288 289 pending.reset(); 290 while (pending.hasNext()) { 291 MessageReference node = pending.next(); 292 node.decrementReferenceCount(); 293 } 294 295 } finally { 296 pending.release(); 297 pending.clear(); 298 } 299 } 300 synchronized (dispatchLock) { 301 for (MessageReference node : dispatched) { 302 node.decrementReferenceCount(); 303 } 304 dispatched.clear(); 305 } 306 setSlowConsumer(false); 307 } 308 309 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 310 if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) { 311 try { 312 dispatchPending(); 313 } catch (IOException e) { 314 LOG.warn("problem calling dispatchMatched", e); 315 } 316 } 317 } 318 319 protected boolean isDropped(MessageReference node) { 320 return false; 321 } 322 323 public boolean isKeepDurableSubsActive() { 324 return keepDurableSubsActive; 325 } 326 }