001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.network.jms; 018 019 import java.util.Iterator; 020 import java.util.List; 021 import java.util.Map; 022 import java.util.concurrent.CopyOnWriteArrayList; 023 import java.util.concurrent.LinkedBlockingQueue; 024 import java.util.concurrent.ThreadFactory; 025 import java.util.concurrent.ThreadPoolExecutor; 026 import java.util.concurrent.TimeUnit; 027 import java.util.concurrent.atomic.AtomicBoolean; 028 import java.util.concurrent.atomic.AtomicReference; 029 030 import javax.jms.Connection; 031 import javax.jms.Destination; 032 import javax.jms.QueueConnection; 033 034 import org.apache.activemq.ActiveMQConnectionFactory; 035 import org.apache.activemq.Service; 036 import org.apache.activemq.broker.BrokerService; 037 import org.apache.activemq.util.LRUCache; 038 import org.slf4j.Logger; 039 import org.slf4j.LoggerFactory; 040 import org.springframework.jndi.JndiTemplate; 041 042 /** 043 * This bridge joins the gap between foreign JMS providers and ActiveMQ As some 044 * JMS providers are still only in compliance with JMS v1.0.1 , this bridge itself 045 * aimed to be in compliance with the JMS 1.0.2 specification. 046 */ 047 public abstract class JmsConnector implements Service { 048 049 private static int nextId; 050 private static final Logger LOG = LoggerFactory.getLogger(JmsConnector.class); 051 052 protected JndiTemplate jndiLocalTemplate; 053 protected JndiTemplate jndiOutboundTemplate; 054 protected JmsMesageConvertor inboundMessageConvertor; 055 protected JmsMesageConvertor outboundMessageConvertor; 056 protected AtomicBoolean initialized = new AtomicBoolean(false); 057 protected AtomicBoolean localSideInitialized = new AtomicBoolean(false); 058 protected AtomicBoolean foreignSideInitialized = new AtomicBoolean(false); 059 protected AtomicBoolean started = new AtomicBoolean(false); 060 protected AtomicBoolean failed = new AtomicBoolean(); 061 protected AtomicReference<Connection> foreignConnection = new AtomicReference<Connection>(); 062 protected AtomicReference<Connection> localConnection = new AtomicReference<Connection>(); 063 protected ActiveMQConnectionFactory embeddedConnectionFactory; 064 protected int replyToDestinationCacheSize = 10000; 065 protected String outboundUsername; 066 protected String outboundPassword; 067 protected String localUsername; 068 protected String localPassword; 069 protected String outboundClientId; 070 protected String localClientId; 071 protected LRUCache<Destination, DestinationBridge> replyToBridges = createLRUCache(); 072 073 private ReconnectionPolicy policy = new ReconnectionPolicy(); 074 protected ThreadPoolExecutor connectionSerivce; 075 private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>(); 076 private List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>(); 077 private String name; 078 079 private static LRUCache<Destination, DestinationBridge> createLRUCache() { 080 return new LRUCache<Destination, DestinationBridge>() { 081 private static final long serialVersionUID = -7446792754185879286L; 082 083 protected boolean removeEldestEntry(Map.Entry<Destination, DestinationBridge> enty) { 084 if (size() > maxCacheSize) { 085 Iterator<Map.Entry<Destination, DestinationBridge>> iter = entrySet().iterator(); 086 Map.Entry<Destination, DestinationBridge> lru = iter.next(); 087 remove(lru.getKey()); 088 DestinationBridge bridge = (DestinationBridge)lru.getValue(); 089 try { 090 bridge.stop(); 091 LOG.info("Expired bridge: " + bridge); 092 } catch (Exception e) { 093 LOG.warn("stopping expired bridge" + bridge + " caused an exception", e); 094 } 095 } 096 return false; 097 } 098 }; 099 } 100 101 public boolean init() { 102 boolean result = initialized.compareAndSet(false, true); 103 if (result) { 104 if (jndiLocalTemplate == null) { 105 jndiLocalTemplate = new JndiTemplate(); 106 } 107 if (jndiOutboundTemplate == null) { 108 jndiOutboundTemplate = new JndiTemplate(); 109 } 110 if (inboundMessageConvertor == null) { 111 inboundMessageConvertor = new SimpleJmsMessageConvertor(); 112 } 113 if (outboundMessageConvertor == null) { 114 outboundMessageConvertor = new SimpleJmsMessageConvertor(); 115 } 116 replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize()); 117 118 connectionSerivce = createExecutor(); 119 120 // Subclasses can override this to customize their own it. 121 result = doConnectorInit(); 122 } 123 return result; 124 } 125 126 protected boolean doConnectorInit() { 127 128 // We try to make a connection via a sync call first so that the 129 // JmsConnector is fully initialized before the start call returns 130 // in order to avoid missing any messages that are dispatched 131 // immediately after startup. If either side fails we queue an 132 // asynchronous task to manage the reconnect attempts. 133 134 try { 135 initializeLocalConnection(); 136 localSideInitialized.set(true); 137 } catch(Exception e) { 138 // Queue up the task to attempt the local connection. 139 scheduleAsyncLocalConnectionReconnect(); 140 } 141 142 try { 143 initializeForeignConnection(); 144 foreignSideInitialized.set(true); 145 } catch(Exception e) { 146 // Queue up the task for the foreign connection now. 147 scheduleAsyncForeignConnectionReconnect(); 148 } 149 150 return true; 151 } 152 153 public void start() throws Exception { 154 if (started.compareAndSet(false, true)) { 155 init(); 156 for (DestinationBridge bridge : inboundBridges) { 157 bridge.start(); 158 } 159 for (DestinationBridge bridge : outboundBridges) { 160 bridge.start(); 161 } 162 LOG.info("JMS Connector " + getName() + " Started"); 163 } 164 } 165 166 public void stop() throws Exception { 167 if (started.compareAndSet(true, false)) { 168 169 this.connectionSerivce.shutdown(); 170 171 for (DestinationBridge bridge : inboundBridges) { 172 bridge.stop(); 173 } 174 for (DestinationBridge bridge : outboundBridges) { 175 bridge.stop(); 176 } 177 LOG.info("JMS Connector " + getName() + " Stopped"); 178 } 179 } 180 181 public void clearBridges() { 182 inboundBridges.clear(); 183 outboundBridges.clear(); 184 replyToBridges.clear(); 185 } 186 187 protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection); 188 189 /** 190 * One way to configure the local connection - this is called by The 191 * BrokerService when the Connector is embedded 192 * 193 * @param service 194 */ 195 public void setBrokerService(BrokerService service) { 196 embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI()); 197 } 198 199 public Connection getLocalConnection() { 200 return this.localConnection.get(); 201 } 202 203 public Connection getForeignConnection() { 204 return this.foreignConnection.get(); 205 } 206 207 /** 208 * @return Returns the jndiTemplate. 209 */ 210 public JndiTemplate getJndiLocalTemplate() { 211 return jndiLocalTemplate; 212 } 213 214 /** 215 * @param jndiTemplate The jndiTemplate to set. 216 */ 217 public void setJndiLocalTemplate(JndiTemplate jndiTemplate) { 218 this.jndiLocalTemplate = jndiTemplate; 219 } 220 221 /** 222 * @return Returns the jndiOutboundTemplate. 223 */ 224 public JndiTemplate getJndiOutboundTemplate() { 225 return jndiOutboundTemplate; 226 } 227 228 /** 229 * @param jndiOutboundTemplate The jndiOutboundTemplate to set. 230 */ 231 public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate) { 232 this.jndiOutboundTemplate = jndiOutboundTemplate; 233 } 234 235 /** 236 * @return Returns the inboundMessageConvertor. 237 */ 238 public JmsMesageConvertor getInboundMessageConvertor() { 239 return inboundMessageConvertor; 240 } 241 242 /** 243 * @param inboundMessageConvertor The inboundMessageConvertor to set. 244 */ 245 public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) { 246 this.inboundMessageConvertor = jmsMessageConvertor; 247 } 248 249 /** 250 * @return Returns the outboundMessageConvertor. 251 */ 252 public JmsMesageConvertor getOutboundMessageConvertor() { 253 return outboundMessageConvertor; 254 } 255 256 /** 257 * @param outboundMessageConvertor The outboundMessageConvertor to set. 258 */ 259 public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) { 260 this.outboundMessageConvertor = outboundMessageConvertor; 261 } 262 263 /** 264 * @return Returns the replyToDestinationCacheSize. 265 */ 266 public int getReplyToDestinationCacheSize() { 267 return replyToDestinationCacheSize; 268 } 269 270 /** 271 * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set. 272 */ 273 public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) { 274 this.replyToDestinationCacheSize = replyToDestinationCacheSize; 275 } 276 277 /** 278 * @return Returns the localPassword. 279 */ 280 public String getLocalPassword() { 281 return localPassword; 282 } 283 284 /** 285 * @param localPassword The localPassword to set. 286 */ 287 public void setLocalPassword(String localPassword) { 288 this.localPassword = localPassword; 289 } 290 291 /** 292 * @return Returns the localUsername. 293 */ 294 public String getLocalUsername() { 295 return localUsername; 296 } 297 298 /** 299 * @param localUsername The localUsername to set. 300 */ 301 public void setLocalUsername(String localUsername) { 302 this.localUsername = localUsername; 303 } 304 305 /** 306 * @return Returns the outboundPassword. 307 */ 308 public String getOutboundPassword() { 309 return outboundPassword; 310 } 311 312 /** 313 * @param outboundPassword The outboundPassword to set. 314 */ 315 public void setOutboundPassword(String outboundPassword) { 316 this.outboundPassword = outboundPassword; 317 } 318 319 /** 320 * @return Returns the outboundUsername. 321 */ 322 public String getOutboundUsername() { 323 return outboundUsername; 324 } 325 326 /** 327 * @param outboundUsername The outboundUsername to set. 328 */ 329 public void setOutboundUsername(String outboundUsername) { 330 this.outboundUsername = outboundUsername; 331 } 332 333 /** 334 * @return the outboundClientId 335 */ 336 public String getOutboundClientId() { 337 return outboundClientId; 338 } 339 340 /** 341 * @param outboundClientId the outboundClientId to set 342 */ 343 public void setOutboundClientId(String outboundClientId) { 344 this.outboundClientId = outboundClientId; 345 } 346 347 /** 348 * @return the localClientId 349 */ 350 public String getLocalClientId() { 351 return localClientId; 352 } 353 354 /** 355 * @param localClientId the localClientId to set 356 */ 357 public void setLocalClientId(String localClientId) { 358 this.localClientId = localClientId; 359 } 360 361 /** 362 * @return the currently configured reconnection policy. 363 */ 364 public ReconnectionPolicy getReconnectionPolicy() { 365 return this.policy; 366 } 367 368 /** 369 * @param policy The new reconnection policy this {@link JmsConnector} should use. 370 */ 371 public void setReconnectionPolicy(ReconnectionPolicy policy) { 372 this.policy = policy; 373 } 374 375 /** 376 * @return returns true if the {@link JmsConnector} is connected to both brokers. 377 */ 378 public boolean isConnected() { 379 return localConnection.get() != null && foreignConnection.get() != null; 380 } 381 382 protected void addInboundBridge(DestinationBridge bridge) { 383 if (!inboundBridges.contains(bridge)) { 384 inboundBridges.add(bridge); 385 } 386 } 387 388 protected void addOutboundBridge(DestinationBridge bridge) { 389 if (!outboundBridges.contains(bridge)) { 390 outboundBridges.add(bridge); 391 } 392 } 393 394 protected void removeInboundBridge(DestinationBridge bridge) { 395 inboundBridges.remove(bridge); 396 } 397 398 protected void removeOutboundBridge(DestinationBridge bridge) { 399 outboundBridges.remove(bridge); 400 } 401 402 public String getName() { 403 if (name == null) { 404 name = "Connector:" + getNextId(); 405 } 406 return name; 407 } 408 409 public void setName(String name) { 410 this.name = name; 411 } 412 413 private static synchronized int getNextId() { 414 return nextId++; 415 } 416 417 public boolean isFailed() { 418 return this.failed.get(); 419 } 420 421 /** 422 * Performs the work of connection to the local side of the Connection. 423 * <p> 424 * This creates the initial connection to the local end of the {@link JmsConnector} 425 * and then sets up all the destination bridges with the information needed to bridge 426 * on the local side of the connection. 427 * 428 * @throws Exception if the connection cannot be established for any reason. 429 */ 430 protected abstract void initializeLocalConnection() throws Exception; 431 432 /** 433 * Performs the work of connection to the foreign side of the Connection. 434 * <p> 435 * This creates the initial connection to the foreign end of the {@link JmsConnector} 436 * and then sets up all the destination bridges with the information needed to bridge 437 * on the foreign side of the connection. 438 * 439 * @throws Exception if the connection cannot be established for any reason. 440 */ 441 protected abstract void initializeForeignConnection() throws Exception; 442 443 /** 444 * Callback method that the Destination bridges can use to report an exception to occurs 445 * during normal bridging operations. 446 * 447 * @param connection 448 * The connection that was in use when the failure occured. 449 */ 450 void handleConnectionFailure(Connection connection) { 451 452 // Can happen if async exception listener kicks in at the same time. 453 if (connection == null || !this.started.get()) { 454 return; 455 } 456 457 LOG.info("JmsConnector handling loss of connection [" + connection.toString() + "]"); 458 459 // TODO - How do we handle the re-wiring of replyToBridges in this case. 460 replyToBridges.clear(); 461 462 if (this.foreignConnection.compareAndSet((QueueConnection)connection, null)) { 463 464 // Stop the inbound bridges when the foreign connection is dropped since 465 // the bridge has no consumer and needs to be restarted once a new connection 466 // to the foreign side is made. 467 for (DestinationBridge bridge : inboundBridges) { 468 try { 469 bridge.stop(); 470 } catch(Exception e) { 471 } 472 } 473 474 // We got here first and cleared the connection, now we queue a reconnect. 475 this.connectionSerivce.execute(new Runnable() { 476 477 @Override 478 public void run() { 479 try { 480 doInitializeConnection(false); 481 } catch (Exception e) { 482 LOG.error("Failed to initialize forgein connection for the JMSConnector", e); 483 } 484 } 485 }); 486 487 } else if (this.localConnection.compareAndSet((QueueConnection)connection, null)) { 488 489 // Stop the outbound bridges when the local connection is dropped since 490 // the bridge has no consumer and needs to be restarted once a new connection 491 // to the local side is made. 492 for (DestinationBridge bridge : outboundBridges) { 493 try { 494 bridge.stop(); 495 } catch(Exception e) { 496 } 497 } 498 499 // We got here first and cleared the connection, now we queue a reconnect. 500 this.connectionSerivce.execute(new Runnable() { 501 502 @Override 503 public void run() { 504 try { 505 doInitializeConnection(true); 506 } catch (Exception e) { 507 LOG.error("Failed to initialize local connection for the JMSConnector", e); 508 } 509 } 510 }); 511 } 512 } 513 514 private void scheduleAsyncLocalConnectionReconnect() { 515 this.connectionSerivce.execute(new Runnable() { 516 @Override 517 public void run() { 518 try { 519 doInitializeConnection(true); 520 } catch (Exception e) { 521 LOG.error("Failed to initialize local connection for the JMSConnector", e); 522 } 523 } 524 }); 525 } 526 527 private void scheduleAsyncForeignConnectionReconnect() { 528 this.connectionSerivce.execute(new Runnable() { 529 @Override 530 public void run() { 531 try { 532 doInitializeConnection(false); 533 } catch (Exception e) { 534 LOG.error("Failed to initialize forgein connection for the JMSConnector", e); 535 } 536 } 537 }); 538 } 539 540 private void doInitializeConnection(boolean local) throws Exception { 541 542 int attempt = 0; 543 544 final int maxRetries; 545 if (local) { 546 maxRetries = !localSideInitialized.get() ? policy.getMaxInitialConnectAttempts() : 547 policy.getMaxReconnectAttempts(); 548 } else { 549 maxRetries = !foreignSideInitialized.get() ? policy.getMaxInitialConnectAttempts() : 550 policy.getMaxReconnectAttempts(); 551 } 552 553 do 554 { 555 if (attempt > 0) { 556 try { 557 Thread.sleep(policy.getNextDelay(attempt)); 558 } catch(InterruptedException e) { 559 } 560 } 561 562 if (connectionSerivce.isTerminating()) { 563 return; 564 } 565 566 try { 567 568 if (local) { 569 initializeLocalConnection(); 570 localSideInitialized.set(true); 571 } else { 572 initializeForeignConnection(); 573 foreignSideInitialized.set(true); 574 } 575 576 // Once we are connected we ensure all the bridges are started. 577 if (localConnection.get() != null && foreignConnection.get() != null) { 578 for (DestinationBridge bridge : inboundBridges) { 579 bridge.start(); 580 } 581 for (DestinationBridge bridge : outboundBridges) { 582 bridge.start(); 583 } 584 } 585 586 return; 587 } catch(Exception e) { 588 LOG.debug("Failed to establish initial " + (local ? "local" : "foriegn") + 589 " connection for JmsConnector [" + attempt + "]: " + e.getMessage()); 590 } 591 } 592 while (maxRetries < ++attempt && !connectionSerivce.isTerminating()); 593 594 this.failed.set(true); 595 } 596 597 private ThreadFactory factory = new ThreadFactory() { 598 public Thread newThread(Runnable runnable) { 599 Thread thread = new Thread(runnable, "JmsConnector Async Connection Task: "); 600 thread.setDaemon(true); 601 return thread; 602 } 603 }; 604 605 private ThreadPoolExecutor createExecutor() { 606 ThreadPoolExecutor exec = new ThreadPoolExecutor(0, 2, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory); 607 exec.allowCoreThreadTimeOut(true); 608 return exec; 609 } 610 }