diff --git a/build.xml b/build.xml index 8016a8b3e01c..23e1c2dcde81 100644 --- a/build.xml +++ b/build.xml @@ -1229,13 +1229,20 @@ - + + + + + + Compressed config: ${compressed_yaml} + + diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 8711625b563e..0ff8b81944d4 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -244,6 +244,17 @@ commitlog_sync_period_in_ms: 10000 # is reasonable. commitlog_segment_size_in_mb: 32 +# Compression to apply to the commit log. If omitted, the commit log +# will be written uncompressed. +#commitlog_compression: +# - class_name: LZ4Compressor +# parameters: +# - + +# Specifies the number of sync threads to use for the commit log. +# Only usable with compression. +#commitlog_sync_threads: 1 + # any class that implements the SeedProvider interface and has a # constructor that takes a Map of parameters will do. seed_provider: diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index bb0744927102..2efa5a257425 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -23,9 +23,9 @@ import java.util.Set; import com.google.common.collect.Sets; + import org.supercsv.io.CsvListReader; import org.supercsv.prefs.CsvPreference; - import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions; import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; import org.apache.cassandra.exceptions.ConfigurationException; @@ -53,7 +53,7 @@ public class Config public Set hinted_handoff_enabled_by_dc = Sets.newConcurrentHashSet(); public volatile Integer max_hint_window_in_ms = 3600 * 1000; // one hour - public SeedProviderDef seed_provider; + public ParametrizedClass seed_provider; public DiskAccessMode disk_access_mode = DiskAccessMode.auto; public DiskFailurePolicy disk_failure_policy = DiskFailurePolicy.ignore; @@ -150,8 +150,10 @@ public class Config public CommitLogSync commitlog_sync; public Double commitlog_sync_batch_window_in_ms; public Integer commitlog_sync_period_in_ms; + public int commitlog_sync_threads = 1; public int commitlog_segment_size_in_mb = 32; public int commitlog_periodic_queue_size = 1024 * FBUtilities.getAvailableProcessors(); + public ParametrizedClass commitlog_compression; public String endpoint_snitch; public Boolean dynamic_snitch = true; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 68993759e628..012610a408a2 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -189,6 +189,12 @@ else if (conf.commitlog_sync_batch_window_in_ms != null) logger.debug("Syncing log with a period of {}", conf.commitlog_sync_period_in_ms); } + if (conf.commitlog_sync_threads < 1) + throw new ConfigurationException("commitlog_sync_threads must be a positive integer."); + + if (conf.commitlog_compression == null && conf.commitlog_sync_threads > 1) + throw new ConfigurationException("commitlog_sync_threads can only be used when compression is enabled."); + if (conf.commitlog_total_space_in_mb == null) conf.commitlog_total_space_in_mb = hasLargeAddressSpace() ? 8192 : 32; @@ -1068,6 +1074,16 @@ public static String getCommitLogLocation() return conf.commitlog_directory; } + public static ParametrizedClass getCommitLogCompression() + { + return conf.commitlog_compression; + } + + public static int getCommitLogSyncThreadCount() + { + return conf.commitlog_sync_threads; + } + public static int getTombstoneWarnThreshold() { return conf.tombstone_warn_threshold; diff --git a/src/java/org/apache/cassandra/config/SeedProviderDef.java b/src/java/org/apache/cassandra/config/ParametrizedClass.java similarity index 51% rename from src/java/org/apache/cassandra/config/SeedProviderDef.java rename to src/java/org/apache/cassandra/config/ParametrizedClass.java index cbe444af990d..783b3b024a9d 100644 --- a/src/java/org/apache/cassandra/config/SeedProviderDef.java +++ b/src/java/org/apache/cassandra/config/ParametrizedClass.java @@ -21,15 +21,40 @@ import java.util.List; import java.util.Map; +import com.google.common.base.Objects; -public class SeedProviderDef +public class ParametrizedClass { public String class_name; public Map parameters; - public SeedProviderDef(LinkedHashMap p) + public ParametrizedClass(String class_name, Map parameters) { - class_name = (String)p.get("class_name"); - parameters = (Map)((List)p.get("parameters")).get(0); + this.class_name = class_name; + this.parameters = parameters; + } + + @SuppressWarnings("unchecked") + public ParametrizedClass(LinkedHashMap p) + { + this((String)p.get("class_name"), + p.containsKey("parameters") ? (Map)((List)p.get("parameters")).get(0) : null); + } + + @Override + public boolean equals(Object that) + { + return that instanceof ParametrizedClass && equals((ParametrizedClass) that); + } + + public boolean equals(ParametrizedClass that) + { + return Objects.equal(class_name, that.class_name) && Objects.equal(parameters, that.parameters); + } + + @Override + public String toString() + { + return class_name + (parameters == null ? "" : parameters.toString()); } } diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java index 50991f239217..47b91c0491c6 100644 --- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java +++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java @@ -104,7 +104,7 @@ public Config loadConfig(URL url) throws ConfigurationException logConfig(configBytes); org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class); - TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class); + TypeDescription seedDesc = new TypeDescription(ParametrizedClass.class); seedDesc.putMapPropertyType("parameters", String.class, String.class); constructor.addTypeDescription(seedDesc); MissingPropertiesChecker propertiesChecker = new MissingPropertiesChecker(); diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index 59bf69129580..adfad2517958 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@ -17,10 +17,15 @@ */ package org.apache.cassandra.db.commitlog; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.utils.concurrent.WaitQueue; + import org.slf4j.*; -import java.util.concurrent.Semaphore; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -31,11 +36,10 @@ public abstract class AbstractCommitLogService // how often should we log syngs that lag behind our desired period private static final long LAG_REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(5); - private final Thread thread; - private volatile boolean shutdown = false; - - // all Allocations written before this time will be synced - protected volatile long lastSyncedAt = System.currentTimeMillis(); + // all Allocations written before this time are synced. + // Note: this number might jump back once in a while, but will eventually increase past any point in time. + // If a jump does occur, mutations started before both the higher and lower number are synced at that time. + protected volatile long approximateSyncedAt = System.currentTimeMillis(); // counts of total written, and pending, log messages private final AtomicLong written = new AtomicLong(0); @@ -43,9 +47,18 @@ public abstract class AbstractCommitLogService // signal that writers can wait on to be notified of a completed sync protected final WaitQueue syncComplete = new WaitQueue(); - private final Semaphore haveWork = new Semaphore(1); private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class); + private final ScheduledExecutorService executor; + private final Runnable runnable; + + long firstLagAt = 0; + long totalSyncDuration = 0; // total time spent syncing since firstLagAt + long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval since firstLagAt + int lagCount = 0; + int syncCount = 0; + + volatile boolean shutdown = false; /** * CommitLogService provides a fsync service for Allocations, fulfilling either the @@ -58,89 +71,61 @@ public abstract class AbstractCommitLogService if (pollIntervalMillis < 1) throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis)); - Runnable runnable = new Runnable() + runnable = new Runnable() { public void run() { - long firstLagAt = 0; - long totalSyncDuration = 0; // total time spent syncing since firstLagAt - long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval since firstLagAt - int lagCount = 0; - int syncCount = 0; - - boolean run = true; - while (run) + try { - try + // always run once after shutdown signalled + if (shutdown) + executor.shutdown(); + + // sync and signal + long syncStarted = System.currentTimeMillis(); + commitLog.sync(shutdown); + // This might overwrite a higher number put by another thread. This is not a problem + // as to reach this point the other thread must have waited for our writes to complete. + approximateSyncedAt = syncStarted; + syncComplete.signalAll(); + + + // sleep any time we have left before the next one is due + long now = System.currentTimeMillis(); + long sleep = syncStarted + pollIntervalMillis - now; + if (sleep < 0) { - // always run once after shutdown signalled - run = !shutdown; - - // sync and signal - long syncStarted = System.currentTimeMillis(); - commitLog.sync(shutdown); - lastSyncedAt = syncStarted; - syncComplete.signalAll(); - - - // sleep any time we have left before the next one is due - long now = System.currentTimeMillis(); - long sleep = syncStarted + pollIntervalMillis - now; - if (sleep < 0) + // if we have lagged noticeably, update our lag counter + if (firstLagAt == 0) { - // if we have lagged noticeably, update our lag counter - if (firstLagAt == 0) - { - firstLagAt = now; - totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0; - } - syncExceededIntervalBy -= sleep; - lagCount++; - } - syncCount++; - totalSyncDuration += now - syncStarted; - - if (firstLagAt > 0 && now - firstLagAt >= LAG_REPORT_INTERVAL) - { - logger.warn(String.format("Out of %d commit log syncs over the past %ds with average duration of %.2fms, %d have exceeded the configured commit interval by an average of %.2fms", - syncCount, (now - firstLagAt) / 1000, (double) totalSyncDuration / syncCount, lagCount, (double) syncExceededIntervalBy / lagCount)); - firstLagAt = 0; - } - - // if we have lagged this round, we probably have work to do already so we don't sleep - if (sleep < 0 || !run) - continue; - - try - { - haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - throw new AssertionError(); + firstLagAt = now; + totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0; } + syncExceededIntervalBy -= sleep; + lagCount++; } - catch (Throwable t) - { - if (!CommitLog.handleCommitError("Failed to persist commits to disk", t)) - break; + syncCount++; + totalSyncDuration += now - syncStarted; - // sleep for full poll-interval after an error, so we don't spam the log file - try - { - haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - throw new AssertionError(); - } + if (firstLagAt > 0 && now - firstLagAt >= LAG_REPORT_INTERVAL) + { + logger.warn(String.format("Out of %d commit log syncs over the past %ds with average duration of %.2fms, %d have exceeded the configured commit interval by an average of %.2fms", + syncCount, (now - firstLagAt) / 1000, (double) totalSyncDuration / syncCount, lagCount, (double) syncExceededIntervalBy / lagCount)); + firstLagAt = 0; } } + catch (Throwable t) + { + if (!CommitLog.handleCommitError("Failed to persist commits to disk", t)) + executor.shutdown(); + } } }; - thread = new Thread(runnable, name); - thread.start(); + int threadCount = DatabaseDescriptor.getCommitLogSyncThreadCount(); + executor = Executors.newScheduledThreadPool(threadCount, new NamedThreadFactory("commit-log-service")); + for (int i=0; i requestExtraSync() { - WaitQueue.Signal signal = syncComplete.register(); - haveWork.release(1); - return signal; + return executor.submit(runnable); } public void shutdown() { shutdown = true; - haveWork.release(1); + requestExtraSync(); } public void awaitTermination() throws InterruptedException { - thread.join(); + executor.awaitTermination(3600, TimeUnit.SECONDS); } public long getCompletedTasks() diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 6d77966c3747..5fcb62de0852 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -30,8 +30,12 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.ParametrizedClass; import org.apache.cassandra.db.*; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.CompressionParameters; +import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.util.DataOutputByteBuffer; import org.apache.cassandra.metrics.CommitLogMetrics; import org.apache.cassandra.net.MessagingService; @@ -59,6 +63,20 @@ public class CommitLog implements CommitLogMBean final CommitLogMetrics metrics; final AbstractCommitLogService executor; + static final ICompressor compressor; + static { + try + { + ParametrizedClass compressionClass = DatabaseDescriptor.getCommitLogCompression(); + compressor = compressionClass != null ? CompressionParameters.createCompressor(compressionClass) : null; + } + catch (ConfigurationException e) + { + logger.error("Fatal configuration error", e); + throw new ExceptionInInitializerError(e); + } + } + private CommitLog() { DatabaseDescriptor.createAllDirectories(); diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java index 2795cae652c7..47c77ca253d5 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java @@ -32,10 +32,11 @@ import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -166,7 +167,7 @@ public void maybeRestoreArchive() CommitLogDescriptor descriptor; if (fromHeader == null && fromName == null) throw new IllegalStateException("Cannot safely construct descriptor for segment, either from its name or its header: " + fromFile.getPath()); - else if (fromHeader != null && fromName != null && !fromHeader.equals(fromName)) + else if (fromHeader != null && fromName != null && !fromHeader.equalsIgnoringCompression(fromName)) throw new IllegalStateException(String.format("Cannot safely construct descriptor for segment, as name and header descriptors do not match (%s vs %s): %s", fromHeader, fromName, fromFile.getPath())); else if (fromName != null && fromHeader == null && fromName.version >= CommitLogDescriptor.VERSION_21) throw new IllegalStateException("Cannot safely construct descriptor for segment, as name descriptor implies a version that should contain a header descriptor, but that descriptor could not be read: " + fromFile.getPath()); @@ -174,9 +175,20 @@ else if (fromHeader != null) descriptor = fromHeader; else descriptor = fromName; - if (descriptor.version > CommitLogDescriptor.VERSION_21) + if (descriptor.version > CommitLogDescriptor.VERSION_22) throw new IllegalStateException("Unsupported commit log version: " + descriptor.version); + if (descriptor.compression != null) { + try + { + CompressionParameters.createCompressor(descriptor.compression); + } + catch (ConfigurationException e) + { + throw new IllegalStateException("Unknown compression", e); + } + } + File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName()); if (toFile.exists()) throw new IllegalStateException("Trying to restore archive " + fromFile.getPath() + ", but the same segment already exists in the restore location: " + toFile.getPath()); diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java index e50a5853b497..1b1024eaedd2 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java @@ -20,17 +20,25 @@ */ package org.apache.cassandra.db.commitlog; +import java.io.DataInput; import java.io.EOFException; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.ParametrizedClass; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.PureJavaCrc32; +import org.json.simple.JSONValue; + +import com.google.common.base.Objects; public class CommitLogDescriptor { @@ -43,38 +51,56 @@ public class CommitLogDescriptor public static final int VERSION_12 = 2; public static final int VERSION_20 = 3; public static final int VERSION_21 = 4; + public static final int VERSION_22 = 5; /** * Increment this number if there is a changes in the commit log disc layout or MessagingVersion changes. * Note: make sure to handle {@link #getMessagingVersion()} */ - public static final int current_version = VERSION_21; - - // [version, id, checksum] - static final int HEADER_SIZE = 4 + 8 + 4; + public static final int current_version = VERSION_22; final int version; public final long id; + public final ParametrizedClass compression; - public CommitLogDescriptor(int version, long id) + public CommitLogDescriptor(int version, long id, ParametrizedClass compression) { this.version = version; this.id = id; + this.compression = compression; } public CommitLogDescriptor(long id) { - this(current_version, id); + this(current_version, id, DatabaseDescriptor.getCommitLogCompression()); } - static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor) + public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor) { - out.putInt(0, descriptor.version); - out.putLong(4, descriptor.id); PureJavaCrc32 crc = new PureJavaCrc32(); + out.putInt(descriptor.version); crc.updateInt(descriptor.version); + out.putLong(descriptor.id); crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL)); crc.updateInt((int) (descriptor.id >>> 32)); - out.putInt(12, crc.getCrc()); + String compressionString = constructCompressionString(descriptor.compression); + if (descriptor.version >= VERSION_22) { + byte[] compressionBytes = compressionString.getBytes(StandardCharsets.UTF_8); + out.putShort((short) compressionBytes.length); + crc.updateInt(compressionBytes.length); + out.put(compressionBytes); + crc.update(compressionBytes, 0, compressionBytes.length); + } + out.putInt(crc.getCrc()); + } + + private static String constructCompressionString(ParametrizedClass compression) + { + if (compression == null) + return ""; + String params = ""; + if (compression.parameters != null) + params = JSONValue.toJSONString(compression.parameters); + return compression.class_name + params; } public static CommitLogDescriptor fromHeader(File file) @@ -82,16 +108,7 @@ public static CommitLogDescriptor fromHeader(File file) try (RandomAccessFile raf = new RandomAccessFile(file, "r")) { assert raf.getFilePointer() == 0; - int version = raf.readInt(); - long id = raf.readLong(); - int crc = raf.readInt(); - PureJavaCrc32 checkcrc = new PureJavaCrc32(); - checkcrc.updateInt(version); - checkcrc.updateInt((int) (id & 0xFFFFFFFFL)); - checkcrc.updateInt((int) (id >>> 32)); - if (crc == checkcrc.getCrc()) - return new CommitLogDescriptor(version, id); - return null; + return readHeader(raf); } catch (EOFException e) { @@ -103,6 +120,45 @@ public static CommitLogDescriptor fromHeader(File file) } } + public static CommitLogDescriptor readHeader(DataInput input) throws IOException + { + PureJavaCrc32 checkcrc = new PureJavaCrc32(); + int version = input.readInt(); + checkcrc.updateInt(version); + long id = input.readLong(); + checkcrc.updateInt((int) (id & 0xFFFFFFFFL)); + checkcrc.updateInt((int) (id >>> 32)); + int compressionLength = 0; + if (version >= VERSION_22) { + compressionLength = input.readShort() & 0xFFFF; + checkcrc.updateInt(compressionLength); + } + // This should always succeed as compressionLength cannot be too long even for a + // corrupt segment file. + byte[] compressionBytes = new byte[compressionLength]; + input.readFully(compressionBytes); + checkcrc.update(compressionBytes, 0, compressionBytes.length); + int crc = input.readInt(); + if (crc == checkcrc.getCrc()) + return new CommitLogDescriptor(version, id, + parseCompressionString(new String(compressionBytes, StandardCharsets.UTF_8))); + return null; + } + + @SuppressWarnings("unchecked") + private static ParametrizedClass parseCompressionString(String string) + { + if (string.isEmpty()) + return null; + int split = string.indexOf('{'); + if (split < 0) { + return new ParametrizedClass(string, null); + } + String className = string.substring(0, split); + String optionsString = string.substring(split); + return new ParametrizedClass(className, (Map) JSONValue.parse(optionsString)); + } + public static CommitLogDescriptor fromFileName(String name) { Matcher matcher; @@ -113,7 +169,7 @@ public static CommitLogDescriptor fromFileName(String name) throw new UnsupportedOperationException("Commitlog segment is too old to open; upgrade to 1.2.5+ first"); long id = Long.parseLong(matcher.group(3).split(SEPARATOR)[1]); - return new CommitLogDescriptor(Integer.parseInt(matcher.group(2)), id); + return new CommitLogDescriptor(Integer.parseInt(matcher.group(2)), id, null); } public int getMessagingVersion() @@ -126,6 +182,8 @@ public int getMessagingVersion() return MessagingService.VERSION_20; case VERSION_21: return MessagingService.VERSION_21; + case VERSION_22: + return MessagingService.VERSION_21; default: throw new IllegalStateException("Unknown commitlog version " + version); } @@ -147,7 +205,7 @@ public static boolean isValid(String filename) public String toString() { - return "(" + version + "," + id + ")"; + return "(" + version + "," + id + (compression != null ? "," + compression : "") + ")"; } public boolean equals(Object that) @@ -155,9 +213,14 @@ public boolean equals(Object that) return that instanceof CommitLogDescriptor && equals((CommitLogDescriptor) that); } - public boolean equals(CommitLogDescriptor that) + public boolean equalsIgnoringCompression(CommitLogDescriptor that) { return this.version == that.version && this.id == that.id; } + public boolean equals(CommitLogDescriptor that) + { + return equalsIgnoringCompression(that) && Objects.equal(this.compression, that.compression); + } + } diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 1012829bdff6..7faa7c209771 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -18,30 +18,59 @@ */ package org.apache.cassandra.db.commitlog; -import java.io.*; -import java.util.*; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.base.Predicate; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; -import com.google.common.collect.Ordering; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.Cell; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ColumnSerializer; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.UnknownColumnFamilyException; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.compress.CompressionParameters; +import org.apache.cassandra.io.compress.ICompressor; +import org.apache.cassandra.io.util.ByteBufferDataInput; import org.apache.cassandra.io.util.FastByteArrayInputStream; +import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; -import org.apache.cassandra.utils.*; - +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.PureJavaCrc32; +import org.apache.cassandra.utils.WrappedRunnable; +import org.apache.commons.lang3.StringUtils; import org.cliffc.high_scale_lib.NonBlockingHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Predicate; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; +import com.google.common.collect.Ordering; public class CommitLogReplayer { @@ -57,12 +86,14 @@ public class CommitLogReplayer private final ReplayPosition globalPosition; private final PureJavaCrc32 checksum; private byte[] buffer; + private byte[] uncompressedBuffer; public CommitLogReplayer() { this.keyspacesRecovered = new NonBlockingHashSet(); this.futures = new ArrayList>(); this.buffer = new byte[4096]; + this.uncompressedBuffer = new byte[4096]; this.invalidMutations = new HashMap(); // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference. this.replayedCount = new AtomicInteger(); @@ -148,17 +179,22 @@ else if (end < offset || end > reader.length()) return end; } - private int getStartOffset(long segmentId, int version) + private int getStartOffset(long segmentId, int version, int headerSize) { if (globalPosition.segment < segmentId) { if (version >= CommitLogDescriptor.VERSION_21) - return CommitLogDescriptor.HEADER_SIZE + CommitLogSegment.SYNC_MARKER_SIZE; + return headerSize + CommitLogSegment.SYNC_MARKER_SIZE; else return 0; } else if (globalPosition.segment == segmentId) - return globalPosition.position; + { + if (version >= CommitLogDescriptor.VERSION_22) + return headerSize + CommitLogSegment.SYNC_MARKER_SIZE; + else + return globalPosition.position; + } else return -1; } @@ -231,27 +267,59 @@ public void recover(File file) throws IOException logger.info("Replaying {}", file.getPath()); CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); final long segmentId = desc.id; - logger.info("Replaying {} (CL version {}, messaging version {})", - file.getPath(), - desc.version, - desc.getMessagingVersion()); RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath())); - try { + int headerSize = 0; + if (desc.version >= CommitLogDescriptor.VERSION_21) + { + try + { + desc = CommitLogDescriptor.readHeader(reader); + headerSize = (int) reader.getFilePointer(); + } + catch (IOException e) + { + desc = null; + } + if (desc == null) { + logger.warn("Could not read commit log descriptor in file {}", file); + return; + } + } + logger.info("Replaying {} (CL version {}, messaging version {}, compression {})", + file.getPath(), + desc.version, + desc.getMessagingVersion(), + desc.compression); + assert segmentId == desc.id; + ICompressor compressor = null; + if (desc.compression != null) + { + try + { + compressor = CompressionParameters.createCompressor(desc.compression); + } + catch (ConfigurationException e) + { + logger.warn("Unknown compression: {}", e.getMessage()); + return; + } + } + assert reader.length() <= Integer.MAX_VALUE; - int offset = getStartOffset(segmentId, desc.version); + int offset = getStartOffset(segmentId, desc.version, headerSize); if (offset < 0) { logger.debug("skipping replay of fully-flushed {}", file); return; } - int prevEnd = CommitLogDescriptor.HEADER_SIZE; - main: while (true) + int uncompressedPos = headerSize + CommitLogSegment.SYNC_MARKER_SIZE; + int end = headerSize; + for (; true; offset = end + CommitLogSegment.SYNC_MARKER_SIZE) { - - int end = prevEnd; + int prevEnd = end; if (desc.version < CommitLogDescriptor.VERSION_21) end = Integer.MAX_VALUE; else @@ -268,177 +336,214 @@ public void recover(File file) throws IOException reader.seek(offset); - /* read the logs populate Mutation and apply */ - while (reader.getPosition() < end && !reader.isEOF()) + FileDataInput sectionReader = reader; + int sectionEnd = end; + if (compressor != null) { if (logger.isDebugEnabled()) - logger.debug("Reading mutation at {}", reader.getFilePointer()); - - long claimedCRC32; - int serializedSize; - try - { - // any of the reads may hit EOF - serializedSize = reader.readInt(); - if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER) - { - logger.debug("Encountered end of segment marker at {}", reader.getFilePointer()); - break main; - } - - // Mutation must be at LEAST 10 bytes: - // 3 each for a non-empty Keyspace and Key (including the - // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count. - // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128 - if (serializedSize < 10) - break main; - - long claimedSizeChecksum; - if (desc.version < CommitLogDescriptor.VERSION_21) - claimedSizeChecksum = reader.readLong(); - else - claimedSizeChecksum = reader.readInt() & 0xffffffffL; - checksum.reset(); - if (desc.version < CommitLogDescriptor.VERSION_20) - checksum.update(serializedSize); - else - checksum.updateInt(serializedSize); - - if (checksum.getValue() != claimedSizeChecksum) - break main; // entry wasn't synced correctly/fully. that's - // ok. - - if (serializedSize > buffer.length) - buffer = new byte[(int) (1.2 * serializedSize)]; - reader.readFully(buffer, 0, serializedSize); - if (desc.version < CommitLogDescriptor.VERSION_21) - claimedCRC32 = reader.readLong(); - else - claimedCRC32 = reader.readInt() & 0xffffffffL; - } - catch (EOFException eof) - { - break main; // last CL entry didn't get completely written. that's ok. - } - - checksum.update(buffer, 0, serializedSize); - if (claimedCRC32 != checksum.getValue()) - { - // this entry must not have been fsynced. probably the rest is bad too, - // but just in case there is no harm in trying them (since we still read on an entry boundary) + logger.debug("Decompressing {} between {} and {}", file, offset, end); + int uncompressedLength = reader.readInt(); + int compressedLength = end - offset - 4; + int sectionStart = uncompressedPos; + uncompressedPos += uncompressedLength + CommitLogSegment.SYNC_MARKER_SIZE; + if (segmentId == globalPosition.segment && uncompressedPos < globalPosition.position) + // Skip over flushed section. continue; - } + if (compressedLength > buffer.length) + buffer = new byte[(int) (1.2 * compressedLength)]; + reader.readFully(buffer, 0, compressedLength); + if (uncompressedLength > uncompressedBuffer.length) + uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)]; + compressedLength = compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0); + sectionReader = new ByteBufferDataInput(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), sectionStart, 0); + sectionEnd = sectionStart + uncompressedLength; + } - /* deserialize the commit log entry */ - FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize); - final Mutation mutation; - try - { - mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn), - desc.getMessagingVersion(), - ColumnSerializer.Flag.LOCAL); - // doublecheck that what we read is [still] valid for the current schema - for (ColumnFamily cf : mutation.getColumnFamilies()) - for (Cell cell : cf) - cf.getComparator().validate(cell.name()); - } - catch (UnknownColumnFamilyException ex) - { - if (ex.cfId == null) - continue; - AtomicInteger i = invalidMutations.get(ex.cfId); - if (i == null) - { - i = new AtomicInteger(1); - invalidMutations.put(ex.cfId, i); - } - else - i.incrementAndGet(); - continue; - } - catch (Throwable t) - { - File f = File.createTempFile("mutation", "dat"); - DataOutputStream out = new DataOutputStream(new FileOutputStream(f)); - try - { - out.write(buffer, 0, serializedSize); - } - finally - { - out.close(); - } - String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ", - f.getAbsolutePath()); - logger.error(st, t); - continue; - } + if (!replaySyncSection(sectionReader, sectionEnd, desc, replayFilter)) + break; + } + } + finally + { + FileUtils.closeQuietly(reader); + logger.info("Finished reading {}", file); + } + } - if (logger.isDebugEnabled()) - logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}"); + /** + * Replays a sync section containing a list of mutations. + * + * @return Whether replay should continue with the next section. + */ + private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc, + final ReplayFilter replayFilter) throws IOException, FileNotFoundException + { + /* read the logs populate Mutation and apply */ + while (reader.getFilePointer() < end && !reader.isEOF()) + { + if (logger.isDebugEnabled()) + logger.debug("Reading mutation at {}", reader.getFilePointer()); - final long entryLocation = reader.getFilePointer(); - Runnable runnable = new WrappedRunnable() - { - public void runMayThrow() throws IOException - { - if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null) - return; - if (pointInTimeExceeded(mutation)) - return; - - final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); - - // Rebuild the mutation, omitting column families that - // a) the user has requested that we ignore, - // b) have already been flushed, - // or c) are part of a cf that was dropped. - // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead. - Mutation newMutation = null; - for (ColumnFamily columnFamily : replayFilter.filter(mutation)) - { - if (Schema.instance.getCF(columnFamily.id()) == null) - continue; // dropped - - ReplayPosition rp = cfPositions.get(columnFamily.id()); - - // replay if current segment is newer than last flushed one or, - // if it is the last known segment, if we are after the replay position - if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position)) - { - if (newMutation == null) - newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key()); - newMutation.add(columnFamily); - replayedCount.incrementAndGet(); - } - } - if (newMutation != null) - { - assert !newMutation.isEmpty(); - Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false); - keyspacesRecovered.add(keyspace); - } - } - }; - futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable)); - if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT) - { - FBUtilities.waitOnFutures(futures); - futures.clear(); - } + long claimedCRC32; + int serializedSize; + try + { + // any of the reads may hit EOF + serializedSize = reader.readInt(); + if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER) + { + logger.debug("Encountered end of segment marker at {}", reader.getFilePointer()); + return false; } + // Mutation must be at LEAST 10 bytes: + // 3 each for a non-empty Keyspace and Key (including the + // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count. + // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128 + if (serializedSize < 10) + return false; + + long claimedSizeChecksum; if (desc.version < CommitLogDescriptor.VERSION_21) - break; + claimedSizeChecksum = reader.readLong(); + else + claimedSizeChecksum = reader.readInt() & 0xffffffffL; + checksum.reset(); + if (desc.version < CommitLogDescriptor.VERSION_20) + checksum.update(serializedSize); + else + checksum.updateInt(serializedSize); + + if (checksum.getValue() != claimedSizeChecksum) + return false; + // ok. - offset = end + CommitLogSegment.SYNC_MARKER_SIZE; - prevEnd = end; + if (serializedSize > buffer.length) + buffer = new byte[(int) (1.2 * serializedSize)]; + reader.readFully(buffer, 0, serializedSize); + if (desc.version < CommitLogDescriptor.VERSION_21) + claimedCRC32 = reader.readLong(); + else + claimedCRC32 = reader.readInt() & 0xffffffffL; + } + catch (EOFException eof) + { + return false; // last CL entry didn't get completely written. that's ok. } + + checksum.update(buffer, 0, serializedSize); + if (claimedCRC32 != checksum.getValue()) + { + // this entry must not have been fsynced. probably the rest is bad too, + // but just in case there is no harm in trying them (since we still read on an entry boundary) + continue; + } + replayMutation(buffer, serializedSize, reader.getFilePointer(), desc, replayFilter); } - finally + return desc.version >= CommitLogDescriptor.VERSION_21; + } + + /** + * Deserializes and replays a commit log entry. + */ + private void replayMutation(byte[] inputBuffer, int size, + final long entryLocation, final CommitLogDescriptor desc, final ReplayFilter replayFilter) throws IOException, + FileNotFoundException + { + FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size); + final Mutation mutation; + try { - FileUtils.closeQuietly(reader); - logger.info("Finished reading {}", file); + mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn), + desc.getMessagingVersion(), + ColumnSerializer.Flag.LOCAL); + // doublecheck that what we read is [still] valid for the current schema + for (ColumnFamily cf : mutation.getColumnFamilies()) + for (Cell cell : cf) + cf.getComparator().validate(cell.name()); + } + catch (UnknownColumnFamilyException ex) + { + if (ex.cfId == null) + return; + AtomicInteger i = invalidMutations.get(ex.cfId); + if (i == null) + { + i = new AtomicInteger(1); + invalidMutations.put(ex.cfId, i); + } + else + i.incrementAndGet(); + return; + } + catch (Throwable t) + { + File f = File.createTempFile("mutation", "dat"); + DataOutputStream out = new DataOutputStream(new FileOutputStream(f)); + try + { + out.write(inputBuffer, 0, size); + } + finally + { + out.close(); + } + String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ", + f.getAbsolutePath()); + logger.error(st, t); + return; + } + + if (logger.isDebugEnabled()) + logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}"); + + Runnable runnable = new WrappedRunnable() + { + public void runMayThrow() throws IOException + { + if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null) + return; + if (pointInTimeExceeded(mutation)) + return; + + final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); + + // Rebuild the mutation, omitting column families that + // a) the user has requested that we ignore, + // b) have already been flushed, + // or c) are part of a cf that was dropped. + // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead. + Mutation newMutation = null; + for (ColumnFamily columnFamily : replayFilter.filter(mutation)) + { + if (Schema.instance.getCF(columnFamily.id()) == null) + continue; // dropped + + ReplayPosition rp = cfPositions.get(columnFamily.id()); + + // replay if current segment is newer than last flushed one or, + // if it is the last known segment, if we are after the replay position + if (desc.id > rp.segment || (desc.id == rp.segment && entryLocation > rp.position)) + { + if (newMutation == null) + newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key()); + newMutation.add(columnFamily); + replayedCount.incrementAndGet(); + } + } + if (newMutation != null) + { + assert !newMutation.isEmpty(); + Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false); + keyspacesRecovered.add(keyspace); + } + } + }; + futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable)); + if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT) + { + FBUtilities.waitOnFutures(futures); + futures.clear(); } } diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 0a1c844616ac..61dabc145ec2 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -21,8 +21,6 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -34,10 +32,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; -import org.cliffc.high_scale_lib.NonBlockingHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; @@ -49,13 +43,16 @@ import org.apache.cassandra.utils.PureJavaCrc32; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.WaitQueue; +import org.cliffc.high_scale_lib.NonBlockingHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /* * A single commit log file on disk. Manages creation of the file and writing mutations to disk, * as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment * files are initially allocated to a fixed size and can grow to accomidate a larger value if necessary. */ -public class CommitLogSegment +public abstract class CommitLogSegment { private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class); @@ -87,10 +84,8 @@ public class CommitLogSegment // each sync are reserved, and point forwards to the next such offset. The final // sync marker in a segment will be zeroed out, or point to EOF. private volatile int lastSyncedOffset; - - // the amount of the tail of the file we have allocated but not used - this is used when we discard a log segment - // to ensure nobody writes to it after we've decided we're done with it - private int discardedTailFrom; + // The position where sync was initiated last. + private volatile int lastSyncStartedOffset; // a signal for writers to wait on to confirm the log message they provided has been written to disk private final WaitQueue syncComplete = new WaitQueue(); @@ -103,11 +98,11 @@ public class CommitLogSegment public final long id; - private final File logFile; - private final RandomAccessFile logFileAccessor; - private final int fd; - - private final MappedByteBuffer buffer; + protected RandomAccessFile logFileAccessor; + int fd; + protected ByteBuffer buffer; + protected int bufferSize; + protected final File logFile; public final CommitLogDescriptor descriptor; @@ -116,7 +111,12 @@ public class CommitLogSegment */ static CommitLogSegment freshSegment() { - return new CommitLogSegment(null); + return createSegment(null); + } + + static CommitLogSegment createSegment(String reusePath) + { + return CommitLog.compressor != null ? new CompressedSegment(reusePath) : new MemoryMappedSegment(reusePath); } static long getNextId() @@ -134,50 +134,36 @@ static long getNextId() id = getNextId(); descriptor = new CommitLogDescriptor(id); logFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName()); - boolean isCreating = true; + if (filePath != null) + recycleFile(filePath); try { - if (filePath != null) - { - File oldFile = new File(filePath); - - if (oldFile.exists()) - { - logger.debug("Re-using discarded CommitLog segment for {} from {}", id, filePath); - if (!oldFile.renameTo(logFile)) - throw new IOException("Rename from " + filePath + " to " + id + " failed"); - isCreating = false; - } - } - // Open the initial the segment file logFileAccessor = new RandomAccessFile(logFile, "rw"); - if (isCreating) - logger.debug("Creating new commit log segment {}", logFile.getPath()); - - // Map the segment, extending or truncating it to the standard segment size. - // (We may have restarted after a segment size configuration change, leaving "incorrectly" - // sized segments on disk.) - logFileAccessor.setLength(DatabaseDescriptor.getCommitLogSegmentSize()); fd = CLibrary.getfd(logFileAccessor.getFD()); - - buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize()); - // write the header - CommitLogDescriptor.writeHeader(buffer, descriptor); - // mark the initial sync marker as uninitialised - buffer.putInt(CommitLogDescriptor.HEADER_SIZE, 0); - buffer.putLong(CommitLogDescriptor.HEADER_SIZE + 4, 0); - allocatePosition.set(CommitLogDescriptor.HEADER_SIZE + SYNC_MARKER_SIZE); - lastSyncedOffset = CommitLogDescriptor.HEADER_SIZE; } catch (IOException e) { throw new FSWriteError(e, logFile); } + + buffer = createBuffer(); + bufferSize = buffer.capacity(); + // write the header + CommitLogDescriptor.writeHeader(buffer, descriptor); + lastSyncedOffset = lastSyncStartedOffset = buffer.position(); + // mark the initial sync marker as uninitialised + buffer.putInt(lastSyncedOffset + 0, 0); + buffer.putInt(lastSyncedOffset + 4, 0); + allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE); } + abstract void recycleFile(String filePath); + + abstract ByteBuffer createBuffer(); + /** * Allocate space in this buffer for the provided mutation, and return the allocated Allocation object. * Returns null if there is not enough space in this segment, and a new segment is needed. @@ -210,7 +196,7 @@ private int allocate(int size) { int prev = allocatePosition.get(); int next = prev + size; - if (next >= buffer.capacity()) + if (next >= bufferSize) return -1; if (allocatePosition.compareAndSet(prev, next)) return prev; @@ -228,14 +214,9 @@ void discardUnusedTail() { while (true) { - int prev = allocatePosition.get(); - // we set allocatePosition past buffer.capacity() to make sure we always set discardedTailFrom - int next = buffer.capacity() + 1; - if (prev == next) - return; - if (allocatePosition.compareAndSet(prev, next)) + bufferSize = allocatePosition.get(); + if (allocatePosition.compareAndSet(bufferSize, bufferSize + 1)) { - discardedTailFrom = prev; return; } } @@ -254,85 +235,92 @@ void waitForModifications() /** * Forces a disk flush for this segment file. */ - synchronized void sync() + void sync() { - try + int startMarker; + int nextMarker; + boolean close = false; + + synchronized (this) { + startMarker = lastSyncStartedOffset; + // check we have more work to do - if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE) - return; - - // allocate a new sync marker; this is both necessary in itself, but also serves to demarcate - // the point at which we can safely consider records to have been completely written to - int nextMarker; - nextMarker = allocate(SYNC_MARKER_SIZE); - boolean close = false; - if (nextMarker < 0) + if (allocatePosition.get() > startMarker + SYNC_MARKER_SIZE) { - // ensure no more of this CLS is writeable, and mark ourselves for closing - discardUnusedTail(); - close = true; - - // wait for modifications guards both discardedTailFrom, and any outstanding appends - waitForModifications(); - - if (discardedTailFrom < buffer.capacity() - SYNC_MARKER_SIZE) + // allocate a new sync marker; this is both necessary in itself, but also serves to demarcate + // the point at which we can safely consider records to have been completely written to + nextMarker = allocate(SYNC_MARKER_SIZE); + if (nextMarker < 0) { - // if there's room in the discard section to write an empty header, use that as the nextMarker - nextMarker = discardedTailFrom; + // ensure no more of this CLS is writeable, and mark ourselves for closing + discardUnusedTail(); + close = true; + + // wait for modifications guards both discardedTailFrom, and any outstanding appends + waitForModifications(); + + if (bufferSize < buffer.capacity() - SYNC_MARKER_SIZE) + { + // if there's room in the discard section to write an empty header, use that as the nextMarker + nextMarker = bufferSize; + } + else + { + // not enough space left in the buffer, so mark the next sync marker as the EOF position + nextMarker = buffer.capacity(); + } + + // zero out the next sync marker so replayer can cleanly exit + if (nextMarker < buffer.capacity()) + { + buffer.putInt(nextMarker, 0); + buffer.putInt(nextMarker + 4, 0); + } } else { - // not enough space left in the buffer, so mark the next sync marker as the EOF position - nextMarker = buffer.capacity(); + waitForModifications(); } - } - else - { - waitForModifications(); - } - - assert nextMarker > lastSyncedOffset; - - // write previous sync marker to point to next sync marker - // we don't chain the crcs here to ensure this method is idempotent if it fails - int offset = lastSyncedOffset; - final PureJavaCrc32 crc = new PureJavaCrc32(); - crc.updateInt((int) (id & 0xFFFFFFFFL)); - crc.updateInt((int) (id >>> 32)); - crc.updateInt(offset); - buffer.putInt(offset, nextMarker); - buffer.putInt(offset + 4, crc.getCrc()); - - // zero out the next sync marker so replayer can cleanly exit - if (nextMarker < buffer.capacity()) - { - buffer.putInt(nextMarker, 0); - buffer.putInt(nextMarker + 4, 0); - } - - // actually perform the sync and signal those waiting for it - buffer.force(); - + + lastSyncStartedOffset = nextMarker; + } else + // Nothing needs to be done, but we must still wait for outstanding writes to complete. + nextMarker = startMarker; + } + + + if (nextMarker > startMarker) + { + // Perform compression, writing to file and flush. Some or all of this may execute in parallel. + write(startMarker, nextMarker); + + // Signal the sync as complete. This section needs to be executed only after any previous syncs have finished. + waitForSync(startMarker); + assert lastSyncedOffset == startMarker; if (close) - nextMarker = buffer.capacity(); - + close(); lastSyncedOffset = nextMarker; syncComplete.signalAll(); + } else + waitForSync(startMarker); + } - CLibrary.trySkipCache(fd, offset, nextMarker); - if (close) - close(); - } - catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it - { - throw new FSWriteError(e, getPath()); - } + protected void writeSyncMarker(ByteBuffer buffer, int offset, int filePos, int nextMarker) + { + final PureJavaCrc32 crc = new PureJavaCrc32(); + crc.updateInt((int) (id & 0xFFFFFFFFL)); + crc.updateInt((int) (id >>> 32)); + crc.updateInt(filePos); + buffer.putInt(offset, nextMarker); + buffer.putInt(offset + 4, crc.getCrc()); } + abstract void write(int lastSyncedOffset, int nextMarker); + public boolean isStillAllocating() { - return allocatePosition.get() < buffer.capacity(); + return allocatePosition.get() < bufferSize; } /** @@ -348,22 +336,7 @@ void delete() * * @return a new CommitLogSegment representing the newly reusable segment. */ - CommitLogSegment recycle() - { - try - { - sync(); - } - catch (FSWriteError e) - { - logger.error("I/O error flushing {} {}", this, e.getMessage()); - throw e; - } - - close(); - - return new CommitLogSegment(getPath()); - } + abstract CommitLogSegment recycle(); /** * @return the current ReplayPosition for this log segment @@ -394,7 +367,7 @@ void waitForFinalSync() while (true) { WaitQueue.Signal signal = syncComplete.register(); - if (lastSyncedOffset < buffer.capacity()) + if (lastSyncedOffset < bufferSize) { signal.awaitUninterruptibly(); } @@ -406,6 +379,18 @@ void waitForFinalSync() } } + void waitForSync(int position) + { + while (lastSyncedOffset < position) + { + WaitQueue.Signal signal = syncComplete.register(); + if (lastSyncedOffset < position) + signal.awaitUninterruptibly(); + else + signal.cancel(); + } + } + /** * Close the segment file. */ @@ -413,8 +398,6 @@ void close() { try { - if (FileUtils.isCleanerAvailable()) - FileUtils.clean(buffer); logFileAccessor.close(); } catch (IOException e) @@ -629,4 +612,5 @@ public ReplayPosition getReplayPosition() } } + } diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java index 05c4b9d04646..b099dcd13099 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java @@ -393,7 +393,7 @@ void recycleSegment(final File file) { public CommitLogSegment call() { - return new CommitLogSegment(file.getPath()); + return CommitLogSegment.createSegment(file.getPath()); } }); } diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java new file mode 100644 index 000000000000..2e9d8ec6927a --- /dev/null +++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.ICompressor.WrappedArray; +import org.apache.cassandra.io.util.FileUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * A single commit log file on disk. Manages creation of the file and writing mutations to disk, + * as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment + * files are initially allocated to a fixed size and can grow to accomidate a larger value if necessary. + */ +public class CompressedSegment extends CommitLogSegment +{ + private static final Logger logger = LoggerFactory.getLogger(CompressedSegment.class); + + private final FileChannel channel; + + static private final ThreadLocal compressedArrayHolder = new ThreadLocal() { + protected WrappedArray initialValue() + { + return new WrappedArray(new byte[1024]); + } + }; + static private final ThreadLocal compressedBufferHolder = new ThreadLocal() { + protected ByteBuffer initialValue() + { + return ByteBuffer.wrap(compressedArrayHolder.get().buffer); + } + }; + + static Queue bufferPool = new ConcurrentLinkedQueue<>(); + + static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4; + + /** + * Constructs a new segment file. + * + * @param filePath if not null, recycles the existing file by renaming it and truncating it to CommitLog.SEGMENT_SIZE. + */ + CompressedSegment(String filePath) + { + super(filePath); + try + { + channel = logFileAccessor.getChannel(); + channel.write((ByteBuffer) buffer.duplicate().flip()); + } + catch (IOException e) + { + throw new FSWriteError(e, getPath()); + } + } + + void recycleFile(String filePath) + { + File oldFile = new File(filePath); + + if (oldFile.exists()) + { + logger.debug("Deleting old CommitLog segment {}", filePath); + FileUtils.deleteWithConfirm(oldFile); + } + } + + ByteBuffer createBuffer() + { + ByteBuffer buf = bufferPool.poll(); + if (buf == null) + buf = ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()); + else + buf.clear(); + return buf; + } + + static long startMillis = System.currentTimeMillis(); + + @Override + void write(int startMarker, int nextMarker) + { + int contentStart = startMarker + SYNC_MARKER_SIZE; + int length = nextMarker - contentStart; + assert length > 0; + + try { + + int compressedLength = CommitLog.compressor.initialCompressedBufferLength(length); + WrappedArray compressedArray = compressedArrayHolder.get(); + if (compressedArray.buffer.length < compressedLength + COMPRESSED_MARKER_SIZE) + { + compressedArray.buffer = new byte[compressedLength + COMPRESSED_MARKER_SIZE]; + } + + compressedLength = CommitLog.compressor.compress(buffer.array(), buffer.arrayOffset() + contentStart, length, compressedArray, COMPRESSED_MARKER_SIZE); + + ByteBuffer compressedBuffer = compressedBufferHolder.get(); + if (compressedBuffer.array() != compressedArray.buffer) + { + compressedBuffer = ByteBuffer.wrap(compressedArray.buffer); + compressedBufferHolder.set(compressedBuffer); + } + compressedBuffer.position(0); + compressedBuffer.limit(COMPRESSED_MARKER_SIZE + compressedLength); + compressedBuffer.putInt(SYNC_MARKER_SIZE, length); + + // Only write after the previous write has completed. + waitForSync(startMarker); + + // Only one thread can be here at a given time. + writeSyncMarker(compressedBuffer, 0, (int) channel.position(), (int) channel.position() + compressedBuffer.remaining()); + channel.write(compressedBuffer); + channel.force(true); + } + catch (Exception e) + { + throw new FSWriteError(e, getPath()); + } + } + + /** + * Recycle processes an unneeded segment file for reuse. + * + * @return a new CommitLogSegment representing the newly reusable segment. + */ + CompressedSegment recycle() + { + // Run a sync to complete any outstanding writes. + sync(); + + close(); + return new CompressedSegment(getPath()); + } + + + @Override + void close() + { + super.close(); + synchronized (this) + { + if (buffer != null) + bufferPool.add(buffer); + buffer = null; + } + } + +} diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java new file mode 100644 index 000000000000..9365b50144f2 --- /dev/null +++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.CLibrary; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * A single commit log file on disk. Manages creation of the file and writing mutations to disk, + * as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment + * files are initially allocated to a fixed size and can grow to accomidate a larger value if necessary. + */ +public class MemoryMappedSegment extends CommitLogSegment +{ + private static final Logger logger = LoggerFactory.getLogger(MemoryMappedSegment.class); + + private MappedByteBuffer mappedBuffer; + + /** + * Constructs a new segment file. + * + * @param filePath if not null, recycles the existing file by renaming it and truncating it to CommitLog.SEGMENT_SIZE. + */ + MemoryMappedSegment(String filePath) + { + super(filePath); + } + + void recycleFile(String filePath) + { + File oldFile = new File(filePath); + + if (oldFile.exists()) + { + logger.debug("Re-using discarded CommitLog segment for {} from {}", id, filePath); + if (!oldFile.renameTo(logFile)) + throw new FSWriteError(new IOException("Rename from " + filePath + " to " + id + " failed"), filePath); + } else { + logger.debug("Creating new commit log segment {}", logFile.getPath()); + } + } + + ByteBuffer createBuffer() + { + try + { + // Map the segment, extending or truncating it to the standard segment size. + // (We may have restarted after a segment size configuration change, leaving "incorrectly" + // sized segments on disk.) + logFileAccessor.setLength(DatabaseDescriptor.getCommitLogSegmentSize()); + mappedBuffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize()); + return mappedBuffer; + } + catch (IOException e) + { + throw new FSWriteError(e, logFile); + } + } + + @Override + void write(int startMarker, int nextMarker) + { + // write previous sync marker to point to next sync marker + // we don't chain the crcs here to ensure this method is idempotent if it fails + writeSyncMarker(buffer, startMarker, startMarker, nextMarker); + + try { + // actually perform the sync and signal those waiting for it + mappedBuffer.force(); + } + catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it + { + throw new FSWriteError(e, getPath()); + } + CLibrary.trySkipCache(fd, startMarker, nextMarker); + } + + /** + * Recycle processes an unneeded segment file for reuse. + * + * @return a new CommitLogSegment representing the newly reusable segment. + */ + MemoryMappedSegment recycle() + { + try + { + sync(); + } + catch (FSWriteError e) + { + logger.error("I/O error flushing {} {}", this, e.getMessage()); + throw e; + } + + close(); + + return new MemoryMappedSegment(getPath()); + } + + @Override + void close() + { + if (FileUtils.isCleanerAvailable()) + FileUtils.clean(mappedBuffer); + super.close(); + } +} diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java index 14bb367081ea..5a91f09c4d97 100644 --- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java @@ -54,6 +54,6 @@ protected void maybeWaitForSync(CommitLogSegment.Allocation alloc) */ private boolean waitForSyncToCatchUp(long started) { - return started > lastSyncedAt + blockWhenSyncLagsMillis; + return started > approximateSyncedAt + blockWhenSyncLagsMillis; } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/io/compress/CompressionParameters.java b/src/java/org/apache/cassandra/io/compress/CompressionParameters.java index 3ad087998373..5126b031bd93 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressionParameters.java +++ b/src/java/org/apache/cassandra/io/compress/CompressionParameters.java @@ -29,10 +29,11 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; + import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; - import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.config.ParametrizedClass; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; @@ -137,7 +138,7 @@ public int chunkLength() return chunkLength == null ? DEFAULT_CHUNK_LENGTH : chunkLength; } - private static Class parseCompressorClass(String className) throws ConfigurationException + private static Class parseCompressorClass(String className) throws ConfigurationException { if (className == null || className.isEmpty()) return null; @@ -145,7 +146,7 @@ private static Class parseCompressorClass(String classNam className = className.contains(".") ? className : "org.apache.cassandra.io.compress." + className; try { - return (Class)Class.forName(className); + return Class.forName(className); } catch (Exception e) { @@ -153,7 +154,7 @@ private static Class parseCompressorClass(String classNam } } - private static ICompressor createCompressor(Class compressorClass, Map compressionOptions) throws ConfigurationException + private static ICompressor createCompressor(Class compressorClass, Map compressionOptions) throws ConfigurationException { if (compressorClass == null) { @@ -199,6 +200,10 @@ private static ICompressor createCompressor(Class compres } } + public static ICompressor createCompressor(ParametrizedClass compression) throws ConfigurationException { + return createCompressor(parseCompressorClass(compression.class_name), copyOptions(compression.parameters)); + } + private static Map copyOptions(Map co) { if (co == null || co.isEmpty()) diff --git a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java b/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java similarity index 90% rename from src/java/org/apache/cassandra/io/util/MappedFileDataInput.java rename to src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java index 047925693e23..d83048b9757c 100644 --- a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java +++ b/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java @@ -19,19 +19,18 @@ import java.io.*; import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import org.apache.cassandra.utils.ByteBufferUtil; -public class MappedFileDataInput extends AbstractDataInput implements FileDataInput, DataInput +public class ByteBufferDataInput extends AbstractDataInput implements FileDataInput, DataInput { - private final MappedByteBuffer buffer; + private final ByteBuffer buffer; private final String filename; private final long segmentOffset; private int position; - public MappedFileDataInput(FileInputStream stream, String filename, long segmentOffset, int position) throws IOException + public ByteBufferDataInput(FileInputStream stream, String filename, long segmentOffset, int position) throws IOException { FileChannel channel = stream.getChannel(); buffer = channel.map(FileChannel.MapMode.READ_ONLY, position, channel.size()); @@ -40,7 +39,7 @@ public MappedFileDataInput(FileInputStream stream, String filename, long segment this.position = position; } - public MappedFileDataInput(MappedByteBuffer buffer, String filename, long segmentOffset, int position) + public ByteBufferDataInput(ByteBuffer buffer, String filename, long segmentOffset, int position) { assert buffer != null; this.buffer = buffer; @@ -65,12 +64,14 @@ public long getFilePointer() return segmentOffset + position; } - protected long getPosition() + @Override + public long getPosition() { return segmentOffset + position; } - protected long getPositionLimit() + @Override + public long getPositionLimit() { return segmentOffset + buffer.capacity(); } @@ -156,9 +157,10 @@ public final void readFully(byte[] bytes) throws IOException } @Override - public final void readFully(byte[] buffer, int offset, int count) throws IOException + public final void readFully(byte[] bytes, int offset, int count) throws IOException { - throw new UnsupportedOperationException("use readBytes instead"); + ByteBufferUtil.arrayCopy(buffer, buffer.position() + position, bytes, offset, count); + position += count; } private static class MappedFileDataInputMark implements FileMark diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java index 450553bd267d..2b4016713e4b 100644 --- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java @@ -73,7 +73,7 @@ public FileDataInput getSegment(long position) if (segment.right != null) { // segment is mmap'd - return new MappedFileDataInput(segment.right, path, segment.left, (int) (position - segment.left)); + return new ByteBufferDataInput(segment.right, path, segment.left, (int) (position - segment.left)); } // not mmap'd: open a braf covering the segment diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index ec988e299ff6..307c47b8672c 100644 --- a/test/conf/cassandra.yaml +++ b/test/conf/cassandra.yaml @@ -7,6 +7,7 @@ memtable_allocation_type: offheap_objects commitlog_sync: batch commitlog_sync_batch_window_in_ms: 1.0 commitlog_segment_size_in_mb: 5 +commitlog_directory: build/test/cassandra/commitlog partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner listen_address: 127.0.0.1 storage_port: 7010 @@ -14,7 +15,6 @@ rpc_port: 9170 start_native_transport: true native_transport_port: 9042 column_index_size_in_kb: 4 -commitlog_directory: build/test/cassandra/commitlog saved_caches_directory: build/test/cassandra/saved_caches data_file_directories: - build/test/cassandra/data diff --git a/test/conf/commitlog_compression.yaml b/test/conf/commitlog_compression.yaml new file mode 100644 index 000000000000..be342cdc9951 --- /dev/null +++ b/test/conf/commitlog_compression.yaml @@ -0,0 +1,3 @@ +commitlog_compression: + - class_name: LZ4Compressor +commitlog_sync_threads: 4 diff --git a/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java b/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java index dc908047c5b1..b4efd49d98eb 100644 --- a/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java +++ b/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java @@ -48,7 +48,7 @@ public static void main(String[] args) throws Exception { System.out.println("Setting num threads to: " + NUM_THREADS); } ExecutorService executor = new JMXEnabledThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60, - TimeUnit.SECONDS, new ArrayBlockingQueue(10 * NUM_THREADS), new NamedThreadFactory(""), ""); + TimeUnit.SECONDS, new ArrayBlockingQueue(10 * NUM_THREADS), new NamedThreadFactory("Stress"), ""); ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1); org.apache.cassandra.SchemaLoader.loadSchema(); @@ -86,10 +86,12 @@ public static class CommitlogExecutor implements Runnable { public void run() { String ks = "Keyspace1"; ByteBuffer key = ByteBufferUtil.bytes(keyString); - Mutation mutation = new Mutation(ks, key); - mutation.add("Standard1", Util.cellname("name"), ByteBufferUtil.bytes("value"), - System.currentTimeMillis()); - CommitLog.instance.add(mutation); + for (int i=0; i<100; ++i) { + Mutation mutation = new Mutation(ks, key); + mutation.add("Standard1", Util.cellname("name"), ByteBufferUtil.bytes("value" + i), + System.currentTimeMillis()); + CommitLog.instance.add(mutation); + } } } } diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java index 1383d7836748..c8a5c1dc5f0e 100644 --- a/test/unit/org/apache/cassandra/db/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java @@ -27,22 +27,26 @@ import java.util.zip.CRC32; import java.util.zip.Checksum; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; + import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; - import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.config.ParametrizedClass; +import org.apache.cassandra.db.commitlog.CommitLogSegment; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.CommitLogDescriptor; import org.apache.cassandra.db.commitlog.ReplayPosition; -import org.apache.cassandra.db.commitlog.CommitLogSegment; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.util.ByteBufferDataInput; +import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.db.filter.NamesQueryFilter; @@ -385,4 +389,29 @@ public void testTruncateWithoutSnapshotNonDurable() throws ExecutionException, row = command.getRow(notDurableKs); Assert.assertEquals(null, row.cf); } + + private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException + { + ByteBuffer buf = ByteBuffer.allocate(1024); + CommitLogDescriptor.writeHeader(buf, desc); + long length = buf.position(); + // Put some extra data in the stream. + buf.putDouble(0.1); + buf.flip(); + FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0); + CommitLogDescriptor read = CommitLogDescriptor.readHeader(input); + Assert.assertEquals("Descriptor length", length, input.getFilePointer()); + Assert.assertEquals("Descriptors", desc, read); + } + + @Test + public void testDescriptorPersistence() throws IOException + { + testDescriptorPersistence(new CommitLogDescriptor(11)); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null)); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 15, null)); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 17, new ParametrizedClass("LZ4Compressor", null))); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 19, + new ParametrizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null")))); + } }