001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    import java.util.LinkedHashMap;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.concurrent.ConcurrentHashMap;
026    
027    import javax.jms.JMSException;
028    import javax.transaction.xa.XAException;
029    
030    import org.apache.activemq.ActiveMQMessageAudit;
031    import org.apache.activemq.broker.jmx.ManagedRegionBroker;
032    import org.apache.activemq.broker.region.Destination;
033    import org.apache.activemq.broker.region.Queue;
034    import org.apache.activemq.command.ActiveMQDestination;
035    import org.apache.activemq.command.BaseCommand;
036    import org.apache.activemq.command.ConnectionInfo;
037    import org.apache.activemq.command.LocalTransactionId;
038    import org.apache.activemq.command.Message;
039    import org.apache.activemq.command.MessageAck;
040    import org.apache.activemq.command.ProducerInfo;
041    import org.apache.activemq.command.TransactionId;
042    import org.apache.activemq.command.XATransactionId;
043    import org.apache.activemq.state.ProducerState;
044    import org.apache.activemq.store.TransactionRecoveryListener;
045    import org.apache.activemq.store.TransactionStore;
046    import org.apache.activemq.transaction.LocalTransaction;
047    import org.apache.activemq.transaction.Synchronization;
048    import org.apache.activemq.transaction.Transaction;
049    import org.apache.activemq.transaction.XATransaction;
050    import org.apache.activemq.util.IOExceptionSupport;
051    import org.apache.activemq.util.WrappedException;
052    import org.slf4j.Logger;
053    import org.slf4j.LoggerFactory;
054    
055    /**
056     * This broker filter handles the transaction related operations in the Broker
057     * interface.
058     * 
059     * 
060     */
061    public class TransactionBroker extends BrokerFilter {
062    
063        private static final Logger LOG = LoggerFactory.getLogger(TransactionBroker.class);
064    
065        // The prepared XA transactions.
066        private TransactionStore transactionStore;
067        private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>();
068        private ActiveMQMessageAudit audit;
069    
070        public TransactionBroker(Broker next, TransactionStore transactionStore) {
071            super(next);
072            this.transactionStore = transactionStore;
073        }
074    
075        // ////////////////////////////////////////////////////////////////////////////
076        //
077        // Life cycle Methods
078        //
079        // ////////////////////////////////////////////////////////////////////////////
080    
081        /**
082         * Recovers any prepared transactions.
083         */
084        public void start() throws Exception {
085            transactionStore.start();
086            try {
087                final ConnectionContext context = new ConnectionContext();
088                context.setBroker(this);
089                context.setInRecoveryMode(true);
090                context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
091                context.setProducerFlowControl(false);
092                final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
093                producerExchange.setMutable(true);
094                producerExchange.setConnectionContext(context);
095                producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
096                final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
097                consumerExchange.setConnectionContext(context);
098                transactionStore.recover(new TransactionRecoveryListener() {
099                    public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
100                        try {
101                            beginTransaction(context, xid);
102                            XATransaction transaction = (XATransaction) getTransaction(context, xid, false);
103                            for (int i = 0; i < addedMessages.length; i++) {
104                                forceDestinationWakeupOnCompletion(context, transaction, addedMessages[i].getDestination(), addedMessages[i]);
105                            }
106                            for (int i = 0; i < aks.length; i++) {
107                                forceDestinationWakeupOnCompletion(context, transaction, aks[i].getDestination(), aks[i]);
108                            }
109                            transaction.setState(Transaction.PREPARED_STATE);
110                            registerMBean(transaction);
111                            if (LOG.isDebugEnabled()) {
112                                LOG.debug("recovered prepared transaction: " + transaction.getTransactionId());
113                            }
114                        } catch (Throwable e) {
115                            throw new WrappedException(e);
116                        }
117                    }
118                });
119            } catch (WrappedException e) {
120                Throwable cause = e.getCause();
121                throw IOExceptionSupport.create("Recovery Failed: " + cause.getMessage(), cause);
122            }
123            next.start();
124        }
125    
126        private void registerMBean(XATransaction transaction) {
127            if (getBrokerService().getRegionBroker() instanceof ManagedRegionBroker ) {
128                ManagedRegionBroker managedRegionBroker = (ManagedRegionBroker) getBrokerService().getRegionBroker();
129                managedRegionBroker.registerRecoveredTransactionMBean(transaction);
130            }
131        }
132    
133        private void forceDestinationWakeupOnCompletion(ConnectionContext context, Transaction transaction,
134                                                        ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
135            Destination destination =  addDestination(context, amqDestination, false);
136            registerSync(destination, transaction, ack);
137        }
138    
139        private void registerSync(Destination destination, Transaction transaction, BaseCommand command) {
140            if (destination instanceof Queue) {
141                Synchronization sync = new PreparedDestinationCompletion((Queue) destination, command.isMessage());
142                // ensure one per destination in the list
143                transaction.removeSynchronization(sync);
144                transaction.addSynchronization(sync);
145            }
146        }
147    
148        static class PreparedDestinationCompletion extends Synchronization {
149            final Queue queue;
150            final boolean messageSend;
151            public PreparedDestinationCompletion(final Queue queue, boolean messageSend) {
152                this.queue = queue;
153                // rollback relevant to acks, commit to sends
154                this.messageSend = messageSend;
155            }
156    
157            @Override
158            public int hashCode() {
159                return System.identityHashCode(queue) +
160                        System.identityHashCode(Boolean.valueOf(messageSend));
161            }
162    
163            @Override
164            public boolean equals(Object other) {
165                return other instanceof PreparedDestinationCompletion &&
166                        queue.equals(((PreparedDestinationCompletion) other).queue) &&
167                        messageSend == ((PreparedDestinationCompletion) other).messageSend;
168            }
169    
170            @Override
171            public void afterRollback() throws Exception {
172                if (!messageSend) {
173                    queue.clearPendingMessages();
174                    if (LOG.isDebugEnabled()) {
175                        LOG.debug("cleared pending from afterRollback : " + queue);
176                    }
177                }
178            }
179    
180            @Override
181            public void afterCommit() throws Exception {
182                if (messageSend) {
183                    queue.clearPendingMessages();
184                    if (LOG.isDebugEnabled()) {
185                        LOG.debug("cleared pending from afterCommit : " + queue);
186                    }
187                }
188            }
189        }
190    
191        public void stop() throws Exception {
192            transactionStore.stop();
193            next.stop();
194        }
195    
196        // ////////////////////////////////////////////////////////////////////////////
197        //
198        // BrokerFilter overrides
199        //
200        // ////////////////////////////////////////////////////////////////////////////
201        public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
202            List<TransactionId> txs = new ArrayList<TransactionId>();
203            synchronized (xaTransactions) {
204                for (Iterator<XATransaction> iter = xaTransactions.values().iterator(); iter.hasNext();) {
205                    Transaction tx = iter.next();
206                    if (tx.isPrepared()) {
207                        if (LOG.isDebugEnabled()) {
208                            LOG.debug("prepared transaction: " + tx.getTransactionId());
209                        }
210                        txs.add(tx.getTransactionId());
211                    }
212                }
213            }
214            XATransactionId rc[] = new XATransactionId[txs.size()];
215            txs.toArray(rc);
216            if (LOG.isDebugEnabled()) {
217                LOG.debug("prepared transaction list size: " + rc.length);
218            }
219            return rc;
220        }
221    
222        public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
223            // the transaction may have already been started.
224            if (xid.isXATransaction()) {
225                XATransaction transaction = null;
226                synchronized (xaTransactions) {
227                    transaction = xaTransactions.get(xid);
228                    if (transaction != null) {
229                        return;
230                    }
231                    transaction = new XATransaction(transactionStore, (XATransactionId)xid, this, context.getConnectionId());
232                    xaTransactions.put(xid, transaction);
233                }
234            } else {
235                Map<TransactionId, Transaction> transactionMap = context.getTransactions();
236                Transaction transaction = transactionMap.get(xid);
237                if (transaction != null) {
238                    throw new JMSException("Transaction '" + xid + "' has already been started.");
239                }
240                transaction = new LocalTransaction(transactionStore, (LocalTransactionId)xid, context);
241                transactionMap.put(xid, transaction);
242            }
243        }
244    
245        public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
246            Transaction transaction = getTransaction(context, xid, false);
247            return transaction.prepare();
248        }
249    
250        public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
251            Transaction transaction = getTransaction(context, xid, true);
252            transaction.commit(onePhase);
253        }
254    
255        public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
256            Transaction transaction = getTransaction(context, xid, true);
257            transaction.rollback();
258        }
259    
260        public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
261            Transaction transaction = getTransaction(context, xid, true);
262            transaction.rollback();
263        }
264    
265        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
266            // This method may be invoked recursively.
267            // Track original tx so that it can be restored.
268            final ConnectionContext context = consumerExchange.getConnectionContext();
269            Transaction originalTx = context.getTransaction();
270            Transaction transaction = null;
271            if (ack.isInTransaction()) {
272                transaction = getTransaction(context, ack.getTransactionId(), false);
273            }
274            context.setTransaction(transaction);
275            try {
276                next.acknowledge(consumerExchange, ack);
277            } finally {
278                context.setTransaction(originalTx);
279            }
280        }
281    
282        public void send(ProducerBrokerExchange producerExchange, final Message message) throws Exception {
283            // This method may be invoked recursively.
284            // Track original tx so that it can be restored.
285            final ConnectionContext context = producerExchange.getConnectionContext();
286            Transaction originalTx = context.getTransaction();
287            Transaction transaction = null;
288            Synchronization sync = null;
289            if (message.getTransactionId() != null) {
290                transaction = getTransaction(context, message.getTransactionId(), false);
291                if (transaction != null) {
292                    sync = new Synchronization() {
293    
294                        public void afterRollback() {
295                            if (audit != null) {
296                                audit.rollback(message);
297                            }
298                        }
299                    };
300                    transaction.addSynchronization(sync);
301                }
302            }
303            if (audit == null || !audit.isDuplicate(message)) {
304                context.setTransaction(transaction);
305                try {
306                    next.send(producerExchange, message);
307                } finally {
308                    context.setTransaction(originalTx);
309                }
310            } else {
311                if (sync != null && transaction != null) {
312                    transaction.removeSynchronization(sync);
313                }
314                if (LOG.isDebugEnabled()) {
315                    LOG.debug("IGNORING duplicate message " + message);
316                }
317            }
318        }
319    
320        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
321            for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
322                try {
323                    Transaction transaction = iter.next();
324                    transaction.rollback();
325                } catch (Exception e) {
326                    LOG.warn("ERROR Rolling back disconnected client's transactions: ", e);
327                }
328                iter.remove();
329            }
330    
331            synchronized (xaTransactions) {
332                // first find all txs that belongs to the connection
333                ArrayList<XATransaction> txs = new ArrayList<XATransaction>();
334                for (XATransaction tx : xaTransactions.values()) {
335                    if (tx.getConnectionId() != null && tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
336                        txs.add(tx);
337                    }
338                }
339    
340                // then remove them
341                // two steps needed to avoid ConcurrentModificationException, from removeTransaction()
342                for (XATransaction tx : txs) {
343                    try {
344                        tx.rollback();
345                    } catch (Exception e) {
346                        LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e);
347                    }
348                }
349    
350            }
351            next.removeConnection(context, info, error);
352        }
353    
354        // ////////////////////////////////////////////////////////////////////////////
355        //
356        // Implementation help methods.
357        //
358        // ////////////////////////////////////////////////////////////////////////////
359        public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException {
360            Map transactionMap = null;
361            synchronized (xaTransactions) {
362                transactionMap = xid.isXATransaction() ? xaTransactions : context.getTransactions();
363            }
364            Transaction transaction = (Transaction)transactionMap.get(xid);
365            if (transaction != null) {
366                return transaction;
367            }
368            if (xid.isXATransaction()) {
369                XAException e = new XAException("Transaction '" + xid + "' has not been started.");
370                e.errorCode = XAException.XAER_NOTA;
371                throw e;
372            } else {
373                throw new JMSException("Transaction '" + xid + "' has not been started.");
374            }
375        }
376    
377        public void removeTransaction(XATransactionId xid) {
378            synchronized (xaTransactions) {
379                xaTransactions.remove(xid);
380            }
381        }
382    
383        public synchronized void brokerServiceStarted() {
384            super.brokerServiceStarted();
385            if (getBrokerService().isSupportFailOver() && audit == null) {
386                audit = new ActiveMQMessageAudit();
387            }
388        }
389    
390    }