001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.console.filter; 018 019 import java.net.URI; 020 import java.util.Collections; 021 import java.util.Iterator; 022 import java.util.List; 023 024 import javax.jms.Connection; 025 import javax.jms.ConnectionFactory; 026 import javax.jms.Destination; 027 import javax.jms.JMSException; 028 import javax.jms.QueueBrowser; 029 import javax.jms.Session; 030 031 import org.apache.activemq.ActiveMQConnectionFactory; 032 import org.apache.activemq.command.ActiveMQQueue; 033 import org.apache.activemq.command.ActiveMQTopic; 034 035 public class AmqMessagesQueryFilter extends AbstractQueryFilter { 036 037 private URI brokerUrl; 038 private Destination destination; 039 040 private ConnectionFactory connectionFactory; 041 042 /** 043 * Create a JMS message query filter 044 * 045 * @param brokerUrl - broker url to connect to 046 * @param destination - JMS destination to query 047 */ 048 public AmqMessagesQueryFilter(URI brokerUrl, Destination destination) { 049 super(null); 050 this.brokerUrl = brokerUrl; 051 this.destination = destination; 052 } 053 054 /** 055 * Create a JMS message query filter 056 * 057 * @param brokerUrl - broker url to connect to 058 * @param destination - JMS destination to query 059 */ 060 public AmqMessagesQueryFilter(ConnectionFactory connectionFactory, Destination destination) { 061 super(null); 062 this.destination = destination; 063 this.connectionFactory = connectionFactory; 064 } 065 066 /** 067 * Queries the specified destination using the message selector format query 068 * 069 * @param queries - message selector queries 070 * @return list messages that matches the selector 071 * @throws Exception 072 */ 073 public List query(List queries) throws Exception { 074 String selector = ""; 075 076 // Convert to message selector 077 for (Iterator i = queries.iterator(); i.hasNext(); ) { 078 selector = selector + "(" + i.next().toString() + ") AND "; 079 } 080 081 // Remove last AND 082 if (!selector.equals("")) { 083 selector = selector.substring(0, selector.length() - 5); 084 } 085 086 if (destination instanceof ActiveMQQueue) { 087 return queryMessages((ActiveMQQueue) destination, selector); 088 } else { 089 return queryMessages((ActiveMQTopic) destination, selector); 090 } 091 } 092 093 /** 094 * Query the messages of a queue destination using a queue browser 095 * 096 * @param queue - queue destination 097 * @param selector - message selector 098 * @return list of messages that matches the selector 099 * @throws Exception 100 */ 101 protected List queryMessages(ActiveMQQueue queue, String selector) throws Exception { 102 Connection conn = createConnection(); 103 104 Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); 105 QueueBrowser browser = sess.createBrowser(queue, selector); 106 107 List messages = Collections.list(browser.getEnumeration()); 108 109 conn.close(); 110 111 return messages; 112 } 113 114 /** 115 * Query the messages of a topic destination using a message consumer 116 * 117 * @param topic - topic destination 118 * @param selector - message selector 119 * @return list of messages that matches the selector 120 * @throws Exception 121 */ 122 protected List queryMessages(ActiveMQTopic topic, String selector) throws Exception { 123 // TODO: should we use a durable subscriber or a retroactive non-durable 124 // subscriber? 125 // TODO: if a durable subscriber is used, how do we manage it? 126 // subscribe/unsubscribe tasks? 127 return null; 128 } 129 130 /** 131 * Create and start a JMS connection 132 * 133 * @param brokerUrl - broker url to connect to. 134 * @return JMS connection 135 * @throws JMSException 136 * @deprecated Use createConnection() instead, and pass the url to the ConnectionFactory when it's created. 137 */ 138 @Deprecated 139 protected Connection createConnection(URI brokerUrl) throws JMSException { 140 // maintain old behaviour, when called this way. 141 connectionFactory = (new ActiveMQConnectionFactory(brokerUrl)); 142 return createConnection(); 143 } 144 145 /** 146 * Create and start a JMS connection 147 * 148 * @return JMS connection 149 * @throws JMSException 150 */ 151 protected Connection createConnection() throws JMSException { 152 // maintain old behaviour, when called either way. 153 if (null == connectionFactory) 154 connectionFactory = (new ActiveMQConnectionFactory(getBrokerUrl())); 155 156 Connection conn = connectionFactory.createConnection(); 157 conn.start(); 158 return conn; 159 } 160 161 162 /** 163 * Get the broker url being used. 164 * 165 * @return broker url 166 */ 167 public URI getBrokerUrl() { 168 return brokerUrl; 169 } 170 171 /** 172 * Set the broker url to use. 173 * 174 * @param brokerUrl - broker url 175 */ 176 public void setBrokerUrl(URI brokerUrl) { 177 this.brokerUrl = brokerUrl; 178 } 179 180 /** 181 * Get the destination being used. 182 * 183 * @return - JMS destination 184 */ 185 public Destination getDestination() { 186 return destination; 187 } 188 189 /** 190 * Set the destination to use. 191 * 192 * @param destination - JMS destination 193 */ 194 public void setDestination(Destination destination) { 195 this.destination = destination; 196 } 197 198 }