/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.command;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

import javax.jms.JMSException;
import javax.jms.MessageNotWriteableException;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat;

/**
 * A {@link TextMessage} whose body is held either as a {@code String}
 * ({@link #text}) or as marshalled bytes (the message content), and is
 * converted between the two forms on demand.
 *
 * @openwire:marshaller code="28"
 *
 */
public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage {

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE;

    /** The unmarshalled body, or {@code null} when it is absent or only held as content bytes. */
    protected String text;

    /**
     * Creates a new {@code ActiveMQTextMessage} and copies this message's state into it.
     *
     * @return the newly created copy
     */
    @Override
    public Message copy() {
        ActiveMQTextMessage copy = new ActiveMQTextMessage();
        copy(copy);
        return copy;
    }

    /**
     * Copies the inherited state and the text body into {@code copy}.
     *
     * @param copy the message that receives this message's state
     */
    private void copy(ActiveMQTextMessage copy) {
        // Snapshot the text BEFORE delegating to super.copy(). storeContentAndClear()
        // (reached from beforeMarshall()) nulls this.text once the body has been stored
        // as content. If that runs on another thread between super.copy() and the read
        // of this.text, the copy could see neither form of the body.
        // NOTE(review): assumes super.copy() is what transfers the marshalled content - confirm.
        String text = this.text;
        super.copy(copy);
        copy.text = text;
    }

    /**
     * @return {@link #DATA_STRUCTURE_TYPE}
     */
    @Override
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * @return the constant mime type string {@code "jms/text-message"}
     */
    @Override
    public String getJMSXMimeType() {
        return "jms/text-message";
    }

    /**
     * Replaces the body with the given text and discards any marshalled content.
     *
     * @param text the new body, may be {@code null}
     * @throws MessageNotWriteableException declared for the read-only body check
     *         performed by {@code checkReadOnlyBody()}
     */
    @Override
    public void setText(String text) throws MessageNotWriteableException {
        checkReadOnlyBody();
        this.text = text;
        setContent(null);
    }

    /**
     * Returns the text body, decoding it from the marshalled content the first
     * time it is needed. After a decode the content is dropped and the
     * compressed flag is reset, so the text becomes the only held form.
     *
     * @return the body text, or {@code null} if there is neither text nor content
     * @throws JMSException if the content cannot be decoded
     */
    @Override
    public String getText() throws JMSException {
        ByteSequence content = getContent();

        if (text == null && content != null) {
            text = decodeContent(content);
            setContent(null);
            setCompressed(false);
        }
        return text;
    }

    /**
     * Decodes a marshalled body into a string without modifying this message.
     * The bytes are inflated first when {@code isCompressed()} reports true.
     *
     * @param bodyAsBytes the marshalled body, may be {@code null}
     * @return the decoded text, or {@code null} when {@code bodyAsBytes} is {@code null}
     * @throws JMSException wrapping any {@link IOException} raised while reading
     */
    private String decodeContent(ByteSequence bodyAsBytes) throws JMSException {
        if (bodyAsBytes == null) {
            return null;
        }
        InputStream is = new ByteArrayInputStream(bodyAsBytes);
        try {
            if (isCompressed()) {
                is = new InflaterInputStream(is);
            }
            DataInputStream dataIn = new DataInputStream(is);
            String decoded = MarshallingSupport.readUTF8(dataIn);
            dataIn.close();
            return decoded;
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        } finally {
            try {
                is.close();
            } catch (IOException ignored) {
                // best effort: nothing useful can be done if closing fails
            }
        }
    }

    /**
     * Runs the inherited pre-marshalling step, then stores the text as content
     * and clears the text field.
     *
     * @param wireFormat passed through to the superclass
     * @throws IOException if the superclass step fails
     */
    @Override
    public void beforeMarshall(WireFormat wireFormat) throws IOException {
        super.beforeMarshall(wireFormat);
        storeContentAndClear();
    }

    /**
     * Stores the text as marshalled content (see {@link #storeContent()}) and
     * then sets the text field to {@code null}.
     */
    @Override
    public void storeContentAndClear() {
        storeContent();
        text = null;
    }

    /**
     * Marshals the text into the message content when there is text but no
     * content yet; otherwise does nothing. The bytes are deflated, and the
     * compressed flag set, when the owning connection has compression enabled.
     *
     * @throws RuntimeException wrapping any {@link IOException} raised while writing
     */
    @Override
    public void storeContent() {
        try {
            ByteSequence content = getContent();
            // Work on a local snapshot so the null check and the write see the same value.
            String text = this.text;
            if (content == null && text != null) {
                ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
                OutputStream os = bytesOut;
                ActiveMQConnection connection = getConnection();
                if (connection != null && connection.isUseCompression()) {
                    compressed = true;
                    os = new DeflaterOutputStream(os);
                }
                DataOutputStream dataOut = new DataOutputStream(os);
                try {
                    MarshallingSupport.writeUTF8(dataOut, text);
                } finally {
                    // Close on the failure path too, so the compression stream
                    // (when one is in use) is never left open.
                    dataOut.close();
                }
                setContent(bytesOut.toByteSequence());
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // see https://issues.apache.org/activemq/browse/AMQ-2103
    // and https://issues.apache.org/activemq/browse/AMQ-2966
    /**
     * Clears the inherited unmarshalled state and drops the text field.
     *
     * @throws JMSException if the superclass step fails
     */
    @Override
    public void clearUnMarshalledState() throws JMSException {
        super.clearUnMarshalledState();
        this.text = null;
    }

    /**
     * @return {@code true} when content is present or there is no text left to marshal
     */
    @Override
    public boolean isContentMarshalled() {
        return content != null || text == null;
    }

    /**
     * Clears out the message body. Clearing a message's body does not clear its
     * header values or property entries.
     * <p>
     * If this message body was read-only, calling this method leaves the
     * message body in the same state as an empty body in a newly created
     * message.
     *
     * @throws JMSException if the JMS provider fails to clear the message body
     *         due to some internal error.
     */
    @Override
    public void clearBody() throws JMSException {
        super.clearBody();
        this.text = null;
    }

    /**
     * Returns the message size. When no size has been recorded yet and the body
     * exists only as text, the size is first set to the minimum message size,
     * plus the length of the marshalled properties (if any), plus two per
     * character of text.
     *
     * @return the size as reported by the superclass after that adjustment
     */
    @Override
    public int getSize() {
        String text = this.text;
        if (size == 0 && content == null && text != null) {
            size = getMinimumMessageSize();
            if (marshalledProperties != null) {
                size += marshalledProperties.getLength();
            }
            size += text.length() * 2;
        }
        return super.getSize();
    }

    /**
     * Renders the message with its text (decoded from content if necessary and
     * shortened via {@code MarshallingSupport.truncate64}) shown as the
     * {@code text} field. Falls back to the plain superclass rendering when
     * there is no body or it cannot be decoded. This message is not modified.
     *
     * @return a string representation of this message
     */
    @Override
    public String toString() {
        try {
            String text = this.text;
            if (text == null) {
                text = decodeContent(getContent());
            }
            if (text != null) {
                text = MarshallingSupport.truncate64(text);
                HashMap<String, Object> overrideFields = new HashMap<String, Object>();
                overrideFields.put("text", text);
                return super.toString(overrideFields);
            }
        } catch (JMSException ignored) {
            // Deliberate: toString() must not fail because the body is undecodable;
            // fall through to the rendering without the text override.
        }
        return super.toString();
    }
}