/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.transport.stomp;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

/**
 * Small blocking STOMP client built directly on a {@link Socket}.
 * <p>
 * Frames are written to the socket as UTF-8 text and can be read back either
 * as parsed {@link StompFrame} objects ({@link #receive()}) or as raw strings
 * ({@link #receiveFrame()}). The class performs no synchronization of its own.
 */
public class StompConnection {

    /** Default read timeout, in milliseconds, used by the no-argument receive methods. */
    public static final long RECEIVE_TIMEOUT = 10000;

    private Socket stompSocket;

    /**
     * Bytes of the raw frame currently being read by {@link #receiveFrame(long)}.
     * Kept as a field so that data read before an exception (for example a
     * read timeout) is still there on the next call.
     */
    private final ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();

    /**
     * Opens a new socket to the given host and port and uses it for this connection.
     *
     * @param host host name or address to connect to
     * @param port TCP port to connect to
     * @throws IOException if the socket cannot be created
     * @throws UnknownHostException if the host cannot be resolved
     */
    public void open(String host, int port) throws IOException, UnknownHostException {
        open(new Socket(host, port));
    }

    /**
     * Uses an already created socket for this connection.
     *
     * @param socket the socket to read from and write to
     */
    public void open(Socket socket) {
        stompSocket = socket;
    }

    /**
     * Closes the underlying socket, if any, and forgets it.
     *
     * @throws IOException if closing the socket fails; the socket reference is
     *         cleared even in that case
     */
    public void close() throws IOException {
        if (stompSocket != null) {
            try {
                stompSocket.close();
            } finally {
                // Drop the reference even when close() fails so a broken socket is not reused.
                stompSocket = null;
            }
        }
    }

    /**
     * Writes the given text to the socket as UTF-8 and flushes it.
     *
     * @param data the text to send, exactly as it should appear on the wire
     * @throws Exception if encoding or writing fails
     */
    public void sendFrame(String data) throws Exception {
        byte[] bytes = data.getBytes("UTF-8");
        OutputStream outputStream = stompSocket.getOutputStream();
        outputStream.write(bytes);
        outputStream.flush();
    }

    /**
     * Writes the given text (as UTF-8) followed by the raw bytes, then flushes.
     *
     * @param frame textual part, written first
     * @param data  raw bytes written unchanged after the text
     * @throws Exception if encoding or writing fails
     */
    public void sendFrame(String frame, byte[] data) throws Exception {
        byte[] bytes = frame.getBytes("UTF-8");
        OutputStream outputStream = stompSocket.getOutputStream();
        outputStream.write(bytes);
        outputStream.write(data);
        outputStream.flush();
    }

    /**
     * Reads one frame using {@link #RECEIVE_TIMEOUT}.
     *
     * @return the frame produced by {@link StompWireFormat}
     * @throws Exception if reading or unmarshalling fails
     */
    public StompFrame receive() throws Exception {
        return receive(RECEIVE_TIMEOUT);
    }

    /**
     * Reads one frame from the socket and unmarshals it with a new {@link StompWireFormat}.
     *
     * @param timeOut socket read timeout in milliseconds; values above
     *                {@link Integer#MAX_VALUE} are clamped
     * @return the unmarshalled frame
     * @throws Exception if reading or unmarshalling fails
     */
    public StompFrame receive(long timeOut) throws Exception {
        stompSocket.setSoTimeout(toSocketTimeout(timeOut));
        InputStream is = stompSocket.getInputStream();
        StompWireFormat wf = new StompWireFormat();
        DataInputStream dis = new DataInputStream(is);
        return (StompFrame)wf.unmarshal(dis);
    }

    /**
     * Reads one raw frame as text using {@link #RECEIVE_TIMEOUT}.
     *
     * @return the frame text without its terminator
     * @throws Exception if reading fails
     */
    public String receiveFrame() throws Exception {
        return receiveFrame(RECEIVE_TIMEOUT);
    }

