Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion modules/accord
Submodule accord updated 52 files
+3 −1 accord-core/src/main/java/accord/api/Agent.java
+4 −0 accord-core/src/main/java/accord/api/CoordinatorEventListener.java
+0 −23 accord-core/src/main/java/accord/api/TraceEventType.java
+53 −34 accord-core/src/main/java/accord/api/Tracing.java
+3 −12 accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
+26 −5 accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
+7 −2 accord-core/src/main/java/accord/coordinate/AbstractSimpleCoordination.java
+5 −13 accord-core/src/main/java/accord/coordinate/AsynchronousAwait.java
+2 −22 accord-core/src/main/java/accord/coordinate/CheckShards.java
+3 −11 accord-core/src/main/java/accord/coordinate/CollectLatestDeps.java
+4 −4 accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
+1 −1 accord-core/src/main/java/accord/coordinate/CoordinateMaxConflict.java
+4 −4 accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java
+3 −3 accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
+8 −14 accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
+14 −7 accord-core/src/main/java/accord/coordinate/Coordination.java
+3 −3 accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java
+7 −3 accord-core/src/main/java/accord/coordinate/ExecuteFlag.java
+3 −9 accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
+2 −3 accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
+3 −9 accord-core/src/main/java/accord/coordinate/FetchCoordinator.java
+7 −11 accord-core/src/main/java/accord/coordinate/FetchData.java
+12 −7 accord-core/src/main/java/accord/coordinate/FetchRoute.java
+2 −2 accord-core/src/main/java/accord/coordinate/Infer.java
+10 −20 accord-core/src/main/java/accord/coordinate/Invalidate.java
+10 −14 accord-core/src/main/java/accord/coordinate/MaybeRecover.java
+6 −15 accord-core/src/main/java/accord/coordinate/Persist.java
+22 −25 accord-core/src/main/java/accord/coordinate/PrepareRecovery.java
+13 −30 accord-core/src/main/java/accord/coordinate/Propose.java
+37 −5 accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
+51 −68 accord-core/src/main/java/accord/coordinate/Recover.java
+9 −18 accord-core/src/main/java/accord/coordinate/Stabilise.java
+3 −12 accord-core/src/main/java/accord/coordinate/SynchronousAwait.java
+2 −3 accord-core/src/main/java/accord/coordinate/SynchronousRecoverAwait.java
+4 −8 accord-core/src/main/java/accord/impl/progresslog/HomeState.java
+21 −19 accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
+2 −4 accord-core/src/main/java/accord/local/Command.java
+0 −1 accord-core/src/main/java/accord/local/CommandStores.java
+7 −0 accord-core/src/main/java/accord/local/MapReduceCommandStores.java
+3 −10 accord-core/src/main/java/accord/local/Node.java
+12 −0 accord-core/src/main/java/accord/local/SafeCommand.java
+5 −0 accord-core/src/main/java/accord/local/StoreParticipants.java
+0 −14 accord-core/src/main/java/accord/messages/GetMaxConflict.java
+1 −9 accord-core/src/main/java/accord/messages/ParticipantsRequest.java
+3 −2 accord-core/src/main/java/accord/messages/Propagate.java
+1 −4 accord-core/src/main/java/accord/messages/RouteRequest.java
+1 −1 accord-core/src/main/java/accord/primitives/Routable.java
+11 −2 accord-core/src/main/java/accord/primitives/Txn.java
+2 −2 accord-core/src/main/java/accord/primitives/TxnId.java
+85 −10 accord-core/src/main/java/accord/utils/TinyEnumSet.java
+52 −0 accord-core/src/test/java/accord/api/TracingTest.java
+5 −3 accord-core/src/test/java/accord/impl/list/ListAgent.java
59 changes: 58 additions & 1 deletion src/java/org/apache/cassandra/db/filter/ColumnFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.btree.BTree;

/**
* Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected
Expand Down Expand Up @@ -64,7 +65,6 @@
*/
public abstract class ColumnFilter
{

public static final ColumnFilter NONE = selection(RegularAndStaticColumns.NONE);

public static final Serializer serializer = new Serializer();
Expand Down Expand Up @@ -305,6 +305,20 @@ public boolean isWildcard()
return false;
}

/**
* Rebinds matching columns into a new filter; ignores any missing but fails if any are a different type
*/
abstract ColumnFilter rebind(TableMetadata newTable);

public static ColumnFilter rebindVirtual(ColumnFilter filter, TableMetadata newTable)
{
// review feedback; nothing actually preventing its use with other tables,
// but unclear utility/rationale so just some protection against incorrect usage
if (!newTable.isVirtual())
throw new UnsupportedOperationException("This feature is intended only to be used with virtual keyspaces");
return filter.rebind(newTable);
}

