001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.util; 018 019import java.io.IOException; 020import java.sql.SQLException; 021import java.util.Map; 022import java.util.concurrent.TimeUnit; 023import java.util.concurrent.atomic.AtomicBoolean; 024 025import org.apache.activemq.broker.BrokerService; 026import org.apache.activemq.broker.SuppressReplyException; 027import org.apache.activemq.broker.region.Destination; 028import org.apache.activemq.broker.region.Queue; 029import org.apache.activemq.broker.region.RegionBroker; 030import org.apache.activemq.command.ActiveMQDestination; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * @org.apache.xbean.XBean 036 */ 037 public class DefaultIOExceptionHandler implements IOExceptionHandler { 038 039 private static final Logger LOG = LoggerFactory 040 .getLogger(DefaultIOExceptionHandler.class); 041 protected BrokerService broker; 042 private boolean ignoreAllErrors = false; 043 private boolean ignoreNoSpaceErrors = true; 044 private boolean ignoreSQLExceptions = true; 045 private boolean stopStartConnectors = false; 046 private String noSpaceMessage = "space"; 047 private String sqlExceptionMessage = ""; // match all 048 private long resumeCheckSleepPeriod = 5*1000; 049 private final AtomicBoolean handlingException = new AtomicBoolean(false); 050 private boolean systemExitOnShutdown = false; 051 052 @Override 053 public void handle(IOException exception) { 054 if (!broker.isStarted() || ignoreAllErrors) { 055 allowIOResumption(); 056 LOG.info("Ignoring IO exception, " + exception, exception); 057 return; 058 } 059 060 if (ignoreNoSpaceErrors) { 061 Throwable cause = exception; 062 while (cause != null && cause instanceof IOException) { 063 String message = cause.getMessage(); 064 if (message != null && message.contains(noSpaceMessage)) { 065 LOG.info("Ignoring no space left exception, " + exception, exception); 066 allowIOResumption(); 067 return; 068 } 069 cause = cause.getCause(); 070 } 071 } 072 073 if (ignoreSQLExceptions) { 074 Throwable cause = exception; 075 while (cause != null) { 076 if (cause instanceof SQLException) { 077 String message = cause.getMessage(); 078 079 if (message == null) { 080 message = ""; 081 } 082 083 if (message.contains(sqlExceptionMessage)) { 084 LOG.info("Ignoring SQLException, " + exception, cause); 085 return; 086 } 087 } 088 cause = cause.getCause(); 089 } 090 } 091 092 if (stopStartConnectors) { 093 if (handlingException.compareAndSet(false, true)) { 094 LOG.info("Initiating stop/restart of transports on " + broker + " due to IO exception, " + exception, exception); 095 096 new Thread("IOExceptionHandler: stop transports") { 097 @Override 098 public void run() { 099 try { 100 ServiceStopper stopper = new ServiceStopper(); 101 broker.stopAllConnectors(stopper); 102 LOG.info("Successfully stopped transports on " + broker); 103 } catch (Exception e) { 104 LOG.warn("Failure occurred while stopping broker connectors", e); 105 } finally { 106 // resume again 107 new Thread("IOExceptionHandler: restart transports") { 108 @Override 109 public void run() { 110 try { 111 allowIOResumption(); 112 while (hasLockOwnership() && isPersistenceAdapterDown()) { 113 LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports"); 114 TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod); 115 } 116 if (hasLockOwnership()) { 117 Map<ActiveMQDestination, Destination> destinations = ((RegionBroker)broker.getRegionBroker()).getDestinationMap(); 118 for (Destination destination : destinations.values()) { 119 120 if (destination instanceof Queue) { 121 Queue queue = (Queue)destination; 122 if (queue.isResetNeeded()) { 123 queue.clearPendingMessages(); 124 } 125 } 126 } 127 broker.startAllConnectors(); 128 LOG.info("Successfully restarted transports on " + broker); 129 } 130 } catch (Exception e) { 131 LOG.warn("Stopping " + broker + " due to failure restarting transports", e); 132 stopBroker(e); 133 } finally { 134 handlingException.compareAndSet(true, false); 135 } 136 } 137 138 private boolean isPersistenceAdapterDown() { 139 boolean checkpointSuccess = false; 140 try { 141 broker.getPersistenceAdapter().checkpoint(true); 142 checkpointSuccess = true; 143 } catch (Throwable ignored) { 144 } 145 return !checkpointSuccess; 146 } 147 }.start(); 148 149 150 } 151 } 152 }.start(); 153 } 154 155 throw new SuppressReplyException("Stop/RestartTransportsInitiated", exception); 156 } 157 158 if (handlingException.compareAndSet(false, true)) { 159 stopBroker(exception); 160 } 161 162 // we don't want to propagate the exception back to the client 163 // They will see a delay till they see a disconnect via socket.close 164 // at which point failover: can kick in. 165 throw new SuppressReplyException("ShutdownBrokerInitiated", exception); 166 } 167 168 protected void allowIOResumption() { 169 try { 170 if (broker.getPersistenceAdapter() != null) { 171 broker.getPersistenceAdapter().allowIOResumption(); 172 } 173 } catch (IOException e) { 174 LOG.warn("Failed to allow IO resumption", e); 175 } 176 } 177 178 private void stopBroker(Exception exception) { 179 LOG.info("Stopping " + broker + " due to exception, " + exception, exception); 180 new Thread("IOExceptionHandler: stopping " + broker) { 181 @Override 182 public void run() { 183 try { 184 if( broker.isRestartAllowed() ) { 185 broker.requestRestart(); 186 } 187 broker.setSystemExitOnShutdown(isSystemExitOnShutdown()); 188 broker.stop(); 189 } catch (Exception e) { 190 LOG.warn("Failure occurred while stopping broker", e); 191 } 192 } 193 }.start(); 194 } 195 196 protected boolean hasLockOwnership() throws IOException { 197 return true; 198 } 199 200 @Override 201 public void setBrokerService(BrokerService broker) { 202 this.broker = broker; 203 } 204 205 public boolean isIgnoreAllErrors() { 206 return ignoreAllErrors; 207 } 208 209 public void setIgnoreAllErrors(boolean ignoreAllErrors) { 210 this.ignoreAllErrors = ignoreAllErrors; 211 } 212 213 public boolean isIgnoreNoSpaceErrors() { 214 return ignoreNoSpaceErrors; 215 } 216 217 public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) { 218 this.ignoreNoSpaceErrors = ignoreNoSpaceErrors; 219 } 220 221 public String getNoSpaceMessage() { 222 return noSpaceMessage; 223 } 224 225 public void setNoSpaceMessage(String noSpaceMessage) { 226 this.noSpaceMessage = noSpaceMessage; 227 } 228 229 public boolean isIgnoreSQLExceptions() { 230 return ignoreSQLExceptions; 231 } 232 233 public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) { 234 this.ignoreSQLExceptions = ignoreSQLExceptions; 235 } 236 237 public String getSqlExceptionMessage() { 238 return sqlExceptionMessage; 239 } 240 241 public void setSqlExceptionMessage(String sqlExceptionMessage) { 242 this.sqlExceptionMessage = sqlExceptionMessage; 243 } 244 245 public boolean isStopStartConnectors() { 246 return stopStartConnectors; 247 } 248 249 public void setStopStartConnectors(boolean stopStartConnectors) { 250 this.stopStartConnectors = stopStartConnectors; 251 } 252 253 public long getResumeCheckSleepPeriod() { 254 return resumeCheckSleepPeriod; 255 } 256 257 public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) { 258 this.resumeCheckSleepPeriod = resumeCheckSleepPeriod; 259 } 260 261 public void setSystemExitOnShutdown(boolean systemExitOnShutdown) { 262 this.systemExitOnShutdown = systemExitOnShutdown; 263 } 264 265 public boolean isSystemExitOnShutdown() { 266 return systemExitOnShutdown; 267 } 268}