001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.util.Map; 020import java.util.Set; 021import java.util.concurrent.ConcurrentHashMap; 022import java.util.concurrent.CopyOnWriteArraySet; 023import java.util.concurrent.TimeUnit; 024import java.util.concurrent.atomic.AtomicBoolean; 025import java.util.concurrent.atomic.AtomicInteger; 026 027import org.apache.activemq.command.ConsumerId; 028import org.apache.activemq.command.ConsumerInfo; 029import org.apache.activemq.command.NetworkBridgeFilter; 030import org.apache.activemq.command.SubscriptionInfo; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * Represents a network bridge interface 036 */ 037public class DemandSubscription { 038 private static final Logger LOG = LoggerFactory.getLogger(DemandSubscription.class); 039 040 private final ConsumerInfo remoteInfo; 041 private final ConsumerInfo localInfo; 042 private final Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>(); 043 private final AtomicInteger dispatched = new AtomicInteger(0); 044 private final AtomicBoolean activeWaiter = new AtomicBoolean(); 045 private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>(); 046 private final Set<ConsumerId> forcedDurableConsumers = new CopyOnWriteArraySet<ConsumerId>(); 047 private SubscriptionInfo localDurableSubscriber; 048 049 private NetworkBridgeFilter networkBridgeFilter; 050 private boolean staticallyIncluded; 051 052 DemandSubscription(ConsumerInfo info) { 053 remoteInfo = info; 054 localInfo = info.copy(); 055 localInfo.setNetworkSubscription(true); 056 remoteSubsIds.add(info.getConsumerId()); 057 } 058 059 @Override 060 public String toString() { 061 return "DemandSub{" + localInfo.getConsumerId() + ",remotes:" + remoteSubsIds + "}"; 062 } 063 064 /** 065 * Increment the consumers associated with this subscription 066 * 067 * @param id 068 * @return true if added 069 */ 070 public boolean add(ConsumerId id) { 071 return remoteSubsIds.add(id); 072 } 073 074 /** 075 * Increment the consumers associated with this subscription 076 * 077 * @param id 078 * @return true if removed 079 */ 080 public boolean remove(ConsumerId id) { 081 return remoteSubsIds.remove(id); 082 } 083 084 public Set<SubscriptionInfo> getDurableRemoteSubs() { 085 return durableRemoteSubs; 086 } 087 088 /** 089 * @return true if there are no interested consumers 090 */ 091 public boolean isEmpty() { 092 return remoteSubsIds.isEmpty(); 093 } 094 095 public int size() { 096 return remoteSubsIds.size(); 097 } 098 /** 099 * @return Returns the localInfo. 100 */ 101 public ConsumerInfo getLocalInfo() { 102 return localInfo; 103 } 104 105 /** 106 * @return Returns the remoteInfo. 107 */ 108 public ConsumerInfo getRemoteInfo() { 109 return remoteInfo; 110 } 111 112 public boolean addForcedDurableConsumer(ConsumerId id) { 113 return forcedDurableConsumers.add(id); 114 } 115 116 public boolean removeForcedDurableConsumer(ConsumerId id) { 117 return forcedDurableConsumers.remove(id); 118 } 119 120 public int getForcedDurableConsumersSize() { 121 return forcedDurableConsumers.size(); 122 } 123 124 public void waitForCompletion() { 125 if (dispatched.get() > 0) { 126 LOG.debug("Waiting for completion for sub: {}, dispatched: {}", localInfo.getConsumerId(), this.dispatched.get()); 127 activeWaiter.set(true); 128 if (dispatched.get() > 0) { 129 synchronized (activeWaiter) { 130 try { 131 activeWaiter.wait(TimeUnit.SECONDS.toMillis(30)); 132 } catch (InterruptedException ignored) { 133 } 134 } 135 if (this.dispatched.get() > 0) { 136 LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, expect potentially {} duplicate forwards", this.dispatched.get()); 137 } 138 } 139 } 140 } 141 142 public void decrementOutstandingResponses() { 143 if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) { 144 synchronized (activeWaiter) { 145 activeWaiter.notifyAll(); 146 } 147 } 148 } 149 150 public boolean incrementOutstandingResponses() { 151 dispatched.incrementAndGet(); 152 if (activeWaiter.get()) { 153 decrementOutstandingResponses(); 154 return false; 155 } 156 return true; 157 } 158 159 public NetworkBridgeFilter getNetworkBridgeFilter() { 160 return networkBridgeFilter; 161 } 162 163 public void setNetworkBridgeFilter(NetworkBridgeFilter networkBridgeFilter) { 164 this.networkBridgeFilter = networkBridgeFilter; 165 } 166 167 public SubscriptionInfo getLocalDurableSubscriber() { 168 return localDurableSubscriber; 169 } 170 171 public void setLocalDurableSubscriber(SubscriptionInfo localDurableSubscriber) { 172 this.localDurableSubscriber = localDurableSubscriber; 173 } 174 175 public boolean isStaticallyIncluded() { 176 return staticallyIncluded; 177 } 178 179 public void setStaticallyIncluded(boolean staticallyIncluded) { 180 this.staticallyIncluded = staticallyIncluded; 181 } 182}