001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020import java.security.GeneralSecurityException;
021import java.security.cert.X509Certificate;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Properties;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ConcurrentMap;
031import java.util.concurrent.CountDownLatch;
032import java.util.concurrent.ExecutionException;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.Executors;
035import java.util.concurrent.Future;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.TimeoutException;
038import java.util.concurrent.atomic.AtomicBoolean;
039import java.util.regex.Pattern;
040
041import javax.management.ObjectName;
042
043import org.apache.activemq.DestinationDoesNotExistException;
044import org.apache.activemq.Service;
045import org.apache.activemq.advisory.AdvisoryBroker;
046import org.apache.activemq.advisory.AdvisorySupport;
047import org.apache.activemq.broker.BrokerService;
048import org.apache.activemq.broker.BrokerServiceAware;
049import org.apache.activemq.broker.ConnectionContext;
050import org.apache.activemq.broker.TransportConnection;
051import org.apache.activemq.broker.region.AbstractRegion;
052import org.apache.activemq.broker.region.DurableTopicSubscription;
053import org.apache.activemq.broker.region.Region;
054import org.apache.activemq.broker.region.RegionBroker;
055import org.apache.activemq.broker.region.Subscription;
056import org.apache.activemq.broker.region.policy.PolicyEntry;
057import org.apache.activemq.command.ActiveMQDestination;
058import org.apache.activemq.command.ActiveMQMessage;
059import org.apache.activemq.command.ActiveMQTempDestination;
060import org.apache.activemq.command.ActiveMQTopic;
061import org.apache.activemq.command.BrokerId;
062import org.apache.activemq.command.BrokerInfo;
063import org.apache.activemq.command.BrokerSubscriptionInfo;
064import org.apache.activemq.command.Command;
065import org.apache.activemq.command.CommandTypes;
066import org.apache.activemq.command.ConnectionError;
067import org.apache.activemq.command.ConnectionId;
068import org.apache.activemq.command.ConnectionInfo;
069import org.apache.activemq.command.ConsumerId;
070import org.apache.activemq.command.ConsumerInfo;
071import org.apache.activemq.command.DataStructure;
072import org.apache.activemq.command.DestinationInfo;
073import org.apache.activemq.command.ExceptionResponse;
074import org.apache.activemq.command.KeepAliveInfo;
075import org.apache.activemq.command.Message;
076import org.apache.activemq.command.MessageAck;
077import org.apache.activemq.command.MessageDispatch;
078import org.apache.activemq.command.MessageId;
079import org.apache.activemq.command.NetworkBridgeFilter;
080import org.apache.activemq.command.ProducerInfo;
081import org.apache.activemq.command.RemoveInfo;
082import org.apache.activemq.command.RemoveSubscriptionInfo;
083import org.apache.activemq.command.Response;
084import org.apache.activemq.command.SessionInfo;
085import org.apache.activemq.command.ShutdownInfo;
086import org.apache.activemq.command.SubscriptionInfo;
087import org.apache.activemq.command.WireFormatInfo;
088import org.apache.activemq.filter.DestinationFilter;
089import org.apache.activemq.filter.MessageEvaluationContext;
090import org.apache.activemq.security.SecurityContext;
091import org.apache.activemq.transport.DefaultTransportListener;
092import org.apache.activemq.transport.FutureResponse;
093import org.apache.activemq.transport.ResponseCallback;
094import org.apache.activemq.transport.Transport;
095import org.apache.activemq.transport.TransportDisposedIOException;
096import org.apache.activemq.transport.TransportFilter;
097import org.apache.activemq.transport.failover.FailoverTransport;
098import org.apache.activemq.transport.tcp.TcpTransport;
099import org.apache.activemq.util.IdGenerator;
100import org.apache.activemq.util.IntrospectionSupport;
101import org.apache.activemq.util.LongSequenceGenerator;
102import org.apache.activemq.util.MarshallingSupport;
103import org.apache.activemq.util.NetworkBridgeUtils;
104import org.apache.activemq.util.ServiceStopper;
105import org.apache.activemq.util.ServiceSupport;
106import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
107import org.slf4j.Logger;
108import org.slf4j.LoggerFactory;
109
110/**
111 * A useful base class for implementing demand forwarding bridges.
112 */
113public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
114    private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
115    protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
116    protected final Transport localBroker;
117    protected final Transport remoteBroker;
118    protected IdGenerator idGenerator = new IdGenerator();
119    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
120    protected ConnectionInfo localConnectionInfo;
121    protected ConnectionInfo remoteConnectionInfo;
122    protected SessionInfo localSessionInfo;
123    protected ProducerInfo producerInfo;
124    protected String remoteBrokerName = "Unknown";
125    protected String localClientId;
126    protected ConsumerInfo demandConsumerInfo;
127    protected int demandConsumerDispatched;
128    protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
129    protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
130    protected final AtomicBoolean bridgeFailed = new AtomicBoolean();
131    protected final AtomicBoolean disposed = new AtomicBoolean();
132    protected BrokerId localBrokerId;
133    protected ActiveMQDestination[] excludedDestinations;
134    protected ActiveMQDestination[] dynamicallyIncludedDestinations;
135    protected ActiveMQDestination[] staticallyIncludedDestinations;
136    protected ActiveMQDestination[] durableDestinations;
137    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<>();
138    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<>();
139    protected final Set<ConsumerId> forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<ConsumerId, Boolean>());
140    protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
141    protected final CountDownLatch startedLatch = new CountDownLatch(2);
142    protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
143    protected final CountDownLatch staticDestinationsLatch = new CountDownLatch(1);
144    protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
145    protected NetworkBridgeConfiguration configuration;
146    protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
147
148    protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
149    protected BrokerId remoteBrokerId;
150
151    protected final NetworkBridgeStatistics networkBridgeStatistics = new NetworkBridgeStatistics();
152
153    private NetworkBridgeListener networkBridgeListener;
154    private boolean createdByDuplex;
155    private BrokerInfo localBrokerInfo;
156    private BrokerInfo remoteBrokerInfo;
157
158    private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
159    private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);
160
161    private final AtomicBoolean started = new AtomicBoolean();
162    private TransportConnection duplexInitiatingConnection;
163    private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean();
164    protected BrokerService brokerService = null;
165    private ObjectName mbeanObjectName;
166    private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
167    //Use a new executor for processing BrokerSubscriptionInfo so we don't block other threads
168    private final ExecutorService syncExecutor = Executors.newSingleThreadExecutor();
169    private Transport duplexInboundLocalBroker = null;
170    private ProducerInfo duplexInboundLocalProducerInfo;
171
172    public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
173        this.configuration = configuration;
174        this.localBroker = localBroker;
175        this.remoteBroker = remoteBroker;
176    }
177
178    public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
179        this.localBrokerInfo = localBrokerInfo;
180        this.remoteBrokerInfo = remoteBrokerInfo;
181        this.duplexInitiatingConnection = connection;
182        start();
183        serviceRemoteCommand(remoteBrokerInfo);
184    }
185
186    @Override
187    public void start() throws Exception {
188        if (started.compareAndSet(false, true)) {
189
190            if (brokerService == null) {
191                throw new IllegalArgumentException("BrokerService is null on " + this);
192            }
193
194            networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
195
196            if (isDuplex()) {
197                duplexInboundLocalBroker = NetworkBridgeFactory.createLocalAsyncTransport(brokerService.getBroker().getVmConnectorURI());
198                duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
199
200                    @Override
201                    public void onCommand(Object o) {
202                        Command command = (Command) o;
203                        serviceLocalCommand(command);
204                    }
205
206                    @Override
207                    public void onException(IOException error) {
208                        serviceLocalException(error);
209                    }
210                });
211                duplexInboundLocalBroker.start();
212            }
213
214            localBroker.setTransportListener(new DefaultTransportListener() {
215
216                @Override
217                public void onCommand(Object o) {
218                    Command command = (Command) o;
219                    serviceLocalCommand(command);
220                }
221
222                @Override
223                public void onException(IOException error) {
224                    if (!futureLocalBrokerInfo.isDone()) {
225                        LOG.info("error with pending local brokerInfo on: " + localBroker, error);
226                        futureLocalBrokerInfo.cancel(true);
227                        return;
228                    }
229                    serviceLocalException(error);
230                }
231            });
232
233            remoteBroker.setTransportListener(new DefaultTransportListener() {
234
235                @Override
236                public void onCommand(Object o) {
237                    Command command = (Command) o;
238                    serviceRemoteCommand(command);
239                }
240
241                @Override
242                public void onException(IOException error) {
243                    if (!futureRemoteBrokerInfo.isDone()) {
244                        LOG.info("error with pending remote brokerInfo on: " + remoteBroker, error);
245                        futureRemoteBrokerInfo.cancel(true);
246                        return;
247                    }
248                    serviceRemoteException(error);
249                }
250            });
251
252            remoteBroker.start();
253            localBroker.start();
254
255            if (!disposed.get()) {
256                try {
257                    triggerStartAsyncNetworkBridgeCreation();
258                } catch (IOException e) {
259                    LOG.warn("Caught exception from remote start", e);
260                }
261            } else {
262                LOG.warn("Bridge was disposed before the start() method was fully executed.");
263                throw new TransportDisposedIOException();
264            }
265        }
266    }
267
268    @Override
269    public void stop() throws Exception {
270        if (started.compareAndSet(true, false)) {
271            if (disposed.compareAndSet(false, true)) {
272                LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName);
273
274                futureRemoteBrokerInfo.cancel(true);
275                futureLocalBrokerInfo.cancel(true);
276
277                NetworkBridgeListener l = this.networkBridgeListener;
278                if (l != null) {
279                    l.onStop(this);
280                }
281                try {
282                    // local start complete
283                    if (startedLatch.getCount() < 2) {
284                        LOG.trace("{} unregister bridge ({}) to {}", new Object[]{
285                                configuration.getBrokerName(), this, remoteBrokerName
286                        });
287                        brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
288                        brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
289                    }
290
291                    remoteBridgeStarted.set(false);
292                    final CountDownLatch sendShutdown = new CountDownLatch(1);
293
294                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
295                        @Override
296                        public void run() {
297                            try {
298                                serialExecutor.shutdown();
299                                if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
300                                    List<Runnable> pendingTasks = serialExecutor.shutdownNow();
301                                    LOG.info("pending tasks on stop {}", pendingTasks);
302                                }
303                                //Shutdown the syncExecutor, call countDown to make sure a thread can
304                                //terminate if it is waiting
305                                staticDestinationsLatch.countDown();
306                                syncExecutor.shutdown();
307                                if (!syncExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
308                                    List<Runnable> pendingTasks = syncExecutor.shutdownNow();
309                                    LOG.info("pending tasks on stop {}", pendingTasks);
310                                }
311                                localBroker.oneway(new ShutdownInfo());
312                                remoteBroker.oneway(new ShutdownInfo());
313                            } catch (Throwable e) {
314                                LOG.debug("Caught exception sending shutdown", e);
315                            } finally {
316                                sendShutdown.countDown();
317                            }
318
319                        }
320                    }, "ActiveMQ ForwardingBridge StopTask");
321
322                    if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
323                        LOG.info("Network Could not shutdown in a timely manner");
324                    }
325                } finally {
326                    ServiceStopper ss = new ServiceStopper();
327                    stopFailoverTransport(remoteBroker);
328                    ss.stop(remoteBroker);
329                    ss.stop(localBroker);
330                    ss.stop(duplexInboundLocalBroker);
331                    // Release the started Latch since another thread could be
332                    // stuck waiting for it to start up.
333                    startedLatch.countDown();
334                    startedLatch.countDown();
335                    localStartedLatch.countDown();
336                    staticDestinationsLatch.countDown();
337
338                    ss.throwFirstException();
339                }
340            }
341
342            LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName);
343        }
344    }
345
346    private void stopFailoverTransport(Transport transport) {
347        FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
348        if (failoverTransport != null) {
349            // may be blocked on write, in which case stop will block
350            try {
351                failoverTransport.handleTransportFailure(new IOException("Bridge stopped"));
352            } catch (InterruptedException ignored) {}
353        }
354    }
355
356    protected void triggerStartAsyncNetworkBridgeCreation() throws IOException {
357        brokerService.getTaskRunnerFactory().execute(new Runnable() {
358            @Override
359            public void run() {
360                final String originalName = Thread.currentThread().getName();
361                Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
362                        "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
363
364                try {
365                    // First we collect the info data from both the local and remote ends
366                    collectBrokerInfos();
367
368                    // Once we have all required broker info we can attempt to start
369                    // the local and then remote sides of the bridge.
370                    doStartLocalAndRemoteBridges();
371                } finally {
372                    Thread.currentThread().setName(originalName);
373                }
374            }
375        });
376    }
377
378    private void collectBrokerInfos() {
379        int timeout = 30000;
380        TcpTransport tcpTransport = remoteBroker.narrow(TcpTransport.class);
381        if (tcpTransport != null) {
382           timeout = tcpTransport.getConnectionTimeout();
383        }
384
385        // First wait for the remote to feed us its BrokerInfo, then we can check on
386        // the LocalBrokerInfo and decide is this is a loop.
387        try {
388            remoteBrokerInfo = futureRemoteBrokerInfo.get(timeout, TimeUnit.MILLISECONDS);
389            if (remoteBrokerInfo == null) {
390                serviceLocalException(new Throwable("remoteBrokerInfo is null"));
391                return;
392            }
393        } catch (Exception e) {
394            serviceRemoteException(e);
395            return;
396        }
397
398        try {
399            localBrokerInfo = futureLocalBrokerInfo.get(timeout, TimeUnit.MILLISECONDS);
400            if (localBrokerInfo == null) {
401                serviceLocalException(new Throwable("localBrokerInfo is null"));
402                return;
403            }
404
405            // Before we try and build the bridge lets check if we are in a loop
406            // and if so just stop now before registering anything.
407            remoteBrokerId = remoteBrokerInfo.getBrokerId();
408            if (localBrokerId.equals(remoteBrokerId)) {
409                LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{
410                        configuration.getBrokerName(), remoteBrokerName, remoteBrokerId
411                });
412                ServiceSupport.dispose(localBroker);
413                ServiceSupport.dispose(remoteBroker);
414                // the bridge is left in a bit of limbo, but it won't get retried
415                // in this state.
416                return;
417            }
418
419            // Fill in the remote broker's information now.
420            remoteBrokerPath[0] = remoteBrokerId;
421            remoteBrokerName = remoteBrokerInfo.getBrokerName();
422            if (configuration.isUseBrokerNamesAsIdSeed()) {
423                idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName);
424            }
425        } catch (Throwable e) {
426            serviceLocalException(e);
427        }
428    }
429
430    private void doStartLocalAndRemoteBridges() {
431
432        if (disposed.get()) {
433            return;
434        }
435
436        if (isCreatedByDuplex()) {
437            // apply remote (propagated) configuration to local duplex bridge before start
438            Properties props = null;
439            try {
440                props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
441                IntrospectionSupport.getProperties(configuration, props, null);
442                if (configuration.getExcludedDestinations() != null) {
443                    excludedDestinations = configuration.getExcludedDestinations().toArray(
444                            new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
445                }
446                if (configuration.getStaticallyIncludedDestinations() != null) {
447                    staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
448                            new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
449                }
450                if (configuration.getDynamicallyIncludedDestinations() != null) {
451                    dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
452                            new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
453                }
454            } catch (Throwable t) {
455                LOG.error("Error mapping remote configuration: {}", props, t);
456            }
457        }
458
459        try {
460            startLocalBridge();
461        } catch (Throwable e) {
462            serviceLocalException(e);
463            return;
464        }
465
466        try {
467            startRemoteBridge();
468        } catch (Throwable e) {
469            serviceRemoteException(e);
470            return;
471        }
472
473        try {
474            if (safeWaitUntilStarted()) {
475                setupStaticDestinations();
476                staticDestinationsLatch.countDown();
477            }
478        } catch (Throwable e) {
479            serviceLocalException(e);
480        }
481    }
482
483    private void startLocalBridge() throws Throwable {
484        if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) {
485            synchronized (this) {
486                LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker);
487                if (!disposed.get()) {
488
489                    if (idGenerator == null) {
490                        throw new IllegalStateException("Id Generator cannot be null");
491                    }
492
493                    localConnectionInfo = new ConnectionInfo();
494                    localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
495                    localClientId = configuration.getName() + configuration.getClientIdToken() + remoteBrokerName + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + configuration.getBrokerName();
496                    localConnectionInfo.setClientId(localClientId);
497                    localConnectionInfo.setUserName(configuration.getUserName());
498                    localConnectionInfo.setPassword(configuration.getPassword());
499                    Transport originalTransport = remoteBroker;
500                    while (originalTransport instanceof TransportFilter) {
501                        originalTransport = ((TransportFilter) originalTransport).getNext();
502                    }
503                    if (originalTransport instanceof TcpTransport) {
504                        X509Certificate[] peerCerts = originalTransport.getPeerCertificates();
505                        localConnectionInfo.setTransportContext(peerCerts);
506                    }
507                    // sync requests that may fail
508                    Object resp = localBroker.request(localConnectionInfo);
509                    if (resp instanceof ExceptionResponse) {
510                        throw ((ExceptionResponse) resp).getException();
511                    }
512                    localSessionInfo = new SessionInfo(localConnectionInfo, 1);
513                    localBroker.oneway(localSessionInfo);
514
515                    if (configuration.isDuplex()) {
516                        // separate in-bound channel for forwards so we don't
517                        // contend with out-bound dispatch on same connection
518                        remoteBrokerInfo.setNetworkConnection(true);
519                        duplexInboundLocalBroker.oneway(remoteBrokerInfo);
520
521                        ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
522                        duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
523                        duplexLocalConnectionInfo.setClientId(configuration.getName() + configuration.getClientIdToken() + remoteBrokerName + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + "duplex"
524                                + configuration.getClientIdToken() + configuration.getBrokerName());
525                        duplexLocalConnectionInfo.setUserName(configuration.getUserName());
526                        duplexLocalConnectionInfo.setPassword(configuration.getPassword());
527
528                        if (originalTransport instanceof TcpTransport) {
529                            X509Certificate[] peerCerts = originalTransport.getPeerCertificates();
530                            duplexLocalConnectionInfo.setTransportContext(peerCerts);
531                        }
532                        // sync requests that may fail
533                        resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
534                        if (resp instanceof ExceptionResponse) {
535                            throw ((ExceptionResponse) resp).getException();
536                        }
537                        SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
538                        duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
539                        duplexInboundLocalBroker.oneway(duplexInboundSession);
540                        duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo);
541                    }
542                    brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
543                    NetworkBridgeListener l = this.networkBridgeListener;
544                    if (l != null) {
545                        l.onStart(this);
546                    }
547
548                    // Let the local broker know the remote broker's ID.
549                    localBroker.oneway(remoteBrokerInfo);
550                    // new peer broker (a consumer can work with remote broker also)
551                    brokerService.getBroker().addBroker(null, remoteBrokerInfo);
552
553                    LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{
554                            localBroker, remoteBroker, remoteBrokerName
555                    });
556                    LOG.trace("{} register bridge ({}) to {}", new Object[]{
557                            configuration.getBrokerName(), this, remoteBrokerName
558                    });
559                } else {
560                    LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
561                }
562                startedLatch.countDown();
563                localStartedLatch.countDown();
564            }
565        }
566    }
567
568    protected void startRemoteBridge() throws Exception {
569        if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) {
570            LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker);
571            synchronized (this) {
572                if (!isCreatedByDuplex()) {
573                    BrokerInfo brokerInfo = new BrokerInfo();
574                    brokerInfo.setBrokerName(configuration.getBrokerName());
575                    brokerInfo.setBrokerURL(configuration.getBrokerURL());
576                    brokerInfo.setNetworkConnection(true);
577                    brokerInfo.setDuplexConnection(configuration.isDuplex());
578                    // set our properties
579                    Properties props = new Properties();
580                    IntrospectionSupport.getProperties(configuration, props, null);
581
582                    String dynamicallyIncludedDestinationsKey = "dynamicallyIncludedDestinations";
583                    String staticallyIncludedDestinationsKey = "staticallyIncludedDestinations";
584
585                    if (!configuration.getDynamicallyIncludedDestinations().isEmpty()) {
586                        props.put(dynamicallyIncludedDestinationsKey,
587                                StringToListOfActiveMQDestinationConverter.
588                                convertFromActiveMQDestination(configuration.getDynamicallyIncludedDestinations(), true));
589                    }
590                    if (!configuration.getStaticallyIncludedDestinations().isEmpty()) {
591                        props.put(staticallyIncludedDestinationsKey,
592                                StringToListOfActiveMQDestinationConverter.
593                                convertFromActiveMQDestination(configuration.getStaticallyIncludedDestinations(), true));
594                    }
595
596                    props.remove("networkTTL");
597                    String str = MarshallingSupport.propertiesToString(props);
598                    brokerInfo.setNetworkProperties(str);
599                    brokerInfo.setBrokerId(this.localBrokerId);
600                    remoteBroker.oneway(brokerInfo);
601                    if (configuration.isSyncDurableSubs() &&
602                            remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
603                        remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService,
604                                configuration));
605                    }
606                }
607                if (remoteConnectionInfo != null) {
608                    remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
609                }
610                remoteConnectionInfo = new ConnectionInfo();
611                remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
612                remoteConnectionInfo.setClientId(configuration.getName() + configuration.getClientIdToken() + configuration.getBrokerName() + configuration.getClientIdToken() + "outbound");
613                remoteConnectionInfo.setUserName(configuration.getUserName());
614                remoteConnectionInfo.setPassword(configuration.getPassword());
615                remoteBroker.oneway(remoteConnectionInfo);
616
617                SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
618                remoteBroker.oneway(remoteSessionInfo);
619                producerInfo = new ProducerInfo(remoteSessionInfo, 1);
620                producerInfo.setResponseRequired(false);
621                remoteBroker.oneway(producerInfo);
622                // Listen to consumer advisory messages on the remote broker to determine demand.
623                if (!configuration.isStaticBridge()) {
624                    demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
625                    // always dispatch advisory message asynchronously so that
626                    // we never block the producer broker if we are slow
627                    demandConsumerInfo.setDispatchAsync(true);
628                    String advisoryTopic = configuration.getDestinationFilter();
629                    if (configuration.isBridgeTempDestinations()) {
630                        advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
631                    }
632                    demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
633                    configureConsumerPrefetch(demandConsumerInfo);
634                    remoteBroker.oneway(demandConsumerInfo);
635                }
636                startedLatch.countDown();
637            }
638        }
639    }
640
641    @Override
642    public void serviceRemoteException(Throwable error) {
643        if (!disposed.get()) {
644            if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
645                LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
646                        localBroker, remoteBroker, error
647                });
648            } else {
649                LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
650                        localBroker, remoteBroker, error
651                });
652            }
653            LOG.debug("The remote Exception was: {}", error, error);
654            brokerService.getTaskRunnerFactory().execute(new Runnable() {
655                @Override
656                public void run() {
657                    ServiceSupport.dispose(getControllingService());
658                }
659            });
660            fireBridgeFailed(error);
661        }
662    }
663
664    /**
665     * Checks whether or not this consumer is a direct bridge network subscription
666     * @param info
667     * @return
668     */
669    protected boolean isDirectBridgeConsumer(ConsumerInfo info) {
670        return (info.getSubscriptionName() != null && info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
671                (info.getClientId() == null || info.getClientId().startsWith(configuration.getName()));
672    }
673
674    protected boolean isProxyBridgeSubscription(String clientId, String subName) {
675        if (subName != null && clientId != null) {
676            if (subName.startsWith(DURABLE_SUB_PREFIX) && !clientId.startsWith(configuration.getName())) {
677                return true;
678            }
679        }
680        return false;
681    }
682
683    /**
684     * This scenaior is primarily used for durable sync on broker restarts
685     *
686     * @param sub
687     * @param clientId
688     * @param subName
689     */
690    protected void addProxyNetworkSubscriptionClientId(final DemandSubscription sub, final String clientId, String subName) {
691        if (clientId != null && sub != null && subName != null) {
692                String newClientId = getProxyBridgeClientId(clientId);
693                final SubscriptionInfo newSubInfo = new SubscriptionInfo(newClientId, subName);
694                sub.getDurableRemoteSubs().add(newSubInfo);
695                LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo);
696
697        } else {
698            LOG.debug("Skipping addProxyNetworkSubscription");
699        }
700    }
701
702    /**
703     * Add a durable remote proxy subscription when we can generate via the BrokerId path
704     * This is the most common scenario
705     *
706     * @param sub
707     * @param path
708     * @param subName
709     */
710    protected void addProxyNetworkSubscriptionBrokerPath(final DemandSubscription sub, final BrokerId[] path, String subName) {
711        if (sub != null && path.length > 1 && subName != null) {
712            String b1 = path[path.length-1].toString();
713            String b2 = path[path.length-2].toString();
714            final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + b1, subName);
715            sub.getDurableRemoteSubs().add(newSubInfo);
716        }
717    }
718
719    private String getProxyBridgeClientId(String clientId) {
720        String newClientId = clientId;
721        String[] clientIdTokens = newClientId != null ? newClientId.split(Pattern.quote(configuration.getClientIdToken())) : null;
722        if (clientIdTokens != null && clientIdTokens.length > 2) {
723            newClientId = clientIdTokens[clientIdTokens.length - 3] +  configuration.getClientIdToken() + "inbound"
724                    + configuration.getClientIdToken() +  clientIdTokens[clientIdTokens.length -1];
725        }
726        return newClientId;
727    }
728
729    protected boolean isProxyNSConsumerBrokerPath(ConsumerInfo info) {
730        return info.getBrokerPath() != null && info.getBrokerPath().length > 1;
731    }
732
733    protected boolean isProxyNSConsumerClientId(String clientId) {
734        return clientId != null && clientId.split(Pattern.quote(configuration.getClientIdToken())).length > 3;
735    }
736
    /**
     * Dispatches a command received from the remote broker. Nothing is processed once the bridge
     * has been disposed.
     * <ul>
     * <li>Message dispatches: after waiting for the bridge to start, the carried data structure is
     *     passed to {@code serviceRemoteConsumerAdvisory} and the advisory is acknowledged through
     *     {@code ackAdvisory}.</li>
     * <li>{@code BrokerInfo}: completes {@code futureRemoteBrokerInfo}.</li>
     * <li>{@code BrokerSubscriptionInfo}: processed on {@code syncExecutor} once
     *     {@code staticDestinationsLatch} has been released.</li>
     * <li>{@code ConnectionError}: its exception is passed to {@code serviceRemoteException}.</li>
     * <li>Anything else: handled by the duplex logic when the bridge is duplex; otherwise only
     *     keep-alive, wire format and shutdown commands are accepted silently and every other
     *     command is logged as unexpected.</li>
     * </ul>
     * Any {@code Throwable} raised while processing is logged at debug level and passed to
     * {@code serviceRemoteException}.
     *
     * @param command the command received from the remote broker
     */
    protected void serviceRemoteCommand(Command command) {
        if (!disposed.get()) {
            try {
                if (command.isMessageDispatch()) {
                    safeWaitUntilStarted();
                    MessageDispatch md = (MessageDispatch) command;
                    serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
                    ackAdvisory(md.getMessage());
                } else if (command.isBrokerInfo()) {
                    futureRemoteBrokerInfo.set((BrokerInfo) command);
                } else if (command instanceof BrokerSubscriptionInfo) {
                    final BrokerSubscriptionInfo brokerSubscriptionInfo = (BrokerSubscriptionInfo) command;

                    //Start in a new thread so we don't block the transport waiting for staticDestinations
                    syncExecutor.execute(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                staticDestinationsLatch.await();
                                //Make sure after the countDown of staticDestinationsLatch we aren't stopping
                                if (!disposed.get()) {
                                    BrokerSubscriptionInfo subInfo = brokerSubscriptionInfo;
                                    LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
                                            brokerService.getBrokerName(), subInfo.getBrokerName());

                                    // the sync only runs for conduit subscriptions with durable sync
                                    // enabled and dynamicOnly switched off
                                    if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
                                            && !configuration.isDynamicOnly()) {
                                        if (started.get()) {
                                            if (subInfo.getSubscriptionInfos() != null) {
                                                for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
                                                    //re-add any non-NC consumers that match the
                                                    //dynamicallyIncludedDestinations list
                                                    //Also re-add network consumers that are not part of this direct
                                                    //bridge (proxy of proxy bridges)
                                                    if((info.getSubscriptionName() == null || !isDirectBridgeConsumer(info)) &&
                                                            NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
                                                        serviceRemoteConsumerAdvisory(info);
                                                    }
                                                }
                                            }

                                            //After re-added, clean up any empty durables
                                            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
                                                DemandSubscription ds = i.next();
                                                if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) {
                                                    cleanupDurableSub(ds, i);
                                                }
                                            }
                                        }
                                    }
                                }
                            } catch (Exception e) {
                                // failures here are only logged; they are not passed to serviceRemoteException
                                LOG.warn("Error processing BrokerSubscriptionInfo: {}", e.getMessage(), e);
                                LOG.debug(e.getMessage(), e);
                            }
                        }
                    });

                } else if (command.getClass() == ConnectionError.class) {
                    ConnectionError ce = (ConnectionError) command;
                    serviceRemoteException(ce.getException());
                } else {
                    if (isDuplex()) {
                        LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType());
                        if (command.isMessage()) {
                            final ActiveMQMessage message = (ActiveMQMessage) command;
                            if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
                                serviceRemoteConsumerAdvisory(message.getDataStructure());
                                ackAdvisory(message);
                            } else {
                                // messages for destinations that are not permitted are dropped without a reply
                                if (!isPermissableDestination(message.getDestination(), true)) {
                                    return;
                                }
                                // message being forwarded - we need to
                                // propagate the response to our local send
                                if (canDuplexDispatch(message)) {
                                    message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
                                        duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
                                            // captured when the callback is created so the reply can be
                                            // correlated with the remote command
                                            final int correlationId = message.getCommandId();

                                            @Override
                                            public void onCompletion(FutureResponse resp) {
                                                try {
                                                    Response reply = resp.getResult();
                                                    reply.setCorrelationId(correlationId);
                                                    remoteBroker.oneway(reply);
                                                    //increment counter when messages are received in duplex mode
                                                    networkBridgeStatistics.getReceivedCount().increment();
                                                } catch (IOException error) {
                                                    LOG.error("Exception: {} on duplex forward of: {}", error, message);
                                                    serviceRemoteException(error);
                                                }
                                            }
                                        });
                                    } else {
                                        duplexInboundLocalBroker.oneway(message);
                                        networkBridgeStatistics.getReceivedCount().increment();
                                    }
                                    serviceInboundMessage(message);
                                } else {
                                    // not dispatched locally: still answer the sender when a response is expected
                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
                                        Response reply = new Response();
                                        reply.setCorrelationId(message.getCommandId());
                                        remoteBroker.oneway(reply);
                                    }
                                }
                            }
                        } else {
                            switch (command.getDataStructureType()) {
                                case ConnectionInfo.DATA_STRUCTURE_TYPE:
                                    if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) {
                                        // end of initiating connection setup - propagate to initial connection to get mbean by clientid
                                        duplexInitiatingConnection.processAddConnection((ConnectionInfo) command);
                                    } else {
                                        localBroker.oneway(command);
                                    }
                                    break;
                                case SessionInfo.DATA_STRUCTURE_TYPE:
                                    localBroker.oneway(command);
                                    break;
                                case ProducerInfo.DATA_STRUCTURE_TYPE:
                                    // using duplexInboundLocalProducerInfo
                                    break;
                                case MessageAck.DATA_STRUCTURE_TYPE:
                                    // translate the remote consumer id to the matching local one before forwarding
                                    MessageAck ack = (MessageAck) command;
                                    DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
                                    if (localSub != null) {
                                        ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
                                        localBroker.oneway(ack);
                                    } else {
                                        LOG.warn("Matching local subscription not found for ack: {}", ack);
                                    }
                                    break;
                                case ConsumerInfo.DATA_STRUCTURE_TYPE:
                                    localStartedLatch.await();
                                    if (started.get()) {
                                        final ConsumerInfo consumerInfo = (ConsumerInfo) command;
                                        if (isDuplicateSuppressionOff(consumerInfo)) {
                                            addConsumerInfo(consumerInfo);
                                        } else {
                                            // same lock object as used in serviceRemoteConsumerAdvisory
                                            synchronized (brokerService.getVmConnectorURI()) {
                                                addConsumerInfo(consumerInfo);
                                            }
                                        }
                                    } else {
                                        // received a subscription whilst stopping
                                        LOG.warn("Stopping - ignoring ConsumerInfo: {}", command);
                                    }
                                    break;
                                case ShutdownInfo.DATA_STRUCTURE_TYPE:
                                    // initiator is shutting down, controlled case
                                    // abortive close dealt with by inactivity monitor
                                    LOG.info("Stopping network bridge on shutdown of remote broker");
                                    serviceRemoteException(new IOException(command.toString()));
                                    break;
                                default:
                                    LOG.debug("Ignoring remote command: {}", command);
                            }
                        }
                    } else {
                        // non-duplex bridge: these three command types are accepted silently
                        switch (command.getDataStructureType()) {
                            case KeepAliveInfo.DATA_STRUCTURE_TYPE:
                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
                                break;
                            default:
                                LOG.warn("Unexpected remote command: {}", command);
                        }
                    }
                }
            } catch (Throwable e) {
                LOG.debug("Exception processing remote command: {}", command, e);
                serviceRemoteException(e);
            }
        }
    }
