001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.command;
019
020import java.io.BufferedInputStream;
021import java.io.DataInputStream;
022import java.io.DataOutputStream;
023import java.io.EOFException;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.OutputStream;
027import java.util.zip.DeflaterOutputStream;
028import java.util.zip.InflaterInputStream;
029
030import javax.jms.JMSException;
031import javax.jms.MessageEOFException;
032import javax.jms.MessageFormatException;
033import javax.jms.MessageNotReadableException;
034import javax.jms.MessageNotWriteableException;
035import javax.jms.StreamMessage;
036
037import org.apache.activemq.ActiveMQConnection;
038import org.apache.activemq.util.ByteArrayInputStream;
039import org.apache.activemq.util.ByteArrayOutputStream;
040import org.apache.activemq.util.ByteSequence;
041import org.apache.activemq.util.JMSExceptionSupport;
042import org.apache.activemq.util.MarshallingSupport;
043
044/**
045 * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive
046 * types in the Java programming language. It is filled and read sequentially.
047 * It inherits from the <CODE>Message</CODE> interface and adds a stream
048 * message body. Its methods are based largely on those found in
049 * <CODE>java.io.DataInputStream</CODE> and
050 * <CODE>java.io.DataOutputStream</CODE>. <p/>
051 * <P>
052 * The primitive types can be read or written explicitly using methods for each
053 * type. They may also be read or written generically as objects. For instance,
054 * a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to
055 * <CODE>StreamMessage.writeObject(new
056 * Integer(6))</CODE>. Both forms are
057 * provided, because the explicit form is convenient for static programming, and
058 * the object form is needed when types are not known at compile time. <p/>
059 * <P>
060 * When the message is first created, and when <CODE>clearBody</CODE> is
061 * called, the body of the message is in write-only mode. After the first call
062 * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
063 * After a message has been sent, the client that sent it can retain and modify
064 * it without affecting the message that has been sent. The same message object
065 * can be sent multiple times. When a message has been received, the provider
066 * has called <CODE>reset</CODE> so that the message body is in read-only mode
067 * for the client. <p/>
068 * <P>
069 * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
070 * message body is cleared and the message body is in write-only mode. <p/>
071 * <P>
072 * If a client attempts to read a message in write-only mode, a
073 * <CODE>MessageNotReadableException</CODE> is thrown. <p/>
074 * <P>
075 * If a client attempts to write a message in read-only mode, a
076 * <CODE>MessageNotWriteableException</CODE> is thrown. <p/>
077 * <P>
078 * <CODE>StreamMessage</CODE> objects support the following conversion table.
079 * The marked cases must be supported. The unmarked cases must throw a
080 * <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive
081 * conversions may throw a runtime exception if the primitive's
082 * <CODE>valueOf()</CODE> method does not accept it as a valid
083 * <CODE>String</CODE> representation of the primitive. <p/>
084 * <P>
085 * A value written as the row type can be read as the column type. <p/>
086 *
087 * <PRE>
 * |        | boolean byte short char int long float double String byte[]
 * |----------------------------------------------------------------------
 * |boolean |    X                                            X
 * |byte    |          X     X         X   X                  X
 * |short   |                X         X   X                  X
 * |char    |                     X                           X
 * |int     |                          X   X                  X
 * |long    |                              X                  X
 * |float   |                                    X     X      X
 * |double  |                                          X      X
 * |String  |    X     X     X         X   X     X     X      X
 * |byte[]  |                                                        X
 * |----------------------------------------------------------------------
093 *
094 * </PRE>
095 *
096 * <p/>
097 * <P>
098 * Attempting to read a null value as a primitive type must be treated as
099 * calling the primitive's corresponding <code>valueOf(String)</code>
100 * conversion method with a null value. Since <code>char</code> does not
101 * support a <code>String</code> conversion, attempting to read a null value
102 * as a <code>char</code> must throw a <code>NullPointerException</code>.
103 *
104 * @openwire:marshaller code="27"
105 * @see javax.jms.Session#createStreamMessage()
106 * @see javax.jms.BytesMessage
107 * @see javax.jms.MapMessage
108 * @see javax.jms.Message
109 * @see javax.jms.ObjectMessage
110 * @see javax.jms.TextMessage
111 */
112public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage {
113
114    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_STREAM_MESSAGE; // value returned by getDataStructureType()
115
       // Write-side stream; storeContent() closes it and sets it back to null.
116    protected transient DataOutputStream dataOut;
       // Write-side buffer; storeContent() turns it into the message content via toByteSequence().
117    protected transient ByteArrayOutputStream bytesOut;
       // Read-side stream consumed by the read* methods, which mark()/reset() it to rewind failed conversions.
118    protected transient DataInputStream dataIn;
       // Bytes still unread in the byte[] field that readBytes() is consuming; -1 while no field is in progress.
119    protected transient int remainingBytes = -1;
120
121    @Override
122    public Message copy() {
123        ActiveMQStreamMessage copy = new ActiveMQStreamMessage();
124        copy(copy);
125        return copy;
126    }
127
128    private void copy(ActiveMQStreamMessage copy) {
129        storeContent();
130        super.copy(copy);
131        copy.dataOut = null;
132        copy.bytesOut = null;
133        copy.dataIn = null;
134    }
135
136    @Override
137    public void onSend() throws JMSException {
138        super.onSend();
139        storeContent();
140    }
141
142    @Override
143    public void storeContent() {
144        if (dataOut != null) {
145            try {
146                dataOut.close();
147                setContent(bytesOut.toByteSequence());
148                bytesOut = null;
149                dataOut = null;
150            } catch (IOException ioe) {
151                throw new RuntimeException(ioe);
152            }
153        }
154    }
155
156    @Override
157    public boolean isContentMarshalled() {
158        return content != null || dataOut == null;
159    }
160
161    @Override
162    public byte getDataStructureType() {
163        return DATA_STRUCTURE_TYPE;
164    }
165
166    @Override
167    public String getJMSXMimeType() {
168        return "jms/stream-message";
169    }
170
171    /**
172     * Clears out the message body. Clearing a message's body does not clear its
173     * header values or property entries. <p/>
174     * <P>
175     * If this message body was read-only, calling this method leaves the
176     * message body in the same state as an empty body in a newly created
177     * message.
178     *
179     * @throws JMSException if the JMS provider fails to clear the message body
180     *                 due to some internal error.
181     */
182
183    @Override
184    public void clearBody() throws JMSException {
185        super.clearBody();
186        this.dataOut = null;
187        this.dataIn = null;
188        this.bytesOut = null;
189        this.remainingBytes = -1;
190    }
191
192    /**
193     * Reads a <code>boolean</code> from the stream message.
194     *
195     * @return the <code>boolean</code> value read
196     * @throws JMSException if the JMS provider fails to read the message due to
197     *                 some internal error.
198     * @throws MessageEOFException if unexpected end of message stream has been
199     *                 reached.
200     * @throws MessageFormatException if this type conversion is invalid.
201     * @throws MessageNotReadableException if the message is in write-only mode.
202     */
203
204    @Override
205    public boolean readBoolean() throws JMSException {
206        initializeReading();
207        try {
208
209            this.dataIn.mark(10);
210            int type = this.dataIn.read();
211            if (type == -1) {
212                throw new MessageEOFException("reached end of data");
213            }
214            if (type == MarshallingSupport.BOOLEAN_TYPE) {
215                return this.dataIn.readBoolean();
216            }
217            if (type == MarshallingSupport.STRING_TYPE) {
218                return Boolean.valueOf(this.dataIn.readUTF()).booleanValue();
219            }
220            if (type == MarshallingSupport.NULL) {
221                this.dataIn.reset();
222                throw new NullPointerException("Cannot convert NULL value to boolean.");
223            } else {
224                this.dataIn.reset();
225                throw new MessageFormatException(" not a boolean type");
226            }
227        } catch (EOFException e) {
228            throw JMSExceptionSupport.createMessageEOFException(e);
229        } catch (IOException e) {
230            throw JMSExceptionSupport.createMessageFormatException(e);
231        }
232    }
233
234    /**
235     * Reads a <code>byte</code> value from the stream message.
236     *
237     * @return the next byte from the stream message as a 8-bit
238     *         <code>byte</code>
239     * @throws JMSException if the JMS provider fails to read the message due to
240     *                 some internal error.
241     * @throws MessageEOFException if unexpected end of message stream has been
242     *                 reached.
243     * @throws MessageFormatException if this type conversion is invalid.
244     * @throws MessageNotReadableException if the message is in write-only mode.
245     */
246
247    @Override
248    public byte readByte() throws JMSException {
249        initializeReading();
250        try {
251
252            this.dataIn.mark(10);
253            int type = this.dataIn.read();
254            if (type == -1) {
255                throw new MessageEOFException("reached end of data");
256            }
257            if (type == MarshallingSupport.BYTE_TYPE) {
258                return this.dataIn.readByte();
259            }
260            if (type == MarshallingSupport.STRING_TYPE) {
261                return Byte.valueOf(this.dataIn.readUTF()).byteValue();
262            }
263            if (type == MarshallingSupport.NULL) {
264                this.dataIn.reset();
265                throw new NullPointerException("Cannot convert NULL value to byte.");
266            } else {
267                this.dataIn.reset();
268                throw new MessageFormatException(" not a byte type");
269            }
270        } catch (NumberFormatException mfe) {
271            try {
272                this.dataIn.reset();
273            } catch (IOException ioe) {
274                throw JMSExceptionSupport.create(ioe);
275            }
276            throw mfe;
277
278        } catch (EOFException e) {
279            throw JMSExceptionSupport.createMessageEOFException(e);
280        } catch (IOException e) {
281            throw JMSExceptionSupport.createMessageFormatException(e);
282        }
283    }
284
285    /**
286     * Reads a 16-bit integer from the stream message.
287     *
288     * @return a 16-bit integer from the stream message
289     * @throws JMSException if the JMS provider fails to read the message due to
290     *                 some internal error.
291     * @throws MessageEOFException if unexpected end of message stream has been
292     *                 reached.
293     * @throws MessageFormatException if this type conversion is invalid.
294     * @throws MessageNotReadableException if the message is in write-only mode.
295     */
296
297    @Override
298    public short readShort() throws JMSException {
299        initializeReading();
300        try {
301
302            this.dataIn.mark(17);
303            int type = this.dataIn.read();
304            if (type == -1) {
305                throw new MessageEOFException("reached end of data");
306            }
307            if (type == MarshallingSupport.SHORT_TYPE) {
308                return this.dataIn.readShort();
309            }
310            if (type == MarshallingSupport.BYTE_TYPE) {
311                return this.dataIn.readByte();
312            }
313            if (type == MarshallingSupport.STRING_TYPE) {
314                return Short.valueOf(this.dataIn.readUTF()).shortValue();
315            }
316            if (type == MarshallingSupport.NULL) {
317                this.dataIn.reset();
318                throw new NullPointerException("Cannot convert NULL value to short.");
319            } else {
320                this.dataIn.reset();
321                throw new MessageFormatException(" not a short type");
322            }
323        } catch (NumberFormatException mfe) {
324            try {
325                this.dataIn.reset();
326            } catch (IOException ioe) {
327                throw JMSExceptionSupport.create(ioe);
328            }
329            throw mfe;
330
331        } catch (EOFException e) {
332            throw JMSExceptionSupport.createMessageEOFException(e);
333        } catch (IOException e) {
334            throw JMSExceptionSupport.createMessageFormatException(e);
335        }
336
337    }
338
339    /**
340     * Reads a Unicode character value from the stream message.
341     *
342     * @return a Unicode character from the stream message
343     * @throws JMSException if the JMS provider fails to read the message due to
344     *                 some internal error.
345     * @throws MessageEOFException if unexpected end of message stream has been
346     *                 reached.
347     * @throws MessageFormatException if this type conversion is invalid
348     * @throws MessageNotReadableException if the message is in write-only mode.
349     */
350
351    @Override
352    public char readChar() throws JMSException {
353        initializeReading();
354        try {
355
356            this.dataIn.mark(17);
357            int type = this.dataIn.read();
358            if (type == -1) {
359                throw new MessageEOFException("reached end of data");
360            }
361            if (type == MarshallingSupport.CHAR_TYPE) {
362                return this.dataIn.readChar();
363            }
364            if (type == MarshallingSupport.NULL) {
365                this.dataIn.reset();
366                throw new NullPointerException("Cannot convert NULL value to char.");
367            } else {
368                this.dataIn.reset();
369                throw new MessageFormatException(" not a char type");
370            }
371        } catch (NumberFormatException mfe) {
372            try {
373                this.dataIn.reset();
374            } catch (IOException ioe) {
375                throw JMSExceptionSupport.create(ioe);
376            }
377            throw mfe;
378
379        } catch (EOFException e) {
380            throw JMSExceptionSupport.createMessageEOFException(e);
381        } catch (IOException e) {
382            throw JMSExceptionSupport.createMessageFormatException(e);
383        }
384    }
385
386    /**
387     * Reads a 32-bit integer from the stream message.
388     *
389     * @return a 32-bit integer value from the stream message, interpreted as an
390     *         <code>int</code>
391     * @throws JMSException if the JMS provider fails to read the message due to
392     *                 some internal error.
393     * @throws MessageEOFException if unexpected end of message stream has been
394     *                 reached.
395     * @throws MessageFormatException if this type conversion is invalid.
396     * @throws MessageNotReadableException if the message is in write-only mode.
397     */
398
399    @Override
400    public int readInt() throws JMSException {
401        initializeReading();
402        try {
403
404            this.dataIn.mark(33);
405            int type = this.dataIn.read();
406            if (type == -1) {
407                throw new MessageEOFException("reached end of data");
408            }
409            if (type == MarshallingSupport.INTEGER_TYPE) {
410                return this.dataIn.readInt();
411            }
412            if (type == MarshallingSupport.SHORT_TYPE) {
413                return this.dataIn.readShort();
414            }
415            if (type == MarshallingSupport.BYTE_TYPE) {
416                return this.dataIn.readByte();
417            }
418            if (type == MarshallingSupport.STRING_TYPE) {
419                return Integer.valueOf(this.dataIn.readUTF()).intValue();
420            }
421            if (type == MarshallingSupport.NULL) {
422                this.dataIn.reset();
423                throw new NullPointerException("Cannot convert NULL value to int.");
424            } else {
425                this.dataIn.reset();
426                throw new MessageFormatException(" not an int type");
427            }
428        } catch (NumberFormatException mfe) {
429            try {
430                this.dataIn.reset();
431            } catch (IOException ioe) {
432                throw JMSExceptionSupport.create(ioe);
433            }
434            throw mfe;
435
436        } catch (EOFException e) {
437            throw JMSExceptionSupport.createMessageEOFException(e);
438        } catch (IOException e) {
439            throw JMSExceptionSupport.createMessageFormatException(e);
440        }
441    }
442
443    /**
444     * Reads a 64-bit integer from the stream message.
445     *
446     * @return a 64-bit integer value from the stream message, interpreted as a
447     *         <code>long</code>
448     * @throws JMSException if the JMS provider fails to read the message due to
449     *                 some internal error.
450     * @throws MessageEOFException if unexpected end of message stream has been
451     *                 reached.
452     * @throws MessageFormatException if this type conversion is invalid.
453     * @throws MessageNotReadableException if the message is in write-only mode.
454     */
455
456    @Override
457    public long readLong() throws JMSException {
458        initializeReading();
459        try {
460
461            this.dataIn.mark(65);
462            int type = this.dataIn.read();
463            if (type == -1) {
464                throw new MessageEOFException("reached end of data");
465            }
466            if (type == MarshallingSupport.LONG_TYPE) {
467                return this.dataIn.readLong();
468            }
469            if (type == MarshallingSupport.INTEGER_TYPE) {
470                return this.dataIn.readInt();
471            }
472            if (type == MarshallingSupport.SHORT_TYPE) {
473                return this.dataIn.readShort();
474            }
475            if (type == MarshallingSupport.BYTE_TYPE) {
476                return this.dataIn.readByte();
477            }
478            if (type == MarshallingSupport.STRING_TYPE) {
479                return Long.valueOf(this.dataIn.readUTF()).longValue();
480            }
481            if (type == MarshallingSupport.NULL) {
482                this.dataIn.reset();
483                throw new NullPointerException("Cannot convert NULL value to long.");
484            } else {
485                this.dataIn.reset();
486                throw new MessageFormatException(" not a long type");
487            }
488        } catch (NumberFormatException mfe) {
489            try {
490                this.dataIn.reset();
491            } catch (IOException ioe) {
492                throw JMSExceptionSupport.create(ioe);
493            }
494            throw mfe;
495
496        } catch (EOFException e) {
497            throw JMSExceptionSupport.createMessageEOFException(e);
498        } catch (IOException e) {
499            throw JMSExceptionSupport.createMessageFormatException(e);
500        }
501    }
502
503    /**
504     * Reads a <code>float</code> from the stream message.
505     *
506     * @return a <code>float</code> value from the stream message
507     * @throws JMSException if the JMS provider fails to read the message due to
508     *                 some internal error.
509     * @throws MessageEOFException if unexpected end of message stream has been
510     *                 reached.
511     * @throws MessageFormatException if this type conversion is invalid.
512     * @throws MessageNotReadableException if the message is in write-only mode.
513     */
514
515    @Override
516    public float readFloat() throws JMSException {
517        initializeReading();
518        try {
519            this.dataIn.mark(33);
520            int type = this.dataIn.read();
521            if (type == -1) {
522                throw new MessageEOFException("reached end of data");
523            }
524            if (type == MarshallingSupport.FLOAT_TYPE) {
525                return this.dataIn.readFloat();
526            }
527            if (type == MarshallingSupport.STRING_TYPE) {
528                return Float.valueOf(this.dataIn.readUTF()).floatValue();
529            }
530            if (type == MarshallingSupport.NULL) {
531                this.dataIn.reset();
532                throw new NullPointerException("Cannot convert NULL value to float.");
533            } else {
534                this.dataIn.reset();
535                throw new MessageFormatException(" not a float type");
536            }
537        } catch (NumberFormatException mfe) {
538            try {
539                this.dataIn.reset();
540            } catch (IOException ioe) {
541                throw JMSExceptionSupport.create(ioe);
542            }
543            throw mfe;
544
545        } catch (EOFException e) {
546            throw JMSExceptionSupport.createMessageEOFException(e);
547        } catch (IOException e) {
548            throw JMSExceptionSupport.createMessageFormatException(e);
549        }
550    }
551
552    /**
553     * Reads a <code>double</code> from the stream message.
554     *
555     * @return a <code>double</code> value from the stream message
556     * @throws JMSException if the JMS provider fails to read the message due to
557     *                 some internal error.
558     * @throws MessageEOFException if unexpected end of message stream has been
559     *                 reached.
560     * @throws MessageFormatException if this type conversion is invalid.
561     * @throws MessageNotReadableException if the message is in write-only mode.
562     */
563
564    @Override
565    public double readDouble() throws JMSException {
566        initializeReading();
567        try {
568
569            this.dataIn.mark(65);
570            int type = this.dataIn.read();
571            if (type == -1) {
572                throw new MessageEOFException("reached end of data");
573            }
574            if (type == MarshallingSupport.DOUBLE_TYPE) {
575                return this.dataIn.readDouble();
576            }
577            if (type == MarshallingSupport.FLOAT_TYPE) {
578                return this.dataIn.readFloat();
579            }
580            if (type == MarshallingSupport.STRING_TYPE) {
581                return Double.valueOf(this.dataIn.readUTF()).doubleValue();
582            }
583            if (type == MarshallingSupport.NULL) {
584                this.dataIn.reset();
585                throw new NullPointerException("Cannot convert NULL value to double.");
586            } else {
587                this.dataIn.reset();
588                throw new MessageFormatException(" not a double type");
589            }
590        } catch (NumberFormatException mfe) {
591            try {
592                this.dataIn.reset();
593            } catch (IOException ioe) {
594                throw JMSExceptionSupport.create(ioe);
595            }
596            throw mfe;
597
598        } catch (EOFException e) {
599            throw JMSExceptionSupport.createMessageEOFException(e);
600        } catch (IOException e) {
601            throw JMSExceptionSupport.createMessageFormatException(e);
602        }
603    }
604
605    /**
606     * Reads a <CODE>String</CODE> from the stream message.
607     *
608     * @return a Unicode string from the stream message
609     * @throws JMSException if the JMS provider fails to read the message due to
610     *                 some internal error.
611     * @throws MessageEOFException if unexpected end of message stream has been
612     *                 reached.
613     * @throws MessageFormatException if this type conversion is invalid.
614     * @throws MessageNotReadableException if the message is in write-only mode.
615     */
616
617    @Override
618    public String readString() throws JMSException {
619        initializeReading();
620        try {
621
622            this.dataIn.mark(65);
623            int type = this.dataIn.read();
624            if (type == -1) {
625                throw new MessageEOFException("reached end of data");
626            }
627            if (type == MarshallingSupport.NULL) {
628                return null;
629            }
630            if (type == MarshallingSupport.BIG_STRING_TYPE) {
631                return MarshallingSupport.readUTF8(dataIn);
632            }
633            if (type == MarshallingSupport.STRING_TYPE) {
634                return this.dataIn.readUTF();
635            }
636            if (type == MarshallingSupport.LONG_TYPE) {
637                return new Long(this.dataIn.readLong()).toString();
638            }
639            if (type == MarshallingSupport.INTEGER_TYPE) {
640                return new Integer(this.dataIn.readInt()).toString();
641            }
642            if (type == MarshallingSupport.SHORT_TYPE) {
643                return new Short(this.dataIn.readShort()).toString();
644            }
645            if (type == MarshallingSupport.BYTE_TYPE) {
646                return new Byte(this.dataIn.readByte()).toString();
647            }
648            if (type == MarshallingSupport.FLOAT_TYPE) {
649                return new Float(this.dataIn.readFloat()).toString();
650            }
651            if (type == MarshallingSupport.DOUBLE_TYPE) {
652                return new Double(this.dataIn.readDouble()).toString();
653            }
654            if (type == MarshallingSupport.BOOLEAN_TYPE) {
655                return (this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE).toString();
656            }
657            if (type == MarshallingSupport.CHAR_TYPE) {
658                return new Character(this.dataIn.readChar()).toString();
659            } else {
660                this.dataIn.reset();
661                throw new MessageFormatException(" not a String type");
662            }
663        } catch (NumberFormatException mfe) {
664            try {
665                this.dataIn.reset();
666            } catch (IOException ioe) {
667                throw JMSExceptionSupport.create(ioe);
668            }
669            throw mfe;
670
671        } catch (EOFException e) {
672            throw JMSExceptionSupport.createMessageEOFException(e);
673        } catch (IOException e) {
674            throw JMSExceptionSupport.createMessageFormatException(e);
675        }
676    }
677
678    /**
679     * Reads a byte array field from the stream message into the specified
680     * <CODE>byte[]</CODE> object (the read buffer). <p/>
681     * <P>
682     * To read the field value, <CODE>readBytes</CODE> should be successively
683     * called until it returns a value less than the length of the read buffer.
684     * The value of the bytes in the buffer following the last byte read is
685     * undefined. <p/>
686     * <P>
687     * If <CODE>readBytes</CODE> returns a value equal to the length of the
688     * buffer, a subsequent <CODE>readBytes</CODE> call must be made. If there
689     * are no more bytes to be read, this call returns -1. <p/>
690     * <P>
691     * If the byte array field value is null, <CODE>readBytes</CODE> returns
692     * -1. <p/>
693     * <P>
694     * If the byte array field value is empty, <CODE>readBytes</CODE> returns
695     * 0. <p/>
696     * <P>
697     * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE>
698     * field value has been made, the full value of the field must be read
699     * before it is valid to read the next field. An attempt to read the next
700     * field before that has been done will throw a
701     * <CODE>MessageFormatException</CODE>. <p/>
702     * <P>
703     * To read the byte field value into a new <CODE>byte[]</CODE> object, use
704     * the <CODE>readObject</CODE> method.
705     *
706     * @param value the buffer into which the data is read
707     * @return the total number of bytes read into the buffer, or -1 if there is
708     *         no more data because the end of the byte field has been reached
709     * @throws JMSException if the JMS provider fails to read the message due to
710     *                 some internal error.
711     * @throws MessageEOFException if unexpected end of message stream has been
712     *                 reached.
713     * @throws MessageFormatException if this type conversion is invalid.
714     * @throws MessageNotReadableException if the message is in write-only mode.
715     * @see #readObject()
716     */
717
718    @Override
719    public int readBytes(byte[] value) throws JMSException {
720
721        initializeReading();
722        try {
723            if (value == null) {
724                throw new NullPointerException();
725            }
726
727            if (remainingBytes == -1) {
728                this.dataIn.mark(value.length + 1);
729                int type = this.dataIn.read();
730                if (type == -1) {
731                    throw new MessageEOFException("reached end of data");
732                }
733                if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
734                    throw new MessageFormatException("Not a byte array");
735                }
736                remainingBytes = this.dataIn.readInt();
737            } else if (remainingBytes == 0) {
738                remainingBytes = -1;
739                return -1;
740            }
741
742            if (value.length <= remainingBytes) {
743                // small buffer
744                remainingBytes -= value.length;
745                this.dataIn.readFully(value);
746                return value.length;
747            } else {
748                // big buffer
749                int rc = this.dataIn.read(value, 0, remainingBytes);
750                remainingBytes = 0;
751                return rc != -1 ? rc : 0;
752            }
753
754        } catch (EOFException e) {
755            JMSException jmsEx = new MessageEOFException(e.getMessage());
756            jmsEx.setLinkedException(e);
757            throw jmsEx;
758        } catch (IOException e) {
759            JMSException jmsEx = new MessageFormatException(e.getMessage());
760            jmsEx.setLinkedException(e);
761            throw jmsEx;
762        }
763    }
764
765    /**
766     * Reads an object from the stream message. <p/>
767     * <P>
768     * This method can be used to return, in objectified format, an object in
769     * the Java programming language ("Java object") that has been written to
770     * the stream with the equivalent <CODE>writeObject</CODE> method call, or
771     * its equivalent primitive <CODE>write<I>type</I></CODE> method. <p/>
772     * <P>
773     * Note that byte values are returned as <CODE>byte[]</CODE>, not
774     * <CODE>Byte[]</CODE>. <p/>
775     * <P>
776     * An attempt to call <CODE>readObject</CODE> to read a byte field value
777     * into a new <CODE>byte[]</CODE> object before the full value of the byte
778     * field has been read will throw a <CODE>MessageFormatException</CODE>.
779     *
780     * @return a Java object from the stream message, in objectified format (for
781     *         example, if the object was written as an <CODE>int</CODE>, an
782     *         <CODE>Integer</CODE> is returned)
783     * @throws JMSException if the JMS provider fails to read the message due to
784     *                 some internal error.
785     * @throws MessageEOFException if unexpected end of message stream has been
786     *                 reached.
787     * @throws MessageFormatException if this type conversion is invalid.
788     * @throws MessageNotReadableException if the message is in write-only mode.
789     * @see #readBytes(byte[] value)
790     */
791
792    @Override
793    public Object readObject() throws JMSException {
794        initializeReading();
795        try {
796            this.dataIn.mark(65);
797            int type = this.dataIn.read();
798            if (type == -1) {
799                throw new MessageEOFException("reached end of data");
800            }
801            if (type == MarshallingSupport.NULL) {
802                return null;
803            }
804            if (type == MarshallingSupport.BIG_STRING_TYPE) {
805                return MarshallingSupport.readUTF8(dataIn);
806            }
807            if (type == MarshallingSupport.STRING_TYPE) {
808                return this.dataIn.readUTF();
809            }
810            if (type == MarshallingSupport.LONG_TYPE) {
811                return Long.valueOf(this.dataIn.readLong());
812            }
813            if (type == MarshallingSupport.INTEGER_TYPE) {
814                return Integer.valueOf(this.dataIn.readInt());
815            }
816            if (type == MarshallingSupport.SHORT_TYPE) {
817                return Short.valueOf(this.dataIn.readShort());
818            }
819            if (type == MarshallingSupport.BYTE_TYPE) {
820                return Byte.valueOf(this.dataIn.readByte());
821            }
822            if (type == MarshallingSupport.FLOAT_TYPE) {
823                return new Float(this.dataIn.readFloat());
824            }
825            if (type == MarshallingSupport.DOUBLE_TYPE) {
826                return new Double(this.dataIn.readDouble());
827            }
828            if (type == MarshallingSupport.BOOLEAN_TYPE) {
829                return this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
830            }
831            if (type == MarshallingSupport.CHAR_TYPE) {
832                return Character.valueOf(this.dataIn.readChar());
833            }
834            if (type == MarshallingSupport.BYTE_ARRAY_TYPE) {
835                int len = this.dataIn.readInt();
836                byte[] value = new byte[len];
837                this.dataIn.readFully(value);
838                return value;
839            } else {
840                this.dataIn.reset();
841                throw new MessageFormatException("unknown type");
842            }
843        } catch (NumberFormatException mfe) {
844            try {
845                this.dataIn.reset();
846            } catch (IOException ioe) {
847                throw JMSExceptionSupport.create(ioe);
848            }
849            throw mfe;
850
851        } catch (EOFException e) {
852            JMSException jmsEx = new MessageEOFException(e.getMessage());
853            jmsEx.setLinkedException(e);
854            throw jmsEx;
855        } catch (IOException e) {
856            JMSException jmsEx = new MessageFormatException(e.getMessage());
857            jmsEx.setLinkedException(e);
858            throw jmsEx;
859        }
860    }
861
862    /**
863     * Writes a <code>boolean</code> to the stream message. The value
864     * <code>true</code> is written as the value <code>(byte)1</code>; the
865     * value <code>false</code> is written as the value <code>(byte)0</code>.
866     *
867     * @param value the <code>boolean</code> value to be written
868     * @throws JMSException if the JMS provider fails to write the message due
869     *                 to some internal error.
870     * @throws MessageNotWriteableException if the message is in read-only mode.
871     */
872
873    @Override
874    public void writeBoolean(boolean value) throws JMSException {
875        initializeWriting();
876        try {
877            MarshallingSupport.marshalBoolean(dataOut, value);
878        } catch (IOException ioe) {
879            throw JMSExceptionSupport.create(ioe);
880        }
881    }
882
883    /**
884     * Writes a <code>byte</code> to the stream message.
885     *
886     * @param value the <code>byte</code> value to be written
887     * @throws JMSException if the JMS provider fails to write the message due
888     *                 to some internal error.
889     * @throws MessageNotWriteableException if the message is in read-only mode.
890     */
891
892    @Override
893    public void writeByte(byte value) throws JMSException {
894        initializeWriting();
895        try {
896            MarshallingSupport.marshalByte(dataOut, value);
897        } catch (IOException ioe) {
898            throw JMSExceptionSupport.create(ioe);
899        }
900    }
901
902    /**
903     * Writes a <code>short</code> to the stream message.
904     *
905     * @param value the <code>short</code> value to be written
906     * @throws JMSException if the JMS provider fails to write the message due
907     *                 to some internal error.
908     * @throws MessageNotWriteableException if the message is in read-only mode.
909     */
910
911    @Override
912    public void writeShort(short value) throws JMSException {
913        initializeWriting();
914        try {
915            MarshallingSupport.marshalShort(dataOut, value);
916        } catch (IOException ioe) {
917            throw JMSExceptionSupport.create(ioe);
918        }
919    }
920
921    /**
922     * Writes a <code>char</code> to the stream message.
923     *
924     * @param value the <code>char</code> value to be written
925     * @throws JMSException if the JMS provider fails to write the message due
926     *                 to some internal error.
927     * @throws MessageNotWriteableException if the message is in read-only mode.
928     */
929
930    @Override
931    public void writeChar(char value) throws JMSException {
932        initializeWriting();
933        try {
934            MarshallingSupport.marshalChar(dataOut, value);
935        } catch (IOException ioe) {
936            throw JMSExceptionSupport.create(ioe);
937        }
938    }
939
940    /**
941     * Writes an <code>int</code> to the stream message.
942     *
943     * @param value the <code>int</code> value to be written
944     * @throws JMSException if the JMS provider fails to write the message due
945     *                 to some internal error.
946     * @throws MessageNotWriteableException if the message is in read-only mode.
947     */
948
949    @Override
950    public void writeInt(int value) throws JMSException {
951        initializeWriting();
952        try {
953            MarshallingSupport.marshalInt(dataOut, value);
954        } catch (IOException ioe) {
955            throw JMSExceptionSupport.create(ioe);
956        }
957    }
958
959    /**
960     * Writes a <code>long</code> to the stream message.
961     *
962     * @param value the <code>long</code> value to be written
963     * @throws JMSException if the JMS provider fails to write the message due
964     *                 to some internal error.
965     * @throws MessageNotWriteableException if the message is in read-only mode.
966     */
967
968    @Override
969    public void writeLong(long value) throws JMSException {
970        initializeWriting();
971        try {
972            MarshallingSupport.marshalLong(dataOut, value);
973        } catch (IOException ioe) {
974            throw JMSExceptionSupport.create(ioe);
975        }
976    }
977
978    /**
979     * Writes a <code>float</code> to the stream message.
980     *
981     * @param value the <code>float</code> value to be written
982     * @throws JMSException if the JMS provider fails to write the message due
983     *                 to some internal error.
984     * @throws MessageNotWriteableException if the message is in read-only mode.
985     */
986
987    @Override
988    public void writeFloat(float value) throws JMSException {
989        initializeWriting();
990        try {
991            MarshallingSupport.marshalFloat(dataOut, value);
992        } catch (IOException ioe) {
993            throw JMSExceptionSupport.create(ioe);
994        }
995    }
996
997    /**
998     * Writes a <code>double</code> to the stream message.
999     *
1000     * @param value the <code>double</code> value to be written
1001     * @throws JMSException if the JMS provider fails to write the message due
1002     *                 to some internal error.
1003     * @throws MessageNotWriteableException if the message is in read-only mode.
1004     */
1005
1006    @Override
1007    public void writeDouble(double value) throws JMSException {
1008        initializeWriting();
1009        try {
1010            MarshallingSupport.marshalDouble(dataOut, value);
1011        } catch (IOException ioe) {
1012            throw JMSExceptionSupport.create(ioe);
1013        }
1014    }
1015
1016    /**
1017     * Writes a <code>String</code> to the stream message.
1018     *
1019     * @param value the <code>String</code> value to be written
1020     * @throws JMSException if the JMS provider fails to write the message due
1021     *                 to some internal error.
1022     * @throws MessageNotWriteableException if the message is in read-only mode.
1023     */
1024
1025    @Override
1026    public void writeString(String value) throws JMSException {
1027        initializeWriting();
1028        try {
1029            if (value == null) {
1030                MarshallingSupport.marshalNull(dataOut);
1031            } else {
1032                MarshallingSupport.marshalString(dataOut, value);
1033            }
1034        } catch (IOException ioe) {
1035            throw JMSExceptionSupport.create(ioe);
1036        }
1037    }
1038
1039    /**
1040     * Writes a byte array field to the stream message. <p/>
1041     * <P>
1042     * The byte array <code>value</code> is written to the message as a byte
1043     * array field. Consecutively written byte array fields are treated as two
1044     * distinct fields when the fields are read.
1045     *
1046     * @param value the byte array value to be written
1047     * @throws JMSException if the JMS provider fails to write the message due
1048     *                 to some internal error.
1049     * @throws MessageNotWriteableException if the message is in read-only mode.
1050     */
1051
1052    @Override
1053    public void writeBytes(byte[] value) throws JMSException {
1054        writeBytes(value, 0, value.length);
1055    }
1056
1057    /**
1058     * Writes a portion of a byte array as a byte array field to the stream
1059     * message. <p/>
1060     * <P>
1061     * The a portion of the byte array <code>value</code> is written to the
1062     * message as a byte array field. Consecutively written byte array fields
1063     * are treated as two distinct fields when the fields are read.
1064     *
1065     * @param value the byte array value to be written
1066     * @param offset the initial offset within the byte array
1067     * @param length the number of bytes to use
1068     * @throws JMSException if the JMS provider fails to write the message due
1069     *                 to some internal error.
1070     * @throws MessageNotWriteableException if the message is in read-only mode.
1071     */
1072
1073    @Override
1074    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
1075        initializeWriting();
1076        try {
1077            MarshallingSupport.marshalByteArray(dataOut, value, offset, length);
1078        } catch (IOException ioe) {
1079            throw JMSExceptionSupport.create(ioe);
1080        }
1081    }
1082
1083    /**
1084     * Writes an object to the stream message. <p/>
1085     * <P>
1086     * This method works only for the objectified primitive object types (<code>Integer</code>,
1087     * <code>Double</code>, <code>Long</code>&nbsp;...),
1088     * <code>String</code> objects, and byte arrays.
1089     *
1090     * @param value the Java object to be written
1091     * @throws JMSException if the JMS provider fails to write the message due
1092     *                 to some internal error.
1093     * @throws MessageFormatException if the object is invalid.
1094     * @throws MessageNotWriteableException if the message is in read-only mode.
1095     */
1096
1097    @Override
1098    public void writeObject(Object value) throws JMSException {
1099        initializeWriting();
1100        if (value == null) {
1101            try {
1102                MarshallingSupport.marshalNull(dataOut);
1103            } catch (IOException ioe) {
1104                throw JMSExceptionSupport.create(ioe);
1105            }
1106        } else if (value instanceof String) {
1107            writeString(value.toString());
1108        } else if (value instanceof Character) {
1109            writeChar(((Character)value).charValue());
1110        } else if (value instanceof Boolean) {
1111            writeBoolean(((Boolean)value).booleanValue());
1112        } else if (value instanceof Byte) {
1113            writeByte(((Byte)value).byteValue());
1114        } else if (value instanceof Short) {
1115            writeShort(((Short)value).shortValue());
1116        } else if (value instanceof Integer) {
1117            writeInt(((Integer)value).intValue());
1118        } else if (value instanceof Float) {
1119            writeFloat(((Float)value).floatValue());
1120        } else if (value instanceof Double) {
1121            writeDouble(((Double)value).doubleValue());
1122        } else if (value instanceof byte[]) {
1123            writeBytes((byte[])value);
1124        }else if (value instanceof Long) {
1125            writeLong(((Long)value).longValue());
1126        }else {
1127            throw new MessageFormatException("Unsupported Object type: " + value.getClass());
1128        }
1129    }
1130
1131    /**
1132     * Puts the message body in read-only mode and repositions the stream of
1133     * bytes to the beginning.
1134     *
1135     * @throws JMSException if an internal error occurs
1136     */
1137
1138    @Override
1139    public void reset() throws JMSException {
1140        storeContent();
1141        this.bytesOut = null;
1142        this.dataIn = null;
1143        this.dataOut = null;
1144        this.remainingBytes = -1;
1145        setReadOnlyBody(true);
1146    }
1147
1148    private void initializeWriting() throws JMSException {
1149        checkReadOnlyBody();
1150        if (this.dataOut == null) {
1151            this.bytesOut = new ByteArrayOutputStream();
1152            OutputStream os = bytesOut;
1153            ActiveMQConnection connection = getConnection();
1154            if (connection != null && connection.isUseCompression()) {
1155                compressed = true;
1156                os = new DeflaterOutputStream(os);
1157            }
1158            this.dataOut = new DataOutputStream(os);
1159        }
1160
1161        // For a message that already had a body and was sent we need to restore the content
1162        // if the message is used again without having its clearBody method called.
1163        if (this.content != null && this.content.length > 0) {
1164            try {
1165                if (compressed) {
1166                    ByteArrayInputStream input = new ByteArrayInputStream(this.content.getData(), this.content.getOffset(), this.content.getLength());
1167                    InflaterInputStream inflater = new InflaterInputStream(input);
1168                    try {
1169                        byte[] buffer = new byte[8*1024];
1170                        int read = 0;
1171                        while ((read = inflater.read(buffer)) != -1) {
1172                            this.dataOut.write(buffer, 0, read);
1173                        }
1174                    } finally {
1175                        inflater.close();
1176                    }
1177                } else {
1178                    this.dataOut.write(this.content.getData(), this.content.getOffset(), this.content.getLength());
1179                }
1180                // Free up the buffer from the old content, will be re-written when
1181                // tbe message is sent again and storeContent() is called.
1182                this.content = null;
1183            } catch (IOException ioe) {
1184                throw JMSExceptionSupport.create(ioe);
1185            }
1186        }
1187    }
1188
1189    protected void checkWriteOnlyBody() throws MessageNotReadableException {
1190        if (!readOnlyBody) {
1191            throw new MessageNotReadableException("Message body is write-only");
1192        }
1193    }
1194
1195    private void initializeReading() throws MessageNotReadableException {
1196        checkWriteOnlyBody();
1197        if (this.dataIn == null) {
1198            ByteSequence data = getContent();
1199            if (data == null) {
1200                data = new ByteSequence(new byte[] {}, 0, 0);
1201            }
1202            InputStream is = new ByteArrayInputStream(data);
1203            if (isCompressed()) {
1204                is = new InflaterInputStream(is);
1205                is = new BufferedInputStream(is);
1206            }
1207            this.dataIn = new DataInputStream(is);
1208        }
1209    }
1210
1211    @Override
1212    public void compress() throws IOException {
1213        storeContent();
1214        super.compress();
1215    }
1216
1217    @Override
1218    public String toString() {
1219        return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
1220    }
1221}