001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.usage; 018 019import java.util.LinkedList; 020import java.util.List; 021import java.util.concurrent.CopyOnWriteArrayList; 022import java.util.concurrent.ThreadPoolExecutor; 023import java.util.concurrent.TimeUnit; 024import java.util.concurrent.atomic.AtomicBoolean; 025import java.util.concurrent.locks.Condition; 026import java.util.concurrent.locks.ReentrantReadWriteLock; 027 028import org.apache.activemq.Service; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * Used to keep track of how much of something is being used so that a productive working set usage can be controlled. 034 * Main use case is manage memory usage. 035 * 036 * @org.apache.xbean.XBean 037 * 038 */ 039public abstract class Usage<T extends Usage> implements Service { 040 041 private static final Logger LOG = LoggerFactory.getLogger(Usage.class); 042 043 protected final ReentrantReadWriteLock usageLock = new ReentrantReadWriteLock(); 044 protected final Condition waitForSpaceCondition = usageLock.writeLock().newCondition(); 045 protected int percentUsage; 046 protected T parent; 047 protected String name; 048 049 private UsageCapacity limiter = new DefaultUsageCapacity(); 050 private int percentUsageMinDelta = 1; 051 private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>(); 052 private final boolean debug = LOG.isDebugEnabled(); 053 private float usagePortion = 1.0f; 054 private final List<T> children = new CopyOnWriteArrayList<T>(); 055 private final List<Runnable> callbacks = new LinkedList<Runnable>(); 056 private int pollingTime = 100; 057 private final AtomicBoolean started = new AtomicBoolean(); 058 private ThreadPoolExecutor executor; 059 060 public Usage(T parent, String name, float portion) { 061 this.parent = parent; 062 this.usagePortion = portion; 063 if (parent != null) { 064 this.limiter.setLimit((long) (parent.getLimit() * (double)portion)); 065 name = parent.name + ":" + name; 066 } 067 this.name = name; 068 } 069 070 protected abstract long retrieveUsage(); 071 072 /** 073 * @throws InterruptedException 074 */ 075 public void waitForSpace() throws InterruptedException { 076 waitForSpace(0); 077 } 078 079 public boolean waitForSpace(long timeout) throws InterruptedException { 080 return waitForSpace(timeout, 100); 081 } 082 083 /** 084 * @param timeout 085 * @throws InterruptedException 086 * @return true if space 087 */ 088 public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException { 089 if (parent != null) { 090 if (!parent.waitForSpace(timeout, highWaterMark)) { 091 return false; 092 } 093 } 094 usageLock.writeLock().lock(); 095 try { 096 percentUsage = caclPercentUsage(); 097 if (percentUsage >= highWaterMark) { 098 long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE; 099 long timeleft = deadline; 100 while (timeleft > 0) { 101 percentUsage = caclPercentUsage(); 102 if (percentUsage >= highWaterMark) { 103 waitForSpaceCondition.await(pollingTime, TimeUnit.MILLISECONDS); 104 timeleft = deadline - System.currentTimeMillis(); 105 } else { 106 break; 107 } 108 } 109 } 110 return percentUsage < highWaterMark; 111 } finally { 112 usageLock.writeLock().unlock(); 113 } 114 } 115 116 public boolean isFull() { 117 return isFull(100); 118 } 119 120 public boolean isFull(int highWaterMark) { 121 if (parent != null && parent.isFull(highWaterMark)) { 122 return true; 123 } 124 usageLock.writeLock().lock(); 125 try { 126 percentUsage = caclPercentUsage(); 127 return percentUsage >= highWaterMark; 128 } finally { 129 usageLock.writeLock().unlock(); 130 } 131 } 132 133 public void addUsageListener(UsageListener listener) { 134 listeners.add(listener); 135 } 136 137 public void removeUsageListener(UsageListener listener) { 138 listeners.remove(listener); 139 } 140 141 public int getNumUsageListeners() { 142 return listeners.size(); 143 } 144 145 public long getLimit() { 146 usageLock.readLock().lock(); 147 try { 148 return limiter.getLimit(); 149 } finally { 150 usageLock.readLock().unlock(); 151 } 152 } 153 154 /** 155 * Sets the memory limit in bytes. Setting the limit in bytes will set the usagePortion to 0 since the UsageManager 156 * is not going to be portion based off the parent. When set using Xbean, values of the form "20 Mb", "1024kb", and 157 * "1g" can be used 158 * 159 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 160 */ 161 public void setLimit(long limit) { 162 if (percentUsageMinDelta < 0) { 163 throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0"); 164 } 165 usageLock.writeLock().lock(); 166 try { 167 this.limiter.setLimit(limit); 168 this.usagePortion = 0; 169 } finally { 170 usageLock.writeLock().unlock(); 171 } 172 onLimitChange(); 173 } 174 175 protected void onLimitChange() { 176 // We may need to calculate the limit 177 if (usagePortion > 0 && parent != null) { 178 usageLock.writeLock().lock(); 179 try { 180 this.limiter.setLimit((long) (parent.getLimit() * (double) usagePortion)); 181 } finally { 182 usageLock.writeLock().unlock(); 183 } 184 } 185 // Reset the percent currently being used. 186 usageLock.writeLock().lock(); 187 try { 188 setPercentUsage(caclPercentUsage()); 189 } finally { 190 usageLock.writeLock().unlock(); 191 } 192 // Let the children know that the limit has changed. They may need to 193 // set their limits based on ours. 194 for (T child : children) { 195 child.onLimitChange(); 196 } 197 } 198 199 public float getUsagePortion() { 200 usageLock.readLock().lock(); 201 try { 202 return usagePortion; 203 } finally { 204 usageLock.readLock().unlock(); 205 } 206 } 207 208 public void setUsagePortion(float usagePortion) { 209 usageLock.writeLock().lock(); 210 try { 211 this.usagePortion = usagePortion; 212 } finally { 213 usageLock.writeLock().unlock(); 214 } 215 onLimitChange(); 216 } 217 218 public int getPercentUsage() { 219 usageLock.readLock().lock(); 220 try { 221 return percentUsage; 222 } finally { 223 usageLock.readLock().unlock(); 224 } 225 } 226 227 public int getPercentUsageMinDelta() { 228 usageLock.readLock().lock(); 229 try { 230 return percentUsageMinDelta; 231 } finally { 232 usageLock.readLock().unlock(); 233 } 234 } 235 236 /** 237 * Sets the minimum number of percentage points the usage has to change before a UsageListener event is fired by the 238 * manager. 239 * 240 * @param percentUsageMinDelta 241 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 242 */ 243 public void setPercentUsageMinDelta(int percentUsageMinDelta) { 244 if (percentUsageMinDelta < 1) { 245 throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0"); 246 } 247 248 usageLock.writeLock().lock(); 249 try { 250 this.percentUsageMinDelta = percentUsageMinDelta; 251 setPercentUsage(caclPercentUsage()); 252 } finally { 253 usageLock.writeLock().unlock(); 254 } 255 } 256 257 public long getUsage() { 258 usageLock.readLock().lock(); 259 try { 260 return retrieveUsage(); 261 } finally { 262 usageLock.readLock().unlock(); 263 } 264 } 265 266 protected void setPercentUsage(int value) { 267 usageLock.writeLock().lock(); 268 try { 269 int oldValue = percentUsage; 270 percentUsage = value; 271 if (oldValue != value) { 272 fireEvent(oldValue, value); 273 } 274 } finally { 275 usageLock.writeLock().unlock(); 276 } 277 } 278 279 protected int caclPercentUsage() { 280 if (limiter.getLimit() == 0) { 281 return 0; 282 } 283 return (int) ((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta); 284 } 285 286 // Must be called with the usage lock's writeLock held. 287 private void fireEvent(final int oldPercentUsage, final int newPercentUsage) { 288 if (debug) { 289 LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: " + newPercentUsage + "% of available memory"); 290 } 291 if (started.get()) { 292 // Switching from being full to not being full.. 293 if (oldPercentUsage >= 100 && newPercentUsage < 100) { 294 waitForSpaceCondition.signalAll(); 295 if (!callbacks.isEmpty()) { 296 for (Runnable callback : callbacks) { 297 getExecutor().execute(callback); 298 } 299 callbacks.clear(); 300 } 301 } 302 if (!listeners.isEmpty()) { 303 // Let the listeners know on a separate thread 304 Runnable listenerNotifier = new Runnable() { 305 @Override 306 public void run() { 307 for (UsageListener listener : listeners) { 308 listener.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage); 309 } 310 } 311 }; 312 if (started.get()) { 313 getExecutor().execute(listenerNotifier); 314 } else { 315 LOG.warn("Not notifying memory usage change to listeners on shutdown"); 316 } 317 } 318 } 319 } 320 321 public String getName() { 322 return name; 323 } 324 325 @Override 326 public String toString() { 327 return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + ", limit=" + limiter.getLimit() 328 + ", percentUsageMinDelta=" + percentUsageMinDelta + "%" + (parent != null ? ";Parent:" + parent.toString() : ""); 329 } 330 331 @Override 332 @SuppressWarnings("unchecked") 333 public void start() { 334 if (started.compareAndSet(false, true)) { 335 if (parent != null) { 336 parent.addChild(this); 337 if (getLimit() > parent.getLimit()) { 338 LOG.info("Usage({}) limit={} should be smaller than its parent limit={}", new Object[] { getName(), getLimit(), parent.getLimit() }); 339 } 340 } 341 for (T t : children) { 342 t.start(); 343 } 344 } 345 } 346 347 @Override 348 @SuppressWarnings("unchecked") 349 public void stop() { 350 if (started.compareAndSet(true, false)) { 351 if (parent != null) { 352 parent.removeChild(this); 353 } 354 355 // clear down any callbacks 356 usageLock.writeLock().lock(); 357 try { 358 waitForSpaceCondition.signalAll(); 359 for (Runnable callback : this.callbacks) { 360 callback.run(); 361 } 362 this.callbacks.clear(); 363 } finally { 364 usageLock.writeLock().unlock(); 365 } 366 367 for (T t : children) { 368 t.stop(); 369 } 370 } 371 } 372 373 protected void addChild(T child) { 374 children.add(child); 375 if (started.get()) { 376 child.start(); 377 } 378 } 379 380 protected void removeChild(T child) { 381 children.remove(child); 382 } 383 384 /** 385 * @param callback 386 * @return true if the UsageManager was full. The callback will only be called if this method returns true. 387 */ 388 public boolean notifyCallbackWhenNotFull(final Runnable callback) { 389 if (parent != null) { 390 Runnable r = new Runnable() { 391 392 @Override 393 public void run() { 394 usageLock.writeLock().lock(); 395 try { 396 if (percentUsage >= 100) { 397 callbacks.add(callback); 398 } else { 399 callback.run(); 400 } 401 } finally { 402 usageLock.writeLock().unlock(); 403 } 404 } 405 }; 406 if (parent.notifyCallbackWhenNotFull(r)) { 407 return true; 408 } 409 } 410 usageLock.writeLock().lock(); 411 try { 412 if (percentUsage >= 100) { 413 callbacks.add(callback); 414 return true; 415 } else { 416 return false; 417 } 418 } finally { 419 usageLock.writeLock().unlock(); 420 } 421 } 422 423 /** 424 * @return the limiter 425 */ 426 public UsageCapacity getLimiter() { 427 return this.limiter; 428 } 429 430 /** 431 * @param limiter 432 * the limiter to set 433 */ 434 public void setLimiter(UsageCapacity limiter) { 435 this.limiter = limiter; 436 } 437 438 /** 439 * @return the pollingTime 440 */ 441 public int getPollingTime() { 442 return this.pollingTime; 443 } 444 445 /** 446 * @param pollingTime 447 * the pollingTime to set 448 */ 449 public void setPollingTime(int pollingTime) { 450 this.pollingTime = pollingTime; 451 } 452 453 public void setName(String name) { 454 this.name = name; 455 } 456 457 public T getParent() { 458 return parent; 459 } 460 461 public void setParent(T parent) { 462 this.parent = parent; 463 } 464 465 public void setExecutor(ThreadPoolExecutor executor) { 466 this.executor = executor; 467 } 468 469 public ThreadPoolExecutor getExecutor() { 470 return executor; 471 } 472 473 public boolean isStarted() { 474 return started.get(); 475 } 476}