001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Map.Entry; 030import java.util.Set; 031import java.util.concurrent.BlockingQueue; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.FutureTask; 034import java.util.concurrent.LinkedBlockingQueue; 035import java.util.concurrent.Semaphore; 036import java.util.concurrent.ThreadFactory; 037import java.util.concurrent.ThreadPoolExecutor; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.TimeoutException; 040import java.util.concurrent.atomic.AtomicBoolean; 041import java.util.concurrent.atomic.AtomicInteger; 042import java.util.concurrent.atomic.AtomicReference; 043 044import org.apache.activemq.broker.ConnectionContext; 045import org.apache.activemq.broker.region.BaseDestination; 046import org.apache.activemq.broker.scheduler.JobSchedulerStore; 047import org.apache.activemq.command.ActiveMQDestination; 048import org.apache.activemq.command.ActiveMQQueue; 049import org.apache.activemq.command.ActiveMQTempQueue; 050import org.apache.activemq.command.ActiveMQTempTopic; 051import org.apache.activemq.command.ActiveMQTopic; 052import org.apache.activemq.command.Message; 053import org.apache.activemq.command.MessageAck; 054import org.apache.activemq.command.MessageId; 055import org.apache.activemq.command.ProducerId; 056import org.apache.activemq.command.SubscriptionInfo; 057import org.apache.activemq.command.TransactionId; 058import org.apache.activemq.openwire.OpenWireFormat; 059import org.apache.activemq.protobuf.Buffer; 060import org.apache.activemq.store.AbstractMessageStore; 061import org.apache.activemq.store.IndexListener; 062import org.apache.activemq.store.ListenableFuture; 063import org.apache.activemq.store.MessageRecoveryListener; 064import org.apache.activemq.store.MessageStore; 065import org.apache.activemq.store.MessageStoreStatistics; 066import org.apache.activemq.store.MessageStoreSubscriptionStatistics; 067import org.apache.activemq.store.NoLocalSubscriptionAware; 068import org.apache.activemq.store.PersistenceAdapter; 069import org.apache.activemq.store.TopicMessageStore; 070import org.apache.activemq.store.TransactionIdTransformer; 071import org.apache.activemq.store.TransactionStore; 072import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 073import org.apache.activemq.store.kahadb.data.KahaDestination; 074import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; 075import org.apache.activemq.store.kahadb.data.KahaLocation; 076import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 077import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 078import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 079import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 080import org.apache.activemq.store.kahadb.disk.journal.Location; 081import org.apache.activemq.store.kahadb.disk.page.Transaction; 082import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; 083import org.apache.activemq.usage.MemoryUsage; 084import org.apache.activemq.usage.SystemUsage; 085import org.apache.activemq.util.IOExceptionSupport; 086import org.apache.activemq.util.ServiceStopper; 087import org.apache.activemq.util.ThreadPoolUtils; 088import org.apache.activemq.wireformat.WireFormat; 089import org.slf4j.Logger; 090import org.slf4j.LoggerFactory; 091 092public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, NoLocalSubscriptionAware { 093 static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class); 094 private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH; 095 096 public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC"; 097 public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty( 098 PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10); 099 public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS"; 100 private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty( 101 PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);; 102 103 protected ExecutorService queueExecutor; 104 protected ExecutorService topicExecutor; 105 protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 106 protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 107 final WireFormat wireFormat = new OpenWireFormat(); 108 private SystemUsage usageManager; 109 private LinkedBlockingQueue<Runnable> asyncQueueJobQueue; 110 private LinkedBlockingQueue<Runnable> asyncTopicJobQueue; 111 Semaphore globalQueueSemaphore; 112 Semaphore globalTopicSemaphore; 113 private boolean concurrentStoreAndDispatchQueues = true; 114 // when true, message order may be compromised when cache is exhausted if store is out 115 // or order w.r.t cache 116 private boolean concurrentStoreAndDispatchTopics = false; 117 private final boolean concurrentStoreAndDispatchTransactions = false; 118 private int maxAsyncJobs = MAX_ASYNC_JOBS; 119 private final KahaDBTransactionStore transactionStore; 120 private TransactionIdTransformer transactionIdTransformer; 121 122 public KahaDBStore() { 123 this.transactionStore = new KahaDBTransactionStore(this); 124 this.transactionIdTransformer = new TransactionIdTransformer() { 125 @Override 126 public TransactionId transform(TransactionId txid) { 127 return txid; 128 } 129 }; 130 } 131 132 @Override 133 public String toString() { 134 return "KahaDB:[" + directory.getAbsolutePath() + "]"; 135 } 136 137 @Override 138 public void setBrokerName(String brokerName) { 139 } 140 141 @Override 142 public void setUsageManager(SystemUsage usageManager) { 143 this.usageManager = usageManager; 144 } 145 146 public SystemUsage getUsageManager() { 147 return this.usageManager; 148 } 149 150 /** 151 * @return the concurrentStoreAndDispatch 152 */ 153 public boolean isConcurrentStoreAndDispatchQueues() { 154 return this.concurrentStoreAndDispatchQueues; 155 } 156 157 /** 158 * @param concurrentStoreAndDispatch 159 * the concurrentStoreAndDispatch to set 160 */ 161 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { 162 this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch; 163 } 164 165 /** 166 * @return the concurrentStoreAndDispatch 167 */ 168 public boolean isConcurrentStoreAndDispatchTopics() { 169 return this.concurrentStoreAndDispatchTopics; 170 } 171 172 /** 173 * @param concurrentStoreAndDispatch 174 * the concurrentStoreAndDispatch to set 175 */ 176 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 177 this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; 178 } 179 180 public boolean isConcurrentStoreAndDispatchTransactions() { 181 return this.concurrentStoreAndDispatchTransactions; 182 } 183 184 /** 185 * @return the maxAsyncJobs 186 */ 187 public int getMaxAsyncJobs() { 188 return this.maxAsyncJobs; 189 } 190 191 /** 192 * @param maxAsyncJobs 193 * the maxAsyncJobs to set 194 */ 195 public void setMaxAsyncJobs(int maxAsyncJobs) { 196 this.maxAsyncJobs = maxAsyncJobs; 197 } 198 199 200 @Override 201 protected void configureMetadata() { 202 if (brokerService != null) { 203 metadata.openwireVersion = brokerService.getStoreOpenWireVersion(); 204 wireFormat.setVersion(metadata.openwireVersion); 205 206 if (LOG.isDebugEnabled()) { 207 LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion); 208 } 209 210 } 211 } 212 213 @Override 214 public void doStart() throws Exception { 215 //configure the metadata before start, right now 216 //this is just the open wire version 217 configureMetadata(); 218 219 super.doStart(); 220 221 if (brokerService != null) { 222 // In case the recovered store used a different OpenWire version log a warning 223 // to assist in determining why journal reads fail. 224 if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) { 225 LOG.warn("Existing Store uses a different OpenWire version[{}] " + 226 "than the version configured[{}] reverting to the version " + 227 "used by this store, some newer broker features may not work" + 228 "as expected.", 229 metadata.openwireVersion, brokerService.getStoreOpenWireVersion()); 230 231 // Update the broker service instance to the actual version in use. 232 wireFormat.setVersion(metadata.openwireVersion); 233 brokerService.setStoreOpenWireVersion(metadata.openwireVersion); 234 } 235 } 236 237 this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs()); 238 this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs()); 239 this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 240 this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 241 this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 242 asyncQueueJobQueue, new ThreadFactory() { 243 @Override 244 public Thread newThread(Runnable runnable) { 245 Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); 246 thread.setDaemon(true); 247 return thread; 248 } 249 }); 250 this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 251 asyncTopicJobQueue, new ThreadFactory() { 252 @Override 253 public Thread newThread(Runnable runnable) { 254 Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); 255 thread.setDaemon(true); 256 return thread; 257 } 258 }); 259 } 260 261 @Override 262 public void doStop(ServiceStopper stopper) throws Exception { 263 // drain down async jobs 264 LOG.info("Stopping async queue tasks"); 265 if (this.globalQueueSemaphore != null) { 266 this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 267 } 268 synchronized (this.asyncQueueMaps) { 269 for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) { 270 synchronized (m) { 271 for (StoreTask task : m.values()) { 272 task.cancel(); 273 } 274 } 275 } 276 this.asyncQueueMaps.clear(); 277 } 278 LOG.info("Stopping async topic tasks"); 279 if (this.globalTopicSemaphore != null) { 280 this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 281 } 282 synchronized (this.asyncTopicMaps) { 283 for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) { 284 synchronized (m) { 285 for (StoreTask task : m.values()) { 286 task.cancel(); 287 } 288 } 289 } 290 this.asyncTopicMaps.clear(); 291 } 292 if (this.globalQueueSemaphore != null) { 293 this.globalQueueSemaphore.drainPermits(); 294 } 295 if (this.globalTopicSemaphore != null) { 296 this.globalTopicSemaphore.drainPermits(); 297 } 298 if (this.queueExecutor != null) { 299 ThreadPoolUtils.shutdownNow(queueExecutor); 300 queueExecutor = null; 301 } 302 if (this.topicExecutor != null) { 303 ThreadPoolUtils.shutdownNow(topicExecutor); 304 topicExecutor = null; 305 } 306 LOG.info("Stopped KahaDB"); 307 super.doStop(stopper); 308 } 309 310 private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException { 311 return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() { 312 @Override 313 public Location execute(Transaction tx) throws IOException { 314 StoredDestination sd = getStoredDestination(destination, tx); 315 Long sequence = sd.messageIdIndex.get(tx, key); 316 if (sequence == null) { 317 return null; 318 } 319 return sd.orderIndex.get(tx, sequence).location; 320 } 321 }); 322 } 323 324 protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { 325 StoreQueueTask task = null; 326 synchronized (store.asyncTaskMap) { 327 task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 328 } 329 return task; 330 } 331 332 // with asyncTaskMap locked 333 protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { 334 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 335 this.queueExecutor.execute(task); 336 } 337 338 protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) { 339 StoreTopicTask task = null; 340 synchronized (store.asyncTaskMap) { 341 task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 342 } 343 return task; 344 } 345 346 protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException { 347 synchronized (store.asyncTaskMap) { 348 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 349 } 350 this.topicExecutor.execute(task); 351 } 352 353 @Override 354 public TransactionStore createTransactionStore() throws IOException { 355 return this.transactionStore; 356 } 357 358 public boolean getForceRecoverIndex() { 359 return this.forceRecoverIndex; 360 } 361 362 public void setForceRecoverIndex(boolean forceRecoverIndex) { 363 this.forceRecoverIndex = forceRecoverIndex; 364 } 365 366 public class KahaDBMessageStore extends AbstractMessageStore { 367 protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>(); 368 protected KahaDestination dest; 369 private final int maxAsyncJobs; 370 private final Semaphore localDestinationSemaphore; 371 372 double doneTasks, canceledTasks = 0; 373 374 public KahaDBMessageStore(ActiveMQDestination destination) { 375 super(destination); 376 this.dest = convert(destination); 377 this.maxAsyncJobs = getMaxAsyncJobs(); 378 this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs); 379 } 380 381 @Override 382 public ActiveMQDestination getDestination() { 383 return destination; 384 } 385 386 @Override 387 public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) 388 throws IOException { 389 if (isConcurrentStoreAndDispatchQueues()) { 390 message.beforeMarshall(wireFormat); 391 StoreQueueTask result = new StoreQueueTask(this, context, message); 392 ListenableFuture<Object> future = result.getFuture(); 393 message.getMessageId().setFutureOrSequenceLong(future); 394 message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch 395 result.aquireLocks(); 396 synchronized (asyncTaskMap) { 397 addQueueTask(this, result); 398 if (indexListener != null) { 399 indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); 400 } 401 } 402 return future; 403 } else { 404 return super.asyncAddQueueMessage(context, message); 405 } 406 } 407 408 @Override 409 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 410 if (isConcurrentStoreAndDispatchQueues()) { 411 AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination()); 412 StoreQueueTask task = null; 413 synchronized (asyncTaskMap) { 414 task = (StoreQueueTask) asyncTaskMap.get(key); 415 } 416 if (task != null) { 417 if (ack.isInTransaction() || !task.cancel()) { 418 try { 419 task.future.get(); 420 } catch (InterruptedException e) { 421 throw new InterruptedIOException(e.toString()); 422 } catch (Exception ignored) { 423 LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored); 424 } 425 removeMessage(context, ack); 426 } else { 427 indexLock.writeLock().lock(); 428 try { 429 metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId()); 430 } finally { 431 indexLock.writeLock().unlock(); 432 } 433 synchronized (asyncTaskMap) { 434 asyncTaskMap.remove(key); 435 } 436 } 437 } else { 438 removeMessage(context, ack); 439 } 440 } else { 441 removeMessage(context, ack); 442 } 443 } 444 445 @Override 446 public void addMessage(final ConnectionContext context, final Message message) throws IOException { 447 final KahaAddMessageCommand command = new KahaAddMessageCommand(); 448 command.setDestination(dest); 449 command.setMessageId(message.getMessageId().toProducerKey()); 450 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId()))); 451 command.setPriority(message.getPriority()); 452 command.setPrioritySupported(isPrioritizedMessages()); 453 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 454 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 455 store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() { 456 // sync add? (for async, future present from getFutureOrSequenceLong) 457 Object possibleFuture = message.getMessageId().getFutureOrSequenceLong(); 458 459 @Override 460 public void sequenceAssignedWithIndexLocked(final long sequence) { 461 message.getMessageId().setFutureOrSequenceLong(sequence); 462 if (indexListener != null) { 463 if (possibleFuture == null) { 464 trackPendingAdd(dest, sequence); 465 indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { 466 @Override 467 public void run() { 468 trackPendingAddComplete(dest, sequence); 469 } 470 })); 471 } 472 } 473 } 474 }, null); 475 } 476 477 @Override 478 public void updateMessage(Message message) throws IOException { 479 if (LOG.isTraceEnabled()) { 480 LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter()); 481 } 482 KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand(); 483 KahaAddMessageCommand command = new KahaAddMessageCommand(); 484 command.setDestination(dest); 485 command.setMessageId(message.getMessageId().toProducerKey()); 486 command.setPriority(message.getPriority()); 487 command.setPrioritySupported(prioritizedMessages); 488 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 489 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 490 updateMessageCommand.setMessage(command); 491 store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null); 492 } 493 494 @Override 495 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 496 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 497 command.setDestination(dest); 498 command.setMessageId(ack.getLastMessageId().toProducerKey()); 499 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId()))); 500 501 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 502 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 503 store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null); 504 } 505 506 @Override 507 public void removeAllMessages(ConnectionContext context) throws IOException { 508 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); 509 command.setDestination(dest); 510 store(command, true, null, null); 511 } 512 513 @Override 514 public Message getMessage(MessageId identity) throws IOException { 515 final String key = identity.toProducerKey(); 516 517 // Hopefully one day the page file supports concurrent read 518 // operations... but for now we must 519 // externally synchronize... 520 Location location; 521 indexLock.writeLock().lock(); 522 try { 523 location = findMessageLocation(key, dest); 524 } finally { 525 indexLock.writeLock().unlock(); 526 } 527 if (location == null) { 528 return null; 529 } 530 531 return loadMessage(location); 532 } 533 534 @Override 535 public boolean isEmpty() throws IOException { 536 indexLock.writeLock().lock(); 537 try { 538 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() { 539 @Override 540 public Boolean execute(Transaction tx) throws IOException { 541 // Iterate through all index entries to get a count of 542 // messages in the destination. 543 StoredDestination sd = getStoredDestination(dest, tx); 544 return sd.locationIndex.isEmpty(tx); 545 } 546 }); 547 } finally { 548 indexLock.writeLock().unlock(); 549 } 550 } 551 552 @Override 553 public void recover(final MessageRecoveryListener listener) throws Exception { 554 // recovery may involve expiry which will modify 555 indexLock.writeLock().lock(); 556 try { 557 pageFile.tx().execute(new Transaction.Closure<Exception>() { 558 @Override 559 public void execute(Transaction tx) throws Exception { 560 StoredDestination sd = getStoredDestination(dest, tx); 561 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 562 sd.orderIndex.resetCursorPosition(); 563 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator 564 .hasNext(); ) { 565 Entry<Long, MessageKeys> entry = iterator.next(); 566 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 567 continue; 568 } 569 Message msg = loadMessage(entry.getValue().location); 570 listener.recoverMessage(msg); 571 } 572 } 573 }); 574 } finally { 575 indexLock.writeLock().unlock(); 576 } 577 } 578 579 @Override 580 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { 581 indexLock.writeLock().lock(); 582 try { 583 pageFile.tx().execute(new Transaction.Closure<Exception>() { 584 @Override 585 public void execute(Transaction tx) throws Exception { 586 StoredDestination sd = getStoredDestination(dest, tx); 587 Entry<Long, MessageKeys> entry = null; 588 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 589 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { 590 entry = iterator.next(); 591 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 592 continue; 593 } 594 Message msg = loadMessage(entry.getValue().location); 595 msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); 596 listener.recoverMessage(msg); 597 counter++; 598 if (counter >= maxReturned) { 599 break; 600 } 601 } 602 sd.orderIndex.stoppedIterating(); 603 } 604 }); 605 } finally { 606 indexLock.writeLock().unlock(); 607 } 608 } 609 610 protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { 611 int counter = 0; 612 String id; 613 for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) { 614 id = iterator.next(); 615 iterator.remove(); 616 Long sequence = sd.messageIdIndex.get(tx, id); 617 if (sequence != null) { 618 if (sd.orderIndex.alreadyDispatched(sequence)) { 619 listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location)); 620 counter++; 621 if (counter >= maxReturned) { 622 break; 623 } 624 } else { 625 LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor); 626 } 627 } else { 628 LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd); 629 } 630 } 631 return counter; 632 } 633 634 635 @Override 636 public void resetBatching() { 637 if (pageFile.isLoaded()) { 638 indexLock.writeLock().lock(); 639 try { 640 pageFile.tx().execute(new Transaction.Closure<Exception>() { 641 @Override 642 public void execute(Transaction tx) throws Exception { 643 StoredDestination sd = getExistingStoredDestination(dest, tx); 644 if (sd != null) { 645 sd.orderIndex.resetCursorPosition();} 646 } 647 }); 648 } catch (Exception e) { 649 LOG.error("Failed to reset batching",e); 650 } finally { 651 indexLock.writeLock().unlock(); 652 } 653 } 654 } 655 656 @Override 657 public void setBatch(final MessageId identity) throws IOException { 658 indexLock.writeLock().lock(); 659 try { 660 pageFile.tx().execute(new Transaction.Closure<IOException>() { 661 @Override 662 public void execute(Transaction tx) throws IOException { 663 StoredDestination sd = getStoredDestination(dest, tx); 664 Long location = (Long) identity.getFutureOrSequenceLong(); 665 Long pending = sd.orderIndex.minPendingAdd(); 666 if (pending != null) { 667 location = Math.min(location, pending-1); 668 } 669 sd.orderIndex.setBatch(tx, location); 670 } 671 }); 672 } finally { 673 indexLock.writeLock().unlock(); 674 } 675 } 676 677 @Override 678 public void setMemoryUsage(MemoryUsage memoryUsage) { 679 } 680 @Override 681 public void start() throws Exception { 682 super.start(); 683 } 684 @Override 685 public void stop() throws Exception { 686 super.stop(); 687 } 688 689 protected void lockAsyncJobQueue() { 690 try { 691 if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) { 692 throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore); 693 } 694 } catch (Exception e) { 695 LOG.error("Failed to lock async jobs for " + this.destination, e); 696 } 697 } 698 699 protected void unlockAsyncJobQueue() { 700 this.localDestinationSemaphore.release(this.maxAsyncJobs); 701 } 702 703 protected void acquireLocalAsyncLock() { 704 try { 705 this.localDestinationSemaphore.acquire(); 706 } catch (InterruptedException e) { 707 LOG.error("Failed to aquire async lock for " + this.destination, e); 708 } 709 } 710 711 protected void releaseLocalAsyncLock() { 712 this.localDestinationSemaphore.release(); 713 } 714 715 @Override 716 public String toString(){ 717 return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest)); 718 } 719 720 @Override 721 protected void recoverMessageStoreStatistics() throws IOException { 722 try { 723 MessageStoreStatistics recoveredStatistics; 724 lockAsyncJobQueue(); 725 indexLock.writeLock().lock(); 726 try { 727 recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() { 728 @Override 729 public MessageStoreStatistics execute(Transaction tx) throws IOException { 730 MessageStoreStatistics statistics = new MessageStoreStatistics(); 731 732 // Iterate through all index entries to get the size of each message 733 StoredDestination sd = getStoredDestination(dest, tx); 734 for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { 735 int locationSize = iterator.next().getKey().getSize(); 736 statistics.getMessageCount().increment(); 737 statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0); 738 } 739 return statistics; 740 } 741 }); 742 getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount()); 743 getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize()); 744 } finally { 745 indexLock.writeLock().unlock(); 746 } 747 } finally { 748 unlockAsyncJobQueue(); 749 } 750 } 751 } 752 753 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { 754 private final AtomicInteger subscriptionCount = new AtomicInteger(); 755 protected final MessageStoreSubscriptionStatistics messageStoreSubStats = 756 new MessageStoreSubscriptionStatistics(isEnableSubscriptionStatistics()); 757 758 public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException { 759 super(destination); 760 this.subscriptionCount.set(getAllSubscriptions().length); 761 if (isConcurrentStoreAndDispatchTopics()) { 762 asyncTopicMaps.add(asyncTaskMap); 763 } 764 } 765 766 @Override 767 protected void recoverMessageStoreStatistics() throws IOException { 768 super.recoverMessageStoreStatistics(); 769 this.recoverMessageStoreSubMetrics(); 770 } 771 772 @Override 773 public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) 774 throws IOException { 775 if (isConcurrentStoreAndDispatchTopics()) { 776 message.beforeMarshall(wireFormat); 777 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); 778 result.aquireLocks(); 779 addTopicTask(this, result); 780 return result.getFuture(); 781 } else { 782 return super.asyncAddTopicMessage(context, message); 783 } 784 } 785 786 @Override 787 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 788 MessageId messageId, MessageAck ack) throws IOException { 789 String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString(); 790 if (isConcurrentStoreAndDispatchTopics()) { 791 AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); 792 StoreTopicTask task = null; 793 synchronized (asyncTaskMap) { 794 task = (StoreTopicTask) asyncTaskMap.get(key); 795 } 796 if (task != null) { 797 if (task.addSubscriptionKey(subscriptionKey)) { 798 removeTopicTask(this, messageId); 799 if (task.cancel()) { 800 synchronized (asyncTaskMap) { 801 asyncTaskMap.remove(key); 802 } 803 } 804 } 805 } else { 806 doAcknowledge(context, subscriptionKey, messageId, ack); 807 } 808 } else { 809 doAcknowledge(context, subscriptionKey, messageId, ack); 810 } 811 } 812 813 protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack) 814 throws IOException { 815 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 816 command.setDestination(dest); 817 command.setSubscriptionKey(subscriptionKey); 818 command.setMessageId(messageId.toProducerKey()); 819 command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null); 820 if (ack != null && ack.isUnmatchedAck()) { 821 command.setAck(UNMATCHED); 822 } else { 823 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 824 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 825 } 826 store(command, false, null, null); 827 } 828 829 @Override 830 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 831 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo 832 .getSubscriptionName()); 833 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 834 command.setDestination(dest); 835 command.setSubscriptionKey(subscriptionKey.toString()); 836 command.setRetroactive(retroactive); 837 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); 838 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 839 store(command, isEnableJournalDiskSyncs() && true, null, null); 840 this.subscriptionCount.incrementAndGet(); 841 } 842 843 @Override 844 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 845 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 846 command.setDestination(dest); 847 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString()); 848 store(command, isEnableJournalDiskSyncs() && true, null, null); 849 this.subscriptionCount.decrementAndGet(); 850 } 851 852 @Override 853 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 854 855 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); 856 indexLock.writeLock().lock(); 857 try { 858 pageFile.tx().execute(new Transaction.Closure<IOException>() { 859 @Override 860 public void execute(Transaction tx) throws IOException { 861 StoredDestination sd = getStoredDestination(dest, tx); 862 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator 863 .hasNext();) { 864 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 865 SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry 866 .getValue().getSubscriptionInfo().newInput())); 867 subscriptions.add(info); 868 869 } 870 } 871 }); 872 } finally { 873 indexLock.writeLock().unlock(); 874 } 875 876 SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()]; 877 subscriptions.toArray(rc); 878 return rc; 879 } 880 881 @Override 882 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 883 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 884 indexLock.writeLock().lock(); 885 try { 886 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() { 887 @Override 888 public SubscriptionInfo execute(Transaction tx) throws IOException { 889 StoredDestination sd = getStoredDestination(dest, tx); 890 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); 891 if (command == null) { 892 return null; 893 } 894 return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command 895 .getSubscriptionInfo().newInput())); 896 } 897 }); 898 } finally { 899 indexLock.writeLock().unlock(); 900 } 901 } 902 903 @Override 904 public int getMessageCount(String clientId, String subscriptionName) throws IOException { 905 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 906 907 if (isEnableSubscriptionStatistics()) { 908 return (int)this.messageStoreSubStats.getMessageCount(subscriptionKey).getCount(); 909 } else { 910 911 indexLock.writeLock().lock(); 912 try { 913 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 914 @Override 915 public Integer execute(Transaction tx) throws IOException { 916 StoredDestination sd = getStoredDestination(dest, tx); 917 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 918 if (cursorPos == null) { 919 // The subscription might not exist. 920 return 0; 921 } 922 923 return (int) getStoredMessageCount(tx, sd, subscriptionKey); 924 } 925 }); 926 } finally { 927 indexLock.writeLock().unlock(); 928 } 929 } 930 } 931 932 933 @Override 934 public long getMessageSize(String clientId, String subscriptionName) throws IOException { 935 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 936 if (isEnableSubscriptionStatistics()) { 937 return this.messageStoreSubStats.getMessageSize(subscriptionKey).getTotalSize(); 938 } else { 939 indexLock.writeLock().lock(); 940 try { 941 return pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() { 942 @Override 943 public Long execute(Transaction tx) throws IOException { 944 StoredDestination sd = getStoredDestination(dest, tx); 945 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 946 if (cursorPos == null) { 947 // The subscription might not exist. 948 return 0l; 949 } 950 951 return getStoredMessageSize(tx, sd, subscriptionKey); 952 } 953 }); 954 } finally { 955 indexLock.writeLock().unlock(); 956 } 957 } 958 } 959 960 protected void recoverMessageStoreSubMetrics() throws IOException { 961 if (isEnableSubscriptionStatistics()) { 962 963 final MessageStoreSubscriptionStatistics statistics = getMessageStoreSubStatistics(); 964 indexLock.writeLock().lock(); 965 try { 966 pageFile.tx().execute(new Transaction.Closure<IOException>() { 967 @Override 968 public void execute(Transaction tx) throws IOException { 969 StoredDestination sd = getStoredDestination(dest, tx); 970 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions 971 .iterator(tx); iterator.hasNext();) { 972 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 973 974 String subscriptionKey = entry.getKey(); 975 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 976 if (cursorPos != null) { 977 long size = getStoredMessageSize(tx, sd, subscriptionKey); 978 statistics.getMessageCount(subscriptionKey) 979 .setCount(getStoredMessageCount(tx, sd, subscriptionKey)); 980 statistics.getMessageSize(subscriptionKey).addSize(size > 0 ? size : 0); 981 } 982 } 983 } 984 }); 985 } finally { 986 indexLock.writeLock().unlock(); 987 } 988 } 989 } 990 991 @Override 992 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) 993 throws Exception { 994 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 995 @SuppressWarnings("unused") 996 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 997 indexLock.writeLock().lock(); 998 try { 999 pageFile.tx().execute(new Transaction.Closure<Exception>() { 1000 @Override 1001 public void execute(Transaction tx) throws Exception { 1002 StoredDestination sd = getStoredDestination(dest, tx); 1003 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 1004 sd.orderIndex.setBatch(tx, cursorPos); 1005 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 1006 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator 1007 .hasNext();) { 1008 Entry<Long, MessageKeys> entry = iterator.next(); 1009 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 1010 continue; 1011 } 1012 listener.recoverMessage(loadMessage(entry.getValue().location)); 1013 } 1014 sd.orderIndex.resetCursorPosition(); 1015 } 1016 }); 1017 } finally { 1018 indexLock.writeLock().unlock(); 1019 } 1020 } 1021 1022 @Override 1023 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, 1024 final MessageRecoveryListener listener) throws Exception { 1025 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 1026 @SuppressWarnings("unused") 1027 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 1028 indexLock.writeLock().lock(); 1029 try { 1030 pageFile.tx().execute(new Transaction.Closure<Exception>() { 1031 @Override 1032 public void execute(Transaction tx) throws Exception { 1033 StoredDestination sd = getStoredDestination(dest, tx); 1034 sd.orderIndex.resetCursorPosition(); 1035 MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); 1036 if (moc == null) { 1037 LastAck pos = getLastAck(tx, sd, subscriptionKey); 1038 if (pos == null) { 1039 // sub deleted 1040 return; 1041 } 1042 sd.orderIndex.setBatch(tx, pos); 1043 moc = sd.orderIndex.cursor; 1044 } else { 1045 sd.orderIndex.cursor.sync(moc); 1046 } 1047 1048 Entry<Long, MessageKeys> entry = null; 1049 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 1050 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator 1051 .hasNext();) { 1052 entry = iterator.next(); 1053 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 1054 continue; 1055 } 1056 if (listener.recoverMessage(loadMessage(entry.getValue().location))) { 1057 counter++; 1058 } 1059 if (counter >= maxReturned || listener.hasSpace() == false) { 1060 break; 1061 } 1062 } 1063 sd.orderIndex.stoppedIterating(); 1064 if (entry != null) { 1065 MessageOrderCursor copy = sd.orderIndex.cursor.copy(); 1066 sd.subscriptionCursors.put(subscriptionKey, copy); 1067 } 1068 } 1069 }); 1070 } finally { 1071 indexLock.writeLock().unlock(); 1072 } 1073 } 1074 1075 @Override 1076 public void resetBatching(String clientId, String subscriptionName) { 1077 try { 1078 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 1079 indexLock.writeLock().lock(); 1080 try { 1081 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1082 @Override 1083 public void execute(Transaction tx) throws IOException { 1084 StoredDestination sd = getStoredDestination(dest, tx); 1085 sd.subscriptionCursors.remove(subscriptionKey); 1086 } 1087 }); 1088 }finally { 1089 indexLock.writeLock().unlock(); 1090 } 1091 } catch (IOException e) { 1092 throw new RuntimeException(e); 1093 } 1094 } 1095 1096 @Override 1097 public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { 1098 return messageStoreSubStats; 1099 } 1100 } 1101 1102 String subscriptionKey(String clientId, String subscriptionName) { 1103 return clientId + ":" + subscriptionName; 1104 } 1105 1106 @Override 1107 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 1108 String key = key(convert(destination)); 1109 MessageStore store = storeCache.get(key(convert(destination))); 1110 if (store == null) { 1111 final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination)); 1112 store = storeCache.putIfAbsent(key, queueStore); 1113 if (store == null) { 1114 store = queueStore; 1115 } 1116 } 1117 1118 return store; 1119 } 1120 1121 @Override 1122 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 1123 String key = key(convert(destination)); 1124 MessageStore store = storeCache.get(key(convert(destination))); 1125 if (store == null) { 1126 final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination)); 1127 store = storeCache.putIfAbsent(key, topicStore); 1128 if (store == null) { 1129 store = topicStore; 1130 } 1131 } 1132 1133 return (TopicMessageStore) store; 1134 } 1135 1136 /** 1137 * Cleanup method to remove any state associated with the given destination. 1138 * This method does not stop the message store (it might not be cached). 1139 * 1140 * @param destination 1141 * Destination to forget 1142 */ 1143 @Override 1144 public void removeQueueMessageStore(ActiveMQQueue destination) { 1145 } 1146 1147 /** 1148 * Cleanup method to remove any state associated with the given destination 1149 * This method does not stop the message store (it might not be cached). 1150 * 1151 * @param destination 1152 * Destination to forget 1153 */ 1154 @Override 1155 public void removeTopicMessageStore(ActiveMQTopic destination) { 1156 } 1157 1158 @Override 1159 public void deleteAllMessages() throws IOException { 1160 deleteAllMessages = true; 1161 } 1162 1163 @Override 1164 public Set<ActiveMQDestination> getDestinations() { 1165 try { 1166 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 1167 indexLock.writeLock().lock(); 1168 try { 1169 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1170 @Override 1171 public void execute(Transaction tx) throws IOException { 1172 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator 1173 .hasNext();) { 1174 Entry<String, StoredDestination> entry = iterator.next(); 1175 //Removing isEmpty topic check - see AMQ-5875 1176 rc.add(convert(entry.getKey())); 1177 } 1178 } 1179 }); 1180 }finally { 1181 indexLock.writeLock().unlock(); 1182 } 1183 return rc; 1184 } catch (IOException e) { 1185 throw new RuntimeException(e); 1186 } 1187 } 1188 1189 @Override 1190 public long getLastMessageBrokerSequenceId() throws IOException { 1191 return 0; 1192 } 1193 1194 @Override 1195 public long getLastProducerSequenceId(ProducerId id) { 1196 indexLock.writeLock().lock(); 1197 try { 1198 return metadata.producerSequenceIdTracker.getLastSeqId(id); 1199 } finally { 1200 indexLock.writeLock().unlock(); 1201 } 1202 } 1203 1204 @Override 1205 public long size() { 1206 try { 1207 return journalSize.get() + getPageFile().getDiskSize(); 1208 } catch (IOException e) { 1209 throw new RuntimeException(e); 1210 } 1211 } 1212 1213 @Override 1214 public void beginTransaction(ConnectionContext context) throws IOException { 1215 throw new IOException("Not yet implemented."); 1216 } 1217 @Override 1218 public void commitTransaction(ConnectionContext context) throws IOException { 1219 throw new IOException("Not yet implemented."); 1220 } 1221 @Override 1222 public void rollbackTransaction(ConnectionContext context) throws IOException { 1223 throw new IOException("Not yet implemented."); 1224 } 1225 1226 @Override 1227 public void checkpoint(boolean sync) throws IOException { 1228 super.checkpointCleanup(sync); 1229 } 1230 1231 // ///////////////////////////////////////////////////////////////// 1232 // Internal helper methods. 1233 // ///////////////////////////////////////////////////////////////// 1234 1235 /** 1236 * @param location 1237 * @return 1238 * @throws IOException 1239 */ 1240 Message loadMessage(Location location) throws IOException { 1241 try { 1242 JournalCommand<?> command = load(location); 1243 KahaAddMessageCommand addMessage = null; 1244 switch (command.type()) { 1245 case KAHA_UPDATE_MESSAGE_COMMAND: 1246 addMessage = ((KahaUpdateMessageCommand) command).getMessage(); 1247 break; 1248 case KAHA_ADD_MESSAGE_COMMAND: 1249 addMessage = (KahaAddMessageCommand) command; 1250 break; 1251 default: 1252 throw new IOException("Could not load journal record, unexpected command type: " + command.type() + " at location: " + location); 1253 } 1254 if (!addMessage.hasMessage()) { 1255 throw new IOException("Could not load journal record, null message content at location: " + location); 1256 } 1257 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); 1258 return msg; 1259 } catch (Throwable t) { 1260 IOException ioe = IOExceptionSupport.create("Unexpected error on journal read at: " + location , t); 1261 LOG.error("Failed to load message at: {}", location , ioe); 1262 brokerService.handleIOException(ioe); 1263 throw ioe; 1264 } 1265 } 1266 1267 // ///////////////////////////////////////////////////////////////// 1268 // Internal conversion methods. 1269 // ///////////////////////////////////////////////////////////////// 1270 1271 KahaLocation convert(Location location) { 1272 KahaLocation rc = new KahaLocation(); 1273 rc.setLogId(location.getDataFileId()); 1274 rc.setOffset(location.getOffset()); 1275 return rc; 1276 } 1277 1278 KahaDestination convert(ActiveMQDestination dest) { 1279 KahaDestination rc = new KahaDestination(); 1280 rc.setName(dest.getPhysicalName()); 1281 switch (dest.getDestinationType()) { 1282 case ActiveMQDestination.QUEUE_TYPE: 1283 rc.setType(DestinationType.QUEUE); 1284 return rc; 1285 case ActiveMQDestination.TOPIC_TYPE: 1286 rc.setType(DestinationType.TOPIC); 1287 return rc; 1288 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1289 rc.setType(DestinationType.TEMP_QUEUE); 1290 return rc; 1291 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1292 rc.setType(DestinationType.TEMP_TOPIC); 1293 return rc; 1294 default: 1295 return null; 1296 } 1297 } 1298 1299 ActiveMQDestination convert(String dest) { 1300 int p = dest.indexOf(":"); 1301 if (p < 0) { 1302 throw new IllegalArgumentException("Not in the valid destination format"); 1303 } 1304 int type = Integer.parseInt(dest.substring(0, p)); 1305 String name = dest.substring(p + 1); 1306 return convert(type, name); 1307 } 1308 1309 private ActiveMQDestination convert(KahaDestination commandDestination) { 1310 return convert(commandDestination.getType().getNumber(), commandDestination.getName()); 1311 } 1312 1313 private ActiveMQDestination convert(int type, String name) { 1314 switch (KahaDestination.DestinationType.valueOf(type)) { 1315 case QUEUE: 1316 return new ActiveMQQueue(name); 1317 case TOPIC: 1318 return new ActiveMQTopic(name); 1319 case TEMP_QUEUE: 1320 return new ActiveMQTempQueue(name); 1321 case TEMP_TOPIC: 1322 return new ActiveMQTempTopic(name); 1323 default: 1324 throw new IllegalArgumentException("Not in the valid destination format"); 1325 } 1326 } 1327 1328 public TransactionIdTransformer getTransactionIdTransformer() { 1329 return transactionIdTransformer; 1330 } 1331 1332 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { 1333 this.transactionIdTransformer = transactionIdTransformer; 1334 } 1335 1336 static class AsyncJobKey { 1337 MessageId id; 1338 ActiveMQDestination destination; 1339 1340 AsyncJobKey(MessageId id, ActiveMQDestination destination) { 1341 this.id = id; 1342 this.destination = destination; 1343 } 1344 1345 @Override 1346 public boolean equals(Object obj) { 1347 if (obj == this) { 1348 return true; 1349 } 1350 return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id) 1351 && destination.equals(((AsyncJobKey) obj).destination); 1352 } 1353 1354 @Override 1355 public int hashCode() { 1356 return id.hashCode() + destination.hashCode(); 1357 } 1358 1359 @Override 1360 public String toString() { 1361 return destination.getPhysicalName() + "-" + id; 1362 } 1363 } 1364 1365 public interface StoreTask { 1366 public boolean cancel(); 1367 1368 public void aquireLocks(); 1369 1370 public void releaseLocks(); 1371 } 1372 1373 class StoreQueueTask implements Runnable, StoreTask { 1374 protected final Message message; 1375 protected final ConnectionContext context; 1376 protected final KahaDBMessageStore store; 1377 protected final InnerFutureTask future; 1378 protected final AtomicBoolean done = new AtomicBoolean(); 1379 protected final AtomicBoolean locked = new AtomicBoolean(); 1380 1381 public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) { 1382 this.store = store; 1383 this.context = context; 1384 this.message = message; 1385 this.future = new InnerFutureTask(this); 1386 } 1387 1388 public ListenableFuture<Object> getFuture() { 1389 return this.future; 1390 } 1391 1392 @Override 1393 public boolean cancel() { 1394 if (this.done.compareAndSet(false, true)) { 1395 return this.future.cancel(false); 1396 } 1397 return false; 1398 } 1399 1400 @Override 1401 public void aquireLocks() { 1402 if (this.locked.compareAndSet(false, true)) { 1403 try { 1404 globalQueueSemaphore.acquire(); 1405 store.acquireLocalAsyncLock(); 1406 message.incrementReferenceCount(); 1407 } catch (InterruptedException e) { 1408 LOG.warn("Failed to aquire lock", e); 1409 } 1410 } 1411 1412 } 1413 1414 @Override 1415 public void releaseLocks() { 1416 if (this.locked.compareAndSet(true, false)) { 1417 store.releaseLocalAsyncLock(); 1418 globalQueueSemaphore.release(); 1419 message.decrementReferenceCount(); 1420 } 1421 } 1422 1423 @Override 1424 public void run() { 1425 this.store.doneTasks++; 1426 try { 1427 if (this.done.compareAndSet(false, true)) { 1428 this.store.addMessage(context, message); 1429 removeQueueTask(this.store, this.message.getMessageId()); 1430 this.future.complete(); 1431 } else if (cancelledTaskModMetric > 0 && (++this.store.canceledTasks) % cancelledTaskModMetric == 0) { 1432 System.err.println(this.store.dest.getName() + " cancelled: " 1433 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1434 this.store.canceledTasks = this.store.doneTasks = 0; 1435 } 1436 } catch (Throwable t) { 1437 this.future.setException(t); 1438 removeQueueTask(this.store, this.message.getMessageId()); 1439 } 1440 } 1441 1442 protected Message getMessage() { 1443 return this.message; 1444 } 1445 1446 private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> { 1447 1448 private final AtomicReference<Runnable> listenerRef = new AtomicReference<>(); 1449 1450 public InnerFutureTask(Runnable runnable) { 1451 super(runnable, null); 1452 } 1453 1454 public void setException(final Throwable e) { 1455 super.setException(e); 1456 } 1457 1458 public void complete() { 1459 super.set(null); 1460 } 1461 1462 @Override 1463 public void done() { 1464 fireListener(); 1465 } 1466 1467 @Override 1468 public void addListener(Runnable listener) { 1469 this.listenerRef.set(listener); 1470 if (isDone()) { 1471 fireListener(); 1472 } 1473 } 1474 1475 private void fireListener() { 1476 Runnable listener = listenerRef.getAndSet(null); 1477 if (listener != null) { 1478 try { 1479 listener.run(); 1480 } catch (Exception ignored) { 1481 LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored); 1482 } 1483 } 1484 } 1485 } 1486 } 1487 1488 class StoreTopicTask extends StoreQueueTask { 1489 private final int subscriptionCount; 1490 private final List<String> subscriptionKeys = new ArrayList<String>(1); 1491 private final KahaDBTopicMessageStore topicStore; 1492 public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, 1493 int subscriptionCount) { 1494 super(store, context, message); 1495 this.topicStore = store; 1496 this.subscriptionCount = subscriptionCount; 1497 1498 } 1499 1500 @Override 1501 public void aquireLocks() { 1502 if (this.locked.compareAndSet(false, true)) { 1503 try { 1504 globalTopicSemaphore.acquire(); 1505 store.acquireLocalAsyncLock(); 1506 message.incrementReferenceCount(); 1507 } catch (InterruptedException e) { 1508 LOG.warn("Failed to aquire lock", e); 1509 } 1510 } 1511 } 1512 1513 @Override 1514 public void releaseLocks() { 1515 if (this.locked.compareAndSet(true, false)) { 1516 message.decrementReferenceCount(); 1517 store.releaseLocalAsyncLock(); 1518 globalTopicSemaphore.release(); 1519 } 1520 } 1521 1522 /** 1523 * add a key 1524 * 1525 * @param key 1526 * @return true if all acknowledgements received 1527 */ 1528 public boolean addSubscriptionKey(String key) { 1529 synchronized (this.subscriptionKeys) { 1530 this.subscriptionKeys.add(key); 1531 } 1532 return this.subscriptionKeys.size() >= this.subscriptionCount; 1533 } 1534 1535 @Override 1536 public void run() { 1537 this.store.doneTasks++; 1538 try { 1539 if (this.done.compareAndSet(false, true)) { 1540 this.topicStore.addMessage(context, message); 1541 // apply any acks we have 1542 synchronized (this.subscriptionKeys) { 1543 for (String key : this.subscriptionKeys) { 1544 this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null); 1545 1546 } 1547 } 1548 removeTopicTask(this.topicStore, this.message.getMessageId()); 1549 this.future.complete(); 1550 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1551 System.err.println(this.store.dest.getName() + " cancelled: " 1552 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1553 this.store.canceledTasks = this.store.doneTasks = 0; 1554 } 1555 } catch (Throwable t) { 1556 this.future.setException(t); 1557 removeTopicTask(this.topicStore, this.message.getMessageId()); 1558 } 1559 } 1560 } 1561 1562 public class StoreTaskExecutor extends ThreadPoolExecutor { 1563 1564 public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) { 1565 super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory); 1566 } 1567 1568 @Override 1569 protected void afterExecute(Runnable runnable, Throwable throwable) { 1570 super.afterExecute(runnable, throwable); 1571 1572 if (runnable instanceof StoreTask) { 1573 ((StoreTask)runnable).releaseLocks(); 1574 } 1575 } 1576 } 1577 1578 @Override 1579 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 1580 return new JobSchedulerStoreImpl(); 1581 } 1582 1583 /* (non-Javadoc) 1584 * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal() 1585 */ 1586 @Override 1587 public boolean isPersistNoLocal() { 1588 // Prior to v11 the broker did not store the noLocal value for durable subs. 1589 return brokerService.getStoreOpenWireVersion() >= 11; 1590 } 1591}