001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network.jms; 018 019import javax.jms.Connection; 020import javax.jms.Destination; 021import javax.jms.ExceptionListener; 022import javax.jms.JMSException; 023import javax.jms.Queue; 024import javax.jms.QueueConnection; 025import javax.jms.QueueConnectionFactory; 026import javax.jms.QueueSession; 027import javax.jms.Session; 028import javax.naming.NamingException; 029 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 */ 035public class SimpleJmsQueueConnector extends JmsConnector { 036 private static final Logger LOG = LoggerFactory.getLogger(SimpleJmsQueueConnector.class); 037 private String outboundQueueConnectionFactoryName; 038 private String localConnectionFactoryName; 039 private QueueConnectionFactory outboundQueueConnectionFactory; 040 private QueueConnectionFactory localQueueConnectionFactory; 041 private InboundQueueBridge[] inboundQueueBridges; 042 private OutboundQueueBridge[] outboundQueueBridges; 043 044 /** 045 * @return Returns the inboundQueueBridges. 046 */ 047 public InboundQueueBridge[] getInboundQueueBridges() { 048 return inboundQueueBridges; 049 } 050 051 /** 052 * @param inboundQueueBridges The inboundQueueBridges to set. 053 */ 054 public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) { 055 this.inboundQueueBridges = inboundQueueBridges; 056 } 057 058 /** 059 * @return Returns the outboundQueueBridges. 060 */ 061 public OutboundQueueBridge[] getOutboundQueueBridges() { 062 return outboundQueueBridges; 063 } 064 065 /** 066 * @param outboundQueueBridges The outboundQueueBridges to set. 067 */ 068 public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) { 069 this.outboundQueueBridges = outboundQueueBridges; 070 } 071 072 /** 073 * @return Returns the localQueueConnectionFactory. 074 */ 075 public QueueConnectionFactory getLocalQueueConnectionFactory() { 076 return localQueueConnectionFactory; 077 } 078 079 /** 080 * @param localConnectionFactory The localQueueConnectionFactory to 081 * set. 082 */ 083 public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) { 084 this.localQueueConnectionFactory = localConnectionFactory; 085 } 086 087 /** 088 * @return Returns the outboundQueueConnectionFactory. 089 */ 090 public QueueConnectionFactory getOutboundQueueConnectionFactory() { 091 return outboundQueueConnectionFactory; 092 } 093 094 /** 095 * @return Returns the outboundQueueConnectionFactoryName. 096 */ 097 public String getOutboundQueueConnectionFactoryName() { 098 return outboundQueueConnectionFactoryName; 099 } 100 101 /** 102 * @param foreignQueueConnectionFactoryName The 103 * foreignQueueConnectionFactoryName to set. 104 */ 105 public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) { 106 this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName; 107 } 108 109 /** 110 * @return Returns the localConnectionFactoryName. 111 */ 112 public String getLocalConnectionFactoryName() { 113 return localConnectionFactoryName; 114 } 115 116 /** 117 * @param localConnectionFactoryName The localConnectionFactoryName to set. 118 */ 119 public void setLocalConnectionFactoryName(String localConnectionFactoryName) { 120 this.localConnectionFactoryName = localConnectionFactoryName; 121 } 122 123 /** 124 * @return Returns the localQueueConnection. 125 */ 126 public QueueConnection getLocalQueueConnection() { 127 return (QueueConnection) localConnection.get(); 128 } 129 130 /** 131 * @param localQueueConnection The localQueueConnection to set. 132 */ 133 public void setLocalQueueConnection(QueueConnection localQueueConnection) { 134 this.localConnection.set(localQueueConnection); 135 } 136 137 /** 138 * @return Returns the outboundQueueConnection. 139 */ 140 public QueueConnection getOutboundQueueConnection() { 141 return (QueueConnection) foreignConnection.get(); 142 } 143 144 /** 145 * @param foreignQueueConnection The foreignQueueConnection to set. 146 */ 147 public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) { 148 this.foreignConnection.set(foreignQueueConnection); 149 } 150 151 /** 152 * @param foreignQueueConnectionFactory The foreignQueueConnectionFactory to set. 153 */ 154 public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) { 155 this.outboundQueueConnectionFactory = foreignQueueConnectionFactory; 156 } 157 158 @Override 159 protected void initializeForeignConnection() throws NamingException, JMSException { 160 161 QueueConnection newConnection = null; 162 163 try { 164 if (foreignConnection.get() == null) { 165 // get the connection factories 166 if (outboundQueueConnectionFactory == null) { 167 // look it up from JNDI 168 if (outboundQueueConnectionFactoryName != null) { 169 outboundQueueConnectionFactory = jndiOutboundTemplate 170 .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class); 171 if (outboundUsername != null) { 172 newConnection = outboundQueueConnectionFactory 173 .createQueueConnection(outboundUsername, outboundPassword); 174 } else { 175 newConnection = outboundQueueConnectionFactory.createQueueConnection(); 176 } 177 } else { 178 throw new JMSException("Cannot create foreignConnection - no information"); 179 } 180 } else { 181 if (outboundUsername != null) { 182 newConnection = outboundQueueConnectionFactory 183 .createQueueConnection(outboundUsername, outboundPassword); 184 } else { 185 newConnection = outboundQueueConnectionFactory.createQueueConnection(); 186 } 187 } 188 } else { 189 // Clear if for now in case something goes wrong during the init. 190 newConnection = (QueueConnection) foreignConnection.getAndSet(null); 191 } 192 193 // Register for any async error notifications now so we can reset in the 194 // case where there's not a lot of activity and a connection drops. 195 newConnection.setExceptionListener(new ExceptionListener() { 196 @Override 197 public void onException(JMSException exception) { 198 handleConnectionFailure(foreignConnection.get()); 199 } 200 }); 201 202 if (outboundClientId != null && outboundClientId.length() > 0) { 203 newConnection.setClientID(getOutboundClientId()); 204 } 205 newConnection.start(); 206 207 outboundMessageConvertor.setConnection(newConnection); 208 209 // Configure the bridges with the new Outbound connection. 210 initializeInboundDestinationBridgesOutboundSide(newConnection); 211 initializeOutboundDestinationBridgesOutboundSide(newConnection); 212 213 // At this point all looks good, so this our current connection now. 214 foreignConnection.set(newConnection); 215 } catch (Exception ex) { 216 if (newConnection != null) { 217 try { 218 newConnection.close(); 219 } catch (Exception ignore) {} 220 } 221 222 throw ex; 223 } 224 } 225 226 @Override 227 protected void initializeLocalConnection() throws NamingException, JMSException { 228 229 QueueConnection newConnection = null; 230 231 try { 232 if (localConnection.get() == null) { 233 // get the connection factories 234 if (localQueueConnectionFactory == null) { 235 if (embeddedConnectionFactory == null) { 236 // look it up from JNDI 237 if (localConnectionFactoryName != null) { 238 localQueueConnectionFactory = jndiLocalTemplate 239 .lookup(localConnectionFactoryName, QueueConnectionFactory.class); 240 if (localUsername != null) { 241 newConnection = localQueueConnectionFactory 242 .createQueueConnection(localUsername, localPassword); 243 } else { 244 newConnection = localQueueConnectionFactory.createQueueConnection(); 245 } 246 } else { 247 throw new JMSException("Cannot create localConnection - no information"); 248 } 249 } else { 250 newConnection = embeddedConnectionFactory.createQueueConnection(); 251 } 252 } else { 253 if (localUsername != null) { 254 newConnection = localQueueConnectionFactory. 255 createQueueConnection(localUsername, localPassword); 256 } else { 257 newConnection = localQueueConnectionFactory.createQueueConnection(); 258 } 259 } 260 261 } else { 262 // Clear if for now in case something goes wrong during the init. 263 newConnection = (QueueConnection) localConnection.getAndSet(null); 264 } 265 266 // Register for any async error notifications now so we can reset in the 267 // case where there's not a lot of activity and a connection drops. 268 newConnection.setExceptionListener(new ExceptionListener() { 269 @Override 270 public void onException(JMSException exception) { 271 handleConnectionFailure(localConnection.get()); 272 } 273 }); 274 275 if (localClientId != null && localClientId.length() > 0) { 276 newConnection.setClientID(getLocalClientId()); 277 } 278 newConnection.start(); 279 280 inboundMessageConvertor.setConnection(newConnection); 281 282 // Configure the bridges with the new Local connection. 283 initializeInboundDestinationBridgesLocalSide(newConnection); 284 initializeOutboundDestinationBridgesLocalSide(newConnection); 285 286 // At this point all looks good, so this our current connection now. 287 localConnection.set(newConnection); 288 } catch (Exception ex) { 289 if (newConnection != null) { 290 try { 291 newConnection.close(); 292 } catch (Exception ignore) {} 293 } 294 295 throw ex; 296 } 297 } 298 299 protected void initializeInboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException { 300 if (inboundQueueBridges != null) { 301 QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 302 303 for (InboundQueueBridge bridge : inboundQueueBridges) { 304 String queueName = bridge.getInboundQueueName(); 305 Queue foreignQueue = createForeignQueue(outboundSession, queueName); 306 bridge.setConsumer(null); 307 bridge.setConsumerQueue(foreignQueue); 308 bridge.setConsumerConnection(connection); 309 bridge.setJmsConnector(this); 310 addInboundBridge(bridge); 311 } 312 outboundSession.close(); 313 } 314 } 315 316 protected void initializeInboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException { 317 if (inboundQueueBridges != null) { 318 QueueSession localSession = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 319 320 for (InboundQueueBridge bridge : inboundQueueBridges) { 321 String localQueueName = bridge.getLocalQueueName(); 322 Queue activemqQueue = createActiveMQQueue(localSession, localQueueName); 323 bridge.setProducerQueue(activemqQueue); 324 bridge.setProducerConnection(connection); 325 if (bridge.getJmsMessageConvertor() == null) { 326 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 327 } 328 bridge.setJmsConnector(this); 329 addInboundBridge(bridge); 330 } 331 localSession.close(); 332 } 333 } 334 335 protected void initializeOutboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException { 336 if (outboundQueueBridges != null) { 337 QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 338 339 for (OutboundQueueBridge bridge : outboundQueueBridges) { 340 String queueName = bridge.getOutboundQueueName(); 341 Queue foreignQueue = createForeignQueue(outboundSession, queueName); 342 bridge.setProducerQueue(foreignQueue); 343 bridge.setProducerConnection(connection); 344 if (bridge.getJmsMessageConvertor() == null) { 345 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 346 } 347 bridge.setJmsConnector(this); 348 addOutboundBridge(bridge); 349 } 350 outboundSession.close(); 351 } 352 } 353 354 protected void initializeOutboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException { 355 if (outboundQueueBridges != null) { 356 QueueSession localSession = 357 connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 358 359 for (OutboundQueueBridge bridge : outboundQueueBridges) { 360 String localQueueName = bridge.getLocalQueueName(); 361 Queue activemqQueue = createActiveMQQueue(localSession, localQueueName); 362 bridge.setConsumer(null); 363 bridge.setConsumerQueue(activemqQueue); 364 bridge.setConsumerConnection(connection); 365 bridge.setJmsConnector(this); 366 addOutboundBridge(bridge); 367 } 368 localSession.close(); 369 } 370 } 371 372 @Override 373 protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, 374 Connection replyToConsumerConnection) { 375 Queue replyToProducerQueue = (Queue)destination; 376 boolean isInbound = replyToProducerConnection.equals(localConnection.get()); 377 378 if (isInbound) { 379 InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue); 380 if (bridge == null) { 381 bridge = new InboundQueueBridge() { 382 @Override 383 protected Destination processReplyToDestination(Destination destination) { 384 return null; 385 } 386 }; 387 try { 388 QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection) 389 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 390 Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue(); 391 replyToConsumerSession.close(); 392 bridge.setConsumerQueue(replyToConsumerQueue); 393 bridge.setProducerQueue(replyToProducerQueue); 394 bridge.setProducerConnection((QueueConnection)replyToProducerConnection); 395 bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection); 396 bridge.setDoHandleReplyTo(false); 397 if (bridge.getJmsMessageConvertor() == null) { 398 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 399 } 400 bridge.setJmsConnector(this); 401 bridge.start(); 402 LOG.info("Created replyTo bridge for {}", replyToProducerQueue); 403 } catch (Exception e) { 404 LOG.error("Failed to create replyTo bridge for queue: {}", replyToProducerQueue, e); 405 return null; 406 } 407 replyToBridges.put(replyToProducerQueue, bridge); 408 } 409 return bridge.getConsumerQueue(); 410 } else { 411 OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue); 412 if (bridge == null) { 413 bridge = new OutboundQueueBridge() { 414 @Override 415 protected Destination processReplyToDestination(Destination destination) { 416 return null; 417 } 418 }; 419 try { 420 QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection) 421 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 422 Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue(); 423 replyToConsumerSession.close(); 424 bridge.setConsumerQueue(replyToConsumerQueue); 425 bridge.setProducerQueue(replyToProducerQueue); 426 bridge.setProducerConnection((QueueConnection)replyToProducerConnection); 427 bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection); 428 bridge.setDoHandleReplyTo(false); 429 if (bridge.getJmsMessageConvertor() == null) { 430 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 431 } 432 bridge.setJmsConnector(this); 433 bridge.start(); 434 LOG.info("Created replyTo bridge for {}", replyToProducerQueue); 435 } catch (Exception e) { 436 LOG.error("Failed to create replyTo bridge for queue: {}", replyToProducerQueue, e); 437 return null; 438 } 439 replyToBridges.put(replyToProducerQueue, bridge); 440 } 441 return bridge.getConsumerQueue(); 442 } 443 } 444 445 protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException { 446 return session.createQueue(queueName); 447 } 448 449 protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException { 450 Queue result = null; 451 452 if (preferJndiDestinationLookup) { 453 try { 454 // look-up the Queue 455 result = jndiOutboundTemplate.lookup(queueName, Queue.class); 456 } catch (NamingException e) { 457 try { 458 result = session.createQueue(queueName); 459 } catch (JMSException e1) { 460 String errStr = "Failed to look-up or create Queue for name: " + queueName; 461 LOG.error(errStr, e); 462 JMSException jmsEx = new JMSException(errStr); 463 jmsEx.setLinkedException(e1); 464 throw jmsEx; 465 } 466 } 467 } else { 468 try { 469 result = session.createQueue(queueName); 470 } catch (JMSException e) { 471 // look-up the Queue 472 try { 473 result = jndiOutboundTemplate.lookup(queueName, Queue.class); 474 } catch (NamingException e1) { 475 String errStr = "Failed to look-up Queue for name: " + queueName; 476 LOG.error(errStr, e); 477 JMSException jmsEx = new JMSException(errStr); 478 jmsEx.setLinkedException(e1); 479 throw jmsEx; 480 } 481 } 482 } 483 484 return result; 485 } 486}