001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import static org.apache.activemq.broker.region.cursors.OrderedPendingList.getValues; 020 021import java.util.ArrayDeque; 022import java.util.Collection; 023import java.util.Deque; 024import java.util.HashMap; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Map; 028 029import org.apache.activemq.broker.region.MessageReference; 030import org.apache.activemq.command.MessageId; 031import org.apache.activemq.management.SizeStatisticImpl; 032 033public class PrioritizedPendingList implements PendingList { 034 035 private static final Integer MAX_PRIORITY = 10; 036 private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY]; 037 private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>(); 038 private final SizeStatisticImpl messageSize; 039 private final PendingMessageHelper pendingMessageHelper; 040 041 042 public PrioritizedPendingList() { 043 for (int i = 0; i < MAX_PRIORITY; i++) { 044 this.lists[i] = new OrderedPendingList(); 045 } 046 messageSize = new SizeStatisticImpl("messageSize", "The size in bytes of the pending messages"); 047 messageSize.setEnabled(true); 048 pendingMessageHelper = new PendingMessageHelper(map, messageSize); 049 } 050 051 @Override 052 public PendingNode addMessageFirst(MessageReference message) { 053 PendingNode node = getList(message).addMessageFirst(message); 054 this.pendingMessageHelper.addToMap(message, node); 055 return node; 056 } 057 058 @Override 059 public PendingNode addMessageLast(MessageReference message) { 060 PendingNode node = getList(message).addMessageLast(message); 061 this.pendingMessageHelper.addToMap(message, node); 062 return node; 063 } 064 065 @Override 066 public void clear() { 067 for (int i = 0; i < MAX_PRIORITY; i++) { 068 this.lists[i].clear(); 069 } 070 this.map.clear(); 071 this.messageSize.reset(); 072 } 073 074 @Override 075 public boolean isEmpty() { 076 return this.map.isEmpty(); 077 } 078 079 @Override 080 public Iterator<MessageReference> iterator() { 081 return new PrioritizedPendingListIterator(); 082 } 083 084 @Override 085 public PendingNode remove(MessageReference message) { 086 PendingNode node = null; 087 if (message != null) { 088 node = this.pendingMessageHelper.removeFromMap(message); 089 if (node != null) { 090 node.getList().removeNode(node); 091 } 092 } 093 return node; 094 } 095 096 @Override 097 public int size() { 098 return this.map.size(); 099 } 100 101 @Override 102 public long messageSize() { 103 return this.messageSize.getTotalSize(); 104 } 105 106 @Override 107 public String toString() { 108 return "PrioritizedPendingList(" + System.identityHashCode(this) + ")"; 109 } 110 111 protected int getPriority(MessageReference message) { 112 int priority = javax.jms.Message.DEFAULT_PRIORITY; 113 if (message.getMessageId() != null) { 114 priority = Math.max(message.getMessage().getPriority(), 0); 115 priority = Math.min(priority, 9); 116 } 117 return priority; 118 } 119 120 protected OrderedPendingList getList(MessageReference msg) { 121 return lists[getPriority(msg)]; 122 } 123 124 private final class PrioritizedPendingListIterator implements Iterator<MessageReference> { 125 126 private final Deque<Iterator<MessageReference>> iterators = new ArrayDeque<Iterator<MessageReference>>(); 127 128 private Iterator<MessageReference> current; 129 private MessageReference currentMessage; 130 131 PrioritizedPendingListIterator() { 132 for (OrderedPendingList list : lists) { 133 if (!list.isEmpty()) { 134 iterators.push(list.iterator()); 135 } 136 } 137 138 current = iterators.poll(); 139 } 140 141 @Override 142 public boolean hasNext() { 143 while (current != null) { 144 if (current.hasNext()) { 145 return true; 146 } else { 147 current = iterators.poll(); 148 } 149 } 150 151 return false; 152 } 153 154 @Override 155 public MessageReference next() { 156 MessageReference result = null; 157 158 while (current != null) { 159 if (current.hasNext()) { 160 result = currentMessage = current.next(); 161 break; 162 } else { 163 current = iterators.poll(); 164 } 165 } 166 167 return result; 168 } 169 170 @Override 171 public void remove() { 172 if (currentMessage != null) { 173 pendingMessageHelper.removeFromMap(currentMessage); 174 current.remove(); 175 currentMessage = null; 176 } 177 } 178 } 179 180 @Override 181 public boolean contains(MessageReference message) { 182 if (message != null) { 183 return this.map.containsKey(message.getMessageId()); 184 } 185 return false; 186 } 187 188 @Override 189 public Collection<MessageReference> values() { 190 return getValues(this); 191 } 192 193 @Override 194 public void addAll(PendingList pendingList) { 195 for(MessageReference messageReference : pendingList) { 196 addMessageLast(messageReference); 197 } 198 } 199 200 @Override 201 public MessageReference get(MessageId messageId) { 202 PendingNode node = map.get(messageId); 203 if (node != null) { 204 return node.getMessage(); 205 } 206 return null; 207 } 208 209}