/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.command;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.JMSExceptionSupport;

/**
 * A <CODE>BytesMessage</CODE> object is used to send a message containing a
 * stream of uninterpreted bytes. It inherits from the <CODE>Message</CODE>
 * interface and adds a bytes message body. The receiver of the message supplies
 * the interpretation of the bytes.
 * <P>
 * The <CODE>BytesMessage</CODE> methods are based largely on those found in
 * <CODE>java.io.DataInputStream</CODE> and
 * <CODE>java.io.DataOutputStream</CODE>.
 * <P>
 * This message type is for client encoding of existing message formats. If
 * possible, one of the other self-defining message types should be used
 * instead.
 * <P>
 * Although the JMS API allows the use of message properties with byte messages,
 * they are typically not used, since the inclusion of properties may affect the
 * format.
 * <P>
 * The primitive types can be written explicitly using methods for each type.
 * They may also be written generically as objects. For instance, a call to
 * <CODE>BytesMessage.writeInt(6)</CODE> is equivalent to
 * <CODE> BytesMessage.writeObject(new Integer(6))</CODE>. Both forms are
 * provided, because the explicit form is convenient for static programming, and
 * the object form is needed when types are not known at compile time.
 * <P>
 * When the message is first created, and when <CODE>clearBody</CODE> is
 * called, the body of the message is in write-only mode. After the first call
 * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
 * After a message has been sent, the client that sent it can retain and modify
 * it without affecting the message that has been sent. The same message object
 * can be sent multiple times. When a message has been received, the provider
 * has called <CODE>reset</CODE> so that the message body is in read-only mode
 * for the client.
 * <P>
 * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
 * message body is cleared and the message is in write-only mode.
 * <P>
 * If a client attempts to read a message in write-only mode, a
 * <CODE>MessageNotReadableException</CODE> is thrown.
 * <P>
 * If a client attempts to write a message in read-only mode, a
 * <CODE>MessageNotWriteableException</CODE> is thrown.
 *
 * @openwire:marshaller code=24
 * @see javax.jms.Session#createBytesMessage()
 * @see javax.jms.MapMessage
 * @see javax.jms.Message
 * @see javax.jms.ObjectMessage
 * @see javax.jms.StreamMessage
 * @see javax.jms.TextMessage
 */
