001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.transport.stomp;
019    
020    import java.io.ByteArrayOutputStream;
021    import java.io.DataInputStream;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.OutputStream;
025    import java.net.Socket;
026    import java.net.UnknownHostException;
027    import java.util.HashMap;
028    
029    public class StompConnection {
030    
031        public static final long RECEIVE_TIMEOUT = 10000;
032    
033        private Socket stompSocket;
034        private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
035    
036        public void open(String host, int port) throws IOException, UnknownHostException {
037            open(new Socket(host, port));
038        }
039    
040        public void open(Socket socket) {
041            stompSocket = socket;
042        }
043    
044        public void close() throws IOException {
045            if (stompSocket != null) {
046                stompSocket.close();
047                stompSocket = null;
048            }
049        }
050    
051        public void sendFrame(String data) throws Exception {
052            byte[] bytes = data.getBytes("UTF-8");
053            OutputStream outputStream = stompSocket.getOutputStream();
054            outputStream.write(bytes);
055            outputStream.flush();
056        }
057    
058        public void sendFrame(String frame, byte[] data) throws Exception {
059            byte[] bytes = frame.getBytes("UTF-8");
060            OutputStream outputStream = stompSocket.getOutputStream();
061            outputStream.write(bytes);
062            outputStream.write(data);
063            outputStream.flush();
064        }
065    
066        public StompFrame receive() throws Exception {
067            return receive(RECEIVE_TIMEOUT);
068        }
069    
070        public StompFrame receive(long timeOut) throws Exception {
071            stompSocket.setSoTimeout((int)timeOut);
072            InputStream is = stompSocket.getInputStream();
073            StompWireFormat wf = new StompWireFormat();
074            DataInputStream dis = new DataInputStream(is);
075            return (StompFrame)wf.unmarshal(dis);
076        }
077    
078        public String receiveFrame() throws Exception {
079            return receiveFrame(RECEIVE_TIMEOUT);
080        }
081    
082        public String receiveFrame(long timeOut) throws Exception {
083            stompSocket.setSoTimeout((int)timeOut);
084            InputStream is = stompSocket.getInputStream();
085            int c = 0;
086            for (;;) {
087                c = is.read();
088                if (c < 0) {
089                    throw new IOException("socket closed.");
090                } else if (c == 0) {
091                    c = is.read();
092                    if (c == '\n') {
093                        // end of frame
094                        return stringFromBuffer(inputBuffer);
095                    } else {
096                        inputBuffer.write(0);
097                        inputBuffer.write(c);
098                    }
099                } else {
100                    inputBuffer.write(c);
101                }
102            }
103        }
104    
105        private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
106            byte[] ba = inputBuffer.toByteArray();
107            inputBuffer.reset();
108            return new String(ba, "UTF-8");
109        }
110    
111        public Socket getStompSocket() {
112            return stompSocket;
113        }
114    
115        public void setStompSocket(Socket stompSocket) {
116            this.stompSocket = stompSocket;
117        }
118    
119        public void connect(String username, String password) throws Exception {
120            connect(username, password, null);
121        }
122    
123        public void connect(String username, String password, String client) throws Exception {
124            HashMap<String, String> headers = new HashMap<String, String>();
125            headers.put("login", username);
126            headers.put("passcode", password);
127            if (client != null) {
128                headers.put("client-id", client);
129            }
130            StompFrame frame = new StompFrame("CONNECT", headers);
131            sendFrame(frame.format());
132    
133            StompFrame connect = receive();
134            if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
135                throw new Exception ("Not connected: " + connect.getBody());
136            }
137        }
138    
139        public void disconnect() throws Exception {
140            StompFrame frame = new StompFrame("DISCONNECT");
141            sendFrame(frame.format());
142        }
143    
144        public void send(String destination, String message) throws Exception {
145            send(destination, message, null, null);
146        }
147    
148        public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
149            if (headers == null) {
150                headers = new HashMap<String, String>();
151            }
152            headers.put("destination", destination);
153            if (transaction != null) {
154                headers.put("transaction", transaction);
155            }
156            StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
157            sendFrame(frame.format());
158        }
159    
160        public void subscribe(String destination) throws Exception {
161            subscribe(destination, null, null);
162        }
163    
164        public void subscribe(String destination, String ack) throws Exception {
165            subscribe(destination, ack, new HashMap<String, String>());
166        }
167    
168        public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
169            if (headers == null) {
170                headers = new HashMap<String, String>();
171            }
172            headers.put("destination", destination);
173            if (ack != null) {
174                headers.put("ack", ack);
175            }
176            StompFrame frame = new StompFrame("SUBSCRIBE", headers);
177            sendFrame(frame.format());
178        }
179    
180        public void unsubscribe(String destination) throws Exception {
181            unsubscribe(destination, null);
182        }
183    
184        public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
185            if (headers == null) {
186                headers = new HashMap<String, String>();
187            }
188            headers.put("destination", destination);
189            StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
190            sendFrame(frame.format());
191        }
192    
193        public void begin(String transaction) throws Exception {
194            HashMap<String, String> headers = new HashMap<String, String>();
195            headers.put("transaction", transaction);
196            StompFrame frame = new StompFrame("BEGIN", headers);
197            sendFrame(frame.format());
198        }
199    
200        public void abort(String transaction) throws Exception {
201            HashMap<String, String> headers = new HashMap<String, String>();
202            headers.put("transaction", transaction);
203            StompFrame frame = new StompFrame("ABORT", headers);
204            sendFrame(frame.format());
205        }
206    
207        public void commit(String transaction) throws Exception {
208            HashMap<String, String> headers = new HashMap<String, String>();
209            headers.put("transaction", transaction);
210            StompFrame frame = new StompFrame("COMMIT", headers);
211            sendFrame(frame.format());
212        }
213    
214        public void ack(StompFrame frame) throws Exception {
215            ack(frame.getHeaders().get("message-id"), null);
216        }
217    
218        public void ack(StompFrame frame, String transaction) throws Exception {
219            ack(frame.getHeaders().get("message-id"), transaction);
220        }
221    
222        public void ack(String messageId) throws Exception {
223            ack(messageId, null);
224        }
225    
226        public void ack(String messageId, String transaction) throws Exception {
227            HashMap<String, String> headers = new HashMap<String, String>();
228            headers.put("message-id", messageId);
229            if (transaction != null)
230                headers.put("transaction", transaction);
231            StompFrame frame = new StompFrame("ACK", headers);
232            sendFrame(frame.format());
233        }
234    
235        protected String appendHeaders(HashMap<String, Object> headers) {
236            StringBuilder result = new StringBuilder();
237            for (String key : headers.keySet()) {
238                result.append(key + ":" + headers.get(key) + "\n");
239            }
240            result.append("\n");
241            return result.toString();
242        }
243    
244    }