001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.command; 019 020import java.io.BufferedInputStream; 021import java.io.DataInputStream; 022import java.io.DataOutputStream; 023import java.io.EOFException; 024import java.io.IOException; 025import java.io.InputStream; 026import java.io.OutputStream; 027import java.util.zip.DeflaterOutputStream; 028import java.util.zip.InflaterInputStream; 029 030import javax.jms.JMSException; 031import javax.jms.MessageEOFException; 032import javax.jms.MessageFormatException; 033import javax.jms.MessageNotReadableException; 034import javax.jms.MessageNotWriteableException; 035import javax.jms.StreamMessage; 036 037import org.apache.activemq.ActiveMQConnection; 038import org.apache.activemq.util.ByteArrayInputStream; 039import org.apache.activemq.util.ByteArrayOutputStream; 040import org.apache.activemq.util.ByteSequence; 041import org.apache.activemq.util.JMSExceptionSupport; 042import org.apache.activemq.util.MarshallingSupport; 043 044/** 045 * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive 046 * types in the Java programming language. It is filled and read sequentially. 047 * It inherits from the <CODE>Message</CODE> interface and adds a stream 048 * message body. Its methods are based largely on those found in 049 * <CODE>java.io.DataInputStream</CODE> and 050 * <CODE>java.io.DataOutputStream</CODE>. <p/> 051 * <P> 052 * The primitive types can be read or written explicitly using methods for each 053 * type. They may also be read or written generically as objects. For instance, 054 * a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to 055 * <CODE>StreamMessage.writeObject(new 056 * Integer(6))</CODE>. Both forms are 057 * provided, because the explicit form is convenient for static programming, and 058 * the object form is needed when types are not known at compile time. <p/> 059 * <P> 060 * When the message is first created, and when <CODE>clearBody</CODE> is 061 * called, the body of the message is in write-only mode. After the first call 062 * to <CODE>reset</CODE> has been made, the message body is in read-only mode. 063 * After a message has been sent, the client that sent it can retain and modify 064 * it without affecting the message that has been sent. The same message object 065 * can be sent multiple times. When a message has been received, the provider 066 * has called <CODE>reset</CODE> so that the message body is in read-only mode 067 * for the client. <p/> 068 * <P> 069 * If <CODE>clearBody</CODE> is called on a message in read-only mode, the 070 * message body is cleared and the message body is in write-only mode. <p/> 071 * <P> 072 * If a client attempts to read a message in write-only mode, a 073 * <CODE>MessageNotReadableException</CODE> is thrown. <p/> 074 * <P> 075 * If a client attempts to write a message in read-only mode, a 076 * <CODE>MessageNotWriteableException</CODE> is thrown. <p/> 077 * <P> 078 * <CODE>StreamMessage</CODE> objects support the following conversion table. 079 * The marked cases must be supported. The unmarked cases must throw a 080 * <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive 081 * conversions may throw a runtime exception if the primitive's 082 * <CODE>valueOf()</CODE> method does not accept it as a valid 083 * <CODE>String</CODE> representation of the primitive. <p/> 084 * <P> 085 * A value written as the row type can be read as the column type. <p/> 086 * 087 * <PRE> 088 * | | boolean byte short char int long float double String byte[] 089 * |---------------------------------------------------------------------- 090 * |boolean | X X |byte | X X X X X |short | X X X X |char | X X |int | X X X 091 * |long | X X |float | X X X |double | X X |String | X X X X X X X X |byte[] | 092 * X |---------------------------------------------------------------------- 093 * 094 * </PRE> 095 * 096 * <p/> 097 * <P> 098 * Attempting to read a null value as a primitive type must be treated as 099 * calling the primitive's corresponding <code>valueOf(String)</code> 100 * conversion method with a null value. Since <code>char</code> does not 101 * support a <code>String</code> conversion, attempting to read a null value 102 * as a <code>char</code> must throw a <code>NullPointerException</code>. 103 * 104 * @openwire:marshaller code="27" 105 * @see javax.jms.Session#createStreamMessage() 106 * @see javax.jms.BytesMessage 107 * @see javax.jms.MapMessage 108 * @see javax.jms.Message 109 * @see javax.jms.ObjectMessage 110 * @see javax.jms.TextMessage 111 */ 112public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage { 113 114 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_STREAM_MESSAGE; 115 116 protected transient DataOutputStream dataOut; 117 protected transient ByteArrayOutputStream bytesOut; 118 protected transient DataInputStream dataIn; 119 protected transient int remainingBytes = -1; 120 121 @Override 122 public Message copy() { 123 ActiveMQStreamMessage copy = new ActiveMQStreamMessage(); 124 copy(copy); 125 return copy; 126 } 127 128 private void copy(ActiveMQStreamMessage copy) { 129 storeContent(); 130 super.copy(copy); 131 copy.dataOut = null; 132 copy.bytesOut = null; 133 copy.dataIn = null; 134 } 135 136 @Override 137 public void onSend() throws JMSException { 138 super.onSend(); 139 storeContent(); 140 } 141 142 @Override 143 public void storeContent() { 144 if (dataOut != null) { 145 try { 146 dataOut.close(); 147 setContent(bytesOut.toByteSequence()); 148 bytesOut = null; 149 dataOut = null; 150 } catch (IOException ioe) { 151 throw new RuntimeException(ioe); 152 } 153 } 154 } 155 156 @Override 157 public boolean isContentMarshalled() { 158 return content != null || dataOut == null; 159 } 160 161 @Override 162 public byte getDataStructureType() { 163 return DATA_STRUCTURE_TYPE; 164 } 165 166 @Override 167 public String getJMSXMimeType() { 168 return "jms/stream-message"; 169 } 170 171 /** 172 * Clears out the message body. Clearing a message's body does not clear its 173 * header values or property entries. <p/> 174 * <P> 175 * If this message body was read-only, calling this method leaves the 176 * message body in the same state as an empty body in a newly created 177 * message. 178 * 179 * @throws JMSException if the JMS provider fails to clear the message body 180 * due to some internal error. 181 */ 182 183 @Override 184 public void clearBody() throws JMSException { 185 super.clearBody(); 186 this.dataOut = null; 187 this.dataIn = null; 188 this.bytesOut = null; 189 this.remainingBytes = -1; 190 } 191 192 /** 193 * Reads a <code>boolean</code> from the stream message. 194 * 195 * @return the <code>boolean</code> value read 196 * @throws JMSException if the JMS provider fails to read the message due to 197 * some internal error. 198 * @throws MessageEOFException if unexpected end of message stream has been 199 * reached. 200 * @throws MessageFormatException if this type conversion is invalid. 201 * @throws MessageNotReadableException if the message is in write-only mode. 202 */ 203 204 @Override 205 public boolean readBoolean() throws JMSException { 206 initializeReading(); 207 try { 208 209 this.dataIn.mark(10); 210 int type = this.dataIn.read(); 211 if (type == -1) { 212 throw new MessageEOFException("reached end of data"); 213 } 214 if (type == MarshallingSupport.BOOLEAN_TYPE) { 215 return this.dataIn.readBoolean(); 216 } 217 if (type == MarshallingSupport.STRING_TYPE) { 218 return Boolean.valueOf(this.dataIn.readUTF()).booleanValue(); 219 } 220 if (type == MarshallingSupport.NULL) { 221 this.dataIn.reset(); 222 throw new NullPointerException("Cannot convert NULL value to boolean."); 223 } else { 224 this.dataIn.reset(); 225 throw new MessageFormatException(" not a boolean type"); 226 } 227 } catch (EOFException e) { 228 throw JMSExceptionSupport.createMessageEOFException(e); 229 } catch (IOException e) { 230 throw JMSExceptionSupport.createMessageFormatException(e); 231 } 232 } 233 234 /** 235 * Reads a <code>byte</code> value from the stream message. 236 * 237 * @return the next byte from the stream message as a 8-bit 238 * <code>byte</code> 239 * @throws JMSException if the JMS provider fails to read the message due to 240 * some internal error. 241 * @throws MessageEOFException if unexpected end of message stream has been 242 * reached. 243 * @throws MessageFormatException if this type conversion is invalid. 244 * @throws MessageNotReadableException if the message is in write-only mode. 245 */ 246 247 @Override 248 public byte readByte() throws JMSException { 249 initializeReading(); 250 try { 251 252 this.dataIn.mark(10); 253 int type = this.dataIn.read(); 254 if (type == -1) { 255 throw new MessageEOFException("reached end of data"); 256 } 257 if (type == MarshallingSupport.BYTE_TYPE) { 258 return this.dataIn.readByte(); 259 } 260 if (type == MarshallingSupport.STRING_TYPE) { 261 return Byte.valueOf(this.dataIn.readUTF()).byteValue(); 262 } 263 if (type == MarshallingSupport.NULL) { 264 this.dataIn.reset(); 265 throw new NullPointerException("Cannot convert NULL value to byte."); 266 } else { 267 this.dataIn.reset(); 268 throw new MessageFormatException(" not a byte type"); 269 } 270 } catch (NumberFormatException mfe) { 271 try { 272 this.dataIn.reset(); 273 } catch (IOException ioe) { 274 throw JMSExceptionSupport.create(ioe); 275 } 276 throw mfe; 277 278 } catch (EOFException e) { 279 throw JMSExceptionSupport.createMessageEOFException(e); 280 } catch (IOException e) { 281 throw JMSExceptionSupport.createMessageFormatException(e); 282 } 283 } 284 285 /** 286 * Reads a 16-bit integer from the stream message. 287 * 288 * @return a 16-bit integer from the stream message 289 * @throws JMSException if the JMS provider fails to read the message due to 290 * some internal error. 291 * @throws MessageEOFException if unexpected end of message stream has been 292 * reached. 293 * @throws MessageFormatException if this type conversion is invalid. 294 * @throws MessageNotReadableException if the message is in write-only mode. 295 */ 296 297 @Override 298 public short readShort() throws JMSException { 299 initializeReading(); 300 try { 301 302 this.dataIn.mark(17); 303 int type = this.dataIn.read(); 304 if (type == -1) { 305 throw new MessageEOFException("reached end of data"); 306 } 307 if (type == MarshallingSupport.SHORT_TYPE) { 308 return this.dataIn.readShort(); 309 } 310 if (type == MarshallingSupport.BYTE_TYPE) { 311 return this.dataIn.readByte(); 312 } 313 if (type == MarshallingSupport.STRING_TYPE) { 314 return Short.valueOf(this.dataIn.readUTF()).shortValue(); 315 } 316 if (type == MarshallingSupport.NULL) { 317 this.dataIn.reset(); 318 throw new NullPointerException("Cannot convert NULL value to short."); 319 } else { 320 this.dataIn.reset(); 321 throw new MessageFormatException(" not a short type"); 322 } 323 } catch (NumberFormatException mfe) { 324 try { 325 this.dataIn.reset(); 326 } catch (IOException ioe) { 327 throw JMSExceptionSupport.create(ioe); 328 } 329 throw mfe; 330 331 } catch (EOFException e) { 332 throw JMSExceptionSupport.createMessageEOFException(e); 333 } catch (IOException e) { 334 throw JMSExceptionSupport.createMessageFormatException(e); 335 } 336 337 } 338 339 /** 340 * Reads a Unicode character value from the stream message. 341 * 342 * @return a Unicode character from the stream message 343 * @throws JMSException if the JMS provider fails to read the message due to 344 * some internal error. 345 * @throws MessageEOFException if unexpected end of message stream has been 346 * reached. 347 * @throws MessageFormatException if this type conversion is invalid 348 * @throws MessageNotReadableException if the message is in write-only mode. 349 */ 350 351 @Override 352 public char readChar() throws JMSException { 353 initializeReading(); 354 try { 355 356 this.dataIn.mark(17); 357 int type = this.dataIn.read(); 358 if (type == -1) { 359 throw new MessageEOFException("reached end of data"); 360 } 361 if (type == MarshallingSupport.CHAR_TYPE) { 362 return this.dataIn.readChar(); 363 } 364 if (type == MarshallingSupport.NULL) { 365 this.dataIn.reset(); 366 throw new NullPointerException("Cannot convert NULL value to char."); 367 } else { 368 this.dataIn.reset(); 369 throw new MessageFormatException(" not a char type"); 370 } 371 } catch (NumberFormatException mfe) { 372 try { 373 this.dataIn.reset(); 374 } catch (IOException ioe) { 375 throw JMSExceptionSupport.create(ioe); 376 } 377 throw mfe; 378 379 } catch (EOFException e) { 380 throw JMSExceptionSupport.createMessageEOFException(e); 381 } catch (IOException e) { 382 throw JMSExceptionSupport.createMessageFormatException(e); 383 } 384 } 385 386 /** 387 * Reads a 32-bit integer from the stream message. 388 * 389 * @return a 32-bit integer value from the stream message, interpreted as an 390 * <code>int</code> 391 * @throws JMSException if the JMS provider fails to read the message due to 392 * some internal error. 393 * @throws MessageEOFException if unexpected end of message stream has been 394 * reached. 395 * @throws MessageFormatException if this type conversion is invalid. 396 * @throws MessageNotReadableException if the message is in write-only mode. 397 */ 398 399 @Override 400 public int readInt() throws JMSException { 401 initializeReading(); 402 try { 403 404 this.dataIn.mark(33); 405 int type = this.dataIn.read(); 406 if (type == -1) { 407 throw new MessageEOFException("reached end of data"); 408 } 409 if (type == MarshallingSupport.INTEGER_TYPE) { 410 return this.dataIn.readInt(); 411 } 412 if (type == MarshallingSupport.SHORT_TYPE) { 413 return this.dataIn.readShort(); 414 } 415 if (type == MarshallingSupport.BYTE_TYPE) { 416 return this.dataIn.readByte(); 417 } 418 if (type == MarshallingSupport.STRING_TYPE) { 419 return Integer.valueOf(this.dataIn.readUTF()).intValue(); 420 } 421 if (type == MarshallingSupport.NULL) { 422 this.dataIn.reset(); 423 throw new NullPointerException("Cannot convert NULL value to int."); 424 } else { 425 this.dataIn.reset(); 426 throw new MessageFormatException(" not an int type"); 427 } 428 } catch (NumberFormatException mfe) { 429 try { 430 this.dataIn.reset(); 431 } catch (IOException ioe) { 432 throw JMSExceptionSupport.create(ioe); 433 } 434 throw mfe; 435 436 } catch (EOFException e) { 437 throw JMSExceptionSupport.createMessageEOFException(e); 438 } catch (IOException e) { 439 throw JMSExceptionSupport.createMessageFormatException(e); 440 } 441 } 442 443 /** 444 * Reads a 64-bit integer from the stream message. 445 * 446 * @return a 64-bit integer value from the stream message, interpreted as a 447 * <code>long</code> 448 * @throws JMSException if the JMS provider fails to read the message due to 449 * some internal error. 450 * @throws MessageEOFException if unexpected end of message stream has been 451 * reached. 452 * @throws MessageFormatException if this type conversion is invalid. 453 * @throws MessageNotReadableException if the message is in write-only mode. 454 */ 455 456 @Override 457 public long readLong() throws JMSException { 458 initializeReading(); 459 try { 460 461 this.dataIn.mark(65); 462 int type = this.dataIn.read(); 463 if (type == -1) { 464 throw new MessageEOFException("reached end of data"); 465 } 466 if (type == MarshallingSupport.LONG_TYPE) { 467 return this.dataIn.readLong(); 468 } 469 if (type == MarshallingSupport.INTEGER_TYPE) { 470 return this.dataIn.readInt(); 471 } 472 if (type == MarshallingSupport.SHORT_TYPE) { 473 return this.dataIn.readShort(); 474 } 475 if (type == MarshallingSupport.BYTE_TYPE) { 476 return this.dataIn.readByte(); 477 } 478 if (type == MarshallingSupport.STRING_TYPE) { 479 return Long.valueOf(this.dataIn.readUTF()).longValue(); 480 } 481 if (type == MarshallingSupport.NULL) { 482 this.dataIn.reset(); 483 throw new NullPointerException("Cannot convert NULL value to long."); 484 } else { 485 this.dataIn.reset(); 486 throw new MessageFormatException(" not a long type"); 487 } 488 } catch (NumberFormatException mfe) { 489 try { 490 this.dataIn.reset(); 491 } catch (IOException ioe) { 492 throw JMSExceptionSupport.create(ioe); 493 } 494 throw mfe; 495 496 } catch (EOFException e) { 497 throw JMSExceptionSupport.createMessageEOFException(e); 498 } catch (IOException e) { 499 throw JMSExceptionSupport.createMessageFormatException(e); 500 } 501 } 502 503 /** 504 * Reads a <code>float</code> from the stream message. 505 * 506 * @return a <code>float</code> value from the stream message 507 * @throws JMSException if the JMS provider fails to read the message due to 508 * some internal error. 509 * @throws MessageEOFException if unexpected end of message stream has been 510 * reached. 511 * @throws MessageFormatException if this type conversion is invalid. 512 * @throws MessageNotReadableException if the message is in write-only mode. 513 */ 514 515 @Override 516 public float readFloat() throws JMSException { 517 initializeReading(); 518 try { 519 this.dataIn.mark(33); 520 int type = this.dataIn.read(); 521 if (type == -1) { 522 throw new MessageEOFException("reached end of data"); 523 } 524 if (type == MarshallingSupport.FLOAT_TYPE) { 525 return this.dataIn.readFloat(); 526 } 527 if (type == MarshallingSupport.STRING_TYPE) { 528 return Float.valueOf(this.dataIn.readUTF()).floatValue(); 529 } 530 if (type == MarshallingSupport.NULL) { 531 this.dataIn.reset(); 532 throw new NullPointerException("Cannot convert NULL value to float."); 533 } else { 534 this.dataIn.reset(); 535 throw new MessageFormatException(" not a float type"); 536 } 537 } catch (NumberFormatException mfe) { 538 try { 539 this.dataIn.reset(); 540 } catch (IOException ioe) { 541 throw JMSExceptionSupport.create(ioe); 542 } 543 throw mfe; 544 545 } catch (EOFException e) { 546 throw JMSExceptionSupport.createMessageEOFException(e); 547 } catch (IOException e) { 548 throw JMSExceptionSupport.createMessageFormatException(e); 549 } 550 } 551 552 /** 553 * Reads a <code>double</code> from the stream message. 554 * 555 * @return a <code>double</code> value from the stream message 556 * @throws JMSException if the JMS provider fails to read the message due to 557 * some internal error. 558 * @throws MessageEOFException if unexpected end of message stream has been 559 * reached. 560 * @throws MessageFormatException if this type conversion is invalid. 561 * @throws MessageNotReadableException if the message is in write-only mode. 562 */ 563 564 @Override 565 public double readDouble() throws JMSException { 566 initializeReading(); 567 try { 568 569 this.dataIn.mark(65); 570 int type = this.dataIn.read(); 571 if (type == -1) { 572 throw new MessageEOFException("reached end of data"); 573 } 574 if (type == MarshallingSupport.DOUBLE_TYPE) { 575 return this.dataIn.readDouble(); 576 } 577 if (type == MarshallingSupport.FLOAT_TYPE) { 578 return this.dataIn.readFloat(); 579 } 580 if (type == MarshallingSupport.STRING_TYPE) { 581 return Double.valueOf(this.dataIn.readUTF()).doubleValue(); 582 } 583 if (type == MarshallingSupport.NULL) { 584 this.dataIn.reset(); 585 throw new NullPointerException("Cannot convert NULL value to double."); 586 } else { 587 this.dataIn.reset(); 588 throw new MessageFormatException(" not a double type"); 589 } 590 } catch (NumberFormatException mfe) { 591 try { 592 this.dataIn.reset(); 593 } catch (IOException ioe) { 594 throw JMSExceptionSupport.create(ioe); 595 } 596 throw mfe; 597 598 } catch (EOFException e) { 599 throw JMSExceptionSupport.createMessageEOFException(e); 600 } catch (IOException e) { 601 throw JMSExceptionSupport.createMessageFormatException(e); 602 } 603 } 604 605 /** 606 * Reads a <CODE>String</CODE> from the stream message. 607 * 608 * @return a Unicode string from the stream message 609 * @throws JMSException if the JMS provider fails to read the message due to 610 * some internal error. 611 * @throws MessageEOFException if unexpected end of message stream has been 612 * reached. 613 * @throws MessageFormatException if this type conversion is invalid. 614 * @throws MessageNotReadableException if the message is in write-only mode. 615 */ 616 617 @Override 618 public String readString() throws JMSException { 619 initializeReading(); 620 try { 621 622 this.dataIn.mark(65); 623 int type = this.dataIn.read(); 624 if (type == -1) { 625 throw new MessageEOFException("reached end of data"); 626 } 627 if (type == MarshallingSupport.NULL) { 628 return null; 629 } 630 if (type == MarshallingSupport.BIG_STRING_TYPE) { 631 return MarshallingSupport.readUTF8(dataIn); 632 } 633 if (type == MarshallingSupport.STRING_TYPE) { 634 return this.dataIn.readUTF(); 635 } 636 if (type == MarshallingSupport.LONG_TYPE) { 637 return new Long(this.dataIn.readLong()).toString(); 638 } 639 if (type == MarshallingSupport.INTEGER_TYPE) { 640 return new Integer(this.dataIn.readInt()).toString(); 641 } 642 if (type == MarshallingSupport.SHORT_TYPE) { 643 return new Short(this.dataIn.readShort()).toString(); 644 } 645 if (type == MarshallingSupport.BYTE_TYPE) { 646 return new Byte(this.dataIn.readByte()).toString(); 647 } 648 if (type == MarshallingSupport.FLOAT_TYPE) { 649 return new Float(this.dataIn.readFloat()).toString(); 650 } 651 if (type == MarshallingSupport.DOUBLE_TYPE) { 652 return new Double(this.dataIn.readDouble()).toString(); 653 } 654 if (type == MarshallingSupport.BOOLEAN_TYPE) { 655 return (this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE).toString(); 656 } 657 if (type == MarshallingSupport.CHAR_TYPE) { 658 return new Character(this.dataIn.readChar()).toString(); 659 } else { 660 this.dataIn.reset(); 661 throw new MessageFormatException(" not a String type"); 662 } 663 } catch (NumberFormatException mfe) { 664 try { 665 this.dataIn.reset(); 666 } catch (IOException ioe) { 667 throw JMSExceptionSupport.create(ioe); 668 } 669 throw mfe; 670 671 } catch (EOFException e) { 672 throw JMSExceptionSupport.createMessageEOFException(e); 673 } catch (IOException e) { 674 throw JMSExceptionSupport.createMessageFormatException(e); 675 } 676 } 677 678 /** 679 * Reads a byte array field from the stream message into the specified 680 * <CODE>byte[]</CODE> object (the read buffer). <p/> 681 * <P> 682 * To read the field value, <CODE>readBytes</CODE> should be successively 683 * called until it returns a value less than the length of the read buffer. 684 * The value of the bytes in the buffer following the last byte read is 685 * undefined. <p/> 686 * <P> 687 * If <CODE>readBytes</CODE> returns a value equal to the length of the 688 * buffer, a subsequent <CODE>readBytes</CODE> call must be made. If there 689 * are no more bytes to be read, this call returns -1. <p/> 690 * <P> 691 * If the byte array field value is null, <CODE>readBytes</CODE> returns 692 * -1. <p/> 693 * <P> 694 * If the byte array field value is empty, <CODE>readBytes</CODE> returns 695 * 0. <p/> 696 * <P> 697 * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE> 698 * field value has been made, the full value of the field must be read 699 * before it is valid to read the next field. An attempt to read the next 700 * field before that has been done will throw a 701 * <CODE>MessageFormatException</CODE>. <p/> 702 * <P> 703 * To read the byte field value into a new <CODE>byte[]</CODE> object, use 704 * the <CODE>readObject</CODE> method. 705 * 706 * @param value the buffer into which the data is read 707 * @return the total number of bytes read into the buffer, or -1 if there is 708 * no more data because the end of the byte field has been reached 709 * @throws JMSException if the JMS provider fails to read the message due to 710 * some internal error. 711 * @throws MessageEOFException if unexpected end of message stream has been 712 * reached. 713 * @throws MessageFormatException if this type conversion is invalid. 714 * @throws MessageNotReadableException if the message is in write-only mode. 715 * @see #readObject() 716 */ 717 718 @Override 719 public int readBytes(byte[] value) throws JMSException { 720 721 initializeReading(); 722 try { 723 if (value == null) { 724 throw new NullPointerException(); 725 } 726 727 if (remainingBytes == -1) { 728 this.dataIn.mark(value.length + 1); 729 int type = this.dataIn.read(); 730 if (type == -1) { 731 throw new MessageEOFException("reached end of data"); 732 } 733 if (type != MarshallingSupport.BYTE_ARRAY_TYPE) { 734 throw new MessageFormatException("Not a byte array"); 735 } 736 remainingBytes = this.dataIn.readInt(); 737 } else if (remainingBytes == 0) { 738 remainingBytes = -1; 739 return -1; 740 } 741 742 if (value.length <= remainingBytes) { 743 // small buffer 744 remainingBytes -= value.length; 745 this.dataIn.readFully(value); 746 return value.length; 747 } else { 748 // big buffer 749 int rc = this.dataIn.read(value, 0, remainingBytes); 750 remainingBytes = 0; 751 return rc != -1 ? rc : 0; 752 } 753 754 } catch (EOFException e) { 755 JMSException jmsEx = new MessageEOFException(e.getMessage()); 756 jmsEx.setLinkedException(e); 757 throw jmsEx; 758 } catch (IOException e) { 759 JMSException jmsEx = new MessageFormatException(e.getMessage()); 760 jmsEx.setLinkedException(e); 761 throw jmsEx; 762 } 763 } 764 765 /** 766 * Reads an object from the stream message. <p/> 767 * <P> 768 * This method can be used to return, in objectified format, an object in 769 * the Java programming language ("Java object") that has been written to 770 * the stream with the equivalent <CODE>writeObject</CODE> method call, or 771 * its equivalent primitive <CODE>write<I>type</I></CODE> method. <p/> 772 * <P> 773 * Note that byte values are returned as <CODE>byte[]</CODE>, not 774 * <CODE>Byte[]</CODE>. <p/> 775 * <P> 776 * An attempt to call <CODE>readObject</CODE> to read a byte field value 777 * into a new <CODE>byte[]</CODE> object before the full value of the byte 778 * field has been read will throw a <CODE>MessageFormatException</CODE>. 779 * 780 * @return a Java object from the stream message, in objectified format (for 781 * example, if the object was written as an <CODE>int</CODE>, an 782 * <CODE>Integer</CODE> is returned) 783 * @throws JMSException if the JMS provider fails to read the message due to 784 * some internal error. 785 * @throws MessageEOFException if unexpected end of message stream has been 786 * reached. 787 * @throws MessageFormatException if this type conversion is invalid. 788 * @throws MessageNotReadableException if the message is in write-only mode. 789 * @see #readBytes(byte[] value) 790 */ 791 792 @Override 793 public Object readObject() throws JMSException { 794 initializeReading(); 795 try { 796 this.dataIn.mark(65); 797 int type = this.dataIn.read(); 798 if (type == -1) { 799 throw new MessageEOFException("reached end of data"); 800 } 801 if (type == MarshallingSupport.NULL) { 802 return null; 803 } 804 if (type == MarshallingSupport.BIG_STRING_TYPE) { 805 return MarshallingSupport.readUTF8(dataIn); 806 } 807 if (type == MarshallingSupport.STRING_TYPE) { 808 return this.dataIn.readUTF(); 809 } 810 if (type == MarshallingSupport.LONG_TYPE) { 811 return Long.valueOf(this.dataIn.readLong()); 812 } 813 if (type == MarshallingSupport.INTEGER_TYPE) { 814 return Integer.valueOf(this.dataIn.readInt()); 815 } 816 if (type == MarshallingSupport.SHORT_TYPE) { 817 return Short.valueOf(this.dataIn.readShort()); 818 } 819 if (type == MarshallingSupport.BYTE_TYPE) { 820 return Byte.valueOf(this.dataIn.readByte()); 821 } 822 if (type == MarshallingSupport.FLOAT_TYPE) { 823 return new Float(this.dataIn.readFloat()); 824 } 825 if (type == MarshallingSupport.DOUBLE_TYPE) { 826 return new Double(this.dataIn.readDouble()); 827 } 828 if (type == MarshallingSupport.BOOLEAN_TYPE) { 829 return this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE; 830 } 831 if (type == MarshallingSupport.CHAR_TYPE) { 832 return Character.valueOf(this.dataIn.readChar()); 833 } 834 if (type == MarshallingSupport.BYTE_ARRAY_TYPE) { 835 int len = this.dataIn.readInt(); 836 byte[] value = new byte[len]; 837 this.dataIn.readFully(value); 838 return value; 839 } else { 840 this.dataIn.reset(); 841 throw new MessageFormatException("unknown type"); 842 } 843 } catch (NumberFormatException mfe) { 844 try { 845 this.dataIn.reset(); 846 } catch (IOException ioe) { 847 throw JMSExceptionSupport.create(ioe); 848 } 849 throw mfe; 850 851 } catch (EOFException e) { 852 JMSException jmsEx = new MessageEOFException(e.getMessage()); 853 jmsEx.setLinkedException(e); 854 throw jmsEx; 855 } catch (IOException e) { 856 JMSException jmsEx = new MessageFormatException(e.getMessage()); 857 jmsEx.setLinkedException(e); 858 throw jmsEx; 859 } 860 } 861 862 /** 863 * Writes a <code>boolean</code> to the stream message. The value 864 * <code>true</code> is written as the value <code>(byte)1</code>; the 865 * value <code>false</code> is written as the value <code>(byte)0</code>. 866 * 867 * @param value the <code>boolean</code> value to be written 868 * @throws JMSException if the JMS provider fails to write the message due 869 * to some internal error. 870 * @throws MessageNotWriteableException if the message is in read-only mode. 871 */ 872 873 @Override 874 public void writeBoolean(boolean value) throws JMSException { 875 initializeWriting(); 876 try { 877 MarshallingSupport.marshalBoolean(dataOut, value); 878 } catch (IOException ioe) { 879 throw JMSExceptionSupport.create(ioe); 880 } 881 } 882 883 /** 884 * Writes a <code>byte</code> to the stream message. 885 * 886 * @param value the <code>byte</code> value to be written 887 * @throws JMSException if the JMS provider fails to write the message due 888 * to some internal error. 889 * @throws MessageNotWriteableException if the message is in read-only mode. 890 */ 891 892 @Override 893 public void writeByte(byte value) throws JMSException { 894 initializeWriting(); 895 try { 896 MarshallingSupport.marshalByte(dataOut, value); 897 } catch (IOException ioe) { 898 throw JMSExceptionSupport.create(ioe); 899 } 900 } 901 902 /** 903 * Writes a <code>short</code> to the stream message. 904 * 905 * @param value the <code>short</code> value to be written 906 * @throws JMSException if the JMS provider fails to write the message due 907 * to some internal error. 908 * @throws MessageNotWriteableException if the message is in read-only mode. 909 */ 910 911 @Override 912 public void writeShort(short value) throws JMSException { 913 initializeWriting(); 914 try { 915 MarshallingSupport.marshalShort(dataOut, value); 916 } catch (IOException ioe) { 917 throw JMSExceptionSupport.create(ioe); 918 } 919 } 920 921 /** 922 * Writes a <code>char</code> to the stream message. 923 * 924 * @param value the <code>char</code> value to be written 925 * @throws JMSException if the JMS provider fails to write the message due 926 * to some internal error. 927 * @throws MessageNotWriteableException if the message is in read-only mode. 928 */ 929 930 @Override 931 public void writeChar(char value) throws JMSException { 932 initializeWriting(); 933 try { 934 MarshallingSupport.marshalChar(dataOut, value); 935 } catch (IOException ioe) { 936 throw JMSExceptionSupport.create(ioe); 937 } 938 } 939 940 /** 941 * Writes an <code>int</code> to the stream message. 942 * 943 * @param value the <code>int</code> value to be written 944 * @throws JMSException if the JMS provider fails to write the message due 945 * to some internal error. 946 * @throws MessageNotWriteableException if the message is in read-only mode. 947 */ 948 949 @Override 950 public void writeInt(int value) throws JMSException { 951 initializeWriting(); 952 try { 953 MarshallingSupport.marshalInt(dataOut, value); 954 } catch (IOException ioe) { 955 throw JMSExceptionSupport.create(ioe); 956 } 957 } 958 959 /** 960 * Writes a <code>long</code> to the stream message. 961 * 962 * @param value the <code>long</code> value to be written 963 * @throws JMSException if the JMS provider fails to write the message due 964 * to some internal error. 965 * @throws MessageNotWriteableException if the message is in read-only mode. 966 */ 967 968 @Override 969 public void writeLong(long value) throws JMSException { 970 initializeWriting(); 971 try { 972 MarshallingSupport.marshalLong(dataOut, value); 973 } catch (IOException ioe) { 974 throw JMSExceptionSupport.create(ioe); 975 } 976 } 977 978 /** 979 * Writes a <code>float</code> to the stream message. 980 * 981 * @param value the <code>float</code> value to be written 982 * @throws JMSException if the JMS provider fails to write the message due 983 * to some internal error. 984 * @throws MessageNotWriteableException if the message is in read-only mode. 985 */ 986 987 @Override 988 public void writeFloat(float value) throws JMSException { 989 initializeWriting(); 990 try { 991 MarshallingSupport.marshalFloat(dataOut, value); 992 } catch (IOException ioe) { 993 throw JMSExceptionSupport.create(ioe); 994 } 995 } 996 997 /** 998 * Writes a <code>double</code> to the stream message. 999 * 1000 * @param value the <code>double</code> value to be written 1001 * @throws JMSException if the JMS provider fails to write the message due 1002 * to some internal error. 1003 * @throws MessageNotWriteableException if the message is in read-only mode. 1004 */ 1005 1006 @Override 1007 public void writeDouble(double value) throws JMSException { 1008 initializeWriting(); 1009 try { 1010 MarshallingSupport.marshalDouble(dataOut, value); 1011 } catch (IOException ioe) { 1012 throw JMSExceptionSupport.create(ioe); 1013 } 1014 } 1015 1016 /** 1017 * Writes a <code>String</code> to the stream message. 1018 * 1019 * @param value the <code>String</code> value to be written 1020 * @throws JMSException if the JMS provider fails to write the message due 1021 * to some internal error. 1022 * @throws MessageNotWriteableException if the message is in read-only mode. 1023 */ 1024 1025 @Override 1026 public void writeString(String value) throws JMSException { 1027 initializeWriting(); 1028 try { 1029 if (value == null) { 1030 MarshallingSupport.marshalNull(dataOut); 1031 } else { 1032 MarshallingSupport.marshalString(dataOut, value); 1033 } 1034 } catch (IOException ioe) { 1035 throw JMSExceptionSupport.create(ioe); 1036 } 1037 } 1038 1039 /** 1040 * Writes a byte array field to the stream message. <p/> 1041 * <P> 1042 * The byte array <code>value</code> is written to the message as a byte 1043 * array field. Consecutively written byte array fields are treated as two 1044 * distinct fields when the fields are read. 1045 * 1046 * @param value the byte array value to be written 1047 * @throws JMSException if the JMS provider fails to write the message due 1048 * to some internal error. 1049 * @throws MessageNotWriteableException if the message is in read-only mode. 1050 */ 1051 1052 @Override 1053 public void writeBytes(byte[] value) throws JMSException { 1054 writeBytes(value, 0, value.length); 1055 } 1056 1057 /** 1058 * Writes a portion of a byte array as a byte array field to the stream 1059 * message. <p/> 1060 * <P> 1061 * The a portion of the byte array <code>value</code> is written to the 1062 * message as a byte array field. Consecutively written byte array fields 1063 * are treated as two distinct fields when the fields are read. 1064 * 1065 * @param value the byte array value to be written 1066 * @param offset the initial offset within the byte array 1067 * @param length the number of bytes to use 1068 * @throws JMSException if the JMS provider fails to write the message due 1069 * to some internal error. 1070 * @throws MessageNotWriteableException if the message is in read-only mode. 1071 */ 1072 1073 @Override 1074 public void writeBytes(byte[] value, int offset, int length) throws JMSException { 1075 initializeWriting(); 1076 try { 1077 MarshallingSupport.marshalByteArray(dataOut, value, offset, length); 1078 } catch (IOException ioe) { 1079 throw JMSExceptionSupport.create(ioe); 1080 } 1081 } 1082 1083 /** 1084 * Writes an object to the stream message. <p/> 1085 * <P> 1086 * This method works only for the objectified primitive object types (<code>Integer</code>, 1087 * <code>Double</code>, <code>Long</code> ...), 1088 * <code>String</code> objects, and byte arrays. 1089 * 1090 * @param value the Java object to be written 1091 * @throws JMSException if the JMS provider fails to write the message due 1092 * to some internal error. 1093 * @throws MessageFormatException if the object is invalid. 1094 * @throws MessageNotWriteableException if the message is in read-only mode. 1095 */ 1096 1097 @Override 1098 public void writeObject(Object value) throws JMSException { 1099 initializeWriting(); 1100 if (value == null) { 1101 try { 1102 MarshallingSupport.marshalNull(dataOut); 1103 } catch (IOException ioe) { 1104 throw JMSExceptionSupport.create(ioe); 1105 } 1106 } else if (value instanceof String) { 1107 writeString(value.toString()); 1108 } else if (value instanceof Character) { 1109 writeChar(((Character)value).charValue()); 1110 } else if (value instanceof Boolean) { 1111 writeBoolean(((Boolean)value).booleanValue()); 1112 } else if (value instanceof Byte) { 1113 writeByte(((Byte)value).byteValue()); 1114 } else if (value instanceof Short) { 1115 writeShort(((Short)value).shortValue()); 1116 } else if (value instanceof Integer) { 1117 writeInt(((Integer)value).intValue()); 1118 } else if (value instanceof Float) { 1119 writeFloat(((Float)value).floatValue()); 1120 } else if (value instanceof Double) { 1121 writeDouble(((Double)value).doubleValue()); 1122 } else if (value instanceof byte[]) { 1123 writeBytes((byte[])value); 1124 }else if (value instanceof Long) { 1125 writeLong(((Long)value).longValue()); 1126 }else { 1127 throw new MessageFormatException("Unsupported Object type: " + value.getClass()); 1128 } 1129 } 1130 1131 /** 1132 * Puts the message body in read-only mode and repositions the stream of 1133 * bytes to the beginning. 1134 * 1135 * @throws JMSException if an internal error occurs 1136 */ 1137 1138 @Override 1139 public void reset() throws JMSException { 1140 storeContent(); 1141 this.bytesOut = null; 1142 this.dataIn = null; 1143 this.dataOut = null; 1144 this.remainingBytes = -1; 1145 setReadOnlyBody(true); 1146 } 1147 1148 private void initializeWriting() throws JMSException { 1149 checkReadOnlyBody(); 1150 if (this.dataOut == null) { 1151 this.bytesOut = new ByteArrayOutputStream(); 1152 OutputStream os = bytesOut; 1153 ActiveMQConnection connection = getConnection(); 1154 if (connection != null && connection.isUseCompression()) { 1155 compressed = true; 1156 os = new DeflaterOutputStream(os); 1157 } 1158 this.dataOut = new DataOutputStream(os); 1159 } 1160 1161 // For a message that already had a body and was sent we need to restore the content 1162 // if the message is used again without having its clearBody method called. 1163 if (this.content != null && this.content.length > 0) { 1164 try { 1165 if (compressed) { 1166 ByteArrayInputStream input = new ByteArrayInputStream(this.content.getData(), this.content.getOffset(), this.content.getLength()); 1167 InflaterInputStream inflater = new InflaterInputStream(input); 1168 try { 1169 byte[] buffer = new byte[8*1024]; 1170 int read = 0; 1171 while ((read = inflater.read(buffer)) != -1) { 1172 this.dataOut.write(buffer, 0, read); 1173 } 1174 } finally { 1175 inflater.close(); 1176 } 1177 } else { 1178 this.dataOut.write(this.content.getData(), this.content.getOffset(), this.content.getLength()); 1179 } 1180 // Free up the buffer from the old content, will be re-written when 1181 // tbe message is sent again and storeContent() is called. 1182 this.content = null; 1183 } catch (IOException ioe) { 1184 throw JMSExceptionSupport.create(ioe); 1185 } 1186 } 1187 } 1188 1189 protected void checkWriteOnlyBody() throws MessageNotReadableException { 1190 if (!readOnlyBody) { 1191 throw new MessageNotReadableException("Message body is write-only"); 1192 } 1193 } 1194 1195 private void initializeReading() throws MessageNotReadableException { 1196 checkWriteOnlyBody(); 1197 if (this.dataIn == null) { 1198 ByteSequence data = getContent(); 1199 if (data == null) { 1200 data = new ByteSequence(new byte[] {}, 0, 0); 1201 } 1202 InputStream is = new ByteArrayInputStream(data); 1203 if (isCompressed()) { 1204 is = new InflaterInputStream(is); 1205 is = new BufferedInputStream(is); 1206 } 1207 this.dataIn = new DataInputStream(is); 1208 } 1209 } 1210 1211 @Override 1212 public void compress() throws IOException { 1213 storeContent(); 1214 super.compress(); 1215 } 1216 1217 @Override 1218 public String toString() { 1219 return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }"; 1220 } 1221}