Skip to content

CASSSIDECAR-373: Define storage provider interface and Cassandra implementation to support durable job tracking#339

Open
andresbeckruiz wants to merge 1 commit intoapache:trunkfrom
andresbeckruiz:CASSSIDECAR-373
Open

CASSSIDECAR-373: Define storage provider interface and Cassandra implementation to support durable job tracking#339
andresbeckruiz wants to merge 1 commit intoapache:trunkfrom
andresbeckruiz:CASSSIDECAR-373

Conversation

@andresbeckruiz
Copy link
Copy Markdown
Contributor

CASSSIDECAR-373

Implements CassandraStorageProvider, the default Cassandra backed implementation of the StorageProvider interface for durable operational job state.

Changes

  • StorageProvider interface, which adds a provider-agnostic abstraction for persisting, querying, and coordinating operational jobs

  • OperationalJobRecord is the data transfer object representing persisted job state

  • OperationalJobConfiguration provides configurable table TTL, and in the future can support remote Cassandra cluster connection

  • Three Cassandra table schemas:

    • cluster_ops — persists and tracks operational jobs (restarts, upgrades, etc.)
    • cluster_ops_node_state — tracks per-node status within an operation
    • active_cluster_ops — provides mutual exclusion of concurrent operations via LWT
  • Database accessors: (ClusterOpsDatabaseAccessor, ClusterOpsNodeStateDatabaseAccessor, ActiveClusterOpsDatabaseAccessor)

  • CassandraStorageProvider composes the three accessors, acts as thin delegation layer

  • Unit tests (integration testing will be unlocked with [CASSSIDECAR-375])

Future Work

  • Remote Cassandra cluster storage support
  • Future CEP-53 work:
    • CASSSIDECAR-374: Implement durable operational job tracker
    • CASSSIDECAR-375: Add storage provider to enable JVM Distributed tests for cluster-wide operational jobs
    • CASSSIDECAR-377: Implement job coordination for cluster-wide operations

@Override
protected String createSchemaStatement()
{
return String.format("CREATE TABLE IF NOT EXISTS %s.%s (" +
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider LCS for the compaction strategy on this table? May not be important given the (in-theory) low volume to this table. Just considering the challenges we face when using STCS with TTL tables and we can't use UCS as this should still work on Cassandra 4 clusters.

}

@Override
protected void prepareStatements(@NotNull Session session)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class follows a different patten than the other schema classes that you implement below in that the below classes define a CqlLiterals inner class whereas this one just defines these as raw strings. Is there any reason why we aren't consistent across the schema classes in this PR? If not, I'd recommend consolidating on one of the two patterns.

Comment on lines +28 to +29
* Schema for the {@code active_cluster_ops} table, which tracks active operations
* and provides mutual exclusion via lightweight transactions (LWT).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: We can improve this javadoc by explaining why this table exists and how it is used. The description from the CEP could be helpful here: https://cwiki.apache.org/confluence/display/CASSANDRA/CEP-53%3A+Cassandra+Rolling+Restarts+via+Sidecar#CEP53:CassandraRollingRestartsviaSidecar-sidecar_internal.active_cluster_ops

import org.jetbrains.annotations.NotNull;

/**
* Schema for the {@code cluster_ops_node_state} table, which tracks node-level status within an operation.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

" operation_id timeuuid," +
" operation_type text," +
" status text," +
" node_execution_order frozen<list<frozen<list<text>>>>," +
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be a frozen<list<forozen<list<uuid>>>>? Since it stores the order of node ids in which the execution will run?

/**
* Tests for {@link ClusterOpsDatabaseAccessor}
*/
class ClusterOpsDatabaseAccessorTest
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the ActiveClusterOpsDatabaseAccessorTest, I think an integration test would be more appropriate than this.

/**
* Tests for {@link ClusterOpsNodeStateDatabaseAccessor}
*/
class ClusterOpsNodeStateDatabaseAccessorTest
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the ActiveClusterOpsDatabaseAccessorTest, I think an integration test would be more appropriate than this.

Map<String, String> operationMetadata = row.getMap("operation_metadata", String.class, String.class);
return new OperationalJobRecord(operationId, operationType, status,
nodeExecutionOrder == null || nodeExecutionOrder.isEmpty() ? null : nodeExecutionOrder,
operationMetadata.isEmpty() ? null : operationMetadata);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not the biggest fan of the ternay operators used to provide values for the constructor of OperationalJobRecord. It subtly makes the reader of this code need to think of many different states when this object is returned.

A few approaches we could consider here:

  1. A builder for the OperationalJobRecord.
  2. Move the logic into the constructor of the OperationalJobRecord
  3. Extract this logic above. For example:
List<List<String>> nodeExecutionOrder = row.get("node_execution_order",NODES_ORDER_TYPE);
if (nodeExecutionOrder != null && nodeExecutionOrder.isEmpty()) 
{
  nodeExecutironOrder = null;
}

execute(statement);
}

static final int BATCH_CHUNK_SIZE = 100;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please put static properties above constructors (per Cassandra style guide: https://cassandra.apache.org/_/development/code_style.html)

// TTL on all tables handles record pruning.
try
{
clusterName = sessionProvider.get().getCluster().getMetadata().getClusterName();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This creates the implicit link between the cluster which is storing the operations and the cluster on which the operation is executing.

I know that this PR has remote storage provider as out of scope, but making this something that we can set via constructor or via initialize would help make sure there is a pathway to store ops in a cluster separate from the one sidecar is connecting to.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants