/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the data files that make up a journal: allocation, rotation,
 * recovery and cleanup.
 */
public class Journal {
    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));

    private static final int PREALLOC_CHUNK_SIZE = 1024 * 1024;

    // RECORD_HEAD_SPACE = 4 byte length + 1 byte type
    public static final int RECORD_HEAD_SPACE = 4 + 1;

    public static final byte USER_RECORD_TYPE = 1;
    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // A batch control record holds a 4 byte size of the batch and an 8 byte checksum of the batch.
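    // Record layout, derived from the constants below:
    //   [4 byte size][1 byte type][11 byte magic "WRITE BATCH"][4 byte batch size][8 byte Adler32 checksum]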
    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
    public static final byte[] EMPTY_BATCH_CONTROL_RECORD = createEmptyBatchControlRecordHeader();
    public static final int EOF_INT = ByteBuffer.wrap(new byte[]{'-', 'q', 'M', 'a'}).getInt();
    public static final byte EOF_EOT = '4';
    public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();

    private ScheduledExecutorService scheduler;

    // tackles corruption when the checksum is disabled or corrupted with zeros, to minimize data loss
    public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
        DataFile dataFile = getDataFile(recoveryPosition);
        // with corruption on recovery we have no faith in the content - slip to the next batch record or eof
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
            randomAccessFile.seek(recoveryPosition.getOffset() + 1);
            byte[] data = new byte[getWriteBatchSize()];
            ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
            int nextOffset = 0;
            if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
                nextOffset = Math.toIntExact(randomAccessFile.getFilePointer() - bs.remaining());
            } else {
                nextOffset = Math.toIntExact(randomAccessFile.length());
            }
            Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset - 1);
            LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);

            // skip corruption on getNextLocation
            recoveryPosition.setOffset(nextOffset);
            recoveryPosition.setSize(-1);

            dataFile.corruptedBlocks.add(sequence);
        } catch (IOException e) {
            // ignore - recovery is best effort and the scan simply stops here
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
    }

    public DataFileAccessorPool getAccessorPool() {
        return accessorPool;
    }

    public void allowIOResumption() {
        if (appender instanceof DataFileAppender) {
            DataFileAppender dataFileAppender = (DataFileAppender) appender;
            dataFileAppender.shutdown = false;
        }
    }

    public enum PreallocationStrategy {
        SPARSE_FILE,
        OS_KERNEL_COPY,
        ZEROS,
        CHUNKED_ZEROS
    }

    public enum PreallocationScope {
        ENTIRE_JOURNAL,
        ENTIRE_JOURNAL_ASYNC,
        NONE
    }

    public enum JournalDiskSyncStrategy {
        ALWAYS,
        PERIODIC,
        NEVER
    }

    private static byte[] createBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create batch control record header.", e);
        }
    }

    private static byte[] createEmptyBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            os.writeInt(0);
            os.writeLong(0L);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create empty batch control record header.", e);
        }
    }
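    // The EOF record marks the logical end of a journal data file; on replay,
    // readers that see EOF_INT/EOF_EOT jump straight to the next data file
    // (see getNextLocation).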
    private static byte[] createEofBatchAndLocationRecord() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(EOF_INT);
            os.writeByte(EOF_EOT);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create eof header.", e);
        }
    }

    public static final String DEFAULT_DIRECTORY = ".";
    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
    public static final String DEFAULT_FILE_PREFIX = "db-";
    public static final String DEFAULT_FILE_SUFFIX = ".log";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;

    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);

    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

    protected File directory = new File(DEFAULT_DIRECTORY);
    protected File directoryArchive;
    private boolean directoryArchiveOverridden = false;

    protected String filePrefix = DEFAULT_FILE_PREFIX;
    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
    protected boolean started;

    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;

    protected FileAppender appender;
    protected DataFileAccessorPool accessorPool;

    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();

    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
    protected ScheduledFuture<?> cleanupTask;
    protected AtomicLong totalLength = new AtomicLong();
    protected boolean archiveDataLogs;
    private ReplicationTarget replicationTarget;
    protected boolean checksum;
    protected boolean checkForCorruptionOnStartup;
    protected boolean enableAsyncDiskSync = true;
    private int nextDataFileId = 1;
    private final Object dataFileIdLock = new Object();
    private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
    private volatile DataFile nextDataFile;

    protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
    protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
    private File osKernelCopyTemplateFile = null;
    private ByteBuffer preAllocateDirectBuffer = null;

    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;

    public interface DataFileRemovedListener {
        void fileRemoved(DataFile datafile);
    }

    private DataFileRemovedListener dataFileRemovedListener;
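    /**
     * Opens the journal: scans the directory for existing data files, links them
     * in id order, optionally verifies them for corruption, prepares the
     * configured preallocation strategy and schedules the periodic cleanup task.
     */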
    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        long start = System.currentTimeMillis();
        accessorPool = new DataFileAccessorPool(this);
        started = true;

        appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);

        File[] files = directory.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        if (files != null) {
            for (File file : files) {
                try {
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length() - fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore files that do not match the pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            LinkedList<DataFile> l = new LinkedList<>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                if (df.getLength() == 0) {
                    // possibly the result of a previous failed write
                    LOG.info("ignoring zero length, partially initialised journal data file: {}", df);
                    continue;
                } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) {
                    continue;
                }
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if (isCheckForCorruptionOnStartup()) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        if (preallocationScope != PreallocationScope.NONE) {
            switch (preallocationStrategy) {
                case SPARSE_FILE:
                    break;
                case OS_KERNEL_COPY: {
                    osKernelCopyTemplateFile = createJournalTemplateFile();
                }
                break;
                case CHUNKED_ZEROS: {
                    preAllocateDirectBuffer = allocateDirectBuffer(PREALLOC_CHUNK_SIZE);
                }
                break;
                case ZEROS: {
                    preAllocateDirectBuffer = allocateDirectBuffer(getMaxFileLength());
                }
                break;
            }
        }

        scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread schedulerThread = new Thread(r);
                schedulerThread.setName("ActiveMQ Journal Scheduled executor");
                schedulerThread.setDaemon(true);
                return schedulerThread;
            }
        });

        // init current write file
        if (dataFiles.isEmpty()) {
            nextDataFileId = 1;
            rotateWriteFile();
        } else {
            currentDataFile.set(dataFiles.getTail());
            nextDataFileId = currentDataFile.get().dataFileId + 1;
        }

        if (lastAppendLocation.get() == null) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        // ensure we don't report unused space of last journal file in size metric
        int lastFileLength = dataFiles.getTail().getLength();
        if (totalLength.get() > lastFileLength && lastAppendLocation.get().getOffset() > 0) {
            totalLength.addAndGet(lastAppendLocation.get().getOffset() - lastFileLength);
        }

        cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                cleanup();
            }
        }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);

        long end = System.currentTimeMillis();
        LOG.trace("Startup took: {} ms", (end - start));
    }

    private ByteBuffer allocateDirectBuffer(int size) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(size);
        buffer.put(EOF_RECORD);
        return buffer;
    }
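    /**
     * Preallocates a whole journal data file up front using the configured
     * {@link PreallocationStrategy}. Preallocation failures are logged and
     * swallowed - the journal can still operate without preallocation.
     */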
    public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {
        if (PreallocationScope.NONE != preallocationScope) {
            try {
                if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
                    doPreallocationKernelCopy(file);
                } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
                    doPreallocationZeros(file);
                } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
                    doPreallocationChunkedZeros(file);
                } else {
                    doPreallocationSparseFile(file);
                }
            } catch (Throwable continueWithNoPrealloc) {
                // error on preallocation is non fatal, and we don't want to leak the journal handle
                LOG.error("could not preallocate journal data file", continueWithNoPrealloc);
            }
        }
    }

    private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
        final ByteBuffer journalEof = ByteBuffer.wrap(EOF_RECORD);
        try {
            FileChannel channel = file.getChannel();
            channel.position(0);
            channel.write(journalEof);
            channel.position(maxFileLength - 5);
            journalEof.rewind();
            channel.write(journalEof);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with sparse file", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with sparse file", e);
        }
    }

    private void doPreallocationZeros(RecoverableRandomAccessFile file) {
        preAllocateDirectBuffer.rewind();
        try {
            FileChannel channel = file.getChannel();
            channel.write(preAllocateDirectBuffer);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros", e);
        }
    }

    private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
        try (RandomAccessFile templateRaf = new RandomAccessFile(osKernelCopyTemplateFile, "rw")) {
            templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with kernel copy", ignored);
        } catch (FileNotFoundException e) {
            LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
        }
    }

    private File createJournalTemplateFile() {
        String fileName = "db-log.template";
        File rc = new File(directory, fileName);
        try (RandomAccessFile templateRaf = new RandomAccessFile(rc, "rw")) {
            templateRaf.getChannel().write(ByteBuffer.wrap(EOF_RECORD));
            templateRaf.setLength(maxFileLength);
            templateRaf.getChannel().force(true);
        } catch (FileNotFoundException e) {
            // note: use rc here - osKernelCopyTemplateFile is not assigned until this method returns
            LOG.error("Could not open the template file on disk at " + rc.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not write the template file at " + rc.getAbsolutePath(), e);
        }
        return rc;
    }
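    // CHUNKED_ZEROS writes the file in PREALLOC_CHUNK_SIZE slices, so only a small
    // direct buffer is needed rather than one sized to the whole journal file.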
    private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {
        preAllocateDirectBuffer.limit(preAllocateDirectBuffer.capacity());
        preAllocateDirectBuffer.rewind();
        try {
            FileChannel channel = file.getChannel();

            int remLen = maxFileLength;
            while (remLen > 0) {
                if (remLen < preAllocateDirectBuffer.remaining()) {
                    preAllocateDirectBuffer.limit(remLen);
                }
                int writeLen = channel.write(preAllocateDirectBuffer);
                remLen -= writeLen;
                preAllocateDirectBuffer.rewind();
            }

            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
        }
    }

    private static byte[] bytes(String string) {
        try {
            return string.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
        if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                byte[] firstFewBytes = new byte[BATCH_CONTROL_RECORD_HEADER.length];
                reader.readFully(0, firstFewBytes);
                ByteSequence bs = new ByteSequence(firstFewBytes);
                return bs.startsWith(EOF_RECORD);
            } catch (Exception ignored) {
                // short or unreadable file - treat it as used and let recovery decide
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }
        }
        return false;
    }
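    /**
     * Walks the batch records of a data file to find the last valid append offset.
     * Corrupt stretches are recorded in {@code dataFile.corruptedBlocks} so reads
     * can skip them; if the corruption extends to the end of the file, the file
     * length is trimmed back to the last good record.
     */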
    protected Location recoveryCheck(DataFile dataFile) throws IOException {
        Location location = new Location();
        location.setDataFileId(dataFile.getDataFileId());
        location.setOffset(0);

        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
            randomAccessFile.seek(0);
            final long totalFileLength = randomAccessFile.length();
            byte[] data = new byte[getWriteBatchSize()];
            ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));

            while (true) {
                int size = checkBatchRecord(bs, randomAccessFile);
                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= totalFileLength) {
                    if (size == 0) {
                        // eof batch record
                        break;
                    }
                    location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
                } else {
                    // Perhaps it's just some corruption... scan through the
                    // file to find the next valid batch record. We
                    // may have subsequent valid batch records.
                    if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
                        int nextOffset = Math.toIntExact(randomAccessFile.getFilePointer() - bs.remaining());
                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                        LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);
                        dataFile.corruptedBlocks.add(sequence);
                        location.setOffset(nextOffset);
                    } else {
                        break;
                    }
                }
            }
        } catch (IOException e) {
            // ignore - a failed read ends the scan at the last good location
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }

        int existingLen = dataFile.getLength();
        dataFile.setLength(location.getOffset());
        if (existingLen > dataFile.getLength()) {
            totalLength.addAndGet(dataFile.getLength() - existingLen);
        }

        if (!dataFile.corruptedBlocks.isEmpty()) {
            // Is the end of the data file corrupted?
            if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) {
                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
            }
        }

        return location;
    }

    private int findNextBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
        final ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
        int pos = 0;
        while (true) {
            pos = bs.indexOf(header, 0);
            if (pos >= 0) {
                bs.setOffset(bs.offset + pos);
                return pos;
            } else {
                // need to load the next data chunk in..
                if (bs.length != bs.data.length) {
                    // If we had a short read then we were at EOF
                    return -1;
                }
                bs.setOffset(bs.length - BATCH_CONTROL_RECORD_HEADER.length);
                bs.reset();
                bs.setLength(bs.length + reader.read(bs.data, bs.length, bs.data.length - BATCH_CONTROL_RECORD_HEADER.length));
            }
        }
    }
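    /**
     * Validates the batch record at the head of {@code bs}.
     *
     * @return the batch payload size, {@code 0} for an EOF record, or a negative
     *         code on failure: -1 header mismatch, -2 implausible size, -3 buffer
     *         exhausted before the payload ended, -4 checksum mismatch.
     */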
    private int checkBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
        ensureAvailable(bs, reader, EOF_RECORD.length);
        if (bs.startsWith(EOF_RECORD)) {
            return 0; // eof
        }
        ensureAvailable(bs, reader, BATCH_CONTROL_RECORD_SIZE);
        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(bs)) {

            // Assert that it's a batch record.
            for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
                if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
                    return -1;
                }
            }

            int size = controlIs.readInt();
            if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
                return -2;
            }

            long expectedChecksum = controlIs.readLong();
            Checksum checksum = null;
            if (isChecksum() && expectedChecksum > 0) {
                checksum = new Adler32();
            }

            // revert to bs to consume data
            bs.setOffset(controlIs.position());
            int toRead = size;
            while (toRead > 0) {
                if (bs.remaining() >= toRead) {
                    if (checksum != null) {
                        checksum.update(bs.getData(), bs.getOffset(), toRead);
                    }
                    bs.setOffset(bs.offset + toRead);
                    toRead = 0;
                } else {
                    if (bs.length != bs.data.length) {
                        // buffer exhausted
                        return -3;
                    }

                    toRead -= bs.remaining();
                    if (checksum != null) {
                        checksum.update(bs.getData(), bs.getOffset(), bs.remaining());
                    }
                    bs.setLength(reader.read(bs.data));
                    bs.setOffset(0);
                }
            }
            if (checksum != null && expectedChecksum != checksum.getValue()) {
                return -4;
            }

            return size;
        }
    }

    private void ensureAvailable(ByteSequence bs, RandomAccessFile reader, int required) throws IOException {
        if (bs.remaining() < required) {
            bs.reset();
            int read = reader.read(bs.data, bs.length, bs.data.length - bs.length);
            if (read < 0) {
                if (bs.remaining() == 0) {
                    throw new EOFException("request for " + required + " bytes reached EOF");
                }
            } else {
                // only grow the sequence when the read actually returned bytes;
                // adding a -1 EOF result would silently shrink the buffer
                bs.setLength(bs.length + read);
            }
        }
    }

    void addToTotalLength(int size) {
        totalLength.addAndGet(size);
    }

    public long length() {
        return totalLength.get();
    }
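    // Rotation installs the next data file as the current write file. With
    // ENTIRE_JOURNAL_ASYNC the successor file is preallocated ahead of time on
    // the scheduler, so an append never has to wait for preallocation.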
    private void rotateWriteFile() throws IOException {
        synchronized (dataFileIdLock) {
            DataFile dataFile = nextDataFile;
            if (dataFile == null) {
                dataFile = newDataFile();
            }
            synchronized (currentDataFile) {
                fileMap.put(dataFile.getDataFileId(), dataFile);
                fileByFileMap.put(dataFile.getFile(), dataFile);
                dataFiles.addLast(dataFile);
                currentDataFile.set(dataFile);
            }
            nextDataFile = null;
        }
        if (PreallocationScope.ENTIRE_JOURNAL_ASYNC == preallocationScope) {
            preAllocateNextDataFileFuture = scheduler.submit(preAllocateNextDataFileTask);
        }
    }

    private Runnable preAllocateNextDataFileTask = new Runnable() {
        @Override
        public void run() {
            if (nextDataFile == null) {
                synchronized (dataFileIdLock) {
                    try {
                        nextDataFile = newDataFile();
                    } catch (IOException e) {
                        LOG.warn("Failed to proactively allocate data file", e);
                    }
                }
            }
        }
    };

    private volatile Future<?> preAllocateNextDataFileFuture;

    private DataFile newDataFile() throws IOException {
        int nextNum = nextDataFileId++;
        File file = getFile(nextNum);
        DataFile nextWriteFile = new DataFile(file, nextNum);
        preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile());
        return nextWriteFile;
    }

    public DataFile reserveDataFile() {
        synchronized (dataFileIdLock) {
            int nextNum = nextDataFileId++;
            File file = getFile(nextNum);
            DataFile reservedDataFile = new DataFile(file, nextNum);
            synchronized (currentDataFile) {
                fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
                fileByFileMap.put(file, reservedDataFile);
                if (dataFiles.isEmpty()) {
                    dataFiles.addLast(reservedDataFile);
                } else {
                    dataFiles.getTail().linkBefore(reservedDataFile);
                }
            }
            return reservedDataFile;
        }
    }

    public File getFile(int nextNum) {
        String fileName = filePrefix + nextNum + fileSuffix;
        File file = new File(directory, fileName);
        return file;
    }

    DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = null;
        synchronized (currentDataFile) {
            dataFile = fileMap.get(key);
        }
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile;
    }

    public void close() throws IOException {
        synchronized (this) {
            if (!started) {
                return;
            }
            cleanupTask.cancel(true);
            if (preAllocateNextDataFileFuture != null) {
                preAllocateNextDataFileFuture.cancel(true);
            }
            ThreadPoolUtils.shutdownGraceful(scheduler, 4000);
            accessorPool.close();
        }
        // the appender can be calling back to the journal, blocking a close - AMQ-5620
        appender.close();
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            dataFiles.clear();
            lastAppendLocation.set(null);
            started = false;
        }
    }

    public synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }

    public synchronized boolean delete() throws IOException {
        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
            DataFile dataFile = i.next();
            result &= dataFile.delete();
        }

        if (preAllocateNextDataFileFuture != null) {
            preAllocateNextDataFileFuture.cancel(true);
        }
        synchronized (dataFileIdLock) {
            if (nextDataFile != null) {
                nextDataFile.delete();
                nextDataFile = null;
            }
        }

        totalLength.set(0);
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            lastAppendLocation.set(null);
            dataFiles = new LinkedNodeList<DataFile>();
        }
        // reopen file handles...
        accessorPool = new DataFileAccessorPool(this);
        appender = new DataFileAppender(this);
        return result;
    }

    public void removeDataFiles(Set<Integer> files) throws IOException {
        for (Integer key : files) {
            // Can't remove the data file (or subsequent files) that is currently being written to.
            if (key >= lastAppendLocation.get().getDataFileId()) {
                continue;
            }
            DataFile dataFile = null;
            synchronized (currentDataFile) {
                dataFile = fileMap.remove(key);
                if (dataFile != null) {
                    fileByFileMap.remove(dataFile.getFile());
                    dataFile.unlink();
                }
            }
            if (dataFile != null) {
                forceRemoveDataFile(dataFile);
            }
        }
    }

    private void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        totalLength.addAndGet(-dataFile.getLength());
        if (archiveDataLogs) {
            File directoryArchive = getDirectoryArchive();
            if (directoryArchive.exists()) {
                LOG.debug("Archive directory exists: {}", directoryArchive);
            } else {
                if (directoryArchive.isAbsolute()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Archive directory [{}] does not exist - creating it now",
                                directoryArchive.getAbsolutePath());
                    }
                }
                IOHelper.mkdirs(directoryArchive);
            }
            LOG.debug("Moving data file {} to {} ", dataFile, directoryArchive.getCanonicalPath());
            dataFile.move(directoryArchive);
            LOG.debug("Successfully moved data file");
        } else {
            LOG.debug("Deleting data file: {}", dataFile);
            if (dataFile.delete()) {
                LOG.debug("Discarded data file: {}", dataFile);
            } else {
                LOG.warn("Failed to discard data file : {}", dataFile.getFile());
            }
        }
        if (dataFileRemovedListener != null) {
            dataFileRemovedListener.fileRemoved(dataFile);
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    @Override
    public String toString() {
        return directory.toString();
    }
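    /**
     * Returns the location of the next user record after {@code location},
     * skipping corrupted ranges and EOF markers (which jump to the next data
     * file), or {@code null} when the end of the journal is reached.
     */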
    public Location getNextLocation(Location location) throws IOException, IllegalStateException {
        return getNextLocation(location, null);
    }

    public Location getNextLocation(Location location, Location limit) throws IOException, IllegalStateException {
        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    DataFile head = null;
                    synchronized (currentDataFile) {
                        head = dataFiles.getHead();
                    }
                    if (head == null) {
                        return null;
                    }
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    cur = new Location(location);
                    if (location.getSize() != -1) {
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it go into the next file?
            if (dataFile.getLength() <= cur.getOffset()) {
                synchronized (currentDataFile) {
                    dataFile = dataFile.getNext();
                }
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                    if (limit != null && cur.compareTo(limit) >= 0) {
                        LOG.trace("reached limit: {} at: {}", limit, cur);
                        return null;
                    }
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } catch (EOFException eof) {
                LOG.trace("EOF on next: " + location + ", cur: " + cur);
                throw eof;
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            Sequence corruptedRange = dataFile.corruptedBlocks.get(cur.getOffset());
            if (corruptedRange != null) {
                // skip corruption
                cur.setSize((int) corruptedRange.range());
            } else if (cur.getSize() == EOF_INT && cur.getType() == EOF_EOT ||
                       (cur.getType() == 0 && cur.getSize() == 0)) {
                // eof - jump to next datafile
                // EOF_INT and EOF_EOT replace 0,0 - we need to react to both for
                // replay of existing journals
                // possibly journal is larger than maxFileLength after config change
                cur.setSize(EOF_RECORD.length);
                cur.setOffset(Math.max(maxFileLength, dataFile.getLength()));
            } else if (cur.getType() == USER_RECORD_TYPE) {
                // Only return user records.
                return cur;
            }
        }
    }

    public ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        ByteSequence rc = null;
        try {
            rc = reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        return rc;
    }
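    /**
     * Appends a user record and returns the location it was written to.
     * A minimal round-trip sketch, assuming a started journal:
     *
     * <pre>{@code
     * Location loc = journal.write(new ByteSequence("payload".getBytes()), true); // sync append
     * ByteSequence back = journal.read(loc);                                      // read it back
     * }</pre>
     */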
    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
        return loc;
    }

    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
        return loc;
    }

    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }

    public PreallocationStrategy getPreallocationStrategy() {
        return preallocationStrategy;
    }

    public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) {
        this.preallocationStrategy = preallocationStrategy;
    }

    public PreallocationScope getPreallocationScope() {
        return preallocationScope;
    }

    public void setPreallocationScope(PreallocationScope preallocationScope) {
        this.preallocationScope = preallocationScope;
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    public void setFilePrefix(String filePrefix) {
        this.filePrefix = filePrefix;
    }

    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public File getDirectoryArchive() {
        if (!directoryArchiveOverridden && (directoryArchive == null)) {
            // create the directoryArchive relative to the journal location
            directoryArchive = new File(directory.getAbsolutePath() +
                    File.separator + DEFAULT_ARCHIVE_DIRECTORY);
        }
        return directoryArchive;
    }

    public void setDirectoryArchive(File directoryArchive) {
        directoryArchiveOverridden = true;
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    public DataFile getDataFileById(int dataFileId) {
        synchronized (currentDataFile) {
            return fileMap.get(Integer.valueOf(dataFileId));
        }
    }
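    /**
     * Returns the current write file, rotating first when {@code capacity} bytes
     * would not fit. Locks are taken in dataFileIdLock-then-currentDataFile order
     * to match rotateWriteFile and avoid the AMQ-6545 deadlock.
     */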
    public DataFile getCurrentDataFile(int capacity) throws IOException {
        // First just acquire the currentDataFile lock and return if no rotation is needed
        synchronized (currentDataFile) {
            if (currentDataFile.get().getLength() + capacity < maxFileLength) {
                return currentDataFile.get();
            }
        }

        // AMQ-6545 - if rotation is needed, acquire dataFileIdLock first to prevent deadlocks,
        // then re-check whether rotation is still needed
        synchronized (dataFileIdLock) {
            synchronized (currentDataFile) {
                if (currentDataFile.get().getLength() + capacity >= maxFileLength) {
                    rotateWriteFile();
                }
                return currentDataFile.get();
            }
        }
    }

    public Integer getCurrentDataFileId() {
        synchronized (currentDataFile) {
            return currentDataFile.get().getDataFileId();
        }
    }

    /**
     * Get a set of files - only valid after start()
     *
     * @return files currently being used
     */
    public Set<File> getFiles() {
        synchronized (currentDataFile) {
            return fileByFileMap.keySet();
        }
    }

    public Map<Integer, DataFile> getFileMap() {
        synchronized (currentDataFile) {
            return new TreeMap<Integer, DataFile>(fileMap);
        }
    }

    public long getDiskSize() {
        return totalLength.get();
    }

    public void setReplicationTarget(ReplicationTarget replicationTarget) {
        this.replicationTarget = replicationTarget;
    }

    public ReplicationTarget getReplicationTarget() {
        return replicationTarget;
    }

    public String getFileSuffix() {
        return fileSuffix;
    }

    public void setFileSuffix(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    public boolean isChecksum() {
        return checksum;
    }

    public void setChecksum(boolean checksumWrites) {
        this.checksum = checksumWrites;
    }

    public boolean isCheckForCorruptionOnStartup() {
        return checkForCorruptionOnStartup;
    }

    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
        this.totalLength = storeSizeAccumulator;
    }

    public void setEnableAsyncDiskSync(boolean val) {
        this.enableAsyncDiskSync = val;
    }

    public boolean isEnableAsyncDiskSync() {
        return enableAsyncDiskSync;
    }

    public JournalDiskSyncStrategy getJournalDiskSyncStrategy() {
        return journalDiskSyncStrategy;
    }

    public void setJournalDiskSyncStrategy(JournalDiskSyncStrategy journalDiskSyncStrategy) {
        this.journalDiskSyncStrategy = journalDiskSyncStrategy;
    }

    public boolean isJournalDiskSyncPeriodic() {
        return JournalDiskSyncStrategy.PERIODIC.equals(journalDiskSyncStrategy);
    }

    public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
        this.dataFileRemovedListener = dataFileRemovedListener;
    }

    public static class WriteCommand extends LinkedNode<WriteCommand> {
        public final Location location;
        public final ByteSequence data;
        final boolean sync;
        public final Runnable onComplete;

        public WriteCommand(Location location, ByteSequence data, boolean sync) {
            this.location = location;
            this.data = data;
            this.sync = sync;
            this.onComplete = null;
        }

        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
            this.location = location;
            this.data = data;
            this.onComplete = onComplete;
            this.sync = false;
        }
    }
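    /**
     * Identity of an in-flight write - data file id plus offset - used as the
     * key of the {@link #inflightWrites} map.
     */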
    public static class WriteKey {
        private final int file;
        private final long offset;
        private final int hash;

        public WriteKey(Location item) {
            file = item.getDataFileId();
            offset = item.getOffset();
            // TODO: see if we can build a better hash
            hash = (int) (file ^ offset);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof WriteKey) {
                WriteKey di = (WriteKey) obj;
                return di.file == file && di.offset == offset;
            }
            return false;
        }
    }
}