001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.command; 019 020import java.io.DataInputStream; 021import java.io.DataOutputStream; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.ObjectOutputStream; 025import java.io.OutputStream; 026import java.io.Serializable; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.List; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.zip.DeflaterOutputStream; 032import java.util.zip.InflaterInputStream; 033 034import javax.jms.JMSException; 035import javax.jms.ObjectMessage; 036 037import org.apache.activemq.ActiveMQConnection; 038import org.apache.activemq.util.ByteArrayInputStream; 039import org.apache.activemq.util.ByteArrayOutputStream; 040import org.apache.activemq.util.ByteSequence; 041import org.apache.activemq.util.ClassLoadingAwareObjectInputStream; 042import org.apache.activemq.util.JMSExceptionSupport; 043import org.apache.activemq.wireformat.WireFormat; 044 045/** 046 * An <CODE>ObjectMessage</CODE> object is used to send a message that 047 * contains a serializable object in the Java programming language ("Java 048 * object"). It inherits from the <CODE>Message</CODE> interface and adds a 049 * body containing a single reference to an object. Only 050 * <CODE>Serializable</CODE> Java objects can be used. <p/> 051 * <P> 052 * If a collection of Java objects must be sent, one of the 053 * <CODE>Collection</CODE> classes provided since JDK 1.2 can be used. <p/> 054 * <P> 055 * When a client receives an <CODE>ObjectMessage</CODE>, it is in read-only 056 * mode. If a client attempts to write to the message at this point, a 057 * <CODE>MessageNotWriteableException</CODE> is thrown. If 058 * <CODE>clearBody</CODE> is called, the message can now be both read from and 059 * written to. 060 * 061 * @openwire:marshaller code="26" 062 * @see javax.jms.Session#createObjectMessage() 063 * @see javax.jms.Session#createObjectMessage(Serializable) 064 * @see javax.jms.BytesMessage 065 * @see javax.jms.MapMessage 066 * @see javax.jms.Message 067 * @see javax.jms.StreamMessage 068 * @see javax.jms.TextMessage 069 */ 070public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage, TransientInitializer { 071 072 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_OBJECT_MESSAGE; 073 074 private transient List<String> trustedPackages = Arrays.asList(ClassLoadingAwareObjectInputStream.serializablePackages); 075 private transient boolean trustAllPackages = false; 076 077 protected transient Serializable object; 078 079 @Override 080 public Message copy() { 081 ActiveMQObjectMessage copy = new ActiveMQObjectMessage(); 082 copy(copy); 083 copy.setTrustAllPackages(trustAllPackages); 084 copy.setTrustedPackages(trustedPackages); 085 return copy; 086 } 087 088 private void copy(ActiveMQObjectMessage copy) { 089 ActiveMQConnection connection = getConnection(); 090 if (connection == null || !connection.isObjectMessageSerializationDefered()) { 091 storeContent(); 092 copy.object = null; 093 } else { 094 copy.object = object; 095 } 096 super.copy(copy); 097 098 } 099 100 @Override 101 public void storeContentAndClear() { 102 storeContent(); 103 object = null; 104 } 105 106 @Override 107 public void storeContent() { 108 ByteSequence bodyAsBytes = getContent(); 109 if (bodyAsBytes == null && object != null) { 110 try { 111 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 112 OutputStream os = bytesOut; 113 ActiveMQConnection connection = getConnection(); 114 if (connection != null && connection.isUseCompression()) { 115 compressed = true; 116 os = new DeflaterOutputStream(os); 117 } 118 DataOutputStream dataOut = new DataOutputStream(os); 119 ObjectOutputStream objOut = new ObjectOutputStream(dataOut); 120 objOut.writeObject(object); 121 objOut.flush(); 122 objOut.reset(); 123 objOut.close(); 124 setContent(bytesOut.toByteSequence()); 125 } catch (IOException ioe) { 126 throw new RuntimeException(ioe.getMessage(), ioe); 127 } 128 } 129 } 130 131 @Override 132 public boolean isContentMarshalled() { 133 return content != null || object == null; 134 } 135 136 @Override 137 public byte getDataStructureType() { 138 return DATA_STRUCTURE_TYPE; 139 } 140 141 @Override 142 public String getJMSXMimeType() { 143 return "jms/object-message"; 144 } 145 146 /** 147 * Clears out the message body. Clearing a message's body does not clear its 148 * header values or property entries. <p/> 149 * <P> 150 * If this message body was read-only, calling this method leaves the 151 * message body in the same state as an empty body in a newly created 152 * message. 153 * 154 * @throws JMSException if the JMS provider fails to clear the message body 155 * due to some internal error. 156 */ 157 158 @Override 159 public void clearBody() throws JMSException { 160 super.clearBody(); 161 this.object = null; 162 } 163 164 /** 165 * Sets the serializable object containing this message's data. It is 166 * important to note that an <CODE>ObjectMessage</CODE> contains a 167 * snapshot of the object at the time <CODE>setObject()</CODE> is called; 168 * subsequent modifications of the object will have no effect on the 169 * <CODE>ObjectMessage</CODE> body. 170 * 171 * @param newObject the message's data 172 * @throws JMSException if the JMS provider fails to set the object due to 173 * some internal error. 174 * @throws javax.jms.MessageFormatException if object serialization fails. 175 * @throws javax.jms.MessageNotWriteableException if the message is in 176 * read-only mode. 177 */ 178 179 @Override 180 public void setObject(Serializable newObject) throws JMSException { 181 checkReadOnlyBody(); 182 this.object = newObject; 183 setContent(null); 184 ActiveMQConnection connection = getConnection(); 185 if (connection == null || !connection.isObjectMessageSerializationDefered()) { 186 storeContent(); 187 } 188 } 189 190 /** 191 * Gets the serializable object containing this message's data. The default 192 * value is null. 193 * 194 * @return the serializable object containing this message's data 195 * @throws JMSException 196 */ 197 @Override 198 public Serializable getObject() throws JMSException { 199 if (object == null && getContent() != null) { 200 try { 201 ByteSequence content = getContent(); 202 InputStream is = new ByteArrayInputStream(content); 203 if (isCompressed()) { 204 is = new InflaterInputStream(is); 205 } 206 DataInputStream dataIn = new DataInputStream(is); 207 ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn); 208 objIn.setTrustedPackages(trustedPackages); 209 objIn.setTrustAllPackages(trustAllPackages); 210 try { 211 object = (Serializable)objIn.readObject(); 212 } catch (ClassNotFoundException ce) { 213 throw JMSExceptionSupport.create("Failed to build body from content. Serializable class not available to broker. Reason: " + ce, ce); 214 } finally { 215 dataIn.close(); 216 } 217 } catch (IOException e) { 218 throw JMSExceptionSupport.create("Failed to build body from bytes. Reason: " + e, e); 219 } 220 } 221 return this.object; 222 } 223 224 @Override 225 public void beforeMarshall(WireFormat wireFormat) throws IOException { 226 super.beforeMarshall(wireFormat); 227 // may have initiated on vm transport with deferred marshalling 228 storeContent(); 229 } 230 231 @Override 232 public void clearUnMarshalledState() throws JMSException { 233 super.clearUnMarshalledState(); 234 this.object = null; 235 } 236 237 @Override 238 public void onMessageRolledBack() { 239 super.onMessageRolledBack(); 240 241 // lets force the object to be deserialized again - as we could have 242 // changed the object 243 object = null; 244 } 245 246 @Override 247 public void compress() throws IOException { 248 storeContent(); 249 super.compress(); 250 } 251 252 @Override 253 public String toString() { 254 try { 255 getObject(); 256 } catch (JMSException e) { 257 } 258 return super.toString(); 259 } 260 261 public List<String> getTrustedPackages() { 262 return trustedPackages; 263 } 264 265 public void setTrustedPackages(List<String> trustedPackages) { 266 this.trustedPackages = trustedPackages; 267 } 268 269 public boolean isTrustAllPackages() { 270 return trustAllPackages; 271 } 272 273 public void setTrustAllPackages(boolean trustAllPackages) { 274 this.trustAllPackages = trustAllPackages; 275 } 276 277 @Override 278 public void initTransients() { 279 trustedPackages = Arrays.asList(ClassLoadingAwareObjectInputStream.serializablePackages); 280 trustAllPackages = false; 281 } 282}