001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.command; 018 019 import java.io.DataInputStream; 020 import java.io.DataOutputStream; 021 import java.io.IOException; 022 import java.io.InputStream; 023 import java.io.OutputStream; 024 import java.util.HashMap; 025 import java.util.zip.DeflaterOutputStream; 026 import java.util.zip.InflaterInputStream; 027 028 import javax.jms.JMSException; 029 import javax.jms.MessageNotWriteableException; 030 import javax.jms.TextMessage; 031 032 import org.apache.activemq.ActiveMQConnection; 033 import org.apache.activemq.util.ByteArrayInputStream; 034 import org.apache.activemq.util.ByteArrayOutputStream; 035 import org.apache.activemq.util.ByteSequence; 036 import org.apache.activemq.util.JMSExceptionSupport; 037 import org.apache.activemq.util.MarshallingSupport; 038 import org.apache.activemq.wireformat.WireFormat; 039 040 /** 041 * @openwire:marshaller code="28" 042 * 043 */ 044 public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage { 045 046 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE; 047 048 protected String text; 049 050 public Message copy() { 051 ActiveMQTextMessage copy = new ActiveMQTextMessage(); 052 copy(copy); 053 return copy; 054 } 055 056 private void copy(ActiveMQTextMessage copy) { 057 super.copy(copy); 058 copy.text = text; 059 } 060 061 public byte getDataStructureType() { 062 return DATA_STRUCTURE_TYPE; 063 } 064 065 public String getJMSXMimeType() { 066 return "jms/text-message"; 067 } 068 069 public void setText(String text) throws MessageNotWriteableException { 070 checkReadOnlyBody(); 071 this.text = text; 072 setContent(null); 073 } 074 075 public String getText() throws JMSException { 076 if (text == null && getContent() != null) { 077 InputStream is = null; 078 try { 079 ByteSequence bodyAsBytes = getContent(); 080 if (bodyAsBytes != null) { 081 is = new ByteArrayInputStream(bodyAsBytes); 082 if (isCompressed()) { 083 is = new InflaterInputStream(is); 084 } 085 DataInputStream dataIn = new DataInputStream(is); 086 text = MarshallingSupport.readUTF8(dataIn); 087 dataIn.close(); 088 setContent(null); 089 setCompressed(false); 090 } 091 } catch (IOException ioe) { 092 throw JMSExceptionSupport.create(ioe); 093 } finally { 094 if (is != null) { 095 try { 096 is.close(); 097 } catch (IOException e) { 098 // ignore 099 } 100 } 101 } 102 } 103 return text; 104 } 105 106 public void beforeMarshall(WireFormat wireFormat) throws IOException { 107 super.beforeMarshall(wireFormat); 108 109 ByteSequence content = getContent(); 110 if (content == null && text != null) { 111 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 112 OutputStream os = bytesOut; 113 ActiveMQConnection connection = getConnection(); 114 if (connection != null && connection.isUseCompression()) { 115 compressed = true; 116 os = new DeflaterOutputStream(os); 117 } 118 DataOutputStream dataOut = new DataOutputStream(os); 119 MarshallingSupport.writeUTF8(dataOut, this.text); 120 dataOut.close(); 121 setContent(bytesOut.toByteSequence()); 122 } 123 } 124 125 // see https://issues.apache.org/activemq/browse/AMQ-2103 126 // and https://issues.apache.org/activemq/browse/AMQ-2966 127 public void clearMarshalledState() throws JMSException { 128 super.clearMarshalledState(); 129 this.text = null; 130 } 131 132 /** 133 * Clears out the message body. Clearing a message's body does not clear its 134 * header values or property entries. <p/> 135 * <P> 136 * If this message body was read-only, calling this method leaves the 137 * message body in the same state as an empty body in a newly created 138 * message. 139 * 140 * @throws JMSException if the JMS provider fails to clear the message body 141 * due to some internal error. 142 */ 143 public void clearBody() throws JMSException { 144 super.clearBody(); 145 this.text = null; 146 } 147 148 public int getSize() { 149 if (size == 0 && content == null && text != null) { 150 size = getMinimumMessageSize(); 151 if (marshalledProperties != null) { 152 size += marshalledProperties.getLength(); 153 } 154 size += text.length() * 2; 155 } 156 return super.getSize(); 157 } 158 159 public String toString() { 160 try { 161 String text = getText(); 162 if (text != null) { 163 text = MarshallingSupport.truncate64(text); 164 HashMap<String, Object> overrideFields = new HashMap<String, Object>(); 165 overrideFields.put("text", text); 166 return super.toString(overrideFields); 167 } 168 } catch (JMSException e) { 169 } 170 return super.toString(); 171 } 172 }