/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.stomp;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.HashMap;
import java.util.Map;

import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;

/**
 * Implements marshalling and unmarshalling the <a
 * href="http://stomp.codehaus.org/">Stomp</a> protocol.
 * <p>
 * A frame is written as the action line, one {@code name:value} line per
 * header, an empty line, the content bytes and finally a null byte followed
 * by a newline. Reading mirrors that layout and enforces upper bounds on the
 * command length, the header length, the number of headers and the size of
 * the content.
 */
public class StompWireFormat implements WireFormat {

    /** Content used for frames that carry no body. */
    private static final byte[] NO_DATA = new byte[] {};

    /** Bytes written after the content of every frame: a null byte and a newline. */
    private static final byte[] END_OF_FRAME = new byte[] {0, '\n'};

    /** Limit applied to the action (command) line. */
    private static final int MAX_COMMAND_LENGTH = 1024;

    /** Limit applied to a single header line. */
    private static final int MAX_HEADER_LENGTH = 1024 * 10;

    /** Limit applied to the number of headers in one frame. */
    private static final int MAX_HEADERS = 1000;

    /** Limit applied to the content of one frame, in bytes. */
    private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;

    /**
     * When {@code true}, header values are escaped while marshalling. Header
     * values are always run through {@link #decodeHeader(InputStream)} while
     * unmarshalling, independent of this flag.
     */
    private boolean encodingEnabled = false;

    /** Wire format version; only stored and returned by the accessor pair in this class. */
    private int version = 1;

