001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq.store.amq; 019 020 import java.io.DataInput; 021 import java.io.DataOutput; 022 import java.io.IOException; 023 import org.apache.activemq.broker.ConnectionContext; 024 import org.apache.activemq.command.ActiveMQDestination; 025 import org.apache.activemq.command.JournalTopicAck; 026 import org.apache.activemq.command.Message; 027 import org.apache.activemq.command.MessageAck; 028 import org.apache.activemq.kaha.impl.async.Location; 029 import org.apache.activemq.util.ByteSequence; 030 import org.apache.activemq.wireformat.WireFormat; 031 032 /** 033 */ 034 public class AMQTxOperation { 035 036 public static final byte ADD_OPERATION_TYPE = 0; 037 public static final byte REMOVE_OPERATION_TYPE = 1; 038 public static final byte ACK_OPERATION_TYPE = 3; 039 private byte operationType; 040 private ActiveMQDestination destination; 041 private Object data; 042 private Location location; 043 044 public AMQTxOperation() { 045 } 046 047 public AMQTxOperation(byte operationType, ActiveMQDestination destination, Object data, Location location) { 048 this.operationType = operationType; 049 this.destination = destination; 050 this.data = data; 051 this.location = location; 052 053 } 054 055 /** 056 * @return the data 057 */ 058 public Object getData() { 059 return this.data; 060 } 061 062 /** 063 * @param data the data to set 064 */ 065 public void setData(Object data) { 066 this.data = data; 067 } 068 069 /** 070 * @return the location 071 */ 072 public Location getLocation() { 073 return this.location; 074 } 075 076 /** 077 * @param location the location to set 078 */ 079 public void setLocation(Location location) { 080 this.location = location; 081 } 082 083 /** 084 * @return the operationType 085 */ 086 public byte getOperationType() { 087 return this.operationType; 088 } 089 090 /** 091 * @param operationType the operationType to set 092 */ 093 public void setOperationType(byte operationType) { 094 this.operationType = operationType; 095 } 096 097 public boolean replay(AMQPersistenceAdapter adapter, ConnectionContext context) throws IOException { 098 boolean result = false; 099 AMQMessageStore store = (AMQMessageStore)adapter.createMessageStore(destination); 100 if (operationType == ADD_OPERATION_TYPE) { 101 result = store.replayAddMessage(context, (Message)data, location); 102 } else if (operationType == REMOVE_OPERATION_TYPE) { 103 result = store.replayRemoveMessage(context, (MessageAck)data); 104 } else { 105 JournalTopicAck ack = (JournalTopicAck)data; 106 result = ((AMQTopicMessageStore)store).replayAcknowledge(context, ack.getClientId(), ack 107 .getSubscritionName(), ack.getMessageId()); 108 } 109 return result; 110 } 111 112 public void writeExternal(WireFormat wireFormat, DataOutput dos) throws IOException { 113 location.writeExternal(dos); 114 ByteSequence packet = wireFormat.marshal(getData()); 115 dos.writeInt(packet.length); 116 dos.write(packet.data, packet.offset, packet.length); 117 packet = wireFormat.marshal(destination); 118 dos.writeInt(packet.length); 119 dos.write(packet.data, packet.offset, packet.length); 120 } 121 122 public void readExternal(WireFormat wireFormat, DataInput dis) throws IOException { 123 this.location = new Location(); 124 this.location.readExternal(dis); 125 int size = dis.readInt(); 126 byte[] data = new byte[size]; 127 dis.readFully(data); 128 setData(wireFormat.unmarshal(new ByteSequence(data))); 129 size = dis.readInt(); 130 data = new byte[size]; 131 dis.readFully(data); 132 this.destination = (ActiveMQDestination)wireFormat.unmarshal(new ByteSequence(data)); 133 } 134 }