001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.scheduler;
018
019import java.io.IOException;
020import java.util.concurrent.atomic.AtomicBoolean;
021
022import org.apache.activemq.ScheduledMessage;
023import org.apache.activemq.advisory.AdvisorySupport;
024import org.apache.activemq.broker.Broker;
025import org.apache.activemq.broker.BrokerFilter;
026import org.apache.activemq.broker.BrokerService;
027import org.apache.activemq.broker.Connection;
028import org.apache.activemq.broker.ConnectionContext;
029import org.apache.activemq.broker.Connector;
030import org.apache.activemq.broker.ProducerBrokerExchange;
031import org.apache.activemq.broker.region.ConnectionStatistics;
032import org.apache.activemq.command.ActiveMQDestination;
033import org.apache.activemq.command.Command;
034import org.apache.activemq.command.ConnectionControl;
035import org.apache.activemq.command.ExceptionResponse;
036import org.apache.activemq.command.Message;
037import org.apache.activemq.command.MessageId;
038import org.apache.activemq.command.ProducerId;
039import org.apache.activemq.command.ProducerInfo;
040import org.apache.activemq.command.Response;
041import org.apache.activemq.openwire.OpenWireFormat;
042import org.apache.activemq.security.SecurityContext;
043import org.apache.activemq.state.ProducerState;
044import org.apache.activemq.transaction.Synchronization;
045import org.apache.activemq.usage.JobSchedulerUsage;
046import org.apache.activemq.usage.SystemUsage;
047import org.apache.activemq.util.ByteSequence;
048import org.apache.activemq.util.IdGenerator;
049import org.apache.activemq.util.LongSequenceGenerator;
050import org.apache.activemq.util.TypeConversionSupport;
051import org.apache.activemq.wireformat.WireFormat;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055public class SchedulerBroker extends BrokerFilter implements JobListener {
056    private static final Logger LOG = LoggerFactory.getLogger(SchedulerBroker.class);
057    private static final IdGenerator ID_GENERATOR = new IdGenerator();
058    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
059    private final AtomicBoolean started = new AtomicBoolean();
060    private final WireFormat wireFormat = new OpenWireFormat();
061    private final ConnectionContext context = new ConnectionContext();
062    private final ProducerId producerId = new ProducerId();
063    private final SystemUsage systemUsage;
064
065    private final JobSchedulerStore store;
066    private JobScheduler scheduler;
067
068    public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception {
069        super(next);
070
071        this.store = store;
072        this.producerId.setConnectionId(ID_GENERATOR.generateId());
073        this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
074        // we only get response on unexpected error
075        this.context.setConnection(new Connection() {
076            @Override
077            public Connector getConnector() {
078                return null;
079            }
080
081            @Override
082            public void dispatchSync(Command message) {
083                if (message instanceof ExceptionResponse) {
084                    LOG.warn("Unexpected response: " + message);
085                }
086            }
087
088            @Override
089            public void dispatchAsync(Command command) {
090                if (command instanceof ExceptionResponse) {
091                    LOG.warn("Unexpected response: " + command);
092                }
093            }
094
095            @Override
096            public Response service(Command command) {
097                return null;
098            }
099
100            @Override
101            public void serviceException(Throwable error) {
102                LOG.warn("Unexpected exception: " + error, error);
103            }
104
105            @Override
106            public boolean isSlow() {
107                return false;
108            }
109
110            @Override
111            public boolean isBlocked() {
112                return false;
113            }
114
115            @Override
116            public boolean isConnected() {
117                return false;
118            }
119
120            @Override
121            public boolean isActive() {
122                return false;
123            }
124
125            @Override
126            public int getDispatchQueueSize() {
127                return 0;
128            }
129
130            @Override
131            public ConnectionStatistics getStatistics() {
132                return null;
133            }
134
135            @Override
136            public boolean isManageable() {
137                return false;
138            }
139
140            @Override
141            public String getRemoteAddress() {
142                return null;
143            }
144
145            @Override
146            public void serviceExceptionAsync(IOException e) {
147                LOG.warn("Unexpected async ioexception: " + e, e);
148            }
149
150            @Override
151            public String getConnectionId() {
152                return null;
153            }
154
155            @Override
156            public boolean isNetworkConnection() {
157                return false;
158            }
159
160            @Override
161            public boolean isFaultTolerantConnection() {
162                return false;
163            }
164
165            @Override
166            public void updateClient(ConnectionControl control) {}
167
168            @Override
169            public int getActiveTransactionCount() {
170                return 0;
171            }
172
173            @Override
174            public Long getOldestActiveTransactionDuration() {
175                return null;
176            }
177
178            @Override
179            public void start() throws Exception {}
180
181            @Override
182            public void stop() throws Exception {}
183        });
184        this.context.setBroker(next);
185        this.systemUsage = brokerService.getSystemUsage();
186
187        wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
188    }
189
190    public synchronized JobScheduler getJobScheduler() throws Exception {
191        return new JobSchedulerFacade(this);
192    }
193
194    @Override
195    public void start() throws Exception {
196        this.started.set(true);
197        getInternalScheduler();
198        super.start();
199    }
200
201    @Override
202    public void stop() throws Exception {
203        if (this.started.compareAndSet(true, false)) {
204
205            if (this.store != null) {
206                this.store.stop();
207            }
208            if (this.scheduler != null) {
209                this.scheduler.removeListener(this);
210                this.scheduler = null;
211            }
212        }
213        super.stop();
214    }
215
216    @Override
217    public void send(ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception {
218        ConnectionContext context = producerExchange.getConnectionContext();
219
220        final String jobId = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
221        final Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
222        final Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
223        final Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
224
225        String physicalName = messageSend.getDestination().getPhysicalName();
226        boolean schedularManage = physicalName.regionMatches(true, 0, ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
227            ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
228
229        if (schedularManage == true) {
230
231            JobScheduler scheduler = getInternalScheduler();
232            ActiveMQDestination replyTo = messageSend.getReplyTo();
233
234            String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
235
236            if (action != null) {
237
238                Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
239                Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
240
241                if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
242
243                    if (startTime != null && endTime != null) {
244
245                        long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
246                        long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
247
248                        for (Job job : scheduler.getAllJobs(start, finish)) {
249                            sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
250                        }
251                    } else {
252                        for (Job job : scheduler.getAllJobs()) {
253                            sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
254                        }
255                    }
256                }
257                if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
258                    scheduler.remove(jobId);
259                } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
260
261                    if (startTime != null && endTime != null) {
262
263                        long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
264                        long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
265
266                        scheduler.removeAllJobs(start, finish);
267                    } else {
268                        scheduler.removeAllJobs();
269                    }
270                }
271            }
272
273        } else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
274
275            // Check for room in the job scheduler store
276            if (systemUsage.getJobSchedulerUsage() != null) {
277                JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage();
278                if (usage.isFull()) {
279                    final String logMessage = "Job Scheduler Store is Full (" +
280                        usage.getPercentUsage() + "% of " + usage.getLimit() +
281                        "). Stopping producer (" + messageSend.getProducerId() +
282                        ") to prevent flooding of the job scheduler store." +
283                        " See http://activemq.apache.org/producer-flow-control.html for more info";
284
285                    long start = System.currentTimeMillis();
286                    long nextWarn = start;
287                    while (!usage.waitForSpace(1000)) {
288                        if (context.getStopping().get()) {
289                            throw new IOException("Connection closed, send aborted.");
290                        }
291
292                        long now = System.currentTimeMillis();
293                        if (now >= nextWarn) {
294                            LOG.info("" + usage + ": " + logMessage + " (blocking for: " + (now - start) / 1000 + "s)");
295                            nextWarn = now + 30000l;
296                        }
297                    }
298                }
299            }
300
301            if (context.isInTransaction()) {
302                context.getTransaction().addSynchronization(new Synchronization() {
303                    @Override
304                    public void afterCommit() throws Exception {
305                        doSchedule(messageSend, cronValue, periodValue, delayValue);
306                    }
307                });
308            } else {
309                doSchedule(messageSend, cronValue, periodValue, delayValue);
310            }
311        } else {
312            super.send(producerExchange, messageSend);
313        }
314    }
315
316    private void doSchedule(Message messageSend, Object cronValue, Object periodValue, Object delayValue) throws Exception {
317        long delay = 0;
318        long period = 0;
319        int repeat = 0;
320        String cronEntry = "";
321
322        // clear transaction context
323        Message msg = messageSend.copy();
324        msg.setTransactionId(null);
325        org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(msg);
326        if (cronValue != null) {
327            cronEntry = cronValue.toString();
328        }
329        if (periodValue != null) {
330            period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
331        }
332        if (delayValue != null) {
333            delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
334        }
335        Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
336        if (repeatValue != null) {
337            repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
338        }
339
340        getInternalScheduler().schedule(msg.getMessageId().toString(),
341            new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay, period, repeat);
342    }
343
344    @Override
345    public void scheduledJob(String id, ByteSequence job) {
346        org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job.getOffset(), job.getLength());
347        try {
348            Message messageSend = (Message) wireFormat.unmarshal(packet);
349            messageSend.setOriginalTransactionId(null);
350            Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
351            Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
352            String cronStr = cronValue != null ? cronValue.toString() : null;
353            int repeat = 0;
354            if (repeatValue != null) {
355                repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
356            }
357
358            if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
359                // create a unique id - the original message could be sent
360                // lots of times
361                messageSend.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
362            }
363
364            // Add the jobId as a property
365            messageSend.setProperty("scheduledJobId", id);
366
367            // if this goes across a network - we don't want it rescheduled
368            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
369            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
370            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
371            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
372
373            if (messageSend.getTimestamp() > 0 && messageSend.getExpiration() > 0) {
374
375                long oldExpiration = messageSend.getExpiration();
376                long newTimeStamp = System.currentTimeMillis();
377                long timeToLive = 0;
378                long oldTimestamp = messageSend.getTimestamp();
379
380                if (oldExpiration > 0) {
381                    timeToLive = oldExpiration - oldTimestamp;
382                }
383
384                long expiration = timeToLive + newTimeStamp;
385
386                if (expiration > oldExpiration) {
387                    if (timeToLive > 0 && expiration > 0) {
388                        messageSend.setExpiration(expiration);
389                    }
390                    messageSend.setTimestamp(newTimeStamp);
391                    LOG.debug("Set message {} timestamp from {} to {}", new Object[]{ messageSend.getMessageId(), oldTimestamp, newTimeStamp });
392                }
393            }
394
395            // Repackage the message contents prior to send now that all updates are complete.
396            messageSend.beforeMarshall(wireFormat);
397
398            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
399            producerExchange.setConnectionContext(context);
400            producerExchange.setMutable(true);
401            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
402            super.send(producerExchange, messageSend);
403        } catch (Exception e) {
404            LOG.error("Failed to send scheduled message {}", id, e);
405        }
406    }
407
408    protected synchronized JobScheduler getInternalScheduler() throws Exception {
409        if (this.started.get()) {
410            if (this.scheduler == null && store != null) {
411                this.scheduler = store.getJobScheduler("JMS");
412                this.scheduler.addListener(this);
413                this.scheduler.startDispatching();
414            }
415            return this.scheduler;
416        }
417        return null;
418    }
419
420    protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo) throws Exception {
421
422        org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getPayload());
423        try {
424            Message msg = (Message) this.wireFormat.unmarshal(packet);
425            msg.setOriginalTransactionId(null);
426            msg.setPersistent(false);
427            msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
428            msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
429
430            // Preserve original destination
431            msg.setOriginalDestination(msg.getDestination());
432
433            msg.setDestination(replyTo);
434            msg.setResponseRequired(false);
435            msg.setProducerId(this.producerId);
436
437            // Add the jobId as a property
438            msg.setProperty("scheduledJobId", job.getJobId());
439
440            final boolean originalFlowControl = context.isProducerFlowControl();
441            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
442            producerExchange.setConnectionContext(context);
443            producerExchange.setMutable(true);
444            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
445            try {
446                context.setProducerFlowControl(false);
447                this.next.send(producerExchange, msg);
448            } finally {
449                context.setProducerFlowControl(originalFlowControl);
450            }
451        } catch (Exception e) {
452            LOG.error("Failed to send scheduled message {}", job.getJobId(), e);
453        }
454    }
455}