    /**
     * Marshals a frame into an in-memory byte sequence.
     *
     * @param command the frame to marshal; must be a {@link StompFrame}
     * @return the bytes produced by {@link #marshal(Object, DataOutput)}
     * @throws IOException if writing the frame fails
     */
    public ByteSequence marshal(Object command) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        marshal(command, dos);
        dos.close();
        return baos.toByteSequence();
    }

    /**
     * Unmarshals a single frame from the given byte sequence.
     *
     * @param packet the bytes holding the frame
     * @return the result of {@link #unmarshal(DataInput)} applied to the packet
     * @throws IOException if reading from the packet fails
     */
    public Object unmarshal(ByteSequence packet) throws IOException {
        ByteArrayInputStream stream = new ByteArrayInputStream(packet);
        DataInputStream dis = new DataInputStream(stream);
        return unmarshal(dis);
    }

    /**
     * Writes a frame to the given output.
     * <p>
     * A frame whose action equals {@code Stomp.Commands.KEEPALIVE} is written
     * as the single {@code Stomp.BREAK} value and nothing else. Header names
     * are written as they are; header values go through
     * {@link #encodeHeader(String)}.
     *
     * @param command the frame to write; must be a {@link StompFrame}
     * @param os the output to write to
     * @throws IOException if writing to the output fails
     */
    public void marshal(Object command, DataOutput os) throws IOException {
        StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame)command;

        if (stomp.getAction().equals(Stomp.Commands.KEEPALIVE)) {
            os.write(Stomp.BREAK);
            return;
        }

        StringBuilder buffer = new StringBuilder();
        buffer.append(stomp.getAction());
        buffer.append(Stomp.NEWLINE);

        // Output the headers.
        for (Map.Entry<String, String> entry : stomp.getHeaders().entrySet()) {
            buffer.append(entry.getKey());
            buffer.append(Stomp.Headers.SEPERATOR);
            buffer.append(encodeHeader(entry.getValue()));
            buffer.append(Stomp.NEWLINE);
        }

        // Add a newline to separate the headers from the content.
        buffer.append(Stomp.NEWLINE);

        os.write(buffer.toString().getBytes("UTF-8"));
        os.write(stomp.getContent());
        os.write(END_OF_FRAME);
    }

    /**
     * Reads a single frame from the given input.
     * <p>
     * When the action is SEND or MESSAGE and a content-length header is
     * present, exactly that many bytes are read and must be followed by a
     * null byte. Otherwise the content is everything up to the next null
     * byte.
     *
     * @param in the input to read from
     * @return a {@link StompFrame}, or a {@link StompFrameError} wrapping the
     *         {@link ProtocolException} raised while parsing
     * @throws IOException if reading from the input fails for a reason other
     *         than a {@link ProtocolException}
     */
    public Object unmarshal(DataInput in) throws IOException {

        try {

            // parse action
            String action = parseAction(in);

            // Parse the headers
            HashMap<String, String> headers = parseHeaders(in);

            // Read in the data part.
            byte[] data = NO_DATA;
            String contentLength = headers.get(Stomp.Headers.CONTENT_LENGTH);
            if ((action.equals(Stomp.Commands.SEND) || action.equals(Stomp.Responses.MESSAGE)) && contentLength != null) {

                // Bless the client, he's telling us how much data to read in.
                int length = parseContentLength(contentLength);

                data = new byte[length];
                in.readFully(data);

                if (in.readByte() != 0) {
                    throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " + "there was no trailing null byte", true);
                }

            } else {

                // We don't know how much to read.. data ends when we hit a 0
                byte b;
                ByteArrayOutputStream baos = null;
                while ((b = in.readByte()) != 0) {

                    // The buffer is created lazily so that an empty body
                    // keeps using the shared NO_DATA array.
                    if (baos == null) {
                        baos = new ByteArrayOutputStream();
                    } else if (baos.size() > MAX_DATA_LENGTH) {
                        throw new ProtocolException("The maximum data length was exceeded", true);
                    }

                    baos.write(b);
                }

                if (baos != null) {
                    baos.close();
                    data = baos.toByteArray();
                }
            }

            return new StompFrame(action, headers, data);

        } catch (ProtocolException e) {
            // Protocol violations are reported as a frame instead of being thrown.
            return new StompFrameError(e);
        }
    }

    /**
     * Reads one newline-terminated line and returns it as trimmed UTF-8 text.
     *
     * @param in the input to read from
     * @param maxLength the length limit passed on to {@link #readHeaderLine}
     * @param errorMessage the message used when the limit is exceeded
     * @return the decoded line with leading and trailing whitespace removed
     * @throws IOException if reading fails or the limit is exceeded
     */
    private String readLine(DataInput in, int maxLength, String errorMessage) throws IOException {
        ByteSequence sequence = readHeaderLine(in, maxLength, errorMessage);
        return new String(sequence.getData(), sequence.getOffset(), sequence.getLength(), "UTF-8").trim();
    }

    /**
     * Reads raw bytes up to, but not including, the next {@code '\n'}.
     * <p>
     * The limit check runs before each byte is appended and uses a strict
     * comparison, so a line may reach {@code maxLength + 1} bytes before it
     * is rejected.
     *
     * @param in the input to read from
     * @param maxLength the length limit for the line
     * @param errorMessage the message used when the limit is exceeded
     * @return the bytes of the line without the terminating newline
     * @throws IOException if reading fails; a fatal {@link ProtocolException}
     *         if the limit is exceeded
     */
    private ByteSequence readHeaderLine(DataInput in, int maxLength, String errorMessage) throws IOException {
        byte b;
        ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
        while ((b = in.readByte()) != '\n') {
            if (baos.size() > maxLength) {
                throw new ProtocolException(errorMessage, true);
            }
            baos.write(b);
        }
        baos.close();
        return baos.toByteSequence();
    }

    /**
     * Reads the action line of the next frame, skipping lines that are empty
     * after trimming.
     *
     * @param in the input to read from
     * @return the trimmed, non-empty action
     * @throws IOException if reading fails or the command length limit is
     *         exceeded
     */
    protected String parseAction(DataInput in) throws IOException {
        // readLine never returns null and already trims its result, so the
        // only thing left to do is to skip blank lines.
        String action;
        do {
            action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
        } while (action.length() == 0);
        return action;
    }

    /**
     * Reads header lines until the line that ends the header section.
     * <p>
     * Each line is split at the first {@code ':'}. The name is decoded as
     * UTF-8 and trimmed; the remainder is passed to
     * {@link #decodeHeader(InputStream)} and is not trimmed. A later header
     * with the same name replaces the earlier one.
     *
     * @param in the input to read from
     * @return the parsed headers; possibly empty
     * @throws IOException if reading fails; a fatal {@link ProtocolException}
     *         if a limit is exceeded or a line cannot be parsed
     */
    protected HashMap<String, String> parseHeaders(DataInput in) throws IOException {
        HashMap<String, String> headers = new HashMap<String, String>(25);
        while (true) {
            ByteSequence line = readHeaderLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
            // A line of at most one byte ends the header section.
            // NOTE(review): presumably the one-byte case covers a lone
            // carriage return from a CRLF-terminated blank line - confirm.
            if (line != null && line.length > 1) {

                if (headers.size() > MAX_HEADERS) {
                    throw new ProtocolException("The maximum number of headers was exceeded", true);
                }

                try {

                    ByteArrayInputStream headerLine = new ByteArrayInputStream(line);
                    ByteArrayOutputStream stream = new ByteArrayOutputStream(line.length);

                    // First complete the name
                    int result = -1;
                    while ((result = headerLine.read()) != -1) {
                        if (result != ':') {
                            stream.write(result);
                        } else {
                            break;
                        }
                    }

                    ByteSequence nameSeq = stream.toByteSequence();
                    String name = new String(nameSeq.getData(), nameSeq.getOffset(), nameSeq.getLength(), "UTF-8").trim();
                    // Whatever is left in headerLine after the separator is the value.
                    String value = decodeHeader(headerLine);
                    headers.put(name, value);
                } catch (Exception e) {
                    throw new ProtocolException("Unable to parser header line [" + line + "]", true);
                }
            } else {
                break;
            }
        }
        return headers;
    }

    /**
     * Parses and validates the value of a content-length header.
     *
     * @param contentLength the raw header value; surrounding whitespace is
     *        ignored
     * @return the content length, between 0 and the maximum data length
     * @throws ProtocolException (fatal) if the value is not an integer, is
     *         negative or exceeds the maximum data length
     */
    protected int parseContentLength(String contentLength) throws ProtocolException {
        int length;
        try {
            length = Integer.parseInt(contentLength.trim());
        } catch (NumberFormatException e) {
            throw new ProtocolException("Specified content-length is not a valid integer", true);
        }

        // A negative value would otherwise surface as an unchecked
        // NegativeArraySizeException when the content buffer is allocated,
        // instead of being reported as a protocol error.
        if (length < 0) {
            throw new ProtocolException("Specified content-length must not be negative", true);
        }

        if (length > MAX_DATA_LENGTH) {
            throw new ProtocolException("The maximum data length was exceeded", true);
        }

        return length;
    }

    /**
     * Escapes a header value for the wire when encoding is enabled.
     * <p>
     * The value is processed byte by byte in its UTF-8 form: the escape,
     * break and colon bytes defined in {@link Stomp} are replaced with their
     * escape sequences and every other byte is copied unchanged. When
     * encoding is disabled the value is returned as is.
     *
     * @param header the header value to encode
     * @return the encoded value, or {@code header} itself when encoding is
     *         disabled
     * @throws IOException if the UTF-8 conversion or buffering fails
     */
    private String encodeHeader(String header) throws IOException {
        String result = header;
        if (this.encodingEnabled) {
            byte[] utf8buf = header.getBytes("UTF-8");
            ByteArrayOutputStream stream = new ByteArrayOutputStream(utf8buf.length);
            for (byte val : utf8buf) {
                switch (val) {
                case Stomp.ESCAPE:
                    stream.write(Stomp.ESCAPE_ESCAPE_SEQ);
                    break;
                case Stomp.BREAK:
                    stream.write(Stomp.NEWLINE_ESCAPE_SEQ);
                    break;
                case Stomp.COLON:
                    stream.write(Stomp.COLON_ESCAPE_SEQ);
                    break;
                default:
                    stream.write(val);
                }
            }
            result = new String(stream.toByteArray(), "UTF-8");
        }

        return result;
    }

    /**
     * Decodes the remaining bytes of a header line into the header value.
     * <p>
     * A backslash followed by {@code n}, {@code c} or another backslash is
     * replaced with {@code Stomp.BREAK}, {@code Stomp.COLON} or
     * {@code Stomp.ESCAPE} respectively. A backslash followed by any other
     * byte, or a backslash at the end of the input, is kept literally.
     *
     * @param header the stream positioned at the first byte of the value
     * @return the decoded value as UTF-8 text
     * @throws IOException if reading from the stream fails
     */
    private String decodeHeader(InputStream header) throws IOException {
        ByteArrayOutputStream decoded = new ByteArrayOutputStream();
        PushbackInputStream stream = new PushbackInputStream(header);

        int value = -1;
        while ((value = stream.read()) != -1) {
            if (value == '\\') {

                int next = stream.read();
                if (next != -1) {
                    switch (next) {
                    case 'n':
                        decoded.write(Stomp.BREAK);
                        break;
                    case 'c':
                        decoded.write(Stomp.COLON);
                        break;
                    case '\\':
                        decoded.write(Stomp.ESCAPE);
                        break;
                    default:
                        // Not a known escape: keep the backslash and let the
                        // following byte be processed on the next iteration.
                        stream.unread(next);
                        decoded.write(value);
                    }
                } else {
                    // Trailing backslash with nothing after it.
                    decoded.write(value);
                }

            } else {
                decoded.write(value);
            }
        }

        return new String(decoded.toByteArray(), "UTF-8");
    }

    /**
     * @return the wire format version currently set
     */
    public int getVersion() {
        return version;
    }

    /**
     * @param version the wire format version to store
     */
    public void setVersion(int version) {
        this.version = version;
    }

    /**
     * @return {@code true} if header values are escaped while marshalling
     */
    public boolean isEncodingEnabled() {
        return this.encodingEnabled;
    }

    /**
     * @param value {@code true} to escape header values while marshalling
     */
    public void setEncodingEnabled(boolean value) {
        this.encodingEnabled = value;
    }

}