001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.console.command.store; 018 019import java.io.BufferedOutputStream; 020import java.io.File; 021import java.io.FileOutputStream; 022import java.io.IOException; 023import java.net.URI; 024import java.net.URISyntaxException; 025import java.util.HashMap; 026 027import org.apache.activemq.broker.BrokerFactory; 028import org.apache.activemq.broker.BrokerService; 029import org.apache.activemq.command.ActiveMQDestination; 030import org.apache.activemq.command.ActiveMQQueue; 031import org.apache.activemq.command.ActiveMQTopic; 032import org.apache.activemq.command.Message; 033import org.apache.activemq.command.MessageAck; 034import org.apache.activemq.command.MessageId; 035import org.apache.activemq.command.SubscriptionInfo; 036import org.apache.activemq.command.XATransactionId; 037import org.apache.activemq.console.command.store.proto.MessagePB; 038import org.apache.activemq.console.command.store.proto.QueueEntryPB; 039import org.apache.activemq.console.command.store.proto.QueuePB; 040import org.apache.activemq.openwire.OpenWireFormat; 041import org.apache.activemq.store.MessageRecoveryListener; 042import org.apache.activemq.store.MessageStore; 043import org.apache.activemq.store.PersistenceAdapter; 044import org.apache.activemq.store.TopicMessageStore; 045import org.apache.activemq.store.TransactionRecoveryListener; 046import org.fusesource.hawtbuf.AsciiBuffer; 047import org.fusesource.hawtbuf.DataByteArrayOutputStream; 048import org.fusesource.hawtbuf.UTF8Buffer; 049 050import com.fasterxml.jackson.databind.ObjectMapper; 051 052/** 053 * @author <a href="http://hiramchirino.com">Hiram Chirino</a> 054 */ 055public class StoreExporter { 056 057 static final int OPENWIRE_VERSION = 8; 058 static final boolean TIGHT_ENCODING = false; 059 060 URI config; 061 File file; 062 063 private final ObjectMapper mapper = new ObjectMapper(); 064 private final AsciiBuffer ds_kind = new AsciiBuffer("ds"); 065 private final AsciiBuffer ptp_kind = new AsciiBuffer("ptp"); 066 private final AsciiBuffer codec_id = new AsciiBuffer("openwire"); 067 private final OpenWireFormat wireformat = new OpenWireFormat(); 068 069 public StoreExporter() throws URISyntaxException { 070 config = new URI("xbean:activemq.xml"); 071 wireformat.setCacheEnabled(false); 072 wireformat.setTightEncodingEnabled(TIGHT_ENCODING); 073 wireformat.setVersion(OPENWIRE_VERSION); 074 } 075 076 public void execute() throws Exception { 077 if (config == null) { 078 throw new Exception("required --config option missing"); 079 } 080 if (file == null) { 081 throw new Exception("required --file option missing"); 082 } 083 System.out.println("Loading: " + config); 084 BrokerFactory.setStartDefault(false); // to avoid the broker auto-starting.. 085 BrokerService broker = BrokerFactory.createBroker(config); 086 BrokerFactory.resetStartDefault(); 087 PersistenceAdapter store = broker.getPersistenceAdapter(); 088 System.out.println("Starting: " + store); 089 store.start(); 090 try { 091 BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(file)); 092 try { 093 export(store, fos); 094 } finally { 095 fos.close(); 096 } 097 } finally { 098 store.stop(); 099 } 100 } 101 102 void export(PersistenceAdapter store, BufferedOutputStream fos) throws Exception { 103 104 105 final long[] messageKeyCounter = new long[]{0}; 106 final long[] containerKeyCounter = new long[]{0}; 107 final ExportStreamManager manager = new ExportStreamManager(fos, 1); 108 109 110 final int[] preparedTxs = new int[]{0}; 111 store.createTransactionStore().recover(new TransactionRecoveryListener() { 112 @Override 113 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) { 114 preparedTxs[0] += 1; 115 } 116 }); 117 118 if (preparedTxs[0] > 0) { 119 throw new Exception("Cannot export a store with prepared XA transactions. Please commit or rollback those transactions before attempting to export."); 120 } 121 122 for (ActiveMQDestination odest : store.getDestinations()) { 123 containerKeyCounter[0]++; 124 if (odest instanceof ActiveMQQueue) { 125 ActiveMQQueue dest = (ActiveMQQueue) odest; 126 MessageStore queue = store.createQueueMessageStore(dest); 127 128 QueuePB.Bean destRecord = new QueuePB.Bean(); 129 destRecord.setKey(containerKeyCounter[0]); 130 destRecord.setBindingKind(ptp_kind); 131 132 final long[] seqKeyCounter = new long[]{0}; 133 134 HashMap<String, Object> jsonMap = new HashMap<String, Object>(); 135 jsonMap.put("@class", "queue_destination"); 136 jsonMap.put("name", dest.getQueueName()); 137 String json = mapper.writeValueAsString(jsonMap); 138 System.out.println(json); 139 destRecord.setBindingData(new UTF8Buffer(json)); 140 manager.store_queue(destRecord); 141 142 queue.recover(new MessageRecoveryListener() { 143 @Override 144 public boolean hasSpace() { 145 return true; 146 } 147 148 @Override 149 public boolean recoverMessageReference(MessageId ref) throws Exception { 150 return true; 151 } 152 153 @Override 154 public boolean isDuplicate(MessageId ref) { 155 return false; 156 } 157 158 @Override 159 public boolean recoverMessage(Message message) throws IOException { 160 messageKeyCounter[0]++; 161 seqKeyCounter[0]++; 162 163 MessagePB.Bean messageRecord = createMessagePB(message, messageKeyCounter[0]); 164 manager.store_message(messageRecord); 165 166 QueueEntryPB.Bean entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]); 167 manager.store_queue_entry(entryRecord); 168 169 return true; 170 } 171 }); 172 173 } else if (odest instanceof ActiveMQTopic) { 174 ActiveMQTopic dest = (ActiveMQTopic) odest; 175 176 TopicMessageStore topic = store.createTopicMessageStore(dest); 177 for (SubscriptionInfo sub : topic.getAllSubscriptions()) { 178 179 QueuePB.Bean destRecord = new QueuePB.Bean(); 180 destRecord.setKey(containerKeyCounter[0]); 181 destRecord.setBindingKind(ds_kind); 182 183 // TODO: use a real JSON encoder like jackson. 184 HashMap<String, Object> jsonMap = new HashMap<String, Object>(); 185 jsonMap.put("@class", "dsub_destination"); 186 jsonMap.put("name", sub.getClientId() + ":" + sub.getSubscriptionName()); 187 HashMap<String, Object> jsonTopic = new HashMap<String, Object>(); 188 jsonTopic.put("name", dest.getTopicName()); 189 jsonMap.put("topics", new Object[]{jsonTopic}); 190 if (sub.getSelector() != null) { 191 jsonMap.put("selector", sub.getSelector()); 192 } 193 jsonMap.put("noLocal", sub.isNoLocal()); 194 String json = mapper.writeValueAsString(jsonMap); 195 System.out.println(json); 196 197 destRecord.setBindingData(new UTF8Buffer(json)); 198 manager.store_queue(destRecord); 199 200 final long seqKeyCounter[] = new long[]{0}; 201 topic.recoverSubscription(sub.getClientId(), sub.getSubscriptionName(), new MessageRecoveryListener() { 202 @Override 203 public boolean hasSpace() { 204 return true; 205 } 206 207 @Override 208 public boolean recoverMessageReference(MessageId ref) throws Exception { 209 return true; 210 } 211 212 @Override 213 public boolean isDuplicate(MessageId ref) { 214 return false; 215 } 216 217 @Override 218 public boolean recoverMessage(Message message) throws IOException { 219 messageKeyCounter[0]++; 220 seqKeyCounter[0]++; 221 222 MessagePB.Bean messageRecord = createMessagePB(message, messageKeyCounter[0]); 223 manager.store_message(messageRecord); 224 225 QueueEntryPB.Bean entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]); 226 manager.store_queue_entry(entryRecord); 227 return true; 228 } 229 }); 230 231 } 232 } 233 } 234 manager.finish(); 235 } 236 237 private QueueEntryPB.Bean createQueueEntryPB(Message message, long queueKey, long queueSeq, long messageKey) { 238 QueueEntryPB.Bean entryRecord = new QueueEntryPB.Bean(); 239 entryRecord.setQueueKey(queueKey); 240 entryRecord.setQueueSeq(queueSeq); 241 entryRecord.setMessageKey(messageKey); 242 entryRecord.setSize(message.getSize()); 243 if (message.getExpiration() != 0) { 244 entryRecord.setExpiration(message.getExpiration()); 245 } 246 if (message.getRedeliveryCounter() != 0) { 247 entryRecord.setRedeliveries(message.getRedeliveryCounter()); 248 } 249 return entryRecord; 250 } 251 252 private MessagePB.Bean createMessagePB(Message message, long messageKey) throws IOException { 253 DataByteArrayOutputStream mos = new DataByteArrayOutputStream(); 254 mos.writeBoolean(TIGHT_ENCODING); 255 mos.writeVarInt(OPENWIRE_VERSION); 256 wireformat.marshal(message, mos); 257 258 MessagePB.Bean messageRecord = new MessagePB.Bean(); 259 messageRecord.setCodec(codec_id); 260 messageRecord.setMessageKey(messageKey); 261 messageRecord.setSize(message.getSize()); 262 messageRecord.setValue(mos.toBuffer()); 263 return messageRecord; 264 } 265 266 public File getFile() { 267 return file; 268 } 269 270 public void setFile(String file) { 271 setFile(new File(file)); 272 } 273 274 public void setFile(File file) { 275 this.file = file; 276 } 277 278 public URI getConfig() { 279 return config; 280 } 281 282 public void setConfig(URI config) { 283 this.config = config; 284 } 285}