001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.scheduler; 018 019 import org.apache.activemq.util.IOHelper; 020 import org.apache.activemq.util.ServiceStopper; 021 import org.apache.activemq.util.ServiceSupport; 022 import org.apache.kahadb.index.BTreeIndex; 023 import org.apache.kahadb.journal.Journal; 024 import org.apache.kahadb.journal.Location; 025 import org.apache.kahadb.page.Page; 026 import org.apache.kahadb.page.PageFile; 027 import org.apache.kahadb.page.Transaction; 028 import org.apache.kahadb.util.ByteSequence; 029 import org.apache.kahadb.util.IntegerMarshaller; 030 import org.apache.kahadb.util.LockFile; 031 import org.apache.kahadb.util.StringMarshaller; 032 import org.apache.kahadb.util.VariableMarshaller; 033 import org.slf4j.Logger; 034 import org.slf4j.LoggerFactory; 035 036 import java.io.DataInput; 037 import java.io.DataOutput; 038 import java.io.File; 039 import java.io.IOException; 040 import java.util.ArrayList; 041 import java.util.HashMap; 042 import java.util.HashSet; 043 import java.util.Iterator; 044 import java.util.List; 045 import java.util.Map; 046 import java.util.Map.Entry; 047 import java.util.Set; 048 049 public class JobSchedulerStore extends ServiceSupport { 050 static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class); 051 private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; 052 053 public static final int CLOSED_STATE = 1; 054 public static final int OPEN_STATE = 2; 055 056 private File directory; 057 PageFile pageFile; 058 private Journal journal; 059 private LockFile lockFile; 060 private boolean failIfDatabaseIsLocked; 061 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 062 private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 063 private boolean enableIndexWriteAsync = false; 064 // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 065 MetaData metaData = new MetaData(this); 066 final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this); 067 Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>(); 068 069 protected class MetaData { 070 protected MetaData(JobSchedulerStore store) { 071 this.store = store; 072 } 073 private final JobSchedulerStore store; 074 Page<MetaData> page; 075 BTreeIndex<Integer, Integer> journalRC; 076 BTreeIndex<String, JobSchedulerImpl> storedSchedulers; 077 078 void createIndexes(Transaction tx) throws IOException { 079 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId()); 080 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId()); 081 } 082 083 void load(Transaction tx) throws IOException { 084 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); 085 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); 086 this.storedSchedulers.load(tx); 087 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); 088 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); 089 this.journalRC.load(tx); 090 } 091 092 void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException { 093 for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) { 094 Entry<String, JobSchedulerImpl> entry = i.next(); 095 entry.getValue().load(tx); 096 schedulers.put(entry.getKey(), entry.getValue()); 097 } 098 } 099 100 public void read(DataInput is) throws IOException { 101 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong()); 102 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); 103 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); 104 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong()); 105 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); 106 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); 107 } 108 109 public void write(DataOutput os) throws IOException { 110 os.writeLong(this.storedSchedulers.getPageId()); 111 os.writeLong(this.journalRC.getPageId()); 112 113 } 114 } 115 116 class MetaDataMarshaller extends VariableMarshaller<MetaData> { 117 private final JobSchedulerStore store; 118 119 MetaDataMarshaller(JobSchedulerStore store) { 120 this.store = store; 121 } 122 public MetaData readPayload(DataInput dataIn) throws IOException { 123 MetaData rc = new MetaData(this.store); 124 rc.read(dataIn); 125 return rc; 126 } 127 128 public void writePayload(MetaData object, DataOutput dataOut) throws IOException { 129 object.write(dataOut); 130 } 131 } 132 133 class ValueMarshaller extends VariableMarshaller<List<JobLocation>> { 134 public List<JobLocation> readPayload(DataInput dataIn) throws IOException { 135 List<JobLocation> result = new ArrayList<JobLocation>(); 136 int size = dataIn.readInt(); 137 for (int i = 0; i < size; i++) { 138 JobLocation jobLocation = new JobLocation(); 139 jobLocation.readExternal(dataIn); 140 result.add(jobLocation); 141 } 142 return result; 143 } 144 145 public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException { 146 dataOut.writeInt(value.size()); 147 for (JobLocation jobLocation : value) { 148 jobLocation.writeExternal(dataOut); 149 } 150 } 151 } 152 153 class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> { 154 private final JobSchedulerStore store; 155 JobSchedulerMarshaller(JobSchedulerStore store) { 156 this.store = store; 157 } 158 public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException { 159 JobSchedulerImpl result = new JobSchedulerImpl(this.store); 160 result.read(dataIn); 161 return result; 162 } 163 164 public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException { 165 js.write(dataOut); 166 } 167 } 168 169 public File getDirectory() { 170 return directory; 171 } 172 173 public void setDirectory(File directory) { 174 this.directory = directory; 175 } 176 177 public long size() { 178 if ( !isStarted() ) { 179 return 0; 180 } 181 try { 182 return journal.getDiskSize() + pageFile.getDiskSize(); 183 } catch (IOException e) { 184 throw new RuntimeException(e); 185 } 186 } 187 188 public JobScheduler getJobScheduler(final String name) throws Exception { 189 JobSchedulerImpl result = this.schedulers.get(name); 190 if (result == null) { 191 final JobSchedulerImpl js = new JobSchedulerImpl(this); 192 js.setName(name); 193 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 194 public void execute(Transaction tx) throws IOException { 195 js.createIndexes(tx); 196 js.load(tx); 197 metaData.storedSchedulers.put(tx, name, js); 198 } 199 }); 200 result = js; 201 this.schedulers.put(name, js); 202 if (isStarted()) { 203 result.start(); 204 } 205 this.pageFile.flush(); 206 } 207 return result; 208 } 209 210 synchronized public boolean removeJobScheduler(final String name) throws Exception { 211 boolean result = false; 212 final JobSchedulerImpl js = this.schedulers.remove(name); 213 result = js != null; 214 if (result) { 215 js.stop(); 216 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 217 public void execute(Transaction tx) throws IOException { 218 metaData.storedSchedulers.remove(tx, name); 219 js.destroy(tx); 220 } 221 }); 222 } 223 return result; 224 } 225 226 @Override 227 protected synchronized void doStart() throws Exception { 228 if (this.directory == null) { 229 this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB"); 230 } 231 IOHelper.mkdirs(this.directory); 232 lock(); 233 this.journal = new Journal(); 234 this.journal.setDirectory(directory); 235 this.journal.setMaxFileLength(getJournalMaxFileLength()); 236 this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize()); 237 this.journal.start(); 238 this.pageFile = new PageFile(directory, "scheduleDB"); 239 this.pageFile.setWriteBatchSize(1); 240 this.pageFile.load(); 241 242 this.pageFile.tx().execute(new Transaction.Closure<IOException>() { 243 public void execute(Transaction tx) throws IOException { 244 if (pageFile.getPageCount() == 0) { 245 Page<MetaData> page = tx.allocate(); 246 assert page.getPageId() == 0; 247 page.set(metaData); 248 metaData.page = page; 249 metaData.createIndexes(tx); 250 tx.store(metaData.page, metaDataMarshaller, true); 251 252 } else { 253 Page<MetaData> page = tx.load(0, metaDataMarshaller); 254 metaData = page.get(); 255 metaData.page = page; 256 } 257 metaData.load(tx); 258 metaData.loadScheduler(tx, schedulers); 259 for (JobSchedulerImpl js :schedulers.values()) { 260 try { 261 js.start(); 262 } catch (Exception e) { 263 JobSchedulerStore.LOG.error("Failed to load " + js.getName(),e); 264 } 265 } 266 } 267 }); 268 269 this.pageFile.flush(); 270 LOG.info(this + " started"); 271 } 272 273 @Override 274 protected synchronized void doStop(ServiceStopper stopper) throws Exception { 275 for (JobSchedulerImpl js : this.schedulers.values()) { 276 js.stop(); 277 } 278 if (this.pageFile != null) { 279 this.pageFile.unload(); 280 } 281 if (this.journal != null) { 282 journal.close(); 283 } 284 if (this.lockFile != null) { 285 this.lockFile.unlock(); 286 } 287 this.lockFile = null; 288 LOG.info(this + " stopped"); 289 290 } 291 292 synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException { 293 int logId = location.getDataFileId(); 294 Integer val = this.metaData.journalRC.get(tx, logId); 295 int refCount = val != null ? val.intValue() + 1 : 1; 296 this.metaData.journalRC.put(tx, logId, refCount); 297 298 } 299 300 synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException { 301 int logId = location.getDataFileId(); 302 int refCount = this.metaData.journalRC.get(tx, logId); 303 refCount--; 304 if (refCount <= 0) { 305 this.metaData.journalRC.remove(tx, logId); 306 Set<Integer> set = new HashSet<Integer>(); 307 set.add(logId); 308 this.journal.removeDataFiles(set); 309 } else { 310 this.metaData.journalRC.put(tx, logId, refCount); 311 } 312 313 } 314 315 synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException { 316 ByteSequence result = null; 317 result = this.journal.read(location); 318 return result; 319 } 320 321 synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException { 322 return this.journal.write(payload, sync); 323 } 324 325 private void lock() throws IOException { 326 if (lockFile == null) { 327 File lockFileName = new File(directory, "lock"); 328 lockFile = new LockFile(lockFileName, true); 329 if (failIfDatabaseIsLocked) { 330 lockFile.lock(); 331 } else { 332 while (true) { 333 try { 334 lockFile.lock(); 335 break; 336 } catch (IOException e) { 337 LOG.info("Database " + lockFileName + " is locked... waiting " 338 + (DATABASE_LOCKED_WAIT_DELAY / 1000) 339 + " seconds for the database to be unlocked. Reason: " + e); 340 try { 341 Thread.sleep(DATABASE_LOCKED_WAIT_DELAY); 342 } catch (InterruptedException e1) { 343 } 344 } 345 } 346 } 347 } 348 } 349 350 PageFile getPageFile() { 351 this.pageFile.isLoaded(); 352 return this.pageFile; 353 } 354 355 public boolean isFailIfDatabaseIsLocked() { 356 return failIfDatabaseIsLocked; 357 } 358 359 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 360 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 361 } 362 363 public int getJournalMaxFileLength() { 364 return journalMaxFileLength; 365 } 366 367 public void setJournalMaxFileLength(int journalMaxFileLength) { 368 this.journalMaxFileLength = journalMaxFileLength; 369 } 370 371 public int getJournalMaxWriteBatchSize() { 372 return journalMaxWriteBatchSize; 373 } 374 375 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 376 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 377 } 378 379 public boolean isEnableIndexWriteAsync() { 380 return enableIndexWriteAsync; 381 } 382 383 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 384 this.enableIndexWriteAsync = enableIndexWriteAsync; 385 } 386 387 @Override 388 public String toString() { 389 return "JobSchedulerStore:" + this.directory; 390 } 391 392 }