001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.util.HashMap;
020import java.util.HashSet;
021import java.util.Map;
022import java.util.Set;
023import java.util.Timer;
024import java.util.TimerTask;
025
026import org.apache.activemq.broker.ConnectionContext;
027import org.apache.activemq.command.ActiveMQDestination;
028import org.apache.activemq.thread.TaskRunnerFactory;
029import org.apache.activemq.usage.SystemUsage;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * 
035 */
036public abstract class AbstractTempRegion extends AbstractRegion {
037    private static final Logger LOG = LoggerFactory.getLogger(TempQueueRegion.class);
038
039    private Map<CachedDestination, Destination> cachedDestinations = new HashMap<CachedDestination, Destination>();
040    private final boolean doCacheTempDestinations;
041    private final int purgeTime;
042    private Timer purgeTimer;
043    private TimerTask purgeTask;
044   
045
046    /**
047     * @param broker
048     * @param destinationStatistics
049     * @param memoryManager
050     * @param taskRunnerFactory
051     * @param destinationFactory
052     */
053    public AbstractTempRegion(RegionBroker broker,
054            DestinationStatistics destinationStatistics,
055            SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
056            DestinationFactory destinationFactory) {
057        super(broker, destinationStatistics, memoryManager, taskRunnerFactory,
058                destinationFactory);
059        this.doCacheTempDestinations=broker.getBrokerService().isCacheTempDestinations();
060        this.purgeTime = broker.getBrokerService().getTimeBeforePurgeTempDestinations();
061        if (this.doCacheTempDestinations) {
062            this.purgeTimer = new Timer("ActiveMQ Temp destination purge timer", true);
063            this.purgeTask = new TimerTask() {
064                public void run() {
065                    doPurge();
066                }
067    
068            };
069            this.purgeTimer.schedule(purgeTask, purgeTime, purgeTime);
070        }
071       
072    }
073
074    public void stop() throws Exception {
075        super.stop();
076        if (purgeTimer != null) {
077            purgeTimer.cancel();
078        }
079    }
080
081    protected synchronized Destination createDestination(
082            ConnectionContext context, ActiveMQDestination destination)
083            throws Exception {
084        Destination result = cachedDestinations.remove(new CachedDestination(
085                destination));
086        if (result == null) {
087            result =  destinationFactory.createDestination(context, destination, destinationStatistics);
088        }
089        return result;
090    }
091
092    protected final synchronized void dispose(ConnectionContext context,
093            Destination dest) throws Exception {
094        // add to cache
095        if (this.doCacheTempDestinations) {
096            cachedDestinations.put(new CachedDestination(dest
097                    .getActiveMQDestination()), dest);
098        }else {
099            try {
100                dest.dispose(context);
101                dest.stop();
102            } catch (Exception e) {
103                LOG.warn("Failed to dispose of {}", dest, e);
104            }
105        }
106    }
107
108    private void doDispose(Destination dest) {
109        ConnectionContext context = new ConnectionContext();
110        try {
111            dest.dispose(context);
112            dest.stop();
113        } catch (Exception e) {
114            LOG.warn("Failed to dispose of {}", dest, e);
115        }
116
117    }
118
119    private synchronized void doPurge() {
120        long currentTime = System.currentTimeMillis();
121        if (cachedDestinations.size() > 0) {
122            Set<CachedDestination> tmp = new HashSet<CachedDestination>(
123                    cachedDestinations.keySet());
124            for (CachedDestination key : tmp) {
125                if ((key.timeStamp + purgeTime) < currentTime) {
126                    Destination dest = cachedDestinations.remove(key);
127                    if (dest != null) {
128                        doDispose(dest);
129                    }
130                }
131            }
132        }
133    }
134
135    static class CachedDestination {
136        long timeStamp;
137
138        ActiveMQDestination destination;
139
140        CachedDestination(ActiveMQDestination destination) {
141            this.destination = destination;
142            this.timeStamp = System.currentTimeMillis();
143        }
144
145        public int hashCode() {
146            return destination.hashCode();
147        }
148
149        public boolean equals(Object o) {
150            if (o instanceof CachedDestination) {
151                CachedDestination other = (CachedDestination) o;
152                return other.destination.equals(this.destination);
153            }
154            return false;
155        }
156
157    }
158
159}