Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
ff28cad
ephemeral reads
belliottsmith Jan 29, 2024
af9af41
get simulation working better
belliottsmith Feb 5, 2024
1470dc8
Simplify CommandsForKey handling by merging representations and utili…
belliottsmith Feb 15, 2024
1291db8
update
belliottsmith Feb 20, 2024
3e231d4
fix compaction iterators test
belliottsmith Feb 21, 2024
a38f8c9
fix serialization and add comments; first extra bytes value must be s…
belliottsmith Feb 22, 2024
64146f0
address serialization issues
belliottsmith Feb 22, 2024
40e10f1
wip fixes to CFK
belliottsmith Feb 22, 2024
ad5fbc6
wip fixes to CFK
belliottsmith Feb 22, 2024
070d283
got a first index that works off in-memory
dcapwell Jan 30, 2024
5a09c98
fixed bug where i used ranges and not range...
dcapwell Feb 21, 2024
de6cd58
got all tests passing
dcapwell Feb 21, 2024
8971c18
cleanup
dcapwell Feb 21, 2024
7e04364
make sure to use a ordered map to be deterministic
dcapwell Feb 21, 2024
f072c75
cleanup
dcapwell Feb 21, 2024
a0d1be4
expanding range now allows concurrent
dcapwell Feb 21, 2024
4a312c3
push knowing when to add to the journal into the simulated store
dcapwell Feb 21, 2024
4a3701d
cleanup
dcapwell Feb 21, 2024
cc7fbc6
refactor trying to make utils reusable
dcapwell Feb 21, 2024
6177076
working on a refactor that does pre accept then recover
dcapwell Feb 21, 2024
af57910
got expandingRangeConflicts passing
dcapwell Feb 21, 2024
2acdc0e
now each key/deps will randomly select if PreAccept/BeginRecover/ or …
dcapwell Feb 21, 2024
d235988
added CFK evict to the tests and found an issue with CFK serializatio…
dcapwell Feb 22, 2024
c77a39d
bump accord
dcapwell Feb 22, 2024
3cc8810
remove seed
dcapwell Feb 22, 2024
daecdc7
working on making the SAI serializer logic more portable so CFK can use
dcapwell Feb 22, 2024
3003459
working on testing key|token -> bb -> key|token so i can migrate SAI …
dcapwell Feb 22, 2024
50e14a4
migrated SAI to the new AccordRoutingKeyByteSource
dcapwell Feb 22, 2024
1358c4c
migrated mostly, have a mostly passing test
dcapwell Feb 22, 2024
dcbbcaa
i hate cassandra
dcapwell Feb 22, 2024
876fa11
the test is passing now
dcapwell Feb 23, 2024
7439b14
test cleanup
dcapwell Feb 23, 2024
e83fc68
cleanup shared seed bug
dcapwell Feb 23, 2024
060eea6
fixed tests due to generator refactor
dcapwell Feb 23, 2024
0b55590
fixed simulated tests
dcapwell Feb 23, 2024
fccae22
added memtable flush + compaction and found a bug with the index
dcapwell Feb 23, 2024
cd6faeb
only flush memtable if there is a partition that intersects the data
dcapwell Feb 23, 2024
2b0917b
split tests so CI will be stable
dcapwell Feb 23, 2024
744d4f7
Working on new test that has multiple ranges in a txn.. found a bug w…
dcapwell Feb 23, 2024
069a3b1
more deps
dcapwell Feb 23, 2024
833b27a
tests are passing
dcapwell Feb 23, 2024
323cc25
added median and random weight to the mixed disto logic
dcapwell Feb 23, 2024
4fa8ae6
cleanup
dcapwell Feb 23, 2024
e7daa1c
cleanup
dcapwell Feb 23, 2024
77ff896
cleanup
dcapwell Feb 23, 2024
6854ef1
bump accord
dcapwell Feb 23, 2024
631dc6b
cleanup
dcapwell Feb 23, 2024
f467c67
cleanup
dcapwell Feb 23, 2024
c2176fc
update gitmodules
dcapwell Feb 24, 2024
c2c30c5
bump accord
dcapwell Feb 26, 2024
b218819
compile
dcapwell Feb 26, 2024
f29e876
fixed bug where we return immutable map but caller will mutate
dcapwell Feb 26, 2024
66ee0bf
journal now waits for subsystems
dcapwell Feb 26, 2024
32ea218
fix init bug where schema is needed but isnt created yet
dcapwell Feb 26, 2024
24fb51d
disable accord logic in repair tests as they are non-deterministic
dcapwell Feb 26, 2024
8e06869
lower log levels as accord logging is too verbose
dcapwell Feb 27, 2024
8cfeeed
lower log levels as accord logging is too verbose
dcapwell Feb 27, 2024
ce26131
remove unused import
dcapwell Feb 27, 2024
10d7280
fixed wrong clock bug
dcapwell Feb 27, 2024
f6f0026
fixed import bugs
dcapwell Feb 27, 2024
3971dc2
use mix distro
dcapwell Mar 1, 2024
c9388bd
bump accord
dcapwell Mar 1, 2024
f531d25
allow the accessor defining include logic fully
dcapwell Mar 2, 2024
cdc2415
cleanup
dcapwell Mar 4, 2024
4cd3c45
show size stats as well to show why its taking so much time
dcapwell Mar 4, 2024
189e4af
cleanup
dcapwell Mar 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
url = https://github.com/apache/cassandra-accord.git
branch = trunk
url = https://github.com/dcapwell/cassandra-accord.git
branch = CASSANDRA-19355_on_refactor-deps
2 changes: 1 addition & 1 deletion modules/accord
Submodule accord updated 121 files
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/config/AccordSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,9 @@ public class AccordSpec
public DurationSpec.IntMillisecondsBound range_barrier_timeout = new DurationSpec.IntMillisecondsBound("2m");

