001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import java.beans.Transient;
020import java.io.DataInputStream;
021import java.io.DataOutputStream;
022import java.io.IOException;
023import java.io.OutputStream;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.Map;
027import java.util.concurrent.atomic.AtomicBoolean;
028import java.util.zip.DeflaterOutputStream;
029
030import javax.jms.JMSException;
031
032import org.apache.activemq.ActiveMQConnection;
033import org.apache.activemq.advisory.AdvisorySupport;
034import org.apache.activemq.broker.region.MessageReference;
035import org.apache.activemq.usage.MemoryUsage;
036import org.apache.activemq.util.ByteArrayInputStream;
037import org.apache.activemq.util.ByteArrayOutputStream;
038import org.apache.activemq.util.ByteSequence;
039import org.apache.activemq.util.MarshallingSupport;
040import org.apache.activemq.wireformat.WireFormat;
041import org.fusesource.hawtbuf.UTF8Buffer;
042
043/**
044 * Represents an ActiveMQ message
045 *
046 * @openwire:marshaller
047 *
048 */
049public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
050    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
051
052    /**
053     * The default minimum amount of memory a message is assumed to use
054     */
055    public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
056
057    protected MessageId messageId;
058    protected ActiveMQDestination originalDestination;
059    protected TransactionId originalTransactionId;
060
061    protected ProducerId producerId;
062    protected ActiveMQDestination destination;
063    protected TransactionId transactionId;
064
065    protected long expiration;
066    protected long timestamp;
067    protected long arrival;
068    protected long brokerInTime;
069    protected long brokerOutTime;
070    protected String correlationId;
071    protected ActiveMQDestination replyTo;
072    protected boolean persistent;
073    protected String type;
074    protected byte priority;
075    protected String groupID;
076    protected int groupSequence;
077    protected ConsumerId targetConsumerId;
078    protected boolean compressed;
079    protected String userID;
080
081    protected ByteSequence content;
082    protected ByteSequence marshalledProperties;
083    protected DataStructure dataStructure;
084    protected int redeliveryCounter;
085
086    protected int size;
087    protected Map<String, Object> properties;
088    protected boolean readOnlyProperties;
089    protected boolean readOnlyBody;
090    protected transient boolean recievedByDFBridge;
091    protected boolean droppable;
092    protected boolean jmsXGroupFirstForConsumer;
093
094    private transient short referenceCount;
095    private transient ActiveMQConnection connection;
096    transient MessageDestination regionDestination;
097    transient MemoryUsage memoryUsage;
098    transient AtomicBoolean processAsExpired = new AtomicBoolean(false);
099
100    private BrokerId[] brokerPath;
101    private BrokerId[] cluster;
102
103    public static interface MessageDestination {
104        int getMinimumMessageSize();
105        MemoryUsage getMemoryUsage();
106    }
107
108    public abstract Message copy();
109    public abstract void clearBody() throws JMSException;
110    public abstract void storeContent();
111    public abstract void storeContentAndClear();
112
113    /**
114     * @deprecated - This method name is misnamed
115     * @throws JMSException
116     */
117    public void clearMarshalledState() throws JMSException {
118        clearUnMarshalledState();
119    }
120
121    // useful to reduce the memory footprint of a persisted message
122    public void clearUnMarshalledState() throws JMSException {
123        properties = null;
124    }
125
126    public boolean isMarshalled() {
127        return isContentMarshalled() && isPropertiesMarshalled();
128    }
129
130    protected boolean isPropertiesMarshalled() {
131        return marshalledProperties != null || properties == null;
132    }
133
134    protected boolean isContentMarshalled() {
135        return content != null;
136    }
137
138    protected void copy(Message copy) {
139        super.copy(copy);
140        copy.producerId = producerId;
141        copy.transactionId = transactionId;
142        copy.destination = destination;
143        copy.messageId = messageId != null ? messageId.copy() : null;
144        copy.originalDestination = originalDestination;
145        copy.originalTransactionId = originalTransactionId;
146        copy.expiration = expiration;
147        copy.timestamp = timestamp;
148        copy.correlationId = correlationId;
149        copy.replyTo = replyTo;
150        copy.persistent = persistent;
151        copy.redeliveryCounter = redeliveryCounter;
152        copy.type = type;
153        copy.priority = priority;
154        copy.size = size;
155        copy.groupID = groupID;
156        copy.userID = userID;
157        copy.groupSequence = groupSequence;
158
159        if (properties != null) {
160            copy.properties = new HashMap<String, Object>(properties);
161
162            // The new message hasn't expired, so remove this feild.
163            copy.properties.remove(ORIGINAL_EXPIRATION);
164        } else {
165            copy.properties = properties;
166        }
167
168        copy.content = copyByteSequence(content);
169        copy.marshalledProperties = copyByteSequence(marshalledProperties);
170        copy.dataStructure = dataStructure;
171        copy.readOnlyProperties = readOnlyProperties;
172        copy.readOnlyBody = readOnlyBody;
173        copy.compressed = compressed;
174        copy.recievedByDFBridge = recievedByDFBridge;
175
176        copy.arrival = arrival;
177        copy.connection = connection;
178        copy.regionDestination = regionDestination;
179        copy.brokerInTime = brokerInTime;
180        copy.brokerOutTime = brokerOutTime;
181        copy.memoryUsage=this.memoryUsage;
182        copy.brokerPath = brokerPath;
183        copy.jmsXGroupFirstForConsumer = jmsXGroupFirstForConsumer;
184
185        // lets not copy the following fields
186        // copy.targetConsumerId = targetConsumerId;
187        // copy.referenceCount = referenceCount;
188    }
189
190    private ByteSequence copyByteSequence(ByteSequence content) {
191        if (content != null) {
192            return new ByteSequence(content.getData(), content.getOffset(), content.getLength());
193        }
194        return null;
195    }
196
197    public Object getProperty(String name) throws IOException {
198        if (properties == null) {
199            if (marshalledProperties == null) {
200                return null;
201            }
202            properties = unmarsallProperties(marshalledProperties);
203        }
204        Object result = properties.get(name);
205        if (result instanceof UTF8Buffer) {
206            result = result.toString();
207        }
208
209        return result;
210    }
211
212    @SuppressWarnings("unchecked")
213    public Map<String, Object> getProperties() throws IOException {
214        if (properties == null) {
215            if (marshalledProperties == null) {
216                return Collections.EMPTY_MAP;
217            }
218            properties = unmarsallProperties(marshalledProperties);
219        }
220        return Collections.unmodifiableMap(properties);
221    }
222
223    public void clearProperties() {
224        marshalledProperties = null;
225        properties = null;
226    }
227
228    public void setProperty(String name, Object value) throws IOException {
229        lazyCreateProperties();
230        properties.put(name, value);
231    }
232
233    public void removeProperty(String name) throws IOException {
234        lazyCreateProperties();
235        properties.remove(name);
236    }
237
238    protected void lazyCreateProperties() throws IOException {
239        if (properties == null) {
240            if (marshalledProperties == null) {
241                properties = new HashMap<String, Object>();
242            } else {
243                properties = unmarsallProperties(marshalledProperties);
244                marshalledProperties = null;
245            }
246        } else {
247            marshalledProperties = null;
248        }
249    }
250
251    private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
252        return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
253    }
254
255    @Override
256        public void beforeMarshall(WireFormat wireFormat) throws IOException {
257        // Need to marshal the properties.
258        if (marshalledProperties == null && properties != null) {
259            ByteArrayOutputStream baos = new ByteArrayOutputStream();
260            DataOutputStream os = new DataOutputStream(baos);
261            MarshallingSupport.marshalPrimitiveMap(properties, os);
262            os.close();
263            marshalledProperties = baos.toByteSequence();
264        }
265    }
266
267    @Override
268        public void afterMarshall(WireFormat wireFormat) throws IOException {
269    }
270
271    @Override
272        public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
273    }
274
275    @Override
276        public void afterUnmarshall(WireFormat wireFormat) throws IOException {
277    }
278
279    // /////////////////////////////////////////////////////////////////
280    //
281    // Simple Field accessors
282    //
283    // /////////////////////////////////////////////////////////////////
284
285    /**
286     * @openwire:property version=1 cache=true
287     */
288    public ProducerId getProducerId() {
289        return producerId;
290    }
291
292    public void setProducerId(ProducerId producerId) {
293        this.producerId = producerId;
294    }
295
296    /**
297     * @openwire:property version=1 cache=true
298     */
299    public ActiveMQDestination getDestination() {
300        return destination;
301    }
302
303    public void setDestination(ActiveMQDestination destination) {
304        this.destination = destination;
305    }
306
307    /**
308     * @openwire:property version=1 cache=true
309     */
310    public TransactionId getTransactionId() {
311        return transactionId;
312    }
313
314    public void setTransactionId(TransactionId transactionId) {
315        this.transactionId = transactionId;
316    }
317
318    public boolean isInTransaction() {
319        return transactionId != null;
320    }
321
322    /**
323     * @openwire:property version=1 cache=true
324     */
325    public ActiveMQDestination getOriginalDestination() {
326        return originalDestination;
327    }
328
329    public void setOriginalDestination(ActiveMQDestination destination) {
330        this.originalDestination = destination;
331    }
332
333    /**
334     * @openwire:property version=1
335     */
336    @Override
337        public MessageId getMessageId() {
338        return messageId;
339    }
340
341    public void setMessageId(MessageId messageId) {
342        this.messageId = messageId;
343    }
344
345    /**
346     * @openwire:property version=1 cache=true
347     */
348    public TransactionId getOriginalTransactionId() {
349        return originalTransactionId;
350    }
351
352    public void setOriginalTransactionId(TransactionId transactionId) {
353        this.originalTransactionId = transactionId;
354    }
355
356    /**
357     * @openwire:property version=1
358     */
359    @Override
360        public String getGroupID() {
361        return groupID;
362    }
363
364    public void setGroupID(String groupID) {
365        this.groupID = groupID;
366    }
367
368    /**
369     * @openwire:property version=1
370     */
371    @Override
372        public int getGroupSequence() {
373        return groupSequence;
374    }
375
376    public void setGroupSequence(int groupSequence) {
377        this.groupSequence = groupSequence;
378    }
379
380    /**
381     * @openwire:property version=1
382     */
383    public String getCorrelationId() {
384        return correlationId;
385    }
386
387    public void setCorrelationId(String correlationId) {
388        this.correlationId = correlationId;
389    }
390
391    /**
392     * @openwire:property version=1
393     */
394    @Override
395        public boolean isPersistent() {
396        return persistent;
397    }
398
399    public void setPersistent(boolean deliveryMode) {
400        this.persistent = deliveryMode;
401    }
402
403    /**
404     * @openwire:property version=1
405     */
406    @Override
407        public long getExpiration() {
408        return expiration;
409    }
410
411    public void setExpiration(long expiration) {
412        this.expiration = expiration;
413    }
414
415    /**
416     * @openwire:property version=1
417     */
418    public byte getPriority() {
419        return priority;
420    }
421
422    public void setPriority(byte priority) {
423        if (priority < 0) {
424            this.priority = 0;
425        } else if (priority > 9) {
426            this.priority = 9;
427        } else {
428            this.priority = priority;
429        }
430    }
431
432    /**
433     * @openwire:property version=1
434     */
435    public ActiveMQDestination getReplyTo() {
436        return replyTo;
437    }
438
439    public void setReplyTo(ActiveMQDestination replyTo) {
440        this.replyTo = replyTo;
441    }
442
443    /**
444     * @openwire:property version=1
445     */
446    public long getTimestamp() {
447        return timestamp;
448    }
449
450    public void setTimestamp(long timestamp) {
451        this.timestamp = timestamp;
452    }
453
454    /**
455     * @openwire:property version=1
456     */
457    public String getType() {
458        return type;
459    }
460
461    public void setType(String type) {
462        this.type = type;
463    }
464
465    /**
466     * @openwire:property version=1
467     */
468    public ByteSequence getContent() {
469        return content;
470    }
471
472    public void setContent(ByteSequence content) {
473        this.content = content;
474    }
475
476    /**
477     * @openwire:property version=1
478     */
479    public ByteSequence getMarshalledProperties() {
480        return marshalledProperties;
481    }
482
483    public void setMarshalledProperties(ByteSequence marshalledProperties) {
484        this.marshalledProperties = marshalledProperties;
485    }
486
487    /**
488     * @openwire:property version=1
489     */
490    public DataStructure getDataStructure() {
491        return dataStructure;
492    }
493
494    public void setDataStructure(DataStructure data) {
495        this.dataStructure = data;
496    }
497
498    /**
499     * Can be used to route the message to a specific consumer. Should be null
500     * to allow the broker use normal JMS routing semantics. If the target
501     * consumer id is an active consumer on the broker, the message is dropped.
502     * Used by the AdvisoryBroker to replay advisory messages to a specific
503     * consumer.
504     *
505     * @openwire:property version=1 cache=true
506     */
507    @Override
508        public ConsumerId getTargetConsumerId() {
509        return targetConsumerId;
510    }
511
512    public void setTargetConsumerId(ConsumerId targetConsumerId) {
513        this.targetConsumerId = targetConsumerId;
514    }
515
516    @Override
517        public boolean isExpired() {
518        long expireTime = getExpiration();
519        return expireTime > 0 && System.currentTimeMillis() > expireTime;
520    }
521
522    @Override
523        public boolean isAdvisory() {
524        return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
525    }
526
527    /**
528     * @openwire:property version=1
529     */
530    public boolean isCompressed() {
531        return compressed;
532    }
533
534    public void setCompressed(boolean compressed) {
535        this.compressed = compressed;
536    }
537
538    public boolean isRedelivered() {
539        return redeliveryCounter > 0;
540    }
541
542    public void setRedelivered(boolean redelivered) {
543        if (redelivered) {
544            if (!isRedelivered()) {
545                setRedeliveryCounter(1);
546            }
547        } else {
548            if (isRedelivered()) {
549                setRedeliveryCounter(0);
550            }
551        }
552    }
553
554    @Override
555        public void incrementRedeliveryCounter() {
556        redeliveryCounter++;
557    }
558
559    /**
560     * @openwire:property version=1
561     */
562    @Override
563        public int getRedeliveryCounter() {
564        return redeliveryCounter;
565    }
566
567    public void setRedeliveryCounter(int deliveryCounter) {
568        this.redeliveryCounter = deliveryCounter;
569    }
570
571    /**
572     * The route of brokers the command has moved through.
573     *
574     * @openwire:property version=1 cache=true
575     */
576    public BrokerId[] getBrokerPath() {
577        return brokerPath;
578    }
579
580    public void setBrokerPath(BrokerId[] brokerPath) {
581        this.brokerPath = brokerPath;
582    }
583
584    public boolean isReadOnlyProperties() {
585        return readOnlyProperties;
586    }
587
588    public void setReadOnlyProperties(boolean readOnlyProperties) {
589        this.readOnlyProperties = readOnlyProperties;
590    }
591
592    public boolean isReadOnlyBody() {
593        return readOnlyBody;
594    }
595
596    public void setReadOnlyBody(boolean readOnlyBody) {
597        this.readOnlyBody = readOnlyBody;
598    }
599
600    public ActiveMQConnection getConnection() {
601        return this.connection;
602    }
603
604    public void setConnection(ActiveMQConnection connection) {
605        this.connection = connection;
606    }
607
608    /**
609     * Used to schedule the arrival time of a message to a broker. The broker
610     * will not dispatch a message to a consumer until it's arrival time has
611     * elapsed.
612     *
613     * @openwire:property version=1
614     */
615    public long getArrival() {
616        return arrival;
617    }
618
619    public void setArrival(long arrival) {
620        this.arrival = arrival;
621    }
622
623    /**
624     * Only set by the broker and defines the userID of the producer connection
625     * who sent this message. This is an optional field, it needs to be enabled
626     * on the broker to have this field populated.
627     *
628     * @openwire:property version=1
629     */
630    public String getUserID() {
631        return userID;
632    }
633
634    public void setUserID(String jmsxUserID) {
635        this.userID = jmsxUserID;
636    }
637
638    @Override
639        public int getReferenceCount() {
640        return referenceCount;
641    }
642
643    @Override
644        public Message getMessageHardRef() {
645        return this;
646    }
647
648    @Override
649        public Message getMessage() {
650        return this;
651    }
652
653    public void setRegionDestination(MessageDestination destination) {
654        this.regionDestination = destination;
655        if(this.memoryUsage==null) {
656            this.memoryUsage=destination.getMemoryUsage();
657        }
658    }
659
660    @Override
661    @Transient
662        public MessageDestination getRegionDestination() {
663        return regionDestination;
664    }
665
666    public MemoryUsage getMemoryUsage() {
667        return this.memoryUsage;
668    }
669
670    public void setMemoryUsage(MemoryUsage usage) {
671        this.memoryUsage=usage;
672    }
673
674    @Override
675    public boolean isMarshallAware() {
676        return true;
677    }
678
679    @Override
680        public int incrementReferenceCount() {
681        int rc;
682        int size;
683        synchronized (this) {
684            rc = ++referenceCount;
685            size = getSize();
686        }
687
688        if (rc == 1 && getMemoryUsage() != null) {
689            getMemoryUsage().increaseUsage(size);
690            //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
691
692        }
693
694        //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
695        return rc;
696    }
697
698    @Override
699        public int decrementReferenceCount() {
700        int rc;
701        int size;
702        synchronized (this) {
703            rc = --referenceCount;
704            size = getSize();
705        }
706
707        if (rc == 0 && getMemoryUsage() != null) {
708            getMemoryUsage().decreaseUsage(size);
709            //Thread.dumpStack();
710            //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
711        }
712
713        //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
714
715        return rc;
716    }
717
718    @Override
719        public int getSize() {
720        int minimumMessageSize = getMinimumMessageSize();
721        if (size < minimumMessageSize || size == 0) {
722            size = minimumMessageSize;
723            if (marshalledProperties != null) {
724                size += marshalledProperties.getLength();
725            }
726            if (content != null) {
727                size += content.getLength();
728            }
729        }
730        return size;
731    }
732
733    protected int getMinimumMessageSize() {
734        int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
735        //let destination override
736        MessageDestination dest = regionDestination;
737        if (dest != null) {
738            result=dest.getMinimumMessageSize();
739        }
740        return result;
741    }
742
743    /**
744     * @openwire:property version=1
745     * @return Returns the recievedByDFBridge.
746     */
747    public boolean isRecievedByDFBridge() {
748        return recievedByDFBridge;
749    }
750
751    /**
752     * @param recievedByDFBridge The recievedByDFBridge to set.
753     */
754    public void setRecievedByDFBridge(boolean recievedByDFBridge) {
755        this.recievedByDFBridge = recievedByDFBridge;
756    }
757
758    public void onMessageRolledBack() {
759        incrementRedeliveryCounter();
760    }
761
762    /**
763     * @openwire:property version=2 cache=true
764     */
765    public boolean isDroppable() {
766        return droppable;
767    }
768
769    public void setDroppable(boolean droppable) {
770        this.droppable = droppable;
771    }
772
773    /**
774     * If a message is stored in multiple nodes on a cluster, all the cluster
775     * members will be listed here. Otherwise, it will be null.
776     *
777     * @openwire:property version=3 cache=true
778     */
779    public BrokerId[] getCluster() {
780        return cluster;
781    }
782
783    public void setCluster(BrokerId[] cluster) {
784        this.cluster = cluster;
785    }
786
787    @Override
788    public boolean isMessage() {
789        return true;
790    }
791
792    /**
793     * @openwire:property version=3
794     */
795    public long getBrokerInTime() {
796        return this.brokerInTime;
797    }
798
799    public void setBrokerInTime(long brokerInTime) {
800        this.brokerInTime = brokerInTime;
801    }
802
803    /**
804     * @openwire:property version=3
805     */
806    public long getBrokerOutTime() {
807        return this.brokerOutTime;
808    }
809
810    public void setBrokerOutTime(long brokerOutTime) {
811        this.brokerOutTime = brokerOutTime;
812    }
813
814    @Override
815        public boolean isDropped() {
816        return false;
817    }
818
819    /**
820     * @openwire:property version=10
821     */
822    public boolean isJMSXGroupFirstForConsumer() {
823        return jmsXGroupFirstForConsumer;
824    }
825
826    public void setJMSXGroupFirstForConsumer(boolean val) {
827        jmsXGroupFirstForConsumer = val;
828    }
829
830    public void compress() throws IOException {
831        if (!isCompressed()) {
832            storeContent();
833            if (!isCompressed() && getContent() != null) {
834                doCompress();
835            }
836        }
837    }
838
839    protected void doCompress() throws IOException {
840        compressed = true;
841        ByteSequence bytes = getContent();
842        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
843        OutputStream os = new DeflaterOutputStream(bytesOut);
844        os.write(bytes.data, bytes.offset, bytes.length);
845        os.close();
846        setContent(bytesOut.toByteSequence());
847    }
848
849    @Override
850    public String toString() {
851        return toString(null);
852    }
853
854    @Override
855    public String toString(Map<String, Object>overrideFields) {
856        try {
857            getProperties();
858        } catch (IOException e) {
859        }
860        return super.toString(overrideFields);
861    }
862
863    @Override
864    public boolean canProcessAsExpired() {
865        return processAsExpired.compareAndSet(false, true);
866    }
867}