001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.util;
018
019import java.io.IOException;
020import java.sql.SQLException;
021import java.util.Map;
022import java.util.concurrent.TimeUnit;
023import java.util.concurrent.atomic.AtomicBoolean;
024
025import org.apache.activemq.broker.BrokerService;
026import org.apache.activemq.broker.SuppressReplyException;
027import org.apache.activemq.broker.region.Destination;
028import org.apache.activemq.broker.region.Queue;
029import org.apache.activemq.broker.region.RegionBroker;
030import org.apache.activemq.command.ActiveMQDestination;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * @org.apache.xbean.XBean
036 */
037 public class DefaultIOExceptionHandler implements IOExceptionHandler {
038
039    private static final Logger LOG = LoggerFactory
040            .getLogger(DefaultIOExceptionHandler.class);
041    protected BrokerService broker;
042    private boolean ignoreAllErrors = false;
043    private boolean ignoreNoSpaceErrors = true;
044    private boolean ignoreSQLExceptions = true;
045    private boolean stopStartConnectors = false;
046    private String noSpaceMessage = "space";
047    private String sqlExceptionMessage = ""; // match all
048    private long resumeCheckSleepPeriod = 5*1000;
049    private final AtomicBoolean handlingException = new AtomicBoolean(false);
050    private boolean systemExitOnShutdown = false;
051
052    @Override
053    public void handle(IOException exception) {
054        if (!broker.isStarted() || ignoreAllErrors) {
055            allowIOResumption();
056            LOG.info("Ignoring IO exception, " + exception, exception);
057            return;
058        }
059
060        if (ignoreNoSpaceErrors) {
061            Throwable cause = exception;
062            while (cause != null && cause instanceof IOException) {
063                String message = cause.getMessage();
064                if (message != null && message.contains(noSpaceMessage)) {
065                    LOG.info("Ignoring no space left exception, " + exception, exception);
066                    allowIOResumption();
067                    return;
068                }
069                cause = cause.getCause();
070            }
071        }
072
073        if (ignoreSQLExceptions) {
074            Throwable cause = exception;
075            while (cause != null) {
076                if (cause instanceof SQLException) {
077                    String message = cause.getMessage();
078
079                    if (message == null) {
080                        message = "";
081                    }
082
083                    if (message.contains(sqlExceptionMessage)) {
084                        LOG.info("Ignoring SQLException, " + exception, cause);
085                        return;
086                    }
087                }
088                cause = cause.getCause();
089            }
090        }
091
092        if (stopStartConnectors) {
093            if (handlingException.compareAndSet(false, true)) {
094                LOG.info("Initiating stop/restart of transports on " + broker + " due to IO exception, " + exception, exception);
095
096                new Thread("IOExceptionHandler: stop transports") {
097                    @Override
098                    public void run() {
099                        try {
100                            ServiceStopper stopper = new ServiceStopper();
101                            broker.stopAllConnectors(stopper);
102                            LOG.info("Successfully stopped transports on " + broker);
103                        } catch (Exception e) {
104                            LOG.warn("Failure occurred while stopping broker connectors", e);
105                        } finally {
106                            // resume again
107                            new Thread("IOExceptionHandler: restart transports") {
108                                @Override
109                                public void run() {
110                                    try {
111                                        allowIOResumption();
112                                        while (hasLockOwnership() && isPersistenceAdapterDown()) {
113                                            LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
114                                            TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
115                                        }
116                                        if (hasLockOwnership()) {
117                                            Map<ActiveMQDestination, Destination> destinations = ((RegionBroker)broker.getRegionBroker()).getDestinationMap();
118                                            for (Destination destination : destinations.values()) {
119
120                                                if (destination instanceof Queue) {
121                                                    Queue queue = (Queue)destination;
122                                                    if (queue.isResetNeeded()) {
123                                                        queue.clearPendingMessages();
124                                                    }
125                                                }
126                                            }
127                                            broker.startAllConnectors();
128                                            LOG.info("Successfully restarted transports on " + broker);
129                                        }
130                                    } catch (Exception e) {
131                                        LOG.warn("Stopping " + broker + " due to failure restarting transports", e);
132                                        stopBroker(e);
133                                    } finally {
134                                        handlingException.compareAndSet(true, false);
135                                    }
136                                }
137
138                                private boolean isPersistenceAdapterDown() {
139                                    boolean checkpointSuccess = false;
140                                    try {
141                                        broker.getPersistenceAdapter().checkpoint(true);
142                                        checkpointSuccess = true;
143                                    } catch (Throwable ignored) {
144                                    }
145                                    return !checkpointSuccess;
146                                }
147                            }.start();
148
149
150                        }
151                    }
152                }.start();
153            }
154
155            throw new SuppressReplyException("Stop/RestartTransportsInitiated", exception);
156        }
157
158        if (handlingException.compareAndSet(false, true)) {
159            stopBroker(exception);
160        }
161
162        // we don't want to propagate the exception back to the client
163        // They will see a delay till they see a disconnect via socket.close
164        // at which point failover: can kick in.
165        throw new SuppressReplyException("ShutdownBrokerInitiated", exception);
166    }
167
168    protected void allowIOResumption() {
169        try {
170            if (broker.getPersistenceAdapter() != null) {
171                broker.getPersistenceAdapter().allowIOResumption();
172            }
173        } catch (IOException e) {
174            LOG.warn("Failed to allow IO resumption", e);
175        }
176    }
177
178    private void stopBroker(Exception exception) {
179        LOG.info("Stopping " + broker + " due to exception, " + exception, exception);
180        new Thread("IOExceptionHandler: stopping " + broker) {
181            @Override
182            public void run() {
183                try {
184                    if( broker.isRestartAllowed() ) {
185                        broker.requestRestart();
186                    }
187                    broker.setSystemExitOnShutdown(isSystemExitOnShutdown());
188                    broker.stop();
189                } catch (Exception e) {
190                    LOG.warn("Failure occurred while stopping broker", e);
191                }
192            }
193        }.start();
194    }
195
196    protected boolean hasLockOwnership() throws IOException {
197        return true;
198    }
199
200    @Override
201    public void setBrokerService(BrokerService broker) {
202        this.broker = broker;
203    }
204
205    public boolean isIgnoreAllErrors() {
206        return ignoreAllErrors;
207    }
208
209    public void setIgnoreAllErrors(boolean ignoreAllErrors) {
210        this.ignoreAllErrors = ignoreAllErrors;
211    }
212
213    public boolean isIgnoreNoSpaceErrors() {
214        return ignoreNoSpaceErrors;
215    }
216
217    public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) {
218        this.ignoreNoSpaceErrors = ignoreNoSpaceErrors;
219    }
220
221    public String getNoSpaceMessage() {
222        return noSpaceMessage;
223    }
224
225    public void setNoSpaceMessage(String noSpaceMessage) {
226        this.noSpaceMessage = noSpaceMessage;
227    }
228
229    public boolean isIgnoreSQLExceptions() {
230        return ignoreSQLExceptions;
231    }
232
233    public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) {
234        this.ignoreSQLExceptions = ignoreSQLExceptions;
235    }
236
237    public String getSqlExceptionMessage() {
238        return sqlExceptionMessage;
239    }
240
241    public void setSqlExceptionMessage(String sqlExceptionMessage) {
242        this.sqlExceptionMessage = sqlExceptionMessage;
243    }
244
245    public boolean isStopStartConnectors() {
246        return stopStartConnectors;
247    }
248
249    public void setStopStartConnectors(boolean stopStartConnectors) {
250        this.stopStartConnectors = stopStartConnectors;
251    }
252
253    public long getResumeCheckSleepPeriod() {
254        return resumeCheckSleepPeriod;
255    }
256
257    public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) {
258        this.resumeCheckSleepPeriod = resumeCheckSleepPeriod;
259    }
260
261    public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
262        this.systemExitOnShutdown = systemExitOnShutdown;
263    }
264
265    public boolean isSystemExitOnShutdown() {
266        return systemExitOnShutdown;
267    }
268}