public volatile DurationSpec fast_path_update_delay = new DurationSpec.IntSecondsBound(5);

public volatile DurationSpec schedule_durability_frequency = new DurationSpec.IntSecondsBound(15);
public volatile DurationSpec durability_txnid_lag = new DurationSpec.IntSecondsBound(5);
public volatile DurationSpec shard_durability_cycle = new DurationSpec.IntMinutesBound(2);
public volatile DurationSpec global_durability_cycle = new DurationSpec.IntMinutesBound(10);
}
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ public enum CassandraRelevantProperties
TCM_UNSAFE_BOOT_WITH_CLUSTERMETADATA("cassandra.unsafe_boot_with_clustermetadata", null),
TCM_USE_ATOMIC_LONG_PROCESSOR("cassandra.test.use_atomic_long_processor", "false"),
TCM_USE_NO_OP_REPLICATOR("cassandra.test.use_no_op_replicator", "false"),

TEST_ACCORD_STORE_THREAD_CHECKS_ENABLED("cassandra.test.accord.store.thread_checks_enabled", "true"),
TEST_BBFAILHELPER_ENABLED("test.bbfailhelper.enabled"),
TEST_BLOB_SHARED_SEED("cassandra.test.blob.shared.seed"),
TEST_BYTEMAN_TRANSFORMATIONS_DEBUG("cassandra.test.byteman.transformations.debug"),
Expand Down
44 changes: 42 additions & 2 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4910,9 +4910,49 @@ public static long getAccordFastPathUpdateDelayMillis()
return conf.accord.fast_path_update_delay.to(TimeUnit.MILLISECONDS);
}

public static void setAccordFastPathUpdateDelayMillis(long millis)
public static void setAccordFastPathUpdateDelaySeconds(long seconds)
{
conf.accord.fast_path_update_delay = new DurationSpec.IntMillisecondsBound(millis);
conf.accord.fast_path_update_delay = new DurationSpec.IntSecondsBound(seconds);
}

public static long getAccordScheduleDurabilityFrequency(TimeUnit unit)
{
return conf.accord.schedule_durability_frequency.to(unit);
}

public static void setAccordScheduleDurabilityFrequencySeconds(long seconds)
{
conf.accord.schedule_durability_frequency = new DurationSpec.IntSecondsBound(seconds);
}

public static long getAccordScheduleDurabilityTxnIdLag(TimeUnit unit)
{
return conf.accord.durability_txnid_lag.to(unit);
}

public static void setAccordScheduleDurabilityTxnIdLagSeconds(long seconds)
{
conf.accord.durability_txnid_lag = new DurationSpec.IntSecondsBound(seconds);
}

public static long getAccordGlobalDurabilityCycle(TimeUnit unit)
{
return conf.accord.global_durability_cycle.to(unit);
}

public static void setAccordGlobalDurabilityCycleSeconds(long seconds)
{
conf.accord.global_durability_cycle = new DurationSpec.IntSecondsBound(seconds);
}

