/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.command;

import java.beans.Transient;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.DeflaterOutputStream;

import javax.jms.JMSException;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.UTF8Buffer;

/**
 * Represents an ActiveMQ message.
 * <p>
 * Holds the message headers, the (optionally marshalled) property map and the
 * body content, plus broker-side bookkeeping such as the reference count and
 * the {@link MemoryUsage} the message is accounted against.
 *
 * @openwire:marshaller
 *
 */
public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {

    /** Property key removed from the property map of every copy made by {@link #copy(Message)}. */
    public static final String ORIGINAL_EXPIRATION = "originalExpiration";

    /**
     * The default minimum amount of memory a message is assumed to use
     */
    public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;

    protected MessageId messageId;
    protected ActiveMQDestination originalDestination;
    protected TransactionId originalTransactionId;

    protected ProducerId producerId;
    protected ActiveMQDestination destination;
    protected TransactionId transactionId;

    protected long expiration;
    protected long timestamp;
    protected long arrival;
    protected long brokerInTime;
    protected long brokerOutTime;
    protected String correlationId;
    protected ActiveMQDestination replyTo;
    protected boolean persistent;
    protected String type;
    protected byte priority;
    protected String groupID;
    protected int groupSequence;
    protected ConsumerId targetConsumerId;
    protected boolean compressed;
    protected String userID;

    protected ByteSequence content;
    protected ByteSequence marshalledProperties;
    protected DataStructure dataStructure;
    protected int redeliveryCounter;

    protected int size;
    protected Map<String, Object> properties;
    protected boolean readOnlyProperties;
    protected boolean readOnlyBody;
    protected transient boolean recievedByDFBridge;
    protected boolean droppable;
    protected boolean jmsXGroupFirstForConsumer;

    private transient short referenceCount;
    private transient ActiveMQConnection connection;
    transient MessageDestination regionDestination;
    transient MemoryUsage memoryUsage;
    transient AtomicBoolean processAsExpired = new AtomicBoolean(false);

    private BrokerId[] brokerPath;
    private BrokerId[] cluster;

    /**
     * The view of a destination that a message needs: the minimum size to
     * assume for a message and the memory usage to account against.
     */
    public interface MessageDestination {
        int getMinimumMessageSize();

        MemoryUsage getMemoryUsage();
    }

    public abstract Message copy();

    public abstract void clearBody() throws JMSException;

    public abstract void storeContent();

    public abstract void storeContentAndClear();

    /**
     * Delegates to {@link #clearUnMarshalledState()}.
     *
     * @deprecated - This method name is misnamed; use
     *             {@link #clearUnMarshalledState()} instead.
     * @throws JMSException
     */
    @Deprecated
    public void clearMarshalledState() throws JMSException {
        clearUnMarshalledState();
    }

    /**
     * Drops the unmarshalled property map; the marshalled form is left
     * untouched. Useful to reduce the memory footprint of a persisted message.
     *
     * @throws JMSException declared for subclasses; this implementation does
     *         not throw it
     */
    public void clearUnMarshalledState() throws JMSException {
        properties = null;
    }

    /**
     * @return true when both the content and the properties are available in
     *         marshalled form
     */
    public boolean isMarshalled() {
        return isContentMarshalled() && isPropertiesMarshalled();
    }

    /**
     * @return true when a marshalled property block exists, or when there is
     *         no property map at all (nothing to marshal)
     */
    protected boolean isPropertiesMarshalled() {
        return marshalledProperties != null || properties == null;
    }

    /**
     * @return true when {@link #content} is set
     */
    protected boolean isContentMarshalled() {
        return content != null;
    }

    /**
     * Copies this message's state into {@code copy}.
     * <p>
     * The message id is copied via {@code MessageId.copy()}, the property map
     * is duplicated into a fresh {@link HashMap} (minus
     * {@link #ORIGINAL_EXPIRATION}), and the content / marshalled properties
     * get new {@link ByteSequence} wrappers. All other fields are assigned by
     * reference or value. {@code targetConsumerId} and the reference count are
     * deliberately not copied.
     *
     * @param copy the message to populate
     */
    protected void copy(Message copy) {
        super.copy(copy);
        copy.producerId = producerId;
        copy.transactionId = transactionId;
        copy.destination = destination;
        copy.messageId = messageId != null ? messageId.copy() : null;
        copy.originalDestination = originalDestination;
        copy.originalTransactionId = originalTransactionId;
        copy.expiration = expiration;
        copy.timestamp = timestamp;
        copy.correlationId = correlationId;
        copy.replyTo = replyTo;
        copy.persistent = persistent;
        copy.redeliveryCounter = redeliveryCounter;
        copy.type = type;
        copy.priority = priority;
        copy.size = size;
        copy.groupID = groupID;
        copy.userID = userID;
        copy.groupSequence = groupSequence;

        if (properties != null) {
            copy.properties = new HashMap<>(properties);

            // The new message hasn't expired, so remove this field.
            copy.properties.remove(ORIGINAL_EXPIRATION);
        } else {
            copy.properties = null;
        }

        copy.content = copyByteSequence(content);
        copy.marshalledProperties = copyByteSequence(marshalledProperties);
        copy.dataStructure = dataStructure;
        copy.readOnlyProperties = readOnlyProperties;
        copy.readOnlyBody = readOnlyBody;
        copy.compressed = compressed;
        copy.recievedByDFBridge = recievedByDFBridge;

        copy.arrival = arrival;
        copy.connection = connection;
        copy.regionDestination = regionDestination;
        copy.brokerInTime = brokerInTime;
        copy.brokerOutTime = brokerOutTime;
        copy.memoryUsage = this.memoryUsage;
        copy.brokerPath = brokerPath;
        copy.jmsXGroupFirstForConsumer = jmsXGroupFirstForConsumer;

        // lets not copy the following fields
        // copy.targetConsumerId = targetConsumerId;
        // copy.referenceCount = referenceCount;
    }

    /**
     * Creates a new {@link ByteSequence} with the same data, offset and length
     * as {@code sequence}.
     * NOTE(review): the array returned by getData() is passed through as-is,
     * so the bytes are presumably shared rather than cloned - confirm against
     * ByteSequence.
     *
     * @param sequence the sequence to re-wrap, may be null
     * @return the new wrapper, or null when {@code sequence} is null
     */
    private ByteSequence copyByteSequence(ByteSequence sequence) {
        if (sequence == null) {
            return null;
        }
        return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
    }

    /**
     * Looks up a single property, unmarshalling the property block on first
     * access. A {@link UTF8Buffer} value is returned as its String form.
     *
     * @param name the property name
     * @return the property value, or null when the property (or any property
     *         data) is absent
     * @throws IOException if the marshalled properties cannot be unmarshalled
     */
    public Object getProperty(String name) throws IOException {
        if (properties == null) {
            if (marshalledProperties == null) {
                return null;
            }
            properties = unmarsallProperties(marshalledProperties);
        }
        Object result = properties.get(name);
        if (result instanceof UTF8Buffer) {
            result = result.toString();
        }

        return result;
    }

    /**
     * Returns a read-only view of the properties, unmarshalling them on first
     * access.
     *
     * @return an unmodifiable view of the property map, or an empty map when
     *         the message has neither unmarshalled nor marshalled properties
     * @throws IOException if the marshalled properties cannot be unmarshalled
     */
    public Map<String, Object> getProperties() throws IOException {
        if (properties == null) {
            if (marshalledProperties == null) {
                return Collections.emptyMap();
            }
            properties = unmarsallProperties(marshalledProperties);
        }
        return Collections.unmodifiableMap(properties);
    }

    /**
     * Discards both the marshalled and the unmarshalled properties.
     */
    public void clearProperties() {
        marshalledProperties = null;
        properties = null;
    }

    /**
     * Sets a property, invalidating any marshalled property block.
     *
     * @param name the property name
     * @param value the property value
     * @throws IOException if existing marshalled properties cannot be
     *         unmarshalled
     */
    public void setProperty(String name, Object value) throws IOException {
        lazyCreateProperties();
        properties.put(name, value);
    }

    /**
     * Removes a property, invalidating any marshalled property block.
     *
     * @param name the property name
     * @throws IOException if existing marshalled properties cannot be
     *         unmarshalled
     */
    public void removeProperty(String name) throws IOException {
        lazyCreateProperties();
        properties.remove(name);
    }

    /**
     * Ensures {@link #properties} is a live map ready for mutation and clears
     * {@link #marshalledProperties}, since the marshalled form is about to
     * become stale.
     *
     * @throws IOException if existing marshalled properties cannot be
     *         unmarshalled
     */
    protected void lazyCreateProperties() throws IOException {
        if (properties == null) {
            if (marshalledProperties == null) {
                properties = new HashMap<>();
            } else {
                properties = unmarsallProperties(marshalledProperties);
                marshalledProperties = null;
            }
        } else {
            marshalledProperties = null;
        }
    }

    /**
     * Decodes a marshalled property block via
     * {@link MarshallingSupport#unmarshalPrimitiveMap}.
     */
    private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
        return MarshallingSupport.unmarshalPrimitiveMap(
                new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
    }

    /**
     * Marshals the property map into {@link #marshalledProperties} when there
     * are properties and no marshalled form exists yet.
     */
    @Override
    public void beforeMarshall(WireFormat wireFormat) throws IOException {
        // Need to marshal the properties.
        if (marshalledProperties == null && properties != null) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            // try-with-resources: the stream is closed even if marshalling fails.
            try (DataOutputStream os = new DataOutputStream(baos)) {
                MarshallingSupport.marshalPrimitiveMap(properties, os);
            }
            marshalledProperties = baos.toByteSequence();
        }
    }

    @Override
    public void afterMarshall(WireFormat wireFormat) throws IOException {
    }

    @Override
    public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
    }

    @Override
    public void afterUnmarshall(WireFormat wireFormat) throws IOException {
    }

    // /////////////////////////////////////////////////////////////////
    //
    // Simple Field accessors
    //
    // /////////////////////////////////////////////////////////////////

    /**
     * @openwire:property version=1 cache=true
     */
    public ProducerId getProducerId() {
        return producerId;
    }

    public void setProducerId(ProducerId producerId) {
        this.producerId = producerId;
    }

    /**
     * @openwire:property version=1 cache=true
     */
    public ActiveMQDestination getDestination() {
        return destination;
    }

    public void setDestination(ActiveMQDestination destination) {
        this.destination = destination;
    }

    /**
     * @openwire:property version=1 cache=true
     */
    public TransactionId getTransactionId() {
        return transactionId;
    }

    public void setTransactionId(TransactionId transactionId) {
        this.transactionId = transactionId;
    }

    /**
     * @return true when a transaction id is set on this message
     */
    public boolean isInTransaction() {
        return transactionId != null;
    }

    /**
     * @openwire:property version=1 cache=true
     */
    public ActiveMQDestination getOriginalDestination() {
        return originalDestination;
    }

    public void setOriginalDestination(ActiveMQDestination destination) {
        this.originalDestination = destination;
    }

    /**
     * @openwire:property version=1
     */
    @Override
    public MessageId getMessageId() {
        return messageId;
    }

    public void setMessageId(MessageId messageId) {
        this.messageId = messageId;
    }

    /**
     * @openwire:property version=1 cache=true
     */
    public TransactionId getOriginalTransactionId() {
        return originalTransactionId;
    }

    public void setOriginalTransactionId(TransactionId transactionId) {
        this.originalTransactionId = transactionId;
    }

    /**
     * @openwire:property version=1
     */
    @Override
    public String getGroupID() {
        return groupID;
    }

    public void setGroupID(String groupID) {
        this.groupID = groupID;
    }

    /**
     * @openwire:property version=1
     */
    @Override
    public int getGroupSequence() {
        return groupSequence;
    }

    public void setGroupSequence(int groupSequence) {
        this.groupSequence = groupSequence;
    }

    /**
     * @openwire:property version=1
     */
    public String getCorrelationId() {
        return correlationId;
    }

    public void setCorrelationId(String correlationId) {
        this.correlationId = correlationId;
    }

    /**
     * @openwire:property version=1
     */
    @Override
    public boolean isPersistent() {
        return persistent;
    }

    public void setPersistent(boolean deliveryMode) {
        this.persistent = deliveryMode;
    }

    /**
     * @openwire:property version=1
     */
    @Override
    public long getExpiration() {
        return expiration;
    }

    public void setExpiration(long expiration) {
        this.expiration = expiration;
    }

    /**
     * @openwire:property version=1
     */
    public byte getPriority() {
        return priority;
    }

    /**
     * Sets the priority, clamping the value into the range 0..9.
     *
     * @param priority the requested priority; values below 0 become 0 and
     *        values above 9 become 9
     */
    public void setPriority(byte priority) {
        if (priority < 0) {
            this.priority = 0;
        } else if (priority > 9) {
            this.priority = 9;
        } else {
            this.priority = priority;
        }
    }

    /**
     * @openwire:property version=1
     */
    public ActiveMQDestination getReplyTo() {
        return replyTo;
    }

    public void setReplyTo(ActiveMQDestination replyTo) {
        this.replyTo = replyTo;
    }

    /**
     * @openwire:property version=1
     */
    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    /**
     * @openwire:property version=1
     */
    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    /**
     * @openwire:property version=1
     */
    public ByteSequence getContent() {
        return content;
    }

    public void setContent(ByteSequence content) {
        this.content = content;
    }

    /**
     * @openwire:property version=1
     */
    public ByteSequence getMarshalledProperties() {
        return marshalledProperties;
    }

    public void setMarshalledProperties(ByteSequence marshalledProperties) {
        this.marshalledProperties = marshalledProperties;
    }

    /**
     * @openwire:property version=1
     */
    public DataStructure getDataStructure() {
        return dataStructure;
    }

    public void setDataStructure(DataStructure data) {
        this.dataStructure = data;
    }

    /**
     * Can be used to route the message to a specific consumer. Should be null
     * to allow the broker to use normal JMS routing semantics. If the target
     * consumer id is an active consumer on the broker, the message is dropped.
     * Used by the AdvisoryBroker to replay advisory messages to a specific
     * consumer.
     *
     * @openwire:property version=1 cache=true
     */
    @Override
    public ConsumerId getTargetConsumerId() {
        return targetConsumerId;
    }

    public void setTargetConsumerId(ConsumerId targetConsumerId) {
        this.targetConsumerId = targetConsumerId;
    }

    /**
     * @return true when an expiration time is set (greater than zero) and the
     *         current system time is past it
     */
    @Override
    public boolean isExpired() {
        long expireTime = getExpiration();
        return expireTime > 0 && System.currentTimeMillis() > expireTime;
    }

    /**
     * @return true when the message type equals
     *         {@link AdvisorySupport#ADIVSORY_MESSAGE_TYPE}
     */
    @Override
    public boolean isAdvisory() {
        return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
    }

    /**
     * @openwire:property version=1
     */
    public boolean isCompressed() {
        return compressed;
    }

    public void setCompressed(boolean compressed) {
        this.compressed = compressed;
    }

    /**
     * @return true when the redelivery counter is greater than zero
     */
    public boolean isRedelivered() {
        return redeliveryCounter > 0;
    }

    /**
     * Adjusts the redelivery counter to match the requested flag. The counter
     * is only touched when the flag actually changes: it becomes 1 when
     * switching to redelivered and 0 when switching back.
     *
     * @param redelivered the desired redelivered state
     */
    public void setRedelivered(boolean redelivered) {
        if (redelivered != isRedelivered()) {
            setRedeliveryCounter(redelivered ? 1 : 0);
        }
    }

    @Override
    public void incrementRedeliveryCounter() {
        redeliveryCounter++;
    }

    /**
     * @openwire:property version=1
     */
    @Override
    public int getRedeliveryCounter() {
        return redeliveryCounter;
    }

    public void setRedeliveryCounter(int deliveryCounter) {
        this.redeliveryCounter = deliveryCounter;
    }

    /**
     * The route of brokers the command has moved through.
     *
     * @openwire:property version=1 cache=true
     */
    public BrokerId[] getBrokerPath() {
        return brokerPath;
    }

    public void setBrokerPath(BrokerId[] brokerPath) {
        this.brokerPath = brokerPath;
    }

    public boolean isReadOnlyProperties() {
        return readOnlyProperties;
    }

    public void setReadOnlyProperties(boolean readOnlyProperties) {
        this.readOnlyProperties = readOnlyProperties;
    }

    public boolean isReadOnlyBody() {
        return readOnlyBody;
    }

    public void setReadOnlyBody(boolean readOnlyBody) {
        this.readOnlyBody = readOnlyBody;
    }

    public ActiveMQConnection getConnection() {
        return this.connection;
    }

    public void setConnection(ActiveMQConnection connection) {
        this.connection = connection;
    }

    /**
     * Used to schedule the arrival time of a message to a broker. The broker
     * will not dispatch a message to a consumer until its arrival time has
     * elapsed.
     *
     * @openwire:property version=1
     */
    public long getArrival() {
        return arrival;
    }

    public void setArrival(long arrival) {
        this.arrival = arrival;
    }

    /**
     * Only set by the broker and defines the userID of the producer connection
     * who sent this message. This is an optional field, it needs to be enabled
     * on the broker to have this field populated.
     *
     * @openwire:property version=1
     */
    public String getUserID() {
        return userID;
    }

    public void setUserID(String jmsxUserID) {
        this.userID = jmsxUserID;
    }

    @Override
    public int getReferenceCount() {
        return referenceCount;
    }

    @Override
    public Message getMessageHardRef() {
        return this;
    }

    @Override
    public Message getMessage() {
        return this;
    }

    /**
     * Sets the region destination. When no memory usage has been assigned yet,
     * the destination's memory usage is adopted.
     *
     * @param destination the region destination; may be null, in which case
     *        only the reference is cleared and the memory usage is left as is
     */
    public void setRegionDestination(MessageDestination destination) {
        this.regionDestination = destination;
        // Guard against null so that clearing the destination cannot fail
        // with a NullPointerException after the field was already updated.
        if (this.memoryUsage == null && destination != null) {
            this.memoryUsage = destination.getMemoryUsage();
        }
    }

    @Override
    @Transient
    public MessageDestination getRegionDestination() {
        return regionDestination;
    }

    public MemoryUsage getMemoryUsage() {
        return this.memoryUsage;
    }

    public void setMemoryUsage(MemoryUsage usage) {
        this.memoryUsage = usage;
    }

    @Override
    public boolean isMarshallAware() {
        return true;
    }

    /**
     * Increments the reference count. When the count becomes 1 and a memory
     * usage is assigned, the message size is added to that usage.
     * <p>
     * The counter update and the size computation happen while holding this
     * message's monitor; the memory usage is updated outside of it.
     *
     * @return the reference count after the increment
     */
    @Override
    public int incrementReferenceCount() {
        int rc;
        int currentSize;
        synchronized (this) {
            rc = ++referenceCount;
            currentSize = getSize();
        }

        if (rc == 1) {
            // Read once so the null check and the call see the same instance.
            MemoryUsage usage = getMemoryUsage();
            if (usage != null) {
                usage.increaseUsage(currentSize);
            }
        }

        return rc;
    }

    /**
     * Decrements the reference count. When the count becomes 0 and a memory
     * usage is assigned, the message size is subtracted from that usage.
     * <p>
     * The counter update and the size computation happen while holding this
     * message's monitor; the memory usage is updated outside of it.
     *
     * @return the reference count after the decrement
     */
    @Override
    public int decrementReferenceCount() {
        int rc;
        int currentSize;
        synchronized (this) {
            rc = --referenceCount;
            currentSize = getSize();
        }

        if (rc == 0) {
            // Read once so the null check and the call see the same instance.
            MemoryUsage usage = getMemoryUsage();
            if (usage != null) {
                usage.decreaseUsage(currentSize);
            }
        }

        return rc;
    }

    /**
     * Returns the size of this message, computing and caching it when the
     * cached value is zero or below the minimum message size. The computed
     * size is the minimum message size plus the lengths of the marshalled
     * properties and the content, when present.
     *
     * @return the (possibly freshly computed) message size
     */
    @Override
    public int getSize() {
        int minimumMessageSize = getMinimumMessageSize();
        if (size < minimumMessageSize || size == 0) {
            size = minimumMessageSize;
            if (marshalledProperties != null) {
                size += marshalledProperties.getLength();
            }
            if (content != null) {
                size += content.getLength();
            }
        }
        return size;
    }

    /**
     * @return the region destination's minimum message size when a region
     *         destination is set, otherwise
     *         {@link #DEFAULT_MINIMUM_MESSAGE_SIZE}
     */
    protected int getMinimumMessageSize() {
        int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
        // let destination override
        MessageDestination dest = regionDestination;
        if (dest != null) {
            result = dest.getMinimumMessageSize();
        }
        return result;
    }

    /**
     * @openwire:property version=1
     * @return Returns the recievedByDFBridge.
     */
    public boolean isRecievedByDFBridge() {
        return recievedByDFBridge;
    }

    /**
     * @param recievedByDFBridge The recievedByDFBridge to set.
     */
    public void setRecievedByDFBridge(boolean recievedByDFBridge) {
        this.recievedByDFBridge = recievedByDFBridge;
    }

    /**
     * Increments the redelivery counter.
     */
    public void onMessageRolledBack() {
        incrementRedeliveryCounter();
    }

    /**
     * @openwire:property version=2 cache=true
     */
    public boolean isDroppable() {
        return droppable;
    }

    public void setDroppable(boolean droppable) {
        this.droppable = droppable;
    }

    /**
     * If a message is stored in multiple nodes on a cluster, all the cluster
     * members will be listed here. Otherwise, it will be null.
     *
     * @openwire:property version=3 cache=true
     */
    public BrokerId[] getCluster() {
        return cluster;
    }

    public void setCluster(BrokerId[] cluster) {
        this.cluster = cluster;
    }

    @Override
    public boolean isMessage() {
        return true;
    }

    /**
     * @openwire:property version=3
     */
    public long getBrokerInTime() {
        return this.brokerInTime;
    }

    public void setBrokerInTime(long brokerInTime) {
        this.brokerInTime = brokerInTime;
    }

    /**
     * @openwire:property version=3
     */
    public long getBrokerOutTime() {
        return this.brokerOutTime;
    }

    public void setBrokerOutTime(long brokerOutTime) {
        this.brokerOutTime = brokerOutTime;
    }

    @Override
    public boolean isDropped() {
        return false;
    }

    /**
     * @openwire:property version=10
     */
    public boolean isJMSXGroupFirstForConsumer() {
        return jmsXGroupFirstForConsumer;
    }

    public void setJMSXGroupFirstForConsumer(boolean val) {
        jmsXGroupFirstForConsumer = val;
    }

    /**
     * Compresses the content if the message is not already compressed.
     * {@link #storeContent()} is invoked first; compression is then only
     * performed when the message is still uncompressed and has content.
     *
     * @throws IOException if compression fails
     */
    public void compress() throws IOException {
        if (!isCompressed()) {
            storeContent();
            if (!isCompressed() && getContent() != null) {
                doCompress();
            }
        }
    }

    /**
     * Replaces the content with its deflated form and marks the message as
     * compressed.
     * <p>
     * The compressed flag is raised before the content is read, and restored
     * to its previous value if anything fails, so that a failed attempt does
     * not leave the message flagged as compressed while still holding
     * uncompressed content.
     *
     * @throws IOException if writing to the deflater stream fails
     */
    protected void doCompress() throws IOException {
        final boolean wasCompressed = compressed;
        compressed = true;
        boolean succeeded = false;
        try {
            ByteSequence bytes = getContent();
            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
            // Closing the stream finishes the deflate output; try-with-resources
            // also closes it when write() fails.
            try (OutputStream os = new DeflaterOutputStream(bytesOut)) {
                os.write(bytes.data, bytes.offset, bytes.length);
            }
            setContent(bytesOut.toByteSequence());
            succeeded = true;
        } finally {
            if (!succeeded) {
                compressed = wasCompressed;
            }
        }
    }

    @Override
    public String toString() {
        return toString(null);
    }

    /**
     * Builds the string form via the superclass after attempting to
     * unmarshal the properties.
     *
     * @param overrideFields passed through unchanged to the superclass
     */
    @Override
    public String toString(Map<String, Object> overrideFields) {
        try {
            // Called for its side effect of unmarshalling the properties,
            // presumably so the superclass can include them - TODO confirm.
            getProperties();
        } catch (IOException ignored) {
            // Best effort: toString() must not fail because the properties
            // could not be unmarshalled.
        }
        return super.toString(overrideFields);
    }

    /**
     * @return true for the first caller only; every later call returns false
     */
    @Override
    public boolean canProcessAsExpired() {
        return processAsExpired.compareAndSet(false, true);
    }
}