001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.kaha.impl; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.io.RandomAccessFile; 022 import java.nio.channels.FileLock; 023 import java.util.Date; 024 import java.util.HashSet; 025 import java.util.Iterator; 026 import java.util.Map; 027 import java.util.Set; 028 import java.util.concurrent.ConcurrentHashMap; 029 import java.util.concurrent.atomic.AtomicLong; 030 031 import org.apache.activemq.kaha.ContainerId; 032 import org.apache.activemq.kaha.ListContainer; 033 import org.apache.activemq.kaha.MapContainer; 034 import org.apache.activemq.kaha.Store; 035 import org.apache.activemq.kaha.StoreLocation; 036 import org.apache.activemq.kaha.impl.async.AsyncDataManager; 037 import org.apache.activemq.kaha.impl.async.DataManagerFacade; 038 import org.apache.activemq.kaha.impl.container.ListContainerImpl; 039 import org.apache.activemq.kaha.impl.container.MapContainerImpl; 040 import org.apache.activemq.kaha.impl.data.DataManagerImpl; 041 import org.apache.activemq.kaha.impl.data.Item; 042 import org.apache.activemq.kaha.impl.data.RedoListener; 043 import org.apache.activemq.kaha.impl.index.IndexItem; 044 import org.apache.activemq.kaha.impl.index.IndexManager; 045 import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem; 046 import org.apache.activemq.util.IOHelper; 047 import org.slf4j.Logger; 048 import org.slf4j.LoggerFactory; 049 050 /** 051 * Store Implementation 052 * 053 * 054 */ 055 public class KahaStore implements Store { 056 057 private static final String PROPERTY_PREFIX = "org.apache.activemq.kaha.Store"; 058 private static final boolean BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX 059 + ".FileLockBroken", 060 "false")); 061 private static final boolean DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX 062 + ".DisableLocking", 063 "false")); 064 //according to the String javadoc, all constant strings are interned so this will be the same object throughout the vm 065 //and we can use it as a monitor for the lockset. 066 private final static String LOCKSET_MONITOR = PROPERTY_PREFIX + ".Lock.Monitor"; 067 private static final Logger LOG = LoggerFactory.getLogger(KahaStore.class); 068 069 private final File directory; 070 private final String mode; 071 private IndexRootContainer mapsContainer; 072 private IndexRootContainer listsContainer; 073 private final Map<ContainerId, ListContainerImpl> lists = new ConcurrentHashMap<ContainerId, ListContainerImpl>(); 074 private final Map<ContainerId, MapContainerImpl> maps = new ConcurrentHashMap<ContainerId, MapContainerImpl>(); 075 private final Map<String, DataManager> dataManagers = new ConcurrentHashMap<String, DataManager>(); 076 private final Map<String, IndexManager> indexManagers = new ConcurrentHashMap<String, IndexManager>(); 077 private boolean closed; 078 private boolean initialized; 079 private boolean logIndexChanges; 080 private boolean useAsyncDataManager; 081 private long maxDataFileLength = 1024 * 1024 * 32; 082 private FileLock lock; 083 private boolean persistentIndex = true; 084 private RandomAccessFile lockFile; 085 private final AtomicLong storeSize; 086 private String defaultContainerName = DEFAULT_CONTAINER_NAME; 087 088 089 public KahaStore(String name, String mode) throws IOException { 090 this(new File(IOHelper.toFileSystemDirectorySafeName(name)), mode, new AtomicLong()); 091 } 092 093 public KahaStore(File directory, String mode) throws IOException { 094 this(directory, mode, new AtomicLong()); 095 } 096 097 public KahaStore(String name, String mode,AtomicLong storeSize) throws IOException { 098 this(new File(IOHelper.toFileSystemDirectorySafeName(name)), mode, storeSize); 099 } 100 101 public KahaStore(File directory, String mode, AtomicLong storeSize) throws IOException { 102 this.mode = mode; 103 this.storeSize = storeSize; 104 this.directory = directory; 105 IOHelper.mkdirs(this.directory); 106 } 107 108 public synchronized void close() throws IOException { 109 if (!closed) { 110 closed = true; 111 if (initialized) { 112 unlock(); 113 for (ListContainerImpl container : lists.values()) { 114 container.close(); 115 } 116 lists.clear(); 117 for (MapContainerImpl container : maps.values()) { 118 container.close(); 119 } 120 maps.clear(); 121 for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) { 122 IndexManager im = iter.next(); 123 im.close(); 124 iter.remove(); 125 } 126 for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) { 127 DataManager dm = iter.next(); 128 dm.close(); 129 iter.remove(); 130 } 131 } 132 if (lockFile!=null) { 133 lockFile.close(); 134 lockFile=null; 135 } 136 } 137 } 138 139 public synchronized void force() throws IOException { 140 if (initialized) { 141 for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) { 142 IndexManager im = iter.next(); 143 im.force(); 144 } 145 for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) { 146 DataManager dm = iter.next(); 147 dm.force(); 148 } 149 } 150 } 151 152 public synchronized void clear() throws IOException { 153 initialize(); 154 for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) { 155 ContainerId id = (ContainerId)i.next(); 156 MapContainer container = getMapContainer(id.getKey(), id.getDataContainerName()); 157 container.clear(); 158 } 159 for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) { 160 ContainerId id = (ContainerId)i.next(); 161 ListContainer container = getListContainer(id.getKey(), id.getDataContainerName()); 162 container.clear(); 163 } 164 165 } 166 167 public synchronized boolean delete() throws IOException { 168 boolean result = true; 169 if (initialized) { 170 clear(); 171 for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) { 172 IndexManager im = iter.next(); 173 result &= im.delete(); 174 iter.remove(); 175 } 176 for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) { 177 DataManager dm = iter.next(); 178 result &= dm.delete(); 179 iter.remove(); 180 } 181 } 182 if (directory != null && directory.isDirectory()) { 183 result =IOHelper.deleteChildren(directory); 184 String str = result ? "successfully deleted" : "failed to delete"; 185 LOG.info("Kaha Store " + str + " data directory " + directory); 186 } 187 return result; 188 } 189 190 public synchronized boolean isInitialized() { 191 return initialized; 192 } 193 194 public boolean doesMapContainerExist(Object id) throws IOException { 195 return doesMapContainerExist(id, defaultContainerName); 196 } 197 198 public synchronized boolean doesMapContainerExist(Object id, String containerName) throws IOException { 199 initialize(); 200 ContainerId containerId = new ContainerId(id, containerName); 201 return maps.containsKey(containerId) || mapsContainer.doesRootExist(containerId); 202 } 203 204 public MapContainer getMapContainer(Object id) throws IOException { 205 return getMapContainer(id, defaultContainerName); 206 } 207 208 public MapContainer getMapContainer(Object id, String containerName) throws IOException { 209 return getMapContainer(id, containerName, persistentIndex); 210 } 211 212 public synchronized MapContainer getMapContainer(Object id, String containerName, boolean persistentIndex) 213 throws IOException { 214 initialize(); 215 ContainerId containerId = new ContainerId(id, containerName); 216 MapContainerImpl result = maps.get(containerId); 217 if (result == null) { 218 DataManager dm = getDataManager(containerName); 219 IndexManager im = getIndexManager(dm, containerName); 220 221 IndexItem root = mapsContainer.getRoot(im, containerId); 222 if (root == null) { 223 root = mapsContainer.addRoot(im, containerId); 224 } 225 result = new MapContainerImpl(directory, containerId, root, im, dm, persistentIndex); 226 maps.put(containerId, result); 227 } 228 return result; 229 } 230 231 public void deleteMapContainer(Object id) throws IOException { 232 deleteMapContainer(id, defaultContainerName); 233 } 234 235 public void deleteMapContainer(Object id, String containerName) throws IOException { 236 ContainerId containerId = new ContainerId(id, containerName); 237 deleteMapContainer(containerId); 238 } 239 240 public synchronized void deleteMapContainer(ContainerId containerId) throws IOException { 241 initialize(); 242 MapContainerImpl container = maps.remove(containerId); 243 if (container != null) { 244 container.clear(); 245 mapsContainer.removeRoot(container.getIndexManager(), containerId); 246 container.close(); 247 } 248 } 249 250 public synchronized Set<ContainerId> getMapContainerIds() throws IOException { 251 initialize(); 252 Set<ContainerId> set = new HashSet<ContainerId>(); 253 for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) { 254 ContainerId id = (ContainerId)i.next(); 255 set.add(id); 256 } 257 return set; 258 } 259 260 public boolean doesListContainerExist(Object id) throws IOException { 261 return doesListContainerExist(id, defaultContainerName); 262 } 263 264 public synchronized boolean doesListContainerExist(Object id, String containerName) throws IOException { 265 initialize(); 266 ContainerId containerId = new ContainerId(id, containerName); 267 return lists.containsKey(containerId) || listsContainer.doesRootExist(containerId); 268 } 269 270 public ListContainer getListContainer(Object id) throws IOException { 271 return getListContainer(id, defaultContainerName); 272 } 273 274 public ListContainer getListContainer(Object id, String containerName) throws IOException { 275 return getListContainer(id, containerName, persistentIndex); 276 } 277 278 public synchronized ListContainer getListContainer(Object id, String containerName, 279 boolean persistentIndex) throws IOException { 280 initialize(); 281 ContainerId containerId = new ContainerId(id, containerName); 282 ListContainerImpl result = lists.get(containerId); 283 if (result == null) { 284 DataManager dm = getDataManager(containerName); 285 IndexManager im = getIndexManager(dm, containerName); 286 287 IndexItem root = listsContainer.getRoot(im, containerId); 288 if (root == null) { 289 root = listsContainer.addRoot(im, containerId); 290 } 291 result = new ListContainerImpl(containerId, root, im, dm, persistentIndex); 292 lists.put(containerId, result); 293 } 294 return result; 295 } 296 297 public void deleteListContainer(Object id) throws IOException { 298 deleteListContainer(id, defaultContainerName); 299 } 300 301 public synchronized void deleteListContainer(Object id, String containerName) throws IOException { 302 ContainerId containerId = new ContainerId(id, containerName); 303 deleteListContainer(containerId); 304 } 305 306 public synchronized void deleteListContainer(ContainerId containerId) throws IOException { 307 initialize(); 308 ListContainerImpl container = lists.remove(containerId); 309 if (container != null) { 310 listsContainer.removeRoot(container.getIndexManager(), containerId); 311 container.clear(); 312 container.close(); 313 } 314 } 315 316 public synchronized Set<ContainerId> getListContainerIds() throws IOException { 317 initialize(); 318 Set<ContainerId> set = new HashSet<ContainerId>(); 319 for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) { 320 ContainerId id = (ContainerId)i.next(); 321 set.add(id); 322 } 323 return set; 324 } 325 326 /** 327 * @return the listsContainer 328 */ 329 public IndexRootContainer getListsContainer() { 330 return this.listsContainer; 331 } 332 333 /** 334 * @return the mapsContainer 335 */ 336 public IndexRootContainer getMapsContainer() { 337 return this.mapsContainer; 338 } 339 340 public synchronized DataManager getDataManager(String name) throws IOException { 341 DataManager dm = dataManagers.get(name); 342 if (dm == null) { 343 if (isUseAsyncDataManager()) { 344 AsyncDataManager t = new AsyncDataManager(storeSize); 345 t.setDirectory(directory); 346 t.setFilePrefix("async-data-" + name + "-"); 347 t.setMaxFileLength((int)maxDataFileLength); 348 t.start(); 349 dm = new DataManagerFacade(t, name); 350 } else { 351 DataManagerImpl t = new DataManagerImpl(directory, name,storeSize); 352 t.setMaxFileLength(maxDataFileLength); 353 dm = t; 354 } 355 if (logIndexChanges) { 356 recover(dm); 357 } 358 dataManagers.put(name, dm); 359 } 360 return dm; 361 } 362 363 public synchronized IndexManager getIndexManager(DataManager dm, String name) throws IOException { 364 IndexManager im = indexManagers.get(name); 365 if (im == null) { 366 im = new IndexManager(directory, name, mode, logIndexChanges ? dm : null,storeSize); 367 indexManagers.put(name, im); 368 } 369 return im; 370 } 371 372 private void recover(final DataManager dm) throws IOException { 373 dm.recoverRedoItems(new RedoListener() { 374 public void onRedoItem(StoreLocation item, Object o) throws Exception { 375 RedoStoreIndexItem redo = (RedoStoreIndexItem)o; 376 // IndexManager im = getIndexManager(dm, redo.getIndexName()); 377 IndexManager im = getIndexManager(dm, dm.getName()); 378 im.redo(redo); 379 } 380 }); 381 } 382 383 public synchronized boolean isLogIndexChanges() { 384 return logIndexChanges; 385 } 386 387 public synchronized void setLogIndexChanges(boolean logIndexChanges) { 388 this.logIndexChanges = logIndexChanges; 389 } 390 391 /** 392 * @return the maxDataFileLength 393 */ 394 public synchronized long getMaxDataFileLength() { 395 return maxDataFileLength; 396 } 397 398 /** 399 * @param maxDataFileLength the maxDataFileLength to set 400 */ 401 public synchronized void setMaxDataFileLength(long maxDataFileLength) { 402 this.maxDataFileLength = maxDataFileLength; 403 } 404 405 /** 406 * @return the default index type 407 */ 408 public synchronized String getIndexTypeAsString() { 409 return persistentIndex ? "PERSISTENT" : "VM"; 410 } 411 412 /** 413 * Set the default index type 414 * 415 * @param type "PERSISTENT" or "VM" 416 */ 417 public synchronized void setIndexTypeAsString(String type) { 418 if (type.equalsIgnoreCase("VM")) { 419 persistentIndex = false; 420 } else { 421 persistentIndex = true; 422 } 423 } 424 425 public boolean isPersistentIndex() { 426 return persistentIndex; 427 } 428 429 public void setPersistentIndex(boolean persistentIndex) { 430 this.persistentIndex = persistentIndex; 431 } 432 433 434 public synchronized boolean isUseAsyncDataManager() { 435 return useAsyncDataManager; 436 } 437 438 public synchronized void setUseAsyncDataManager(boolean useAsyncWriter) { 439 this.useAsyncDataManager = useAsyncWriter; 440 } 441 442 /** 443 * @return size of store 444 * @see org.apache.activemq.kaha.Store#size() 445 */ 446 public long size(){ 447 return storeSize.get(); 448 } 449 450 public String getDefaultContainerName() { 451 return defaultContainerName; 452 } 453 454 public void setDefaultContainerName(String defaultContainerName) { 455 this.defaultContainerName = defaultContainerName; 456 } 457 458 public synchronized void initialize() throws IOException { 459 if (closed) { 460 throw new IOException("Store has been closed."); 461 } 462 if (!initialized) { 463 LOG.info("Kaha Store using data directory " + directory); 464 lockFile = new RandomAccessFile(new File(directory, "lock"), "rw"); 465 lock(); 466 DataManager defaultDM = getDataManager(defaultContainerName); 467 IndexManager rootIndexManager = getIndexManager(defaultDM, defaultContainerName); 468 IndexItem mapRoot = new IndexItem(); 469 IndexItem listRoot = new IndexItem(); 470 if (rootIndexManager.isEmpty()) { 471 mapRoot.setOffset(0); 472 rootIndexManager.storeIndex(mapRoot); 473 listRoot.setOffset(IndexItem.INDEX_SIZE); 474 rootIndexManager.storeIndex(listRoot); 475 rootIndexManager.setLength(IndexItem.INDEX_SIZE * 2); 476 } else { 477 mapRoot = rootIndexManager.getIndex(0); 478 listRoot = rootIndexManager.getIndex(IndexItem.INDEX_SIZE); 479 } 480 initialized = true; 481 mapsContainer = new IndexRootContainer(mapRoot, rootIndexManager, defaultDM); 482 listsContainer = new IndexRootContainer(listRoot, rootIndexManager, defaultDM); 483 /** 484 * Add interest in data files - then consolidate them 485 */ 486 generateInterestInMapDataFiles(); 487 generateInterestInListDataFiles(); 488 for (Iterator<DataManager> i = dataManagers.values().iterator(); i.hasNext();) { 489 DataManager dm = i.next(); 490 dm.consolidateDataFiles(); 491 } 492 } 493 } 494 495 private void lock() throws IOException { 496 synchronized (LOCKSET_MONITOR) { 497 if (!DISABLE_LOCKING && directory != null && lock == null) { 498 String key = getPropertyKey(); 499 String property = System.getProperty(key); 500 if (null == property) { 501 if (!BROKEN_FILE_LOCK) { 502 lock = lockFile.getChannel().tryLock(0, Math.max(1, lockFile.getChannel().size()), false); 503 if (lock == null) { 504 throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by another application"); 505 } else 506 System.setProperty(key, new Date().toString()); 507 } 508 } else { //already locked 509 throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by this application."); 510 } 511 } 512 } 513 } 514 515 private void unlock() throws IOException { 516 synchronized (LOCKSET_MONITOR) { 517 if (!DISABLE_LOCKING && (null != directory) && (null != lock)) { 518 System.getProperties().remove(getPropertyKey()); 519 if (lock.isValid()) { 520 lock.release(); 521 } 522 lock = null; 523 } 524 } 525 } 526 527 528 private String getPropertyKey() throws IOException { 529 return getClass().getName() + ".lock." + directory.getCanonicalPath(); 530 } 531 532 /** 533 * scans the directory and builds up the IndexManager and DataManager 534 * 535 * @throws IOException if there is a problem accessing an index or data file 536 */ 537 private void generateInterestInListDataFiles() throws IOException { 538 for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) { 539 ContainerId id = (ContainerId)i.next(); 540 DataManager dm = getDataManager(id.getDataContainerName()); 541 IndexManager im = getIndexManager(dm, id.getDataContainerName()); 542 IndexItem theRoot = listsContainer.getRoot(im, id); 543 long nextItem = theRoot.getNextItem(); 544 while (nextItem != Item.POSITION_NOT_SET) { 545 IndexItem item = im.getIndex(nextItem); 546 item.setOffset(nextItem); 547 dm.addInterestInFile(item.getKeyFile()); 548 dm.addInterestInFile(item.getValueFile()); 549 nextItem = item.getNextItem(); 550 } 551 } 552 } 553 554 /** 555 * scans the directory and builds up the IndexManager and DataManager 556 * 557 * @throws IOException if there is a problem accessing an index or data file 558 */ 559 private void generateInterestInMapDataFiles() throws IOException { 560 for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) { 561 ContainerId id = (ContainerId)i.next(); 562 DataManager dm = getDataManager(id.getDataContainerName()); 563 IndexManager im = getIndexManager(dm, id.getDataContainerName()); 564 IndexItem theRoot = mapsContainer.getRoot(im, id); 565 long nextItem = theRoot.getNextItem(); 566 while (nextItem != Item.POSITION_NOT_SET) { 567 IndexItem item = im.getIndex(nextItem); 568 item.setOffset(nextItem); 569 dm.addInterestInFile(item.getKeyFile()); 570 dm.addInterestInFile(item.getValueFile()); 571 nextItem = item.getNextItem(); 572 } 573 574 } 575 } 576 }