public static long getAccordShardDurabilityCycle(TimeUnit unit)
{
return conf.accord.shard_durability_cycle.to(unit);
}

public static void setAccordShardDurabilityCycleSeconds(long seconds)
{
conf.accord.shard_durability_cycle = new DurationSpec.IntSecondsBound(seconds);
}

public static boolean getForceNewPreparedStatementBehaviour()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.FBUtilities;

import static accord.primitives.Txn.Kind.EphemeralRead;
import static accord.primitives.Txn.Kind.Read;
import static org.apache.cassandra.config.Config.NonSerialWriteStrategy.accord;
import static org.apache.cassandra.config.DatabaseDescriptor.getNonSerialWriteStrategy;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
Expand Down Expand Up @@ -321,7 +325,8 @@ public Txn createTxn(ClientState state, QueryOptions options)
List<TxnNamedRead> reads = createNamedReads(options, state, ImmutableMap.of(), keySet::add);
Keys txnKeys = toKeys(keySet);
TxnRead read = createTxnRead(reads, txnKeys, null);
return new Txn.InMemory(txnKeys, read, TxnQuery.ALL);
Txn.Kind kind = txnKeys.size() == 1 && getNonSerialWriteStrategy() == accord ? EphemeralRead : Read;
return new Txn.InMemory(kind, txnKeys, read, TxnQuery.ALL, null);
}
else
{
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/DiskBoundaryManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ private static List<PartitionPosition> getDiskBoundaries(RangesAtEndpoint replic
List<PartitionPosition> diskBoundaries = new ArrayList<>();
for (int i = 0; i < boundaries.size() - 1; i++)
diskBoundaries.add(boundaries.get(i).maxKeyBound());
diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
diskBoundaries.add(partitioner.getMaximumTokenForSplitting().maxKeyBound());
return diskBoundaries;
}
}
40 changes: 11 additions & 29 deletions src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@
import static org.apache.cassandra.config.DatabaseDescriptor.paxosStatePurging;
import static org.apache.cassandra.service.accord.AccordKeyspace.CommandRows.maybeDropTruncatedCommandColumns;
import static org.apache.cassandra.service.accord.AccordKeyspace.CommandRows.truncatedApply;
import static org.apache.cassandra.service.accord.AccordKeyspace.CommandsForKeysAccessor;
import static org.apache.cassandra.service.accord.AccordKeyspace.TimestampsForKeyColumns.last_executed_micros;
import static org.apache.cassandra.service.accord.AccordKeyspace.TimestampsForKeyColumns.last_executed_timestamp;
import static org.apache.cassandra.service.accord.AccordKeyspace.TimestampsForKeyColumns.last_write_timestamp;
import static org.apache.cassandra.service.accord.AccordKeyspace.TimestampsForKeyColumns.max_timestamp;
import static org.apache.cassandra.service.accord.AccordKeyspace.TimestampsForKeyRows.truncateTimestampsForKeyRow;
import static org.apache.cassandra.service.accord.AccordKeyspace.deserializeDurabilityOrNull;
import static org.apache.cassandra.service.accord.AccordKeyspace.deserializeRouteOrNull;
Expand Down Expand Up @@ -221,11 +221,8 @@ private Transformation<UnfilteredRowIterator> purger(ColumnFamilyStore cfs, Supp
if (isAccordTimestampsForKey(cfs))
return new AccordTimestampsForKeyPurger(accordService);

if (isAccordDepsCommandsForKey(cfs))
return new AccordCommandsForKeyPurger(AccordKeyspace.DepsCommandsForKeysAccessor, accordService);

if (isAccordAllCommandsForKey(cfs))
return new AccordCommandsForKeyPurger(AccordKeyspace.AllCommandsForKeysAccessor, accordService);
if (isAccordCommandsForKey(cfs))
return new AccordCommandsForKeyPurger(AccordKeyspace.CommandsForKeysAccessor, accordService);

throw new IllegalArgumentException("Unhandled accord table: " + cfs.keyspace.getName() + '.' + cfs.name);
}
Expand Down Expand Up @@ -936,22 +933,14 @@ protected Row applyToRow(Row row)
lastWriteCell = null;
}

Cell<?> maxTimestampCell = row.getCell(max_timestamp);
Timestamp max_timestamp = deserializeTimestampOrNull(maxTimestampCell);
if (max_timestamp != null && max_timestamp.compareTo(redundantBeforeTxnId) < 0)
{
maxTimestampCell = null;
}

