001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.nio; 018 019 import java.io.IOException; 020 import java.nio.channels.SocketChannel; 021 import java.util.LinkedList; 022 import java.util.concurrent.Executor; 023 import java.util.concurrent.ExecutorService; 024 import java.util.concurrent.SynchronousQueue; 025 import java.util.concurrent.ThreadFactory; 026 import java.util.concurrent.ThreadPoolExecutor; 027 import java.util.concurrent.TimeUnit; 028 029 /** 030 * The SelectorManager will manage one Selector and the thread that checks the 031 * selector. 032 * 033 * We may need to consider running more than one thread to check the selector if 034 * servicing the selector takes too long. 035 */ 036 public final class SelectorManager { 037 038 public static final SelectorManager SINGLETON = new SelectorManager(); 039 040 private Executor selectorExecutor = createDefaultExecutor(); 041 private Executor channelExecutor = selectorExecutor; 042 private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>(); 043 private int maxChannelsPerWorker = 1024; 044 045 protected ExecutorService createDefaultExecutor() { 046 ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { 047 048 private long i = 0; 049 050 public Thread newThread(Runnable runnable) { 051 this.i++; 052 final Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + this.i); 053 return t; 054 } 055 }); 056 057 return rc; 058 } 059 060 public static SelectorManager getInstance() { 061 return SINGLETON; 062 } 063 064 public interface Listener { 065 void onSelect(SelectorSelection selector); 066 void onError(SelectorSelection selection, Throwable error); 067 } 068 069 public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener) 070 throws IOException { 071 072 SelectorSelection selection = null; 073 while( selection == null ) { 074 if (freeWorkers.size() > 0) { 075 SelectorWorker worker = freeWorkers.getFirst(); 076 if( worker.isReleased() ) { 077 freeWorkers.remove(worker); 078 } else { 079 worker.retain(); 080 selection = new SelectorSelection(worker, socketChannel, listener); 081 } 082 } else { 083 // Worker starts /w retain count of 1 084 SelectorWorker worker = new SelectorWorker(this); 085 freeWorkers.addFirst(worker); 086 selection = new SelectorSelection(worker, socketChannel, listener); 087 } 088 } 089 090 return selection; 091 } 092 093 synchronized void onWorkerFullEvent(SelectorWorker worker) { 094 freeWorkers.remove(worker); 095 } 096 097 public synchronized void onWorkerEmptyEvent(SelectorWorker worker) { 098 freeWorkers.remove(worker); 099 } 100 101 public synchronized void onWorkerNotFullEvent(SelectorWorker worker) { 102 freeWorkers.addFirst(worker); 103 } 104 105 public Executor getChannelExecutor() { 106 return channelExecutor; 107 } 108 109 public void setChannelExecutor(Executor channelExecutor) { 110 this.channelExecutor = channelExecutor; 111 } 112 113 public int getMaxChannelsPerWorker() { 114 return maxChannelsPerWorker; 115 } 116 117 public void setMaxChannelsPerWorker(int maxChannelsPerWorker) { 118 this.maxChannelsPerWorker = maxChannelsPerWorker; 119 } 120 121 public Executor getSelectorExecutor() { 122 return selectorExecutor; 123 } 124 125 public void setSelectorExecutor(Executor selectorExecutor) { 126 this.selectorExecutor = selectorExecutor; 127 } 128 }