915
916    private void ackAdvisory(Message message) throws IOException {
917        demandConsumerDispatched++;
918        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() *
919                (configuration.getAdvisoryAckPercentage() / 100f))) {
920            final MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
921            ack.setConsumerId(demandConsumerInfo.getConsumerId());
922            brokerService.getTaskRunnerFactory().execute(new Runnable() {
923                @Override
924                public void run() {
925                    try {
926                        remoteBroker.oneway(ack);
927                    } catch (IOException e) {
928                        LOG.warn("Failed to send advisory ack " + ack, e);
929                    }
930                }
931            });
932            demandConsumerDispatched = 0;
933        }
934    }
935
936    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
937        final int networkTTL = configuration.getConsumerTTL();
938        if (data.getClass() == ConsumerInfo.class) {
939            // Create a new local subscription
940            ConsumerInfo info = (ConsumerInfo) data;
941            BrokerId[] path = info.getBrokerPath();
942
943            if (info.isBrowser()) {
944                LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName);
945                return;
946            }
947
948            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
949                LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{
950                        configuration.getBrokerName(), remoteBrokerName, networkTTL, info
951                });
952                return;
953            }
954
955            if (contains(path, localBrokerPath[0])) {
956                // Ignore this consumer as it's a consumer we locally sent to the broker.
957                LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{
958                        configuration.getBrokerName(), remoteBrokerName, info
959                });
960                return;
961            }
962
963            if (!isPermissableDestination(info.getDestination())) {
964                // ignore if not in the permitted or in the excluded list
965                LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{
966                        configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info
967                });
968                return;
969            }
970
971            // in a cyclic network there can be multiple bridges per broker that can propagate
972            // a network subscription so there is a need to synchronize on a shared entity
973            // if duplicate suppression is required
974            if (isDuplicateSuppressionOff(info)) {
975                addConsumerInfo(info);
976            } else {
977                synchronized (brokerService.getVmConnectorURI()) {
978                    addConsumerInfo(info);
979                }
980            }
981        } else if (data.getClass() == DestinationInfo.class) {
982            // It's a destination info - we want to pass up information about temporary destinations
983            final DestinationInfo destInfo = (DestinationInfo) data;
984            BrokerId[] path = destInfo.getBrokerPath();
985            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
986                LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{
987                        configuration.getBrokerName(), destInfo, networkTTL
988                });
989                return;
990            }
991            if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
992                LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo);
993                return;
994            }
995            destInfo.setConnectionId(localConnectionInfo.getConnectionId());
996            if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
997                // re-set connection id so comes from here
998                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
999                tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
1000            }
1001            destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
1002            LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{
1003                    configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
1004            });
1005            if (destInfo.isRemoveOperation()) {
1006                // Serialize with removeSub operations such that all removeSub advisories
1007                // are generated
1008                serialExecutor.execute(new Runnable() {
1009                    @Override
1010                    public void run() {
1011                        try {
1012                            localBroker.oneway(destInfo);
1013                        } catch (IOException e) {
1014                            LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
1015                        }
1016                    }
1017                });
1018            } else {
1019                localBroker.oneway(destInfo);
1020            }
1021        } else if (data.getClass() == RemoveInfo.class) {
1022            ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
1023            removeDemandSubscription(id);
1024
1025            if (forcedDurableRemoteId.remove(id)) {
1026                for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
1027                    DemandSubscription ds = i.next();
1028                    boolean removed = ds.removeForcedDurableConsumer(id);
1029                    if (removed) {
1030                        cleanupDurableSub(ds, i);
1031                    }
1032                }
1033           }
1034
1035        } else if (data.getClass() == RemoveSubscriptionInfo.class) {
1036            final RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
1037            final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
1038            final boolean proxyBridgeSub = isProxyBridgeSubscription(subscriptionInfo.getClientId(),
1039                    subscriptionInfo.getSubscriptionName());
1040            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
1041                DemandSubscription ds = i.next();
1042                boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
1043
1044                //If this is a proxy bridge subscription we need to try changing the clientId
1045                if (!removed && proxyBridgeSub){
1046                    subscriptionInfo.setClientId(getProxyBridgeClientId(subscriptionInfo.getClientId()));
1047                    if (ds.getDurableRemoteSubs().contains(subscriptionInfo)) {
1048                        ds.getDurableRemoteSubs().remove(subscriptionInfo);
1049                        removed = true;
1050                    }
1051                }
1052
1053                if (removed) {
1054                    cleanupDurableSub(ds, i);
1055                }
1056            }
1057        }
1058    }
1059
1060    private void cleanupDurableSub(final DemandSubscription ds,
1061            Iterator<DemandSubscription> i) throws IOException {
1062
1063        if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()
1064                && ds.getForcedDurableConsumersSize() == 0) {
1065            // deactivate subscriber
1066            RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
1067            localBroker.oneway(removeInfo);
1068
1069            // remove subscriber
1070            RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
1071            sending.setClientId(localClientId);
1072            sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
1073            sending.setConnectionId(this.localConnectionInfo.getConnectionId());
1074            localBroker.oneway(sending);
1075
1076            //remove subscriber from map
1077            i.remove();
1078        }
1079    }
1080
1081    @Override
1082    public void serviceLocalException(Throwable error) {
1083        serviceLocalException(null, error);
1084    }
1085
1086    public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
1087        LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
1088        if (!disposed.get()) {
1089            if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
1090                // not a reason to terminate the bridge - temps can disappear with
1091                // pending sends as the demand sub may outlive the remote dest
1092                if (messageDispatch != null) {
1093                    LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
1094                    try {
1095                        MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
1096                        poisonAck.setPoisonCause(error);
1097                        localBroker.oneway(poisonAck);
1098                    } catch (IOException ioe) {
1099                        LOG.error("Failed to posion ack message following forward failure: ", ioe);
1100                    }
1101                    fireFailedForwardAdvisory(messageDispatch, error);
1102                } else {
1103                    LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error);
1104                }
1105                return;
1106            }
1107
1108            LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
1109            LOG.debug("The local Exception was: {}", error, error);
1110
1111            brokerService.getTaskRunnerFactory().execute(new Runnable() {
1112                @Override
1113                public void run() {
1114                    ServiceSupport.dispose(getControllingService());
1115                }
1116            });
1117            fireBridgeFailed(error);
1118        }
1119    }
1120
1121    private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
1122        if (configuration.isAdvisoryForFailedForward()) {
1123            AdvisoryBroker advisoryBroker = null;
1124            try {
1125                advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
1126
1127                if (advisoryBroker != null) {
1128                    ConnectionContext context = new ConnectionContext();
1129                    context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
1130                    context.setBroker(brokerService.getBroker());
1131
1132                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
1133                    advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
1134                    advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
1135                            advisoryMessage);
1136
1137                }
1138            } catch (Exception e) {
1139                LOG.warn("failed to fire forward failure advisory, cause: {}", e);
1140                LOG.debug("detail", e);
1141            }
1142        }
1143    }
1144
1145    protected Service getControllingService() {
1146        return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
1147    }
1148
1149    protected void addSubscription(DemandSubscription sub) throws IOException {
1150        if (sub != null) {
1151            localBroker.oneway(sub.getLocalInfo());
1152        }
1153    }
1154
1155    protected void removeSubscription(final DemandSubscription sub) throws IOException {
1156        if (sub != null) {
1157            LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()});
1158
1159            // ensure not available for conduit subs pending removal
1160            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1161            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1162
1163            // continue removal in separate thread to free up tshis thread for outstanding responses
1164            // Serialize with removeDestination operations so that removeSubs are serialized with
1165            // removeDestinations such that all removeSub advisories are generated
1166            serialExecutor.execute(new Runnable() {
1167                @Override
1168                public void run() {
1169                    sub.waitForCompletion();
1170                    try {
1171                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
1172                    } catch (IOException e) {
1173                        LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
1174                    }
1175                }
1176            });
1177        }
1178    }
1179
1180    protected Message configureMessage(MessageDispatch md) throws IOException {
1181        Message message = md.getMessage().copy();
1182        // Update the packet to show where it came from.
1183        message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
1184        message.setProducerId(producerInfo.getProducerId());
1185        message.setDestination(md.getDestination());
1186        message.setMemoryUsage(null);
1187        if (message.getOriginalTransactionId() == null) {
1188            message.setOriginalTransactionId(message.getTransactionId());
1189        }
1190        message.setTransactionId(null);
1191        if (configuration.isUseCompression()) {
1192            message.compress();
1193        }
1194        return message;
1195    }
1196
1197    protected void serviceLocalCommand(Command command) {
1198        if (!disposed.get()) {
1199            try {
1200                if (command.isMessageDispatch()) {
1201                    safeWaitUntilStarted();
1202                    networkBridgeStatistics.getEnqueues().increment();
1203                    final MessageDispatch md = (MessageDispatch) command;
1204                    final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
1205                    if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
1206
1207                        if (suppressMessageDispatch(md, sub)) {
1208                            LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{
1209                                    configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage()
1210                            });
1211                            // still ack as it may be durable
1212                            try {
1213                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1214                            } finally {
1215                                sub.decrementOutstandingResponses();
1216                            }
1217                            return;
1218                        }
1219
1220                        Message message = configureMessage(md);
1221                        LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{
1222                                configuration.getBrokerName(), remoteBrokerName, md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), (LOG.isTraceEnabled() ? message : message.getMessageId())
1223                        });
1224                        if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
1225                            try {
1226                                // never request b/c they are eventually                     acked async
1227                                remoteBroker.oneway(message);
1228                            } finally {
1229                                sub.decrementOutstandingResponses();
1230                            }
1231                            return;
1232                        }
1233                        if (isPermissableDestination(md.getDestination())) {
1234                           if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
1235
1236                              // The message was not sent using async send, so we should only
1237                              // ack the local broker when we get confirmation that the remote
1238                              // broker has received the message.
1239                              remoteBroker.asyncRequest(message, new ResponseCallback() {
1240                                 @Override
1241                                 public void onCompletion(FutureResponse future) {
1242                                    try {
1243                                       Response response = future.getResult();
1244                                       if (response.isException()) {
1245                                          ExceptionResponse er = (ExceptionResponse) response;
1246                                          serviceLocalException(md, er.getException());
1247                                       } else {
1248                                          localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1249                                          networkBridgeStatistics.getDequeues().increment();
1250                                       }
1251                                    } catch (IOException e) {
1252                                       serviceLocalException(md, e);
1253                                    } finally {
1254                                       sub.decrementOutstandingResponses();
1255                                    }
1256                                 }
1257                              });
1258
1259                           } else {
1260                              // If the message was originally sent using async send, we will
1261                              // preserve that QOS by bridging it using an async send (small chance
1262                              // of message loss).
1263                              try {
1264                                 remoteBroker.oneway(message);
1265                                 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1266                                 networkBridgeStatistics.getDequeues().increment();
1267                              } finally {
1268                                 sub.decrementOutstandingResponses();
1269                              }
1270                           }
1271                           serviceOutbound(message);
1272                        }
1273                    } else {
1274                        LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
1275                    }
1276                } else if (command.isBrokerInfo()) {
1277                    futureLocalBrokerInfo.set((BrokerInfo) command);
1278                } else if (command.isShutdownInfo()) {
1279                    LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName());
1280                    stop();
1281                } else if (command.getClass() == ConnectionError.class) {
1282                    ConnectionError ce = (ConnectionError) command;
1283                    serviceLocalException(ce.getException());
1284                } else {
1285                    switch (command.getDataStructureType()) {
1286                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
1287                            break;
1288                        case BrokerSubscriptionInfo.DATA_STRUCTURE_TYPE:
1289                            break;
1290                        default:
1291                            LOG.warn("Unexpected local command: {}", command);
1292                    }
1293                }
1294            } catch (Throwable e) {
1295                LOG.warn("Caught an exception processing local command", e);
1296                serviceLocalException(e);
1297            }
1298        }
1299    }
1300
1301    private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
1302        boolean suppress = false;
1303        // for durable subs, suppression via filter leaves dangling acks so we
1304        // need to check here and allow the ack irrespective
1305        if (sub.getLocalInfo().isDurable()) {
1306            MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
1307            messageEvalContext.setMessageReference(md.getMessage());
1308            messageEvalContext.setDestination(md.getDestination());
1309            suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
1310            //AMQ-6465 - Need to decrement the reference count after checking matches() as
1311            //the call above will increment the reference count by 1
1312            messageEvalContext.getMessageReference().decrementReferenceCount();
1313        }
1314        return suppress;
1315    }
1316
1317    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
1318        if (brokerPath != null) {
1319            for (BrokerId id : brokerPath) {
1320                if (brokerId.equals(id)) {
1321                    return true;
1322                }
1323            }
1324        }
1325        return false;
1326    }
1327
1328    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
1329        if (brokerPath == null || brokerPath.length == 0) {
1330            return pathsToAppend;
1331        }
1332        BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
1333        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1334        System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
1335        return rc;
1336    }
1337
1338    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
1339        if (brokerPath == null || brokerPath.length == 0) {
1340            return new BrokerId[]{idToAppend};
1341        }
1342        BrokerId rc[] = new BrokerId[brokerPath.length + 1];
1343        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1344        rc[brokerPath.length] = idToAppend;
1345        return rc;
1346    }
1347
1348    protected boolean isPermissableDestination(ActiveMQDestination destination) {
1349        return isPermissableDestination(destination, false);
1350    }
1351
1352    protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
1353        // Are we not bridging temporary destinations?
1354        if (destination.isTemporary()) {
1355            if (allowTemporary) {
1356                return true;
1357            } else {
1358                return configuration.isBridgeTempDestinations();
1359            }
1360        }
1361
1362        ActiveMQDestination[] dests = excludedDestinations;
1363        if (dests != null && dests.length > 0) {
1364            for (ActiveMQDestination dest : dests) {
1365                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
1366                if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1367                    return false;
1368                }
1369            }
1370        }
1371
1372        dests = staticallyIncludedDestinations;
1373        if (dests != null && dests.length > 0) {
1374            for (ActiveMQDestination dest : dests) {
1375                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1376                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1377                    return true;
1378                }
1379            }
1380        }
1381
1382        dests = dynamicallyIncludedDestinations;
1383        if (dests != null && dests.length > 0) {
1384            for (ActiveMQDestination dest : dests) {
1385                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1386                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1387                    return true;
1388                }
1389            }
1390
1391            return false;
1392        }
1393
1394        return true;
1395    }
1396
1397    /**
1398     * Subscriptions for these destinations are always created
1399     */
1400    protected void setupStaticDestinations() {
1401        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1402        if (dests != null) {
1403            for (ActiveMQDestination dest : dests) {
1404                if (isPermissableDestination(dest)) {
1405                    DemandSubscription sub = createDemandSubscription(dest, null);
1406                    if (sub != null) {
1407                        sub.setStaticallyIncluded(true);
1408                        try {
1409                            addSubscription(sub);
1410                        } catch (IOException e) {
1411                            LOG.error("Failed to add static destination {}", dest, e);
1412                        }
1413                        LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
1414                    } else {
1415                        LOG.info("{}, static destination excluded: {}, demand already exists", configuration.getBrokerName(), dest);
1416                    }
1417                } else {
1418                    LOG.info("{}, static destination excluded: {}", configuration.getBrokerName(), dest);
1419                }
1420            }
1421        }
1422    }
1423
1424    protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
1425        ConsumerInfo info = consumerInfo.copy();
1426        addRemoteBrokerToBrokerPath(info);
1427        DemandSubscription sub = createDemandSubscription(info);
1428        if (sub != null) {
1429            if (duplicateSuppressionIsRequired(sub)) {
1430                undoMapRegistration(sub);
1431            } else {
1432                if (consumerInfo.isDurable()) {
1433                    //Handle the demand generated by proxy network subscriptions
1434                    //The broker path is case is normal
1435                    if (isProxyNSConsumerBrokerPath(sub.getRemoteInfo()) &&
1436                            info.getSubscriptionName() != null && info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) {
1437                        final BrokerId[] path = info.getBrokerPath();
1438                        addProxyNetworkSubscriptionBrokerPath(sub, path, consumerInfo.getSubscriptionName());
1439                    //This is the durable sync case on broker restart
1440                    } else if (isProxyNSConsumerClientId(sub.getRemoteInfo().getClientId()) &&
1441                            isProxyBridgeSubscription(info.getClientId(), info.getSubscriptionName())) {
1442                                addProxyNetworkSubscriptionClientId(sub, sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName());
1443                        } else {
1444                                sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
1445                        }
1446                }
1447                addSubscription(sub);
1448                LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub);
1449            }
1450        }
1451    }
1452
1453    private void undoMapRegistration(DemandSubscription sub) {
1454        subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1455        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1456    }
1457
1458    /*
1459     * check our existing subs networkConsumerIds against the list of network
1460     * ids in this subscription A match means a duplicate which we suppress for
1461     * topics and maybe for queues
1462     */
1463    private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1464        final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1465        boolean suppress = false;
1466
1467        if (isDuplicateSuppressionOff(consumerInfo)) {
1468            return suppress;
1469        }
1470
1471        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1472        Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination());
1473        for (Subscription sub : currentSubs) {
1474            List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1475            if (!networkConsumers.isEmpty()) {
1476                if (matchFound(candidateConsumers, networkConsumers)) {
1477                    if (isInActiveDurableSub(sub)) {
1478                        suppress = false;
1479                    } else {
1480                        suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1481                    }
1482                    break;
1483                }
1484            }
1485        }
1486        return suppress;
1487    }
1488
1489    private boolean isDuplicateSuppressionOff(final ConsumerInfo consumerInfo) {
1490        return !configuration.isSuppressDuplicateQueueSubscriptions() && !configuration.isSuppressDuplicateTopicSubscriptions()
1491                || consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()
1492                || consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions();
1493    }
1494
1495    private boolean isInActiveDurableSub(Subscription sub) {
1496        return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
1497    }
1498
1499    private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1500        boolean suppress = false;
1501
1502        if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1503            LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{
1504                    configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds()
1505            });
1506            suppress = true;
1507        } else {
1508            // remove the existing lower priority duplicate and allow this candidate
1509            try {
1510                removeDuplicateSubscription(existingSub);
1511
1512                LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{
1513                        configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds()
1514                });
1515            } catch (IOException e) {
1516                LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e);
1517            }
1518        }
1519        return suppress;
1520    }
1521
1522    private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1523        for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1524            if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1525                break;
1526            }
1527        }
1528    }
1529
1530    private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1531        boolean found = false;
1532        for (ConsumerId aliasConsumer : networkConsumers) {
1533            if (candidateConsumers.contains(aliasConsumer)) {
1534                found = true;
1535                break;
1536            }
1537        }
1538        return found;
1539    }
1540
1541    protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1542        RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1543        Region region;
1544        Collection<Subscription> subs;
1545
1546        region = null;
1547        switch (dest.getDestinationType()) {
1548            case ActiveMQDestination.QUEUE_TYPE:
1549                region = region_broker.getQueueRegion();
1550                break;
1551            case ActiveMQDestination.TOPIC_TYPE:
1552                region = region_broker.getTopicRegion();
1553                break;
1554            case ActiveMQDestination.TEMP_QUEUE_TYPE:
1555                region = region_broker.getTempQueueRegion();
1556                break;
1557            case ActiveMQDestination.TEMP_TOPIC_TYPE:
1558                region = region_broker.getTempTopicRegion();
1559                break;
1560        }
1561
1562        if (region instanceof AbstractRegion) {
1563            subs = ((AbstractRegion) region).getSubscriptions().values();
1564        } else {
1565            subs = null;
1566        }
1567
1568        return subs;
1569    }
1570
1571    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1572        // add our original id to ourselves
1573        info.addNetworkConsumerId(info.getConsumerId());
1574        return doCreateDemandSubscription(info);
1575    }
1576
1577    protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1578        DemandSubscription result = new DemandSubscription(info);
1579        result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1580        if (info.getDestination().isTemporary()) {
1581            // reset the local connection Id
1582            ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1583            dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1584        }
1585
1586        if (configuration.isDecreaseNetworkConsumerPriority()) {
1587            byte priority = (byte) configuration.getConsumerPriorityBase();
1588            if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1589                // The longer the path to the consumer, the less it's consumer priority.
1590                priority -= info.getBrokerPath().length + 1;
1591            }
1592            result.getLocalInfo().setPriority(priority);
1593            LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info});
1594        }
1595        configureDemandSubscription(info, result);
1596        return result;
1597    }
1598
1599    final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination, final String subscriptionName) {
1600        ConsumerInfo info = new ConsumerInfo();
1601        info.setNetworkSubscription(true);
1602        info.setDestination(destination);
1603
1604        if (subscriptionName != null) {
1605            info.setSubscriptionName(subscriptionName);
1606        }
1607
1608        // Indicate that this subscription is being made on behalf of the remote broker.
1609        info.setBrokerPath(new BrokerId[]{remoteBrokerId});
1610
1611        // the remote info held by the DemandSubscription holds the original
1612        // consumerId, the local info get's overwritten
1613        info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1614        DemandSubscription result = null;
1615        try {
1616            result = createDemandSubscription(info);
1617        } catch (IOException e) {
1618            LOG.error("Failed to create DemandSubscription ", e);
1619        }
1620        return result;
1621    }
1622
1623    protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1624        if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) ||
1625                AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) {
1626            sub.getLocalInfo().setDispatchAsync(true);
1627        } else {
1628            sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1629        }
1630        configureConsumerPrefetch(sub.getLocalInfo());
1631        subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1632        subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1633
1634        sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1635        if (!info.isDurable()) {
1636            // This works for now since we use a VM connection to the local broker.
1637            // may need to change if we ever subscribe to a remote broker.
1638            sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1639        } else {
1640            sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
1641        }
1642    }
1643
1644    protected void removeDemandSubscription(ConsumerId id) throws IOException {
1645        DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1646        LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{
1647                configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub
1648        });
1649        if (sub != null) {
1650            removeSubscription(sub);
1651            LOG.debug("{} removed sub on {} from {}: {}", new Object[]{
1652                    configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo()
1653            });
1654        }
1655    }
1656
1657    protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1658        boolean removeDone = false;
1659        DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1660        if (sub != null) {
1661            try {
1662                removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1663                removeDone = true;
1664            } catch (IOException e) {
1665                LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e);
1666            }
1667        }
1668        return removeDone;
1669    }
1670
1671    /**
1672     * Performs a timed wait on the started latch and then checks for disposed
1673     * before performing another wait each time the the started wait times out.
1674     */
1675    protected boolean safeWaitUntilStarted() throws InterruptedException {
1676        while (!disposed.get()) {
1677            if (startedLatch.await(1, TimeUnit.SECONDS)) {
1678                break;
1679            }
1680        }
1681        return !disposed.get();
1682    }
1683
1684    protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1685        NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1686        if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1687            PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1688            if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1689                filterFactory = entry.getNetworkBridgeFilterFactory();
1690            }
1691        }
1692        return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL());
1693    }
1694
1695    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1696        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1697    }
1698
1699    protected BrokerId[] getRemoteBrokerPath() {
1700        return remoteBrokerPath;
1701    }
1702
1703    @Override
1704    public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1705        this.networkBridgeListener = listener;
1706    }
1707
1708    private void fireBridgeFailed(Throwable reason) {
1709        LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason);
1710        NetworkBridgeListener l = this.networkBridgeListener;
1711        if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
1712            l.bridgeFailed();
1713        }
1714    }
1715
1716    /**
1717     * @return Returns the dynamicallyIncludedDestinations.
1718     */
1719    public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
1720        return dynamicallyIncludedDestinations;
1721    }
1722
1723    /**
1724     * @param dynamicallyIncludedDestinations
1725     *         The dynamicallyIncludedDestinations to set.
1726     */
1727    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
1728        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
1729    }
1730
1731    /**
1732     * @return Returns the excludedDestinations.
1733     */
1734    public ActiveMQDestination[] getExcludedDestinations() {
1735        return excludedDestinations;
1736    }
1737
1738    /**
1739     * @param excludedDestinations The excludedDestinations to set.
1740     */
1741    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
1742        this.excludedDestinations = excludedDestinations;
1743    }
1744
1745    /**
1746     * @return Returns the staticallyIncludedDestinations.
1747     */
1748    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
1749        return staticallyIncludedDestinations;
1750    }
1751
1752    /**
1753     * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
1754     */
1755    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
1756        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
1757    }
1758
1759    /**
1760     * @return Returns the durableDestinations.
1761     */
1762    public ActiveMQDestination[] getDurableDestinations() {
1763        return durableDestinations;
1764    }
1765
1766    /**
1767     * @param durableDestinations The durableDestinations to set.
1768     */
1769    public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
1770        this.durableDestinations = durableDestinations;
1771    }
1772
1773    /**
1774     * @return Returns the localBroker.
1775     */
1776    public Transport getLocalBroker() {
1777        return localBroker;
1778    }
1779
1780    /**
1781     * @return Returns the remoteBroker.
1782     */
1783    public Transport getRemoteBroker() {
1784        return remoteBroker;
1785    }
1786
1787    /**
1788     * @return the createdByDuplex
1789     */
1790    public boolean isCreatedByDuplex() {
1791        return this.createdByDuplex;
1792    }
1793
1794    /**
1795     * @param createdByDuplex the createdByDuplex to set
1796     */
1797    public void setCreatedByDuplex(boolean createdByDuplex) {
1798        this.createdByDuplex = createdByDuplex;
1799    }
1800
1801    @Override
1802    public String getRemoteAddress() {
1803        return remoteBroker.getRemoteAddress();
1804    }
1805
1806    @Override
1807    public String getLocalAddress() {
1808        return localBroker.getRemoteAddress();
1809    }
1810
1811    @Override
1812    public String getRemoteBrokerName() {
1813        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1814    }
1815
1816    @Override
1817    public String getRemoteBrokerId() {
1818        return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString();
1819    }
1820
1821    @Override
1822    public String getLocalBrokerName() {
1823        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1824    }
1825
1826    @Override
1827    public long getDequeueCounter() {
1828        return networkBridgeStatistics.getDequeues().getCount();
1829    }
1830
1831    @Override
1832    public long getEnqueueCounter() {
1833        return networkBridgeStatistics.getEnqueues().getCount();
1834    }
1835
1836    @Override
1837    public NetworkBridgeStatistics getNetworkBridgeStatistics() {
1838        return networkBridgeStatistics;
1839    }
1840
1841    protected boolean isDuplex() {
1842        return configuration.isDuplex() || createdByDuplex;
1843    }
1844
1845    public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
1846        return subscriptionMapByRemoteId;
1847    }
1848
1849    @Override
1850    public void setBrokerService(BrokerService brokerService) {
1851        this.brokerService = brokerService;
1852        this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1853        localBrokerPath[0] = localBrokerId;
1854    }
1855
1856    @Override
1857    public void setMbeanObjectName(ObjectName objectName) {
1858        this.mbeanObjectName = objectName;
1859    }
1860
1861    @Override
1862    public ObjectName getMbeanObjectName() {
1863        return mbeanObjectName;
1864    }
1865
1866    @Override
1867    public void resetStats() {
1868        networkBridgeStatistics.reset();
1869    }
1870
1871    /*
1872     * Used to allow for async tasks to await receipt of the BrokerInfo from the local and
1873     * remote sides of the network bridge.
1874     */
1875    private static class FutureBrokerInfo implements Future<BrokerInfo> {
1876
1877        private final CountDownLatch slot = new CountDownLatch(1);
1878        private final AtomicBoolean disposed;
1879        private volatile BrokerInfo info = null;
1880
1881        public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
1882            this.info = info;
1883            this.disposed = disposed;
1884        }
1885
1886        @Override
1887        public boolean cancel(boolean mayInterruptIfRunning) {
1888            slot.countDown();
1889            return true;
1890        }
1891
1892        @Override
1893        public boolean isCancelled() {
1894            return slot.getCount() == 0 && info == null;
1895        }
1896
1897        @Override
1898        public boolean isDone() {
1899            return info != null;
1900        }
1901
1902        @Override
1903        public BrokerInfo get() throws InterruptedException, ExecutionException {
1904            try {
1905                if (info == null) {
1906                    while (!disposed.get()) {
1907                        if (slot.await(1, TimeUnit.SECONDS)) {
1908                            break;
1909                        }
1910                    }
1911                }
1912                return info;
1913            } catch (InterruptedException e) {
1914                Thread.currentThread().interrupt();
1915                LOG.debug("Operation interrupted: {}", e, e);
1916                throw new InterruptedException("Interrupted.");
1917            }
1918        }
1919
1920        @Override
1921        public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1922            try {
1923                if (info == null) {
1924                    long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
1925
1926                    while (!disposed.get() || System.currentTimeMillis() < deadline) {
1927                        if (slot.await(1, TimeUnit.MILLISECONDS)) {
1928                            break;
1929                        }
1930                    }
1931                    if (info == null) {
1932                        throw new TimeoutException();
1933                    }
1934                }
1935                return info;
1936            } catch (InterruptedException e) {
1937                throw new InterruptedException("Interrupted.");
1938            }
1939        }
1940
1941        public void set(BrokerInfo info) {
1942            this.info = info;
1943            this.slot.countDown();
1944        }
1945    }
1946
1947    protected void serviceOutbound(Message message) {
1948        NetworkBridgeListener l = this.networkBridgeListener;
1949        if (l != null) {
1950            l.onOutboundMessage(this, message);
1951        }
1952    }
1953
1954    protected void serviceInboundMessage(Message message) {
1955        NetworkBridgeListener l = this.networkBridgeListener;
1956        if (l != null) {
1957            l.onInboundMessage(this, message);
1958        }
1959    }
1960
1961    protected boolean canDuplexDispatch(Message message) {
1962        boolean result = true;
1963        if (configuration.isCheckDuplicateMessagesOnDuplex()){
1964            final long producerSequenceId = message.getMessageId().getProducerSequenceId();
1965            //  messages are multiplexed on this producer so we need to query the persistenceAdapter
1966            long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId());
1967            if (producerSequenceId <= lastStoredForMessageProducer) {
1968                result = false;
1969                LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
1970                        (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer
1971                });
1972            }
1973        }
1974        return result;
1975    }
1976
1977    protected long getStoredSequenceIdForMessage(MessageId messageId) {
1978        try {
1979            return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
1980        } catch (IOException ignored) {
1981            LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored);
1982        }
1983        return -1;
1984    }
1985
1986    protected void configureConsumerPrefetch(ConsumerInfo consumerInfo) {
1987        //If a consumer on an advisory topic and advisoryPrefetchSize has been explicitly
1988        //set then use it, else default to the prefetchSize setting
1989        if (AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination()) &&
1990                configuration.getAdvisoryPrefetchSize() > 0) {
1991            consumerInfo.setPrefetchSize(configuration.getAdvisoryPrefetchSize());
1992        } else {
1993            consumerInfo.setPrefetchSize(configuration.getPrefetchSize());
1994        }
1995    }
1996
1997}