001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.mqtt.strategy;
018
019import static org.apache.activemq.transport.mqtt.MQTTProtocolSupport.convertActiveMQToMQTT;
020import static org.apache.activemq.transport.mqtt.MQTTProtocolSupport.convertMQTTToActiveMQ;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Set;
028import java.util.StringTokenizer;
029
030import org.apache.activemq.ActiveMQPrefetchPolicy;
031import org.apache.activemq.broker.region.QueueRegion;
032import org.apache.activemq.broker.region.RegionBroker;
033import org.apache.activemq.command.ActiveMQDestination;
034import org.apache.activemq.command.ActiveMQQueue;
035import org.apache.activemq.command.ActiveMQTopic;
036import org.apache.activemq.command.ConsumerInfo;
037import org.apache.activemq.command.DestinationInfo;
038import org.apache.activemq.command.RemoveSubscriptionInfo;
039import org.apache.activemq.command.Response;
040import org.apache.activemq.command.SubscriptionInfo;
041import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
042import org.apache.activemq.transport.mqtt.MQTTProtocolException;
043import org.apache.activemq.transport.mqtt.MQTTProtocolSupport;
044import org.apache.activemq.transport.mqtt.MQTTSubscription;
045import org.apache.activemq.transport.mqtt.ResponseHandler;
046import org.fusesource.mqtt.client.QoS;
047import org.fusesource.mqtt.codec.CONNECT;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * Subscription strategy that converts all MQTT subscribes that would be durable to
053 * Virtual Topic Queue subscriptions.  Also maps all publish requests to be prefixed
054 * with the VirtualTopic. prefix unless already present.
055 */
056public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy {
057
058    private static final String VIRTUALTOPIC_PREFIX = "VirtualTopic.";
059    private static final String VIRTUALTOPIC_CONSUMER_PREFIX = "Consumer.";
060
061    private static final Logger LOG = LoggerFactory.getLogger(MQTTVirtualTopicSubscriptionStrategy.class);
062
063    private final Set<ActiveMQQueue> restoredQueues = Collections.synchronizedSet(new HashSet<ActiveMQQueue>());
064
065    @Override
066    public void onConnect(CONNECT connect) throws MQTTProtocolException {
067        List<ActiveMQQueue> queues = lookupQueues(protocol.getClientId());
068        List<SubscriptionInfo> subs = lookupSubscription(protocol.getClientId());
069
070        // When clean session is true we must purge all of the client's old Queue subscriptions
071        // and any durable subscriptions created on the VirtualTopic instance as well.
072
073        if (connect.cleanSession()) {
074            deleteDurableQueues(queues);
075            deleteDurableSubs(subs);
076        } else {
077            restoreDurableQueue(queues);
078            restoreDurableSubs(subs);
079        }
080    }
081
082    @Override
083    public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException {
084        ActiveMQDestination destination = null;
085        int prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
086        ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
087        String converted = convertMQTTToActiveMQ(topicName);
088        if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
089
090            if (converted.startsWith(VIRTUALTOPIC_PREFIX)) {
091                destination = new ActiveMQTopic(converted);
092                prefetch = ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
093                consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName);
094            } else {
095                converted = VIRTUALTOPIC_CONSUMER_PREFIX +
096                            convertMQTTToActiveMQ(protocol.getClientId()) + ":" + requestedQoS + "." +
097                            VIRTUALTOPIC_PREFIX + converted;
098                destination = new ActiveMQQueue(converted);
099                prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
100            }
101        } else {
102            if (!converted.startsWith(VIRTUALTOPIC_PREFIX)) {
103                converted = VIRTUALTOPIC_PREFIX + converted;
104            }
105            destination = new ActiveMQTopic(converted);
106            prefetch = ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
107        }
108
109        consumerInfo.setDestination(destination);
110        if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
111            consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
112        } else {
113            consumerInfo.setPrefetchSize(prefetch);
114        }
115        consumerInfo.setRetroactive(true);
116        consumerInfo.setDispatchAsync(true);
117
118        return doSubscribe(consumerInfo, topicName, requestedQoS);
119    }
120
121    @Override
122    public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
123
124        ActiveMQDestination destination = mqttSubscription.getDestination();
125
126        // check whether the Queue has been recovered in restoreDurableQueue
127        // mark subscription available for recovery for duplicate subscription
128        if (destination.isQueue() && restoredQueues.remove(destination)) {
129            return;
130        }
131
132        // check whether the Topic has been recovered in restoreDurableSubs
133        // mark subscription available for recovery for duplicate subscription
134        if (destination.isTopic() && restoredDurableSubs.remove(destination.getPhysicalName())) {
135            return;
136        }
137
138        if (mqttSubscription.getDestination().isTopic()) {
139            super.onReSubscribe(mqttSubscription);
140        } else {
141            doUnSubscribe(mqttSubscription);
142            ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
143            consumerInfo.setConsumerId(getNextConsumerId());
144            doSubscribe(consumerInfo, mqttSubscription.getTopicName(), mqttSubscription.getQoS());
145        }
146    }
147
148    @Override
149    public void onUnSubscribe(String topicName) throws MQTTProtocolException {
150        MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName);
151        if (subscription != null) {
152            doUnSubscribe(subscription);
153            if (subscription.getDestination().isQueue()) {
154                DestinationInfo remove = new DestinationInfo();
155                remove.setConnectionId(protocol.getConnectionId());
156                remove.setDestination(subscription.getDestination());
157                remove.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
158
159                protocol.sendToActiveMQ(remove, new ResponseHandler() {
160                    @Override
161                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
162                        // ignore failures..
163                    }
164                });
165            } else if (subscription.getConsumerInfo().getSubscriptionName() != null) {
166                // also remove it from restored durable subscriptions set
167                restoredDurableSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName()));
168
169                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
170                rsi.setConnectionId(protocol.getConnectionId());
171                rsi.setSubscriptionName(subscription.getConsumerInfo().getSubscriptionName());
172                rsi.setClientId(protocol.getClientId());
173                protocol.sendToActiveMQ(rsi, new ResponseHandler() {
174                    @Override
175                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
176                        // ignore failures..
177                    }
178                });
179            }
180        }
181    }
182
183    @Override
184    public ActiveMQDestination onSend(String topicName) {
185        ActiveMQTopic topic = new ActiveMQTopic(topicName);
186        if (topic.isComposite()) {
187           ActiveMQDestination[] composites = topic.getCompositeDestinations();
188           for (ActiveMQDestination composite : composites) {
189                composite.setPhysicalName(prefix(composite.getPhysicalName()));
190           }
191           ActiveMQTopic result = new ActiveMQTopic();
192           result.setCompositeDestinations(composites);
193           return result;
194        } else {
195          return new ActiveMQTopic(prefix(topicName));
196        }
197    }
198
199    private String prefix(String topicName) {
200        if (!topicName.startsWith(VIRTUALTOPIC_PREFIX)) {
201            return VIRTUALTOPIC_PREFIX + topicName;
202        } else {
203            return topicName;
204        }
205    }
206
207    @Override
208    public String onSend(ActiveMQDestination destination) {
209        String destinationName = destination.getPhysicalName();
210        int position = destinationName.indexOf(VIRTUALTOPIC_PREFIX);
211        if (position >= 0) {
212            destinationName = destinationName.substring(position + VIRTUALTOPIC_PREFIX.length()).substring(0);
213        }
214        return destinationName;
215    }
216
217    @Override
218    public boolean isControlTopic(ActiveMQDestination destination) {
219        String destinationName = destination.getPhysicalName();
220        if (destinationName.startsWith("$") || destinationName.startsWith(VIRTUALTOPIC_PREFIX + "$")) {
221            return true;
222        }
223        return false;
224    }
225
226    private void deleteDurableQueues(List<ActiveMQQueue> queues) {
227        try {
228            for (ActiveMQQueue queue : queues) {
229                LOG.debug("Removing queue subscription for {} ",queue.getPhysicalName());
230                DestinationInfo removeAction = new DestinationInfo();
231                removeAction.setConnectionId(protocol.getConnectionId());
232                removeAction.setDestination(queue);
233                removeAction.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
234
235                protocol.sendToActiveMQ(removeAction, new ResponseHandler() {
236                    @Override
237                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
238                        // ignore failures..
239                    }
240                });
241            }
242        } catch (Throwable e) {
243            LOG.warn("Could not delete the MQTT queue subscriptions.", e);
244        }
245    }
246
247    private void restoreDurableQueue(List<ActiveMQQueue> queues) {
248        try {
249            for (ActiveMQQueue queue : queues) {
250                String name = queue.getPhysicalName().substring(VIRTUALTOPIC_CONSUMER_PREFIX.length());
251                StringTokenizer tokenizer = new StringTokenizer(name);
252                tokenizer.nextToken(":.");
253                String qosString = tokenizer.nextToken();
254                tokenizer.nextToken();
255                String topicName = convertActiveMQToMQTT(tokenizer.nextToken("").substring(1));
256                QoS qoS = QoS.valueOf(qosString);
257                LOG.trace("Restoring queue subscription: {}:{}", topicName, qoS);
258
259                ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
260                consumerInfo.setDestination(queue);
261                consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH);
262                if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
263                    consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
264                }
265                consumerInfo.setRetroactive(true);
266                consumerInfo.setDispatchAsync(true);
267
268                doSubscribe(consumerInfo, topicName, qoS);
269
270                // mark this durable subscription as restored by Broker
271                restoredQueues.add(queue);
272            }
273        } catch (IOException e) {
274            LOG.warn("Could not restore the MQTT queue subscriptions.", e);
275        }
276    }
277
278    List<ActiveMQQueue> lookupQueues(String clientId) throws MQTTProtocolException {
279        List<ActiveMQQueue> result = new ArrayList<ActiveMQQueue>();
280        RegionBroker regionBroker;
281
282        try {
283            regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
284        } catch (Exception e) {
285            throw new MQTTProtocolException("Error recovering queues: " + e.getMessage(), false, e);
286        }
287
288        final QueueRegion queueRegion = (QueueRegion) regionBroker.getQueueRegion();
289        for (ActiveMQDestination destination : queueRegion.getDestinationMap().keySet()) {
290            if (destination.isQueue() && !destination.isTemporary()) {
291                if (destination.getPhysicalName().startsWith("Consumer." + clientId + ":")) {
292                    LOG.debug("Recovered client sub: {} on connect", destination.getPhysicalName());
293                    result.add((ActiveMQQueue) destination);
294                }
295            }
296        }
297
298        return result;
299    }
300}