// No need to emit a tombstone as earlier versions of the row will also be nulled out
// when compacted later or loaded into a commands for key
if (lastExecuteMicrosCell == null &&
lastExecuteCell == null &&
lastWriteCell == null &&
maxTimestampCell == null)
lastWriteCell == null)
return null;

return truncateTimestampsForKeyRow(nowInSec, row, lastExecuteMicrosCell, lastExecuteCell, lastWriteCell, maxTimestampCell);
return truncateTimestampsForKeyRow(nowInSec, row, lastExecuteMicrosCell, lastExecuteCell, lastWriteCell);
}

@Override
Expand Down Expand Up @@ -997,11 +986,10 @@ protected Row applyToRow(Row row)
return row;

TxnId redundantBeforeTxnId = redundantBeforeEntry.shardRedundantBefore();
Timestamp timestamp = accessor.getTimestamp(row);
if (timestamp != null && timestamp.compareTo(redundantBeforeTxnId) < 0)
return null;
if (redundantBeforeTxnId.equals(TxnId.NONE))
return row;

return row;
return CommandsForKeysAccessor.withoutRedundantCommands(partitionKey, row, redundantBeforeTxnId);
}

@Override
Expand Down Expand Up @@ -1057,8 +1045,7 @@ private static boolean requiresAccordSpecificPurger(ColumnFamilyStore cfs)
return cfs.getKeyspaceName().equals(SchemaConstants.ACCORD_KEYSPACE_NAME) &&
ImmutableSet.of(AccordKeyspace.COMMANDS,
AccordKeyspace.TIMESTAMPS_FOR_KEY,
AccordKeyspace.DEPS_COMMANDS_FOR_KEY,
AccordKeyspace.ALL_COMMANDS_FOR_KEY)
AccordKeyspace.COMMANDS_FOR_KEY)
.contains(cfs.getTableName());
}

Expand All @@ -1077,13 +1064,8 @@ private static boolean isAccordTimestampsForKey(ColumnFamilyStore cfs)
return isAccordTable(cfs, AccordKeyspace.TIMESTAMPS_FOR_KEY);
}

private static boolean isAccordDepsCommandsForKey(ColumnFamilyStore cfs)
{
return isAccordTable(cfs, AccordKeyspace.DEPS_COMMANDS_FOR_KEY);
}

private static boolean isAccordAllCommandsForKey(ColumnFamilyStore cfs)
private static boolean isAccordCommandsForKey(ColumnFamilyStore cfs)
{
return isAccordTable(cfs, AccordKeyspace.ALL_COMMANDS_FOR_KEY);
return isAccordTable(cfs, AccordKeyspace.COMMANDS_FOR_KEY);
}
}
8 changes: 6 additions & 2 deletions src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ public double size(Token next)

@Override
public Token nextValidToken()
{
throw new UnsupportedOperationException(String.format("Token type %s does not support token allocation.",
getClass().getSimpleName()));
}

public Token increaseSlightly()
{
// find first byte we can increment
int i = token.length - 1;
Expand Down Expand Up @@ -180,8 +186,6 @@ public Token decreaseSlightly()
if (i == -1)
{
byte[] newToken = Arrays.copyOf(token, token.length - 1);
if (newToken.length > 0)
newToken[newToken.length - 1] = (byte)-1;
return new BytesToken(newToken);
}

Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/dht/IPartitioner.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ static void validate(AbstractBounds<?> bounds)
* The biggest token for this partitioner, unlike getMinimumToken, this token is actually used and users wanting to
* include all tokens need to do getMaximumToken().maxKeyBound()
*
* Not implemented for the ordered partitioners
* THIS IS NOT SAFE FOR PURPOSES BESIDES SPLITTING/BALANCING
*/
default Token getMaximumToken()
default Token getMaximumTokenForSplitting()
{
throw new UnsupportedOperationException("If you are using a splitting partitioner, getMaximumToken has to be implemented");
}
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ public AbstractType<?> getTokenValidator()
return LongType.instance;
}

public Token getMaximumToken()
public Token getMaximumTokenForSplitting()
{
return new LongToken(Long.MAX_VALUE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public StringToken getMinimumToken()
return MINIMUM;
}

public StringToken getMaximumToken()
public StringToken getMaximumTokenForSplitting()
{
return MAXIMUM;
}
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/dht/RandomPartitioner.java
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ public Map<Token, Float> describeOwnership(List<Token> sortedTokens)
return ownerships;
}

