001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.console.command; 018 019import org.apache.activemq.ActiveMQConnectionFactory; 020import org.apache.activemq.command.ActiveMQDestination; 021import org.apache.activemq.util.ConsumerThread; 022import org.slf4j.Logger; 023import org.slf4j.LoggerFactory; 024 025import javax.jms.Connection; 026import javax.jms.Session; 027import java.util.List; 028import java.util.concurrent.CountDownLatch; 029 030public class ConsumerCommand extends AbstractCommand { 031 private static final Logger LOG = LoggerFactory.getLogger(ConsumerCommand.class); 032 033 String brokerUrl = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; 034 String user = ActiveMQConnectionFactory.DEFAULT_USER; 035 String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD; 036 String destination = "queue://TEST"; 037 int messageCount = 1000; 038 int sleep; 039 boolean transacted; 040 private boolean durable; 041 private String clientId; 042 int batchSize = 10; 043 int ackMode = Session.AUTO_ACKNOWLEDGE; 044 int parallelThreads = 1; 045 boolean bytesAsText; 046 047 @Override 048 protected void runTask(List<String> tokens) throws Exception { 049 LOG.info("Connecting to URL: " + brokerUrl + " as user: " + user); 050 LOG.info("Consuming " + destination); 051 LOG.info("Sleeping between receives " + sleep + " ms"); 052 LOG.info("Running " + parallelThreads + " parallel threads"); 053 054 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); 055 Connection conn = null; 056 try { 057 conn = factory.createConnection(user, password); 058 if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) { 059 conn.setClientID(clientId); 060 } 061 conn.start(); 062 063 064 CountDownLatch active = new CountDownLatch(parallelThreads); 065 066 for (int i = 1; i <= parallelThreads; i++) { 067 Session sess; 068 if (transacted) { 069 sess = conn.createSession(true, Session.SESSION_TRANSACTED); 070 } else { 071 sess = conn.createSession(false, ackMode); 072 } 073 ConsumerThread consumer = new ConsumerThread(sess, ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE)); 074 consumer.setName("consumer-" + i); 075 consumer.setDurable(durable); 076 consumer.setBreakOnNull(false); 077 consumer.setMessageCount(messageCount); 078 consumer.setSleep(sleep); 079 consumer.setBatchSize(batchSize); 080 consumer.setFinished(active); 081 consumer.setBytesAsText(bytesAsText); 082 consumer.start(); 083 } 084 085 active.await(); 086 } finally { 087 if (conn != null) { 088 conn.close(); 089 } 090 } 091 } 092 093 public String getBrokerUrl() { 094 return brokerUrl; 095 } 096 097 public void setBrokerUrl(String brokerUrl) { 098 this.brokerUrl = brokerUrl; 099 } 100 101 public String getUser() { 102 return user; 103 } 104 105 public void setUser(String user) { 106 this.user = user; 107 } 108 109 public String getPassword() { 110 return password; 111 } 112 113 public void setPassword(String password) { 114 this.password = password; 115 } 116 117 public String getDestination() { 118 return destination; 119 } 120 121 public void setDestination(String destination) { 122 this.destination = destination; 123 } 124 125 public int getMessageCount() { 126 return messageCount; 127 } 128 129 public void setMessageCount(int messageCount) { 130 this.messageCount = messageCount; 131 } 132 133 public int getSleep() { 134 return sleep; 135 } 136 137 public void setSleep(int sleep) { 138 this.sleep = sleep; 139 } 140 141 public int getBatchSize() { 142 return batchSize; 143 } 144 145 public void setBatchSize(int batchSize) { 146 this.batchSize = batchSize; 147 } 148 149 public int getParallelThreads() { 150 return parallelThreads; 151 } 152 153 public void setParallelThreads(int parallelThreads) { 154 this.parallelThreads = parallelThreads; 155 } 156 157 public boolean isBytesAsText() { 158 return bytesAsText; 159 } 160 161 public void setBytesAsText(boolean bytesAsText) { 162 this.bytesAsText = bytesAsText; 163 } 164 165 public boolean isTransacted() { 166 return transacted; 167 } 168 169 public void setTransacted(boolean transacted) { 170 this.transacted = transacted; 171 } 172 173 public int getAckMode() { 174 return ackMode; 175 } 176 177 public void setAckMode(String ackMode) { 178 if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) { 179 this.ackMode = Session.CLIENT_ACKNOWLEDGE; 180 } 181 if ("AUTO_ACKNOWLEDGE".equals(ackMode)) { 182 this.ackMode = Session.AUTO_ACKNOWLEDGE; 183 } 184 if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) { 185 this.ackMode = Session.DUPS_OK_ACKNOWLEDGE; 186 } 187 } 188 189 public boolean isDurable() { 190 return durable; 191 } 192 193 public void setDurable(boolean durable) { 194 this.durable = durable; 195 } 196 197 public String getClientId() { 198 return clientId; 199 } 200 201 public void setClientId(String clientId) { 202 this.clientId = clientId; 203 } 204 205 @Override 206 protected void printHelp() { 207 printHelpFromFile(); 208 } 209 210 @Override 211 public String getName() { 212 return "consumer"; 213 } 214 215 @Override 216 public String getOneLineDescription() { 217 return "Receives messages from the broker"; 218 } 219}