001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.kahadb.util; 018 019 import java.io.DataInput; 020 import java.io.DataOutput; 021 import java.io.IOException; 022 import java.util.ArrayList; 023 import java.util.Iterator; 024 import java.util.List; 025 import java.util.NoSuchElementException; 026 027 /** 028 * Keeps track of a added long values. Collapses ranges of numbers using a 029 * Sequence representation. Use to keep track of received message ids to find 030 * out if a message is duplicate or if there are any missing messages. 031 * 032 * @author chirino 033 */ 034 public class SequenceSet extends LinkedNodeList<Sequence> implements Iterable<Long> { 035 036 public static class Marshaller implements org.apache.kahadb.util.Marshaller<SequenceSet> { 037 038 public static final Marshaller INSTANCE = new Marshaller(); 039 040 public SequenceSet readPayload(DataInput in) throws IOException { 041 SequenceSet value = new SequenceSet(); 042 int count = in.readInt(); 043 for (int i = 0; i < count; i++) { 044 if( in.readBoolean() ) { 045 Sequence sequence = new Sequence(in.readLong(), in.readLong()); 046 value.addLast(sequence); 047 } else { 048 Sequence sequence = new Sequence(in.readLong()); 049 value.addLast(sequence); 050 } 051 } 052 return value; 053 } 054 055 public void writePayload(SequenceSet value, DataOutput out) throws IOException { 056 out.writeInt(value.size()); 057 Sequence sequence = value.getHead(); 058 while (sequence != null ) { 059 if( sequence.range() > 1 ) { 060 out.writeBoolean(true); 061 out.writeLong(sequence.first); 062 out.writeLong(sequence.last); 063 } else { 064 out.writeBoolean(false); 065 out.writeLong(sequence.first); 066 } 067 sequence = sequence.getNext(); 068 } 069 } 070 071 public int getFixedSize() { 072 return -1; 073 } 074 075 public SequenceSet deepCopy(SequenceSet value) { 076 SequenceSet rc = new SequenceSet(); 077 Sequence sequence = value.getHead(); 078 while (sequence != null ) { 079 rc.add(new Sequence(sequence.first, sequence.last)); 080 sequence = sequence.getNext(); 081 } 082 return rc; 083 } 084 085 public boolean isDeepCopySupported() { 086 return true; 087 } 088 } 089 090 public void add(Sequence value) { 091 // TODO we can probably optimize this a bit 092 for(long i=value.first; i<value.last+1; i++) { 093 add(i); 094 } 095 } 096 097 /** 098 * 099 * @param value 100 * the value to add to the list 101 * @return false if the value was a duplicate. 102 */ 103 public boolean add(long value) { 104 105 if (isEmpty()) { 106 addFirst(new Sequence(value)); 107 return true; 108 } 109 110 Sequence sequence = getHead(); 111 while (sequence != null) { 112 113 if (sequence.isAdjacentToLast(value)) { 114 // grow the sequence... 115 sequence.last = value; 116 // it might connect us to the next sequence.. 117 if (sequence.getNext() != null) { 118 Sequence next = sequence.getNext(); 119 if (next.isAdjacentToFirst(value)) { 120 // Yep the sequence connected.. so join them. 121 sequence.last = next.last; 122 next.unlink(); 123 } 124 } 125 return true; 126 } 127 128 if (sequence.isAdjacentToFirst(value)) { 129 // grow the sequence... 130 sequence.first = value; 131 132 // it might connect us to the previous 133 if (sequence.getPrevious() != null) { 134 Sequence prev = sequence.getPrevious(); 135 if (prev.isAdjacentToLast(value)) { 136 // Yep the sequence connected.. so join them. 137 sequence.first = prev.first; 138 prev.unlink(); 139 } 140 } 141 return true; 142 } 143 144 // Did that value land before this sequence? 145 if (value < sequence.first) { 146 // Then insert a new entry before this sequence item. 147 sequence.linkBefore(new Sequence(value)); 148 return true; 149 } 150 151 // Did that value land within the sequence? The it's a duplicate. 152 if (sequence.contains(value)) { 153 return false; 154 } 155 156 sequence = sequence.getNext(); 157 } 158 159 // Then the value is getting appended to the tail of the sequence. 160 addLast(new Sequence(value)); 161 return true; 162 } 163 164 /** 165 * Removes the given value from the Sequence set, splitting a 166 * contained sequence if necessary. 167 * 168 * @param value 169 * The value that should be removed from the SequenceSet. 170 * 171 * @return true if the value was removed from the set, false if there 172 * was no sequence in the set that contained the given value. 173 */ 174 public boolean remove(long value) { 175 Sequence sequence = getHead(); 176 while (sequence != null ) { 177 if(sequence.contains(value)) { 178 if (sequence.range() == 1) { 179 sequence.unlink(); 180 return true; 181 } else if (sequence.getFirst() == value) { 182 sequence.setFirst(value+1); 183 return true; 184 } else if (sequence.getLast() == value) { 185 sequence.setLast(value-1); 186 return true; 187 } else { 188 sequence.linkBefore(new Sequence(sequence.first, value-1)); 189 sequence.linkAfter(new Sequence(value+1, sequence.last)); 190 sequence.unlink(); 191 return true; 192 } 193 } 194 195 sequence = sequence.getNext(); 196 } 197 198 return false; 199 } 200 201 /** 202 * Removes and returns the first element from this list. 203 * 204 * @return the first element from this list. 205 * @throws NoSuchElementException if this list is empty. 206 */ 207 public long removeFirst() { 208 if (isEmpty()) { 209 throw new NoSuchElementException(); 210 } 211 212 Sequence rc = removeFirstSequence(1); 213 return rc.first; 214 } 215 216 /** 217 * Removes and returns the last sequence from this list. 218 * 219 * @return the last sequence from this list or null if the list is empty. 220 */ 221 public Sequence removeLastSequence() { 222 if (isEmpty()) { 223 return null; 224 } 225 226 Sequence rc = getTail(); 227 rc.unlink(); 228 return rc; 229 } 230 231 /** 232 * Removes and returns the first sequence that is count range large. 233 * 234 * @return a sequence that is count range large, or null if no sequence is that large in the list. 235 */ 236 public Sequence removeFirstSequence(long count) { 237 if (isEmpty()) { 238 return null; 239 } 240 241 Sequence sequence = getHead(); 242 while (sequence != null ) { 243 if (sequence.range() == count ) { 244 sequence.unlink(); 245 return sequence; 246 } 247 if (sequence.range() > count ) { 248 Sequence rc = new Sequence(sequence.first, sequence.first+count-1); 249 sequence.first+=count; 250 return rc; 251 } 252 sequence = sequence.getNext(); 253 } 254 return null; 255 } 256 257 /** 258 * @return all the id Sequences that are missing from this set that are not 259 * in between the range provided. 260 */ 261 public List<Sequence> getMissing(long first, long last) { 262 ArrayList<Sequence> rc = new ArrayList<Sequence>(); 263 if (first > last) { 264 throw new IllegalArgumentException("First cannot be more than last"); 265 } 266 if (isEmpty()) { 267 // We are missing all the messages. 268 rc.add(new Sequence(first, last)); 269 return rc; 270 } 271 272 Sequence sequence = getHead(); 273 while (sequence != null && first <= last) { 274 if (sequence.contains(first)) { 275 first = sequence.last + 1; 276 } else { 277 if (first < sequence.first) { 278 if (last < sequence.first) { 279 rc.add(new Sequence(first, last)); 280 return rc; 281 } else { 282 rc.add(new Sequence(first, sequence.first - 1)); 283 first = sequence.last + 1; 284 } 285 } 286 } 287 sequence = sequence.getNext(); 288 } 289 290 if (first <= last) { 291 rc.add(new Sequence(first, last)); 292 } 293 return rc; 294 } 295 296 /** 297 * @return all the Sequence that are in this list 298 */ 299 public List<Sequence> getReceived() { 300 ArrayList<Sequence> rc = new ArrayList<Sequence>(size()); 301 Sequence sequence = getHead(); 302 while (sequence != null) { 303 rc.add(new Sequence(sequence.first, sequence.last)); 304 sequence = sequence.getNext(); 305 } 306 return rc; 307 } 308 309 /** 310 * Returns true if the value given is contained within one of the 311 * sequences held in this set. 312 * 313 * @param value 314 * The value to search for in the set. 315 * 316 * @return true if the value is contained in the set. 317 */ 318 public boolean contains(long value) { 319 if (isEmpty()) { 320 return false; 321 } 322 323 Sequence sequence = getHead(); 324 while (sequence != null) { 325 if (sequence.contains(value)) { 326 return true; 327 } 328 sequence = sequence.getNext(); 329 } 330 331 return false; 332 } 333 334 public boolean contains(int first, int last) { 335 if (isEmpty()) { 336 return false; 337 } 338 Sequence sequence = getHead(); 339 while (sequence != null) { 340 if (sequence.first <= first && first <= sequence.last ) { 341 return last <= sequence.last; 342 } 343 sequence = sequence.getNext(); 344 } 345 return false; 346 } 347 348 /** 349 * Computes the size of this Sequence by summing the values of all 350 * the contained sequences. 351 * 352 * @return the total number of values contained in this set if it 353 * were to be iterated over like an array. 354 */ 355 public long rangeSize() { 356 long result = 0; 357 Sequence sequence = getHead(); 358 while (sequence != null) { 359 result += sequence.range(); 360 sequence = sequence.getNext(); 361 } 362 return result; 363 } 364 365 public Iterator<Long> iterator() { 366 return new SequenceIterator(); 367 } 368 369 private class SequenceIterator implements Iterator<Long> { 370 371 private Sequence currentEntry; 372 private long lastReturned = -1; 373 374 public SequenceIterator() { 375 currentEntry = getHead(); 376 if (currentEntry != null) { 377 lastReturned = currentEntry.first - 1; 378 } 379 } 380 381 public boolean hasNext() { 382 return currentEntry != null; 383 } 384 385 public Long next() { 386 if (currentEntry == null) { 387 throw new NoSuchElementException(); 388 } 389 390 if(lastReturned < currentEntry.first) { 391 lastReturned = currentEntry.first; 392 if (currentEntry.range() == 1) { 393 currentEntry = currentEntry.getNext(); 394 } 395 } else { 396 lastReturned++; 397 if (lastReturned == currentEntry.last) { 398 currentEntry = currentEntry.getNext(); 399 } 400 } 401 402 return lastReturned; 403 } 404 405 public void remove() { 406 throw new UnsupportedOperationException(); 407 } 408 409 } 410 411 }