001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020import java.security.GeneralSecurityException;
021import java.security.cert.X509Certificate;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Properties;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ConcurrentMap;
031import java.util.concurrent.CountDownLatch;
032import java.util.concurrent.ExecutionException;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.Executors;
035import java.util.concurrent.Future;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.TimeoutException;
038import java.util.concurrent.atomic.AtomicBoolean;
039
040import javax.management.ObjectName;
041
042import org.apache.activemq.DestinationDoesNotExistException;
043import org.apache.activemq.Service;
044import org.apache.activemq.advisory.AdvisoryBroker;
045import org.apache.activemq.advisory.AdvisorySupport;
046import org.apache.activemq.broker.BrokerService;
047import org.apache.activemq.broker.BrokerServiceAware;
048import org.apache.activemq.broker.ConnectionContext;
049import org.apache.activemq.broker.TransportConnection;
050import org.apache.activemq.broker.region.AbstractRegion;
051import org.apache.activemq.broker.region.DurableTopicSubscription;
052import org.apache.activemq.broker.region.Region;
053import org.apache.activemq.broker.region.RegionBroker;
054import org.apache.activemq.broker.region.Subscription;
055import org.apache.activemq.broker.region.policy.PolicyEntry;
056import org.apache.activemq.command.ActiveMQDestination;
057import org.apache.activemq.command.ActiveMQMessage;
058import org.apache.activemq.command.ActiveMQTempDestination;
059import org.apache.activemq.command.ActiveMQTopic;
060import org.apache.activemq.command.BrokerId;
061import org.apache.activemq.command.BrokerInfo;
062import org.apache.activemq.command.BrokerSubscriptionInfo;
063import org.apache.activemq.command.Command;
064import org.apache.activemq.command.CommandTypes;
065import org.apache.activemq.command.ConnectionError;
066import org.apache.activemq.command.ConnectionId;
067import org.apache.activemq.command.ConnectionInfo;
068import org.apache.activemq.command.ConsumerId;
069import org.apache.activemq.command.ConsumerInfo;
070import org.apache.activemq.command.DataStructure;
071import org.apache.activemq.command.DestinationInfo;
072import org.apache.activemq.command.ExceptionResponse;
073import org.apache.activemq.command.KeepAliveInfo;
074import org.apache.activemq.command.Message;
075import org.apache.activemq.command.MessageAck;
076import org.apache.activemq.command.MessageDispatch;
077import org.apache.activemq.command.MessageId;
078import org.apache.activemq.command.NetworkBridgeFilter;
079import org.apache.activemq.command.ProducerInfo;
080import org.apache.activemq.command.RemoveInfo;
081import org.apache.activemq.command.RemoveSubscriptionInfo;
082import org.apache.activemq.command.Response;
083import org.apache.activemq.command.SessionInfo;
084import org.apache.activemq.command.ShutdownInfo;
085import org.apache.activemq.command.SubscriptionInfo;
086import org.apache.activemq.command.WireFormatInfo;
087import org.apache.activemq.filter.DestinationFilter;
088import org.apache.activemq.filter.MessageEvaluationContext;
089import org.apache.activemq.security.SecurityContext;
090import org.apache.activemq.transport.DefaultTransportListener;
091import org.apache.activemq.transport.FutureResponse;
092import org.apache.activemq.transport.ResponseCallback;
093import org.apache.activemq.transport.Transport;
094import org.apache.activemq.transport.TransportDisposedIOException;
095import org.apache.activemq.transport.TransportFilter;
096import org.apache.activemq.transport.failover.FailoverTransport;
097import org.apache.activemq.transport.tcp.SslTransport;
098import org.apache.activemq.transport.tcp.TcpTransport;
099import org.apache.activemq.util.IdGenerator;
100import org.apache.activemq.util.IntrospectionSupport;
101import org.apache.activemq.util.LongSequenceGenerator;
102import org.apache.activemq.util.MarshallingSupport;
103import org.apache.activemq.util.NetworkBridgeUtils;
104import org.apache.activemq.util.ServiceStopper;
105import org.apache.activemq.util.ServiceSupport;
106import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
107import org.slf4j.Logger;
108import org.slf4j.LoggerFactory;
109
110/**
111 * A useful base class for implementing demand forwarding bridges.
112 */
113public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
114    private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
115    protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
116    protected final Transport localBroker;
117    protected final Transport remoteBroker;
118    protected IdGenerator idGenerator = new IdGenerator();
119    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
120    protected ConnectionInfo localConnectionInfo;
121    protected ConnectionInfo remoteConnectionInfo;
122    protected SessionInfo localSessionInfo;
123    protected ProducerInfo producerInfo;
124    protected String remoteBrokerName = "Unknown";
125    protected String localClientId;
126    protected ConsumerInfo demandConsumerInfo;
127    protected int demandConsumerDispatched;
128    protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
129    protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
130    protected final AtomicBoolean bridgeFailed = new AtomicBoolean();
131    protected final AtomicBoolean disposed = new AtomicBoolean();
132    protected BrokerId localBrokerId;
133    protected ActiveMQDestination[] excludedDestinations;
134    protected ActiveMQDestination[] dynamicallyIncludedDestinations;
135    protected ActiveMQDestination[] staticallyIncludedDestinations;
136    protected ActiveMQDestination[] durableDestinations;
137    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<>();
138    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<>();
139    protected final Set<ConsumerId> forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<ConsumerId, Boolean>());
140    protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
141    protected final CountDownLatch startedLatch = new CountDownLatch(2);
142    protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
143    protected final CountDownLatch staticDestinationsLatch = new CountDownLatch(1);
144    protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
145    protected NetworkBridgeConfiguration configuration;
146    protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
147
148    protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
149    protected BrokerId remoteBrokerId;
150
151    protected final NetworkBridgeStatistics networkBridgeStatistics = new NetworkBridgeStatistics();
152
153    private NetworkBridgeListener networkBridgeListener;
154    private boolean createdByDuplex;
155    private BrokerInfo localBrokerInfo;
156    private BrokerInfo remoteBrokerInfo;
157
158    private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
159    private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);
160
161    private final AtomicBoolean started = new AtomicBoolean();
162    private TransportConnection duplexInitiatingConnection;
163    private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean();
164    protected BrokerService brokerService = null;
165    private ObjectName mbeanObjectName;
166    private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
167    //Use a new executor for processing BrokerSubscriptionInfo so we don't block other threads
168    private final ExecutorService syncExecutor = Executors.newSingleThreadExecutor();
169    private Transport duplexInboundLocalBroker = null;
170    private ProducerInfo duplexInboundLocalProducerInfo;
171
172    public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
173        this.configuration = configuration;
174        this.localBroker = localBroker;
175        this.remoteBroker = remoteBroker;
176    }
177
178    public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
179        this.localBrokerInfo = localBrokerInfo;
180        this.remoteBrokerInfo = remoteBrokerInfo;
181        this.duplexInitiatingConnection = connection;
182        start();
183        serviceRemoteCommand(remoteBrokerInfo);
184    }
185
186    @Override
187    public void start() throws Exception {
188        if (started.compareAndSet(false, true)) {
189
190            if (brokerService == null) {
191                throw new IllegalArgumentException("BrokerService is null on " + this);
192            }
193
194            networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
195
196            if (isDuplex()) {
197                duplexInboundLocalBroker = NetworkBridgeFactory.createLocalAsyncTransport(brokerService.getBroker().getVmConnectorURI());
198                duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
199
200                    @Override
201                    public void onCommand(Object o) {
202                        Command command = (Command) o;
203                        serviceLocalCommand(command);
204                    }
205
206                    @Override
207                    public void onException(IOException error) {
208                        serviceLocalException(error);
209                    }
210                });
211                duplexInboundLocalBroker.start();
212            }
213
214            localBroker.setTransportListener(new DefaultTransportListener() {
215
216                @Override
217                public void onCommand(Object o) {
218                    Command command = (Command) o;
219                    serviceLocalCommand(command);
220                }
221
222                @Override
223                public void onException(IOException error) {
224                    if (!futureLocalBrokerInfo.isDone()) {
225                        LOG.info("error with pending local brokerInfo on: " + localBroker, error);
226                        futureLocalBrokerInfo.cancel(true);
227                        return;
228                    }
229                    serviceLocalException(error);
230                }
231            });
232
233            remoteBroker.setTransportListener(new DefaultTransportListener() {
234
235                @Override
236                public void onCommand(Object o) {
237                    Command command = (Command) o;
238                    serviceRemoteCommand(command);
239                }
240
241                @Override
242                public void onException(IOException error) {
243                    if (!futureRemoteBrokerInfo.isDone()) {
244                        LOG.info("error with pending remote brokerInfo on: " + remoteBroker, error);
245                        futureRemoteBrokerInfo.cancel(true);
246                        return;
247                    }
248                    serviceRemoteException(error);
249                }
250            });
251
252            remoteBroker.start();
253            localBroker.start();
254
255            if (!disposed.get()) {
256                try {
257                    triggerStartAsyncNetworkBridgeCreation();
258                } catch (IOException e) {
259                    LOG.warn("Caught exception from remote start", e);
260                }
261            } else {
262                LOG.warn("Bridge was disposed before the start() method was fully executed.");
263                throw new TransportDisposedIOException();
264            }
265        }
266    }
267
268    @Override
269    public void stop() throws Exception {
270        if (started.compareAndSet(true, false)) {
271            if (disposed.compareAndSet(false, true)) {
272                LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName);
273
274                futureRemoteBrokerInfo.cancel(true);
275                futureLocalBrokerInfo.cancel(true);
276
277                NetworkBridgeListener l = this.networkBridgeListener;
278                if (l != null) {
279                    l.onStop(this);
280                }
281                try {
282                    // local start complete
283                    if (startedLatch.getCount() < 2) {
284                        LOG.trace("{} unregister bridge ({}) to {}", new Object[]{
285                                configuration.getBrokerName(), this, remoteBrokerName
286                        });
287                        brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
288                        brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
289                    }
290
291                    remoteBridgeStarted.set(false);
292                    final CountDownLatch sendShutdown = new CountDownLatch(1);
293
294                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
295                        @Override
296                        public void run() {
297                            try {
298                                serialExecutor.shutdown();
299                                if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
300                                    List<Runnable> pendingTasks = serialExecutor.shutdownNow();
301                                    LOG.info("pending tasks on stop {}", pendingTasks);
302                                }
303                                //Shutdown the syncExecutor, call countDown to make sure a thread can
304                                //terminate if it is waiting
305                                staticDestinationsLatch.countDown();
306                                syncExecutor.shutdown();
307                                if (!syncExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
308                                    List<Runnable> pendingTasks = syncExecutor.shutdownNow();
309                                    LOG.info("pending tasks on stop {}", pendingTasks);
310                                }
311                                localBroker.oneway(new ShutdownInfo());
312                                remoteBroker.oneway(new ShutdownInfo());
313                            } catch (Throwable e) {
314                                LOG.debug("Caught exception sending shutdown", e);
315                            } finally {
316                                sendShutdown.countDown();
317                            }
318
319                        }
320                    }, "ActiveMQ ForwardingBridge StopTask");
321
322                    if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
323                        LOG.info("Network Could not shutdown in a timely manner");
324                    }
325                } finally {
326                    ServiceStopper ss = new ServiceStopper();
327                    stopFailoverTransport(remoteBroker);
328                    ss.stop(remoteBroker);
329                    ss.stop(localBroker);
330                    ss.stop(duplexInboundLocalBroker);
331                    // Release the started Latch since another thread could be
332                    // stuck waiting for it to start up.
333                    startedLatch.countDown();
334                    startedLatch.countDown();
335                    localStartedLatch.countDown();
336                    staticDestinationsLatch.countDown();
337
338                    ss.throwFirstException();
339                }
340            }
341
342            LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName);
343        }
344    }
345
346    private void stopFailoverTransport(Transport transport) {
347        FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
348        if (failoverTransport != null) {
349            // may be blocked on write, in which case stop will block
350            try {
351                failoverTransport.handleTransportFailure(new IOException("Bridge stopped"));
352            } catch (InterruptedException ignored) {}
353        }
354    }
355
356    protected void triggerStartAsyncNetworkBridgeCreation() throws IOException {
357        brokerService.getTaskRunnerFactory().execute(new Runnable() {
358            @Override
359            public void run() {
360                final String originalName = Thread.currentThread().getName();
361                Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
362                        "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
363
364                try {
365                    // First we collect the info data from both the local and remote ends
366                    collectBrokerInfos();
367
368                    // Once we have all required broker info we can attempt to start
369                    // the local and then remote sides of the bridge.
370                    doStartLocalAndRemoteBridges();
371                } finally {
372                    Thread.currentThread().setName(originalName);
373                }
374            }
375        });
376    }
377
378    private void collectBrokerInfos() {
379        int timeout = 30000;
380        TcpTransport tcpTransport = remoteBroker.narrow(TcpTransport.class);
381        if (tcpTransport != null) {
382           timeout = tcpTransport.getConnectionTimeout();
383        }
384
385        // First wait for the remote to feed us its BrokerInfo, then we can check on
386        // the LocalBrokerInfo and decide is this is a loop.
387        try {
388            remoteBrokerInfo = futureRemoteBrokerInfo.get(timeout, TimeUnit.MILLISECONDS);
389            if (remoteBrokerInfo == null) {
390                serviceLocalException(new Throwable("remoteBrokerInfo is null"));
391                return;
392            }
393        } catch (Exception e) {
394            serviceRemoteException(e);
395            return;
396        }
397
398        try {
399            localBrokerInfo = futureLocalBrokerInfo.get(timeout, TimeUnit.MILLISECONDS);
400            if (localBrokerInfo == null) {
401                serviceLocalException(new Throwable("localBrokerInfo is null"));
402                return;
403            }
404
405            // Before we try and build the bridge lets check if we are in a loop
406            // and if so just stop now before registering anything.
407            remoteBrokerId = remoteBrokerInfo.getBrokerId();
408            if (localBrokerId.equals(remoteBrokerId)) {
409                LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{
410                        configuration.getBrokerName(), remoteBrokerName, remoteBrokerId
411                });
412                ServiceSupport.dispose(localBroker);
413                ServiceSupport.dispose(remoteBroker);
414                // the bridge is left in a bit of limbo, but it won't get retried
415                // in this state.
416                return;
417            }
418
419            // Fill in the remote broker's information now.
420            remoteBrokerPath[0] = remoteBrokerId;
421            remoteBrokerName = remoteBrokerInfo.getBrokerName();
422            if (configuration.isUseBrokerNamesAsIdSeed()) {
423                idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName);
424            }
425        } catch (Throwable e) {
426            serviceLocalException(e);
427        }
428    }
429
430    private void doStartLocalAndRemoteBridges() {
431
432        if (disposed.get()) {
433            return;
434        }
435
436        if (isCreatedByDuplex()) {
437            // apply remote (propagated) configuration to local duplex bridge before start
438            Properties props = null;
439            try {
440                props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
441                IntrospectionSupport.getProperties(configuration, props, null);
442                if (configuration.getExcludedDestinations() != null) {
443                    excludedDestinations = configuration.getExcludedDestinations().toArray(
444                            new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
445                }
446                if (configuration.getStaticallyIncludedDestinations() != null) {
447                    staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
448                            new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
449                }
450                if (configuration.getDynamicallyIncludedDestinations() != null) {
451                    dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
452                            new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
453                }
454            } catch (Throwable t) {
455                LOG.error("Error mapping remote configuration: {}", props, t);
456            }
457        }
458
459        try {
460            startLocalBridge();
461        } catch (Throwable e) {
462            serviceLocalException(e);
463            return;
464        }
465
466        try {
467            startRemoteBridge();
468        } catch (Throwable e) {
469            serviceRemoteException(e);
470            return;
471        }
472
473        try {
474            if (safeWaitUntilStarted()) {
475                setupStaticDestinations();
476                staticDestinationsLatch.countDown();
477            }
478        } catch (Throwable e) {
479            serviceLocalException(e);
480        }
481    }
482
483    private void startLocalBridge() throws Throwable {
484        if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) {
485            synchronized (this) {
486                LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker);
487                if (!disposed.get()) {
488
489                    if (idGenerator == null) {
490                        throw new IllegalStateException("Id Generator cannot be null");
491                    }
492
493                    localConnectionInfo = new ConnectionInfo();
494                    localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
495                    localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
496                    localConnectionInfo.setClientId(localClientId);
497                    localConnectionInfo.setUserName(configuration.getUserName());
498                    localConnectionInfo.setPassword(configuration.getPassword());
499                    Transport originalTransport = remoteBroker;
500                    while (originalTransport instanceof TransportFilter) {
501                        originalTransport = ((TransportFilter) originalTransport).getNext();
502                    }
503                    if (originalTransport instanceof TcpTransport) {
504                        X509Certificate[] peerCerts = originalTransport.getPeerCertificates();
505                        localConnectionInfo.setTransportContext(peerCerts);
506                    }
507                    // sync requests that may fail
508                    Object resp = localBroker.request(localConnectionInfo);
509                    if (resp instanceof ExceptionResponse) {
510                        throw ((ExceptionResponse) resp).getException();
511                    }
512                    localSessionInfo = new SessionInfo(localConnectionInfo, 1);
513                    localBroker.oneway(localSessionInfo);
514
515                    if (configuration.isDuplex()) {
516                        // separate in-bound channel for forwards so we don't
517                        // contend with out-bound dispatch on same connection
518                        remoteBrokerInfo.setNetworkConnection(true);
519                        duplexInboundLocalBroker.oneway(remoteBrokerInfo);
520
521                        ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
522                        duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
523                        duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
524                                + configuration.getBrokerName());
525                        duplexLocalConnectionInfo.setUserName(configuration.getUserName());
526                        duplexLocalConnectionInfo.setPassword(configuration.getPassword());
527
528                        if (originalTransport instanceof TcpTransport) {
529                            X509Certificate[] peerCerts = originalTransport.getPeerCertificates();
530                            duplexLocalConnectionInfo.setTransportContext(peerCerts);
531                        }
532                        // sync requests that may fail
533                        resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
534                        if (resp instanceof ExceptionResponse) {
535                            throw ((ExceptionResponse) resp).getException();
536                        }
537                        SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
538                        duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
539                        duplexInboundLocalBroker.oneway(duplexInboundSession);
540                        duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo);
541                    }
542                    brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
543                    NetworkBridgeListener l = this.networkBridgeListener;
544                    if (l != null) {
545                        l.onStart(this);
546                    }
547
548                    // Let the local broker know the remote broker's ID.
549                    localBroker.oneway(remoteBrokerInfo);
550                    // new peer broker (a consumer can work with remote broker also)
551                    brokerService.getBroker().addBroker(null, remoteBrokerInfo);
552
553                    LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{
554                            localBroker, remoteBroker, remoteBrokerName
555                    });
556                    LOG.trace("{} register bridge ({}) to {}", new Object[]{
557                            configuration.getBrokerName(), this, remoteBrokerName
558                    });
559                } else {
560                    LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
561                }
562                startedLatch.countDown();
563                localStartedLatch.countDown();
564            }
565        }
566    }
567
568    protected void startRemoteBridge() throws Exception {
569        if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) {
570            LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker);
571            synchronized (this) {
572                if (!isCreatedByDuplex()) {
573                    BrokerInfo brokerInfo = new BrokerInfo();
574                    brokerInfo.setBrokerName(configuration.getBrokerName());
575                    brokerInfo.setBrokerURL(configuration.getBrokerURL());
576                    brokerInfo.setNetworkConnection(true);
577                    brokerInfo.setDuplexConnection(configuration.isDuplex());
578                    // set our properties
579                    Properties props = new Properties();
580                    IntrospectionSupport.getProperties(configuration, props, null);
581
582                    String dynamicallyIncludedDestinationsKey = "dynamicallyIncludedDestinations";
583                    String staticallyIncludedDestinationsKey = "staticallyIncludedDestinations";
584
585                    if (!configuration.getDynamicallyIncludedDestinations().isEmpty()) {
586                        props.put(dynamicallyIncludedDestinationsKey,
587                                StringToListOfActiveMQDestinationConverter.
588                                convertFromActiveMQDestination(configuration.getDynamicallyIncludedDestinations(), true));
589                    }
590                    if (!configuration.getStaticallyIncludedDestinations().isEmpty()) {
591                        props.put(staticallyIncludedDestinationsKey,
592                                StringToListOfActiveMQDestinationConverter.
593                                convertFromActiveMQDestination(configuration.getStaticallyIncludedDestinations(), true));
594                    }
595
596                    props.remove("networkTTL");
597                    String str = MarshallingSupport.propertiesToString(props);
598                    brokerInfo.setNetworkProperties(str);
599                    brokerInfo.setBrokerId(this.localBrokerId);
600                    remoteBroker.oneway(brokerInfo);
601                    if (configuration.isSyncDurableSubs() &&
602                            remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
603                        remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService,
604                                configuration));
605                    }
606                }
607                if (remoteConnectionInfo != null) {
608                    remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
609                }
610                remoteConnectionInfo = new ConnectionInfo();
611                remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
612                remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
613                remoteConnectionInfo.setUserName(configuration.getUserName());
614                remoteConnectionInfo.setPassword(configuration.getPassword());
615                remoteBroker.oneway(remoteConnectionInfo);
616
617                SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
618                remoteBroker.oneway(remoteSessionInfo);
619                producerInfo = new ProducerInfo(remoteSessionInfo, 1);
620                producerInfo.setResponseRequired(false);
621                remoteBroker.oneway(producerInfo);
622                // Listen to consumer advisory messages on the remote broker to determine demand.
623                if (!configuration.isStaticBridge()) {
624                    demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
625                    // always dispatch advisory message asynchronously so that
626                    // we never block the producer broker if we are slow
627                    demandConsumerInfo.setDispatchAsync(true);
628                    String advisoryTopic = configuration.getDestinationFilter();
629                    if (configuration.isBridgeTempDestinations()) {
630                        advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
631                    }
632                    demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
633                    configureConsumerPrefetch(demandConsumerInfo);
634                    remoteBroker.oneway(demandConsumerInfo);
635                }
636                startedLatch.countDown();
637            }
638        }
639    }
640
641    @Override
642    public void serviceRemoteException(Throwable error) {
643        if (!disposed.get()) {
644            if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
645                LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
646                        localBroker, remoteBroker, error
647                });
648            } else {
649                LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
650                        localBroker, remoteBroker, error
651                });
652            }
653            LOG.debug("The remote Exception was: {}", error, error);
654            brokerService.getTaskRunnerFactory().execute(new Runnable() {
655                @Override
656                public void run() {
657                    ServiceSupport.dispose(getControllingService());
658                }
659            });
660            fireBridgeFailed(error);
661        }
662    }
663
664    protected void serviceRemoteCommand(Command command) {
665        if (!disposed.get()) {
666            try {
667                if (command.isMessageDispatch()) {
668                    safeWaitUntilStarted();
669                    MessageDispatch md = (MessageDispatch) command;
670                    serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
671                    ackAdvisory(md.getMessage());
672                } else if (command.isBrokerInfo()) {
673                    futureRemoteBrokerInfo.set((BrokerInfo) command);
674                } else if (command instanceof BrokerSubscriptionInfo) {
675                    final BrokerSubscriptionInfo brokerSubscriptionInfo = (BrokerSubscriptionInfo) command;
676
677                    //Start in a new thread so we don't block the transport waiting for staticDestinations
678                    syncExecutor.execute(new Runnable() {
679
680                        @Override
681                        public void run() {
682                            try {
683                                staticDestinationsLatch.await();
684                                //Make sure after the countDown of staticDestinationsLatch we aren't stopping
685                                if (!disposed.get()) {
686                                    BrokerSubscriptionInfo subInfo = brokerSubscriptionInfo;
687                                    LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
688                                            brokerService.getBrokerName(), subInfo.getBrokerName());
689
690                                    if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
691                                            && !configuration.isDynamicOnly()) {
692                                        if (started.get()) {
693                                            if (subInfo.getSubscriptionInfos() != null) {
694                                                for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
695                                                    //re-add any process any non-NC consumers that match the
696                                                    //dynamicallyIncludedDestinations list
697                                                    if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
698                                                            NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
699                                                        serviceRemoteConsumerAdvisory(info);
700                                                    }
701                                                }
702                                            }
703
704                                            //After re-added, clean up any empty durables
705                                            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
706                                                DemandSubscription ds = i.next();
707                                                if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) {
708                                                    cleanupDurableSub(ds, i);
709                                                }
710                                            }
711                                        }
712                                    }
713                                }
714                            } catch (Exception e) {
715                                LOG.warn("Error processing BrokerSubscriptionInfo: {}", e.getMessage(), e);
716                                LOG.debug(e.getMessage(), e);
717                            }
718                        }
719                    });
720
721                } else if (command.getClass() == ConnectionError.class) {
722                    ConnectionError ce = (ConnectionError) command;
723                    serviceRemoteException(ce.getException());
724                } else {
725                    if (isDuplex()) {
726                        LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType());
727                        if (command.isMessage()) {
728                            final ActiveMQMessage message = (ActiveMQMessage) command;
729                            if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
730                                serviceRemoteConsumerAdvisory(message.getDataStructure());
731                                ackAdvisory(message);
732                            } else {
733                                if (!isPermissableDestination(message.getDestination(), true)) {
734                                    return;
735                                }
736                                // message being forwarded - we need to
737                                // propagate the response to our local send
738                                if (canDuplexDispatch(message)) {
739                                    message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
740                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
741                                        duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
742                                            final int correlationId = message.getCommandId();
743
744                                            @Override
745                                            public void onCompletion(FutureResponse resp) {
746                                                try {
747                                                    Response reply = resp.getResult();
748                                                    reply.setCorrelationId(correlationId);
749                                                    remoteBroker.oneway(reply);
750                                                    //increment counter when messages are received in duplex mode
751                                                    networkBridgeStatistics.getReceivedCount().increment();
752                                                } catch (IOException error) {
753                                                    LOG.error("Exception: {} on duplex forward of: {}", error, message);
754                                                    serviceRemoteException(error);
755                                                }
756                                            }
757                                        });
758                                    } else {
759                                        duplexInboundLocalBroker.oneway(message);
760                                        networkBridgeStatistics.getReceivedCount().increment();
761                                    }
762                                    serviceInboundMessage(message);
763                                } else {
764                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
765                                        Response reply = new Response();
766                                        reply.setCorrelationId(message.getCommandId());
767                                        remoteBroker.oneway(reply);
768                                    }
769                                }
770                            }
771                        } else {
772                            switch (command.getDataStructureType()) {
773                                case ConnectionInfo.DATA_STRUCTURE_TYPE:
774                                    if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) {
775                                        // end of initiating connection setup - propogate to initial connection to get mbean by clientid
776                                        duplexInitiatingConnection.processAddConnection((ConnectionInfo) command);
777                                    } else {
778                                        localBroker.oneway(command);
779                                    }
780                                    break;
781                                case SessionInfo.DATA_STRUCTURE_TYPE:
782                                    localBroker.oneway(command);
783                                    break;
784                                case ProducerInfo.DATA_STRUCTURE_TYPE:
785                                    // using duplexInboundLocalProducerInfo
786                                    break;
787                                case MessageAck.DATA_STRUCTURE_TYPE:
788                                    MessageAck ack = (MessageAck) command;
789                                    DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
790                                    if (localSub != null) {
791                                        ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
792                                        localBroker.oneway(ack);
793                                    } else {
794                                        LOG.warn("Matching local subscription not found for ack: {}", ack);
795                                    }
796                                    break;
797                                case ConsumerInfo.DATA_STRUCTURE_TYPE:
798                                    localStartedLatch.await();
799                                    if (started.get()) {
800                                        final ConsumerInfo consumerInfo = (ConsumerInfo) command;
801                                        if (isDuplicateSuppressionOff(consumerInfo)) {
802                                            addConsumerInfo(consumerInfo);
803                                        } else {
804                                            synchronized (brokerService.getVmConnectorURI()) {
805                                                addConsumerInfo(consumerInfo);
806                                            }
807                                        }
808                                    } else {
809                                        // received a subscription whilst stopping
810                                        LOG.warn("Stopping - ignoring ConsumerInfo: {}", command);
811                                    }
812                                    break;
813                                case ShutdownInfo.DATA_STRUCTURE_TYPE:
814                                    // initiator is shutting down, controlled case
815                                    // abortive close dealt with by inactivity monitor
816                                    LOG.info("Stopping network bridge on shutdown of remote broker");
817                                    serviceRemoteException(new IOException(command.toString()));
818                                    break;
819                                default:
820                                    LOG.debug("Ignoring remote command: {}", command);
821                            }
822                        }
823                    } else {
824                        switch (command.getDataStructureType()) {
825                            case KeepAliveInfo.DATA_STRUCTURE_TYPE:
826                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
827                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
828                                break;
829                            default:
830                                LOG.warn("Unexpected remote command: {}", command);
831                        }
832                    }
833                }
834            } catch (Throwable e) {
835                LOG.debug("Exception processing remote command: {}", command, e);
836                serviceRemoteException(e);
837            }
838        }
839    }
840
841    private void ackAdvisory(Message message) throws IOException {
842        demandConsumerDispatched++;
843        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() *
844                (configuration.getAdvisoryAckPercentage() / 100f))) {
845            final MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
846            ack.setConsumerId(demandConsumerInfo.getConsumerId());
847            brokerService.getTaskRunnerFactory().execute(new Runnable() {
848                @Override
849                public void run() {
850                    try {
851                        remoteBroker.oneway(ack);
852                    } catch (IOException e) {
853                        LOG.warn("Failed to send advisory ack " + ack, e);
854                    }
855                }
856            });
857            demandConsumerDispatched = 0;
858        }
859    }
860
861    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
862        final int networkTTL = configuration.getConsumerTTL();
863        if (data.getClass() == ConsumerInfo.class) {
864            // Create a new local subscription
865            ConsumerInfo info = (ConsumerInfo) data;
866            BrokerId[] path = info.getBrokerPath();
867
868            if (info.isBrowser()) {
869                LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName);
870                return;
871            }
872
873            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
874                LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{
875                        configuration.getBrokerName(), remoteBrokerName, networkTTL, info
876                });
877                return;
878            }
879
880            if (contains(path, localBrokerPath[0])) {
881                // Ignore this consumer as it's a consumer we locally sent to the broker.
882                LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{
883                        configuration.getBrokerName(), remoteBrokerName, info
884                });
885                return;
886            }
887
888            if (!isPermissableDestination(info.getDestination())) {
889                // ignore if not in the permitted or in the excluded list
890                LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{
891                        configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info
892                });
893                return;
894            }
895
896            // in a cyclic network there can be multiple bridges per broker that can propagate
897            // a network subscription so there is a need to synchronize on a shared entity
898            // if duplicate suppression is required
899            if (isDuplicateSuppressionOff(info)) {
900                addConsumerInfo(info);
901            } else {
902                synchronized (brokerService.getVmConnectorURI()) {
903                    addConsumerInfo(info);
904                }
905            }
906        } else if (data.getClass() == DestinationInfo.class) {
907            // It's a destination info - we want to pass up information about temporary destinations
908            final DestinationInfo destInfo = (DestinationInfo) data;
909            BrokerId[] path = destInfo.getBrokerPath();
910            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
911                LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{
912                        configuration.getBrokerName(), destInfo, networkTTL
913                });
914                return;
915            }
916            if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
917                LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo);
918                return;
919            }
920            destInfo.setConnectionId(localConnectionInfo.getConnectionId());
921            if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
922                // re-set connection id so comes from here
923                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
924                tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
925            }
926            destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
927            LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{
928                    configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
929            });
930            if (destInfo.isRemoveOperation()) {
931                // Serialize with removeSub operations such that all removeSub advisories
932                // are generated
933                serialExecutor.execute(new Runnable() {
934                    @Override
935                    public void run() {
936                        try {
937                            localBroker.oneway(destInfo);
938                        } catch (IOException e) {
939                            LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
940                        }
941                    }
942                });
943            } else {
944                localBroker.oneway(destInfo);
945            }
946        } else if (data.getClass() == RemoveInfo.class) {
947            ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
948            removeDemandSubscription(id);
949
950            if (forcedDurableRemoteId.remove(id)) {
951                for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
952                    DemandSubscription ds = i.next();
953                    boolean removed = ds.removeForcedDurableConsumer(id);
954                    if (removed) {
955                        cleanupDurableSub(ds, i);
956                    }
957                }
958           }
959
960        } else if (data.getClass() == RemoveSubscriptionInfo.class) {
961            RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
962            SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
963            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
964                DemandSubscription ds = i.next();
965                boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
966                if (removed) {
967                    cleanupDurableSub(ds, i);
968                }
969            }
970        }
971    }
972
973    private void cleanupDurableSub(final DemandSubscription ds,
974            Iterator<DemandSubscription> i) throws IOException {
975        if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()
976                && ds.getForcedDurableConsumersSize() == 0) {
977            // deactivate subscriber
978            RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
979            localBroker.oneway(removeInfo);
980
981            // remove subscriber
982            RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
983            sending.setClientId(localClientId);
984            sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
985            sending.setConnectionId(this.localConnectionInfo.getConnectionId());
986            localBroker.oneway(sending);
987
988            //remove subscriber from map
989            i.remove();
990        }
991    }
992
993    @Override
994    public void serviceLocalException(Throwable error) {
995        serviceLocalException(null, error);
996    }
997
998    public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
999        LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
1000        if (!disposed.get()) {
1001            if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
1002                // not a reason to terminate the bridge - temps can disappear with
1003                // pending sends as the demand sub may outlive the remote dest
1004                if (messageDispatch != null) {
1005                    LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
1006                    try {
1007                        MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
1008                        poisonAck.setPoisonCause(error);
1009                        localBroker.oneway(poisonAck);
1010                    } catch (IOException ioe) {
1011                        LOG.error("Failed to posion ack message following forward failure: ", ioe);
1012                    }
1013                    fireFailedForwardAdvisory(messageDispatch, error);
1014                } else {
1015                    LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error);
1016                }
1017                return;
1018            }
1019
1020            LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
1021            LOG.debug("The local Exception was: {}", error, error);
1022
1023            brokerService.getTaskRunnerFactory().execute(new Runnable() {
1024                @Override
1025                public void run() {
1026                    ServiceSupport.dispose(getControllingService());
1027                }
1028            });
1029            fireBridgeFailed(error);
1030        }
1031    }
1032
1033    private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
1034        if (configuration.isAdvisoryForFailedForward()) {
1035            AdvisoryBroker advisoryBroker = null;
1036            try {
1037                advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
1038
1039                if (advisoryBroker != null) {
1040                    ConnectionContext context = new ConnectionContext();
1041                    context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
1042                    context.setBroker(brokerService.getBroker());
1043
1044                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
1045                    advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
1046                    advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
1047                            advisoryMessage);
1048
1049                }
1050            } catch (Exception e) {
1051                LOG.warn("failed to fire forward failure advisory, cause: {}", e);
1052                LOG.debug("detail", e);
1053            }
1054        }
1055    }
1056
1057    protected Service getControllingService() {
1058        return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
1059    }
1060
1061    protected void addSubscription(DemandSubscription sub) throws IOException {
1062        if (sub != null) {
1063            localBroker.oneway(sub.getLocalInfo());
1064        }
1065    }
1066
1067    protected void removeSubscription(final DemandSubscription sub) throws IOException {
1068        if (sub != null) {
1069            LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()});
1070
1071            // ensure not available for conduit subs pending removal
1072            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1073            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1074
1075            // continue removal in separate thread to free up this thread for outstanding responses
1076            // Serialize with removeDestination operations so that removeSubs are serialized with
1077            // removeDestinations such that all removeSub advisories are generated
1078            serialExecutor.execute(new Runnable() {
1079                @Override
1080                public void run() {
1081                    sub.waitForCompletion();
1082                    try {
1083                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
1084                    } catch (IOException e) {
1085                        LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
1086                    }
1087                }
1088            });
1089        }
1090    }
1091
1092    protected Message configureMessage(MessageDispatch md) throws IOException {
1093        Message message = md.getMessage().copy();
1094        // Update the packet to show where it came from.
1095        message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
1096        message.setProducerId(producerInfo.getProducerId());
1097        message.setDestination(md.getDestination());
1098        message.setMemoryUsage(null);
1099        if (message.getOriginalTransactionId() == null) {
1100            message.setOriginalTransactionId(message.getTransactionId());
1101        }
1102        message.setTransactionId(null);
1103        if (configuration.isUseCompression()) {
1104            message.compress();
1105        }
1106        return message;
1107    }
1108
1109    protected void serviceLocalCommand(Command command) {
1110        if (!disposed.get()) {
1111            try {
1112                if (command.isMessageDispatch()) {
1113                    safeWaitUntilStarted();
1114                    networkBridgeStatistics.getEnqueues().increment();
1115                    final MessageDispatch md = (MessageDispatch) command;
1116                    final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
1117                    if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
1118
1119                        if (suppressMessageDispatch(md, sub)) {
1120                            LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{
1121                                    configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage()
1122                            });
1123                            // still ack as it may be durable
1124                            try {
1125                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1126                            } finally {
1127                                sub.decrementOutstandingResponses();
1128                            }
1129                            return;
1130                        }
1131
1132                        Message message = configureMessage(md);
1133                        LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{
1134                                configuration.getBrokerName(), remoteBrokerName, md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), (LOG.isTraceEnabled() ? message : message.getMessageId())
1135                        });
1136                        if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
1137                            try {
1138                                // never request b/c they are eventually                     acked async
1139                                remoteBroker.oneway(message);
1140                            } finally {
1141                                sub.decrementOutstandingResponses();
1142                            }
1143                            return;
1144                        }
1145                        if (isPermissableDestination(md.getDestination())) {
1146                           if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
1147
1148                              // The message was not sent using async send, so we should only
1149                              // ack the local broker when we get confirmation that the remote
1150                              // broker has received the message.
1151                              remoteBroker.asyncRequest(message, new ResponseCallback() {
1152                                 @Override
1153                                 public void onCompletion(FutureResponse future) {
1154                                    try {
1155                                       Response response = future.getResult();
1156                                       if (response.isException()) {
1157                                          ExceptionResponse er = (ExceptionResponse) response;
1158                                          serviceLocalException(md, er.getException());
1159                                       } else {
1160                                          localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1161                                          networkBridgeStatistics.getDequeues().increment();
1162                                       }
1163                                    } catch (IOException e) {
1164                                       serviceLocalException(md, e);
1165                                    } finally {
1166                                       sub.decrementOutstandingResponses();
1167                                    }
1168                                 }
1169                              });
1170
1171                           } else {
1172                              // If the message was originally sent using async send, we will
1173                              // preserve that QOS by bridging it using an async send (small chance
1174                              // of message loss).
1175                              try {
1176                                 remoteBroker.oneway(message);
1177                                 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1178                                 networkBridgeStatistics.getDequeues().increment();
1179                              } finally {
1180                                 sub.decrementOutstandingResponses();
1181                              }
1182                           }
1183                           serviceOutbound(message);
1184                        }
1185                    } else {
1186                        LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
1187                    }
1188                } else if (command.isBrokerInfo()) {
1189                    futureLocalBrokerInfo.set((BrokerInfo) command);
1190                } else if (command.isShutdownInfo()) {
1191                    LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName());
1192                    stop();
1193                } else if (command.getClass() == ConnectionError.class) {
1194                    ConnectionError ce = (ConnectionError) command;
1195                    serviceLocalException(ce.getException());
1196                } else {
1197                    switch (command.getDataStructureType()) {
1198                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
1199                            break;
1200                        case BrokerSubscriptionInfo.DATA_STRUCTURE_TYPE:
1201                            break;
1202                        default:
1203                            LOG.warn("Unexpected local command: {}", command);
1204                    }
1205                }
1206            } catch (Throwable e) {
1207                LOG.warn("Caught an exception processing local command", e);
1208                serviceLocalException(e);
1209            }
1210        }
1211    }
1212
1213    private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
1214        boolean suppress = false;
1215        // for durable subs, suppression via filter leaves dangling acks so we
1216        // need to check here and allow the ack irrespective
1217        if (sub.getLocalInfo().isDurable()) {
1218            MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
1219            messageEvalContext.setMessageReference(md.getMessage());
1220            messageEvalContext.setDestination(md.getDestination());
1221            suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
1222            //AMQ-6465 - Need to decrement the reference count after checking matches() as
1223            //the call above will increment the reference count by 1
1224            messageEvalContext.getMessageReference().decrementReferenceCount();
1225        }
1226        return suppress;
1227    }
1228
1229    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
1230        if (brokerPath != null) {
1231            for (BrokerId id : brokerPath) {
1232                if (brokerId.equals(id)) {
1233                    return true;
1234                }
1235            }
1236        }
1237        return false;
1238    }
1239
1240    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
1241        if (brokerPath == null || brokerPath.length == 0) {
1242            return pathsToAppend;
1243        }
1244        BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
1245        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1246        System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
1247        return rc;
1248    }
1249
1250    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
1251        if (brokerPath == null || brokerPath.length == 0) {
1252            return new BrokerId[]{idToAppend};
1253        }
1254        BrokerId rc[] = new BrokerId[brokerPath.length + 1];
1255        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1256        rc[brokerPath.length] = idToAppend;
1257        return rc;
1258    }
1259
1260    protected boolean isPermissableDestination(ActiveMQDestination destination) {
1261        return isPermissableDestination(destination, false);
1262    }
1263
1264    protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
1265        // Are we not bridging temporary destinations?
1266        if (destination.isTemporary()) {
1267            if (allowTemporary) {
1268                return true;
1269            } else {
1270                return configuration.isBridgeTempDestinations();
1271            }
1272        }
1273
1274        ActiveMQDestination[] dests = excludedDestinations;
1275        if (dests != null && dests.length > 0) {
1276            for (ActiveMQDestination dest : dests) {
1277                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
1278                if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1279                    return false;
1280                }
1281            }
1282        }
1283
1284        dests = staticallyIncludedDestinations;
1285        if (dests != null && dests.length > 0) {
1286            for (ActiveMQDestination dest : dests) {
1287                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1288                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1289                    return true;
1290                }
1291            }
1292        }
1293
1294        dests = dynamicallyIncludedDestinations;
1295        if (dests != null && dests.length > 0) {
1296            for (ActiveMQDestination dest : dests) {
1297                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1298                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1299                    return true;
1300                }
1301            }
1302
1303            return false;
1304        }
1305
1306        return true;
1307    }
1308
1309    /**
1310     * Subscriptions for these destinations are always created
1311     */
1312    protected void setupStaticDestinations() {
1313        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1314        if (dests != null) {
1315            for (ActiveMQDestination dest : dests) {
1316                if (isPermissableDestination(dest)) {
1317                    DemandSubscription sub = createDemandSubscription(dest, null);
1318                    sub.setStaticallyIncluded(true);
1319                    try {
1320                        addSubscription(sub);
1321                    } catch (IOException e) {
1322                        LOG.error("Failed to add static destination {}", dest, e);
1323                    }
1324                    LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
1325                } else {
1326                    LOG.info("{}, static destination excluded: {}", configuration.getBrokerName(), dest);
1327                }
1328            }
1329        }
1330    }
1331
1332    protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
1333        ConsumerInfo info = consumerInfo.copy();
1334        addRemoteBrokerToBrokerPath(info);
1335        DemandSubscription sub = createDemandSubscription(info);
1336        if (sub != null) {
1337            if (duplicateSuppressionIsRequired(sub)) {
1338                undoMapRegistration(sub);
1339            } else {
1340                if (consumerInfo.isDurable()) {
1341                    sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
1342                }
1343                addSubscription(sub);
1344                LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub);
1345            }
1346        }
1347    }
1348
1349    private void undoMapRegistration(DemandSubscription sub) {
1350        subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1351        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1352    }
1353
1354    /*
1355     * check our existing subs networkConsumerIds against the list of network
1356     * ids in this subscription A match means a duplicate which we suppress for
1357     * topics and maybe for queues
1358     */
1359    private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1360        final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1361        boolean suppress = false;
1362
1363        if (isDuplicateSuppressionOff(consumerInfo)) {
1364            return suppress;
1365        }
1366
1367        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1368        Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination());
1369        for (Subscription sub : currentSubs) {
1370            List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1371            if (!networkConsumers.isEmpty()) {
1372                if (matchFound(candidateConsumers, networkConsumers)) {
1373                    if (isInActiveDurableSub(sub)) {
1374                        suppress = false;
1375                    } else {
1376                        suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1377                    }
1378                    break;
1379                }
1380            }
1381        }
1382        return suppress;
1383    }
1384
1385    private boolean isDuplicateSuppressionOff(final ConsumerInfo consumerInfo) {
1386        return !configuration.isSuppressDuplicateQueueSubscriptions() && !configuration.isSuppressDuplicateTopicSubscriptions()
1387                || consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()
1388                || consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions();
1389    }
1390
1391    private boolean isInActiveDurableSub(Subscription sub) {
1392        return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
1393    }
1394
1395    private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1396        boolean suppress = false;
1397
1398        if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1399            LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{
1400                    configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds()
1401            });
1402            suppress = true;
1403        } else {
1404            // remove the existing lower priority duplicate and allow this candidate
1405            try {
1406                removeDuplicateSubscription(existingSub);
1407
1408                LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{
1409                        configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds()
1410                });
1411            } catch (IOException e) {
1412                LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e);
1413            }
1414        }
1415        return suppress;
1416    }
1417
1418    private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1419        for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1420            if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1421                break;
1422            }
1423        }
1424    }
1425
1426    private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1427        boolean found = false;
1428        for (ConsumerId aliasConsumer : networkConsumers) {
1429            if (candidateConsumers.contains(aliasConsumer)) {
1430                found = true;
1431                break;
1432            }
1433        }
1434        return found;
1435    }
1436
1437    protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1438        RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1439        Region region;
1440        Collection<Subscription> subs;
1441
1442        region = null;
1443        switch (dest.getDestinationType()) {
1444            case ActiveMQDestination.QUEUE_TYPE:
1445                region = region_broker.getQueueRegion();
1446                break;
1447            case ActiveMQDestination.TOPIC_TYPE:
1448                region = region_broker.getTopicRegion();
1449                break;
1450            case ActiveMQDestination.TEMP_QUEUE_TYPE:
1451                region = region_broker.getTempQueueRegion();
1452                break;
1453            case ActiveMQDestination.TEMP_TOPIC_TYPE:
1454                region = region_broker.getTempTopicRegion();
1455                break;
1456        }
1457
1458        if (region instanceof AbstractRegion) {
1459            subs = ((AbstractRegion) region).getSubscriptions().values();
1460        } else {
1461            subs = null;
1462        }
1463
1464        return subs;
1465    }
1466
1467    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1468        // add our original id to ourselves
1469        info.addNetworkConsumerId(info.getConsumerId());
1470        return doCreateDemandSubscription(info);
1471    }
1472
1473    protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1474        DemandSubscription result = new DemandSubscription(info);
1475        result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1476        if (info.getDestination().isTemporary()) {
1477            // reset the local connection Id
1478            ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1479            dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1480        }
1481
1482        if (configuration.isDecreaseNetworkConsumerPriority()) {
1483            byte priority = (byte) configuration.getConsumerPriorityBase();
1484            if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1485                // The longer the path to the consumer, the less it's consumer priority.
1486                priority -= info.getBrokerPath().length + 1;
1487            }
1488            result.getLocalInfo().setPriority(priority);
1489            LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info});
1490        }
1491        configureDemandSubscription(info, result);
1492        return result;
1493    }
1494
1495    final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination, final String subscriptionName) {
1496        ConsumerInfo info = new ConsumerInfo();
1497        info.setNetworkSubscription(true);
1498        info.setDestination(destination);
1499
1500        if (subscriptionName != null) {
1501            info.setSubscriptionName(subscriptionName);
1502        }
1503
1504        // Indicate that this subscription is being made on behalf of the remote broker.
1505        info.setBrokerPath(new BrokerId[]{remoteBrokerId});
1506
1507        // the remote info held by the DemandSubscription holds the original
1508        // consumerId, the local info get's overwritten
1509        info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1510        DemandSubscription result = null;
1511        try {
1512            result = createDemandSubscription(info);
1513        } catch (IOException e) {
1514            LOG.error("Failed to create DemandSubscription ", e);
1515        }
1516        return result;
1517    }
1518
1519    protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1520        if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) ||
1521                AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) {
1522            sub.getLocalInfo().setDispatchAsync(true);
1523        } else {
1524            sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1525        }
1526        configureConsumerPrefetch(sub.getLocalInfo());
1527        subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1528        subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1529
1530        sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1531        if (!info.isDurable()) {
1532            // This works for now since we use a VM connection to the local broker.
1533            // may need to change if we ever subscribe to a remote broker.
1534            sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1535        } else {
1536            sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
1537        }
1538    }
1539
1540    protected void removeDemandSubscription(ConsumerId id) throws IOException {
1541        DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1542        LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{
1543                configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub
1544        });
1545        if (sub != null) {
1546            removeSubscription(sub);
1547            LOG.debug("{} removed sub on {} from {}: {}", new Object[]{
1548                    configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo()
1549            });
1550        }
1551    }
1552
1553    protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1554        boolean removeDone = false;
1555        DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1556        if (sub != null) {
1557            try {
1558                removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1559                removeDone = true;
1560            } catch (IOException e) {
1561                LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e);
1562            }
1563        }
1564        return removeDone;
1565    }
1566
1567    /**
1568     * Performs a timed wait on the started latch and then checks for disposed
1569     * before performing another wait each time the the started wait times out.
1570     */
1571    protected boolean safeWaitUntilStarted() throws InterruptedException {
1572        while (!disposed.get()) {
1573            if (startedLatch.await(1, TimeUnit.SECONDS)) {
1574                break;
1575            }
1576        }
1577        return !disposed.get();
1578    }
1579
1580    protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1581        NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1582        if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1583            PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1584            if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1585                filterFactory = entry.getNetworkBridgeFilterFactory();
1586            }
1587        }
1588        return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL());
1589    }
1590
1591    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1592        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1593    }
1594
1595    protected BrokerId[] getRemoteBrokerPath() {
1596        return remoteBrokerPath;
1597    }
1598
1599    @Override
1600    public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1601        this.networkBridgeListener = listener;
1602    }
1603
1604    private void fireBridgeFailed(Throwable reason) {
1605        LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason);
1606        NetworkBridgeListener l = this.networkBridgeListener;
1607        if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
1608            l.bridgeFailed();
1609        }
1610    }
1611
1612    /**
1613     * @return Returns the dynamicallyIncludedDestinations.
1614     */
1615    public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
1616        return dynamicallyIncludedDestinations;
1617    }
1618
1619    /**
1620     * @param dynamicallyIncludedDestinations
1621     *         The dynamicallyIncludedDestinations to set.
1622     */
1623    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
1624        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
1625    }
1626
1627    /**
1628     * @return Returns the excludedDestinations.
1629     */
1630    public ActiveMQDestination[] getExcludedDestinations() {
1631        return excludedDestinations;
1632    }
1633
1634    /**
1635     * @param excludedDestinations The excludedDestinations to set.
1636     */
1637    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
1638        this.excludedDestinations = excludedDestinations;
1639    }
1640
1641    /**
1642     * @return Returns the staticallyIncludedDestinations.
1643     */
1644    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
1645        return staticallyIncludedDestinations;
1646    }
1647
1648    /**
1649     * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
1650     */
1651    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
1652        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
1653    }
1654
1655    /**
1656     * @return Returns the durableDestinations.
1657     */
1658    public ActiveMQDestination[] getDurableDestinations() {
1659        return durableDestinations;
1660    }
1661
1662    /**
1663     * @param durableDestinations The durableDestinations to set.
1664     */
1665    public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
1666        this.durableDestinations = durableDestinations;
1667    }
1668
1669    /**
1670     * @return Returns the localBroker.
1671     */
1672    public Transport getLocalBroker() {
1673        return localBroker;
1674    }
1675
1676    /**
1677     * @return Returns the remoteBroker.
1678     */
1679    public Transport getRemoteBroker() {
1680        return remoteBroker;
1681    }
1682
1683    /**
1684     * @return the createdByDuplex
1685     */
1686    public boolean isCreatedByDuplex() {
1687        return this.createdByDuplex;
1688    }
1689
1690    /**
1691     * @param createdByDuplex the createdByDuplex to set
1692     */
1693    public void setCreatedByDuplex(boolean createdByDuplex) {
1694        this.createdByDuplex = createdByDuplex;
1695    }
1696
1697    @Override
1698    public String getRemoteAddress() {
1699        return remoteBroker.getRemoteAddress();
1700    }
1701
1702    @Override
1703    public String getLocalAddress() {
1704        return localBroker.getRemoteAddress();
1705    }
1706
1707    @Override
1708    public String getRemoteBrokerName() {
1709        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1710    }
1711
1712    @Override
1713    public String getRemoteBrokerId() {
1714        return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString();
1715    }
1716
1717    @Override
1718    public String getLocalBrokerName() {
1719        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1720    }
1721
1722    @Override
1723    public long getDequeueCounter() {
1724        return networkBridgeStatistics.getDequeues().getCount();
1725    }
1726
1727    @Override
1728    public long getEnqueueCounter() {
1729        return networkBridgeStatistics.getEnqueues().getCount();
1730    }
1731
1732    @Override
1733    public NetworkBridgeStatistics getNetworkBridgeStatistics() {
1734        return networkBridgeStatistics;
1735    }
1736
1737    protected boolean isDuplex() {
1738        return configuration.isDuplex() || createdByDuplex;
1739    }
1740
1741    public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
1742        return subscriptionMapByRemoteId;
1743    }
1744
1745    @Override
1746    public void setBrokerService(BrokerService brokerService) {
1747        this.brokerService = brokerService;
1748        this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1749        localBrokerPath[0] = localBrokerId;
1750    }
1751
1752    @Override
1753    public void setMbeanObjectName(ObjectName objectName) {
1754        this.mbeanObjectName = objectName;
1755    }
1756
1757    @Override
1758    public ObjectName getMbeanObjectName() {
1759        return mbeanObjectName;
1760    }
1761
1762    @Override
1763    public void resetStats() {
1764        networkBridgeStatistics.reset();
1765    }
1766
1767    /*
1768     * Used to allow for async tasks to await receipt of the BrokerInfo from the local and
1769     * remote sides of the network bridge.
1770     */
1771    private static class FutureBrokerInfo implements Future<BrokerInfo> {
1772
1773        private final CountDownLatch slot = new CountDownLatch(1);
1774        private final AtomicBoolean disposed;
1775        private volatile BrokerInfo info = null;
1776
1777        public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
1778            this.info = info;
1779            this.disposed = disposed;
1780        }
1781
1782        @Override
1783        public boolean cancel(boolean mayInterruptIfRunning) {
1784            slot.countDown();
1785            return true;
1786        }
1787
1788        @Override
1789        public boolean isCancelled() {
1790            return slot.getCount() == 0 && info == null;
1791        }
1792
1793        @Override
1794        public boolean isDone() {
1795            return info != null;
1796        }
1797
1798        @Override
1799        public BrokerInfo get() throws InterruptedException, ExecutionException {
1800            try {
1801                if (info == null) {
1802                    while (!disposed.get()) {
1803                        if (slot.await(1, TimeUnit.SECONDS)) {
1804                            break;
1805                        }
1806                    }
1807                }
1808                return info;
1809            } catch (InterruptedException e) {
1810                Thread.currentThread().interrupt();
1811                LOG.debug("Operation interrupted: {}", e, e);
1812                throw new InterruptedException("Interrupted.");
1813            }
1814        }
1815
1816        @Override
1817        public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1818            try {
1819                if (info == null) {
1820                    long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
1821
1822                    while (!disposed.get() || System.currentTimeMillis() < deadline) {
1823                        if (slot.await(1, TimeUnit.MILLISECONDS)) {
1824                            break;
1825                        }
1826                    }
1827                    if (info == null) {
1828                        throw new TimeoutException();
1829                    }
1830                }
1831                return info;
1832            } catch (InterruptedException e) {
1833                throw new InterruptedException("Interrupted.");
1834            }
1835        }
1836
1837        public void set(BrokerInfo info) {
1838            this.info = info;
1839            this.slot.countDown();
1840        }
1841    }
1842
1843    protected void serviceOutbound(Message message) {
1844        NetworkBridgeListener l = this.networkBridgeListener;
1845        if (l != null) {
1846            l.onOutboundMessage(this, message);
1847        }
1848    }
1849
1850    protected void serviceInboundMessage(Message message) {
1851        NetworkBridgeListener l = this.networkBridgeListener;
1852        if (l != null) {
1853            l.onInboundMessage(this, message);
1854        }
1855    }
1856
1857    protected boolean canDuplexDispatch(Message message) {
1858        boolean result = true;
1859        if (configuration.isCheckDuplicateMessagesOnDuplex()){
1860            final long producerSequenceId = message.getMessageId().getProducerSequenceId();
1861            //  messages are multiplexed on this producer so we need to query the persistenceAdapter
1862            long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId());
1863            if (producerSequenceId <= lastStoredForMessageProducer) {
1864                result = false;
1865                LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
1866                        (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer
1867                });
1868            }
1869        }
1870        return result;
1871    }
1872
1873    protected long getStoredSequenceIdForMessage(MessageId messageId) {
1874        try {
1875            return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
1876        } catch (IOException ignored) {
1877            LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored);
1878        }
1879        return -1;
1880    }
1881
1882    protected void configureConsumerPrefetch(ConsumerInfo consumerInfo) {
1883        //If a consumer on an advisory topic and advisoryPrefetchSize has been explicitly
1884        //set then use it, else default to the prefetchSize setting
1885        if (AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination()) &&
1886                configuration.getAdvisoryPrefetchSize() > 0) {
1887            consumerInfo.setPrefetchSize(configuration.getAdvisoryPrefetchSize());
1888        } else {
1889            consumerInfo.setPrefetchSize(configuration.getPrefetchSize());
1890        }
1891    }
1892
1893}