001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.stomp; 018 019 import org.apache.activemq.command.*; 020 021 import javax.jms.JMSException; 022 import java.io.IOException; 023 import java.util.Iterator; 024 import java.util.LinkedHashMap; 025 import java.util.LinkedList; 026 import java.util.Map; 027 import java.util.Map.Entry; 028 029 /** 030 * Keeps track of the STOMP subscription so that acking is correctly done. 031 * 032 * @author <a href="http://hiramchirino.com">chirino</a> 033 */ 034 public class StompSubscription { 035 036 public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO; 037 public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT; 038 public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL; 039 040 protected final ProtocolConverter protocolConverter; 041 protected final String subscriptionId; 042 protected final ConsumerInfo consumerInfo; 043 044 protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>(); 045 protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>(); 046 047 protected String ackMode = AUTO_ACK; 048 protected ActiveMQDestination destination; 049 protected String transformation; 050 051 public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) { 052 this.protocolConverter = stompTransport; 053 this.subscriptionId = subscriptionId; 054 this.consumerInfo = consumerInfo; 055 this.transformation = transformation; 056 } 057 058 void onMessageDispatch(MessageDispatch md) throws IOException, JMSException { 059 ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); 060 if (ackMode == CLIENT_ACK) { 061 synchronized (this) { 062 dispatchedMessage.put(message.getMessageId(), md); 063 } 064 } else if (ackMode == INDIVIDUAL_ACK) { 065 synchronized (this) { 066 dispatchedMessage.put(message.getMessageId(), md); 067 } 068 } else if (ackMode == AUTO_ACK) { 069 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); 070 protocolConverter.getStompTransport().sendToActiveMQ(ack); 071 } 072 073 boolean ignoreTransformation = false; 074 075 if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) { 076 message.setReadOnlyProperties(false); 077 message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation); 078 } else { 079 if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) { 080 ignoreTransformation = true; 081 } 082 } 083 084 StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation); 085 086 command.setAction(Stomp.Responses.MESSAGE); 087 if (subscriptionId != null) { 088 command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId); 089 } 090 091 protocolConverter.getStompTransport().sendToStomp(command); 092 } 093 094 synchronized void onStompAbort(TransactionId transactionId) { 095 unconsumedMessage.clear(); 096 } 097 098 synchronized void onStompCommit(TransactionId transactionId) { 099 for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { 100 @SuppressWarnings("rawtypes") 101 Map.Entry entry = (Entry)iter.next(); 102 MessageDispatch msg = (MessageDispatch)entry.getValue(); 103 if (unconsumedMessage.contains(msg)) { 104 iter.remove(); 105 } 106 } 107 108 if (!unconsumedMessage.isEmpty()) { 109 MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size()); 110 protocolConverter.getStompTransport().sendToActiveMQ(ack); 111 unconsumedMessage.clear(); 112 } 113 } 114 115 synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) { 116 117 MessageId msgId = new MessageId(messageId); 118 119 if (!dispatchedMessage.containsKey(msgId)) { 120 return null; 121 } 122 123 MessageAck ack = new MessageAck(); 124 ack.setDestination(consumerInfo.getDestination()); 125 ack.setConsumerId(consumerInfo.getConsumerId()); 126 127 if (ackMode == CLIENT_ACK) { 128 if (transactionId == null) { 129 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 130 } else { 131 ack.setAckType(MessageAck.DELIVERED_ACK_TYPE); 132 } 133 int count = 0; 134 for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { 135 136 @SuppressWarnings("rawtypes") 137 Map.Entry entry = (Entry)iter.next(); 138 MessageId id = (MessageId)entry.getKey(); 139 MessageDispatch msg = (MessageDispatch)entry.getValue(); 140 141 if (transactionId != null) { 142 if (!unconsumedMessage.contains(msg)) { 143 unconsumedMessage.add(msg); 144 count++; 145 } 146 } else { 147 iter.remove(); 148 count++; 149 } 150 151 if (id.equals(msgId)) { 152 ack.setLastMessageId(id); 153 break; 154 } 155 } 156 ack.setMessageCount(count); 157 if (transactionId != null) { 158 ack.setTransactionId(transactionId); 159 } 160 161 } else if (ackMode == INDIVIDUAL_ACK) { 162 ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE); 163 ack.setMessageID(msgId); 164 if (transactionId != null) { 165 unconsumedMessage.add(dispatchedMessage.get(msgId)); 166 ack.setTransactionId(transactionId); 167 } 168 dispatchedMessage.remove(msgId); 169 } 170 return ack; 171 } 172 173 public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException { 174 175 MessageId msgId = new MessageId(messageId); 176 177 if (!dispatchedMessage.containsKey(msgId)) { 178 return null; 179 } 180 181 MessageAck ack = new MessageAck(); 182 ack.setDestination(consumerInfo.getDestination()); 183 ack.setConsumerId(consumerInfo.getConsumerId()); 184 ack.setAckType(MessageAck.POSION_ACK_TYPE); 185 ack.setMessageID(msgId); 186 if (transactionId != null) { 187 unconsumedMessage.add(dispatchedMessage.get(msgId)); 188 ack.setTransactionId(transactionId); 189 } 190 dispatchedMessage.remove(msgId); 191 192 return ack; 193 } 194 195 public String getAckMode() { 196 return ackMode; 197 } 198 199 public void setAckMode(String ackMode) { 200 this.ackMode = ackMode; 201 } 202 203 public String getSubscriptionId() { 204 return subscriptionId; 205 } 206 207 public void setDestination(ActiveMQDestination destination) { 208 this.destination = destination; 209 } 210 211 public ActiveMQDestination getDestination() { 212 return destination; 213 } 214 215 public ConsumerInfo getConsumerInfo() { 216 return consumerInfo; 217 } 218 }