001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.memory;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    import java.util.concurrent.ConcurrentHashMap;
023    import java.util.concurrent.Future;
024    
025    import org.apache.activemq.broker.ConnectionContext;
026    import org.apache.activemq.command.Message;
027    import org.apache.activemq.command.MessageAck;
028    import org.apache.activemq.command.MessageId;
029    import org.apache.activemq.command.TransactionId;
030    import org.apache.activemq.command.XATransactionId;
031    import org.apache.activemq.store.AbstractMessageStore;
032    import org.apache.activemq.store.MessageStore;
033    import org.apache.activemq.store.PersistenceAdapter;
034    import org.apache.activemq.store.ProxyMessageStore;
035    import org.apache.activemq.store.ProxyTopicMessageStore;
036    import org.apache.activemq.store.TopicMessageStore;
037    import org.apache.activemq.store.TransactionRecoveryListener;
038    import org.apache.activemq.store.TransactionStore;
039    
040    /**
041     * Provides a TransactionStore implementation that can create transaction aware
042     * MessageStore objects from non transaction aware MessageStore objects.
043     *
044     *
045     */
046    public class MemoryTransactionStore implements TransactionStore {
047    
048        ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
049        ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
050        final PersistenceAdapter persistenceAdapter;
051    
052        private boolean doingRecover;
053    
054        public class Tx {
055            private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
056    
057            private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
058    
059            public void add(AddMessageCommand msg) {
060                messages.add(msg);
061            }
062    
063            public void add(RemoveMessageCommand ack) {
064                acks.add(ack);
065            }
066    
067            public Message[] getMessages() {
068                Message rc[] = new Message[messages.size()];
069                int count = 0;
070                for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
071                    AddMessageCommand cmd = iter.next();
072                    rc[count++] = cmd.getMessage();
073                }
074                return rc;
075            }
076    
077            public MessageAck[] getAcks() {
078                MessageAck rc[] = new MessageAck[acks.size()];
079                int count = 0;
080                for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
081                    RemoveMessageCommand cmd = iter.next();
082                    rc[count++] = cmd.getMessageAck();
083                }
084                return rc;
085            }
086    
087            /**
088             * @throws IOException
089             */
090            public void commit() throws IOException {
091                ConnectionContext ctx = new ConnectionContext();
092                persistenceAdapter.beginTransaction(ctx);
093                try {
094    
095                    // Do all the message adds.
096                    for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
097                        AddMessageCommand cmd = iter.next();
098                        cmd.run(ctx);
099                    }
100                    // And removes..
101                    for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
102                        RemoveMessageCommand cmd = iter.next();
103                        cmd.run(ctx);
104                    }
105    
106                } catch ( IOException e ) {
107                    persistenceAdapter.rollbackTransaction(ctx);
108                    throw e;
109                }
110                persistenceAdapter.commitTransaction(ctx);
111            }
112        }
113    
114        public interface AddMessageCommand {
115            Message getMessage();
116    
117            void run(ConnectionContext context) throws IOException;
118        }
119    
120        public interface RemoveMessageCommand {
121            MessageAck getMessageAck();
122    
123            void run(ConnectionContext context) throws IOException;
124        }
125    
126        public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
127            this.persistenceAdapter=persistenceAdapter;
128        }
129    
130        public MessageStore proxy(MessageStore messageStore) {
131            return new ProxyMessageStore(messageStore) {
132                @Override
133                public void addMessage(ConnectionContext context, final Message send) throws IOException {
134                    MemoryTransactionStore.this.addMessage(getDelegate(), send);
135                }
136    
137                @Override
138                public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
139                    MemoryTransactionStore.this.addMessage(getDelegate(), send);
140                }
141    
142                @Override
143                public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
144                    MemoryTransactionStore.this.addMessage(getDelegate(), message);
145                    return AbstractMessageStore.FUTURE;
146                 }
147    
148                @Override
149                public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
150                    MemoryTransactionStore.this.addMessage(getDelegate(), message);
151                    return AbstractMessageStore.FUTURE;
152                 }
153    
154                @Override
155                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
156                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
157                }
158    
159                @Override
160                public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
161                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
162                }
163            };
164        }
165    
166        public TopicMessageStore proxy(TopicMessageStore messageStore) {
167            return new ProxyTopicMessageStore(messageStore) {
168                @Override
169                public void addMessage(ConnectionContext context, final Message send) throws IOException {
170                    MemoryTransactionStore.this.addMessage(getDelegate(), send);
171                }
172    
173                @Override
174                public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
175                    MemoryTransactionStore.this.addMessage(getDelegate(), send);
176                }
177    
178                @Override
179                public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
180                    MemoryTransactionStore.this.addMessage(getDelegate(), message);
181                    return AbstractMessageStore.FUTURE;
182                 }
183    
184                @Override
185                public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
186                    MemoryTransactionStore.this.addMessage(getDelegate(), message);
187                    return AbstractMessageStore.FUTURE;
188                 }
189    
190                @Override
191                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
192                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
193                }
194    
195                @Override
196                public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
197                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
198                }
199    
200                @Override
201                public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
202                                MessageId messageId, MessageAck ack) throws IOException {
203                    MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId,
204                            subscriptionName, messageId, ack);
205                }
206            };
207        }
208    
209        /**
210         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
211         */
212        public void prepare(TransactionId txid) {
213            Tx tx = inflightTransactions.remove(txid);
214            if (tx == null) {
215                return;
216            }
217            preparedTransactions.put(txid, tx);
218        }
219    
220        public Tx getTx(Object txid) {
221            Tx tx = inflightTransactions.get(txid);
222            if (tx == null) {
223                tx = new Tx();
224                inflightTransactions.put(txid, tx);
225            }
226            return tx;
227        }
228    
229        public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
230            if (preCommit != null) {
231                preCommit.run();
232            }
233            Tx tx;
234            if (wasPrepared) {
235                tx = preparedTransactions.remove(txid);
236            } else {
237                tx = inflightTransactions.remove(txid);
238            }
239    
240            if (tx == null) {
241                if (postCommit != null) {
242                    postCommit.run();
243                }
244                return;
245            }
246            // ensure message order w.r.t to cursor and store for setBatch()
247            synchronized (this) {
248                tx.commit();
249                if (postCommit != null) {
250                    postCommit.run();
251                }
252            }
253        }
254    
255        /**
256         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
257         */
258        public void rollback(TransactionId txid) {
259            preparedTransactions.remove(txid);
260            inflightTransactions.remove(txid);
261        }
262    
263        public void start() throws Exception {
264        }
265    
266        public void stop() throws Exception {
267        }
268    
269        public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
270            // All the inflight transactions get rolled back..
271            inflightTransactions.clear();
272            this.doingRecover = true;
273            try {
274                for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
275                    Object txid = iter.next();
276                    Tx tx = preparedTransactions.get(txid);
277                    listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
278                }
279            } finally {
280                this.doingRecover = false;
281            }
282        }
283    
284        /**
285         * @param message
286         * @throws IOException
287         */
288        void addMessage(final MessageStore destination, final Message message) throws IOException {
289    
290            if (doingRecover) {
291                return;
292            }
293    
294            if (message.getTransactionId() != null) {
295                Tx tx = getTx(message.getTransactionId());
296                tx.add(new AddMessageCommand() {
297                    public Message getMessage() {
298                        return message;
299                    }
300    
301                    public void run(ConnectionContext ctx) throws IOException {
302                        destination.addMessage(ctx, message);
303                    }
304    
305                });
306            } else {
307                destination.addMessage(null, message);
308            }
309        }
310    
311        /**
312         * @param ack
313         * @throws IOException
314         */
315        final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
316            if (doingRecover) {
317                return;
318            }
319    
320            if (ack.isInTransaction()) {
321                Tx tx = getTx(ack.getTransactionId());
322                tx.add(new RemoveMessageCommand() {
323                    public MessageAck getMessageAck() {
324                        return ack;
325                    }
326    
327                    public void run(ConnectionContext ctx) throws IOException {
328                        destination.removeMessage(ctx, ack);
329                    }
330                });
331            } else {
332                destination.removeMessage(null, ack);
333            }
334        }
335    
336        final void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
337                               final MessageId messageId, final MessageAck ack) throws IOException {
338            if (doingRecover) {
339                return;
340            }
341    
342            if (ack.isInTransaction()) {
343                Tx tx = getTx(ack.getTransactionId());
344                tx.add(new RemoveMessageCommand() {
345                    public MessageAck getMessageAck() {
346                        return ack;
347                    }
348    
349                    public void run(ConnectionContext ctx) throws IOException {
350                        destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
351                    }
352                });
353            } else {
354                destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
355            }
356        }
357    
358    
359        public void delete() {
360            inflightTransactions.clear();
361            preparedTransactions.clear();
362            doingRecover = false;
363        }
364    
365    }