001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.io.IOException; 020 import java.io.OutputStream; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.Map; 024 025 import javax.jms.InvalidDestinationException; 026 import javax.jms.JMSException; 027 028 import org.apache.activemq.command.ActiveMQBytesMessage; 029 import org.apache.activemq.command.ActiveMQDestination; 030 import org.apache.activemq.command.ActiveMQMessage; 031 import org.apache.activemq.command.MessageId; 032 import org.apache.activemq.command.ProducerId; 033 import org.apache.activemq.command.ProducerInfo; 034 import org.apache.activemq.util.IOExceptionSupport; 035 import org.apache.activemq.util.IntrospectionSupport; 036 037 /** 038 * 039 */ 040 public class ActiveMQOutputStream extends OutputStream implements Disposable { 041 042 protected int count; 043 044 final byte buffer[]; 045 046 private final ActiveMQConnection connection; 047 private final Map<String, Object> properties; 048 private final ProducerInfo info; 049 050 private long messageSequence; 051 private boolean closed; 052 private final int deliveryMode; 053 private final int priority; 054 private final long timeToLive; 055 private boolean alwaysSyncSend = false; 056 057 /** 058 * JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb 059 */ 060 public final static String AMQ_STREAM_CHUNK_SIZE = "AMQ_STREAM_CHUNK_SIZE"; 061 062 public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority, 063 long timeToLive) throws JMSException { 064 this.connection = connection; 065 this.deliveryMode = deliveryMode; 066 this.priority = priority; 067 this.timeToLive = timeToLive; 068 this.properties = properties == null ? null : new HashMap<String, Object>(properties); 069 070 Integer chunkSize = this.properties == null ? null : (Integer) this.properties.get(AMQ_STREAM_CHUNK_SIZE); 071 if (chunkSize == null) { 072 chunkSize = 64 * 1024; 073 } else { 074 if (chunkSize < 1) { 075 throw new IllegalArgumentException("Chunk size must be greater then 0"); 076 } else { 077 chunkSize *= 1024; 078 } 079 } 080 081 buffer = new byte[chunkSize]; 082 083 if (destination == null) { 084 throw new InvalidDestinationException("Don't understand null destinations"); 085 } 086 087 this.info = new ProducerInfo(producerId); 088 089 // Allows the options on the destination to configure the stream 090 if (destination.getOptions() != null) { 091 Map<String, String> options = new HashMap<String, String>(destination.getOptions()); 092 IntrospectionSupport.setProperties(this, options, "producer."); 093 IntrospectionSupport.setProperties(this.info, options, "producer."); 094 } 095 096 this.info.setDestination(destination); 097 098 this.connection.addOutputStream(this); 099 this.connection.asyncSendPacket(info); 100 } 101 102 public void close() throws IOException { 103 if (!closed) { 104 flushBuffer(); 105 try { 106 // Send an EOS style empty message to signal EOS. 107 send(new ActiveMQMessage(), true); 108 dispose(); 109 this.connection.asyncSendPacket(info.createRemoveCommand()); 110 } catch (JMSException e) { 111 IOExceptionSupport.create(e); 112 } 113 } 114 } 115 116 public void dispose() { 117 if (!closed) { 118 this.connection.removeOutputStream(this); 119 closed = true; 120 } 121 } 122 123 public synchronized void write(int b) throws IOException { 124 buffer[count++] = (byte) b; 125 if (count == buffer.length) { 126 flushBuffer(); 127 } 128 } 129 130 public synchronized void write(byte b[], int off, int len) throws IOException { 131 while (len > 0) { 132 int max = Math.min(len, buffer.length - count); 133 System.arraycopy(b, off, buffer, count, max); 134 135 len -= max; 136 count += max; 137 off += max; 138 139 if (count == buffer.length) { 140 flushBuffer(); 141 } 142 } 143 } 144 145 public synchronized void flush() throws IOException { 146 flushBuffer(); 147 } 148 149 private void flushBuffer() throws IOException { 150 if (count != 0) { 151 try { 152 ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); 153 msg.writeBytes(buffer, 0, count); 154 send(msg, false); 155 } catch (JMSException e) { 156 throw IOExceptionSupport.create(e); 157 } 158 count = 0; 159 } 160 } 161 162 /** 163 * @param msg 164 * @throws JMSException 165 */ 166 private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException { 167 if (properties != null) { 168 for (Iterator<String> iter = properties.keySet().iterator(); iter.hasNext();) { 169 String key = iter.next(); 170 Object value = properties.get(key); 171 msg.setObjectProperty(key, value); 172 } 173 } 174 msg.setType("org.apache.activemq.Stream"); 175 msg.setGroupID(info.getProducerId().toString()); 176 if (eosMessage) { 177 msg.setGroupSequence(-1); 178 } else { 179 msg.setGroupSequence((int) messageSequence); 180 } 181 MessageId id = new MessageId(info.getProducerId(), messageSequence++); 182 connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage && !isAlwaysSyncSend()); 183 } 184 185 public String toString() { 186 return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }"; 187 } 188 189 public boolean isAlwaysSyncSend() { 190 return alwaysSyncSend; 191 } 192 193 public void setAlwaysSyncSend(boolean alwaysSyncSend) { 194 this.alwaysSyncSend = alwaysSyncSend; 195 } 196 197 }