001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb.plist;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.IOException;
022    import java.util.Iterator;
023    import java.util.Map;
024    import java.util.NoSuchElementException;
025    import java.util.Set;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    import java.util.concurrent.atomic.AtomicReference;
028    import org.apache.kahadb.index.ListIndex;
029    import org.apache.kahadb.journal.Location;
030    import org.apache.kahadb.page.Transaction;
031    import org.apache.kahadb.util.ByteSequence;
032    import org.apache.kahadb.util.LocationMarshaller;
033    import org.apache.kahadb.util.StringMarshaller;
034    import org.slf4j.Logger;
035    import org.slf4j.LoggerFactory;
036    
037    public class PList extends ListIndex<String, Location> {
038        static final Logger LOG = LoggerFactory.getLogger(PList.class);
039        final PListStore store;
040        private String name;
041        Object indexLock;
042    
043        PList(PListStore store) {
044            this.store = store;
045            this.indexLock = store.getIndexLock();
046            setPageFile(store.getPageFile());
047            setKeyMarshaller(StringMarshaller.INSTANCE);
048            setValueMarshaller(LocationMarshaller.INSTANCE);
049        }
050    
051        public void setName(String name) {
052            this.name = name;
053        }
054    
055        public String getName() {
056            return this.name;
057        }
058    
059        void read(DataInput in) throws IOException {
060            setHeadPageId(in.readLong());
061        }
062    
063        public void write(DataOutput out) throws IOException {
064            out.writeLong(getHeadPageId());
065        }
066    
067        public synchronized void destroy() throws IOException {
068            synchronized (indexLock) {
069                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
070                    public void execute(Transaction tx) throws IOException {
071                        clear(tx);
072                        unload(tx);
073                    }
074                });
075            }
076        }
077    
078        public void addLast(final String id, final ByteSequence bs) throws IOException {
079            final Location location = this.store.write(bs, false);
080            synchronized (indexLock) {
081                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
082                    public void execute(Transaction tx) throws IOException {
083                        add(tx, id, location);
084                    }
085                });
086            }
087        }
088    
089        public void addFirst(final String id, final ByteSequence bs) throws IOException {
090            final Location location = this.store.write(bs, false);
091            synchronized (indexLock) {
092                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
093                    public void execute(Transaction tx) throws IOException {
094                        addFirst(tx, id, location);
095                    }
096                });
097            }
098        }
099    
100        public boolean remove(final String id) throws IOException {
101            final AtomicBoolean result = new AtomicBoolean();
102            synchronized (indexLock) {
103                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
104                    public void execute(Transaction tx) throws IOException {
105                        result.set(remove(tx, id) != null);
106                    }
107                });
108            }
109            return result.get();
110        }
111    
112        public boolean remove(final long position) throws IOException {
113            final AtomicBoolean result = new AtomicBoolean();
114            synchronized (indexLock) {
115                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
116                    public void execute(Transaction tx) throws IOException {
117                        Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
118                        if (iterator.hasNext()) {
119                            iterator.next();
120                            iterator.remove();
121                            result.set(true);
122                        } else {
123                            result.set(false);
124                        }
125                    }
126                });
127            }
128            return result.get();
129        }
130    
131        public PListEntry get(final long position) throws IOException {
132            PListEntry result = null;
133            final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
134            synchronized (indexLock) {
135                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
136                    public void execute(Transaction tx) throws IOException {
137                        Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
138                        ref.set(iterator.next());
139                    }
140                });
141            }
142            if (ref.get() != null) {
143                ByteSequence bs = this.store.getPayload(ref.get().getValue());
144                result = new PListEntry(ref.get().getKey(), bs);
145            }
146            return result;
147        }
148    
149        public PListEntry getFirst() throws IOException {
150            PListEntry result = null;
151            final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
152            synchronized (indexLock) {
153                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
154                    public void execute(Transaction tx) throws IOException {
155                        ref.set(getFirst(tx));
156                    }
157                });
158            }
159            if (ref.get() != null) {
160                ByteSequence bs = this.store.getPayload(ref.get().getValue());
161                result = new PListEntry(ref.get().getKey(), bs);
162            }
163            return result;
164        }
165    
166        public PListEntry getLast() throws IOException {
167            PListEntry result = null;
168            final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
169            synchronized (indexLock) {
170                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
171                    public void execute(Transaction tx) throws IOException {
172                        ref.set(getLast(tx));
173                    }
174                });
175            }
176            if (ref.get() != null) {
177                ByteSequence bs = this.store.getPayload(ref.get().getValue());
178                result = new PListEntry(ref.get().getKey(), bs);
179            }
180            return result;
181        }
182    
183        public boolean isEmpty() {
184            return size() == 0;
185        }
186    
187        public PListIterator iterator() throws IOException {
188            return new PListIterator();
189        }
190    
191        public final class PListIterator implements Iterator<PListEntry> {
192            final Iterator<Map.Entry<String, Location>> iterator;
193            final Transaction tx;
194    
195            PListIterator() throws IOException {
196                tx = store.pageFile.tx();
197                synchronized (indexLock) {
198                    this.iterator = iterator(tx);
199                }
200            }
201    
202            @Override
203            public boolean hasNext() {
204                return iterator.hasNext();
205            }
206    
207            @Override
208            public PListEntry next() {
209                Map.Entry<String, Location> entry = iterator.next();
210                ByteSequence bs = null;
211                try {
212                    bs = store.getPayload(entry.getValue());
213                } catch (IOException unexpected) {
214                    NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage());
215                    e.initCause(unexpected);
216                    throw e;
217                }
218                return new PListEntry(entry.getKey(), bs);
219            }
220    
221            @Override
222            public void remove() {
223                try {
224                    synchronized (indexLock) {
225                        tx.execute(new Transaction.Closure<IOException>() {
226                            @Override
227                            public void execute(Transaction tx) throws IOException {
228                                iterator.remove();
229                            }
230                        });
231                    }
232                } catch (IOException unexpected) {
233                    IllegalStateException e = new IllegalStateException(unexpected);
234                    e.initCause(unexpected);
235                    throw e;
236                }
237            }
238    
239            public void release() {
240                try {
241                    tx.rollback();
242                } catch (IOException unexpected) {
243                    IllegalStateException e = new IllegalStateException(unexpected);
244                    e.initCause(unexpected);
245                    throw e;
246                }
247            }
248        }
249    
250        public void claimFileLocations(final Set<Integer> candidates) throws IOException {
251            synchronized (indexLock) {
252                if (loaded.get()) {
253                    this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
254                        public void execute(Transaction tx) throws IOException {
255                            Iterator<Map.Entry<String,Location>> iterator = iterator(tx);
256                            while (iterator.hasNext()) {
257                                Location location = iterator.next().getValue();
258                                candidates.remove(location.getDataFileId());
259                            }
260                        }
261                    });
262                }
263            }
264        }
265    
266        @Override
267        public String toString() {
268            return name + "[headPageId=" + getHeadPageId()  + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]";
269        }
270    }