public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessage {

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_BYTES_MESSAGE;

    /** Typed writer layered over {@link #bytesOut}; non-null only while a body is being written. */
    protected transient DataOutputStream dataOut;
    /** In-memory buffer collecting the body while it is being written. */
    protected transient ByteArrayOutputStream bytesOut;
    /** Typed reader over the stored content; created lazily on the first read. */
    protected transient DataInputStream dataIn;
    /** Body length in bytes as determined when reading was initialised. */
    protected transient int length;

    /**
     * Creates a copy of this message. Any body bytes still held in the write
     * buffer are stored into the content first.
     *
     * @return a new <code>ActiveMQBytesMessage</code> populated from this one
     */
    @Override
    public Message copy() {
        ActiveMQBytesMessage copy = new ActiveMQBytesMessage();
        copy(copy);
        return copy;
    }

    /**
     * Stores pending content, delegates to the superclass copy and clears the
     * transient stream fields on the target.
     *
     * @param copy the message to populate
     */
    private void copy(ActiveMQBytesMessage copy) {
        storeContent();
        super.copy(copy);
        copy.dataOut = null;
        copy.bytesOut = null;
        copy.dataIn = null;
    }

    /**
     * Invokes the superclass send hook and then stores any buffered body bytes
     * into the message content.
     *
     * @throws JMSException if the superclass hook fails
     */
    @Override
    public void onSend() throws JMSException {
        super.onSend();
        storeContent();
    }

    /**
     * Moves the bytes written so far from the write buffer into the message
     * content, compressing them when the owning connection has compression
     * enabled. Does nothing when no write stream is open. The write streams are
     * closed and released afterwards.
     *
     * @throws RuntimeException wrapping any <code>IOException</code> raised
     *                 while closing the stream or compressing the content
     */
    @Override
    public void storeContent() {
        if (dataOut != null) {
            try {
                dataOut.close();
                ByteSequence bs = bytesOut.toByteSequence();
                setContent(bs);

                ActiveMQConnection connection = getConnection();
                if (connection != null && connection.isUseCompression()) {
                    doCompress();
                }
            } catch (IOException ioe) {
                throw new RuntimeException(ioe.getMessage(), ioe);
            } finally {
                try {
                    if (bytesOut != null) {
                        bytesOut.close();
                        bytesOut = null;
                    }
                    if (dataOut != null) {
                        dataOut.close();
                        dataOut = null;
                    }
                } catch (IOException ignored) {
                    // Deliberate best-effort cleanup: a failure to close the
                    // buffers must not mask the outcome of the try block.
                }
            }
        }
    }

    /**
     * @return <code>true</code> when content is present or no write stream is
     *         open
     */
    @Override
    public boolean isContentMarshalled() {
        return content != null || dataOut == null;
    }

    /**
     * @return {@link #DATA_STRUCTURE_TYPE}
     */
    @Override
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * @return the fixed MIME type string <code>jms/bytes-message</code>
     */
    @Override
    public String getJMSXMimeType() {
        return "jms/bytes-message";
    }

    /**
     * Clears out the message body. Clearing a message's body does not clear its
     * header values or property entries.
     * <P>
     * If this message body was read-only, calling this method leaves the
     * message body in the same state as an empty body in a newly created
     * message.
     *
     * @throws JMSException if the JMS provider fails to clear the message body
     *                 due to some internal error.
     */
    @Override
    public void clearBody() throws JMSException {
        super.clearBody();
        this.dataOut = null;
        this.dataIn = null;
        this.bytesOut = null;
    }

    /**
     * Gets the number of bytes of the message body when the message is in
     * read-only mode. The value returned can be used to allocate a byte array.
     * The value returned is the entire length of the message body, regardless
     * of where the pointer for reading the message is currently located.
     *
     * @return number of bytes in the message
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageNotReadableException if the message is in write-only mode.
     * @since 1.1
     */
    @Override
    public long getBodyLength() throws JMSException {
        initializeReading();
        return length;
    }

    /**
     * Reads a <code>boolean</code> from the bytes message stream.
     *
     * @return the <code>boolean</code> value read
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public boolean readBoolean() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readBoolean();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a signed 8-bit value from the bytes message stream.
     *
     * @return the next byte from the bytes message stream as a signed 8-bit
     *         <code>byte</code>
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public byte readByte() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readByte();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads an unsigned 8-bit number from the bytes message stream.
     *
     * @return the next byte from the bytes message stream, interpreted as an
     *         unsigned 8-bit number
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public int readUnsignedByte() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readUnsignedByte();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a signed 16-bit number from the bytes message stream.
     *
     * @return the next two bytes from the bytes message stream, interpreted as
     *         a signed 16-bit number
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public short readShort() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readShort();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads an unsigned 16-bit number from the bytes message stream.
     *
     * @return the next two bytes from the bytes message stream, interpreted as
     *         an unsigned 16-bit integer
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public int readUnsignedShort() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readUnsignedShort();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a Unicode character value from the bytes message stream.
     *
     * @return the next two bytes from the bytes message stream as a Unicode
     *         character
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public char readChar() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readChar();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a signed 32-bit integer from the bytes message stream.
     *
     * @return the next four bytes from the bytes message stream, interpreted as
     *         an <code>int</code>
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public int readInt() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readInt();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a signed 64-bit integer from the bytes message stream.
     *
     * @return the next eight bytes from the bytes message stream, interpreted
     *         as a <code>long</code>
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public long readLong() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readLong();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a <code>float</code> from the bytes message stream.
     *
     * @return the next four bytes from the bytes message stream, interpreted as
     *         a <code>float</code>
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public float readFloat() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readFloat();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a <code>double</code> from the bytes message stream.
     *
     * @return the next eight bytes from the bytes message stream, interpreted
     *         as a <code>double</code>
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public double readDouble() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readDouble();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a string that has been encoded using a modified UTF-8 format from
     * the bytes message stream.
     * <P>
     * For more information on the UTF-8 format, see "File System Safe UCS
     * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
     * X/Open Company Ltd., Document Number: P316. This information also appears
     * in ISO/IEC 10646, Annex P.
     *
     * @return a Unicode string from the bytes message stream
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageEOFException if unexpected end of bytes stream has been
     *                 reached.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public String readUTF() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readUTF();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a byte array from the bytes message stream.
     * <P>
     * If the length of array <code>value</code> is less than the number of
     * bytes remaining to be read from the stream, the array should be filled. A
     * subsequent call reads the next increment, and so on.
     * <P>
     * If the number of bytes remaining in the stream is less than the length of
     * array <code>value</code>, the bytes should be read into the array. The
     * return value of the total number of bytes read will be less than the
     * length of the array, indicating that there are no more bytes left to be
     * read from the stream. The next read of the stream returns -1.
     *
     * @param value the buffer into which the data is read
     * @return the total number of bytes read into the buffer, or -1 if there is
     *         no more data because the end of the stream has been reached
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageNotReadableException if the message is in write-only mode.
     */
    @Override
    public int readBytes(byte[] value) throws JMSException {
        return readBytes(value, value.length);
    }

    /**
     * Reads a portion of the bytes message stream.
     * <P>
     * If the length of array <code>value</code> is less than the number of
     * bytes remaining to be read from the stream, the array should be filled. A
     * subsequent call reads the next increment, and so on.
     * <P>
     * If the number of bytes remaining in the stream is less than the length of
     * array <code>value</code>, the bytes should be read into the array. The
     * return value of the total number of bytes read will be less than the
     * length of the array, indicating that there are no more bytes left to be
     * read from the stream. The next read of the stream returns -1.
     * <P>
     * If <code>length</code> is negative, or <code>length</code> is greater
     * than the length of the array <code>value</code>, then an
     * <code>IndexOutOfBoundsException</code> is thrown. No bytes will be read
     * from the stream for this exception case.
     *
     * @param value the buffer into which the data is read
     * @param length the number of bytes to read; must be less than or equal to
     *                <code>value.length</code>
     * @return the total number of bytes read into the buffer, or -1 if there is
     *         no more data because the end of the stream has been reached
     * @throws JMSException if the JMS provider fails to read the message due to
     *                 some internal error.
     * @throws MessageNotReadableException if the message is in write-only mode.
     * @throws IndexOutOfBoundsException if <code>length</code> is negative or
     *                 greater than <code>value.length</code>.
     */
    @Override
    public int readBytes(byte[] value, int length) throws JMSException {
        initializeReading();
        // Enforce the documented contract up front so that no bytes are
        // consumed from the stream when the requested length is invalid.
        if (length < 0 || length > value.length) {
            throw new IndexOutOfBoundsException("length must not be negative or larger than the size of the provided array");
        }
        try {
            int n = 0;
            while (n < length) {
                int count = this.dataIn.read(value, n, length - n);
                if (count < 0) {
                    break;
                }
                n += count;
            }
            // Nothing read although bytes were requested: end of stream.
            if (n == 0 && length > 0) {
                n = -1;
            }
            return n;
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Writes a <code>boolean</code> to the bytes message stream as a 1-byte
     * value. The value <code>true</code> is written as the value
     * <code>(byte)1</code>; the value <code>false</code> is written as the
     * value <code>(byte)0</code>.
     *
     * @param value the <code>boolean</code> value to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeBoolean(boolean value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeBoolean(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes a <code>byte</code> to the bytes message stream as a 1-byte
     * value.
     *
     * @param value the <code>byte</code> value to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeByte(byte value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeByte(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes a <code>short</code> to the bytes message stream as two bytes,
     * high byte first.
     *
     * @param value the <code>short</code> to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeShort(short value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeShort(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes a <code>char</code> to the bytes message stream as a 2-byte
     * value, high byte first.
     *
     * @param value the <code>char</code> value to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeChar(char value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeChar(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes an <code>int</code> to the bytes message stream as four bytes,
     * high byte first.
     *
     * @param value the <code>int</code> to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeInt(int value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeInt(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes a <code>long</code> to the bytes message stream as eight bytes,
     * high byte first.
     *
     * @param value the <code>long</code> to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeLong(long value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeLong(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Converts the <code>float</code> argument to an <code>int</code> using
     * the <code>floatToIntBits</code> method in class <code>Float</code>,
     * and then writes that <code>int</code> value to the bytes message stream
     * as a 4-byte quantity, high byte first.
     *
     * @param value the <code>float</code> value to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeFloat(float value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeFloat(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Converts the <code>double</code> argument to a <code>long</code>
     * using the <code>doubleToLongBits</code> method in class
     * <code>Double</code>, and then writes that <code>long</code> value to
     * the bytes message stream as an 8-byte quantity, high byte first.
     *
     * @param value the <code>double</code> value to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeDouble(double value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeDouble(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes a string to the bytes message stream using UTF-8 encoding in a
     * machine-independent manner.
     * <P>
     * For more information on the UTF-8 format, see "File System Safe UCS
     * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
     * X/Open Company Ltd., Document Number: P316. This information also appears
     * in ISO/IEC 10646, Annex P.
     *
     * @param value the <code>String</code> value to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeUTF(String value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeUTF(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes a byte array to the bytes message stream.
     *
     * @param value the byte array to be written
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeBytes(byte[] value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.write(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes a portion of a byte array to the bytes message stream.
     *
     * @param value the byte array value to be written
     * @param offset the initial offset within the byte array
     * @param length the number of bytes to use
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     */
    @Override
    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.write(value, offset, length);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Writes an object to the bytes message stream.
     * <P>
     * This method works only for the objectified primitive object types
     * (<code>Integer</code>, <code>Double</code>, <code>Long</code> ...),
     * <code>String</code> objects, and byte arrays.
     *
     * @param value the object in the Java programming language ("Java object")
     *                to be written; it must not be null
     * @throws JMSException if the JMS provider fails to write the message due
     *                 to some internal error.
     * @throws MessageFormatException if the object is of an invalid type.
     * @throws MessageNotWriteableException if the message is in read-only mode.
     * @throws java.lang.NullPointerException if the parameter
     *                 <code>value</code> is null.
     */
    @Override
    public void writeObject(Object value) throws JMSException {
        if (value == null) {
            throw new NullPointerException();
        }
        initializeWriting();
        if (value instanceof Boolean) {
            writeBoolean(((Boolean)value).booleanValue());
        } else if (value instanceof Character) {
            writeChar(((Character)value).charValue());
        } else if (value instanceof Byte) {
            writeByte(((Byte)value).byteValue());
        } else if (value instanceof Short) {
            writeShort(((Short)value).shortValue());
        } else if (value instanceof Integer) {
            writeInt(((Integer)value).intValue());
        } else if (value instanceof Long) {
            writeLong(((Long)value).longValue());
        } else if (value instanceof Float) {
            writeFloat(((Float)value).floatValue());
        } else if (value instanceof Double) {
            writeDouble(((Double)value).doubleValue());
        } else if (value instanceof String) {
            writeUTF(value.toString());
        } else if (value instanceof byte[]) {
            writeBytes((byte[])value);
        } else {
            throw new MessageFormatException("Cannot write non-primitive type:" + value.getClass());
        }
    }

    /**
     * Puts the message body in read-only mode and repositions the stream of
     * bytes to the beginning.
     *
     * @throws JMSException if an internal error occurs
     */
    @Override
    public void reset() throws JMSException {
        storeContent();
        setReadOnlyBody(true);
        try {
            if (bytesOut != null) {
                bytesOut.close();
                bytesOut = null;
            }
            if (dataIn != null) {
                dataIn.close();
                dataIn = null;
            }
            if (dataOut != null) {
                dataOut.close();
                dataOut = null;
            }
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Verifies the body is writable, lazily creates the write streams and
     * re-appends any previously stored content to them.
     *
     * @throws JMSException if the body is read-only or the old content cannot
     *                 be restored
     */
    private void initializeWriting() throws JMSException {
        checkReadOnlyBody();
        if (this.dataOut == null) {
            this.bytesOut = new ByteArrayOutputStream();
            OutputStream os = bytesOut;
            this.dataOut = new DataOutputStream(os);
        }

        restoreOldContent();
    }

    /**
     * Copies existing content (decompressing it first when flagged as
     * compressed) into the write stream and then drops the stored content.
     *
     * @throws JMSException if the content cannot be decompressed or written
     */
    private void restoreOldContent() throws JMSException {
        // For a message that already had a body and was sent we need to restore the content
        // if the message is used again without having its clearBody method called.
        if (this.content != null && this.content.length > 0) {
            try {
                ByteSequence toRestore = this.content;
                if (isCompressed()) {
                    toRestore = new ByteSequence(decompress(this.content));
                    compressed = false;
                }

                this.dataOut.write(toRestore.getData(), toRestore.getOffset(), toRestore.getLength());
                // Free up the buffer from the old content, will be re-written when
                // the message is sent again and storeContent() is called.
                this.content = null;
            } catch (IOException ioe) {
                throw JMSExceptionSupport.create(ioe);
            }
        }
    }

    /**
     * Ensures the body may be read.
     *
     * @throws MessageNotReadableException if the body is not in read-only mode
     */
    protected void checkWriteOnlyBody() throws MessageNotReadableException {
        if (!readOnlyBody) {
            throw new MessageNotReadableException("Message body is write-only");
        }
    }

    /**
     * Verifies the body is readable and lazily opens the read stream over the
     * stored content, decompressing it when required. Also records the body
     * length in {@link #length}.
     *
     * @throws JMSException if the body is write-only or the content cannot be
     *                 decompressed
     */
    private void initializeReading() throws JMSException {
        checkWriteOnlyBody();
        if (dataIn == null) {
            try {
                ByteSequence data = getContent();
                if (data == null) {
                    data = new ByteSequence(new byte[] {}, 0, 0);
                }
                InputStream is = new ByteArrayInputStream(data);
                if (isCompressed()) {
                    if (data.length != 0) {
                        // decompress() also sets the length field.
                        is = new ByteArrayInputStream(decompress(data));
                    } else {
                        // Compressed flag set but no payload: the body is
                        // empty, so do not leave a stale length behind.
                        length = 0;
                    }
                } else {
                    length = data.getLength();
                }

                dataIn = new DataInputStream(is);
            } catch (IOException ioe) {
                throw JMSExceptionSupport.create(ioe);
            }
        }
    }

    /**
     * Inflates content produced by {@link #doCompress()}: a 4-byte big-endian
     * uncompressed length followed by the deflated bytes. Stores the
     * uncompressed length in {@link #length}.
     *
     * @param dataSequence the compressed content
     * @return the inflated bytes
     * @throws IOException if the data cannot be read or inflated
     */
    protected byte[] decompress(ByteSequence dataSequence) throws IOException {
        Inflater inflater = new Inflater();
        ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
        try {
            length = ByteSequenceData.readIntBig(dataSequence);
            dataSequence.offset = 0;
            // NOTE(review): the payload is taken from index 4 of the backing
            // array, which presumes the sequence starts at offset 0 - confirm.
            byte[] data = Arrays.copyOfRange(dataSequence.getData(), 4, dataSequence.getLength());
            inflater.setInput(data);
            byte[] buffer = new byte[length];
            int count = inflater.inflate(buffer);
            decompressed.write(buffer, 0, count);
            return decompressed.toByteArray();
        } catch (Exception e) {
            throw new IOException(e);
        } finally {
            inflater.end();
            decompressed.close();
        }
    }

    /**
     * Sets an object property after running the same write initialisation used
     * by the body write methods.
     *
     * @param name the property name
     * @param value the property value
     * @throws JMSException if write initialisation or the superclass call fails
     */
    @Override
    public void setObjectProperty(String name, Object value) throws JMSException {
        initializeWriting();
        super.setObjectProperty(name, value);
    }

    /**
     * @return the superclass description followed by the state of the
     *         transient stream fields
     */
    @Override
    public String toString() {
        return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
    }

    /**
     * Marks the message as compressed and, when content is present, replaces it
     * with a 4-byte big-endian uncompressed length followed by the deflated
     * content bytes.
     *
     * @throws IOException if the compressed content cannot be written
     */
    @Override
    protected void doCompress() throws IOException {
        compressed = true;
        ByteSequence bytes = getContent();
        if (bytes != null) {
            int uncompressedLength = bytes.getLength();
            ByteArrayOutputStream compressedOut = new ByteArrayOutputStream();
            // Reserve room for the length prefix; it is filled in below.
            compressedOut.write(new byte[4]);
            Deflater deflater = new Deflater();
            try {
                // Deflate only the valid region of the backing array; the
                // array may be larger than the content or start at an offset.
                deflater.setInput(bytes.getData(), bytes.getOffset(), uncompressedLength);
                deflater.finish();
                byte[] buffer = new byte[1024];
                while (!deflater.finished()) {
                    int count = deflater.deflate(buffer);
                    compressedOut.write(buffer, 0, count);
                }

                bytes = compressedOut.toByteSequence();
                ByteSequenceData.writeIntBig(bytes, uncompressedLength);
                // Point the sequence back at the start after writing the prefix.
                bytes.offset = 0;
                setContent(bytes);
            } finally {
                deflater.end();
                compressedOut.close();
            }
        }
    }
}