001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.transport.mqtt;
019    
020    import java.io.IOException;
021    import java.util.Timer;
022    import java.util.concurrent.SynchronousQueue;
023    import java.util.concurrent.ThreadFactory;
024    import java.util.concurrent.ThreadPoolExecutor;
025    import java.util.concurrent.TimeUnit;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    import java.util.concurrent.atomic.AtomicInteger;
028    import java.util.concurrent.locks.ReentrantReadWriteLock;
029    
030    import org.apache.activemq.command.KeepAliveInfo;
031    import org.apache.activemq.thread.SchedulerTimerTask;
032    import org.apache.activemq.transport.AbstractInactivityMonitor;
033    import org.apache.activemq.transport.InactivityIOException;
034    import org.apache.activemq.transport.Transport;
035    import org.apache.activemq.transport.TransportFilter;
036    import org.apache.activemq.wireformat.WireFormat;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    public class MQTTInactivityMonitor extends TransportFilter {
041    
042        private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class);
043    
044        private static ThreadPoolExecutor ASYNC_TASKS;
045        private static int CHECKER_COUNTER;
046        private static long DEFAULT_CHECK_TIME_MILLS = 30000;
047        private static Timer READ_CHECK_TIMER;
048    
049        private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
050    
051        private final AtomicBoolean commandSent = new AtomicBoolean(false);
052        private final AtomicBoolean inSend = new AtomicBoolean(false);
053        private final AtomicBoolean failed = new AtomicBoolean(false);
054    
055        private final AtomicBoolean commandReceived = new AtomicBoolean(true);
056        private final AtomicBoolean inReceive = new AtomicBoolean(false);
057        private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
058    
059        private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
060        private SchedulerTimerTask readCheckerTask;
061    
062        private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
063        private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
064        private boolean keepAliveResponseRequired;
065        private MQTTProtocolConverter protocolConverter;
066    
067    
068        private final Runnable readChecker = new Runnable() {
069            long lastRunTime;
070    
071            public void run() {
072                long now = System.currentTimeMillis();
073                long elapsed = (now - lastRunTime);
074    
075                if (lastRunTime != 0 && LOG.isDebugEnabled()) {
076                    LOG.debug("" + elapsed + " ms elapsed since last read check.");
077                }
078    
079                // Perhaps the timer executed a read check late.. and then executes
080                // the next read check on time which causes the time elapsed between
081                // read checks to be small..
082    
083                // If less than 90% of the read check Time elapsed then abort this readcheck.
084                if (!allowReadCheck(elapsed)) { // FUNKY qdox bug does not allow me to inline this expression.
085                    LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
086                    return;
087                }
088    
089                lastRunTime = now;
090                readCheck();
091            }
092        };
093    
094        private boolean allowReadCheck(long elapsed) {
095            return elapsed > (readCheckTime * 9 / 10);
096        }
097    
098    
099        public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
100            super(next);
101        }
102    
103        public void start() throws Exception {
104            next.start();
105            startMonitorThread();
106        }
107    
108        public void stop() throws Exception {
109            stopMonitorThread();
110            next.stop();
111        }
112    
113    
114        final void readCheck() {
115            int currentCounter = next.getReceiveCounter();
116            int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
117            if (inReceive.get() || currentCounter != previousCounter) {
118                if (LOG.isTraceEnabled()) {
119                    LOG.trace("A receive is in progress");
120                }
121                return;
122            }
123            if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
124                if (LOG.isDebugEnabled()) {
125                    LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
126                }
127                ASYNC_TASKS.execute(new Runnable() {
128                    public void run() {
129                        if (protocolConverter != null) {
130                            protocolConverter.onTransportError();
131                        }
132                        onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
133                    }
134    
135                    ;
136                });
137            } else {
138                if (LOG.isTraceEnabled()) {
139                    LOG.trace("Message received since last read check, resetting flag: ");
140                }
141            }
142            commandReceived.set(false);
143        }
144    
145    
146        public void onCommand(Object command) {
147            commandReceived.set(true);
148            inReceive.set(true);
149            try {
150                if (command.getClass() == KeepAliveInfo.class) {
151                    KeepAliveInfo info = (KeepAliveInfo) command;
152                    if (info.isResponseRequired()) {
153                        sendLock.readLock().lock();
154                        try {
155                            info.setResponseRequired(false);
156                            oneway(info);
157                        } catch (IOException e) {
158                            onException(e);
159                        } finally {
160                            sendLock.readLock().unlock();
161                        }
162                    }
163                } else {
164                    transportListener.onCommand(command);
165                }
166            } finally {
167                inReceive.set(false);
168            }
169        }
170    
171        public void oneway(Object o) throws IOException {
172            // To prevent the inactivity monitor from sending a message while we
173            // are performing a send we take a read lock.  The inactivity monitor
174            // sends its Heart-beat commands under a write lock.  This means that
175            // the MutexTransport is still responsible for synchronizing sends
176            this.sendLock.readLock().lock();
177            inSend.set(true);
178            try {
179                doOnewaySend(o);
180            } finally {
181                commandSent.set(true);
182                inSend.set(false);
183                this.sendLock.readLock().unlock();
184            }
185        }
186    
187        // Must be called under lock, either read or write on sendLock.
188        private void doOnewaySend(Object command) throws IOException {
189            if (failed.get()) {
190                throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
191            }
192            next.oneway(command);
193        }
194    
195        public void onException(IOException error) {
196            if (failed.compareAndSet(false, true)) {
197                stopMonitorThread();
198                transportListener.onException(error);
199            }
200        }
201    
202    
203        public long getReadCheckTime() {
204            return readCheckTime;
205        }
206    
207        public void setReadCheckTime(long readCheckTime) {
208            this.readCheckTime = readCheckTime;
209        }
210    
211    
212        public long getInitialDelayTime() {
213            return initialDelayTime;
214        }
215    
216        public void setInitialDelayTime(long initialDelayTime) {
217            this.initialDelayTime = initialDelayTime;
218        }
219    
220        public boolean isKeepAliveResponseRequired() {
221            return this.keepAliveResponseRequired;
222        }
223    
224        public void setKeepAliveResponseRequired(boolean value) {
225            this.keepAliveResponseRequired = value;
226        }
227    
228        public boolean isMonitorStarted() {
229            return this.monitorStarted.get();
230        }
231    
232        public void setProtocolConverter(MQTTProtocolConverter protocolConverter) {
233            this.protocolConverter = protocolConverter;
234        }
235    
236        public MQTTProtocolConverter getProtocolConverter() {
237            return protocolConverter;
238        }
239    
240        synchronized void startMonitorThread() {
241            if (monitorStarted.get()) {
242                return;
243            }
244    
245    
246            if (readCheckTime > 0) {
247                readCheckerTask = new SchedulerTimerTask(readChecker);
248            }
249    
250    
251            if (readCheckTime > 0) {
252                monitorStarted.set(true);
253                synchronized (AbstractInactivityMonitor.class) {
254                    if (CHECKER_COUNTER == 0) {
255                        ASYNC_TASKS = createExecutor();
256                        READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
257                    }
258                    CHECKER_COUNTER++;
259                    if (readCheckTime > 0) {
260                        READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
261                    }
262                }
263            }
264        }
265    
266    
267        synchronized void stopMonitorThread() {
268            if (monitorStarted.compareAndSet(true, false)) {
269                if (readCheckerTask != null) {
270                    readCheckerTask.cancel();
271                }
272    
273                synchronized (AbstractInactivityMonitor.class) {
274                    READ_CHECK_TIMER.purge();
275                    CHECKER_COUNTER--;
276                    if (CHECKER_COUNTER == 0) {
277                        READ_CHECK_TIMER.cancel();
278                        READ_CHECK_TIMER = null;
279                        ASYNC_TASKS.shutdown();
280                        ASYNC_TASKS = null;
281                    }
282                }
283            }
284        }
285    
286        private ThreadFactory factory = new ThreadFactory() {
287            public Thread newThread(Runnable runnable) {
288                Thread thread = new Thread(runnable, "MQTTInactivityMonitor Async Task: " + runnable);
289                thread.setDaemon(true);
290                return thread;
291            }
292        };
293    
294        private ThreadPoolExecutor createExecutor() {
295            ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
296            exec.allowCoreThreadTimeOut(true);
297            return exec;
298        }
299    }
300