    /**
     * Reads bytes from the socket until the two-byte terminator NUL + newline
     * is seen and returns everything before it decoded as UTF-8.
     * <p>
     * A NUL that is not followed by a newline is treated as frame content.
     *
     * @param timeOut socket read timeout in milliseconds; values above
     *                {@link Integer#MAX_VALUE} are clamped
     * @return the frame text without the terminator
     * @throws IOException if the stream ends before a terminator is read
     * @throws Exception if reading or decoding fails
     */
    public String receiveFrame(long timeOut) throws Exception {
        stompSocket.setSoTimeout(toSocketTimeout(timeOut));
        InputStream is = stompSocket.getInputStream();
        // True when the previous byte was a NUL that has not yet been classified
        // as either frame content or the first half of the terminator.
        boolean pendingNul = false;
        for (;;) {
            int c = is.read();
            if (c < 0) {
                // Checked before anything is buffered so that -1 is never stored as byte 0xFF.
                throw new IOException("socket closed.");
            }
            if (pendingNul) {
                if (c == '\n') {
                    // end of frame
                    return stringFromBuffer(inputBuffer);
                }
                // The earlier NUL turned out to be content.
                inputBuffer.write(0);
                if (c == 0) {
                    // This NUL may itself be the start of the terminator; keep waiting.
                    continue;
                }
                pendingNul = false;
                inputBuffer.write(c);
            } else if (c == 0) {
                pendingNul = true;
            } else {
                inputBuffer.write(c);
            }
        }
    }

    /**
     * Converts a millisecond timeout to the {@code int} expected by
     * {@link Socket#setSoTimeout(int)} without the wrap-around of a plain cast
     * (a plain cast turns e.g. 2^32 into 0).
     * Negative values stay negative so the socket can reject them.
     */
    private static int toSocketTimeout(long timeOut) {
        if (timeOut > Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        if (timeOut < Integer.MIN_VALUE) {
            return Integer.MIN_VALUE;
        }
        return (int)timeOut;
    }

    /**
     * Returns the buffer content decoded as UTF-8 and empties the buffer.
     */
    private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
        byte[] ba = inputBuffer.toByteArray();
        inputBuffer.reset();
        return new String(ba, "UTF-8");
    }

    /**
     * @return the socket currently used by this connection, or {@code null} if none is set
     */
    public Socket getStompSocket() {
        return stompSocket;
    }

    /**
     * @param stompSocket the socket this connection should use from now on
     */
    public void setStompSocket(Socket stompSocket) {
        this.stompSocket = stompSocket;
    }

    /**
     * Sends a CONNECT frame without a client id and waits for the reply.
     *
     * @param username value of the {@code login} header
     * @param password value of the {@code passcode} header
     * @throws Exception if sending or receiving fails, or the reply is not CONNECTED
     */
    public void connect(String username, String password) throws Exception {
        connect(username, password, null);
    }

    /**
     * Sends a CONNECT frame and waits for the reply.
     *
     * @param username value of the {@code login} header
     * @param password value of the {@code passcode} header
     * @param client   value of the {@code client-id} header; omitted when {@code null}
     * @throws Exception if sending or receiving fails, or the reply's action is
     *         not {@code Stomp.Responses.CONNECTED}
     */
    public void connect(String username, String password, String client) throws Exception {
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("login", username);
        headers.put("passcode", password);
        if (client != null) {
            headers.put("client-id", client);
        }
        StompFrame frame = new StompFrame("CONNECT", headers);
        sendFrame(frame.format());

        StompFrame response = receive();
        if (!response.getAction().equals(Stomp.Responses.CONNECTED)) {
            throw new Exception ("Not connected: " + response.getBody());
        }
    }

    /**
     * Sends a DISCONNECT frame. The socket itself is left open; see {@link #close()}.
     *
     * @throws Exception if sending fails
     */
    public void disconnect() throws Exception {
        StompFrame frame = new StompFrame("DISCONNECT");
        sendFrame(frame.format());
    }

    /**
     * Sends a SEND frame with no transaction and no extra headers.
     *
     * @param destination value of the {@code destination} header
     * @param message     text used as the frame body
     * @throws Exception if sending fails
     */
    public void send(String destination, String message) throws Exception {
        send(destination, message, null, null);
    }

