001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.Collections; 021import java.util.List; 022import java.util.concurrent.CopyOnWriteArrayList; 023import java.util.concurrent.atomic.AtomicInteger; 024 025import javax.jms.InvalidSelectorException; 026import javax.jms.JMSException; 027import javax.management.ObjectName; 028 029import org.apache.activemq.broker.Broker; 030import org.apache.activemq.broker.ConnectionContext; 031import org.apache.activemq.command.ActiveMQDestination; 032import org.apache.activemq.command.ConsumerId; 033import org.apache.activemq.command.ConsumerInfo; 034import org.apache.activemq.command.MessageAck; 035import org.apache.activemq.filter.BooleanExpression; 036import org.apache.activemq.filter.DestinationFilter; 037import org.apache.activemq.filter.LogicExpression; 038import org.apache.activemq.filter.MessageEvaluationContext; 039import org.apache.activemq.filter.NoLocalExpression; 040import org.apache.activemq.selector.SelectorParser; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044public abstract class AbstractSubscription implements Subscription { 045 046 private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscription.class); 047 048 protected Broker broker; 049 protected ConnectionContext context; 050 protected ConsumerInfo info; 051 protected final DestinationFilter destinationFilter; 052 protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>(); 053 protected final AtomicInteger prefetchExtension = new AtomicInteger(0); 054 055 private boolean usePrefetchExtension = true; 056 private BooleanExpression selectorExpression; 057 private ObjectName objectName; 058 private int cursorMemoryHighWaterMark = 70; 059 private boolean slowConsumer; 060 private long lastAckTime; 061 private final SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics(); 062 063 public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { 064 this.broker = broker; 065 this.context = context; 066 this.info = info; 067 this.destinationFilter = DestinationFilter.parseFilter(info.getDestination()); 068 this.selectorExpression = parseSelector(info); 069 this.lastAckTime = System.currentTimeMillis(); 070 } 071 072 private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException { 073 BooleanExpression rc = null; 074 if (info.getSelector() != null) { 075 rc = SelectorParser.parse(info.getSelector()); 076 } 077 if (info.isNoLocal()) { 078 if (rc == null) { 079 rc = new NoLocalExpression(info.getConsumerId().getConnectionId()); 080 } else { 081 rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc); 082 } 083 } 084 if (info.getAdditionalPredicate() != null) { 085 if (rc == null) { 086 rc = info.getAdditionalPredicate(); 087 } else { 088 rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc); 089 } 090 } 091 return rc; 092 } 093 094 @Override 095 public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { 096 this.lastAckTime = System.currentTimeMillis(); 097 subscriptionStatistics.getConsumedCount().increment(); 098 } 099 100 @Override 101 public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException { 102 ConsumerId targetConsumerId = node.getTargetConsumerId(); 103 if (targetConsumerId != null) { 104 if (!targetConsumerId.equals(info.getConsumerId())) { 105 return false; 106 } 107 } 108 try { 109 return (selectorExpression == null || selectorExpression.matches(context)) && this.context.isAllowedToConsume(node); 110 } catch (JMSException e) { 111 LOG.info("Selector failed to evaluate: {}", e.getMessage(), e); 112 return false; 113 } 114 } 115 116 @Override 117 public boolean isWildcard() { 118 return destinationFilter.isWildcard(); 119 } 120 121 @Override 122 public boolean matches(ActiveMQDestination destination) { 123 return destinationFilter.matches(destination); 124 } 125 126 @Override 127 public void add(ConnectionContext context, Destination destination) throws Exception { 128 destinations.add(destination); 129 } 130 131 @Override 132 public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { 133 destinations.remove(destination); 134 return Collections.EMPTY_LIST; 135 } 136 137 @Override 138 public ConsumerInfo getConsumerInfo() { 139 return info; 140 } 141 142 @Override 143 public void gc() { 144 } 145 146 @Override 147 public ConnectionContext getContext() { 148 return context; 149 } 150 151 public ConsumerInfo getInfo() { 152 return info; 153 } 154 155 public BooleanExpression getSelectorExpression() { 156 return selectorExpression; 157 } 158 159 @Override 160 public String getSelector() { 161 return info.getSelector(); 162 } 163 164 @Override 165 public void setSelector(String selector) throws InvalidSelectorException { 166 ConsumerInfo copy = info.copy(); 167 copy.setSelector(selector); 168 BooleanExpression newSelector = parseSelector(copy); 169 // its valid so lets actually update it now 170 info.setSelector(selector); 171 this.selectorExpression = newSelector; 172 } 173 174 @Override 175 public ObjectName getObjectName() { 176 return objectName; 177 } 178 179 @Override 180 public void setObjectName(ObjectName objectName) { 181 this.objectName = objectName; 182 } 183 184 @Override 185 public int getPrefetchSize() { 186 return info.getPrefetchSize(); 187 } 188 189 public boolean isUsePrefetchExtension() { 190 return usePrefetchExtension; 191 } 192 193 public void setUsePrefetchExtension(boolean usePrefetchExtension) { 194 this.usePrefetchExtension = usePrefetchExtension; 195 } 196 197 public void setPrefetchSize(int newSize) { 198 info.setPrefetchSize(newSize); 199 } 200 201 @Override 202 public boolean isRecoveryRequired() { 203 return true; 204 } 205 206 @Override 207 public boolean isSlowConsumer() { 208 return slowConsumer; 209 } 210 211 public void setSlowConsumer(boolean val) { 212 slowConsumer = val; 213 } 214 215 @Override 216 public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception { 217 boolean result = false; 218 MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); 219 try { 220 Destination regionDestination = (Destination) message.getRegionDestination(); 221 msgContext.setDestination(regionDestination.getActiveMQDestination()); 222 msgContext.setMessageReference(message); 223 result = matches(message, msgContext); 224 if (result) { 225 doAddRecoveredMessage(message); 226 } 227 } finally { 228 msgContext.clear(); 229 } 230 return result; 231 } 232 233 @Override 234 public ActiveMQDestination getActiveMQDestination() { 235 return info != null ? info.getDestination() : null; 236 } 237 238 @Override 239 public boolean isBrowser() { 240 return info != null && info.isBrowser(); 241 } 242 243 @Override 244 public long getInFlightMessageSize() { 245 return subscriptionStatistics.getInflightMessageSize().getTotalSize(); 246 } 247 248 @Override 249 public int getInFlightUsage() { 250 int prefetchSize = info.getPrefetchSize(); 251 if (prefetchSize > 0) { 252 return (getInFlightSize() * 100) / prefetchSize; 253 } 254 return Integer.MAX_VALUE; 255 } 256 257 /** 258 * Add a destination 259 * @param destination 260 */ 261 public void addDestination(Destination destination) { 262 } 263 264 /** 265 * Remove a destination 266 * @param destination 267 */ 268 public void removeDestination(Destination destination) { 269 } 270 271 @Override 272 public int getCursorMemoryHighWaterMark(){ 273 return this.cursorMemoryHighWaterMark; 274 } 275 276 @Override 277 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){ 278 this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark; 279 } 280 281 @Override 282 public int countBeforeFull() { 283 return info.getPrefetchSize() - getDispatchedQueueSize(); 284 } 285 286 @Override 287 public void unmatched(MessageReference node) throws IOException { 288 // only durable topic subs have something to do here 289 } 290 291 protected void doAddRecoveredMessage(MessageReference message) throws Exception { 292 add(message); 293 } 294 295 @Override 296 public long getTimeOfLastMessageAck() { 297 return lastAckTime; 298 } 299 300 public void setTimeOfLastMessageAck(long value) { 301 this.lastAckTime = value; 302 } 303 304 @Override 305 public long getConsumedCount(){ 306 return subscriptionStatistics.getConsumedCount().getCount(); 307 } 308 309 @Override 310 public void incrementConsumedCount(){ 311 subscriptionStatistics.getConsumedCount().increment(); 312 } 313 314 @Override 315 public void resetConsumedCount(){ 316 subscriptionStatistics.getConsumedCount().reset(); 317 } 318 319 @Override 320 public SubscriptionStatistics getSubscriptionStatistics() { 321 return subscriptionStatistics; 322 } 323 324 public void wakeupDestinationsForDispatch() { 325 for (Destination dest : destinations) { 326 dest.wakeup(); 327 } 328 } 329 330 public AtomicInteger getPrefetchExtension() { 331 return this.prefetchExtension; 332 } 333 334 protected void contractPrefetchExtension(int amount) { 335 if (isUsePrefetchExtension() && getPrefetchSize() != 0) { 336 decrementPrefetchExtension(amount); 337 } 338 } 339 340 protected void expandPrefetchExtension(int amount) { 341 if (isUsePrefetchExtension() && getPrefetchSize() != 0) { 342 incrementPrefetchExtension(amount); 343 } 344 } 345 346 protected void decrementPrefetchExtension(int amount) { 347 while (true) { 348 int currentExtension = prefetchExtension.get(); 349 int newExtension = Math.max(0, currentExtension - amount); 350 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 351 break; 352 } 353 } 354 } 355 356 private void incrementPrefetchExtension(int amount) { 357 while (true) { 358 int currentExtension = prefetchExtension.get(); 359 int newExtension = Math.max(currentExtension, currentExtension + amount); 360 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 361 break; 362 } 363 } 364 } 365 366}