001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.net.URI; 020 import java.net.URISyntaxException; 021 import java.util.HashMap; 022 import java.util.Map; 023 import java.util.Properties; 024 import java.util.concurrent.Executor; 025 import java.util.concurrent.ScheduledThreadPoolExecutor; 026 import java.util.concurrent.ThreadFactory; 027 028 import javax.jms.Connection; 029 import javax.jms.ConnectionFactory; 030 import javax.jms.ExceptionListener; 031 import javax.jms.JMSException; 032 import javax.jms.QueueConnection; 033 import javax.jms.QueueConnectionFactory; 034 import javax.jms.TopicConnection; 035 import javax.jms.TopicConnectionFactory; 036 import javax.naming.Context; 037 038 import org.apache.activemq.blob.BlobTransferPolicy; 039 import org.apache.activemq.jndi.JNDIBaseStorable; 040 import org.apache.activemq.management.JMSStatsImpl; 041 import org.apache.activemq.management.StatsCapable; 042 import org.apache.activemq.management.StatsImpl; 043 import org.apache.activemq.transport.Transport; 044 import org.apache.activemq.transport.TransportFactory; 045 import org.apache.activemq.transport.TransportListener; 046 import org.apache.activemq.util.IdGenerator; 047 import org.apache.activemq.util.IntrospectionSupport; 048 import org.apache.activemq.util.JMSExceptionSupport; 049 import org.apache.activemq.util.URISupport; 050 import org.apache.activemq.util.URISupport.CompositeData; 051 052 /** 053 * A ConnectionFactory is an an Administered object, and is used for creating 054 * Connections. <p/> This class also implements QueueConnectionFactory and 055 * TopicConnectionFactory. You can use this connection to create both 056 * QueueConnections and TopicConnections. 057 * 058 * 059 * @see javax.jms.ConnectionFactory 060 */ 061 public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable { 062 063 public static final String DEFAULT_BROKER_BIND_URL = "tcp://localhost:61616"; 064 public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL; 065 public static final String DEFAULT_USER = null; 066 public static final String DEFAULT_PASSWORD = null; 067 public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0; 068 069 protected static final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { 070 public Thread newThread(Runnable run) { 071 Thread thread = new Thread(run); 072 thread.setPriority(ThreadPriorities.INBOUND_CLIENT_CONNECTION); 073 return thread; 074 } 075 }); 076 077 protected URI brokerURL; 078 protected String userName; 079 protected String password; 080 protected String clientID; 081 protected boolean dispatchAsync=true; 082 protected boolean alwaysSessionAsync=true; 083 084 JMSStatsImpl factoryStats = new JMSStatsImpl(); 085 086 private IdGenerator clientIdGenerator; 087 private String clientIDPrefix; 088 private IdGenerator connectionIdGenerator; 089 private String connectionIDPrefix; 090 091 // client policies 092 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 093 private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 094 private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); 095 private MessageTransformer transformer; 096 097 private boolean disableTimeStampsByDefault; 098 private boolean optimizedMessageDispatch = true; 099 private long optimizeAcknowledgeTimeOut = 300; 100 private boolean copyMessageOnSend = true; 101 private boolean useCompression; 102 private boolean objectMessageSerializationDefered; 103 private boolean useAsyncSend; 104 private boolean optimizeAcknowledge; 105 private int closeTimeout = 15000; 106 private boolean useRetroactiveConsumer; 107 private boolean exclusiveConsumer; 108 private boolean nestedMapAndListEnabled = true; 109 private boolean alwaysSyncSend; 110 private boolean watchTopicAdvisories = true; 111 private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE; 112 private long warnAboutUnstartedConnectionTimeout = 500L; 113 private int sendTimeout = 0; 114 private boolean sendAcksAsync=true; 115 private TransportListener transportListener; 116 private ExceptionListener exceptionListener; 117 private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; 118 private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; 119 private boolean useDedicatedTaskRunner; 120 private long consumerFailoverRedeliveryWaitPeriod = 0; 121 private boolean checkForDuplicates = true; 122 private ClientInternalExceptionListener clientInternalExceptionListener; 123 private boolean messagePrioritySupported = true; 124 private boolean transactedIndividualAck = false; 125 private boolean nonBlockingRedelivery = false; 126 127 // ///////////////////////////////////////////// 128 // 129 // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods 130 // 131 // ///////////////////////////////////////////// 132 133 public ActiveMQConnectionFactory() { 134 this(DEFAULT_BROKER_URL); 135 } 136 137 public ActiveMQConnectionFactory(String brokerURL) { 138 this(createURI(brokerURL)); 139 } 140 141 public ActiveMQConnectionFactory(URI brokerURL) { 142 setBrokerURL(brokerURL.toString()); 143 } 144 145 public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) { 146 setUserName(userName); 147 setPassword(password); 148 setBrokerURL(brokerURL.toString()); 149 } 150 151 public ActiveMQConnectionFactory(String userName, String password, String brokerURL) { 152 setUserName(userName); 153 setPassword(password); 154 setBrokerURL(brokerURL); 155 } 156 157 /** 158 * Returns a copy of the given connection factory 159 */ 160 public ActiveMQConnectionFactory copy() { 161 try { 162 return (ActiveMQConnectionFactory)super.clone(); 163 } catch (CloneNotSupportedException e) { 164 throw new RuntimeException("This should never happen: " + e, e); 165 } 166 } 167 168 /** 169 * @param brokerURL 170 * @return 171 * @throws URISyntaxException 172 */ 173 private static URI createURI(String brokerURL) { 174 try { 175 return new URI(brokerURL); 176 } catch (URISyntaxException e) { 177 throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e); 178 } 179 } 180 181 /** 182 * @return Returns the Connection. 183 */ 184 public Connection createConnection() throws JMSException { 185 return createActiveMQConnection(); 186 } 187 188 /** 189 * @return Returns the Connection. 190 */ 191 public Connection createConnection(String userName, String password) throws JMSException { 192 return createActiveMQConnection(userName, password); 193 } 194 195 /** 196 * @return Returns the QueueConnection. 197 * @throws JMSException 198 */ 199 public QueueConnection createQueueConnection() throws JMSException { 200 return createActiveMQConnection(); 201 } 202 203 /** 204 * @return Returns the QueueConnection. 205 */ 206 public QueueConnection createQueueConnection(String userName, String password) throws JMSException { 207 return createActiveMQConnection(userName, password); 208 } 209 210 /** 211 * @return Returns the TopicConnection. 212 * @throws JMSException 213 */ 214 public TopicConnection createTopicConnection() throws JMSException { 215 return createActiveMQConnection(); 216 } 217 218 /** 219 * @return Returns the TopicConnection. 220 */ 221 public TopicConnection createTopicConnection(String userName, String password) throws JMSException { 222 return createActiveMQConnection(userName, password); 223 } 224 225 /** 226 * @returns the StatsImpl associated with this ConnectionFactory. 227 */ 228 public StatsImpl getStats() { 229 return this.factoryStats; 230 } 231 232 // ///////////////////////////////////////////// 233 // 234 // Implementation methods. 235 // 236 // ///////////////////////////////////////////// 237 238 protected ActiveMQConnection createActiveMQConnection() throws JMSException { 239 return createActiveMQConnection(userName, password); 240 } 241 242 /** 243 * Creates a Transport based on this object's connection settings. Separated 244 * from createActiveMQConnection to allow for subclasses to override. 245 * 246 * @return The newly created Transport. 247 * @throws JMSException If unable to create trasnport. 248 * @author sepandm@gmail.com 249 */ 250 protected Transport createTransport() throws JMSException { 251 try { 252 return TransportFactory.connect(brokerURL, DEFAULT_CONNECTION_EXECUTOR); 253 } catch (Exception e) { 254 throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e); 255 } 256 } 257 258 /** 259 * @return Returns the Connection. 260 */ 261 protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { 262 if (brokerURL == null) { 263 throw new ConfigurationException("brokerURL not set."); 264 } 265 ActiveMQConnection connection = null; 266 try { 267 Transport transport = createTransport(); 268 connection = createActiveMQConnection(transport, factoryStats); 269 270 connection.setUserName(userName); 271 connection.setPassword(password); 272 273 configureConnection(connection); 274 275 transport.start(); 276 277 if (clientID != null) { 278 connection.setDefaultClientID(clientID); 279 } 280 281 return connection; 282 } catch (JMSException e) { 283 // Clean up! 284 try { 285 connection.close(); 286 } catch (Throwable ignore) { 287 } 288 throw e; 289 } catch (Exception e) { 290 // Clean up! 291 try { 292 connection.close(); 293 } catch (Throwable ignore) { 294 } 295 throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e); 296 } 297 } 298 299 protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { 300 ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), 301 getConnectionIdGenerator(), stats); 302 return connection; 303 } 304 305 protected void configureConnection(ActiveMQConnection connection) throws JMSException { 306 connection.setPrefetchPolicy(getPrefetchPolicy()); 307 connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault()); 308 connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch()); 309 connection.setCopyMessageOnSend(isCopyMessageOnSend()); 310 connection.setUseCompression(isUseCompression()); 311 connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered()); 312 connection.setDispatchAsync(isDispatchAsync()); 313 connection.setUseAsyncSend(isUseAsyncSend()); 314 connection.setAlwaysSyncSend(isAlwaysSyncSend()); 315 connection.setAlwaysSessionAsync(isAlwaysSessionAsync()); 316 connection.setOptimizeAcknowledge(isOptimizeAcknowledge()); 317 connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut()); 318 connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer()); 319 connection.setExclusiveConsumer(isExclusiveConsumer()); 320 connection.setRedeliveryPolicy(getRedeliveryPolicy()); 321 connection.setTransformer(getTransformer()); 322 connection.setBlobTransferPolicy(getBlobTransferPolicy().copy()); 323 connection.setWatchTopicAdvisories(isWatchTopicAdvisories()); 324 connection.setProducerWindowSize(getProducerWindowSize()); 325 connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout()); 326 connection.setSendTimeout(getSendTimeout()); 327 connection.setCloseTimeout(getCloseTimeout()); 328 connection.setSendAcksAsync(isSendAcksAsync()); 329 connection.setAuditDepth(getAuditDepth()); 330 connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber()); 331 connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner()); 332 connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod()); 333 connection.setCheckForDuplicates(isCheckForDuplicates()); 334 connection.setMessagePrioritySupported(isMessagePrioritySupported()); 335 connection.setTransactedIndividualAck(isTransactedIndividualAck()); 336 connection.setNonBlockingRedelivery(isNonBlockingRedelivery()); 337 if (transportListener != null) { 338 connection.addTransportListener(transportListener); 339 } 340 if (exceptionListener != null) { 341 connection.setExceptionListener(exceptionListener); 342 } 343 if (clientInternalExceptionListener != null) { 344 connection.setClientInternalExceptionListener(clientInternalExceptionListener); 345 } 346 } 347 348 // ///////////////////////////////////////////// 349 // 350 // Property Accessors 351 // 352 // ///////////////////////////////////////////// 353 354 public String getBrokerURL() { 355 return brokerURL == null ? null : brokerURL.toString(); 356 } 357 358 /** 359 * Sets the <a 360 * href="http://activemq.apache.org/configuring-transports.html">connection 361 * URL</a> used to connect to the ActiveMQ broker. 362 */ 363 public void setBrokerURL(String brokerURL) { 364 this.brokerURL = createURI(brokerURL); 365 366 // Use all the properties prefixed with 'jms.' to set the connection 367 // factory 368 // options. 369 if (this.brokerURL.getQuery() != null) { 370 // It might be a standard URI or... 371 try { 372 373 Map<String,String> map = URISupport.parseQuery(this.brokerURL.getQuery()); 374 Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(map, "jms."); 375 if (buildFromMap(jmsOptionsMap)) { 376 if (!jmsOptionsMap.isEmpty()) { 377 String msg = "There are " + jmsOptionsMap.size() 378 + " jms options that couldn't be set on the ConnectionFactory." 379 + " Check the options are spelled correctly." 380 + " Unknown parameters=[" + jmsOptionsMap + "]." 381 + " This connection factory cannot be started."; 382 throw new IllegalArgumentException(msg); 383 } 384 385 this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map); 386 } 387 388 } catch (URISyntaxException e) { 389 } 390 391 } else { 392 393 // It might be a composite URI. 394 try { 395 CompositeData data = URISupport.parseComposite(this.brokerURL); 396 Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(data.getParameters(), "jms."); 397 if (buildFromMap(jmsOptionsMap)) { 398 if (!jmsOptionsMap.isEmpty()) { 399 String msg = "There are " + jmsOptionsMap.size() 400 + " jms options that couldn't be set on the ConnectionFactory." 401 + " Check the options are spelled correctly." 402 + " Unknown parameters=[" + jmsOptionsMap + "]." 403 + " This connection factory cannot be started."; 404 throw new IllegalArgumentException(msg); 405 } 406 407 this.brokerURL = data.toURI(); 408 } 409 } catch (URISyntaxException e) { 410 } 411 } 412 } 413 414 public String getClientID() { 415 return clientID; 416 } 417 418 /** 419 * Sets the JMS clientID to use for the created connection. Note that this 420 * can only be used by one connection at once so generally its a better idea 421 * to set the clientID on a Connection 422 */ 423 public void setClientID(String clientID) { 424 this.clientID = clientID; 425 } 426 427 public boolean isCopyMessageOnSend() { 428 return copyMessageOnSend; 429 } 430 431 /** 432 * Should a JMS message be copied to a new JMS Message object as part of the 433 * send() method in JMS. This is enabled by default to be compliant with the 434 * JMS specification. You can disable it if you do not mutate JMS messages 435 * after they are sent for a performance boost 436 */ 437 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 438 this.copyMessageOnSend = copyMessageOnSend; 439 } 440 441 public boolean isDisableTimeStampsByDefault() { 442 return disableTimeStampsByDefault; 443 } 444 445 /** 446 * Sets whether or not timestamps on messages should be disabled or not. If 447 * you disable them it adds a small performance boost. 448 */ 449 public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) { 450 this.disableTimeStampsByDefault = disableTimeStampsByDefault; 451 } 452 453 public boolean isOptimizedMessageDispatch() { 454 return optimizedMessageDispatch; 455 } 456 457 /** 458 * If this flag is set then an larger prefetch limit is used - only 459 * applicable for durable topic subscribers. 460 */ 461 public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) { 462 this.optimizedMessageDispatch = optimizedMessageDispatch; 463 } 464 465 public String getPassword() { 466 return password; 467 } 468 469 /** 470 * Sets the JMS password used for connections created from this factory 471 */ 472 public void setPassword(String password) { 473 this.password = password; 474 } 475 476 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 477 return prefetchPolicy; 478 } 479 480 /** 481 * Sets the <a 482 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch 483 * policy</a> for consumers created by this connection. 484 */ 485 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 486 this.prefetchPolicy = prefetchPolicy; 487 } 488 489 public boolean isUseAsyncSend() { 490 return useAsyncSend; 491 } 492 493 public BlobTransferPolicy getBlobTransferPolicy() { 494 return blobTransferPolicy; 495 } 496 497 /** 498 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 499 * OBjects) are transferred from producers to brokers to consumers 500 */ 501 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 502 this.blobTransferPolicy = blobTransferPolicy; 503 } 504 505 /** 506 * Forces the use of <a 507 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which 508 * adds a massive performance boost; but means that the send() method will 509 * return immediately whether the message has been sent or not which could 510 * lead to message loss. 511 */ 512 public void setUseAsyncSend(boolean useAsyncSend) { 513 this.useAsyncSend = useAsyncSend; 514 } 515 516 public synchronized boolean isWatchTopicAdvisories() { 517 return watchTopicAdvisories; 518 } 519 520 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { 521 this.watchTopicAdvisories = watchTopicAdvisories; 522 } 523 524 /** 525 * @return true if always sync send messages 526 */ 527 public boolean isAlwaysSyncSend() { 528 return this.alwaysSyncSend; 529 } 530 531 /** 532 * Set true if always require messages to be sync sent 533 * 534 * @param alwaysSyncSend 535 */ 536 public void setAlwaysSyncSend(boolean alwaysSyncSend) { 537 this.alwaysSyncSend = alwaysSyncSend; 538 } 539 540 public String getUserName() { 541 return userName; 542 } 543 544 /** 545 * Sets the JMS userName used by connections created by this factory 546 */ 547 public void setUserName(String userName) { 548 this.userName = userName; 549 } 550 551 public boolean isUseRetroactiveConsumer() { 552 return useRetroactiveConsumer; 553 } 554 555 /** 556 * Sets whether or not retroactive consumers are enabled. Retroactive 557 * consumers allow non-durable topic subscribers to receive old messages 558 * that were published before the non-durable subscriber started. 559 */ 560 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) { 561 this.useRetroactiveConsumer = useRetroactiveConsumer; 562 } 563 564 public boolean isExclusiveConsumer() { 565 return exclusiveConsumer; 566 } 567 568 /** 569 * Enables or disables whether or not queue consumers should be exclusive or 570 * not for example to preserve ordering when not using <a 571 * href="http://activemq.apache.org/message-groups.html">Message Groups</a> 572 * 573 * @param exclusiveConsumer 574 */ 575 public void setExclusiveConsumer(boolean exclusiveConsumer) { 576 this.exclusiveConsumer = exclusiveConsumer; 577 } 578 579 public RedeliveryPolicy getRedeliveryPolicy() { 580 return redeliveryPolicy; 581 } 582 583 /** 584 * Sets the global redelivery policy to be used when a message is delivered 585 * but the session is rolled back 586 */ 587 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 588 this.redeliveryPolicy = redeliveryPolicy; 589 } 590 591 public MessageTransformer getTransformer() { 592 return transformer; 593 } 594 595 /** 596 * @return the sendTimeout 597 */ 598 public int getSendTimeout() { 599 return sendTimeout; 600 } 601 602 /** 603 * @param sendTimeout the sendTimeout to set 604 */ 605 public void setSendTimeout(int sendTimeout) { 606 this.sendTimeout = sendTimeout; 607 } 608 609 /** 610 * @return the sendAcksAsync 611 */ 612 public boolean isSendAcksAsync() { 613 return sendAcksAsync; 614 } 615 616 /** 617 * @param sendAcksAsync the sendAcksAsync to set 618 */ 619 public void setSendAcksAsync(boolean sendAcksAsync) { 620 this.sendAcksAsync = sendAcksAsync; 621 } 622 623 /** 624 * @return the messagePrioritySupported 625 */ 626 public boolean isMessagePrioritySupported() { 627 return this.messagePrioritySupported; 628 } 629 630 /** 631 * @param messagePrioritySupported the messagePrioritySupported to set 632 */ 633 public void setMessagePrioritySupported(boolean messagePrioritySupported) { 634 this.messagePrioritySupported = messagePrioritySupported; 635 } 636 637 638 /** 639 * Sets the transformer used to transform messages before they are sent on 640 * to the JMS bus or when they are received from the bus but before they are 641 * delivered to the JMS client 642 */ 643 public void setTransformer(MessageTransformer transformer) { 644 this.transformer = transformer; 645 } 646 647 @SuppressWarnings({ "unchecked", "rawtypes" }) 648 @Override 649 public void buildFromProperties(Properties properties) { 650 651 if (properties == null) { 652 properties = new Properties(); 653 } 654 655 String temp = properties.getProperty(Context.PROVIDER_URL); 656 if (temp == null || temp.length() == 0) { 657 temp = properties.getProperty("brokerURL"); 658 } 659 if (temp != null && temp.length() > 0) { 660 setBrokerURL(temp); 661 } 662 663 Map<String, Object> p = new HashMap(properties); 664 buildFromMap(p); 665 } 666 667 public boolean buildFromMap(Map<String, Object> properties) { 668 boolean rc = false; 669 670 ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy(); 671 if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) { 672 setPrefetchPolicy(p); 673 rc = true; 674 } 675 676 RedeliveryPolicy rp = new RedeliveryPolicy(); 677 if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) { 678 setRedeliveryPolicy(rp); 679 rc = true; 680 } 681 682 BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); 683 if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) { 684 setBlobTransferPolicy(blobTransferPolicy); 685 rc = true; 686 } 687 688 rc |= IntrospectionSupport.setProperties(this, properties); 689 690 return rc; 691 } 692 693 @Override 694 public void populateProperties(Properties props) { 695 props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync())); 696 697 if (getBrokerURL() != null) { 698 props.setProperty(Context.PROVIDER_URL, getBrokerURL()); 699 props.setProperty("brokerURL", getBrokerURL()); 700 } 701 702 if (getClientID() != null) { 703 props.setProperty("clientID", getClientID()); 704 } 705 706 IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy."); 707 IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy."); 708 IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy."); 709 710 props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend())); 711 props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault())); 712 props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered())); 713 props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch())); 714 715 if (getPassword() != null) { 716 props.setProperty("password", getPassword()); 717 } 718 719 props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend())); 720 props.setProperty("useCompression", Boolean.toString(isUseCompression())); 721 props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer())); 722 props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories())); 723 724 if (getUserName() != null) { 725 props.setProperty("userName", getUserName()); 726 } 727 728 props.setProperty("closeTimeout", Integer.toString(getCloseTimeout())); 729 props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync())); 730 props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge())); 731 props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled())); 732 props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend())); 733 props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize())); 734 props.setProperty("sendTimeout", Integer.toString(getSendTimeout())); 735 props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync())); 736 props.setProperty("auditDepth", Integer.toString(getAuditDepth())); 737 props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber())); 738 props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates())); 739 props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported())); 740 props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck())); 741 props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery())); 742 } 743 744 public boolean isUseCompression() { 745 return useCompression; 746 } 747 748 /** 749 * Enables the use of compression of the message bodies 750 */ 751 public void setUseCompression(boolean useCompression) { 752 this.useCompression = useCompression; 753 } 754 755 public boolean isObjectMessageSerializationDefered() { 756 return objectMessageSerializationDefered; 757 } 758 759 /** 760 * When an object is set on an ObjectMessage, the JMS spec requires the 761 * object to be serialized by that set method. Enabling this flag causes the 762 * object to not get serialized. The object may subsequently get serialized 763 * if the message needs to be sent over a socket or stored to disk. 764 */ 765 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) { 766 this.objectMessageSerializationDefered = objectMessageSerializationDefered; 767 } 768 769 public boolean isDispatchAsync() { 770 return dispatchAsync; 771 } 772 773 /** 774 * Enables or disables the default setting of whether or not consumers have 775 * their messages <a 776 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched 777 * synchronously or asynchronously by the broker</a>. For non-durable 778 * topics for example we typically dispatch synchronously by default to 779 * minimize context switches which boost performance. However sometimes its 780 * better to go slower to ensure that a single blocked consumer socket does 781 * not block delivery to other consumers. 782 * 783 * @param asyncDispatch If true then consumers created on this connection 784 * will default to having their messages dispatched 785 * asynchronously. The default value is true. 786 */ 787 public void setDispatchAsync(boolean asyncDispatch) { 788 this.dispatchAsync = asyncDispatch; 789 } 790 791 /** 792 * @return Returns the closeTimeout. 793 */ 794 public int getCloseTimeout() { 795 return closeTimeout; 796 } 797 798 /** 799 * Sets the timeout before a close is considered complete. Normally a 800 * close() on a connection waits for confirmation from the broker; this 801 * allows that operation to timeout to save the client hanging if there is 802 * no broker 803 */ 804 public void setCloseTimeout(int closeTimeout) { 805 this.closeTimeout = closeTimeout; 806 } 807 808 /** 809 * @return Returns the alwaysSessionAsync. 810 */ 811 public boolean isAlwaysSessionAsync() { 812 return alwaysSessionAsync; 813 } 814 815 /** 816 * If this flag is set then a separate thread is not used for dispatching 817 * messages for each Session in the Connection. However, a separate thread 818 * is always used if there is more than one session, or the session isn't in 819 * auto acknowledge or duplicates ok mode 820 */ 821 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) { 822 this.alwaysSessionAsync = alwaysSessionAsync; 823 } 824 825 /** 826 * @return Returns the optimizeAcknowledge. 827 */ 828 public boolean isOptimizeAcknowledge() { 829 return optimizeAcknowledge; 830 } 831 832 /** 833 * @param optimizeAcknowledge The optimizeAcknowledge to set. 834 */ 835 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) { 836 this.optimizeAcknowledge = optimizeAcknowledge; 837 } 838 839 /** 840 * The max time in milliseconds between optimized ack batches 841 * @param optimizeAcknowledgeTimeOut 842 */ 843 public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) { 844 this.optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut; 845 } 846 847 public long getOptimizeAcknowledgeTimeOut() { 848 return optimizeAcknowledgeTimeOut; 849 } 850 851 public boolean isNestedMapAndListEnabled() { 852 return nestedMapAndListEnabled; 853 } 854 855 /** 856 * Enables/disables whether or not Message properties and MapMessage entries 857 * support <a 858 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested 859 * Structures</a> of Map and List objects 860 */ 861 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { 862 this.nestedMapAndListEnabled = structuredMapsEnabled; 863 } 864 865 public String getClientIDPrefix() { 866 return clientIDPrefix; 867 } 868 869 /** 870 * Sets the prefix used by autogenerated JMS Client ID values which are used 871 * if the JMS client does not explicitly specify on. 872 * 873 * @param clientIDPrefix 874 */ 875 public void setClientIDPrefix(String clientIDPrefix) { 876 this.clientIDPrefix = clientIDPrefix; 877 } 878 879 protected synchronized IdGenerator getClientIdGenerator() { 880 if (clientIdGenerator == null) { 881 if (clientIDPrefix != null) { 882 clientIdGenerator = new IdGenerator(clientIDPrefix); 883 } else { 884 clientIdGenerator = new IdGenerator(); 885 } 886 } 887 return clientIdGenerator; 888 } 889 890 protected void setClientIdGenerator(IdGenerator clientIdGenerator) { 891 this.clientIdGenerator = clientIdGenerator; 892 } 893 894 /** 895 * Sets the prefix used by connection id generator 896 * @param connectionIDPrefix 897 */ 898 public void setConnectionIDPrefix(String connectionIDPrefix) { 899 this.connectionIDPrefix = connectionIDPrefix; 900 } 901 902 protected synchronized IdGenerator getConnectionIdGenerator() { 903 if (connectionIdGenerator == null) { 904 if (connectionIDPrefix != null) { 905 connectionIdGenerator = new IdGenerator(connectionIDPrefix); 906 } else { 907 connectionIdGenerator = new IdGenerator(); 908 } 909 } 910 return connectionIdGenerator; 911 } 912 913 protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) { 914 this.connectionIdGenerator = connectionIdGenerator; 915 } 916 917 /** 918 * @return the statsEnabled 919 */ 920 public boolean isStatsEnabled() { 921 return this.factoryStats.isEnabled(); 922 } 923 924 /** 925 * @param statsEnabled the statsEnabled to set 926 */ 927 public void setStatsEnabled(boolean statsEnabled) { 928 this.factoryStats.setEnabled(statsEnabled); 929 } 930 931 public synchronized int getProducerWindowSize() { 932 return producerWindowSize; 933 } 934 935 public synchronized void setProducerWindowSize(int producerWindowSize) { 936 this.producerWindowSize = producerWindowSize; 937 } 938 939 public long getWarnAboutUnstartedConnectionTimeout() { 940 return warnAboutUnstartedConnectionTimeout; 941 } 942 943 /** 944 * Enables the timeout from a connection creation to when a warning is 945 * generated if the connection is not properly started via 946 * {@link Connection#start()} and a message is received by a consumer. It is 947 * a very common gotcha to forget to <a 948 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start 949 * the connection</a> so this option makes the default case to create a 950 * warning if the user forgets. To disable the warning just set the value to < 951 * 0 (say -1). 952 */ 953 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) { 954 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout; 955 } 956 957 public TransportListener getTransportListener() { 958 return transportListener; 959 } 960 961 /** 962 * Allows a listener to be configured on the ConnectionFactory so that when this factory is used 963 * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register 964 * a transport listener. 965 * 966 * @param transportListener sets the listener to be registered on all connections 967 * created by this factory 968 */ 969 public void setTransportListener(TransportListener transportListener) { 970 this.transportListener = transportListener; 971 } 972 973 974 public ExceptionListener getExceptionListener() { 975 return exceptionListener; 976 } 977 978 /** 979 * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory 980 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register 981 * an exception listener. 982 * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than 983 * on connection (as it will be if more than one connection is subsequently created by this connection factory) 984 * @param exceptionListener sets the exception listener to be registered on all connections 985 * created by this factory 986 */ 987 public void setExceptionListener(ExceptionListener exceptionListener) { 988 this.exceptionListener = exceptionListener; 989 } 990 991 public int getAuditDepth() { 992 return auditDepth; 993 } 994 995 public void setAuditDepth(int auditDepth) { 996 this.auditDepth = auditDepth; 997 } 998 999 public int getAuditMaximumProducerNumber() { 1000 return auditMaximumProducerNumber; 1001 } 1002 1003 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { 1004 this.auditMaximumProducerNumber = auditMaximumProducerNumber; 1005 } 1006 1007 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 1008 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 1009 } 1010 1011 public boolean isUseDedicatedTaskRunner() { 1012 return useDedicatedTaskRunner; 1013 } 1014 1015 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) { 1016 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod; 1017 } 1018 1019 public long getConsumerFailoverRedeliveryWaitPeriod() { 1020 return consumerFailoverRedeliveryWaitPeriod; 1021 } 1022 1023 public ClientInternalExceptionListener getClientInternalExceptionListener() { 1024 return clientInternalExceptionListener; 1025 } 1026 1027 /** 1028 * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory 1029 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register 1030 * an exception listener. 1031 * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than 1032 * on connection (as it will be if more than one connection is subsequently created by this connection factory) 1033 * @param clientInternalExceptionListener sets the exception listener to be registered on all connections 1034 * created by this factory 1035 */ 1036 public void setClientInternalExceptionListener( 1037 ClientInternalExceptionListener clientInternalExceptionListener) { 1038 this.clientInternalExceptionListener = clientInternalExceptionListener; 1039 } 1040 1041 /** 1042 * @return the checkForDuplicates 1043 */ 1044 public boolean isCheckForDuplicates() { 1045 return this.checkForDuplicates; 1046 } 1047 1048 /** 1049 * @param checkForDuplicates the checkForDuplicates to set 1050 */ 1051 public void setCheckForDuplicates(boolean checkForDuplicates) { 1052 this.checkForDuplicates = checkForDuplicates; 1053 } 1054 1055 public boolean isTransactedIndividualAck() { 1056 return transactedIndividualAck; 1057 } 1058 1059 /** 1060 * when true, submit individual transacted acks immediately rather than with transaction completion. 1061 * This allows the acks to represent delivery status which can be persisted on rollback 1062 * Used in conjunction with org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean) true 1063 */ 1064 public void setTransactedIndividualAck(boolean transactedIndividualAck) { 1065 this.transactedIndividualAck = transactedIndividualAck; 1066 } 1067 1068 1069 public boolean isNonBlockingRedelivery() { 1070 return nonBlockingRedelivery; 1071 } 1072 1073 /** 1074 * When true a MessageConsumer will not stop Message delivery before re-delivering Messages 1075 * from a rolled back transaction. This implies that message order will not be preserved and 1076 * also will result in the TransactedIndividualAck option to be enabled. 1077 */ 1078 public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) { 1079 this.nonBlockingRedelivery = nonBlockingRedelivery; 1080 } 1081 1082 }