001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadb.plist; 018 019 import java.io.DataInput; 020 import java.io.DataOutput; 021 import java.io.File; 022 import java.io.IOException; 023 import java.util.ArrayList; 024 import java.util.HashMap; 025 import java.util.Iterator; 026 import java.util.List; 027 import java.util.Map; 028 import java.util.Map.Entry; 029 import java.util.Set; 030 031 import org.apache.activemq.broker.BrokerService; 032 import org.apache.activemq.broker.BrokerServiceAware; 033 import org.apache.activemq.thread.Scheduler; 034 import org.apache.activemq.util.IOHelper; 035 import org.apache.activemq.util.ServiceStopper; 036 import org.apache.activemq.util.ServiceSupport; 037 import org.apache.kahadb.index.BTreeIndex; 038 import org.apache.kahadb.journal.Journal; 039 import org.apache.kahadb.journal.Location; 040 import org.apache.kahadb.page.Page; 041 import org.apache.kahadb.page.PageFile; 042 import org.apache.kahadb.page.Transaction; 043 import org.apache.kahadb.util.ByteSequence; 044 import org.apache.kahadb.util.LockFile; 045 import org.apache.kahadb.util.StringMarshaller; 046 import org.apache.kahadb.util.VariableMarshaller; 047 import org.slf4j.Logger; 048 import org.slf4j.LoggerFactory; 049 050 /** 051 * @org.apache.xbean.XBean 052 */ 053 public class PListStore extends ServiceSupport implements BrokerServiceAware, Runnable { 054 static final Logger LOG = LoggerFactory.getLogger(PListStore.class); 055 private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; 056 057 static final int CLOSED_STATE = 1; 058 static final int OPEN_STATE = 2; 059 060 private File directory; 061 PageFile pageFile; 062 private Journal journal; 063 private LockFile lockFile; 064 private boolean failIfDatabaseIsLocked; 065 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 066 private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 067 private boolean enableIndexWriteAsync = false; 068 private boolean initialized = false; 069 private boolean lazyInit = true; 070 // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 071 MetaData metaData = new MetaData(this); 072 final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this); 073 Map<String, PList> persistentLists = new HashMap<String, PList>(); 074 final Object indexLock = new Object(); 075 private Scheduler scheduler; 076 private long cleanupInterval = 30000; 077 078 private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE; 079 private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE; 080 private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 081 private boolean indexEnablePageCaching = true; 082 083 public Object getIndexLock() { 084 return indexLock; 085 } 086 087 @Override 088 public void setBrokerService(BrokerService brokerService) { 089 this.scheduler = brokerService.getScheduler(); 090 } 091 092 public int getIndexPageSize() { 093 return indexPageSize; 094 } 095 096 public int getIndexCacheSize() { 097 return indexCacheSize; 098 } 099 100 public int getIndexWriteBatchSize() { 101 return indexWriteBatchSize; 102 } 103 104 public void setIndexPageSize(int indexPageSize) { 105 this.indexPageSize = indexPageSize; 106 } 107 108 public void setIndexCacheSize(int indexCacheSize) { 109 this.indexCacheSize = indexCacheSize; 110 } 111 112 public void setIndexWriteBatchSize(int indexWriteBatchSize) { 113 this.indexWriteBatchSize = indexWriteBatchSize; 114 } 115 116 public boolean getIndexEnablePageCaching() { 117 return indexEnablePageCaching; 118 } 119 120 public void setIndexEnablePageCaching(boolean indexEnablePageCaching) { 121 this.indexEnablePageCaching = indexEnablePageCaching; 122 } 123 124 protected class MetaData { 125 protected MetaData(PListStore store) { 126 this.store = store; 127 } 128 129 private final PListStore store; 130 Page<MetaData> page; 131 BTreeIndex<String, PList> lists; 132 133 void createIndexes(Transaction tx) throws IOException { 134 this.lists = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId()); 135 } 136 137 void load(Transaction tx) throws IOException { 138 this.lists.setKeyMarshaller(StringMarshaller.INSTANCE); 139 this.lists.setValueMarshaller(new PListMarshaller(this.store)); 140 this.lists.load(tx); 141 } 142 143 void loadLists(Transaction tx, Map<String, PList> lists) throws IOException { 144 for (Iterator<Entry<String, PList>> i = this.lists.iterator(tx); i.hasNext();) { 145 Entry<String, PList> entry = i.next(); 146 entry.getValue().load(tx); 147 lists.put(entry.getKey(), entry.getValue()); 148 } 149 } 150 151 public void read(DataInput is) throws IOException { 152 this.lists = new BTreeIndex<String, PList>(pageFile, is.readLong()); 153 this.lists.setKeyMarshaller(StringMarshaller.INSTANCE); 154 this.lists.setValueMarshaller(new PListMarshaller(this.store)); 155 } 156 157 public void write(DataOutput os) throws IOException { 158 os.writeLong(this.lists.getPageId()); 159 } 160 } 161 162 class MetaDataMarshaller extends VariableMarshaller<MetaData> { 163 private final PListStore store; 164 165 MetaDataMarshaller(PListStore store) { 166 this.store = store; 167 } 168 public MetaData readPayload(DataInput dataIn) throws IOException { 169 MetaData rc = new MetaData(this.store); 170 rc.read(dataIn); 171 return rc; 172 } 173 174 public void writePayload(MetaData object, DataOutput dataOut) throws IOException { 175 object.write(dataOut); 176 } 177 } 178 179 class PListMarshaller extends VariableMarshaller<PList> { 180 private final PListStore store; 181 PListMarshaller(PListStore store) { 182 this.store = store; 183 } 184 public PList readPayload(DataInput dataIn) throws IOException { 185 PList result = new PList(this.store); 186 result.read(dataIn); 187 return result; 188 } 189 190 public void writePayload(PList list, DataOutput dataOut) throws IOException { 191 list.write(dataOut); 192 } 193 } 194 195 public Journal getJournal() { 196 return this.journal; 197 } 198 199 public File getDirectory() { 200 return directory; 201 } 202 203 public void setDirectory(File directory) { 204 this.directory = directory; 205 } 206 207 public long size() { 208 synchronized (this) { 209 if (!initialized) { 210 return 0; 211 } 212 } 213 try { 214 return journal.getDiskSize() + pageFile.getDiskSize(); 215 } catch (IOException e) { 216 throw new RuntimeException(e); 217 } 218 } 219 220 public PList getPList(final String name) throws Exception { 221 if (!isStarted()) { 222 throw new IllegalStateException("Not started"); 223 } 224 intialize(); 225 synchronized (indexLock) { 226 synchronized (this) { 227 PList result = this.persistentLists.get(name); 228 if (result == null) { 229 final PList pl = new PList(this); 230 pl.setName(name); 231 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 232 public void execute(Transaction tx) throws IOException { 233 pl.setHeadPageId(tx.allocate().getPageId()); 234 pl.load(tx); 235 metaData.lists.put(tx, name, pl); 236 } 237 }); 238 result = pl; 239 this.persistentLists.put(name, pl); 240 } 241 final PList toLoad = result; 242 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 243 public void execute(Transaction tx) throws IOException { 244 toLoad.load(tx); 245 } 246 }); 247 248 return result; 249 } 250 } 251 } 252 253 public boolean removePList(final String name) throws Exception { 254 boolean result = false; 255 synchronized (indexLock) { 256 synchronized (this) { 257 final PList pl = this.persistentLists.remove(name); 258 result = pl != null; 259 if (result) { 260 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 261 public void execute(Transaction tx) throws IOException { 262 metaData.lists.remove(tx, name); 263 pl.destroy(); 264 } 265 }); 266 } 267 } 268 } 269 return result; 270 } 271 272 protected synchronized void intialize() throws Exception { 273 if (isStarted()) { 274 if (this.initialized == false) { 275 if (this.directory == null) { 276 this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB"); 277 } 278 IOHelper.mkdirs(this.directory); 279 lock(); 280 this.journal = new Journal(); 281 this.journal.setDirectory(directory); 282 this.journal.setMaxFileLength(getJournalMaxFileLength()); 283 this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize()); 284 this.journal.start(); 285 this.pageFile = new PageFile(directory, "tmpDB"); 286 this.pageFile.setEnablePageCaching(getIndexEnablePageCaching()); 287 this.pageFile.setPageSize(getIndexPageSize()); 288 this.pageFile.setWriteBatchSize(getIndexWriteBatchSize()); 289 this.pageFile.setPageCacheSize(getIndexCacheSize()); 290 this.pageFile.load(); 291 292 this.pageFile.tx().execute(new Transaction.Closure<IOException>() { 293 public void execute(Transaction tx) throws IOException { 294 if (pageFile.getPageCount() == 0) { 295 Page<MetaData> page = tx.allocate(); 296 assert page.getPageId() == 0; 297 page.set(metaData); 298 metaData.page = page; 299 metaData.createIndexes(tx); 300 tx.store(metaData.page, metaDataMarshaller, true); 301 302 } else { 303 Page<MetaData> page = tx.load(0, metaDataMarshaller); 304 metaData = page.get(); 305 metaData.page = page; 306 } 307 metaData.load(tx); 308 metaData.loadLists(tx, persistentLists); 309 } 310 }); 311 this.pageFile.flush(); 312 313 if (cleanupInterval > 0) { 314 if (scheduler == null) { 315 scheduler = new Scheduler(PListStore.class.getSimpleName()); 316 scheduler.start(); 317 } 318 scheduler.executePeriodically(this, cleanupInterval); 319 } 320 this.initialized = true; 321 LOG.info(this + " initialized"); 322 } 323 } 324 } 325 326 @Override 327 protected synchronized void doStart() throws Exception { 328 if (!lazyInit) { 329 intialize(); 330 } 331 LOG.info(this + " started"); 332 } 333 334 @Override 335 protected synchronized void doStop(ServiceStopper stopper) throws Exception { 336 if (scheduler != null) { 337 if (PListStore.class.getSimpleName().equals(scheduler.getName())) { 338 scheduler.stop(); 339 scheduler = null; 340 } 341 } 342 for (PList pl : this.persistentLists.values()) { 343 pl.unload(null); 344 } 345 if (this.pageFile != null) { 346 this.pageFile.unload(); 347 } 348 if (this.journal != null) { 349 journal.close(); 350 } 351 if (this.lockFile != null) { 352 this.lockFile.unlock(); 353 } 354 this.lockFile = null; 355 this.initialized = false; 356 LOG.info(this + " stopped"); 357 358 } 359 360 public void run() { 361 try { 362 if (isStopping()) { 363 return; 364 } 365 final int lastJournalFileId = journal.getLastAppendLocation().getDataFileId(); 366 final Set<Integer> candidates = journal.getFileMap().keySet(); 367 LOG.trace("Full gc candidate set:" + candidates); 368 if (candidates.size() > 1) { 369 // prune current write 370 for (Iterator<Integer> iterator = candidates.iterator(); iterator.hasNext();) { 371 if (iterator.next() >= lastJournalFileId) { 372 iterator.remove(); 373 } 374 } 375 List<PList> plists = null; 376 synchronized (indexLock) { 377 synchronized (this) { 378 plists = new ArrayList<PList>(persistentLists.values()); 379 } 380 } 381 for (PList list : plists) { 382 list.claimFileLocations(candidates); 383 if (isStopping()) { 384 return; 385 } 386 LOG.trace("Remaining gc candidate set after refs from: " + list.getName() + ":" + candidates); 387 } 388 LOG.trace("GC Candidate set:" + candidates); 389 this.journal.removeDataFiles(candidates); 390 } 391 } catch (IOException e) { 392 LOG.error("Exception on periodic cleanup: " + e, e); 393 } 394 } 395 396 ByteSequence getPayload(Location location) throws IllegalStateException, IOException { 397 ByteSequence result = null; 398 result = this.journal.read(location); 399 return result; 400 } 401 402 Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException { 403 return this.journal.write(payload, sync); 404 } 405 406 private void lock() throws IOException { 407 if (lockFile == null) { 408 File lockFileName = new File(directory, "lock"); 409 lockFile = new LockFile(lockFileName, true); 410 if (failIfDatabaseIsLocked) { 411 lockFile.lock(); 412 } else { 413 while (true) { 414 try { 415 lockFile.lock(); 416 break; 417 } catch (IOException e) { 418 LOG.info("Database " + lockFileName + " is locked... waiting " 419 + (DATABASE_LOCKED_WAIT_DELAY / 1000) 420 + " seconds for the database to be unlocked. Reason: " + e); 421 try { 422 Thread.sleep(DATABASE_LOCKED_WAIT_DELAY); 423 } catch (InterruptedException e1) { 424 } 425 } 426 } 427 } 428 } 429 } 430 431 PageFile getPageFile() { 432 this.pageFile.isLoaded(); 433 return this.pageFile; 434 } 435 436 public boolean isFailIfDatabaseIsLocked() { 437 return failIfDatabaseIsLocked; 438 } 439 440 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 441 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 442 } 443 444 public int getJournalMaxFileLength() { 445 return journalMaxFileLength; 446 } 447 448 public void setJournalMaxFileLength(int journalMaxFileLength) { 449 this.journalMaxFileLength = journalMaxFileLength; 450 } 451 452 public int getJournalMaxWriteBatchSize() { 453 return journalMaxWriteBatchSize; 454 } 455 456 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 457 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 458 } 459 460 public boolean isEnableIndexWriteAsync() { 461 return enableIndexWriteAsync; 462 } 463 464 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 465 this.enableIndexWriteAsync = enableIndexWriteAsync; 466 } 467 468 public long getCleanupInterval() { 469 return cleanupInterval; 470 } 471 472 public void setCleanupInterval(long cleanupInterval) { 473 this.cleanupInterval = cleanupInterval; 474 } 475 476 public boolean isLazyInit() { 477 return lazyInit; 478 } 479 480 public void setLazyInit(boolean lazyInit) { 481 this.lazyInit = lazyInit; 482 } 483 484 @Override 485 public String toString() { 486 String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET"; 487 return "PListStore:[" + path + " ]"; 488 } 489 }