001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc.adapter;
018    
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.io.OutputStream;
022    import java.sql.Blob;
023    import java.sql.Connection;
024    import java.sql.PreparedStatement;
025    import java.sql.ResultSet;
026    import java.sql.SQLException;
027    
028    import javax.jms.JMSException;
029    import javax.sql.rowset.serial.SerialBlob;
030    
031    import org.apache.activemq.command.ActiveMQDestination;
032    import org.apache.activemq.command.MessageId;
033    import org.apache.activemq.store.jdbc.TransactionContext;
034    import org.apache.activemq.util.ByteArrayOutputStream;
035    
036    /**
037     * This JDBCAdapter inserts and extracts BLOB data using the getBlob()/setBlob()
038     * operations. This is a little more involved since to insert a blob you have
039     * to:
040     * 
 * 1: insert a row holding an empty blob. 2: select that row's blob. 3: finally,
 * update the blob with the data value.
043     * 
044     * The databases/JDBC drivers that use this adapter are:
045     * <ul>
046     * <li></li>
047     * </ul>
048     * 
049     * @org.apache.xbean.XBean element="blobJDBCAdapter"
050     * 
051     * 
052     */
053    public class BlobJDBCAdapter extends DefaultJDBCAdapter {
054    
055        @Override
056        public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
057                long expiration, byte priority) throws SQLException, IOException {
058            PreparedStatement s = null;
059            ResultSet rs = null;
060            cleanupExclusiveLock.readLock().lock();
061            try {
062                // Add the Blob record.
063                s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
064                s.setLong(1, sequence);
065                s.setString(2, messageID.getProducerId().toString());
066                s.setLong(3, messageID.getProducerSequenceId());
067                s.setString(4, destination.getQualifiedName());
068                s.setLong(5, expiration);
069                s.setLong(6, priority);
070    
071                if (s.executeUpdate() != 1) {
072                    throw new IOException("Failed to add broker message: " + messageID + " in container.");
073                }
074                s.close();
075    
076                // Select the blob record so that we can update it.
077                s = c.getConnection().prepareStatement(statements.getFindMessageByIdStatement(),
078                            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
079                s.setLong(1, sequence);
080                rs = s.executeQuery();
081                if (!rs.next()) {
082                    throw new IOException("Failed select blob for message: " + messageID + " in container.");
083                }
084    
085                // Update the blob
086                Blob blob = rs.getBlob(1);
087                blob.truncate(0);
088                blob.setBytes(1, data);
089                rs.updateBlob(1, blob);
090                rs.updateRow();             // Update the row with the updated blob
091    
092            } finally {
093                cleanupExclusiveLock.readLock().unlock();
094                close(rs);
095                close(s);
096            }
097        }
098    
099        @Override
100        public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
101            PreparedStatement s = null;
102            ResultSet rs = null;
103            cleanupExclusiveLock.readLock().lock();
104            try {
105    
106                s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
107                s.setString(1, id.getProducerId().toString());
108                s.setLong(2, id.getProducerSequenceId());
109                rs = s.executeQuery();
110    
111                if (!rs.next()) {
112                    return null;
113                }
114                Blob blob = rs.getBlob(1);
115                InputStream is = blob.getBinaryStream();
116    
117                ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length());
118                int ch;
119                while ((ch = is.read()) >= 0) {
120                    os.write(ch);
121                }
122                is.close();
123                os.close();
124    
125                return os.toByteArray();
126    
127            } finally {
128                cleanupExclusiveLock.readLock().unlock();
129                close(rs);
130                close(s);
131            }
132        }
133    
134    }