001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.nio;
018
019import java.io.IOException;
020import java.nio.channels.spi.AbstractSelectableChannel;
021import java.util.LinkedList;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.Executor;
024import java.util.concurrent.ExecutorService;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.RejectedExecutionHandler;
027import java.util.concurrent.SynchronousQueue;
028import java.util.concurrent.ThreadFactory;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031
032/**
033 * The SelectorManager will manage one Selector and the thread that checks the
034 * selector.
035 *
036 * We may need to consider running more than one thread to check the selector if
037 * servicing the selector takes too long.
038 */
039public final class SelectorManager {
040
041    public static final SelectorManager SINGLETON = new SelectorManager();
042
043    private Executor selectorExecutor = createDefaultExecutor();
044    private Executor channelExecutor = selectorExecutor;
045    private final LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
046    private int maxChannelsPerWorker = -1;
047
048    protected ExecutorService createDefaultExecutor() {
049        ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getDefaultMaximumPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, newWorkQueue(),
050            new ThreadFactory() {
051
052                private long i = 0;
053
054                @Override
055                public Thread newThread(Runnable runnable) {
056                    Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + (i++));
057                    t.setDaemon(true);
058                    return t;
059                }
060            }, newRejectionHandler());
061
062        return rc;
063    }
064
065    private RejectedExecutionHandler newRejectionHandler() {
066        return canRejectWork() ? new ThreadPoolExecutor.AbortPolicy() : new ThreadPoolExecutor.CallerRunsPolicy();
067    }
068
069    private BlockingQueue<Runnable> newWorkQueue() {
070        final int workQueueCapicity = getDefaultWorkQueueCapacity();
071        return workQueueCapicity > 0 ? new LinkedBlockingQueue<Runnable>(workQueueCapicity) : new SynchronousQueue<Runnable>();
072    }
073
074    private static boolean canRejectWork() {
075        return Boolean.getBoolean("org.apache.activemq.transport.nio.SelectorManager.rejectWork");
076    }
077
078    private static int getDefaultWorkQueueCapacity() {
079        return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.workQueueCapacity", 0);
080    }
081
082    private static int getDefaultCorePoolSize() {
083        return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.corePoolSize", 10);
084    }
085
086    private static int getDefaultMaximumPoolSize() {
087        return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize", 1024);
088    }
089
090    private static int getDefaultKeepAliveTime() {
091        return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.keepAliveTime", 30);
092    }
093
094    private static int getDefaultMaxChannelsPerWorker() {
095        return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.maxChannelsPerWorker", 1024);
096    }
097
098    public static SelectorManager getInstance() {
099        return SINGLETON;
100    }
101
102    public interface Listener {
103        void onSelect(SelectorSelection selector);
104
105        void onError(SelectorSelection selection, Throwable error);
106    }
107
108    public synchronized SelectorSelection register(AbstractSelectableChannel selectableChannel, Listener listener) throws IOException {
109        SelectorSelection selection = null;
110        while (selection == null) {
111            if (freeWorkers.size() > 0) {
112                SelectorWorker worker = freeWorkers.getFirst();
113                if (worker.isReleased()) {
114                    freeWorkers.remove(worker);
115                } else {
116                    worker.retain();
117                    selection = new SelectorSelection(worker, selectableChannel, listener);
118                }
119            } else {
120                // Worker starts /w retain count of 1
121                SelectorWorker worker = new SelectorWorker(this);
122                freeWorkers.addFirst(worker);
123                selection = new SelectorSelection(worker, selectableChannel, listener);
124            }
125        }
126
127        return selection;
128    }
129
130    synchronized void onWorkerFullEvent(SelectorWorker worker) {
131        freeWorkers.remove(worker);
132    }
133
134    public synchronized void onWorkerEmptyEvent(SelectorWorker worker) {
135        freeWorkers.remove(worker);
136    }
137
138    public synchronized void onWorkerNotFullEvent(SelectorWorker worker) {
139        freeWorkers.addFirst(worker);
140    }
141
142    public Executor getChannelExecutor() {
143        return channelExecutor;
144    }
145
146    public void setChannelExecutor(Executor channelExecutor) {
147        this.channelExecutor = channelExecutor;
148    }
149
150    public int getMaxChannelsPerWorker() {
151        return maxChannelsPerWorker >= 0 ? maxChannelsPerWorker : getDefaultMaxChannelsPerWorker();
152    }
153
154    public void setMaxChannelsPerWorker(int maxChannelsPerWorker) {
155        this.maxChannelsPerWorker = maxChannelsPerWorker;
156    }
157
158    public Executor getSelectorExecutor() {
159        return selectorExecutor;
160    }
161
162    public void setSelectorExecutor(Executor selectorExecutor) {
163        this.selectorExecutor = selectorExecutor;
164    }
165}