/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.network;
018
019import org.apache.activemq.broker.BrokerService;
020import org.apache.activemq.broker.jmx.AnnotatedMBean;
021import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
022import org.apache.activemq.broker.jmx.NetworkBridgeView;
023import org.apache.activemq.broker.jmx.NetworkDestinationView;
024import org.apache.activemq.command.ActiveMQDestination;
025import org.apache.activemq.command.Message;
026import org.apache.activemq.thread.Scheduler;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030import javax.management.ObjectName;
031import java.util.Iterator;
032import java.util.Map;
033import java.util.concurrent.ConcurrentHashMap;
034
035public class MBeanBridgeDestination {
036    private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class);
037    private final BrokerService brokerService;
038    private final NetworkBridge bridge;
039    private final NetworkBridgeView networkBridgeView;
040    private final NetworkBridgeConfiguration networkBridgeConfiguration;
041    private final Scheduler scheduler;
042    private final Runnable purgeInactiveDestinationViewTask;
043    private final Map<ActiveMQDestination, NetworkDestinationContainer> outboundDestinationViewMap = new ConcurrentHashMap<>();
044    private final Map<ActiveMQDestination, NetworkDestinationContainer> inboundDestinationViewMap = new ConcurrentHashMap<>();
045
046    public MBeanBridgeDestination(BrokerService brokerService, NetworkBridgeConfiguration networkBridgeConfiguration, NetworkBridge bridge, NetworkBridgeView networkBridgeView) {
047        this.brokerService = brokerService;
048        this.networkBridgeConfiguration = networkBridgeConfiguration;
049        this.bridge = bridge;
050        this.networkBridgeView = networkBridgeView;
051        this.scheduler = brokerService.getScheduler();
052        purgeInactiveDestinationViewTask = new Runnable() {
053            public void run() {
054                purgeInactiveDestinationViews();
055            }
056        };
057    }
058
059
060    public void onOutboundMessage(Message message) {
061        ActiveMQDestination destination = message.getDestination();
062        NetworkDestinationContainer networkDestinationContainer;
063
064        if ((networkDestinationContainer = outboundDestinationViewMap.get(destination)) == null) {
065            ObjectName bridgeObjectName = bridge.getMbeanObjectName();
066            try {
067                ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination);
068                NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
069                AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
070
071                networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName);
072                outboundDestinationViewMap.put(destination, networkDestinationContainer);
073                networkDestinationView.messageSent();
074            } catch (Exception e) {
075                LOG.warn("Failed to register " + destination, e);
076            }
077        } else {
078            networkDestinationContainer.view.messageSent();
079        }
080    }
081
082
083    public void onInboundMessage(Message message) {
084        ActiveMQDestination destination = message.getDestination();
085        NetworkDestinationContainer networkDestinationContainer;
086
087        if ((networkDestinationContainer = inboundDestinationViewMap.get(destination)) == null) {
088            ObjectName bridgeObjectName = bridge.getMbeanObjectName();
089            try {
090                ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
091                NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
092                AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
093
094                networkBridgeView.addNetworkDestinationView(networkDestinationView);
095                networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName);
096                inboundDestinationViewMap.put(destination, networkDestinationContainer);
097                networkDestinationView.messageSent();
098            } catch (Exception e) {
099                LOG.warn("Failed to register " + destination, e);
100            }
101        } else {
102            networkDestinationContainer.view.messageSent();
103        }
104    }
105
106    public void start() {
107        if (networkBridgeConfiguration.isGcDestinationViews()) {
108            long period = networkBridgeConfiguration.getGcSweepTime();
109            if (period > 0) {
110                scheduler.executePeriodically(purgeInactiveDestinationViewTask, period);
111            }
112        }
113    }
114
115    public void stop() {
116        if (!brokerService.isUseJmx()) {
117            return;
118        }
119
120        scheduler.cancel(purgeInactiveDestinationViewTask);
121        for (NetworkDestinationContainer networkDestinationContainer : inboundDestinationViewMap.values()) {
122            try {
123                brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName);
124            } catch (Exception e) {
125                LOG.error("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
126            }
127        }
128        for (NetworkDestinationContainer networkDestinationContainer : outboundDestinationViewMap.values()) {
129            try {
130                brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName);
131            } catch (Exception e) {
132                LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
133            }
134        }
135        inboundDestinationViewMap.clear();
136        outboundDestinationViewMap.clear();
137    }
138
139    private void purgeInactiveDestinationViews() {
140        if (!brokerService.isUseJmx()) {
141            return;
142        }
143        purgeInactiveDestinationView(inboundDestinationViewMap);
144        purgeInactiveDestinationView(outboundDestinationViewMap);
145    }
146
147    private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationContainer> map) {
148        long time = System.currentTimeMillis() - networkBridgeConfiguration.getGcSweepTime();
149        for (Iterator<Map.Entry<ActiveMQDestination, NetworkDestinationContainer>> it = map.entrySet().iterator(); it.hasNext(); ) {
150            Map.Entry<ActiveMQDestination, NetworkDestinationContainer> entry = it.next();
151            if (entry.getValue().view.getLastAccessTime() <= time) {
152                ObjectName objectName = entry.getValue().objectName;
153                if (objectName != null) {
154                    try {
155                        brokerService.getManagementContext().unregisterMBean(entry.getValue().objectName);
156                    } catch (Throwable e) {
157                        LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
158                    }
159                }
160                entry.getValue().view.close();
161                it.remove();
162            }
163        }
164    }
165
166    private static class NetworkDestinationContainer {
167        private final NetworkDestinationView view;
168        private final ObjectName objectName;
169
170        private NetworkDestinationContainer(NetworkDestinationView view, ObjectName objectName) {
171            this.view = view;
172            this.objectName = objectName;
173        }
174    }
175}