public Token getMaximumToken()
public Token getMaximumTokenForSplitting()
{
return new BigIntegerToken(MAXIMUM);
}
Expand Down
14 changes: 7 additions & 7 deletions src/java/org/apache/cassandra/dht/Splitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected BigInteger tokensInRange(Range<Token> range)
{
//full range case
if (range.left.equals(range.right))
return tokensInRange(new Range(partitioner.getMinimumToken(), partitioner.getMaximumToken()));
return tokensInRange(new Range(partitioner.getMinimumToken(), partitioner.getMaximumTokenForSplitting()));

BigInteger totalTokens = BigInteger.ZERO;
for (Range<Token> unwrapped : range.unwrap())
Expand Down Expand Up @@ -95,7 +95,7 @@ public double positionInRange(Token token, Range<Token> range)
{
//full range case
if (range.left.equals(range.right))
return positionInRange(token, new Range(partitioner.getMinimumToken(), partitioner.getMaximumToken()));
return positionInRange(token, new Range(partitioner.getMinimumToken(), partitioner.getMaximumTokenForSplitting()));

// leftmost token means we are on position 0.0
if (token.equals(range.left))
Expand All @@ -115,7 +115,7 @@ public double positionInRange(Token token, Range<Token> range)
public List<Token> splitOwnedRanges(int parts, List<WeightedRange> weightedRanges, boolean dontSplitRanges)
{
if (weightedRanges.isEmpty() || parts == 1)
return Collections.singletonList(partitioner.getMaximumToken());
return Collections.singletonList(partitioner.getMaximumTokenForSplitting());

BigInteger totalTokens = BigInteger.ZERO;
for (WeightedRange weightedRange : weightedRanges)
Expand All @@ -126,7 +126,7 @@ public List<Token> splitOwnedRanges(int parts, List<WeightedRange> weightedRange
BigInteger perPart = totalTokens.divide(BigInteger.valueOf(parts));
// the range owned is so tiny we can't split it:
if (perPart.equals(BigInteger.ZERO))
return Collections.singletonList(partitioner.getMaximumToken());
return Collections.singletonList(partitioner.getMaximumTokenForSplitting());

if (dontSplitRanges)
return splitOwnedRangesNoPartialRanges(weightedRanges, perPart, parts);
Expand Down Expand Up @@ -155,7 +155,7 @@ else if (partsLeft == 1)
}
sum = sum.add(currentRangeWidth);
}
boundaries.set(boundaries.size() - 1, partitioner.getMaximumToken());
boundaries.set(boundaries.size() - 1, partitioner.getMaximumTokenForSplitting());

assert boundaries.size() == parts : boundaries.size() + "!=" + parts + " " + boundaries + ":" + weightedRanges;
return boundaries;
Expand Down Expand Up @@ -192,7 +192,7 @@ private List<Token> splitOwnedRangesNoPartialRanges(List<WeightedRange> weighted
}
i++;
}
boundaries.add(partitioner.getMaximumToken());
boundaries.add(partitioner.getMaximumTokenForSplitting());
return boundaries;
}

Expand All @@ -202,7 +202,7 @@ private List<Token> splitOwnedRangesNoPartialRanges(List<WeightedRange> weighted
*/
private Token token(Token t)
{
return t.equals(partitioner.getMinimumToken()) ? partitioner.getMaximumToken() : t;
return t.equals(partitioner.getMinimumToken()) ? partitioner.getMaximumTokenForSplitting() : t;
}

/**
Expand Down
7 changes: 6 additions & 1 deletion src/java/org/apache/cassandra/dht/Token.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,12 @@ public long getLongValue()
abstract public Token nextValidToken();

/**
* Returns a token that is slightly less than this.
* Returns a token that is slightly more than this. This is NOT guaranteed to be the directly following token.
*/
public Token increaseSlightly() { return nextValidToken(); }

/**
* Returns a token that is slightly less than this. This is NOT guaranteed to be the directly preceding token.
*/
abstract public Token decreaseSlightly();

Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/index/sai/QueryContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class QueryContext
{
private static final boolean DISABLE_TIMEOUT = SAI_TEST_DISABLE_TIMEOUT.getBoolean();

private final ReadCommand readCommand;
public final ReadCommand readCommand;
private final long queryStartTimeNanos;

public final long executionQuotaNano;
Expand Down
Loading