/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Used to pool DataFileAccessors.
 * <p>
 * One {@link Pool} is kept per data file id. Every method of this class that
 * reads or modifies the map of pools is declared {@code synchronized}, and
 * every method of {@link Pool} that touches its idle list or open counter is
 * {@code synchronized} on the pool itself.
 *
 * @author chirino
 */
public class DataFileAccessorPool {

    private final Journal journal;
    /** Pools keyed by the id of the data file they serve. */
    private final Map<Integer, Pool> pools = new HashMap<Integer, Pool>();
    /** Set once by {@link #close()}; never reset. */
    private boolean closed;
    /** Upper bound on the number of idle accessors retained per data file. */
    private int maxOpenReadersPerFile = 5;

    /**
     * Holds the idle accessors of a single data file together with a count of
     * the accessors currently handed out for it.
     */
    class Pool {

        private final DataFile file;
        /** Idle accessors available for reuse; used as a stack (last in, first out). */
        private final List<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
        /** Number of accessors handed out and not yet returned. */
        private int openCounter;
        /** Once true, returned accessors are disposed instead of being retained. */
        private boolean disposed;

        public Pool(DataFile file) {
            this.file = file;
        }

        /**
         * Hands out an accessor for this pool's file, reusing the most recently
         * returned idle accessor when one exists and creating a new one otherwise.
         * <p>
         * Declared {@code synchronized} so that it follows the same locking
         * discipline as the other methods that touch {@code pool} and
         * {@code openCounter}.
         *
         * @return an accessor for this pool's data file
         * @throws IOException presumably when a new accessor cannot be created
         *         (declared by the original signature); in that case the open
         *         counter is left unchanged
         */
        public synchronized DataFileAccessor openDataFileReader() throws IOException {
            DataFileAccessor rc = null;
            if (pool.isEmpty()) {
                rc = new DataFileAccessor(journal, file);
            } else {
                rc = pool.remove(pool.size() - 1);
            }
            // Counted only after an accessor was actually obtained.
            openCounter++;
            return rc;
        }

        /**
         * Takes back an accessor. It is disposed when this pool has been
         * disposed or already retains {@code maxOpenReadersPerFile} idle
         * accessors; otherwise it is kept for reuse.
         *
         * @param reader the accessor being returned
         */
        public synchronized void closeDataFileReader(DataFileAccessor reader) {
            openCounter--;
            if (pool.size() >= maxOpenReadersPerFile || disposed) {
                reader.dispose();
            } else {
                pool.add(reader);
            }
        }

        /**
         * @return {@code true} while at least one accessor is handed out
         */
        public synchronized boolean isUsed() {
            return openCounter > 0;
        }

        /**
         * Disposes every idle accessor, empties the idle list and marks this
         * pool as disposed so that accessors returned later are disposed too.
         */
        public synchronized void dispose() {
            for (DataFileAccessor reader : pool) {
                reader.dispose();
            }
            pool.clear();
            disposed = true;
        }

        /**
         * @return the number of accessors handed out and not yet returned
         */
        public synchronized int getOpenCounter() {
            return openCounter;
        }

    }

    /**
     * @param dataManager the journal passed to every accessor this pool creates
     */
    public DataFileAccessorPool(Journal dataManager) {
        this.journal = dataManager;
    }

    /**
     * @return the number of per-file pools currently registered
     */
    public synchronized int size() {
        return pools.size();
    }

    /**
     * Disposes and unregisters every per-file pool that has no accessor
     * handed out at the moment.
     */
    public synchronized void disposeUnused() {
        // Explicit iterator so entries can be removed while iterating.
        for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
            Pool pool = iter.next();
            if (!pool.isUsed()) {
                pool.dispose();
                iter.remove();
            }
        }
    }

    /**
     * Disposes and unregisters the pool of the given data file, if one is
     * registered.
     *
     * @param dataFile the data file whose accessors should be released
     * @throws IllegalStateException if this pool has been closed, or if the
     *         data file still has accessors handed out
     */
    synchronized void disposeDataFileAccessors(DataFile dataFile) {
        if (closed) {
            throw new IllegalStateException("Closed.");
        }
        Integer key = dataFile.getDataFileId();
        Pool pool = pools.get(key);
        if (pool != null) {
            if (pool.getOpenCounter() == 0) {
                pool.dispose();
                pools.remove(key);
            } else {
                throw new IllegalStateException("The data file is still in use: " + dataFile + ", use count: " + pool.getOpenCounter());
            }
        }
    }

    /**
     * Hands out an accessor for the given data file, registering a pool for
     * the file on first use.
     *
     * @param dataFile the data file to access
     * @return an accessor for {@code dataFile}
     * @throws IOException if this pool has been closed, or if the per-file
     *         pool fails to provide an accessor
     */
    synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
        if (closed) {
            throw new IOException("Closed.");
        }

        Integer key = dataFile.getDataFileId();
        Pool pool = pools.get(key);
        if (pool == null) {
            pool = new Pool(dataFile);
            pools.put(key, pool);
        }
        return pool.openDataFileReader();
    }

    /**
     * Takes back an accessor. It is disposed immediately when this pool is
     * closed or no pool is registered for its data file; otherwise it is
     * returned to the pool of its data file.
     *
     * @param reader the accessor being returned
     */
    synchronized void closeDataFileAccessor(DataFileAccessor reader) {
        Integer key = reader.getDataFile().getDataFileId();
        Pool pool = pools.get(key);
        if (pool == null || closed) {
            reader.dispose();
        } else {
            pool.closeDataFileReader(reader);
        }
    }

    /**
     * Closes this pool: disposes every per-file pool and unregisters them all.
     * Calling it again has no effect. Afterwards
     * {@link #openDataFileAccessor(DataFile)} fails with an
     * {@link IOException} and accessors passed to
     * {@link #closeDataFileAccessor(DataFileAccessor)} are disposed.
     */
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        for (Pool pool : pools.values()) {
            pool.dispose();
        }
        pools.clear();
    }

}