    /**
     * Sends a SEND frame.
     *
     * @param destination value of the {@code destination} header
     * @param message     text used as the frame body
     * @param transaction value of the {@code transaction} header; omitted when {@code null}
     * @param headers     extra headers, may be {@code null}; a supplied map is
     *                    modified in place (destination/transaction are added to it)
     * @throws Exception if sending fails
     */
    public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
        if (headers == null) {
            headers = new HashMap<String, String>();
        }
        headers.put("destination", destination);
        if (transaction != null) {
            headers.put("transaction", transaction);
        }
        // Encode explicitly as UTF-8, the charset this class uses everywhere else,
        // instead of depending on the platform default charset.
        StompFrame frame = new StompFrame("SEND", headers, message.getBytes("UTF-8"));
        sendFrame(frame.format());
    }

    /**
     * Sends a SUBSCRIBE frame with no {@code ack} header and no extra headers.
     *
     * @param destination value of the {@code destination} header
     * @throws Exception if sending fails
     */
    public void subscribe(String destination) throws Exception {
        subscribe(destination, null, null);
    }

    /**
     * Sends a SUBSCRIBE frame with the given acknowledgement mode.
     *
     * @param destination value of the {@code destination} header
     * @param ack         value of the {@code ack} header; omitted when {@code null}
     * @throws Exception if sending fails
     */
    public void subscribe(String destination, String ack) throws Exception {
        subscribe(destination, ack, new HashMap<String, String>());
    }

    /**
     * Sends a SUBSCRIBE frame.
     *
     * @param destination value of the {@code destination} header
     * @param ack         value of the {@code ack} header; omitted when {@code null}
     * @param headers     extra headers, may be {@code null}; a supplied map is
     *                    modified in place
     * @throws Exception if sending fails
     */
    public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
        if (headers == null) {
            headers = new HashMap<String, String>();
        }
        headers.put("destination", destination);
        if (ack != null) {
            headers.put("ack", ack);
        }
        StompFrame frame = new StompFrame("SUBSCRIBE", headers);
        sendFrame(frame.format());
    }

    /**
     * Sends an UNSUBSCRIBE frame with no extra headers.
     *
     * @param destination value of the {@code destination} header
     * @throws Exception if sending fails
     */
    public void unsubscribe(String destination) throws Exception {
        unsubscribe(destination, null);
    }

    /**
     * Sends an UNSUBSCRIBE frame.
     *
     * @param destination value of the {@code destination} header
     * @param headers     extra headers, may be {@code null}; a supplied map is
     *                    modified in place
     * @throws Exception if sending fails
     */
    public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
        if (headers == null) {
            headers = new HashMap<String, String>();
        }
        headers.put("destination", destination);
        StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
        sendFrame(frame.format());
    }

    /**
     * Sends a BEGIN frame for the given transaction.
     *
     * @param transaction value of the {@code transaction} header
     * @throws Exception if sending fails
     */
    public void begin(String transaction) throws Exception {
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("transaction", transaction);
        StompFrame frame = new StompFrame("BEGIN", headers);
        sendFrame(frame.format());
    }

    /**
     * Sends an ABORT frame for the given transaction.
     *
     * @param transaction value of the {@code transaction} header
     * @throws Exception if sending fails
     */
    public void abort(String transaction) throws Exception {
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("transaction", transaction);
        StompFrame frame = new StompFrame("ABORT", headers);
        sendFrame(frame.format());
    }

    /**
     * Sends a COMMIT frame for the given transaction.
     *
     * @param transaction value of the {@code transaction} header
     * @throws Exception if sending fails
     */
    public void commit(String transaction) throws Exception {
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("transaction", transaction);
        StompFrame frame = new StompFrame("COMMIT", headers);
        sendFrame(frame.format());
    }

    /**
     * Acknowledges the given frame, using its {@code message-id} header, outside any transaction.
     *
     * @param frame the received frame to acknowledge
     * @throws Exception if sending fails
     */
    public void ack(StompFrame frame) throws Exception {
        ack(frame.getHeaders().get("message-id"), null);
    }

    /**
     * Acknowledges the given frame, using its {@code message-id} header.
     *
     * @param frame       the received frame to acknowledge
     * @param transaction value of the {@code transaction} header; omitted when {@code null}
     * @throws Exception if sending fails
     */
    public void ack(StompFrame frame, String transaction) throws Exception {
        ack(frame.getHeaders().get("message-id"), transaction);
    }

    /**
     * Acknowledges the message with the given id outside any transaction.
     *
     * @param messageId value of the {@code message-id} header
     * @throws Exception if sending fails
     */
    public void ack(String messageId) throws Exception {
        ack(messageId, null);
    }

    /**
     * Sends an ACK frame.
     *
     * @param messageId   value of the {@code message-id} header
     * @param transaction value of the {@code transaction} header; omitted when {@code null}
     * @throws Exception if sending fails
     */
    public void ack(String messageId, String transaction) throws Exception {
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("message-id", messageId);
        if (transaction != null) {
            headers.put("transaction", transaction);
        }
        StompFrame frame = new StompFrame("ACK", headers);
        sendFrame(frame.format());
    }

    /**
     * Renders the headers as {@code key:value} lines, each ended by a newline,
     * followed by one empty line.
     *
     * @param headers the headers to render, in the map's iteration order
     * @return the rendered header section
     */
    protected String appendHeaders(HashMap<String, Object> headers) {
        StringBuilder result = new StringBuilder();
        for (Map.Entry<String, Object> header : headers.entrySet()) {
            result.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        result.append("\n");
        return result.toString();
    }

}