001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.List; 021 022import javax.jms.InvalidSelectorException; 023import javax.management.ObjectName; 024 025import org.apache.activemq.broker.ConnectionContext; 026import org.apache.activemq.command.ActiveMQDestination; 027import org.apache.activemq.command.ConsumerInfo; 028import org.apache.activemq.command.MessageAck; 029import org.apache.activemq.command.MessageDispatchNotification; 030import org.apache.activemq.command.MessagePull; 031import org.apache.activemq.command.Response; 032import org.apache.activemq.filter.MessageEvaluationContext; 033 034/** 035 * 036 */ 037public interface Subscription extends SubscriptionRecovery { 038 039 /** 040 * Used to add messages that match the subscription. 041 * @param node 042 * @throws Exception 043 * @throws InterruptedException 044 * @throws IOException 045 */ 046 void add(MessageReference node) throws Exception; 047 048 /** 049 * Used when client acknowledge receipt of dispatched message. 050 * @throws IOException 051 * @throws Exception 052 */ 053 void acknowledge(ConnectionContext context, final MessageAck ack) throws Exception; 054 055 /** 056 * Allows a consumer to pull a message on demand 057 */ 058 Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception; 059 060 /** 061 * Returns true if this subscription is a Wildcard subscription. 062 * @return true if wildcard subscription. 063 */ 064 boolean isWildcard(); 065 066 /** 067 * Is the subscription interested in the message? 068 * @param node 069 * @param context 070 * @return true if matching 071 * @throws IOException 072 */ 073 boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException; 074 075 /** 076 * Is the subscription interested in messages in the destination? 077 * @param destination 078 * @return true if matching 079 */ 080 boolean matches(ActiveMQDestination destination); 081 082 /** 083 * The subscription will be receiving messages from the destination. 084 * @param context 085 * @param destination 086 * @throws Exception 087 */ 088 void add(ConnectionContext context, Destination destination) throws Exception; 089 090 /** 091 * The subscription will be no longer be receiving messages from the destination. 092 * @param context 093 * @param destination 094 * @return a list of un-acked messages that were added to the subscription. 095 */ 096 List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception; 097 098 /** 099 * The ConsumerInfo object that created the subscription. 100 */ 101 ConsumerInfo getConsumerInfo(); 102 103 /** 104 * The subscription should release as may references as it can to help the garbage collector 105 * reclaim memory. 106 */ 107 void gc(); 108 109 /** 110 * Used by a Slave Broker to update dispatch infomation 111 * @param mdn 112 * @throws Exception 113 */ 114 void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception; 115 116 /** 117 * @return number of messages pending delivery 118 */ 119 int getPendingQueueSize(); 120 121 /** 122 * @return size of the messages pending delivery 123 */ 124 long getPendingMessageSize(); 125 126 /** 127 * @return number of messages dispatched to the client 128 */ 129 int getDispatchedQueueSize(); 130 131 /** 132 * @return number of messages dispatched to the client 133 */ 134 long getDispatchedCounter(); 135 136 /** 137 * @return number of messages that matched the subscription 138 */ 139 long getEnqueueCounter(); 140 141 /** 142 * @return number of messages queued by the client 143 */ 144 long getDequeueCounter(); 145 146 SubscriptionStatistics getSubscriptionStatistics(); 147 148 /** 149 * @return the JMS selector on the current subscription 150 */ 151 String getSelector(); 152 153 /** 154 * Attempts to change the current active selector on the subscription. 155 * This operation is not supported for persistent topics. 156 */ 157 void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException; 158 159 /** 160 * @return the JMX object name that this subscription was registered as if applicable 161 */ 162 ObjectName getObjectName(); 163 164 /** 165 * Set when the subscription is registered in JMX 166 */ 167 void setObjectName(ObjectName objectName); 168 169 /** 170 * @return true when 60% or more room is left for dispatching messages 171 */ 172 boolean isLowWaterMark(); 173 174 /** 175 * @return true when 10% or less room is left for dispatching messages 176 */ 177 boolean isHighWaterMark(); 178 179 /** 180 * @return true if there is no space to dispatch messages 181 */ 182 boolean isFull(); 183 184 /** 185 * inform the MessageConsumer on the client to change it's prefetch 186 * @param newPrefetch 187 */ 188 void updateConsumerPrefetch(int newPrefetch); 189 190 /** 191 * Called when the subscription is destroyed. 192 */ 193 void destroy(); 194 195 /** 196 * @return the prefetch size that is configured for the subscription 197 */ 198 int getPrefetchSize(); 199 200 /** 201 * @return the number of messages awaiting acknowledgement 202 */ 203 int getInFlightSize(); 204 205 /** 206 * @return the size in bytes of the messages awaiting acknowledgement 207 */ 208 long getInFlightMessageSize(); 209 210 /** 211 * @return the in flight messages as a percentage of the prefetch size 212 */ 213 int getInFlightUsage(); 214 215 /** 216 * Informs the Broker if the subscription needs to intervention to recover it's state 217 * e.g. DurableTopicSubscriber may do 218 * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor 219 * @return true if recovery required 220 */ 221 boolean isRecoveryRequired(); 222 223 /** 224 * @return true if a browser 225 */ 226 boolean isBrowser(); 227 228 /** 229 * @return the number of messages this subscription can accept before its full 230 */ 231 int countBeforeFull(); 232 233 ConnectionContext getContext(); 234 235 public int getCursorMemoryHighWaterMark(); 236 237 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark); 238 239 boolean isSlowConsumer(); 240 241 void unmatched(MessageReference node) throws IOException; 242 243 /** 244 * Returns the time since the last Ack message was received by this subscription. 245 * 246 * If there has never been an ack this value should be set to the creation time of the 247 * subscription. 248 * 249 * @return time of last received Ack message or Subscription create time if no Acks. 250 */ 251 long getTimeOfLastMessageAck(); 252 253 long getConsumedCount(); 254 255 void incrementConsumedCount(); 256 257 void resetConsumedCount(); 258 259}