/**
* Returns the CQL string corresponding to this {@code ColumnFilter}.
*
Expand Down Expand Up @@ -630,6 +644,12 @@ public boolean isWildcard()
return true;
}

@Override
ColumnFilter rebind(TableMetadata newTable)
{
return new WildCardColumnFilter(ColumnFilter.rebind(newTable, fetchedAndQueried));
}

@Override
protected SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections()
{
Expand Down Expand Up @@ -779,6 +799,21 @@ public Tester newTester(ColumnMetadata column)
return new Tester(fetchingStrategy.fetchesAllColumns(column.isStatic()), s.iterator());
}

@Override
ColumnFilter rebind(TableMetadata newTable)
{
RegularAndStaticColumns queried = ColumnFilter.rebind(newTable, this.queried);
RegularAndStaticColumns fetched = this.queried == this.fetched ? queried : ColumnFilter.rebind(newTable, this.fetched);
SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null;
if (this.subSelections != null)
{
subSelections = TreeMultimap.create();
for (Map.Entry<ColumnIdentifier, ColumnSubselection> e : this.subSelections.entries())
subSelections.put(e.getKey(), e.getValue().rebind(newTable));
}
return new SelectionColumnFilter(fetchingStrategy, queried, fetched, subSelections);
}

@Override
protected SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections()
{
Expand Down Expand Up @@ -1003,4 +1038,26 @@ private long subSelectionsSerializedSize(SortedSetMultimap<ColumnIdentifier, Col
return size;
}
}

private static RegularAndStaticColumns rebind(TableMetadata newTable, RegularAndStaticColumns columns)
{
return new RegularAndStaticColumns(rebind(newTable, columns.statics), rebind(newTable, columns.regulars));
}

private static Columns rebind(TableMetadata newTable, Columns columns)
{
if (columns.isEmpty())
return columns;

try (BTree.FastBuilder<ColumnMetadata> builder = BTree.fastBuilder())
{
for (ColumnMetadata in : columns)
{
ColumnMetadata out = newTable.getColumn(in.name);
if (out != null)
builder.add(out);
}
return Columns.from(builder);
}
}
}
14 changes: 14 additions & 0 deletions src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public ColumnMetadata column()

protected abstract CellPath comparisonPath();

protected abstract ColumnSubselection rebind(TableMetadata newTable);

public int compareTo(ColumnSubselection other)
{
assert other.column().name.equals(column().name);
Expand Down Expand Up @@ -118,6 +120,12 @@ public CellPath comparisonPath()
return from;
}

@Override
protected ColumnSubselection rebind(TableMetadata newTable)
{
return new Slice(newTable.getColumn(column.name), from, to);
}

public int compareInclusionOf(CellPath path)
{
Comparator<CellPath> cmp = column.cellPathComparator();
Expand Down Expand Up @@ -160,6 +168,12 @@ public CellPath comparisonPath()
return element;
}

@Override
protected ColumnSubselection rebind(TableMetadata newTable)
{
return new Element(newTable.getColumn(column.name), element);
}

public int compareInclusionOf(CellPath path)
{
return column.cellPathComparator().compare(path, element);
Expand Down
50 changes: 49 additions & 1 deletion src/java/org/apache/cassandra/db/filter/RowFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void addCustomIndexExpression(TableMetadata metadata, IndexMetadata targe
add(new CustomExpression(metadata, targetIndex, value));
}

private void add(Expression expression)
public void add(Expression expression)
{
expression.validate();
expressions.add(expression);
Expand Down Expand Up @@ -549,6 +549,28 @@ public void validateForIndexing()
"Index expression values may not be larger than 64K");
}

/**
* Rebind this expression to a table metadata that is expected to have equivalent columns.
* If any referenced column is missing, returns null;
* if any referenced column has a different type throws an exception
*/
public Expression rebind(TableMetadata newTable)
{
throw new UnsupportedOperationException("Expression " + toString(true) + " does not support rebinding to another table definition");
}

protected static ColumnMetadata rebind(ColumnMetadata in, TableMetadata newTable)
{
ColumnMetadata out = newTable.getColumn(in.name);
if (out == null)
return null;

if (!out.type.equals(in.type) && !out.type.isCompatibleWith(in.type) || !in.type.isCompatibleWith(out.type))
throw new IllegalArgumentException("The provided TableMetadata is not compatible with the expression");

return out;
}

/**
* Returns whether the provided row satisfied this expression or not.
*
Expand Down Expand Up @@ -734,6 +756,16 @@ public static class SimpleExpression extends Expression
super(column, operator, value);
}

@Override
public Expression rebind(TableMetadata newTable)
{
ColumnMetadata out = rebind(column, newTable);
if (out == null)
return null;

return new SimpleExpression(out, operator, value);
}

@Override
public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey, Row row, long nowInSec)
{
Expand Down Expand Up @@ -853,6 +885,16 @@ public void validate() throws InvalidRequestException
checkBindValueSet(value, "Unsupported unset map value for column %s", column.name);
}

@Override
public Expression rebind(TableMetadata newTable)
{
ColumnMetadata out = rebind(column, newTable);
if (out == null)
return null;

return new MapElementExpression(out, key, operator, value);
}

@Override
public ByteBuffer getIndexValue()
{
Expand Down Expand Up @@ -978,6 +1020,12 @@ protected Kind kind()
return Kind.CUSTOM;
}

@Override
public Expression rebind(TableMetadata newTable)
{
return new CustomExpression(table, targetIndex, value);
}

// Filtering by custom expressions isn't supported yet, so just accept any row
@Override
public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey, Row row, long nowInSec)
Expand Down
Loading