001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.util.List;
020    import org.apache.activemq.broker.region.Subscription;
021    import org.apache.activemq.command.BrokerId;
022    import org.apache.activemq.command.ConsumerInfo;
023    import org.apache.activemq.command.Message;
024    import org.apache.activemq.command.NetworkBridgeFilter;
025    import org.apache.activemq.filter.MessageEvaluationContext;
026    import org.slf4j.Logger;
027    import org.slf4j.LoggerFactory;
028    
029    /**
030     * implement conditional behaviour for queue consumers,
031     * allows replaying back to origin if no consumers are present on the local broker
032     * after a configurable delay, irrespective of the networkTTL
033     * Also allows rate limiting of messages through the network, useful for static includes
034     *
035     *  @org.apache.xbean.XBean
036     */
037    
038    public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory {
039        boolean replayWhenNoConsumers = false;
040        int replayDelay = 0;
041        int rateLimit = 0;
042        int rateDuration = 1000;
043    
044        @Override
045        public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive) {
046            ConditionalNetworkBridgeFilter filter = new ConditionalNetworkBridgeFilter();
047            filter.setNetworkBrokerId(remoteBrokerPath[0]);
048            filter.setNetworkTTL(networkTimeToLive);
049            filter.setAllowReplayWhenNoConsumers(isReplayWhenNoConsumers());
050            filter.setRateLimit(getRateLimit());
051            filter.setRateDuration(getRateDuration());
052            filter.setReplayDelay(getReplayDelay());
053            return filter;
054        }
055    
056        public void setReplayWhenNoConsumers(boolean replayWhenNoConsumers) {
057            this.replayWhenNoConsumers = replayWhenNoConsumers;
058        }
059    
060        public boolean isReplayWhenNoConsumers() {
061            return replayWhenNoConsumers;
062        }
063    
064        public void setRateLimit(int rateLimit) {
065            this.rateLimit = rateLimit;
066        }
067    
068        public int getRateLimit() {
069            return rateLimit;
070        }
071    
072        public int getRateDuration() {
073            return rateDuration;
074        }
075    
076        public void setRateDuration(int rateDuration) {
077            this.rateDuration = rateDuration;
078        }
079    
080        public int getReplayDelay() {
081            return replayDelay;
082        }
083    
084        public void setReplayDelay(int replayDelay) {
085            this.replayDelay = replayDelay;
086        }
087    
088        private static class ConditionalNetworkBridgeFilter extends NetworkBridgeFilter {
089            final static Logger LOG = LoggerFactory.getLogger(ConditionalNetworkBridgeFilter.class);
090            private int rateLimit;
091            private int rateDuration = 1000;
092            private boolean allowReplayWhenNoConsumers = true;
093            private int replayDelay = 1000;
094    
095            private int matchCount;
096            private long rateDurationEnd;
097    
098            @Override
099            protected boolean matchesForwardingFilter(Message message, final MessageEvaluationContext mec) {
100                boolean match = true;
101                if (mec.getDestination().isQueue()) {
102                    if (contains(message.getBrokerPath(), networkBrokerId)) {
103                        // potential replay back to origin
104                        match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
105    
106                        if (match && LOG.isTraceEnabled()) {
107                            LOG.trace("Replaying  [" + message.getMessageId() +"] for [" + message.getDestination() +"] back to origin in the absence of a local consumer");
108                        }
109                    }
110    
111                    if (match && rateLimitExceeded()) {
112                        if (LOG.isTraceEnabled()) {
113                            LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount + ">" + rateLimit  + "/" + rateDuration);
114                        }
115                        match = false;
116                    }
117    
118                } else {
119                    // use existing logic for topics
120                    match = super.matchesForwardingFilter(message, mec);
121                }
122    
123                return match;
124            }
125    
126            private boolean hasNotJustArrived(Message message) {
127                return replayDelay ==0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis());
128            }
129    
130            private boolean hasNoLocalConsumers(final Message message, final MessageEvaluationContext mec) {
131                List<Subscription> consumers = mec.getMessageReference().getRegionDestination().getConsumers();
132                for (Subscription sub : consumers) {
133                    if (!sub.getConsumerInfo().isNetworkSubscription() && !sub.getConsumerInfo().isBrowser()) {
134                        if (LOG.isTraceEnabled()) {
135                            LOG.trace("Not replaying [" + message.getMessageId() + "] for [" + message.getDestination() +"] to origin due to existing local consumer: " + sub.getConsumerInfo());
136                        }
137                        return false;
138                    }
139                }
140                return true;
141            }
142    
143            private boolean rateLimitExceeded() {
144                if (rateLimit == 0) {
145                    return false;
146                }
147    
148                if (rateDurationEnd < System.currentTimeMillis()) {
149                    rateDurationEnd = System.currentTimeMillis() + rateDuration;
150                    matchCount = 0;
151                }
152                return ++matchCount > rateLimit;
153            }
154    
155            public void setReplayDelay(int replayDelay) {
156                this.replayDelay = replayDelay;
157            }
158    
159            public void setRateLimit(int rateLimit) {
160                this.rateLimit = rateLimit;
161            }
162    
163            public void setRateDuration(int rateDuration) {
164                this.rateDuration = rateDuration;
165            }
166    
167            public void setAllowReplayWhenNoConsumers(boolean allowReplayWhenNoConsumers) {
168                this.allowReplayWhenNoConsumers = allowReplayWhenNoConsumers;
169            }
170        }
171    }