001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.EOFException;
020import java.io.IOException;
021import java.net.SocketException;
022import java.net.URI;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Properties;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.CopyOnWriteArrayList;
032import java.util.concurrent.CountDownLatch;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicReference;
037import java.util.concurrent.locks.ReentrantReadWriteLock;
038
039import javax.transaction.xa.XAResource;
040
041import org.apache.activemq.advisory.AdvisorySupport;
042import org.apache.activemq.broker.region.ConnectionStatistics;
043import org.apache.activemq.broker.region.RegionBroker;
044import org.apache.activemq.command.ActiveMQDestination;
045import org.apache.activemq.command.BrokerInfo;
046import org.apache.activemq.command.BrokerSubscriptionInfo;
047import org.apache.activemq.command.Command;
048import org.apache.activemq.command.CommandTypes;
049import org.apache.activemq.command.ConnectionControl;
050import org.apache.activemq.command.ConnectionError;
051import org.apache.activemq.command.ConnectionId;
052import org.apache.activemq.command.ConnectionInfo;
053import org.apache.activemq.command.ConsumerControl;
054import org.apache.activemq.command.ConsumerId;
055import org.apache.activemq.command.ConsumerInfo;
056import org.apache.activemq.command.ControlCommand;
057import org.apache.activemq.command.DataArrayResponse;
058import org.apache.activemq.command.DestinationInfo;
059import org.apache.activemq.command.ExceptionResponse;
060import org.apache.activemq.command.FlushCommand;
061import org.apache.activemq.command.IntegerResponse;
062import org.apache.activemq.command.KeepAliveInfo;
063import org.apache.activemq.command.Message;
064import org.apache.activemq.command.MessageAck;
065import org.apache.activemq.command.MessageDispatch;
066import org.apache.activemq.command.MessageDispatchNotification;
067import org.apache.activemq.command.MessagePull;
068import org.apache.activemq.command.ProducerAck;
069import org.apache.activemq.command.ProducerId;
070import org.apache.activemq.command.ProducerInfo;
071import org.apache.activemq.command.RemoveInfo;
072import org.apache.activemq.command.RemoveSubscriptionInfo;
073import org.apache.activemq.command.Response;
074import org.apache.activemq.command.SessionId;
075import org.apache.activemq.command.SessionInfo;
076import org.apache.activemq.command.ShutdownInfo;
077import org.apache.activemq.command.TransactionId;
078import org.apache.activemq.command.TransactionInfo;
079import org.apache.activemq.command.WireFormatInfo;
080import org.apache.activemq.network.DemandForwardingBridge;
081import org.apache.activemq.network.MBeanNetworkListener;
082import org.apache.activemq.network.NetworkBridgeConfiguration;
083import org.apache.activemq.network.NetworkBridgeFactory;
084import org.apache.activemq.network.NetworkConnector;
085import org.apache.activemq.security.MessageAuthorizationPolicy;
086import org.apache.activemq.state.CommandVisitor;
087import org.apache.activemq.state.ConnectionState;
088import org.apache.activemq.state.ConsumerState;
089import org.apache.activemq.state.ProducerState;
090import org.apache.activemq.state.SessionState;
091import org.apache.activemq.state.TransactionState;
092import org.apache.activemq.thread.Task;
093import org.apache.activemq.thread.TaskRunner;
094import org.apache.activemq.thread.TaskRunnerFactory;
095import org.apache.activemq.transaction.Transaction;
096import org.apache.activemq.transport.DefaultTransportListener;
097import org.apache.activemq.transport.ResponseCorrelator;
098import org.apache.activemq.transport.TransmitCallback;
099import org.apache.activemq.transport.Transport;
100import org.apache.activemq.transport.TransportDisposedIOException;
101import org.apache.activemq.util.IntrospectionSupport;
102import org.apache.activemq.util.MarshallingSupport;
103import org.apache.activemq.util.NetworkBridgeUtils;
104import org.apache.activemq.util.SubscriptionKey;
105import org.slf4j.Logger;
106import org.slf4j.LoggerFactory;
107import org.slf4j.MDC;
108
109public class TransportConnection implements Connection, Task, CommandVisitor {
110    private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
111    private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
112    private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
113    // Keeps track of the broker and connector that created this connection.
114    protected final Broker broker;
115    protected final BrokerService brokerService;
116    protected final TransportConnector connector;
117    // Keeps track of the state of the connections.
118    // protected final ConcurrentHashMap localConnectionStates=new
119    // ConcurrentHashMap();
120    protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
121    // The broker and wireformat info that was exchanged.
122    protected BrokerInfo brokerInfo;
123    protected final List<Command> dispatchQueue = new LinkedList<>();
124    protected TaskRunner taskRunner;
125    protected final AtomicReference<Throwable> transportException = new AtomicReference<>();
126    protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
127    private final Transport transport;
128    private MessageAuthorizationPolicy messageAuthorizationPolicy;
129    private WireFormatInfo wireFormatInfo;
130    // Used to do async dispatch.. this should perhaps be pushed down into the
131    // transport layer..
132    private boolean inServiceException;
133    private final ConnectionStatistics statistics = new ConnectionStatistics();
134    private boolean manageable;
135    private boolean slow;
136    private boolean markedCandidate;
137    private boolean blockedCandidate;
138    private boolean blocked;
139    private boolean connected;
140    private boolean active;
141    private final AtomicBoolean starting = new AtomicBoolean();
142    private final AtomicBoolean pendingStop = new AtomicBoolean();
143    private long timeStamp;
144    private final AtomicBoolean stopping = new AtomicBoolean(false);
145    private final CountDownLatch stopped = new CountDownLatch(1);
146    private final AtomicBoolean asyncException = new AtomicBoolean(false);
147    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<>();
148    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<>();
149    private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
150    private ConnectionContext context;
151    private boolean networkConnection;
152    private boolean faultTolerantConnection;
153    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
154    private DemandForwardingBridge duplexBridge;
155    private final TaskRunnerFactory taskRunnerFactory;
156    private final TaskRunnerFactory stopTaskRunnerFactory;
157    private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
158    private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
159    private String duplexNetworkConnectorId;
160
161    /**
162     * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
163     *                          else commands are sent async.
164     * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection.
165     */
166    public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
167                               TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) {
168        this.connector = connector;
169        this.broker = broker;
170        this.brokerService = broker.getBrokerService();
171
172        RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
173        brokerConnectionStates = rb.getConnectionStates();
174        if (connector != null) {
175            this.statistics.setParent(connector.getStatistics());
176            this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
177        }
178        this.taskRunnerFactory = taskRunnerFactory;
179        this.stopTaskRunnerFactory = stopTaskRunnerFactory;
180        this.transport = transport;
181        if( this.transport instanceof BrokerServiceAware ) {
182            ((BrokerServiceAware)this.transport).setBrokerService(brokerService);
183        }
184        this.transport.setTransportListener(new DefaultTransportListener() {
185            @Override
186            public void onCommand(Object o) {
187                serviceLock.readLock().lock();
188                try {
189                    if (!(o instanceof Command)) {
190                        throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
191                    }
192                    Command command = (Command) o;
193                    if (!brokerService.isStopping()) {
194                        Response response = service(command);
195                        if (response != null && !brokerService.isStopping()) {
196                            dispatchSync(response);
197                        }
198                    } else {
199                        throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
200                    }
201                } finally {
202                    serviceLock.readLock().unlock();
203                }
204            }
205
206            @Override
207            public void onException(IOException exception) {
208                serviceLock.readLock().lock();
209                try {
210                    serviceTransportException(exception);
211                } finally {
212                    serviceLock.readLock().unlock();
213                }
214            }
215        });
216        connected = true;
217    }
218
219    /**
220     * Returns the number of messages to be dispatched to this connection
221     *
222     * @return size of dispatch queue
223     */
224    @Override
225    public int getDispatchQueueSize() {
226        synchronized (dispatchQueue) {
227            return dispatchQueue.size();
228        }
229    }
230
231    public void serviceTransportException(IOException e) {
232        if (!stopping.get() && !pendingStop.get()) {
233            transportException.set(e);
234            if (TRANSPORTLOG.isDebugEnabled()) {
235                TRANSPORTLOG.debug(this + " failed: " + e, e);
236            } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
237                TRANSPORTLOG.warn(this + " failed: " + e);
238            }
239            stopAsync(e);
240        }
241    }
242
243    private boolean expected(IOException e) {
244        return isStomp() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
245    }
246
247    private boolean isStomp() {
248        URI uri = connector.getUri();
249        return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
250    }
251
252    /**
253     * Calls the serviceException method in an async thread. Since handling a
254     * service exception closes a socket, we should not tie up broker threads
255     * since client sockets may hang or cause deadlocks.
256     */
257    @Override
258    public void serviceExceptionAsync(final IOException e) {
259        if (asyncException.compareAndSet(false, true)) {
260            new Thread("Async Exception Handler") {
261                @Override
262                public void run() {
263                    serviceException(e);
264                }
265            }.start();
266        }
267    }
268
269    /**
270     * Closes a clients connection due to a detected error. Errors are ignored
271     * if: the client is closing or broker is closing. Otherwise, the connection
272     * error transmitted to the client before stopping it's transport.
273     */
274    @Override
275    public void serviceException(Throwable e) {
276        // are we a transport exception such as not being able to dispatch
277        // synchronously to a transport
278        if (e instanceof IOException) {
279            serviceTransportException((IOException) e);
280        } else if (e.getClass() == BrokerStoppedException.class) {
281            // Handle the case where the broker is stopped
282            // But the client is still connected.
283            if (!stopping.get()) {
284                SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
285                ConnectionError ce = new ConnectionError();
286                ce.setException(e);
287                dispatchSync(ce);
288                // Record the error that caused the transport to stop
289                transportException.set(e);
290                // Wait a little bit to try to get the output buffer to flush
291                // the exception notification to the client.
292                try {
293                    Thread.sleep(500);
294                } catch (InterruptedException ie) {
295                    Thread.currentThread().interrupt();
296                }
297                // Worst case is we just kill the connection before the
298                // notification gets to him.
299                stopAsync();
300            }
301        } else if (!stopping.get() && !inServiceException) {
302            inServiceException = true;
303            try {
304                if (SERVICELOG.isDebugEnabled()) {
305                    SERVICELOG.debug("Async error occurred: " + e, e);
306                } else {
307                    SERVICELOG.warn("Async error occurred: " + e);
308                }
309                ConnectionError ce = new ConnectionError();
310                ce.setException(e);
311                if (pendingStop.get()) {
312                    dispatchSync(ce);
313                } else {
314                    dispatchAsync(ce);
315                }
316            } finally {
317                inServiceException = false;
318            }
319        }
320    }
321
322    @Override
323    public Response service(Command command) {
324        MDC.put("activemq.connector", connector.getUri().toString());
325        Response response = null;
326        boolean responseRequired = command.isResponseRequired();
327        int commandId = command.getCommandId();
328        try {
329            if (!pendingStop.get()) {
330                response = command.visit(this);
331            } else {
332                response = new ExceptionResponse(transportException.get());
333            }
334        } catch (Throwable e) {
335            if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
336                SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
337                        + " command: " + command + ", exception: " + e, e);
338            }
339
340            if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) {
341                LOG.info("Suppressing reply to: " + command + " on: " + e + ", cause: " + e.getCause());
342                responseRequired = false;
343            }
344
345            if (responseRequired) {
346                if (e instanceof SecurityException || e.getCause() instanceof SecurityException) {
347                    SERVICELOG.warn("Security Error occurred on connection to: {}, {}",
348                            transport.getRemoteAddress(), e.getMessage());
349                }
350                response = new ExceptionResponse(e);
351            } else {
352                forceRollbackOnlyOnFailedAsyncTransactionOp(e, command);
353                serviceException(e);
354            }
355        }
356        if (responseRequired) {
357            if (response == null) {
358                response = new Response();
359            }
360            response.setCorrelationId(commandId);
361        }
362        // The context may have been flagged so that the response is not
363        // sent.
364        if (context != null) {
365            if (context.isDontSendReponse()) {
366                context.setDontSendReponse(false);
367                response = null;
368            }
369            context = null;
370        }
371        MDC.remove("activemq.connector");
372        return response;
373    }
374
375    private void forceRollbackOnlyOnFailedAsyncTransactionOp(Throwable e, Command command) {
376        if (brokerService.isRollbackOnlyOnAsyncException() && !(e instanceof IOException) && isInTransaction(command)) {
377            Transaction transaction = getActiveTransaction(command);
378            if (transaction != null && !transaction.isRollbackOnly()) {
379                LOG.debug("on async exception, force rollback of transaction for: " + command, e);
380                transaction.setRollbackOnly(e);
381            }
382        }
383    }
384
385    private Transaction getActiveTransaction(Command command) {
386        Transaction transaction = null;
387        try {
388            if (command instanceof Message) {
389                Message messageSend = (Message) command;
390                ProducerId producerId = messageSend.getProducerId();
391                ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
392                transaction = producerExchange.getConnectionContext().getTransactions().get(messageSend.getTransactionId());
393            } else if (command instanceof  MessageAck) {
394                MessageAck messageAck = (MessageAck) command;
395                ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(messageAck.getConsumerId());
396                if (consumerExchange != null) {
397                    transaction = consumerExchange.getConnectionContext().getTransactions().get(messageAck.getTransactionId());
398                }
399            }
400        } catch(Exception ignored){
401            LOG.trace("failed to find active transaction for command: " + command, ignored);
402        }
403        return transaction;
404    }
405
406    private boolean isInTransaction(Command command) {
407        return command instanceof Message && ((Message)command).isInTransaction()
408                || command instanceof MessageAck && ((MessageAck)command).isInTransaction();
409    }
410
411    @Override
412    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
413        return null;
414    }
415
416    @Override
417    public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
418        broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
419        return null;
420    }
421
422    @Override
423    public Response processWireFormat(WireFormatInfo info) throws Exception {
424        wireFormatInfo = info;
425        protocolVersion.set(info.getVersion());
426        return null;
427    }
428
429    @Override
430    public Response processShutdown(ShutdownInfo info) throws Exception {
431        stopAsync();
432        return null;
433    }
434
435    @Override
436    public Response processFlush(FlushCommand command) throws Exception {
437        return null;
438    }
439
440    @Override
441    public Response processBeginTransaction(TransactionInfo info) throws Exception {
442        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
443        context = null;
444        if (cs != null) {
445            context = cs.getContext();
446        }
447        if (cs == null) {
448            throw new NullPointerException("Context is null");
449        }
450        // Avoid replaying dup commands
451        if (cs.getTransactionState(info.getTransactionId()) == null) {
452            cs.addTransactionState(info.getTransactionId());
453            broker.beginTransaction(context, info.getTransactionId());
454        }
455        return null;
456    }
457
458    @Override
459    public int getActiveTransactionCount() {
460        int rc = 0;
461        for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
462            rc += cs.getTransactionStates().size();
463        }
464        return rc;
465    }
466
467    @Override
468    public Long getOldestActiveTransactionDuration() {
469        TransactionState oldestTX = null;
470        for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
471            Collection<TransactionState> transactions = cs.getTransactionStates();
472            for (TransactionState transaction : transactions) {
473                if( oldestTX ==null || oldestTX.getCreatedAt() < transaction.getCreatedAt() ) {
474                    oldestTX = transaction;
475                }
476            }
477        }
478        if( oldestTX == null ) {
479            return null;
480        }
481        return System.currentTimeMillis() - oldestTX.getCreatedAt();
482    }
483
484    @Override
485    public Response processEndTransaction(TransactionInfo info) throws Exception {
486        // No need to do anything. This packet is just sent by the client
487        // make sure he is synced with the server as commit command could
488        // come from a different connection.
489        return null;
490    }
491
492    @Override
493    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
494        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
495        context = null;
496        if (cs != null) {
497            context = cs.getContext();
498        }
499        if (cs == null) {
500            throw new NullPointerException("Context is null");
501        }
502        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
503        if (transactionState == null) {
504            throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
505                    + info.getTransactionId());
506        }
507        // Avoid dups.
508        if (!transactionState.isPrepared()) {
509            transactionState.setPrepared(true);
510            int result = broker.prepareTransaction(context, info.getTransactionId());
511            transactionState.setPreparedResult(result);
512            if (result == XAResource.XA_RDONLY) {
513                // we are done, no further rollback or commit from TM
514                cs.removeTransactionState(info.getTransactionId());
515            }
516            IntegerResponse response = new IntegerResponse(result);
517            return response;
518        } else {
519            IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
520            return response;
521        }
522    }
523
524    @Override
525    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
526        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
527        context = cs.getContext();
528        cs.removeTransactionState(info.getTransactionId());
529        broker.commitTransaction(context, info.getTransactionId(), true);
530        return null;
531    }
532
533    @Override
534    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
535        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
536        context = cs.getContext();
537        cs.removeTransactionState(info.getTransactionId());
538        broker.commitTransaction(context, info.getTransactionId(), false);
539        return null;
540    }
541
542    @Override
543    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
544        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
545        context = cs.getContext();
546        cs.removeTransactionState(info.getTransactionId());
547        broker.rollbackTransaction(context, info.getTransactionId());
548        return null;
549    }
550
551    @Override
552    public Response processForgetTransaction(TransactionInfo info) throws Exception {
553        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
554        context = cs.getContext();
555        broker.forgetTransaction(context, info.getTransactionId());
556        return null;
557    }
558
559    @Override
560    public Response processRecoverTransactions(TransactionInfo info) throws Exception {
561        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
562        context = cs.getContext();
563        TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
564        return new DataArrayResponse(preparedTransactions);
565    }
566
567    @Override
568    public Response processMessage(Message messageSend) throws Exception {
569        ProducerId producerId = messageSend.getProducerId();
570        ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
571        if (producerExchange.canDispatch(messageSend)) {
572            broker.send(producerExchange, messageSend);
573        }
574        return null;
575    }
576
577    @Override
578    public Response processMessageAck(MessageAck ack) throws Exception {
579        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
580        if (consumerExchange != null) {
581            broker.acknowledge(consumerExchange, ack);
582        } else if (ack.isInTransaction()) {
583            LOG.warn("no matching consumer, ignoring ack {}", consumerExchange, ack);
584        }
585        return null;
586    }
587
588    @Override
589    public Response processMessagePull(MessagePull pull) throws Exception {
590        return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
591    }
592
593    @Override
594    public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
595        broker.processDispatchNotification(notification);
596        return null;
597    }
598
599    @Override
600    public Response processAddDestination(DestinationInfo info) throws Exception {
601        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
602        broker.addDestinationInfo(cs.getContext(), info);
603        if (info.getDestination().isTemporary()) {
604            cs.addTempDestination(info);
605        }
606        return null;
607    }
608
609    @Override
610    public Response processRemoveDestination(DestinationInfo info) throws Exception {
611        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
612        broker.removeDestinationInfo(cs.getContext(), info);
613        if (info.getDestination().isTemporary()) {
614            cs.removeTempDestination(info.getDestination());
615        }
616        return null;
617    }
618
619    @Override
620    public Response processAddProducer(ProducerInfo info) throws Exception {
621        SessionId sessionId = info.getProducerId().getParentId();
622        ConnectionId connectionId = sessionId.getParentId();
623        TransportConnectionState cs = lookupConnectionState(connectionId);
624        if (cs == null) {
625            throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: "
626                    + connectionId);
627        }
628        SessionState ss = cs.getSessionState(sessionId);
629        if (ss == null) {
630            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
631                    + sessionId);
632        }
633        // Avoid replaying dup commands
634        if (!ss.getProducerIds().contains(info.getProducerId())) {
635            ActiveMQDestination destination = info.getDestination();
636            // Do not check for null here as it would cause the count of max producers to exclude
637            // anonymous producers.  The isAdvisoryTopic method checks for null so it is safe to
638            // call it from here with a null Destination value.
639            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
640                if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){
641                    throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
642                }
643            }
644            broker.addProducer(cs.getContext(), info);
645            try {
646                ss.addProducer(info);
647            } catch (IllegalStateException e) {
648                broker.removeProducer(cs.getContext(), info);
649            }
650
651        }
652        return null;
653    }
654
655    @Override
656    public Response processRemoveProducer(ProducerId id) throws Exception {
657        SessionId sessionId = id.getParentId();
658        ConnectionId connectionId = sessionId.getParentId();
659        TransportConnectionState cs = lookupConnectionState(connectionId);
660        SessionState ss = cs.getSessionState(sessionId);
661        if (ss == null) {
662            throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
663                    + sessionId);
664        }
665        ProducerState ps = ss.removeProducer(id);
666        if (ps == null) {
667            throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
668        }
669        removeProducerBrokerExchange(id);
670        broker.removeProducer(cs.getContext(), ps.getInfo());
671        return null;
672    }
673
674    @Override
675    public Response processAddConsumer(ConsumerInfo info) throws Exception {
676        SessionId sessionId = info.getConsumerId().getParentId();
677        ConnectionId connectionId = sessionId.getParentId();
678        TransportConnectionState cs = lookupConnectionState(connectionId);
679        if (cs == null) {
680            throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: "
681                    + connectionId);
682        }
683        SessionState ss = cs.getSessionState(sessionId);
684        if (ss == null) {
685            throw new IllegalStateException(broker.getBrokerName()
686                    + " Cannot add a consumer to a session that had not been registered: " + sessionId);
687        }
688        // Avoid replaying dup commands
689        if (!ss.getConsumerIds().contains(info.getConsumerId())) {
690            ActiveMQDestination destination = info.getDestination();
691            if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
692                if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){
693                    throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection());
694                }
695            }
696
697            broker.addConsumer(cs.getContext(), info);
698            try {
699                ss.addConsumer(info);
700                addConsumerBrokerExchange(cs, info.getConsumerId());
701            } catch (IllegalStateException e) {
702                broker.removeConsumer(cs.getContext(), info);
703            }
704
705        }
706        return null;
707    }
708
709    @Override
710    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
711        SessionId sessionId = id.getParentId();
712        ConnectionId connectionId = sessionId.getParentId();
713        TransportConnectionState cs = lookupConnectionState(connectionId);
714        if (cs == null) {
715            throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
716                    + connectionId);
717        }
718        SessionState ss = cs.getSessionState(sessionId);
719        if (ss == null) {
720            throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
721                    + sessionId);
722        }
723        ConsumerState consumerState = ss.removeConsumer(id);
724        if (consumerState == null) {
725            throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
726        }
727        ConsumerInfo info = consumerState.getInfo();
728        info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
729        broker.removeConsumer(cs.getContext(), consumerState.getInfo());
730        removeConsumerBrokerExchange(id);
731        return null;
732    }
733
734    @Override
735    public Response processAddSession(SessionInfo info) throws Exception {
736        ConnectionId connectionId = info.getSessionId().getParentId();
737        TransportConnectionState cs = lookupConnectionState(connectionId);
738        // Avoid replaying dup commands
739        if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
740            broker.addSession(cs.getContext(), info);
741            try {
742                cs.addSession(info);
743            } catch (IllegalStateException e) {
744                LOG.warn("Failed to add session: {}", info.getSessionId(), e);
745                broker.removeSession(cs.getContext(), info);
746            }
747        }
748        return null;
749    }
750
751    @Override
752    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
753        ConnectionId connectionId = id.getParentId();
754        TransportConnectionState cs = lookupConnectionState(connectionId);
755        if (cs == null) {
756            throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
757        }
758        SessionState session = cs.getSessionState(id);
759        if (session == null) {
760            throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
761        }
762        // Don't let new consumers or producers get added while we are closing
763        // this down.
764        session.shutdown();
765        // Cascade the connection stop to the consumers and producers.
766        for (ConsumerId consumerId : session.getConsumerIds()) {
767            try {
768                processRemoveConsumer(consumerId, lastDeliveredSequenceId);
769            } catch (Throwable e) {
770                LOG.warn("Failed to remove consumer: {}", consumerId, e);
771            }
772        }
773        for (ProducerId producerId : session.getProducerIds()) {
774            try {
775                processRemoveProducer(producerId);
776            } catch (Throwable e) {
777                LOG.warn("Failed to remove producer: {}", producerId, e);
778            }
779        }
780        cs.removeSession(id);
781        broker.removeSession(cs.getContext(), session.getInfo());
782        return null;
783    }
784
785    @Override
786    public Response processAddConnection(ConnectionInfo info) throws Exception {
787        // Older clients should have been defaulting this field to true.. but
788        // they were not.
789        if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
790            info.setClientMaster(true);
791        }
792        TransportConnectionState state;
793        // Make sure 2 concurrent connections by the same ID only generate 1
794        // TransportConnectionState object.
795        synchronized (brokerConnectionStates) {
796            state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
797            if (state == null) {
798                state = new TransportConnectionState(info, this);
799                brokerConnectionStates.put(info.getConnectionId(), state);
800            }
801            state.incrementReference();
802        }
803        // If there are 2 concurrent connections for the same connection id,
804        // then last one in wins, we need to sync here
805        // to figure out the winner.
806        synchronized (state.getConnectionMutex()) {
807            if (state.getConnection() != this) {
808                LOG.debug("Killing previous stale connection: {}", state.getConnection().getRemoteAddress());
809                state.getConnection().stop();
810                LOG.debug("Connection {} taking over previous connection: {}", getRemoteAddress(), state.getConnection().getRemoteAddress());
811                state.setConnection(this);
812                state.reset(info);
813            }
814        }
815        registerConnectionState(info.getConnectionId(), state);
816        LOG.debug("Setting up new connection id: {}, address: {}, info: {}", new Object[]{ info.getConnectionId(), getRemoteAddress(), info });
817        this.faultTolerantConnection = info.isFaultTolerant();
818        // Setup the context.
819        String clientId = info.getClientId();
820        context = new ConnectionContext();
821        context.setBroker(broker);
822        context.setClientId(clientId);
823        context.setClientMaster(info.isClientMaster());
824        context.setConnection(this);
825        context.setConnectionId(info.getConnectionId());
826        context.setConnector(connector);
827        context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
828        context.setNetworkConnection(networkConnection);
829        context.setFaultTolerant(faultTolerantConnection);
830        context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
831        context.setUserName(info.getUserName());
832        context.setWireFormatInfo(wireFormatInfo);
833        context.setReconnect(info.isFailoverReconnect());
834        this.manageable = info.isManageable();
835        context.setConnectionState(state);
836        state.setContext(context);
837        state.setConnection(this);
838        if (info.getClientIp() == null) {
839            info.setClientIp(getRemoteAddress());
840        }
841
842        try {
843            broker.addConnection(context, info);
844        } catch (Exception e) {
845            synchronized (brokerConnectionStates) {
846                brokerConnectionStates.remove(info.getConnectionId());
847            }
848            unregisterConnectionState(info.getConnectionId());
849            LOG.warn("Failed to add Connection id={}, clientId={} due to {}", info.getConnectionId(), clientId, e);
850            //AMQ-6561 - stop for all exceptions on addConnection
851            // close this down - in case the peer of this transport doesn't play nice
852            delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
853            throw e;
854        }
855        if (info.isManageable()) {
856            // send ConnectionCommand
857            ConnectionControl command = this.connector.getConnectionControl();
858            command.setFaultTolerant(broker.isFaultTolerantConfiguration());
859            if (info.isFailoverReconnect()) {
860                command.setRebalanceConnection(false);
861            }
862            dispatchAsync(command);
863        }
864        return null;
865    }
866
867    @Override
868    public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
869            throws InterruptedException {
870        LOG.debug("remove connection id: {}", id);
871        TransportConnectionState cs = lookupConnectionState(id);
872        if (cs != null) {
873            // Don't allow things to be added to the connection state while we
874            // are shutting down.
875            cs.shutdown();
876            // Cascade the connection stop to the sessions.
877            for (SessionId sessionId : cs.getSessionIds()) {
878                try {
879                    processRemoveSession(sessionId, lastDeliveredSequenceId);
880                } catch (Throwable e) {
881                    SERVICELOG.warn("Failed to remove session {}", sessionId, e);
882                }
883            }
884            // Cascade the connection stop to temp destinations.
885            for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
886                DestinationInfo di = iter.next();
887                try {
888                    broker.removeDestination(cs.getContext(), di.getDestination(), 0);
889                } catch (Throwable e) {
890                    SERVICELOG.warn("Failed to remove tmp destination {}", di.getDestination(), e);
891                }
892                iter.remove();
893            }
894            try {
895                broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get());
896            } catch (Throwable e) {
897                SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e);
898            }
899            TransportConnectionState state = unregisterConnectionState(id);
900            if (state != null) {
901                synchronized (brokerConnectionStates) {
902                    // If we are the last reference, we should remove the state
903                    // from the broker.
904                    if (state.decrementReference() == 0) {
905                        brokerConnectionStates.remove(id);
906                    }
907                }
908            }
909        }
910        return null;
911    }
912
913    @Override
914    public Response processProducerAck(ProducerAck ack) throws Exception {
915        // A broker should not get ProducerAck messages.
916        return null;
917    }
918
919    @Override
920    public Connector getConnector() {
921        return connector;
922    }
923
924    @Override
925    public void dispatchSync(Command message) {
926        try {
927            processDispatch(message);
928        } catch (IOException e) {
929            serviceExceptionAsync(e);
930        }
931    }
932
933    @Override
934    public void dispatchAsync(Command message) {
935        if (!stopping.get()) {
936            if (taskRunner == null) {
937                dispatchSync(message);
938            } else {
939                synchronized (dispatchQueue) {
940                    dispatchQueue.add(message);
941                }
942                try {
943                    taskRunner.wakeup();
944                } catch (InterruptedException e) {
945                    Thread.currentThread().interrupt();
946                }
947            }
948        } else {
949            if (message.isMessageDispatch()) {
950                MessageDispatch md = (MessageDispatch) message;
951                TransmitCallback sub = md.getTransmitCallback();
952                broker.postProcessDispatch(md);
953                if (sub != null) {
954                    sub.onFailure();
955                }
956            }
957        }
958    }
959
960    protected void processDispatch(Command command) throws IOException {
961        MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
962        try {
963            if (!stopping.get()) {
964                if (messageDispatch != null) {
965                    try {
966                        broker.preProcessDispatch(messageDispatch);
967                    } catch (RuntimeException convertToIO) {
968                        throw new IOException(convertToIO);
969                    }
970                }
971                dispatch(command);
972            }
973        } catch (IOException e) {
974            if (messageDispatch != null) {
975                TransmitCallback sub = messageDispatch.getTransmitCallback();
976                broker.postProcessDispatch(messageDispatch);
977                if (sub != null) {
978                    sub.onFailure();
979                }
980                messageDispatch = null;
981                throw e;
982            } else {
983                if (TRANSPORTLOG.isDebugEnabled()) {
984                    TRANSPORTLOG.debug("Unexpected exception on asyncDispatch, command of type: " + command.getDataStructureType(), e);
985                }
986            }
987        } finally {
988            if (messageDispatch != null) {
989                TransmitCallback sub = messageDispatch.getTransmitCallback();
990                broker.postProcessDispatch(messageDispatch);
991                if (sub != null) {
992                    sub.onSuccess();
993                }
994            }
995        }
996    }
997
998    @Override
999    public boolean iterate() {
1000        try {
1001            if (pendingStop.get() || stopping.get()) {
1002                if (dispatchStopped.compareAndSet(false, true)) {
1003                    if (transportException.get() == null) {
1004                        try {
1005                            dispatch(new ShutdownInfo());
1006                        } catch (Throwable ignore) {
1007                        }
1008                    }
1009                    dispatchStoppedLatch.countDown();
1010                }
1011                return false;
1012            }
1013            if (!dispatchStopped.get()) {
1014                Command command = null;
1015                synchronized (dispatchQueue) {
1016                    if (dispatchQueue.isEmpty()) {
1017                        return false;
1018                    }
1019                    command = dispatchQueue.remove(0);
1020                }
1021                processDispatch(command);
1022                return true;
1023            }
1024            return false;
1025        } catch (IOException e) {
1026            if (dispatchStopped.compareAndSet(false, true)) {
1027                dispatchStoppedLatch.countDown();
1028            }
1029            serviceExceptionAsync(e);
1030            return false;
1031        }
1032    }
1033
1034    /**
1035     * Returns the statistics for this connection
1036     */
1037    @Override
1038    public ConnectionStatistics getStatistics() {
1039        return statistics;
1040    }
1041
1042    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1043        return messageAuthorizationPolicy;
1044    }
1045
1046    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1047        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1048    }
1049
1050    @Override
1051    public boolean isManageable() {
1052        return manageable;
1053    }
1054
1055    @Override
1056    public void start() throws Exception {
1057        try {
1058            synchronized (this) {
1059                starting.set(true);
1060                if (taskRunnerFactory != null) {
1061                    taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
1062                            + getRemoteAddress());
1063                } else {
1064                    taskRunner = null;
1065                }
1066                transport.start();
1067                active = true;
1068                BrokerInfo info = connector.getBrokerInfo().copy();
1069                if (connector.isUpdateClusterClients()) {
1070                    info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
1071                } else {
1072                    info.setPeerBrokerInfos(null);
1073                }
1074                dispatchAsync(info);
1075
1076                connector.onStarted(this);
1077            }
1078        } catch (Exception e) {
1079            // Force clean up on an error starting up.
1080            pendingStop.set(true);
1081            throw e;
1082        } finally {
1083            // stop() can be called from within the above block,
1084            // but we want to be sure start() completes before
1085            // stop() runs, so queue the stop until right now:
1086            setStarting(false);
1087            if (isPendingStop()) {
1088                LOG.debug("Calling the delayed stop() after start() {}", this);
1089                stop();
1090            }
1091        }
1092    }
1093
1094    @Override
1095    public void stop() throws Exception {
1096        // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory)
1097        // as their lifecycle is handled elsewhere
1098
1099        stopAsync();
1100        while (!stopped.await(5, TimeUnit.SECONDS)) {
1101            LOG.info("The connection to '{}' is taking a long time to shutdown.", transport.getRemoteAddress());
1102        }
1103    }
1104
1105    public void delayedStop(final int waitTime, final String reason, Throwable cause) {
1106        if (waitTime > 0) {
1107            synchronized (this) {
1108                pendingStop.set(true);
1109                transportException.set(cause);
1110            }
1111            try {
1112                stopTaskRunnerFactory.execute(new Runnable() {
1113                    @Override
1114                    public void run() {
1115                        try {
1116                            Thread.sleep(waitTime);
1117                            stopAsync();
1118                            LOG.info("Stopping {} because {}", transport.getRemoteAddress(), reason);
1119                        } catch (InterruptedException e) {
1120                        }
1121                    }
1122                });
1123            } catch (Throwable t) {
1124                LOG.warn("Cannot create stopAsync. This exception will be ignored.", t);
1125            }
1126        }
1127    }
1128
1129    public void stopAsync(Throwable cause) {
1130        transportException.set(cause);
1131        stopAsync();
1132    }
1133
1134    public void stopAsync() {
1135        // If we're in the middle of starting then go no further... for now.
1136        synchronized (this) {
1137            pendingStop.set(true);
1138            if (starting.get()) {
1139                LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
1140                return;
1141            }
1142        }
1143        if (stopping.compareAndSet(false, true)) {
1144            // Let all the connection contexts know we are shutting down
1145            // so that in progress operations can notice and unblock.
1146            List<TransportConnectionState> connectionStates = listConnectionStates();
1147            for (TransportConnectionState cs : connectionStates) {
1148                ConnectionContext connectionContext = cs.getContext();
1149                if (connectionContext != null) {
1150                    connectionContext.getStopping().set(true);
1151                }
1152            }
1153            try {
1154                stopTaskRunnerFactory.execute(new Runnable() {
1155                    @Override
1156                    public void run() {
1157                        serviceLock.writeLock().lock();
1158                        try {
1159                            doStop();
1160                        } catch (Throwable e) {
1161                            LOG.debug("Error occurred while shutting down a connection {}", this, e);
1162                        } finally {
1163                            stopped.countDown();
1164                            serviceLock.writeLock().unlock();
1165                        }
1166                    }
1167                });
1168            } catch (Throwable t) {
1169                LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t);
1170                stopped.countDown();
1171            }
1172        }
1173    }
1174
1175    @Override
1176    public String toString() {
1177        return "Transport Connection to: " + transport.getRemoteAddress();
1178    }
1179
1180    protected void doStop() throws Exception {
1181        LOG.debug("Stopping connection: {}", transport.getRemoteAddress());
1182        connector.onStopped(this);
1183        try {
1184            synchronized (this) {
1185                if (duplexBridge != null) {
1186                    duplexBridge.stop();
1187                }
1188            }
1189        } catch (Exception ignore) {
1190            LOG.trace("Exception caught stopping. This exception is ignored.", ignore);
1191        }
1192        try {
1193            transport.stop();
1194            LOG.debug("Stopped transport: {}", transport.getRemoteAddress());
1195        } catch (Exception e) {
1196            LOG.debug("Could not stop transport to {}. This exception is ignored.", transport.getRemoteAddress(), e);
1197        }
1198        if (taskRunner != null) {
1199            taskRunner.shutdown(1);
1200            taskRunner = null;
1201        }
1202        active = false;
1203        // Run the MessageDispatch callbacks so that message references get
1204        // cleaned up.
1205        synchronized (dispatchQueue) {
1206            for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
1207                Command command = iter.next();
1208                if (command.isMessageDispatch()) {
1209                    MessageDispatch md = (MessageDispatch) command;
1210                    TransmitCallback sub = md.getTransmitCallback();
1211                    broker.postProcessDispatch(md);
1212                    if (sub != null) {
1213                        sub.onFailure();
1214                    }
1215                }
1216            }
1217            dispatchQueue.clear();
1218        }
1219        //
1220        // Remove all logical connection associated with this connection
1221        // from the broker.
1222        if (!broker.isStopped()) {
1223            List<TransportConnectionState> connectionStates = listConnectionStates();
1224            connectionStates = listConnectionStates();
1225            for (TransportConnectionState cs : connectionStates) {
1226                cs.getContext().getStopping().set(true);
1227                try {
1228                    LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
1229                    processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN);
1230                } catch (Throwable ignore) {
1231                    LOG.debug("Exception caught removing connection {}. This exception is ignored.", cs.getInfo().getConnectionId(), ignore);
1232                }
1233            }
1234        }
1235        LOG.debug("Connection Stopped: {}", getRemoteAddress());
1236    }
1237
1238    /**
1239     * @return Returns the blockedCandidate.
1240     */
1241    public boolean isBlockedCandidate() {
1242        return blockedCandidate;
1243    }
1244
1245    /**
1246     * @param blockedCandidate The blockedCandidate to set.
1247     */
1248    public void setBlockedCandidate(boolean blockedCandidate) {
1249        this.blockedCandidate = blockedCandidate;
1250    }
1251
1252    /**
1253     * @return Returns the markedCandidate.
1254     */
1255    public boolean isMarkedCandidate() {
1256        return markedCandidate;
1257    }
1258
1259    /**
1260     * @param markedCandidate The markedCandidate to set.
1261     */
1262    public void setMarkedCandidate(boolean markedCandidate) {
1263        this.markedCandidate = markedCandidate;
1264        if (!markedCandidate) {
1265            timeStamp = 0;
1266            blockedCandidate = false;
1267        }
1268    }
1269
1270    /**
1271     * @param slow The slow to set.
1272     */
1273    public void setSlow(boolean slow) {
1274        this.slow = slow;
1275    }
1276
1277    /**
1278     * @return true if the Connection is slow
1279     */
1280    @Override
1281    public boolean isSlow() {
1282        return slow;
1283    }
1284
1285    /**
1286     * @return true if the Connection is potentially blocked
1287     */
1288    public boolean isMarkedBlockedCandidate() {
1289        return markedCandidate;
1290    }
1291
1292    /**
1293     * Mark the Connection, so we can deem if it's collectable on the next sweep
1294     */
1295    public void doMark() {
1296        if (timeStamp == 0) {
1297            timeStamp = System.currentTimeMillis();
1298        }
1299    }
1300
1301    /**
1302     * @return if after being marked, the Connection is still writing
1303     */
1304    @Override
1305    public boolean isBlocked() {
1306        return blocked;
1307    }
1308
1309    /**
1310     * @return true if the Connection is connected
1311     */
1312    @Override
1313    public boolean isConnected() {
1314        return connected;
1315    }
1316
1317    /**
1318     * @param blocked The blocked to set.
1319     */
1320    public void setBlocked(boolean blocked) {
1321        this.blocked = blocked;
1322    }
1323
1324    /**
1325     * @param connected The connected to set.
1326     */
1327    public void setConnected(boolean connected) {
1328        this.connected = connected;
1329    }
1330
1331    /**
1332     * @return true if the Connection is active
1333     */
1334    @Override
1335    public boolean isActive() {
1336        return active;
1337    }
1338
1339    /**
1340     * @param active The active to set.
1341     */
1342    public void setActive(boolean active) {
1343        this.active = active;
1344    }
1345
1346    /**
1347     * @return true if the Connection is starting
1348     */
1349    public boolean isStarting() {
1350        return starting.get();
1351    }
1352
1353    @Override
1354    public synchronized boolean isNetworkConnection() {
1355        return networkConnection;
1356    }
1357
1358    @Override
1359    public boolean isFaultTolerantConnection() {
1360        return this.faultTolerantConnection;
1361    }
1362
1363    protected void setStarting(boolean starting) {
1364        this.starting.set(starting);
1365    }
1366
1367    /**
1368     * @return true if the Connection needs to stop
1369     */
1370    public boolean isPendingStop() {
1371        return pendingStop.get();
1372    }
1373
1374    protected void setPendingStop(boolean pendingStop) {
1375        this.pendingStop.set(pendingStop);
1376    }
1377
1378    private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException {
1379        Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
1380        Map<String, String> props = createMap(properties);
1381        NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
1382        IntrospectionSupport.setProperties(config, props, "");
1383        return config;
1384    }
1385
1386    @Override
1387    public Response processBrokerInfo(BrokerInfo info) {
1388        if (info.isSlaveBroker()) {
1389            LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
1390        } else if (info.isNetworkConnection() && !info.isDuplexConnection()) {
1391            try {
1392                NetworkBridgeConfiguration config = getNetworkConfiguration(info);
1393                if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
1394                    LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
1395                    dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
1396                }
1397            } catch (Exception e) {
1398                LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e);
1399                return null;
1400            }
1401        } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
1402            // so this TransportConnection is the rear end of a network bridge
1403            // We have been requested to create a two way pipe ...
1404            try {
1405                NetworkBridgeConfiguration config = getNetworkConfiguration(info);
1406                config.setBrokerName(broker.getBrokerName());
1407
1408                if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
1409                    LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
1410                    dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
1411                }
1412
1413                // check for existing duplex connection hanging about
1414
1415                // We first look if existing network connection already exists for the same broker Id and network connector name
1416                // It's possible in case of brief network fault to have this transport connector side of the connection always active
1417                // and the duplex network connector side wanting to open a new one
1418                // In this case, the old connection must be broken
1419                String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
1420                CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
1421                synchronized (connections) {
1422                    for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
1423                        TransportConnection c = iter.next();
1424                        if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
1425                            LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId);
1426                            c.stopAsync();
1427                            // better to wait for a bit rather than get connection id already in use and failure to start new bridge
1428                            c.getStopped().await(1, TimeUnit.SECONDS);
1429                        }
1430                    }
1431                    setDuplexNetworkConnectorId(duplexNetworkConnectorId);
1432                }
1433                Transport localTransport = NetworkBridgeFactory.createLocalTransport(config, broker.getVmConnectorURI());
1434                Transport remoteBridgeTransport = transport;
1435                if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
1436                    // the vm transport case is already wrapped
1437                    remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport);
1438                }
1439                String duplexName = localTransport.toString();
1440                if (duplexName.contains("#")) {
1441                    duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
1442                }
1443                MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName));
1444                listener.setCreatedByDuplex(true);
1445                duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
1446                duplexBridge.setBrokerService(brokerService);
1447                //Need to set durableDestinations to properly restart subs when dynamicOnly=false
1448                duplexBridge.setDurableDestinations(NetworkConnector.getDurableTopicDestinations(
1449                        broker.getDurableDestinations()));
1450
1451                // now turn duplex off this side
1452                info.setDuplexConnection(false);
1453                duplexBridge.setCreatedByDuplex(true);
1454                duplexBridge.duplexStart(this, brokerInfo, info);
1455                LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId);
1456                return null;
1457            } catch (TransportDisposedIOException e) {
1458                LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId);
1459                return null;
1460            } catch (Exception e) {
1461                LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e);
1462                return null;
1463            }
1464        }
1465        // We only expect to get one broker info command per connection
1466        if (this.brokerInfo != null) {
1467            LOG.warn("Unexpected extra broker info command received: {}", info);
1468        }
1469        this.brokerInfo = info;
1470        networkConnection = true;
1471        List<TransportConnectionState> connectionStates = listConnectionStates();
1472        for (TransportConnectionState cs : connectionStates) {
1473            cs.getContext().setNetworkConnection(true);
1474        }
1475        return null;
1476    }
1477
1478    @SuppressWarnings({"unchecked", "rawtypes"})
1479    private HashMap<String, String> createMap(Properties properties) {
1480        return new HashMap(properties);
1481    }
1482
1483    protected void dispatch(Command command) throws IOException {
1484        try {
1485            setMarkedCandidate(true);
1486            transport.oneway(command);
1487        } finally {
1488            setMarkedCandidate(false);
1489        }
1490    }
1491
1492    @Override
1493    public String getRemoteAddress() {
1494        return transport.getRemoteAddress();
1495    }
1496
1497    public Transport getTransport() {
1498        return transport;
1499    }
1500
1501    @Override
1502    public String getConnectionId() {
1503        List<TransportConnectionState> connectionStates = listConnectionStates();
1504        for (TransportConnectionState cs : connectionStates) {
1505            if (cs.getInfo().getClientId() != null) {
1506                return cs.getInfo().getClientId();
1507            }
1508            return cs.getInfo().getConnectionId().toString();
1509        }
1510        return null;
1511    }
1512
1513    @Override
1514    public void updateClient(ConnectionControl control) {
1515        if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
1516                && this.wireFormatInfo.getVersion() >= 6) {
1517            dispatchAsync(control);
1518        }
1519    }
1520
1521    public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo){
1522        ProducerBrokerExchange result = null;
1523        if (producerInfo != null && producerInfo.getProducerId() != null){
1524            synchronized (producerExchanges){
1525                result = producerExchanges.get(producerInfo.getProducerId());
1526            }
1527        }
1528        return result;
1529    }
1530
1531    private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
1532        ProducerBrokerExchange result = producerExchanges.get(id);
1533        if (result == null) {
1534            synchronized (producerExchanges) {
1535                result = new ProducerBrokerExchange();
1536                TransportConnectionState state = lookupConnectionState(id);
1537                context = state.getContext();
1538                result.setConnectionContext(context);
1539                if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) {
1540                    result.setLastStoredSequenceId(brokerService.getPersistenceAdapter().getLastProducerSequenceId(id));
1541                }
1542                SessionState ss = state.getSessionState(id.getParentId());
1543                if (ss != null) {
1544                    result.setProducerState(ss.getProducerState(id));
1545                    ProducerState producerState = ss.getProducerState(id);
1546                    if (producerState != null && producerState.getInfo() != null) {
1547                        ProducerInfo info = producerState.getInfo();
1548                        result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1549                    }
1550                }
1551                producerExchanges.put(id, result);
1552            }
1553        } else {
1554            context = result.getConnectionContext();
1555        }
1556        return result;
1557    }
1558
1559    private void removeProducerBrokerExchange(ProducerId id) {
1560        synchronized (producerExchanges) {
1561            producerExchanges.remove(id);
1562        }
1563    }
1564
1565    private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1566        ConsumerBrokerExchange result = consumerExchanges.get(id);
1567        return result;
1568    }
1569
1570    private ConsumerBrokerExchange addConsumerBrokerExchange(TransportConnectionState connectionState, ConsumerId id) {
1571        ConsumerBrokerExchange result = consumerExchanges.get(id);
1572        if (result == null) {
1573            synchronized (consumerExchanges) {
1574                result = new ConsumerBrokerExchange();
1575                context = connectionState.getContext();
1576                result.setConnectionContext(context);
1577                SessionState ss = connectionState.getSessionState(id.getParentId());
1578                if (ss != null) {
1579                    ConsumerState cs = ss.getConsumerState(id);
1580                    if (cs != null) {
1581                        ConsumerInfo info = cs.getInfo();
1582                        if (info != null) {
1583                            if (info.getDestination() != null && info.getDestination().isPattern()) {
1584                                result.setWildcard(true);
1585                            }
1586                        }
1587                    }
1588                }
1589                consumerExchanges.put(id, result);
1590            }
1591        }
1592        return result;
1593    }
1594
1595    private void removeConsumerBrokerExchange(ConsumerId id) {
1596        synchronized (consumerExchanges) {
1597            consumerExchanges.remove(id);
1598        }
1599    }
1600
1601    public int getProtocolVersion() {
1602        return protocolVersion.get();
1603    }
1604
1605    @Override
1606    public Response processControlCommand(ControlCommand command) throws Exception {
1607        return null;
1608    }
1609
1610    @Override
1611    public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
1612        return null;
1613    }
1614
1615    @Override
1616    public Response processConnectionControl(ConnectionControl control) throws Exception {
1617        if (control != null) {
1618            faultTolerantConnection = control.isFaultTolerant();
1619        }
1620        return null;
1621    }
1622
1623    @Override
1624    public Response processConnectionError(ConnectionError error) throws Exception {
1625        return null;
1626    }
1627
1628    @Override
1629    public Response processConsumerControl(ConsumerControl control) throws Exception {
1630        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
1631        broker.processConsumerControl(consumerExchange, control);
1632        return null;
1633    }
1634
1635    protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1636                                                                            TransportConnectionState state) {
1637        TransportConnectionState cs = null;
1638        if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1639            // swap implementations
1640            TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1641            newRegister.intialize(connectionStateRegister);
1642            connectionStateRegister = newRegister;
1643        }
1644        cs = connectionStateRegister.registerConnectionState(connectionId, state);
1645        return cs;
1646    }
1647
1648    protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1649        return connectionStateRegister.unregisterConnectionState(connectionId);
1650    }
1651
1652    protected synchronized List<TransportConnectionState> listConnectionStates() {
1653        return connectionStateRegister.listConnectionStates();
1654    }
1655
1656    protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1657        return connectionStateRegister.lookupConnectionState(connectionId);
1658    }
1659
1660    protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1661        return connectionStateRegister.lookupConnectionState(id);
1662    }
1663
1664    protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1665        return connectionStateRegister.lookupConnectionState(id);
1666    }
1667
1668    protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1669        return connectionStateRegister.lookupConnectionState(id);
1670    }
1671
1672    // public only for testing
1673    public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1674        return connectionStateRegister.lookupConnectionState(connectionId);
1675    }
1676
1677    protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
1678        this.duplexNetworkConnectorId = duplexNetworkConnectorId;
1679    }
1680
1681    protected synchronized String getDuplexNetworkConnectorId() {
1682        return this.duplexNetworkConnectorId;
1683    }
1684
1685    public boolean isStopping() {
1686        return stopping.get();
1687    }
1688
1689    protected CountDownLatch getStopped() {
1690        return stopped;
1691    }
1692
1693    private int getProducerCount(ConnectionId connectionId) {
1694        int result = 0;
1695        TransportConnectionState cs = lookupConnectionState(connectionId);
1696        if (cs != null) {
1697            for (SessionId sessionId : cs.getSessionIds()) {
1698                SessionState sessionState = cs.getSessionState(sessionId);
1699                if (sessionState != null) {
1700                    result += sessionState.getProducerIds().size();
1701                }
1702            }
1703        }
1704        return result;
1705    }
1706
1707    private int getConsumerCount(ConnectionId connectionId) {
1708        int result = 0;
1709        TransportConnectionState cs = lookupConnectionState(connectionId);
1710        if (cs != null) {
1711            for (SessionId sessionId : cs.getSessionIds()) {
1712                SessionState sessionState = cs.getSessionState(sessionId);
1713                if (sessionState != null) {
1714                    result += sessionState.getConsumerIds().size();
1715                }
1716            }
1717        }
1718        return result;
1719    }
1720
1721    public WireFormatInfo getRemoteWireFormatInfo() {
1722        return wireFormatInfo;
1723    }
1724
1725    /* (non-Javadoc)
1726     * @see org.apache.activemq.state.CommandVisitor#processBrokerSubscriptionInfo(org.apache.activemq.command.BrokerSubscriptionInfo)
1727     */
1728    @Override
1729    public Response processBrokerSubscriptionInfo(BrokerSubscriptionInfo info) throws Exception {
1730        return null;
1731    }
1732}