001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.stomp; 018 019 import java.io.DataOutputStream; 020 import java.io.IOException; 021 import java.util.HashMap; 022 import java.util.Map; 023 024 import javax.jms.Destination; 025 import javax.jms.JMSException; 026 027 import org.apache.activemq.advisory.AdvisorySupport; 028 import org.apache.activemq.command.ActiveMQBytesMessage; 029 import org.apache.activemq.command.ActiveMQDestination; 030 import org.apache.activemq.command.ActiveMQMessage; 031 import org.apache.activemq.command.ActiveMQTextMessage; 032 import org.apache.activemq.command.DataStructure; 033 import org.apache.activemq.util.ByteArrayOutputStream; 034 import org.apache.activemq.util.ByteSequence; 035 036 import com.thoughtworks.xstream.XStream; 037 import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver; 038 039 /** 040 * Implements ActiveMQ 4.0 translations 041 */ 042 public class LegacyFrameTranslator implements FrameTranslator { 043 044 045 public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException { 046 final Map<?, ?> headers = command.getHeaders(); 047 final ActiveMQMessage msg; 048 /* 049 * To reduce the complexity of this method perhaps a Chain of Responsibility 050 * would be a better implementation 051 */ 052 if (headers.containsKey(Stomp.Headers.AMQ_MESSAGE_TYPE)) { 053 String intendedType = (String)headers.get(Stomp.Headers.AMQ_MESSAGE_TYPE); 054 if(intendedType.equalsIgnoreCase("text")){ 055 ActiveMQTextMessage text = new ActiveMQTextMessage(); 056 try { 057 //text.setText(new String(command.getContent(), "UTF-8")); 058 ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4); 059 DataOutputStream data = new DataOutputStream(bytes); 060 data.writeInt(command.getContent().length); 061 data.write(command.getContent()); 062 text.setContent(bytes.toByteSequence()); 063 } catch (Throwable e) { 064 throw new ProtocolException("Text could not bet set: " + e, false, e); 065 } 066 msg = text; 067 } else if(intendedType.equalsIgnoreCase("bytes")) { 068 ActiveMQBytesMessage byteMessage = new ActiveMQBytesMessage(); 069 byteMessage.writeBytes(command.getContent()); 070 msg = byteMessage; 071 } else { 072 throw new ProtocolException("Unsupported message type '"+intendedType+"'",false); 073 } 074 }else if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) { 075 headers.remove(Stomp.Headers.CONTENT_LENGTH); 076 ActiveMQBytesMessage bm = new ActiveMQBytesMessage(); 077 bm.writeBytes(command.getContent()); 078 msg = bm; 079 } else { 080 ActiveMQTextMessage text = new ActiveMQTextMessage(); 081 try { 082 //text.setText(new String(command.getContent(), "UTF-8")); 083 ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4); 084 DataOutputStream data = new DataOutputStream(bytes); 085 data.writeInt(command.getContent().length); 086 data.write(command.getContent()); 087 text.setContent(bytes.toByteSequence()); 088 } catch (Throwable e) { 089 throw new ProtocolException("Text could not bet set: " + e, false, e); 090 } 091 msg = text; 092 } 093 FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this); 094 return msg; 095 } 096 097 public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException { 098 StompFrame command = new StompFrame(); 099 command.setAction(Stomp.Responses.MESSAGE); 100 Map<String, String> headers = new HashMap<String, String>(25); 101 command.setHeaders(headers); 102 103 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this); 104 105 if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { 106 107 if (!message.isCompressed() && message.getContent() != null) { 108 ByteSequence msgContent = message.getContent(); 109 if (msgContent.getLength() > 4) { 110 byte[] content = new byte[msgContent.getLength() - 4]; 111 System.arraycopy(msgContent.data, 4, content, 0, content.length); 112 command.setContent(content); 113 } 114 } else { 115 ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy(); 116 String messageText = msg.getText(); 117 if (messageText != null) { 118 command.setContent(msg.getText().getBytes("UTF-8")); 119 } 120 } 121 122 } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) { 123 124 ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy(); 125 msg.setReadOnlyBody(true); 126 byte[] data = new byte[(int)msg.getBodyLength()]; 127 msg.readBytes(data); 128 129 headers.put(Stomp.Headers.CONTENT_LENGTH, Integer.toString(data.length)); 130 command.setContent(data); 131 } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE && 132 AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) { 133 134 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( 135 converter, message, command, this); 136 137 String body = marshallAdvisory(message.getDataStructure()); 138 command.setContent(body.getBytes("UTF-8")); 139 } 140 return command; 141 } 142 143 public String convertDestination(ProtocolConverter converter, Destination d) { 144 if (d == null) { 145 return null; 146 } 147 ActiveMQDestination activeMQDestination = (ActiveMQDestination)d; 148 String physicalName = activeMQDestination.getPhysicalName(); 149 150 String rc = converter.getCreatedTempDestinationName(activeMQDestination); 151 if( rc!=null ) { 152 return rc; 153 } 154 155 StringBuilder buffer = new StringBuilder(); 156 if (activeMQDestination.isQueue()) { 157 if (activeMQDestination.isTemporary()) { 158 buffer.append("/remote-temp-queue/"); 159 } else { 160 buffer.append("/queue/"); 161 } 162 } else { 163 if (activeMQDestination.isTemporary()) { 164 buffer.append("/remote-temp-topic/"); 165 } else { 166 buffer.append("/topic/"); 167 } 168 } 169 buffer.append(physicalName); 170 return buffer.toString(); 171 } 172 173 public ActiveMQDestination convertDestination(ProtocolConverter converter, String name, boolean forceFallback) throws ProtocolException { 174 if (name == null) { 175 return null; 176 } else if (name.startsWith("/queue/")) { 177 String qName = name.substring("/queue/".length(), name.length()); 178 return ActiveMQDestination.createDestination(qName, ActiveMQDestination.QUEUE_TYPE); 179 } else if (name.startsWith("/topic/")) { 180 String tName = name.substring("/topic/".length(), name.length()); 181 return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE); 182 } else if (name.startsWith("/remote-temp-queue/")) { 183 String tName = name.substring("/remote-temp-queue/".length(), name.length()); 184 return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE); 185 } else if (name.startsWith("/remote-temp-topic/")) { 186 String tName = name.substring("/remote-temp-topic/".length(), name.length()); 187 return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE); 188 } else if (name.startsWith("/temp-queue/")) { 189 return converter.createTempDestination(name, false); 190 } else if (name.startsWith("/temp-topic/")) { 191 return converter.createTempDestination(name, true); 192 } else { 193 if (forceFallback) { 194 try { 195 ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(name); 196 if (fallback != null) { 197 return fallback; 198 } 199 } catch (JMSException e) { 200 throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " 201 + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/", false, e); 202 } 203 } 204 throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " 205 + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/"); 206 } 207 } 208 209 /** 210 * Return an Advisory message as a JSON formatted string 211 * @param ds 212 * @return 213 */ 214 protected String marshallAdvisory(final DataStructure ds) { 215 XStream xstream = new XStream(new JsonHierarchicalStreamDriver()); 216 xstream.setMode(XStream.NO_REFERENCES); 217 xstream.aliasPackage("", "org.apache.activemq.command"); 218 return xstream.toXML(ds); 219 } 220 }