001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.store.kahadb.scheduler; 019 020import java.io.DataInput; 021import java.io.DataOutput; 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.Map.Entry; 028import java.util.UUID; 029 030import org.apache.activemq.store.kahadb.AbstractKahaDBMetaData; 031import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; 032import org.apache.activemq.store.kahadb.disk.page.Transaction; 033import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller; 034import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; 035import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; 036import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * The KahaDB MetaData used to house the Index data for the KahaDB implementation 042 * of a JobSchedulerStore. 043 */ 044public class JobSchedulerKahaDBMetaData extends AbstractKahaDBMetaData<JobSchedulerKahaDBMetaData> { 045 046 static final Logger LOG = LoggerFactory.getLogger(JobSchedulerKahaDBMetaData.class); 047 048 private final JobSchedulerStoreImpl store; 049 050 private UUID token = JobSchedulerStoreImpl.SCHEDULER_STORE_TOKEN; 051 private int version = JobSchedulerStoreImpl.CURRENT_VERSION; 052 053 private BTreeIndex<Integer, List<Integer>> removeLocationTracker; 054 private BTreeIndex<Integer, Integer> journalRC; 055 private BTreeIndex<String, JobSchedulerImpl> storedSchedulers; 056 057 /** 058 * Creates a new instance of this meta data object with the assigned 059 * parent JobSchedulerStore instance. 060 * 061 * @param store 062 * the store instance that owns this meta data. 063 */ 064 public JobSchedulerKahaDBMetaData(JobSchedulerStoreImpl store) { 065 this.store = store; 066 } 067 068 /** 069 * @return the current value of the Scheduler store identification token. 070 */ 071 public UUID getToken() { 072 return this.token; 073 } 074 075 /** 076 * @return the current value of the version tag for this meta data instance. 077 */ 078 public int getVersion() { 079 return this.version; 080 } 081 082 /** 083 * Gets the index that contains the location tracking information for Jobs 084 * that have been removed from the index but whose add operation has yet 085 * to be removed from the Journal. 086 * 087 * The Journal log file where a remove command is written cannot be released 088 * until the log file with the original add command has also been released, 089 * otherwise on a log replay the scheduled job could reappear in the scheduler 090 * since its corresponding remove might no longer be present. 091 * 092 * @return the remove command location tracker index. 093 */ 094 public BTreeIndex<Integer, List<Integer>> getRemoveLocationTracker() { 095 return this.removeLocationTracker; 096 } 097 098 /** 099 * Gets the index used to track the number of reference to a Journal log file. 100 * 101 * A log file in the Journal can only be considered for removal after all the 102 * references to it have been released. 103 * 104 * @return the journal log file reference counter index. 105 */ 106 public BTreeIndex<Integer, Integer> getJournalRC() { 107 return this.journalRC; 108 } 109 110 /** 111 * Gets the index of JobScheduler instances that have been created and stored 112 * in the JobSchedulerStore instance. 113 * 114 * @return the index of stored JobScheduler instances. 115 */ 116 public BTreeIndex<String, JobSchedulerImpl> getJobSchedulers() { 117 return this.storedSchedulers; 118 } 119 120 @Override 121 public void initialize(Transaction tx) throws IOException { 122 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(store.getPageFile(), tx.allocate().getPageId()); 123 this.journalRC = new BTreeIndex<Integer, Integer>(store.getPageFile(), tx.allocate().getPageId()); 124 this.removeLocationTracker = new BTreeIndex<Integer, List<Integer>>(store.getPageFile(), tx.allocate().getPageId()); 125 } 126 127 @Override 128 public void load(Transaction tx) throws IOException { 129 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); 130 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); 131 this.storedSchedulers.load(tx); 132 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); 133 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); 134 this.journalRC.load(tx); 135 this.removeLocationTracker.setKeyMarshaller(IntegerMarshaller.INSTANCE); 136 this.removeLocationTracker.setValueMarshaller(new IntegerListMarshaller()); 137 this.removeLocationTracker.load(tx); 138 } 139 140 /** 141 * Loads all the stored JobScheduler instances into the provided map. 142 * 143 * @param tx 144 * the Transaction under which the load operation should be executed. 145 * @param schedulers 146 * a Map<String, JobSchedulerImpl> into which the loaded schedulers are stored. 147 * 148 * @throws IOException if an error occurs while performing the load operation. 149 */ 150 public void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException { 151 for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) { 152 Entry<String, JobSchedulerImpl> entry = i.next(); 153 entry.getValue().load(tx); 154 schedulers.put(entry.getKey(), entry.getValue()); 155 } 156 } 157 158 @Override 159 public void read(DataInput in) throws IOException { 160 try { 161 long msb = in.readLong(); 162 long lsb = in.readLong(); 163 this.token = new UUID(msb, lsb); 164 } catch (Exception e) { 165 throw new UnknownStoreVersionException(e); 166 } 167 168 if (!token.equals(JobSchedulerStoreImpl.SCHEDULER_STORE_TOKEN)) { 169 throw new UnknownStoreVersionException(token.toString()); 170 } 171 this.version = in.readInt(); 172 if (in.readBoolean()) { 173 setLastUpdateLocation(LocationMarshaller.INSTANCE.readPayload(in)); 174 } else { 175 setLastUpdateLocation(null); 176 } 177 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(store.getPageFile(), in.readLong()); 178 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); 179 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); 180 this.journalRC = new BTreeIndex<Integer, Integer>(store.getPageFile(), in.readLong()); 181 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); 182 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); 183 this.removeLocationTracker = new BTreeIndex<Integer, List<Integer>>(store.getPageFile(), in.readLong()); 184 this.removeLocationTracker.setKeyMarshaller(IntegerMarshaller.INSTANCE); 185 this.removeLocationTracker.setValueMarshaller(new IntegerListMarshaller()); 186 187 LOG.info("Scheduler Store version {} loaded", this.version); 188 } 189 190 @Override 191 public void write(DataOutput out) throws IOException { 192 out.writeLong(this.token.getMostSignificantBits()); 193 out.writeLong(this.token.getLeastSignificantBits()); 194 out.writeInt(this.version); 195 if (getLastUpdateLocation() != null) { 196 out.writeBoolean(true); 197 LocationMarshaller.INSTANCE.writePayload(getLastUpdateLocation(), out); 198 } else { 199 out.writeBoolean(false); 200 } 201 out.writeLong(this.storedSchedulers.getPageId()); 202 out.writeLong(this.journalRC.getPageId()); 203 out.writeLong(this.removeLocationTracker.getPageId()); 204 } 205 206 private class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> { 207 private final JobSchedulerStoreImpl store; 208 209 JobSchedulerMarshaller(JobSchedulerStoreImpl store) { 210 this.store = store; 211 } 212 213 @Override 214 public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException { 215 JobSchedulerImpl result = new JobSchedulerImpl(this.store); 216 result.read(dataIn); 217 return result; 218 } 219 220 @Override 221 public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException { 222 js.write(dataOut); 223 } 224 } 225 226 private class IntegerListMarshaller extends VariableMarshaller<List<Integer>> { 227 228 @Override 229 public List<Integer> readPayload(DataInput dataIn) throws IOException { 230 List<Integer> result = new ArrayList<Integer>(); 231 int size = dataIn.readInt(); 232 for (int i = 0; i < size; i++) { 233 result.add(IntegerMarshaller.INSTANCE.readPayload(dataIn)); 234 } 235 return result; 236 } 237 238 @Override 239 public void writePayload(List<Integer> value, DataOutput dataOut) throws IOException { 240 dataOut.writeInt(value.size()); 241 for (Integer integer : value) { 242 IntegerMarshaller.INSTANCE.writePayload(integer, dataOut); 243 } 244 } 245 } 246}