001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.File; 020import java.io.InputStream; 021import java.io.Serializable; 022import java.net.URL; 023import java.util.Collections; 024import java.util.Iterator; 025import java.util.List; 026import java.util.concurrent.CopyOnWriteArrayList; 027import java.util.concurrent.ThreadPoolExecutor; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.concurrent.atomic.AtomicInteger; 030 031import javax.jms.BytesMessage; 032import javax.jms.Destination; 033import javax.jms.IllegalStateException; 034import javax.jms.InvalidDestinationException; 035import javax.jms.InvalidSelectorException; 036import javax.jms.JMSException; 037import javax.jms.MapMessage; 038import javax.jms.Message; 039import javax.jms.MessageConsumer; 040import javax.jms.MessageListener; 041import javax.jms.MessageProducer; 042import javax.jms.ObjectMessage; 043import javax.jms.Queue; 044import javax.jms.QueueBrowser; 045import javax.jms.QueueReceiver; 046import javax.jms.QueueSender; 047import javax.jms.QueueSession; 048import javax.jms.Session; 049import javax.jms.StreamMessage; 050import 
javax.jms.TemporaryQueue; 051import javax.jms.TemporaryTopic; 052import javax.jms.TextMessage; 053import javax.jms.Topic; 054import javax.jms.TopicPublisher; 055import javax.jms.TopicSession; 056import javax.jms.TopicSubscriber; 057import javax.jms.TransactionRolledBackException; 058 059import org.apache.activemq.blob.BlobDownloader; 060import org.apache.activemq.blob.BlobTransferPolicy; 061import org.apache.activemq.blob.BlobUploader; 062import org.apache.activemq.command.ActiveMQBlobMessage; 063import org.apache.activemq.command.ActiveMQBytesMessage; 064import org.apache.activemq.command.ActiveMQDestination; 065import org.apache.activemq.command.ActiveMQMapMessage; 066import org.apache.activemq.command.ActiveMQMessage; 067import org.apache.activemq.command.ActiveMQObjectMessage; 068import org.apache.activemq.command.ActiveMQQueue; 069import org.apache.activemq.command.ActiveMQStreamMessage; 070import org.apache.activemq.command.ActiveMQTempDestination; 071import org.apache.activemq.command.ActiveMQTempQueue; 072import org.apache.activemq.command.ActiveMQTempTopic; 073import org.apache.activemq.command.ActiveMQTextMessage; 074import org.apache.activemq.command.ActiveMQTopic; 075import org.apache.activemq.command.Command; 076import org.apache.activemq.command.ConsumerId; 077import org.apache.activemq.command.MessageAck; 078import org.apache.activemq.command.MessageDispatch; 079import org.apache.activemq.command.MessageId; 080import org.apache.activemq.command.ProducerId; 081import org.apache.activemq.command.RemoveInfo; 082import org.apache.activemq.command.Response; 083import org.apache.activemq.command.SessionId; 084import org.apache.activemq.command.SessionInfo; 085import org.apache.activemq.command.TransactionId; 086import org.apache.activemq.management.JMSSessionStatsImpl; 087import org.apache.activemq.management.StatsCapable; 088import org.apache.activemq.management.StatsImpl; 089import org.apache.activemq.thread.Scheduler; 090import 
org.apache.activemq.transaction.Synchronization; 091import org.apache.activemq.usage.MemoryUsage; 092import org.apache.activemq.util.Callback; 093import org.apache.activemq.util.LongSequenceGenerator; 094import org.slf4j.Logger; 095import org.slf4j.LoggerFactory; 096 097/** 098 * <P> 099 * A <CODE>Session</CODE> object is a single-threaded context for producing 100 * and consuming messages. Although it may allocate provider resources outside 101 * the Java virtual machine (JVM), it is considered a lightweight JMS object. 102 * <P> 103 * A session serves several purposes: 104 * <UL> 105 * <LI>It is a factory for its message producers and consumers. 106 * <LI>It supplies provider-optimized message factories. 107 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and 108 * <CODE>TemporaryQueues</CODE>. 109 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> 110 * objects for those clients that need to dynamically manipulate 111 * provider-specific destination names. 112 * <LI>It supports a single series of transactions that combine work spanning 113 * its producers and consumers into atomic units. 114 * <LI>It defines a serial order for the messages it consumes and the messages 115 * it produces. 116 * <LI>It retains messages it consumes until they have been acknowledged. 117 * <LI>It serializes execution of message listeners registered with its message 118 * consumers. 119 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>. 120 * </UL> 121 * <P> 122 * A session can create and service multiple message producers and consumers. 123 * <P> 124 * One typical use is to have a thread block on a synchronous 125 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then 126 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s. 127 * <P> 128 * If a client desires to have one thread produce messages while others consume 129 * them, the client should use a separate session for its producing thread. 
130 * <P> 131 * Once a connection has been started, any session with one or more registered 132 * message listeners is dedicated to the thread of control that delivers 133 * messages to it. It is erroneous for client code to use this session or any of 134 * its constituent objects from another thread of control. The only exception to 135 * this rule is the use of the session or connection <CODE>close</CODE> 136 * method. 137 * <P> 138 * It should be easy for most clients to partition their work naturally into 139 * sessions. This model allows clients to start simply and incrementally add 140 * message processing complexity as their need for concurrency grows. 141 * <P> 142 * The <CODE>close</CODE> method is the only session method that can be called 143 * while some other session method is being executed in another thread. 144 * <P> 145 * A session may be specified as transacted. Each transacted session supports a 146 * single series of transactions. Each transaction groups a set of message sends 147 * and a set of message receives into an atomic unit of work. In effect, 148 * transactions organize a session's input message stream and output message 149 * stream into series of atomic units. When a transaction commits, its atomic 150 * unit of input is acknowledged and its associated atomic unit of output is 151 * sent. If a transaction rollback is done, the transaction's sent messages are 152 * destroyed and the session's input is automatically recovered. 153 * <P> 154 * The content of a transaction's input and output units is simply those 155 * messages that have been produced and consumed within the session's current 156 * transaction. 157 * <P> 158 * A transaction is completed using either its session's <CODE>commit</CODE> 159 * method or its session's <CODE>rollback </CODE> method. The completion of a 160 * session's current transaction automatically begins the next. 
The result is 161 * that a transacted session always has a current transaction within which its 162 * work is done. 163 * <P> 164 * The Java Transaction Service (JTS) or some other transaction monitor may be 165 * used to combine a session's transaction with transactions on other resources 166 * (databases, other JMS sessions, etc.). Since Java distributed transactions 167 * are controlled via the Java Transaction API (JTA), use of the session's 168 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is 169 * prohibited. 170 * <P> 171 * The JMS API does not require support for JTA; however, it does define how a 172 * provider supplies this support. 173 * <P> 174 * Although it is also possible for a JMS client to handle distributed 175 * transactions directly, it is unlikely that many JMS clients will do this. 176 * Support for JTA in the JMS API is targeted at systems vendors who will be 177 * integrating the JMS API into their application server products. 178 * 179 * 180 * @see javax.jms.Session 181 * @see javax.jms.QueueSession 182 * @see javax.jms.TopicSession 183 * @see javax.jms.XASession 184 */ 185public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { 186 187 /** 188 * Only acknowledge an individual message - using message.acknowledge() 189 * as opposed to CLIENT_ACKNOWLEDGE which 190 * acknowledges all messages consumed by a session at when acknowledge() 191 * is called 192 */ 193 public static final int INDIVIDUAL_ACKNOWLEDGE = 4; 194 public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE; 195 196 public static interface DeliveryListener { 197 void beforeDelivery(ActiveMQSession session, Message msg); 198 199 void afterDelivery(ActiveMQSession session, Message msg); 200 } 201 202 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class); 203 private final ThreadPoolExecutor connectionExecutor; 204 205 protected int acknowledgementMode; 206 
protected final ActiveMQConnection connection; 207 protected final SessionInfo info; 208 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 209 protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 210 protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator(); 211 protected final ActiveMQSessionExecutor executor; 212 protected final AtomicBoolean started = new AtomicBoolean(false); 213 214 protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>(); 215 protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>(); 216 217 protected boolean closed; 218 private volatile boolean synchronizationRegistered; 219 protected boolean asyncDispatch; 220 protected boolean sessionAsyncDispatch; 221 protected final boolean debug; 222 protected final Object sendMutex = new Object(); 223 protected final Object redeliveryGuard = new Object(); 224 225 private final AtomicBoolean clearInProgress = new AtomicBoolean(); 226 227 private MessageListener messageListener; 228 private final JMSSessionStatsImpl stats; 229 private TransactionContext transactionContext; 230 private DeliveryListener deliveryListener; 231 private MessageTransformer transformer; 232 private BlobTransferPolicy blobTransferPolicy; 233 private long lastDeliveredSequenceId = -2; 234 235 /** 236 * Construct the Session 237 * 238 * @param connection 239 * @param sessionId 240 * @param acknowledgeMode n.b if transacted - the acknowledgeMode == 241 * Session.SESSION_TRANSACTED 242 * @param asyncDispatch 243 * @param sessionAsyncDispatch 244 * @throws JMSException on internal error 245 */ 246 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException { 247 this.debug = 
LOG.isDebugEnabled(); 248 this.connection = connection; 249 this.acknowledgementMode = acknowledgeMode; 250 this.asyncDispatch = asyncDispatch; 251 this.sessionAsyncDispatch = sessionAsyncDispatch; 252 this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); 253 setTransactionContext(new TransactionContext(connection)); 254 stats = new JMSSessionStatsImpl(producers, consumers); 255 this.connection.asyncSendPacket(info); 256 setTransformer(connection.getTransformer()); 257 setBlobTransferPolicy(connection.getBlobTransferPolicy()); 258 this.connectionExecutor=connection.getExecutor(); 259 this.executor = new ActiveMQSessionExecutor(this); 260 connection.addSession(this); 261 if (connection.isStarted()) { 262 start(); 263 } 264 265 } 266 267 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException { 268 this(connection, sessionId, acknowledgeMode, asyncDispatch, true); 269 } 270 271 /** 272 * Sets the transaction context of the session. 273 * 274 * @param transactionContext - provides the means to control a JMS 275 * transaction. 276 */ 277 public void setTransactionContext(TransactionContext transactionContext) { 278 this.transactionContext = transactionContext; 279 } 280 281 /** 282 * Returns the transaction context of the session. 283 * 284 * @return transactionContext - session's transaction context. 285 */ 286 public TransactionContext getTransactionContext() { 287 return transactionContext; 288 } 289 290 /* 291 * (non-Javadoc) 292 * 293 * @see org.apache.activemq.management.StatsCapable#getStats() 294 */ 295 @Override 296 public StatsImpl getStats() { 297 return stats; 298 } 299 300 /** 301 * Returns the session's statistics. 302 * 303 * @return stats - session's statistics. 304 */ 305 public JMSSessionStatsImpl getSessionStats() { 306 return stats; 307 } 308 309 /** 310 * Creates a <CODE>BytesMessage</CODE> object. 
A <CODE>BytesMessage</CODE> 311 * object is used to send a message containing a stream of uninterpreted 312 * bytes. 313 * 314 * @return the an ActiveMQBytesMessage 315 * @throws JMSException if the JMS provider fails to create this message due 316 * to some internal error. 317 */ 318 @Override 319 public BytesMessage createBytesMessage() throws JMSException { 320 ActiveMQBytesMessage message = new ActiveMQBytesMessage(); 321 configureMessage(message); 322 return message; 323 } 324 325 /** 326 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> 327 * object is used to send a self-defining set of name-value pairs, where 328 * names are <CODE>String</CODE> objects and values are primitive values 329 * in the Java programming language. 330 * 331 * @return an ActiveMQMapMessage 332 * @throws JMSException if the JMS provider fails to create this message due 333 * to some internal error. 334 */ 335 @Override 336 public MapMessage createMapMessage() throws JMSException { 337 ActiveMQMapMessage message = new ActiveMQMapMessage(); 338 configureMessage(message); 339 return message; 340 } 341 342 /** 343 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> 344 * interface is the root interface of all JMS messages. A 345 * <CODE>Message</CODE> object holds all the standard message header 346 * information. It can be sent when a message containing only header 347 * information is sufficient. 348 * 349 * @return an ActiveMQMessage 350 * @throws JMSException if the JMS provider fails to create this message due 351 * to some internal error. 352 */ 353 @Override 354 public Message createMessage() throws JMSException { 355 ActiveMQMessage message = new ActiveMQMessage(); 356 configureMessage(message); 357 return message; 358 } 359 360 /** 361 * Creates an <CODE>ObjectMessage</CODE> object. An 362 * <CODE>ObjectMessage</CODE> object is used to send a message that 363 * contains a serializable Java object. 
364 * 365 * @return an ActiveMQObjectMessage 366 * @throws JMSException if the JMS provider fails to create this message due 367 * to some internal error. 368 */ 369 @Override 370 public ObjectMessage createObjectMessage() throws JMSException { 371 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 372 configureMessage(message); 373 return message; 374 } 375 376 /** 377 * Creates an initialized <CODE>ObjectMessage</CODE> object. An 378 * <CODE>ObjectMessage</CODE> object is used to send a message that 379 * contains a serializable Java object. 380 * 381 * @param object the object to use to initialize this message 382 * @return an ActiveMQObjectMessage 383 * @throws JMSException if the JMS provider fails to create this message due 384 * to some internal error. 385 */ 386 @Override 387 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 388 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 389 configureMessage(message); 390 message.setObject(object); 391 return message; 392 } 393 394 /** 395 * Creates a <CODE>StreamMessage</CODE> object. A 396 * <CODE>StreamMessage</CODE> object is used to send a self-defining 397 * stream of primitive values in the Java programming language. 398 * 399 * @return an ActiveMQStreamMessage 400 * @throws JMSException if the JMS provider fails to create this message due 401 * to some internal error. 402 */ 403 @Override 404 public StreamMessage createStreamMessage() throws JMSException { 405 ActiveMQStreamMessage message = new ActiveMQStreamMessage(); 406 configureMessage(message); 407 return message; 408 } 409 410 /** 411 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> 412 * object is used to send a message containing a <CODE>String</CODE> 413 * object. 414 * 415 * @return an ActiveMQTextMessage 416 * @throws JMSException if the JMS provider fails to create this message due 417 * to some internal error. 
418 */ 419 @Override 420 public TextMessage createTextMessage() throws JMSException { 421 ActiveMQTextMessage message = new ActiveMQTextMessage(); 422 configureMessage(message); 423 return message; 424 } 425 426 /** 427 * Creates an initialized <CODE>TextMessage</CODE> object. A 428 * <CODE>TextMessage</CODE> object is used to send a message containing a 429 * <CODE>String</CODE>. 430 * 431 * @param text the string used to initialize this message 432 * @return an ActiveMQTextMessage 433 * @throws JMSException if the JMS provider fails to create this message due 434 * to some internal error. 435 */ 436 @Override 437 public TextMessage createTextMessage(String text) throws JMSException { 438 ActiveMQTextMessage message = new ActiveMQTextMessage(); 439 message.setText(text); 440 configureMessage(message); 441 return message; 442 } 443 444 /** 445 * Creates an initialized <CODE>BlobMessage</CODE> object. A 446 * <CODE>BlobMessage</CODE> object is used to send a message containing a 447 * <CODE>URL</CODE> which points to some network addressible BLOB. 448 * 449 * @param url the network addressable URL used to pass directly to the 450 * consumer 451 * @return a BlobMessage 452 * @throws JMSException if the JMS provider fails to create this message due 453 * to some internal error. 454 */ 455 public BlobMessage createBlobMessage(URL url) throws JMSException { 456 return createBlobMessage(url, false); 457 } 458 459 /** 460 * Creates an initialized <CODE>BlobMessage</CODE> object. A 461 * <CODE>BlobMessage</CODE> object is used to send a message containing a 462 * <CODE>URL</CODE> which points to some network addressible BLOB. 
463 * 464 * @param url the network addressable URL used to pass directly to the 465 * consumer 466 * @param deletedByBroker indicates whether or not the resource is deleted 467 * by the broker when the message is acknowledged 468 * @return a BlobMessage 469 * @throws JMSException if the JMS provider fails to create this message due 470 * to some internal error. 471 */ 472 public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException { 473 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 474 configureMessage(message); 475 message.setURL(url); 476 message.setDeletedByBroker(deletedByBroker); 477 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 478 return message; 479 } 480 481 /** 482 * Creates an initialized <CODE>BlobMessage</CODE> object. A 483 * <CODE>BlobMessage</CODE> object is used to send a message containing 484 * the <CODE>File</CODE> content. Before the message is sent the file 485 * conent will be uploaded to the broker or some other remote repository 486 * depending on the {@link #getBlobTransferPolicy()}. 487 * 488 * @param file the file to be uploaded to some remote repo (or the broker) 489 * depending on the strategy 490 * @return a BlobMessage 491 * @throws JMSException if the JMS provider fails to create this message due 492 * to some internal error. 493 */ 494 public BlobMessage createBlobMessage(File file) throws JMSException { 495 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 496 configureMessage(message); 497 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file)); 498 message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy()))); 499 message.setDeletedByBroker(true); 500 message.setName(file.getName()); 501 return message; 502 } 503 504 /** 505 * Creates an initialized <CODE>BlobMessage</CODE> object. A 506 * <CODE>BlobMessage</CODE> object is used to send a message containing 507 * the <CODE>File</CODE> content. 
Before the message is sent the file 508 * conent will be uploaded to the broker or some other remote repository 509 * depending on the {@link #getBlobTransferPolicy()}. <br/> 510 * <p> 511 * The caller of this method is responsible for closing the 512 * input stream that is used, however the stream can not be closed 513 * until <b>after</b> the message has been sent. To have this class 514 * manage the stream and close it automatically, use the method 515 * {@link ActiveMQSession#createBlobMessage(File)} 516 * 517 * @param in the stream to be uploaded to some remote repo (or the broker) 518 * depending on the strategy 519 * @return a BlobMessage 520 * @throws JMSException if the JMS provider fails to create this message due 521 * to some internal error. 522 */ 523 public BlobMessage createBlobMessage(InputStream in) throws JMSException { 524 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 525 configureMessage(message); 526 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in)); 527 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 528 message.setDeletedByBroker(true); 529 return message; 530 } 531 532 /** 533 * Indicates whether the session is in transacted mode. 534 * 535 * @return true if the session is in transacted mode 536 * @throws JMSException if there is some internal error. 537 */ 538 @Override 539 public boolean getTransacted() throws JMSException { 540 checkClosed(); 541 return isTransacted(); 542 } 543 544 /** 545 * Returns the acknowledgement mode of the session. The acknowledgement mode 546 * is set at the time that the session is created. If the session is 547 * transacted, the acknowledgement mode is ignored. 548 * 549 * @return If the session is not transacted, returns the current 550 * acknowledgement mode for the session. If the session is 551 * transacted, returns SESSION_TRANSACTED. 
552 * @throws JMSException 553 * @see javax.jms.Connection#createSession(boolean,int) 554 * @since 1.1 exception JMSException if there is some internal error. 555 */ 556 @Override 557 public int getAcknowledgeMode() throws JMSException { 558 checkClosed(); 559 return this.acknowledgementMode; 560 } 561 562 /** 563 * Commits all messages done in this transaction and releases any locks 564 * currently held. 565 * 566 * @throws JMSException if the JMS provider fails to commit the transaction 567 * due to some internal error. 568 * @throws TransactionRolledBackException if the transaction is rolled back 569 * due to some internal error during commit. 570 * @throws javax.jms.IllegalStateException if the method is not called by a 571 * transacted session. 572 */ 573 @Override 574 public void commit() throws JMSException { 575 checkClosed(); 576 if (!getTransacted()) { 577 throw new javax.jms.IllegalStateException("Not a transacted session"); 578 } 579 if (LOG.isDebugEnabled()) { 580 LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId()); 581 } 582 transactionContext.commit(); 583 } 584 585 /** 586 * Rolls back any messages done in this transaction and releases any locks 587 * currently held. 588 * 589 * @throws JMSException if the JMS provider fails to roll back the 590 * transaction due to some internal error. 591 * @throws javax.jms.IllegalStateException if the method is not called by a 592 * transacted session. 593 */ 594 @Override 595 public void rollback() throws JMSException { 596 checkClosed(); 597 if (!getTransacted()) { 598 throw new javax.jms.IllegalStateException("Not a transacted session"); 599 } 600 if (LOG.isDebugEnabled()) { 601 LOG.debug(getSessionId() + " Transaction Rollback, txid:" + transactionContext.getTransactionId()); 602 } 603 transactionContext.rollback(); 604 } 605 606 /** 607 * Closes the session. 
608 * <P> 609 * Since a provider may allocate some resources on behalf of a session 610 * outside the JVM, clients should close the resources when they are not 611 * needed. Relying on garbage collection to eventually reclaim these 612 * resources may not be timely enough. 613 * <P> 614 * There is no need to close the producers and consumers of a closed 615 * session. 616 * <P> 617 * This call will block until a <CODE>receive</CODE> call or message 618 * listener in progress has completed. A blocked message consumer 619 * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session 620 * is closed. 621 * <P> 622 * Closing a transacted session must roll back the transaction in progress. 623 * <P> 624 * This method is the only <CODE>Session</CODE> method that can be called 625 * concurrently. 626 * <P> 627 * Invoking any other <CODE>Session</CODE> method on a closed session must 628 * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a 629 * closed session must <I>not </I> throw an exception. 630 * 631 * @throws JMSException if the JMS provider fails to close the session due 632 * to some internal error. 
633 */ 634 @Override 635 public void close() throws JMSException { 636 if (!closed) { 637 if (getTransactionContext().isInXATransaction()) { 638 if (!synchronizationRegistered) { 639 synchronizationRegistered = true; 640 getTransactionContext().addSynchronization(new Synchronization() { 641 642 @Override 643 public void afterCommit() throws Exception { 644 doClose(); 645 synchronizationRegistered = false; 646 } 647 648 @Override 649 public void afterRollback() throws Exception { 650 doClose(); 651 synchronizationRegistered = false; 652 } 653 }); 654 } 655 656 } else { 657 doClose(); 658 } 659 } 660 } 661 662 private void doClose() throws JMSException { 663 dispose(); 664 RemoveInfo removeCommand = info.createRemoveCommand(); 665 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 666 connection.asyncSendPacket(removeCommand); 667 } 668 669 final AtomicInteger clearRequestsCounter = new AtomicInteger(0); 670 void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) { 671 clearRequestsCounter.incrementAndGet(); 672 executor.clearMessagesInProgress(); 673 // we are called from inside the transport reconnection logic which involves us 674 // clearing all the connections' consumers dispatch and delivered lists. So rather 675 // than trying to grab a mutex (which could be already owned by the message listener 676 // calling the send or an ack) we allow it to complete in a separate thread via the 677 // scheduler and notify us via connection.transportInterruptionProcessingComplete() 678 // 679 // We must be careful though not to allow multiple calls to this method from a 680 // connection that is having issue becoming fully established from causing a large 681 // build up of scheduled tasks to clear the same consumers over and over. 
682 if (consumers.isEmpty()) { 683 return; 684 } 685 686 if (clearInProgress.compareAndSet(false, true)) { 687 for (final ActiveMQMessageConsumer consumer : consumers) { 688 consumer.inProgressClearRequired(); 689 transportInterruptionProcessingComplete.incrementAndGet(); 690 try { 691 connection.getScheduler().executeAfterDelay(new Runnable() { 692 @Override 693 public void run() { 694 consumer.clearMessagesInProgress(); 695 }}, 0l); 696 } catch (JMSException e) { 697 connection.onClientInternalException(e); 698 } 699 } 700 701 try { 702 connection.getScheduler().executeAfterDelay(new Runnable() { 703 @Override 704 public void run() { 705 clearInProgress.set(false); 706 }}, 0l); 707 } catch (JMSException e) { 708 connection.onClientInternalException(e); 709 } 710 } 711 } 712 713 void deliverAcks() { 714 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 715 ActiveMQMessageConsumer consumer = iter.next(); 716 consumer.deliverAcks(); 717 } 718 } 719 720 public synchronized void dispose() throws JMSException { 721 if (!closed) { 722 723 try { 724 executor.close(); 725 726 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 727 ActiveMQMessageConsumer consumer = iter.next(); 728 consumer.setFailureError(connection.getFirstFailureError()); 729 consumer.dispose(); 730 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId()); 731 } 732 consumers.clear(); 733 734 for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) { 735 ActiveMQMessageProducer producer = iter.next(); 736 producer.dispose(); 737 } 738 producers.clear(); 739 740 try { 741 if (getTransactionContext().isInLocalTransaction()) { 742 rollback(); 743 } 744 } catch (JMSException e) { 745 } 746 747 } finally { 748 connection.removeSession(this); 749 this.transactionContext = null; 750 closed = true; 751 } 752 } 753 } 754 755 /** 756 * Checks that the session is not closed 
then configures the message 757 */ 758 protected void configureMessage(ActiveMQMessage message) throws IllegalStateException { 759 checkClosed(); 760 message.setConnection(connection); 761 } 762 763 /** 764 * Check if the session is closed. It is used for ensuring that the session 765 * is open before performing various operations. 766 * 767 * @throws IllegalStateException if the Session is closed 768 */ 769 protected void checkClosed() throws IllegalStateException { 770 if (closed) { 771 throw new IllegalStateException("The Session is closed"); 772 } 773 } 774 775 /** 776 * Checks if the session is closed. 777 * 778 * @return true if the session is closed, false otherwise. 779 */ 780 public boolean isClosed() { 781 return closed; 782 } 783 784 /** 785 * Stops message delivery in this session, and restarts message delivery 786 * with the oldest unacknowledged message. 787 * <P> 788 * All consumers deliver messages in a serial order. Acknowledging a 789 * received message automatically acknowledges all messages that have been 790 * delivered to the client. 791 * <P> 792 * Restarting a session causes it to take the following actions: 793 * <UL> 794 * <LI>Stop message delivery 795 * <LI>Mark all messages that might have been delivered but not 796 * acknowledged as "redelivered" 797 * <LI>Restart the delivery sequence including all unacknowledged messages 798 * that had been previously delivered. Redelivered messages do not have to 799 * be delivered in exactly their original delivery order. 800 * </UL> 801 * 802 * @throws JMSException if the JMS provider fails to stop and restart 803 * message delivery due to some internal error. 804 * @throws IllegalStateException if the method is called by a transacted 805 * session. 
806 */ 807 @Override 808 public void recover() throws JMSException { 809 810 checkClosed(); 811 if (getTransacted()) { 812 throw new IllegalStateException("This session is transacted"); 813 } 814 815 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 816 ActiveMQMessageConsumer c = iter.next(); 817 c.rollback(); 818 } 819 820 } 821 822 /** 823 * Returns the session's distinguished message listener (optional). 824 * 825 * @return the message listener associated with this session 826 * @throws JMSException if the JMS provider fails to get the message 827 * listener due to an internal error. 828 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener) 829 * @see javax.jms.ServerSessionPool 830 * @see javax.jms.ServerSession 831 */ 832 @Override 833 public MessageListener getMessageListener() throws JMSException { 834 checkClosed(); 835 return this.messageListener; 836 } 837 838 /** 839 * Sets the session's distinguished message listener (optional). 840 * <P> 841 * When the distinguished message listener is set, no other form of message 842 * receipt in the session can be used; however, all forms of sending 843 * messages are still supported. 844 * <P> 845 * If this session has been closed, then an {@link IllegalStateException} is 846 * thrown, if trying to set a new listener. However setting the listener 847 * to <tt>null</tt> is allowed, to clear the listener, even if this session 848 * has been closed prior. 849 * <P> 850 * This is an expert facility not used by regular JMS clients. 851 * 852 * @param listener the message listener to associate with this session 853 * @throws JMSException if the JMS provider fails to set the message 854 * listener due to an internal error. 
* @see javax.jms.Session#getMessageListener()
     * @see javax.jms.ServerSessionPool
     * @see javax.jms.ServerSession
     */
    @Override
    public void setMessageListener(MessageListener listener) throws JMSException {
        // only check for closed if we set a new listener, as we allow to clear
        // the listener, such as when an application is shutting down, and is
        // no longer using a message listener on this session
        if (listener != null) {
            checkClosed();
        }
        this.messageListener = listener;

        // A non-null listener switches the executor into "dispatched by
        // session pool" mode; clearing the listener does not switch it back.
        if (listener != null) {
            executor.setDispatchedBySessionPool(true);
        }
    }

    /**
     * Optional operation, intended to be used only by Application Servers, not
     * by ordinary JMS clients.
     * <P>
     * Drains the session executor with <CODE>dequeueNoWait()</CODE> and, for
     * each dispatched message:
     * <UL>
     * <LI>acknowledges expired messages and duplicates immediately and skips
     * them;
     * <LI>otherwise starts a transaction, registers a synchronization that
     * sends the ack before the transaction ends and schedules redelivery (or
     * a poison ack once the redelivery limit is reached) after a rollback;
     * <LI>invokes the session's message listener, reporting any failure to the
     * connection's client internal exception handling;
     * <LI>sends the standard ack directly when no transaction id is attached.
     * </UL>
     * The optional delivery listener is called before and after each delivery.
     *
     * @see javax.jms.ServerSession
     */
    @Override
    public void run() {
        MessageDispatch messageDispatch;
        while ((messageDispatch = executor.dequeueNoWait()) != null) {
            final MessageDispatch md = messageDispatch;
            final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();

            // Expired and duplicate messages are never handed to the listener:
            // they are acked right away (expired ack / poison ack) and skipped.
            MessageAck earlyAck = null;
            if (message.isExpired()) {
                earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
                earlyAck.setFirstMessageId(message.getMessageId());
            } else if (connection.isDuplicate(ActiveMQSession.this, message)) {
                LOG.debug("{} got duplicate: {}", this, message.getMessageId());
                earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
                earlyAck.setFirstMessageId(md.getMessage().getMessageId());
                earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
            }
            if (earlyAck != null) {
                try {
                    asyncSendPacket(earlyAck);
                } catch (Throwable t) {
                    LOG.error("error dispatching ack: {} ", earlyAck, t);
                    connection.onClientInternalException(t);
                } finally {
                    // NOTE(review): a 'continue' inside 'finally' also discards
                    // anything thrown by the catch handler above - confirm this
                    // best-effort behaviour is intended.
                    continue;
                }
            }

            // For client/individual acknowledge modes install a no-op
            // acknowledge callback on the message.
            if (isClientAcknowledge()||isIndividualAcknowledge()) {
                message.setAcknowledgeCallback(new Callback() {
                    @Override
                    public void execute() throws Exception {
                    }
                });
            }

            if (deliveryListener != null) {
                deliveryListener.beforeDelivery(this, message);
            }

            md.setDeliverySequenceId(getNextDeliveryId());
            lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();

            final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);

            // Set when afterDelivery() fails; read later by the scheduled redelivery task.
            final AtomicBoolean afterDeliveryError = new AtomicBoolean(false);
            /*
            * The redelivery guard is to allow the endpoint lifecycle to complete before the message is dispatched.
            * We don't want afterDelivery being called after the redelivery as it may cause some weird stuff.
            * */
            synchronized (redeliveryGuard) {
                try {
                    ack.setFirstMessageId(md.getMessage().getMessageId());
                    doStartTransaction();
                    ack.setTransactionId(getTransactionContext().getTransactionId());
                    if (ack.getTransactionId() != null) {
                        getTransactionContext().addSynchronization(new Synchronization() {

                            // Snapshot of the clear-requests counter taken at registration time
                            // (bumped first if it sits at Integer.MAX_VALUE); afterRollback()
                            // compares against it to detect an intervening failover.
                            final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());

                            @Override
                            public void beforeEnd() throws Exception {
                                // validate our consumer so we don't push stale acks that get ignored
                                if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
                                    LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
                                    throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
                                }
                                LOG.trace("beforeEnd ack {}", ack);
                                sendAck(ack);
                            }

                            @Override
                            public void afterRollback() throws Exception {
                                // The Throwable is passed only so the trace log carries a stack trace.
                                LOG.trace("rollback {}", ack, new Throwable("here"));
                                // ensure we don't filter this as a duplicate
                                connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());

                                // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
                                if (clearRequestsCounter.get() > clearRequestCount) {
                                    LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
                                    return;
                                }

                                // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
                                if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
                                    LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
                                    return;
                                }

                                RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
                                int redeliveryCounter = md.getMessage().getRedeliveryCounter();
                                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
                                    && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
                                    // Redelivery limit reached: send a poison ack (NACK) so the
                                    // message gets sent to the DLQ. This local 'ack' shadows the
                                    // outer standard ack.
                                    MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
                                    ack.setFirstMessageId(md.getMessage().getMessageId());
                                    ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
                                    asyncSendPacket(ack);

                                } else {

                                    // Report the redelivery with a REDELIVERED ack (again shadowing the outer 'ack').
                                    MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
                                    ack.setFirstMessageId(md.getMessage().getMessageId());
                                    asyncSendPacket(ack);

                                    // Figure out how long we should wait to resend this message:
                                    // start from the initial delay and apply the policy's
                                    // next-delay step once per previous redelivery.
                                    long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
                                    for (int i = 0; i < redeliveryCounter; i++) {
                                        redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
                                    }

                                    /*
                                    * If redelivery is blocking (i.e. NOT non-blocking) we stop the executor to avoid more
                                    * messages being delivered; the scheduled task below restarts it once the message
                                    * has been put back at the head of the queue (unless afterDelivery failed).
                                    * */
                                    if (!connection.isNonBlockingRedelivery()) {
                                        LOG.debug("Blocking session until re-delivery...");
                                        executor.stop();
                                    }

                                    connection.getScheduler().executeAfterDelay(new Runnable() {

                                        @Override
                                        public void run() {
                                            /*
                                            * wait for the first delivery to be complete, i.e. after delivery has been called.
                                            * */
                                            synchronized (redeliveryGuard) {
                                                /*
                                                * If it is non-blocking then we can just dispatch in a new session.
                                                * */
                                                if (connection.isNonBlockingRedelivery()) {
                                                    ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
                                                } else {
                                                    /*
                                                    * If there has been an error thrown during afterDelivery then the
                                                    * endpoint will be marked as dead so redelivery will fail (and eventually
                                                    * the session marked as stale), in this case we can only call dispatch
                                                    * which will create a new session with a new endpoint.
                                                    * */
                                                    if (afterDeliveryError.get()) {
                                                        ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
                                                    } else {
                                                        executor.executeFirst(md);
                                                        executor.start();
                                                    }
                                                }
                                            }
                                        }
                                    }, redeliveryDelay);
                                }
                                md.getMessage().onMessageRolledBack();
                            }
                        });
                    }

                    LOG.trace("{} onMessage({})", this, message.getMessageId());
                    messageListener.onMessage(message);

                } catch (Throwable e) {
                    LOG.error("error dispatching message: ", e);

                    if (getTransactionContext().isInXATransaction()) {
                        LOG.debug("Marking transaction: {} rollbackOnly", getTransactionContext());
                        getTransactionContext().setRollbackOnly(true);
                    }

                    // A problem while invoking the MessageListener does not
                    // in general indicate a problem with the connection to the broker, i.e.
                    // it will usually be sufficient to let the afterDelivery() method either
                    // commit or roll back in order to deal with the exception.
                    // However, we notify any registered client internal exception listener
                    // of the problem.
                    connection.onClientInternalException(e);
                } finally {
                    // Without a transaction id no synchronization was registered above,
                    // so the standard ack is sent here directly.
                    if (ack.getTransactionId() == null) {
                        try {
                            asyncSendPacket(ack);
                        } catch (Throwable e) {
                            connection.onClientInternalException(e);
                        }
                    }
                }

                if (deliveryListener != null) {
                    try {
                        deliveryListener.afterDelivery(this, message);
                    } catch (Throwable t) {
                        LOG.debug("Unable to call after delivery", t);
                        // Remembered so the scheduled redelivery dispatches through
                        // the consumer instead of re-queueing on this executor.
                        afterDeliveryError.set(true);
                        throw new RuntimeException(t);
                    }
                }
            }
            /*
             * this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway.
             * It also needs to be outside the redelivery guard.
             * */
            try {
                executor.waitForQueueRestart();
            } catch (InterruptedException ex) {
                // NOTE(review): the thread's interrupt status is not restored here - confirm that is intended.
                connection.onClientInternalException(ex);
            }
        }
    }

    /**
     * Creates a <CODE>MessageProducer</CODE> to send messages to the
     * specified destination.
     * <P>
     * A client uses a <CODE>MessageProducer</CODE> object to send messages to
     * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
     * inherit from <CODE>Destination</CODE>, they can be used in the
     * destination parameter to create a <CODE>MessageProducer</CODE> object.
     * <P>
     * A {@link CustomDestination} creates its own producer; any other
     * destination is transformed and wrapped in an
     * <CODE>ActiveMQMessageProducer</CODE> that uses the connection's send
     * timeout.
     *
     * @param destination the <CODE>Destination</CODE> to send to, or null if
     *                this is a producer which does not have a specified
     *                destination.
     * @return the MessageProducer
     * @throws JMSException if the session fails to create a MessageProducer due
     *                 to some internal error.
     * @throws InvalidDestinationException if an invalid destination is
     *                 specified.
     * @since 1.1
     */
    @Override
    public MessageProducer createProducer(Destination destination) throws JMSException {
        checkClosed();
        if (destination instanceof CustomDestination) {
            CustomDestination customDestination = (CustomDestination)destination;
            return customDestination.createProducer(this);
        }
        int timeSendOut = connection.getSendTimeout();
        return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
    }

    /**
     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
     * <CODE>Destination</CODE>, they can be used in the destination
     * parameter to create a <CODE>MessageConsumer</CODE>.
     *
     * @param destination the <CODE>Destination</CODE> to access.
1127 * @return the MessageConsumer 1128 * @throws JMSException if the session fails to create a consumer due to 1129 * some internal error. 1130 * @throws InvalidDestinationException if an invalid destination is 1131 * specified. 1132 * @since 1.1 1133 */ 1134 @Override 1135 public MessageConsumer createConsumer(Destination destination) throws JMSException { 1136 return createConsumer(destination, (String) null); 1137 } 1138 1139 /** 1140 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 1141 * using a message selector. Since <CODE> Queue</CODE> and 1142 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 1143 * can be used in the destination parameter to create a 1144 * <CODE>MessageConsumer</CODE>. 1145 * <P> 1146 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1147 * that have been sent to a destination. 1148 * 1149 * @param destination the <CODE>Destination</CODE> to access 1150 * @param messageSelector only messages with properties matching the message 1151 * selector expression are delivered. A value of null or an 1152 * empty string indicates that there is no message selector 1153 * for the message consumer. 1154 * @return the MessageConsumer 1155 * @throws JMSException if the session fails to create a MessageConsumer due 1156 * to some internal error. 1157 * @throws InvalidDestinationException if an invalid destination is 1158 * specified. 1159 * @throws InvalidSelectorException if the message selector is invalid. 1160 * @since 1.1 1161 */ 1162 @Override 1163 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { 1164 return createConsumer(destination, messageSelector, false); 1165 } 1166 1167 /** 1168 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 
1169 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 1170 * <CODE>Destination</CODE>, they can be used in the destination 1171 * parameter to create a <CODE>MessageConsumer</CODE>. 1172 * 1173 * @param destination the <CODE>Destination</CODE> to access. 1174 * @param messageListener the listener to use for async consumption of messages 1175 * @return the MessageConsumer 1176 * @throws JMSException if the session fails to create a consumer due to 1177 * some internal error. 1178 * @throws InvalidDestinationException if an invalid destination is 1179 * specified. 1180 * @since 1.1 1181 */ 1182 public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException { 1183 return createConsumer(destination, null, messageListener); 1184 } 1185 1186 /** 1187 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 1188 * using a message selector. Since <CODE> Queue</CODE> and 1189 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 1190 * can be used in the destination parameter to create a 1191 * <CODE>MessageConsumer</CODE>. 1192 * <P> 1193 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1194 * that have been sent to a destination. 1195 * 1196 * @param destination the <CODE>Destination</CODE> to access 1197 * @param messageSelector only messages with properties matching the message 1198 * selector expression are delivered. A value of null or an 1199 * empty string indicates that there is no message selector 1200 * for the message consumer. 1201 * @param messageListener the listener to use for async consumption of messages 1202 * @return the MessageConsumer 1203 * @throws JMSException if the session fails to create a MessageConsumer due 1204 * to some internal error. 1205 * @throws InvalidDestinationException if an invalid destination is 1206 * specified. 1207 * @throws InvalidSelectorException if the message selector is invalid. 
1208 * @since 1.1 1209 */ 1210 public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException { 1211 return createConsumer(destination, messageSelector, false, messageListener); 1212 } 1213 1214 /** 1215 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1216 * using a message selector. This method can specify whether messages 1217 * published by its own connection should be delivered to it, if the 1218 * destination is a topic. 1219 * <P> 1220 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1221 * <CODE>Destination</CODE>, they can be used in the destination 1222 * parameter to create a <CODE>MessageConsumer</CODE>. 1223 * <P> 1224 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1225 * that have been published to a destination. 1226 * <P> 1227 * In some cases, a connection may both publish and subscribe to a topic. 1228 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1229 * inhibit the delivery of messages published by its own connection. The 1230 * default value for this attribute is False. The <CODE>noLocal</CODE> 1231 * value must be supported by destinations that are topics. 1232 * 1233 * @param destination the <CODE>Destination</CODE> to access 1234 * @param messageSelector only messages with properties matching the message 1235 * selector expression are delivered. A value of null or an 1236 * empty string indicates that there is no message selector 1237 * for the message consumer. 1238 * @param noLocal - if true, and the destination is a topic, inhibits the 1239 * delivery of messages published by its own connection. The 1240 * behavior for <CODE>NoLocal</CODE> is not specified if 1241 * the destination is a queue. 1242 * @return the MessageConsumer 1243 * @throws JMSException if the session fails to create a MessageConsumer due 1244 * to some internal error. 
1245 * @throws InvalidDestinationException if an invalid destination is 1246 * specified. 1247 * @throws InvalidSelectorException if the message selector is invalid. 1248 * @since 1.1 1249 */ 1250 @Override 1251 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { 1252 return createConsumer(destination, messageSelector, noLocal, null); 1253 } 1254 1255 /** 1256 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1257 * using a message selector. This method can specify whether messages 1258 * published by its own connection should be delivered to it, if the 1259 * destination is a topic. 1260 * <P> 1261 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1262 * <CODE>Destination</CODE>, they can be used in the destination 1263 * parameter to create a <CODE>MessageConsumer</CODE>. 1264 * <P> 1265 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1266 * that have been published to a destination. 1267 * <P> 1268 * In some cases, a connection may both publish and subscribe to a topic. 1269 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1270 * inhibit the delivery of messages published by its own connection. The 1271 * default value for this attribute is False. The <CODE>noLocal</CODE> 1272 * value must be supported by destinations that are topics. 1273 * 1274 * @param destination the <CODE>Destination</CODE> to access 1275 * @param messageSelector only messages with properties matching the message 1276 * selector expression are delivered. A value of null or an 1277 * empty string indicates that there is no message selector 1278 * for the message consumer. 1279 * @param noLocal - if true, and the destination is a topic, inhibits the 1280 * delivery of messages published by its own connection. The 1281 * behavior for <CODE>NoLocal</CODE> is not specified if 1282 * the destination is a queue. 
1283 * @param messageListener the listener to use for async consumption of messages 1284 * @return the MessageConsumer 1285 * @throws JMSException if the session fails to create a MessageConsumer due 1286 * to some internal error. 1287 * @throws InvalidDestinationException if an invalid destination is 1288 * specified. 1289 * @throws InvalidSelectorException if the message selector is invalid. 1290 * @since 1.1 1291 */ 1292 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException { 1293 checkClosed(); 1294 1295 if (destination instanceof CustomDestination) { 1296 CustomDestination customDestination = (CustomDestination)destination; 1297 return customDestination.createConsumer(this, messageSelector, noLocal); 1298 } 1299 1300 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); 1301 int prefetch = 0; 1302 if (destination instanceof Topic) { 1303 prefetch = prefetchPolicy.getTopicPrefetch(); 1304 } else { 1305 prefetch = prefetchPolicy.getQueuePrefetch(); 1306 } 1307 ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination); 1308 return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector, 1309 prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener); 1310 } 1311 1312 /** 1313 * Creates a queue identity given a <CODE>Queue</CODE> name. 1314 * <P> 1315 * This facility is provided for the rare cases where clients need to 1316 * dynamically manipulate queue identity. It allows the creation of a queue 1317 * identity with a provider-specific name. Clients that depend on this 1318 * ability are not portable. 1319 * <P> 1320 * Note that this method is not for creating the physical queue. The 1321 * physical creation of queues is an administrative task and is not to be 1322 * initiated by the JMS API. 
The one exception is the creation of temporary 1323 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> 1324 * method. 1325 * 1326 * @param queueName the name of this <CODE>Queue</CODE> 1327 * @return a <CODE>Queue</CODE> with the given name 1328 * @throws JMSException if the session fails to create a queue due to some 1329 * internal error. 1330 * @since 1.1 1331 */ 1332 @Override 1333 public Queue createQueue(String queueName) throws JMSException { 1334 checkClosed(); 1335 if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1336 return new ActiveMQTempQueue(queueName); 1337 } 1338 return new ActiveMQQueue(queueName); 1339 } 1340 1341 /** 1342 * Creates a topic identity given a <CODE>Topic</CODE> name. 1343 * <P> 1344 * This facility is provided for the rare cases where clients need to 1345 * dynamically manipulate topic identity. This allows the creation of a 1346 * topic identity with a provider-specific name. Clients that depend on this 1347 * ability are not portable. 1348 * <P> 1349 * Note that this method is not for creating the physical topic. The 1350 * physical creation of topics is an administrative task and is not to be 1351 * initiated by the JMS API. The one exception is the creation of temporary 1352 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> 1353 * method. 1354 * 1355 * @param topicName the name of this <CODE>Topic</CODE> 1356 * @return a <CODE>Topic</CODE> with the given name 1357 * @throws JMSException if the session fails to create a topic due to some 1358 * internal error. 
1359 * @since 1.1 1360 */ 1361 @Override 1362 public Topic createTopic(String topicName) throws JMSException { 1363 checkClosed(); 1364 if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1365 return new ActiveMQTempTopic(topicName); 1366 } 1367 return new ActiveMQTopic(topicName); 1368 } 1369 1370 /** 1371 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1372 * the specified queue. 1373 * 1374 * @param queue the <CODE>queue</CODE> to access 1375 * @exception InvalidDestinationException if an invalid destination is 1376 * specified 1377 * @since 1.1 1378 */ 1379 /** 1380 * Creates a durable subscriber to the specified topic. 1381 * <P> 1382 * If a client needs to receive all the messages published on a topic, 1383 * including the ones published while the subscriber is inactive, it uses a 1384 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1385 * record of this durable subscription and insures that all messages from 1386 * the topic's publishers are retained until they are acknowledged by this 1387 * durable subscriber or they have expired. 1388 * <P> 1389 * Sessions with durable subscribers must always provide the same client 1390 * identifier. In addition, each client must specify a name that uniquely 1391 * identifies (within client identifier) each durable subscription it 1392 * creates. Only one session at a time can have a 1393 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. 1394 * <P> 1395 * A client can change an existing durable subscription by creating a 1396 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1397 * and/or message selector. Changing a durable subscriber is equivalent to 1398 * unsubscribing (deleting) the old one and creating a new one. 1399 * <P> 1400 * In some cases, a connection may both publish and subscribe to a topic. 
1401 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1402 * inhibit the delivery of messages published by its own connection. The 1403 * default value for this attribute is false. 1404 * 1405 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1406 * @param name the name used to identify this subscription 1407 * @return the TopicSubscriber 1408 * @throws JMSException if the session fails to create a subscriber due to 1409 * some internal error. 1410 * @throws InvalidDestinationException if an invalid topic is specified. 1411 * @since 1.1 1412 */ 1413 @Override 1414 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { 1415 checkClosed(); 1416 return createDurableSubscriber(topic, name, null, false); 1417 } 1418 1419 /** 1420 * Creates a durable subscriber to the specified topic, using a message 1421 * selector and specifying whether messages published by its own connection 1422 * should be delivered to it. 1423 * <P> 1424 * If a client needs to receive all the messages published on a topic, 1425 * including the ones published while the subscriber is inactive, it uses a 1426 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1427 * record of this durable subscription and insures that all messages from 1428 * the topic's publishers are retained until they are acknowledged by this 1429 * durable subscriber or they have expired. 1430 * <P> 1431 * Sessions with durable subscribers must always provide the same client 1432 * identifier. In addition, each client must specify a name which uniquely 1433 * identifies (within client identifier) each durable subscription it 1434 * creates. Only one session at a time can have a 1435 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An 1436 * inactive durable subscriber is one that exists but does not currently 1437 * have a message consumer associated with it. 
1438 * <P> 1439 * A client can change an existing durable subscription by creating a 1440 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1441 * and/or message selector. Changing a durable subscriber is equivalent to 1442 * unsubscribing (deleting) the old one and creating a new one. 1443 * 1444 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1445 * @param name the name used to identify this subscription 1446 * @param messageSelector only messages with properties matching the message 1447 * selector expression are delivered. A value of null or an 1448 * empty string indicates that there is no message selector 1449 * for the message consumer. 1450 * @param noLocal if set, inhibits the delivery of messages published by its 1451 * own connection 1452 * @return the Queue Browser 1453 * @throws JMSException if the session fails to create a subscriber due to 1454 * some internal error. 1455 * @throws InvalidDestinationException if an invalid topic is specified. 1456 * @throws InvalidSelectorException if the message selector is invalid. 1457 * @since 1.1 1458 */ 1459 @Override 1460 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { 1461 checkClosed(); 1462 1463 if (topic == null) { 1464 throw new InvalidDestinationException("Topic cannot be null"); 1465 } 1466 1467 if (topic instanceof CustomDestination) { 1468 CustomDestination customDestination = (CustomDestination)topic; 1469 return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal); 1470 } 1471 1472 connection.checkClientIDWasManuallySpecified(); 1473 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1474 int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? 
prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch(); 1475 int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit(); 1476 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit, 1477 noLocal, false, asyncDispatch); 1478 } 1479 1480 /** 1481 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1482 * the specified queue. 1483 * 1484 * @param queue the <CODE>queue</CODE> to access 1485 * @return the Queue Browser 1486 * @throws JMSException if the session fails to create a browser due to some 1487 * internal error. 1488 * @throws InvalidDestinationException if an invalid destination is 1489 * specified 1490 * @since 1.1 1491 */ 1492 @Override 1493 public QueueBrowser createBrowser(Queue queue) throws JMSException { 1494 checkClosed(); 1495 return createBrowser(queue, null); 1496 } 1497 1498 /** 1499 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1500 * the specified queue using a message selector. 1501 * 1502 * @param queue the <CODE>queue</CODE> to access 1503 * @param messageSelector only messages with properties matching the message 1504 * selector expression are delivered. A value of null or an 1505 * empty string indicates that there is no message selector 1506 * for the message consumer. 1507 * @return the Queue Browser 1508 * @throws JMSException if the session fails to create a browser due to some 1509 * internal error. 1510 * @throws InvalidDestinationException if an invalid destination is 1511 * specified 1512 * @throws InvalidSelectorException if the message selector is invalid. 
1513 * @since 1.1 1514 */ 1515 @Override 1516 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { 1517 checkClosed(); 1518 return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch); 1519 } 1520 1521 /** 1522 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that 1523 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1524 * 1525 * @return a temporary queue identity 1526 * @throws JMSException if the session fails to create a temporary queue due 1527 * to some internal error. 1528 * @since 1.1 1529 */ 1530 @Override 1531 public TemporaryQueue createTemporaryQueue() throws JMSException { 1532 checkClosed(); 1533 return (TemporaryQueue)connection.createTempDestination(false); 1534 } 1535 1536 /** 1537 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that 1538 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1539 * 1540 * @return a temporary topic identity 1541 * @throws JMSException if the session fails to create a temporary topic due 1542 * to some internal error. 1543 * @since 1.1 1544 */ 1545 @Override 1546 public TemporaryTopic createTemporaryTopic() throws JMSException { 1547 checkClosed(); 1548 return (TemporaryTopic)connection.createTempDestination(true); 1549 } 1550 1551 /** 1552 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1553 * the specified queue. 1554 * 1555 * @param queue the <CODE>Queue</CODE> to access 1556 * @return a new QueueBrowser instance. 1557 * @throws JMSException if the session fails to create a receiver due to 1558 * some internal error. 1559 * @throws JMSException 1560 * @throws InvalidDestinationException if an invalid queue is specified. 
1561 */ 1562 @Override 1563 public QueueReceiver createReceiver(Queue queue) throws JMSException { 1564 checkClosed(); 1565 return createReceiver(queue, null); 1566 } 1567 1568 /** 1569 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1570 * the specified queue using a message selector. 1571 * 1572 * @param queue the <CODE>Queue</CODE> to access 1573 * @param messageSelector only messages with properties matching the message 1574 * selector expression are delivered. A value of null or an 1575 * empty string indicates that there is no message selector 1576 * for the message consumer. 1577 * @return QueueReceiver 1578 * @throws JMSException if the session fails to create a receiver due to 1579 * some internal error. 1580 * @throws InvalidDestinationException if an invalid queue is specified. 1581 * @throws InvalidSelectorException if the message selector is invalid. 1582 */ 1583 @Override 1584 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { 1585 checkClosed(); 1586 1587 if (queue instanceof CustomDestination) { 1588 CustomDestination customDestination = (CustomDestination)queue; 1589 return customDestination.createReceiver(this, messageSelector); 1590 } 1591 1592 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1593 return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(), 1594 prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch); 1595 } 1596 1597 /** 1598 * Creates a <CODE>QueueSender</CODE> object to send messages to the 1599 * specified queue. 1600 * 1601 * @param queue the <CODE>Queue</CODE> to access, or null if this is an 1602 * unidentified producer 1603 * @return QueueSender 1604 * @throws JMSException if the session fails to create a sender due to some 1605 * internal error. 
1606 * @throws InvalidDestinationException if an invalid queue is specified. 1607 */ 1608 @Override 1609 public QueueSender createSender(Queue queue) throws JMSException { 1610 checkClosed(); 1611 if (queue instanceof CustomDestination) { 1612 CustomDestination customDestination = (CustomDestination)queue; 1613 return customDestination.createSender(this); 1614 } 1615 int timeSendOut = connection.getSendTimeout(); 1616 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut); 1617 } 1618 1619 /** 1620 * Creates a nondurable subscriber to the specified topic. <p/> 1621 * <P> 1622 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1623 * that have been published to a topic. <p/> 1624 * <P> 1625 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1626 * receive only messages that are published while they are active. <p/> 1627 * <P> 1628 * In some cases, a connection may both publish and subscribe to a topic. 1629 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1630 * inhibit the delivery of messages published by its own connection. The 1631 * default value for this attribute is false. 1632 * 1633 * @param topic the <CODE>Topic</CODE> to subscribe to 1634 * @return TopicSubscriber 1635 * @throws JMSException if the session fails to create a subscriber due to 1636 * some internal error. 1637 * @throws InvalidDestinationException if an invalid topic is specified. 1638 */ 1639 @Override 1640 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 1641 checkClosed(); 1642 return createSubscriber(topic, null, false); 1643 } 1644 1645 /** 1646 * Creates a nondurable subscriber to the specified topic, using a message 1647 * selector or specifying whether messages published by its own connection 1648 * should be delivered to it. 
<p/> 1649 * <P> 1650 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1651 * that have been published to a topic. <p/> 1652 * <P> 1653 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1654 * receive only messages that are published while they are active. <p/> 1655 * <P> 1656 * Messages filtered out by a subscriber's message selector will never be 1657 * delivered to the subscriber. From the subscriber's perspective, they do 1658 * not exist. <p/> 1659 * <P> 1660 * In some cases, a connection may both publish and subscribe to a topic. 1661 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1662 * inhibit the delivery of messages published by its own connection. The 1663 * default value for this attribute is false. 1664 * 1665 * @param topic the <CODE>Topic</CODE> to subscribe to 1666 * @param messageSelector only messages with properties matching the message 1667 * selector expression are delivered. A value of null or an 1668 * empty string indicates that there is no message selector 1669 * for the message consumer. 1670 * @param noLocal if set, inhibits the delivery of messages published by its 1671 * own connection 1672 * @return TopicSubscriber 1673 * @throws JMSException if the session fails to create a subscriber due to 1674 * some internal error. 1675 * @throws InvalidDestinationException if an invalid topic is specified. 1676 * @throws InvalidSelectorException if the message selector is invalid. 
1677 */ 1678 @Override 1679 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { 1680 checkClosed(); 1681 1682 if (topic instanceof CustomDestination) { 1683 CustomDestination customDestination = (CustomDestination)topic; 1684 return customDestination.createSubscriber(this, messageSelector, noLocal); 1685 } 1686 1687 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1688 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy 1689 .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); 1690 } 1691 1692 /** 1693 * Creates a publisher for the specified topic. <p/> 1694 * <P> 1695 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages 1696 * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on 1697 * a topic, it defines a new sequence of messages that have no ordering 1698 * relationship with the messages it has previously sent. 1699 * 1700 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is 1701 * an unidentified producer 1702 * @return TopicPublisher 1703 * @throws JMSException if the session fails to create a publisher due to 1704 * some internal error. 1705 * @throws InvalidDestinationException if an invalid topic is specified. 
1706 */ 1707 @Override 1708 public TopicPublisher createPublisher(Topic topic) throws JMSException { 1709 checkClosed(); 1710 1711 if (topic instanceof CustomDestination) { 1712 CustomDestination customDestination = (CustomDestination)topic; 1713 return customDestination.createPublisher(this); 1714 } 1715 int timeSendOut = connection.getSendTimeout(); 1716 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut); 1717 } 1718 1719 /** 1720 * Unsubscribes a durable subscription that has been created by a client. 1721 * <P> 1722 * This method deletes the state being maintained on behalf of the 1723 * subscriber by its provider. 1724 * <P> 1725 * It is erroneous for a client to delete a durable subscription while there 1726 * is an active <CODE>MessageConsumer </CODE> or 1727 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 1728 * message is part of a pending transaction or has not been acknowledged in 1729 * the session. 1730 * 1731 * @param name the name used to identify this subscription 1732 * @throws JMSException if the session fails to unsubscribe to the durable 1733 * subscription due to some internal error. 1734 * @throws InvalidDestinationException if an invalid subscription name is 1735 * specified. 1736 * @since 1.1 1737 */ 1738 @Override 1739 public void unsubscribe(String name) throws JMSException { 1740 checkClosed(); 1741 connection.unsubscribe(name); 1742 } 1743 1744 @Override 1745 public void dispatch(MessageDispatch messageDispatch) { 1746 try { 1747 executor.execute(messageDispatch); 1748 } catch (InterruptedException e) { 1749 Thread.currentThread().interrupt(); 1750 connection.onClientInternalException(e); 1751 } 1752 } 1753 1754 /** 1755 * Acknowledges all consumed messages of the session of this consumed 1756 * message. 
1757 * <P> 1758 * All consumed JMS messages support the <CODE>acknowledge</CODE> method 1759 * for use when a client has specified that its JMS session's consumed 1760 * messages are to be explicitly acknowledged. By invoking 1761 * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges 1762 * all messages consumed by the session that the message was delivered to. 1763 * <P> 1764 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted 1765 * sessions and sessions specified to use implicit acknowledgement modes. 1766 * <P> 1767 * A client may individually acknowledge each message as it is consumed, or 1768 * it may choose to acknowledge messages as an application-defined group 1769 * (which is done by calling acknowledge on the last received message of the 1770 * group, thereby acknowledging all messages consumed by the session.) 1771 * <P> 1772 * Messages that have been received but not acknowledged may be redelivered. 1773 * 1774 * @throws JMSException if the JMS provider fails to acknowledge the 1775 * messages due to some internal error. 1776 * @throws javax.jms.IllegalStateException if this method is called on a 1777 * closed session. 1778 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE 1779 */ 1780 public void acknowledge() throws JMSException { 1781 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1782 ActiveMQMessageConsumer c = iter.next(); 1783 c.acknowledge(); 1784 } 1785 } 1786 1787 /** 1788 * Add a message consumer. 1789 * 1790 * @param consumer - message consumer. 1791 * @throws JMSException 1792 */ 1793 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException { 1794 this.consumers.add(consumer); 1795 if (consumer.isDurableSubscriber()) { 1796 stats.onCreateDurableSubscriber(); 1797 } 1798 this.connection.addDispatcher(consumer.getConsumerId(), this); 1799 } 1800 1801 /** 1802 * Remove the message consumer. 1803 * 1804 * @param consumer - consumer to be removed. 
1805 * @throws JMSException 1806 */ 1807 protected void removeConsumer(ActiveMQMessageConsumer consumer) { 1808 this.connection.removeDispatcher(consumer.getConsumerId()); 1809 if (consumer.isDurableSubscriber()) { 1810 stats.onRemoveDurableSubscriber(); 1811 } 1812 this.consumers.remove(consumer); 1813 this.connection.removeDispatcher(consumer); 1814 } 1815 1816 /** 1817 * Adds a message producer. 1818 * 1819 * @param producer - message producer to be added. 1820 * @throws JMSException 1821 */ 1822 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException { 1823 this.producers.add(producer); 1824 this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer); 1825 } 1826 1827 /** 1828 * Removes a message producer. 1829 * 1830 * @param producer - message producer to be removed. 1831 * @throws JMSException 1832 */ 1833 protected void removeProducer(ActiveMQMessageProducer producer) { 1834 this.connection.removeProducer(producer.getProducerInfo().getProducerId()); 1835 this.producers.remove(producer); 1836 } 1837 1838 /** 1839 * Start this Session. 1840 * 1841 * @throws JMSException 1842 */ 1843 protected void start() throws JMSException { 1844 started.set(true); 1845 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1846 ActiveMQMessageConsumer c = iter.next(); 1847 c.start(); 1848 } 1849 executor.start(); 1850 } 1851 1852 /** 1853 * Stops this session. 1854 * 1855 * @throws JMSException 1856 */ 1857 protected void stop() throws JMSException { 1858 1859 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1860 ActiveMQMessageConsumer c = iter.next(); 1861 c.stop(); 1862 } 1863 1864 started.set(false); 1865 executor.stop(); 1866 } 1867 1868 /** 1869 * Returns the session id. 1870 * 1871 * @return value - session id. 
1872 */ 1873 protected SessionId getSessionId() { 1874 return info.getSessionId(); 1875 } 1876 1877 /** 1878 * @return a unique ConsumerId instance. 1879 */ 1880 protected ConsumerId getNextConsumerId() { 1881 return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId()); 1882 } 1883 1884 /** 1885 * @return a unique ProducerId instance. 1886 */ 1887 protected ProducerId getNextProducerId() { 1888 return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId()); 1889 } 1890 1891 /** 1892 * Sends the message for dispatch by the broker. 1893 * 1894 * @param producer - message producer. 1895 * @param destination - message destination. 1896 * @param message - message to be sent. 1897 * @param deliveryMode - JMS message delivery mode. 1898 * @param priority - message priority. 1899 * @param timeToLive - message expiration. 1900 * @param producerWindow 1901 * @param onComplete 1902 * @throws JMSException 1903 */ 1904 protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, 1905 MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { 1906 1907 checkClosed(); 1908 if (destination.isTemporary() && connection.isDeleted(destination)) { 1909 throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); 1910 } 1911 synchronized (sendMutex) { 1912 // tell the Broker we are about to start a new transaction 1913 doStartTransaction(); 1914 TransactionId txid = transactionContext.getTransactionId(); 1915 long sequenceNumber = producer.getMessageSequence(); 1916 1917 //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 1918 message.setJMSDeliveryMode(deliveryMode); 1919 long expiration = 0L; 1920 if (!producer.getDisableMessageTimestamp()) { 1921 long timeStamp = System.currentTimeMillis(); 1922 message.setJMSTimestamp(timeStamp); 1923 if 
(timeToLive > 0) { 1924 expiration = timeToLive + timeStamp; 1925 } 1926 } 1927 message.setJMSExpiration(expiration); 1928 message.setJMSPriority(priority); 1929 message.setJMSRedelivered(false); 1930 1931 // transform to our own message format here 1932 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); 1933 msg.setDestination(destination); 1934 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); 1935 1936 // Set the message id. 1937 if (msg != message) { 1938 message.setJMSMessageID(msg.getMessageId().toString()); 1939 // Make sure the JMS destination is set on the foreign messages too. 1940 message.setJMSDestination(destination); 1941 } 1942 //clear the brokerPath in case we are re-sending this message 1943 msg.setBrokerPath(null); 1944 1945 msg.setTransactionId(txid); 1946 if (connection.isCopyMessageOnSend()) { 1947 msg = (ActiveMQMessage)msg.copy(); 1948 } 1949 msg.setConnection(connection); 1950 msg.onSend(); 1951 msg.setProducerId(msg.getMessageId().getProducerId()); 1952 if (LOG.isTraceEnabled()) { 1953 LOG.trace(getSessionId() + " sending message: " + msg); 1954 } 1955 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { 1956 this.connection.asyncSendPacket(msg); 1957 if (producerWindow != null) { 1958 // Since we defer lots of the marshaling till we hit the 1959 // wire, this might not 1960 // provide and accurate size. We may change over to doing 1961 // more aggressive marshaling, 1962 // to get more accurate sizes.. this is more important once 1963 // users start using producer window 1964 // flow control. 
1965 int size = msg.getSize(); 1966 producerWindow.increaseUsage(size); 1967 } 1968 } else { 1969 if (sendTimeout > 0 && onComplete==null) { 1970 this.connection.syncSendPacket(msg,sendTimeout); 1971 }else { 1972 this.connection.syncSendPacket(msg, onComplete); 1973 } 1974 } 1975 1976 } 1977 } 1978 1979 /** 1980 * Send TransactionInfo to indicate transaction has started 1981 * 1982 * @throws JMSException if some internal error occurs 1983 */ 1984 protected void doStartTransaction() throws JMSException { 1985 if (getTransacted() && !transactionContext.isInXATransaction()) { 1986 transactionContext.begin(); 1987 } 1988 } 1989 1990 /** 1991 * Checks whether the session has unconsumed messages. 1992 * 1993 * @return true - if there are unconsumed messages. 1994 */ 1995 public boolean hasUncomsumedMessages() { 1996 return executor.hasUncomsumedMessages(); 1997 } 1998 1999 /** 2000 * Checks whether the session uses transactions. 2001 * 2002 * @return true - if the session uses transactions. 2003 */ 2004 public boolean isTransacted() { 2005 return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction()); 2006 } 2007 2008 /** 2009 * Checks whether the session used client acknowledgment. 2010 * 2011 * @return true - if the session uses client acknowledgment. 2012 */ 2013 protected boolean isClientAcknowledge() { 2014 return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE; 2015 } 2016 2017 /** 2018 * Checks whether the session used auto acknowledgment. 2019 * 2020 * @return true - if the session uses client acknowledgment. 2021 */ 2022 public boolean isAutoAcknowledge() { 2023 return acknowledgementMode == Session.AUTO_ACKNOWLEDGE; 2024 } 2025 2026 /** 2027 * Checks whether the session used dup ok acknowledgment. 2028 * 2029 * @return true - if the session uses client acknowledgment. 
2030 */ 2031 public boolean isDupsOkAcknowledge() { 2032 return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; 2033 } 2034 2035 public boolean isIndividualAcknowledge(){ 2036 return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE; 2037 } 2038 2039 /** 2040 * Returns the message delivery listener. 2041 * 2042 * @return deliveryListener - message delivery listener. 2043 */ 2044 public DeliveryListener getDeliveryListener() { 2045 return deliveryListener; 2046 } 2047 2048 /** 2049 * Sets the message delivery listener. 2050 * 2051 * @param deliveryListener - message delivery listener. 2052 */ 2053 public void setDeliveryListener(DeliveryListener deliveryListener) { 2054 this.deliveryListener = deliveryListener; 2055 } 2056 2057 /** 2058 * Returns the SessionInfo bean. 2059 * 2060 * @return info - SessionInfo bean. 2061 * @throws JMSException 2062 */ 2063 protected SessionInfo getSessionInfo() throws JMSException { 2064 SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue()); 2065 return info; 2066 } 2067 2068 /** 2069 * Send the asynchronous command. 2070 * 2071 * @param command - command to be executed. 2072 * @throws JMSException 2073 */ 2074 public void asyncSendPacket(Command command) throws JMSException { 2075 connection.asyncSendPacket(command); 2076 } 2077 2078 /** 2079 * Send the synchronous command. 2080 * 2081 * @param command - command to be executed. 
2082 * @return Response 2083 * @throws JMSException 2084 */ 2085 public Response syncSendPacket(Command command) throws JMSException { 2086 return connection.syncSendPacket(command); 2087 } 2088 2089 public long getNextDeliveryId() { 2090 return deliveryIdGenerator.getNextSequenceId(); 2091 } 2092 2093 public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException { 2094 2095 List<MessageDispatch> c = unconsumedMessages.removeAll(); 2096 for (MessageDispatch md : c) { 2097 this.connection.rollbackDuplicate(dispatcher, md.getMessage()); 2098 } 2099 Collections.reverse(c); 2100 2101 for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) { 2102 MessageDispatch md = iter.next(); 2103 executor.executeFirst(md); 2104 } 2105 2106 } 2107 2108 public boolean isRunning() { 2109 return started.get(); 2110 } 2111 2112 public boolean isAsyncDispatch() { 2113 return asyncDispatch; 2114 } 2115 2116 public void setAsyncDispatch(boolean asyncDispatch) { 2117 this.asyncDispatch = asyncDispatch; 2118 } 2119 2120 /** 2121 * @return Returns the sessionAsyncDispatch. 2122 */ 2123 public boolean isSessionAsyncDispatch() { 2124 return sessionAsyncDispatch; 2125 } 2126 2127 /** 2128 * @param sessionAsyncDispatch The sessionAsyncDispatch to set. 
2129 */ 2130 public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) { 2131 this.sessionAsyncDispatch = sessionAsyncDispatch; 2132 } 2133 2134 public MessageTransformer getTransformer() { 2135 return transformer; 2136 } 2137 2138 public ActiveMQConnection getConnection() { 2139 return connection; 2140 } 2141 2142 /** 2143 * Sets the transformer used to transform messages before they are sent on 2144 * to the JMS bus or when they are received from the bus but before they are 2145 * delivered to the JMS client 2146 */ 2147 public void setTransformer(MessageTransformer transformer) { 2148 this.transformer = transformer; 2149 } 2150 2151 public BlobTransferPolicy getBlobTransferPolicy() { 2152 return blobTransferPolicy; 2153 } 2154 2155 /** 2156 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 2157 * OBjects) are transferred from producers to brokers to consumers 2158 */ 2159 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 2160 this.blobTransferPolicy = blobTransferPolicy; 2161 } 2162 2163 public List<MessageDispatch> getUnconsumedMessages() { 2164 return executor.getUnconsumedMessages(); 2165 } 2166 2167 @Override 2168 public String toString() { 2169 return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "} " + sendMutex; 2170 } 2171 2172 public void checkMessageListener() throws JMSException { 2173 if (messageListener != null) { 2174 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 2175 } 2176 for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) { 2177 ActiveMQMessageConsumer consumer = i.next(); 2178 if (consumer.hasMessageListener()) { 2179 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 2180 } 2181 } 2182 } 2183 2184 protected void setOptimizeAcknowledge(boolean value) { 2185 for (Iterator<ActiveMQMessageConsumer> iter = 
consumers.iterator(); iter.hasNext();) { 2186 ActiveMQMessageConsumer c = iter.next(); 2187 c.setOptimizeAcknowledge(value); 2188 } 2189 } 2190 2191 protected void setPrefetchSize(ConsumerId id, int prefetch) { 2192 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2193 ActiveMQMessageConsumer c = iter.next(); 2194 if (c.getConsumerId().equals(id)) { 2195 c.setPrefetchSize(prefetch); 2196 break; 2197 } 2198 } 2199 } 2200 2201 protected void close(ConsumerId id) { 2202 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2203 ActiveMQMessageConsumer c = iter.next(); 2204 if (c.getConsumerId().equals(id)) { 2205 try { 2206 c.close(); 2207 } catch (JMSException e) { 2208 LOG.warn("Exception closing consumer", e); 2209 } 2210 LOG.warn("Closed consumer on Command, " + id); 2211 break; 2212 } 2213 } 2214 } 2215 2216 public boolean isInUse(ActiveMQTempDestination destination) { 2217 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2218 ActiveMQMessageConsumer c = iter.next(); 2219 if (c.isInUse(destination)) { 2220 return true; 2221 } 2222 } 2223 return false; 2224 } 2225 2226 /** 2227 * highest sequence id of the last message delivered by this session. 
* Passed to the broker in the close command, maintained by dispose()
     * @return lastDeliveredSequenceId
     */
    public long getLastDeliveredSequenceId() {
        return this.lastDeliveredSequenceId;
    }

    /**
     * Sends an acknowledgement in non-lazy mode.
     *
     * @param ack the acknowledgement to send
     * @throws JMSException
     */
    protected void sendAck(MessageAck ack) throws JMSException {
        final boolean lazy = false;
        sendAck(ack, lazy);
    }

    /**
     * Sends an acknowledgement. It goes out asynchronously when {@code lazy}
     * is set, when the connection sends acks asynchronously, or when the
     * session is transacted; otherwise it is sent synchronously.
     *
     * @param ack the acknowledgement to send
     * @param lazy if true, always send asynchronously
     * @throws JMSException
     */
    protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
        // Short-circuit order matters: lazy, then the connection, then the session.
        final boolean async = lazy || connection.isSendAcksAsync() || getTransacted();
        if (!async) {
            syncSendPacket(ack);
            return;
        }
        asyncSendPacket(ack);
    }

    /**
     * @return the scheduler provided by the connection.
     * @throws JMSException
     */
    protected Scheduler getScheduler() throws JMSException {
        return connection.getScheduler();
    }

    /**
     * @return the connection executor held by this session.
     */
    protected ThreadPoolExecutor getConnectionExecutor() {
        return connectionExecutor;
    }
}