/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.broker.util;

import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.*;
import org.apache.activemq.filter.DestinationPath;

import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

/**
 * Broker plugin that rewrites destination names which use a custom path
 * separator (by default {@code "/"}) before the command is handed to the next
 * broker in the chain. The rewritten name is produced by
 * {@link DestinationPath#toString(String[])} from the individual path segments.
 *
 * @org.apache.xbean.XBean element="destinationPathSeparatorPlugin"
 */

public class DestinationPathSeparatorBroker extends BrokerPluginSupport {

    /** Literal separator string looked for in incoming destination names. */
    String pathSeparator = "/";

    /**
     * Converts a destination whose physical name contains the configured
     * separator into a new destination named from its path segments.
     * <p>
     * The name is split on the separator as a <em>literal string</em>, so a
     * multi-character separator such as {@code "->"} is matched as a whole and
     * not character by character. Each segment is trimmed and empty segments
     * (leading, trailing or repeated separators) are dropped.
     *
     * @param destination the destination to inspect; may be {@code null}
     * @return a new destination of the same destination type when the name
     *         contains the separator; otherwise the given instance unchanged
     *         (including when it is {@code null}, has no physical name, or no
     *         usable separator is configured)
     */
    protected ActiveMQDestination convertDestination(ActiveMQDestination destination) {
        // Read the field once so the guard and the split agree on the separator.
        final String separator = pathSeparator;
        if (destination == null || separator == null || separator.length() == 0) {
            return destination;
        }

        final String physicalName = destination.getPhysicalName();
        if (physicalName == null || !physicalName.contains(separator)) {
            return destination;
        }

        final List<String> segments = new ArrayList<String>();
        int start = 0;
        // "<=" lets the final (possibly empty) segment after the last separator be visited.
        while (start <= physicalName.length()) {
            int end = physicalName.indexOf(separator, start);
            if (end < 0) {
                end = physicalName.length();
            }
            final String segment = physicalName.substring(start, end).trim();
            if (segment.length() != 0) {
                segments.add(segment);
            }
            // separator is non-empty here, so this always advances.
            start = end + separator.length();
        }

        final String newName = DestinationPath.toString(segments.toArray(new String[segments.size()]));
        return ActiveMQDestination.createDestination(newName, destination.getDestinationType());
    }


    /**
     * Replaces the acknowledgement's destination with its converted form, then
     * delegates to the superclass.
     */
    @Override
    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
        ack.setDestination(convertDestination(ack.getDestination()));
        super.acknowledge(consumerExchange, ack);
    }

    /**
     * Replaces the consumer's destination with its converted form, then
     * delegates to the superclass.
     *
     * @return the subscription returned by the superclass
     */
    @Override
    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        info.setDestination(convertDestination(info.getDestination()));
        return super.addConsumer(context, info);
    }

    /**
     * Replaces the producer's destination with its converted form, then
     * delegates to the superclass.
     */
    @Override
    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        info.setDestination(convertDestination(info.getDestination()));
        super.addProducer(context, info);
    }

    /**
     * Replaces the consumer's destination with its converted form, then
     * delegates the removal to the superclass.
     */
    @Override
    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        info.setDestination(convertDestination(info.getDestination()));
        super.removeConsumer(context, info);
    }

    /**
     * Replaces the producer's destination with its converted form, then
     * delegates the removal to the superclass.
     */
    @Override
    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        info.setDestination(convertDestination(info.getDestination()));
        super.removeProducer(context, info);
    }

    /**
     * Replaces the message's destination with its converted form, then
     * delegates the send to the superclass.
     */
    @Override
    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
        messageSend.setDestination(convertDestination(messageSend.getDestination()));
        super.send(producerExchange, messageSend);
    }

    /**
     * Delegates to the superclass using the converted destination.
     *
     * @return the destination returned by the superclass
     */
    @Override
    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception {
        return super.addDestination(context, convertDestination(destination), createIfTemporary);
    }

    /**
     * Delegates the removal to the superclass using the converted destination.
     */
    @Override
    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
        super.removeDestination(context, convertDestination(destination), timeout);
    }

    /**
     * Replaces the info's destination with its converted form, then delegates
     * to the superclass.
     */
    @Override
    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
        info.setDestination(convertDestination(info.getDestination()));
        super.addDestinationInfo(context, info);
    }

    /**
     * Replaces the info's destination with its converted form, then delegates
     * the removal to the superclass.
     */
    @Override
    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
        info.setDestination(convertDestination(info.getDestination()));
        super.removeDestinationInfo(context, info);
    }

    /**
     * Replaces the control command's destination with its converted form, then
     * delegates to the superclass.
     */
    @Override
    public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
        control.setDestination(convertDestination(control.getDestination()));
        super.processConsumerControl(consumerExchange, control);
    }

    /**
     * Replaces the pull command's destination with its converted form, then
     * delegates to the superclass.
     *
     * @return the response returned by the superclass
     */
    @Override
    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
        pull.setDestination(convertDestination(pull.getDestination()));
        return super.messagePull(context, pull);
    }

    /**
     * Sets the separator string to look for in destination names.
     *
     * @param pathSeparator the literal separator; when {@code null} or empty,
     *                      destinations are passed through unconverted
     */
    public void setPathSeparator(String pathSeparator) {
        this.pathSeparator = pathSeparator;
    }
}