001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network.jms; 018 019import javax.jms.Connection; 020import javax.jms.Destination; 021import javax.jms.ExceptionListener; 022import javax.jms.JMSException; 023import javax.jms.Session; 024import javax.jms.Topic; 025import javax.jms.TopicConnection; 026import javax.jms.TopicConnectionFactory; 027import javax.jms.TopicSession; 028import javax.naming.NamingException; 029 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * A Bridge to other JMS Topic providers 035 */ 036public class SimpleJmsTopicConnector extends JmsConnector { 037 private static final Logger LOG = LoggerFactory.getLogger(SimpleJmsTopicConnector.class); 038 private String outboundTopicConnectionFactoryName; 039 private String localConnectionFactoryName; 040 private TopicConnectionFactory outboundTopicConnectionFactory; 041 private TopicConnectionFactory localTopicConnectionFactory; 042 private InboundTopicBridge[] inboundTopicBridges; 043 private OutboundTopicBridge[] outboundTopicBridges; 044 045 /** 046 * @return Returns the inboundTopicBridges. 047 */ 048 public InboundTopicBridge[] getInboundTopicBridges() { 049 return inboundTopicBridges; 050 } 051 052 /** 053 * @param inboundTopicBridges The inboundTopicBridges to set. 054 */ 055 public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges) { 056 this.inboundTopicBridges = inboundTopicBridges; 057 } 058 059 /** 060 * @return Returns the outboundTopicBridges. 061 */ 062 public OutboundTopicBridge[] getOutboundTopicBridges() { 063 return outboundTopicBridges; 064 } 065 066 /** 067 * @param outboundTopicBridges The outboundTopicBridges to set. 068 */ 069 public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges) { 070 this.outboundTopicBridges = outboundTopicBridges; 071 } 072 073 /** 074 * @return Returns the localTopicConnectionFactory. 075 */ 076 public TopicConnectionFactory getLocalTopicConnectionFactory() { 077 return localTopicConnectionFactory; 078 } 079 080 /** 081 * @param localTopicConnectionFactory The localTopicConnectionFactory to set. 082 */ 083 public void setLocalTopicConnectionFactory(TopicConnectionFactory localTopicConnectionFactory) { 084 this.localTopicConnectionFactory = localTopicConnectionFactory; 085 } 086 087 /** 088 * @return Returns the outboundTopicConnectionFactory. 089 */ 090 public TopicConnectionFactory getOutboundTopicConnectionFactory() { 091 return outboundTopicConnectionFactory; 092 } 093 094 /** 095 * @return Returns the outboundTopicConnectionFactoryName. 096 */ 097 public String getOutboundTopicConnectionFactoryName() { 098 return outboundTopicConnectionFactoryName; 099 } 100 101 /** 102 * @param foreignTopicConnectionFactoryName The foreignTopicConnectionFactoryName to set. 103 */ 104 public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) { 105 this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName; 106 } 107 108 /** 109 * @return Returns the localConnectionFactoryName. 110 */ 111 public String getLocalConnectionFactoryName() { 112 return localConnectionFactoryName; 113 } 114 115 /** 116 * @param localConnectionFactoryName The localConnectionFactoryName to set. 117 */ 118 public void setLocalConnectionFactoryName(String localConnectionFactoryName) { 119 this.localConnectionFactoryName = localConnectionFactoryName; 120 } 121 122 /** 123 * @return Returns the localTopicConnection. 124 */ 125 public TopicConnection getLocalTopicConnection() { 126 return (TopicConnection) localConnection.get(); 127 } 128 129 /** 130 * @param localTopicConnection The localTopicConnection to set. 131 */ 132 public void setLocalTopicConnection(TopicConnection localTopicConnection) { 133 this.localConnection.set(localTopicConnection); 134 } 135 136 /** 137 * @return Returns the outboundTopicConnection. 138 */ 139 public TopicConnection getOutboundTopicConnection() { 140 return (TopicConnection) foreignConnection.get(); 141 } 142 143 /** 144 * @param foreignTopicConnection The foreignTopicConnection to set. 145 */ 146 public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) { 147 this.foreignConnection.set(foreignTopicConnection); 148 } 149 150 /** 151 * @param foreignTopicConnectionFactory The foreignTopicConnectionFactory to set. 152 */ 153 public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) { 154 this.outboundTopicConnectionFactory = foreignTopicConnectionFactory; 155 } 156 157 @Override 158 protected void initializeForeignConnection() throws NamingException, JMSException { 159 160 TopicConnection newConnection = null; 161 162 try { 163 if (foreignConnection.get() == null) { 164 // get the connection factories 165 if (outboundTopicConnectionFactory == null) { 166 // look it up from JNDI 167 if (outboundTopicConnectionFactoryName != null) { 168 outboundTopicConnectionFactory = jndiOutboundTemplate 169 .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class); 170 if (outboundUsername != null) { 171 newConnection = outboundTopicConnectionFactory 172 .createTopicConnection(outboundUsername, outboundPassword); 173 } else { 174 newConnection = outboundTopicConnectionFactory.createTopicConnection(); 175 } 176 } else { 177 throw new JMSException("Cannot create foreignConnection - no information"); 178 } 179 } else { 180 if (outboundUsername != null) { 181 newConnection = outboundTopicConnectionFactory 182 .createTopicConnection(outboundUsername, outboundPassword); 183 } else { 184 newConnection = outboundTopicConnectionFactory.createTopicConnection(); 185 } 186 } 187 } else { 188 // Clear if for now in case something goes wrong during the init. 189 newConnection = (TopicConnection) foreignConnection.getAndSet(null); 190 } 191 192 // Register for any async error notifications now so we can reset in the 193 // case where there's not a lot of activity and a connection drops. 194 newConnection.setExceptionListener(new ExceptionListener() { 195 @Override 196 public void onException(JMSException exception) { 197 handleConnectionFailure(foreignConnection.get()); 198 } 199 }); 200 201 if (outboundClientId != null && outboundClientId.length() > 0) { 202 newConnection.setClientID(getOutboundClientId()); 203 } 204 newConnection.start(); 205 206 outboundMessageConvertor.setConnection(newConnection); 207 208 // Configure the bridges with the new Outbound connection. 209 initializeInboundDestinationBridgesOutboundSide(newConnection); 210 initializeOutboundDestinationBridgesOutboundSide(newConnection); 211 212 // At this point all looks good, so this our current connection now. 213 foreignConnection.set(newConnection); 214 } catch (Exception ex) { 215 if (newConnection != null) { 216 try { 217 newConnection.close(); 218 } catch (Exception ignore) {} 219 } 220 221 throw ex; 222 } 223 } 224 225 @Override 226 protected void initializeLocalConnection() throws NamingException, JMSException { 227 228 TopicConnection newConnection = null; 229 230 try { 231 if (localConnection.get() == null) { 232 // get the connection factories 233 if (localTopicConnectionFactory == null) { 234 if (embeddedConnectionFactory == null) { 235 // look it up from JNDI 236 if (localConnectionFactoryName != null) { 237 localTopicConnectionFactory = jndiLocalTemplate 238 .lookup(localConnectionFactoryName, TopicConnectionFactory.class); 239 if (localUsername != null) { 240 newConnection = localTopicConnectionFactory 241 .createTopicConnection(localUsername, localPassword); 242 } else { 243 newConnection = localTopicConnectionFactory.createTopicConnection(); 244 } 245 } else { 246 throw new JMSException("Cannot create localConnection - no information"); 247 } 248 } else { 249 newConnection = embeddedConnectionFactory.createTopicConnection(); 250 } 251 } else { 252 if (localUsername != null) { 253 newConnection = localTopicConnectionFactory. 254 createTopicConnection(localUsername, localPassword); 255 } else { 256 newConnection = localTopicConnectionFactory.createTopicConnection(); 257 } 258 } 259 260 } else { 261 // Clear if for now in case something goes wrong during the init. 262 newConnection = (TopicConnection) localConnection.getAndSet(null); 263 } 264 265 // Register for any async error notifications now so we can reset in the 266 // case where there's not a lot of activity and a connection drops. 267 newConnection.setExceptionListener(new ExceptionListener() { 268 @Override 269 public void onException(JMSException exception) { 270 handleConnectionFailure(localConnection.get()); 271 } 272 }); 273 274 if (localClientId != null && localClientId.length() > 0) { 275 newConnection.setClientID(getLocalClientId()); 276 } 277 newConnection.start(); 278 279 inboundMessageConvertor.setConnection(newConnection); 280 281 // Configure the bridges with the new Local connection. 282 initializeInboundDestinationBridgesLocalSide(newConnection); 283 initializeOutboundDestinationBridgesLocalSide(newConnection); 284 285 // At this point all looks good, so this our current connection now. 286 localConnection.set(newConnection); 287 } catch (Exception ex) { 288 if (newConnection != null) { 289 try { 290 newConnection.close(); 291 } catch (Exception ignore) {} 292 } 293 294 throw ex; 295 } 296 } 297 298 protected void initializeInboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException { 299 if (inboundTopicBridges != null) { 300 TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 301 302 for (InboundTopicBridge bridge : inboundTopicBridges) { 303 String TopicName = bridge.getInboundTopicName(); 304 Topic foreignTopic = createForeignTopic(outboundSession, TopicName); 305 bridge.setConsumer(null); 306 bridge.setConsumerTopic(foreignTopic); 307 bridge.setConsumerConnection(connection); 308 bridge.setJmsConnector(this); 309 addInboundBridge(bridge); 310 } 311 outboundSession.close(); 312 } 313 } 314 315 protected void initializeInboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException { 316 if (inboundTopicBridges != null) { 317 TopicSession localSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 318 319 for (InboundTopicBridge bridge : inboundTopicBridges) { 320 String localTopicName = bridge.getLocalTopicName(); 321 Topic activemqTopic = createActiveMQTopic(localSession, localTopicName); 322 bridge.setProducerTopic(activemqTopic); 323 bridge.setProducerConnection(connection); 324 if (bridge.getJmsMessageConvertor() == null) { 325 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 326 } 327 bridge.setJmsConnector(this); 328 addInboundBridge(bridge); 329 } 330 localSession.close(); 331 } 332 } 333 334 protected void initializeOutboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException { 335 if (outboundTopicBridges != null) { 336 TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 337 338 for (OutboundTopicBridge bridge : outboundTopicBridges) { 339 String topicName = bridge.getOutboundTopicName(); 340 Topic foreignTopic = createForeignTopic(outboundSession, topicName); 341 bridge.setProducerTopic(foreignTopic); 342 bridge.setProducerConnection(connection); 343 if (bridge.getJmsMessageConvertor() == null) { 344 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 345 } 346 bridge.setJmsConnector(this); 347 addOutboundBridge(bridge); 348 } 349 outboundSession.close(); 350 } 351 } 352 353 protected void initializeOutboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException { 354 if (outboundTopicBridges != null) { 355 TopicSession localSession = 356 connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 357 358 for (OutboundTopicBridge bridge : outboundTopicBridges) { 359 String localTopicName = bridge.getLocalTopicName(); 360 Topic activemqTopic = createActiveMQTopic(localSession, localTopicName); 361 bridge.setConsumer(null); 362 bridge.setConsumerTopic(activemqTopic); 363 bridge.setConsumerConnection(connection); 364 bridge.setJmsConnector(this); 365 addOutboundBridge(bridge); 366 } 367 localSession.close(); 368 } 369 } 370 371 @Override 372 protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, 373 Connection replyToConsumerConnection) { 374 Topic replyToProducerTopic = (Topic)destination; 375 boolean isInbound = replyToProducerConnection.equals(localConnection.get()); 376 377 if (isInbound) { 378 InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic); 379 if (bridge == null) { 380 bridge = new InboundTopicBridge() { 381 @Override 382 protected Destination processReplyToDestination(Destination destination) { 383 return null; 384 } 385 }; 386 try { 387 TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection) 388 .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 389 Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic(); 390 replyToConsumerSession.close(); 391 bridge.setConsumerTopic(replyToConsumerTopic); 392 bridge.setProducerTopic(replyToProducerTopic); 393 bridge.setProducerConnection((TopicConnection)replyToProducerConnection); 394 bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection); 395 bridge.setDoHandleReplyTo(false); 396 if (bridge.getJmsMessageConvertor() == null) { 397 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 398 } 399 bridge.setJmsConnector(this); 400 bridge.start(); 401 LOG.info("Created replyTo bridge for {}", replyToProducerTopic); 402 } catch (Exception e) { 403 LOG.error("Failed to create replyTo bridge for topic: {}", replyToProducerTopic, e); 404 return null; 405 } 406 replyToBridges.put(replyToProducerTopic, bridge); 407 } 408 return bridge.getConsumerTopic(); 409 } else { 410 OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic); 411 if (bridge == null) { 412 bridge = new OutboundTopicBridge() { 413 @Override 414 protected Destination processReplyToDestination(Destination destination) { 415 return null; 416 } 417 }; 418 try { 419 TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection) 420 .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 421 Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic(); 422 replyToConsumerSession.close(); 423 bridge.setConsumerTopic(replyToConsumerTopic); 424 bridge.setProducerTopic(replyToProducerTopic); 425 bridge.setProducerConnection((TopicConnection)replyToProducerConnection); 426 bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection); 427 bridge.setDoHandleReplyTo(false); 428 if (bridge.getJmsMessageConvertor() == null) { 429 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 430 } 431 bridge.setJmsConnector(this); 432 bridge.start(); 433 LOG.info("Created replyTo bridge for {}", replyToProducerTopic); 434 } catch (Exception e) { 435 LOG.error("Failed to create replyTo bridge for topic: {}", replyToProducerTopic, e); 436 return null; 437 } 438 replyToBridges.put(replyToProducerTopic, bridge); 439 } 440 return bridge.getConsumerTopic(); 441 } 442 } 443 444 protected Topic createActiveMQTopic(TopicSession session, String topicName) throws JMSException { 445 return session.createTopic(topicName); 446 } 447 448 protected Topic createForeignTopic(TopicSession session, String topicName) throws JMSException { 449 Topic result = null; 450 451 if (preferJndiDestinationLookup) { 452 try { 453 // look-up the Queue 454 result = jndiOutboundTemplate.lookup(topicName, Topic.class); 455 } catch (NamingException e) { 456 try { 457 result = session.createTopic(topicName); 458 } catch (JMSException e1) { 459 String errStr = "Failed to look-up or create Topic for name: " + topicName; 460 LOG.error(errStr, e); 461 JMSException jmsEx = new JMSException(errStr); 462 jmsEx.setLinkedException(e1); 463 throw jmsEx; 464 } 465 } 466 } else { 467 try { 468 result = session.createTopic(topicName); 469 } catch (JMSException e) { 470 // look-up the Topic 471 try { 472 result = jndiOutboundTemplate.lookup(topicName, Topic.class); 473 } catch (NamingException e1) { 474 String errStr = "Failed to look-up Topic for name: " + topicName; 475 LOG.error(errStr, e); 476 JMSException jmsEx = new JMSException(errStr); 477 jmsEx.setLinkedException(e1); 478 throw jmsEx; 479 } 480 } 481 } 482 483 return result; 484 } 485}