/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.vm;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.security.cert.X509Certificate;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Transport implementation that uses direct method invocations.
 * <p>
 * Two instances are linked to each other with {@link #setPeer(VMTransport)}. A command
 * passed to {@link #oneway(Object)} on one side is handed to the
 * {@link TransportListener} of the other side, either:
 * <ul>
 * <li><b>async</b> (the default): the command is placed on the peer's bounded queue and
 * delivered by the peer's {@link TaskRunner} through {@link #iterate()}; or</li>
 * <li><b>sync</b>: the command is delivered on the caller's thread, after any commands
 * that were queued before the peer was started.</li>
 * </ul>
 */
public class VMTransport implements Transport, Task {
    protected static final Logger LOG = LoggerFactory.getLogger(VMTransport.class);

    /** Source of the per-instance {@link #id} values. */
    private static final AtomicLong NEXT_ID = new AtomicLong(0);

    // Transport Configuration
    /** The other end of this in-VM connection; {@code null} until {@link #setPeer(VMTransport)} is called. */
    protected VMTransport peer;
    /** Receiver of the commands that the peer sends to this transport. */
    protected TransportListener transportListener;
    /** Stored configuration flag; it is not consulted anywhere in this class. */
    protected boolean marshal;
    /** When {@code true}, inbound commands are queued and delivered by the task runner. */
    protected boolean async = true;
    /** Capacity of the queue that is lazily created by {@link #getMessageQueue()}. */
    protected int asyncQueueDepth = 2000;
    protected final URI location;
    protected final long id;

    // Implementation
    private volatile LinkedBlockingQueue<Object> messageQueue;
    private volatile TaskRunnerFactory taskRunnerFactory;
    private volatile TaskRunner taskRunner;

    // Transport State
    /** Set once by {@link #start()}; also used as the monitor that orders sync dispatches. */
    protected final AtomicBoolean started = new AtomicBoolean();
    /** Set once by {@link #stop()}. */
    protected final AtomicBoolean disposed = new AtomicBoolean();

    /** Number of commands delivered through {@link #doDispatch}. */
    private volatile int receiveCounter;

    /**
     * Creates an unconnected transport and assigns it the next sequential id.
     *
     * @param location the URI reported by {@link #toString()}
     */
    public VMTransport(URI location) {
        this.location = location;
        this.id = NEXT_ID.getAndIncrement();
    }

    /**
     * @param peer the transport that will receive the commands sent through this one
     */
    public void setPeer(VMTransport peer) {
        this.peer = peer;
    }

    /**
     * Sends a command to the peer transport.
     * <ul>
     * <li>Async peer: the command is put on the peer's queue (blocking while the queue is
     * full) and the peer's task runner is woken.</li>
     * <li>Sync peer that has not been started: the command is offered to the peer's pending
     * queue, retrying every 500ms for as long as the queue is full and the peer is still
     * not started.</li>
     * <li>Otherwise: the command is dispatched to the peer's listener on the calling
     * thread.</li>
     * </ul>
     *
     * @param command the command to deliver
     * @throws TransportDisposedIOException if this transport or its peer has been disposed
     * @throws InterruptedIOException if the calling thread is interrupted while waiting; the
     *         thread's interrupt flag is restored before throwing
     * @throws IOException if no peer has been set
     */
    @Override
    public void oneway(Object command) throws IOException {

        if (disposed.get()) {
            throw new TransportDisposedIOException("Transport disposed.");
        }

        if (peer == null) {
            throw new IOException("Peer not connected.");
        }

        try {

            if (peer.disposed.get()) {
                throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
            }

            if (peer.async) {
                peer.getMessageQueue().put(command);
                peer.wakeup();
                return;
            }

            if (!peer.started.get()) {
                LinkedBlockingQueue<Object> pending = peer.getMessageQueue();
                int sleepTimeMillis;
                boolean accepted = false;
                do {
                    sleepTimeMillis = 0;
                    // the pending queue is drained on start so we need to ensure we add before
                    // the drain commences, otherwise we never get the command dispatched!
                    synchronized (peer.started) {
                        if (!peer.started.get()) {
                            accepted = pending.offer(command);
                            if (!accepted) {
                                sleepTimeMillis = 500;
                            }
                        }
                    }
                    // give start thread a chance if we will loop
                    TimeUnit.MILLISECONDS.sleep(sleepTimeMillis);

                } while (!accepted && !peer.started.get());
                if (accepted) {
                    return;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
            iioe.initCause(e);
            throw iioe;
        }

        dispatch(peer, peer.messageQueue, command);
    }

    /**
     * Synchronously delivers {@code command} to the listener of {@code transport}, first
     * flushing anything still sitting in {@code pending} so that delivery order is kept.
     * Does nothing when the target transport has no listener.
     *
     * @param transport the transport whose listener receives the command
     * @param pending the target's pending queue, may be {@code null}
     * @param command the command to deliver, may be {@code null} to only flush the queue
     */
    public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) {
        TransportListener transportListener = transport.getTransportListener();
        if (transportListener != null) {
            // Lock here on the target transport's started since we want to wait for its start()
            // method to finish dispatching out of the queue before we do our own.
            synchronized (transport.started) {

                // Ensure that no additional commands entered the queue in the small time window
                // before the start method locks the dispatch lock and the oneway method was in
                // a put operation.
                while (pending != null && !pending.isEmpty() && !transport.isDisposed()) {
                    doDispatch(transport, transportListener, pending.poll());
                }

                // We are now in sync mode and won't enqueue any more commands to the target
                // transport so let's clean up its resources.
                transport.messageQueue = null;

                // Don't dispatch if either end was disposed already.
                if (command != null && !this.disposed.get() && !transport.isDisposed()) {
                    doDispatch(transport, transportListener, command);
                }
            }
        }
    }

    /**
     * Counts the command against {@code transport} and hands it to the listener.
     *
     * @param transport the transport whose receive counter is incremented
     * @param transportListener the listener that receives the command
     * @param command the command to deliver
     */
    public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) {
        transport.receiveCounter++;
        transportListener.onCommand(command);
    }

    /**
     * Marks the transport as started. Only the first call has any effect.
     *
     * @throws IOException if no {@link TransportListener} has been set
     * @throws Exception declared by the {@link Transport} contract
     */
    @Override
    public void start() throws Exception {

        if (transportListener == null) {
            throw new IOException("TransportListener not set.");
        }

        // If we are not in async mode we lock the dispatch lock here and then start to
        // prevent any sync dispatches from occurring until we dispatch the pending messages
        // to maintain delivery order. When async this happens automatically so just set
        // started and wakeup the task runner.
        if (!async) {
            synchronized (started) {
                if (started.compareAndSet(false, true)) {
                    LinkedBlockingQueue<Object> mq = getMessageQueue();
                    Object command;
                    while ((command = mq.poll()) != null && !disposed.get()) {
                        // doDispatch() already increments receiveCounter; counting here as
                        // well would record every pending command twice.
                        doDispatch(this, transportListener, command);
                    }
                }
            }
        } else {
            if (started.compareAndSet(false, true)) {
                wakeup();
            }
        }
    }

    /**
     * Disposes the transport. Only the first call has any effect: the queue is cleared, the
     * task runner is shut down, the peer's listener (if there is one) is sent a
     * {@link ShutdownInfo} followed by a {@link TransportDisposedIOException}, and finally
     * the task runner factory is shut down. Every step is best-effort; failures are logged
     * at debug level and do not prevent the remaining steps.
     */
    @Override
    public void stop() throws Exception {
        // Only need to do this once, all future oneway calls will now
        // fail as will any async jobs in the task runner.
        if (disposed.compareAndSet(false, true)) {

            TaskRunner tr = taskRunner;
            LinkedBlockingQueue<Object> mq = this.messageQueue;

            taskRunner = null;
            messageQueue = null;

            if (mq != null) {
                mq.clear();
            }

            // don't wait for completion
            if (tr != null) {
                try {
                    tr.shutdown(1);
                } catch (Exception e) {
                    LOG.debug("Ignoring failure while shutting down the task runner of {}", this, e);
                }
            }

            // The peer may never have been set, and its listener may be missing; read both
            // once so that the two notifications below go to the same listener.
            final VMTransport remote = peer;
            final TransportListener remoteListener = remote != null ? remote.transportListener : null;
            if (remoteListener != null) {
                // let the peer know that we are disconnecting after attempting
                // to cleanly shutdown the async tasks so that this is the last
                // command it sees.
                try {
                    remoteListener.onCommand(new ShutdownInfo());
                } catch (Exception ignore) {
                    LOG.debug("Ignoring failure while notifying the peer of {} about shutdown", this, ignore);
                }

                // let any requests pending a response see an exception
                try {
                    remoteListener.onException(new TransportDisposedIOException("peer (" + this + ") stopped."));
                } catch (Exception ignore) {
                    LOG.debug("Ignoring failure while signalling disposal to the peer of {}", this, ignore);
                }
            }

            // shutdown task runner factory
            if (taskRunnerFactory != null) {
                taskRunnerFactory.shutdownNow();
                taskRunnerFactory = null;
            }
        }
    }

    /**
     * Wakes the task runner so that {@link #iterate()} gets called. Has no effect unless the
     * transport is async and has been started.
     */
    protected void wakeup() {
        if (async && started.get()) {
            try {
                getTaskRunner().wakeup();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (TransportDisposedIOException e) {
                // The transport was disposed in the meantime; there is nothing left to wake.
            }
        }
    }

    /**
     * Delivers at most one queued command to this transport's listener. An exception thrown
     * by the listener is reported to the peer's listener rather than propagated.
     *
     * @return {@code true} if a command was delivered, more commands are queued and the
     *         transport is not disposed; {@code false} otherwise
     * @see org.apache.activemq.thread.Task#iterate()
     */
    @Override
    public boolean iterate() {

        final TransportListener tl = transportListener;

        LinkedBlockingQueue<Object> mq;
        try {
            mq = getMessageQueue();
        } catch (TransportDisposedIOException e) {
            return false;
        }

        Object command = mq.poll();
        if (command != null && !disposed.get()) {
            try {
                tl.onCommand(command);
            } catch (Exception e) {
                try {
                    peer.transportListener.onException(IOExceptionSupport.create(e));
                } catch (Exception ignore) {
                    LOG.debug("Ignoring failure while reporting a listener error to the peer of {}", this, ignore);
                }
            }
            return !mq.isEmpty() && !disposed.get();
        } else {
            if (disposed.get()) {
                mq.clear();
            }
            return false;
        }
    }

    @Override
    public void setTransportListener(TransportListener commandListener) {
        this.transportListener = commandListener;
    }

    /**
     * Returns the inbound queue, creating it with a capacity of {@link #asyncQueueDepth} on
     * first use.
     *
     * @return the inbound queue, never {@code null}
     * @throws TransportDisposedIOException if there is no queue and the transport has been
     *         disposed
     */
    public LinkedBlockingQueue<Object> getMessageQueue() throws TransportDisposedIOException {
        LinkedBlockingQueue<Object> result = messageQueue;
        if (result == null) {
            synchronized (this) {
                result = messageQueue;
                if (result == null) {
                    if (disposed.get()) {
                        throw new TransportDisposedIOException("The Transport has been disposed");
                    }

                    messageQueue = result = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
                }
            }
        }
        return result;
    }

    /**
     * Returns the task runner that drives {@link #iterate()}, creating it (and its factory)
     * on first use.
     *
     * @return the task runner, never {@code null}
     * @throws TransportDisposedIOException if there is no task runner and the transport has
     *         been disposed
     */
    protected TaskRunner getTaskRunner() throws TransportDisposedIOException {
        TaskRunner result = taskRunner;
        if (result == null) {
            synchronized (this) {
                result = taskRunner;
                if (result == null) {
                    if (disposed.get()) {
                        throw new TransportDisposedIOException("The Transport has been disposed");
                    }

                    String name = "ActiveMQ VMTransport: " + toString();
                    if (taskRunnerFactory == null) {
                        taskRunnerFactory = new TaskRunnerFactory(name);
                        taskRunnerFactory.init();
                    }
                    taskRunner = result = taskRunnerFactory.createTaskRunner(this, name);
                }
            }
        }
        return result;
    }

    /** Not supported by this transport; always throws {@link AssertionError}. */
    @Override
    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
        throw new AssertionError("Unsupported Method");
    }

    /** Not supported by this transport; always throws {@link AssertionError}. */
    @Override
    public Object request(Object command) throws IOException {
        throw new AssertionError("Unsupported Method");
    }

    /** Not supported by this transport; always throws {@link AssertionError}. */
    @Override
    public Object request(Object command, int timeout) throws IOException {
        throw new AssertionError("Unsupported Method");
    }

    @Override
    public TransportListener getTransportListener() {
        return transportListener;
    }

    /**
     * @return this instance cast to {@code target} when this class is assignable to it,
     *         otherwise {@code null}
     */
    @Override
    public <T> T narrow(Class<T> target) {
        if (target.isAssignableFrom(getClass())) {
            return target.cast(this);
        }
        return null;
    }

    public boolean isMarshal() {
        return marshal;
    }

    public void setMarshal(boolean marshal) {
        this.marshal = marshal;
    }

    /**
     * @return the location URI and the instance id, joined by {@code #}
     */
    @Override
    public String toString() {
        return location + "#" + id;
    }

    /**
     * @return the peer's {@link #toString()}, or {@code null} when no peer has been set
     */
    @Override
    public String getRemoteAddress() {
        if (peer != null) {
            return peer.toString();
        }
        return null;
    }

    /**
     * @return the async
     */
    public boolean isAsync() {
        return async;
    }

    /**
     * @param async the async to set
     */
    public void setAsync(boolean async) {
        this.async = async;
    }

    /**
     * @return the asyncQueueDepth
     */
    public int getAsyncQueueDepth() {
        return asyncQueueDepth;
    }

    /**
     * @param asyncQueueDepth the asyncQueueDepth to set
     */
    public void setAsyncQueueDepth(int asyncQueueDepth) {
        this.asyncQueueDepth = asyncQueueDepth;
    }

    @Override
    public boolean isFaultTolerant() {
        return false;
    }

    @Override
    public boolean isDisposed() {
        return disposed.get();
    }

    /**
     * @return {@code true} until the transport has been disposed
     */
    @Override
    public boolean isConnected() {
        return !disposed.get();
    }

    /** Not supported by this transport; always throws {@link IOException}. */
    @Override
    public void reconnect(URI uri) throws IOException {
        throw new IOException("Transport reconnect is not supported");
    }

    @Override
    public boolean isReconnectSupported() {
        return false;
    }

    @Override
    public boolean isUpdateURIsSupported() {
        return false;
    }

    /** Not supported by this transport; always throws {@link IOException}. */
    @Override
    public void updateURIs(boolean reblance, URI[] uris) throws IOException {
        throw new IOException("URI update feature not supported");
    }

    /**
     * @return the number of commands delivered through {@link #doDispatch}.
     *         NOTE(review): the async path in {@link #iterate()} calls the listener directly
     *         and does not touch this counter - confirm whether that is intended.
     */
    @Override
    public int getReceiveCounter() {
        return receiveCounter;
    }

    /**
     * @return always {@code null}
     */
    @Override
    public X509Certificate[] getPeerCertificates() {
        return null;
    }

    /** Ignored; this transport does not keep peer certificates. */
    @Override
    public void setPeerCertificates(X509Certificate[] certificates) {

    }

    /**
     * @return always {@code null}
     */
    @Override
    public WireFormat getWireFormat() {
        return null;
    }
}