Skip to content

runtime-v2: introduce runtime-next with flowctl raw preview-next#2925

Open
jgraettinger wants to merge 8 commits into
masterfrom
johnny/runtime-v2
Open

runtime-v2: introduce runtime-next with flowctl raw preview-next#2925
jgraettinger wants to merge 8 commits into
masterfrom
johnny/runtime-v2

Conversation

@jgraettinger
Copy link
Copy Markdown
Member

@jgraettinger jgraettinger commented May 7, 2026

Foundational work for the architecture in plans/runtime-v2/plan.md:
a per-shard Rust TaskService and a sidecar Shuffle Leader that
together replace the Go-driven transaction loop. Materialization is
implemented end-to-end (open / commit / acknowledge / trigger,
recovery, Frontier↔Checkpoint mapping). Derivations and captures are
not yet implemented.

The Go runtime is unchanged in this branch. It still drives the
existing runtime crate for every task. The new runtime-next code
is reachable only through flowctl raw preview-next, which is how
materialization is exercised end-to-end today. Wiring runtime-next
into the Go reactor is follow-on work, as is introduction of the sidecar binary.

Commits are arranged for narrative review:

  • Design docs first (plan.md).
  • Then small, self-contained protocol changes (proto-gazette,
    shuffle Frontier collapse, materialize state patches).
  • Then a verbatim file copy from runtime/runtime-next/, so the
    next commit's diff is reviewable.
  • Then the substantive runtime-next implementation.
  • Finally, flowctl raw preview-next as an E2E harness against the
    new stack (replacing the removed raw shuffle).

Test plan

  • cargo test -p runtime-next (recovery + triggers snapshots)
  • cargo test -p proto-flow, cargo test -p shuffle
  • E2E materialization via flowctl raw preview-next
    see plans/runtime-v2/preview-harness.md
  • Legacy runtime still green under mise run ci:catalog-test

Adds the runtime-v2 architecture plan under plans/, refreshes CLAUDE.md
guidelines, and seeds AGENTS.md.
Switch generated maps to ordered BTreeMap so encodings are
deterministic, and reuse zero-copy bytes::Bytes for Checkpoint
ack_intents and ProducerEntry ids. Add Clock::delta for saturating
duration math between two Clock instants.
Collapse the chunked FrontierChunk wire format into a single Frontier
message on the SessionRequest/SessionResponse and SliceResponse paths,
which simplifies framing for both Open's resume_checkpoint and ongoing
Progressed/NextCheckpoint exchanges. Drop the disk_backlog_threshold
field from CollectionPartitions and LogRequest -- this is instead a
service-wide constant.

Also build the shuffle Go protobuf as part of mise build:go-protobufs,
and remove the legacy "flowctl raw shuffle" subcommand which exercised
the old chunked protocol; "flowctl raw preview-next" replaces it in a
follow-on commit.
…knowledge

Extend the materialization Request protocol with aggregated connector
state patches on Flush, StartCommit, and Acknowledge. The runtime-v2
Shuffle Leader forwards these verbatim from its L:Flush, L:StartCommit,
and L:Acknowledge so connectors participating in cooperative
multi-shard strategies can observe peers' state.

The legacy runtime path keeps the field empty pending leader-side
support, and the materialize_fixture / regression tests are updated to
populate the new fields.
Verbatim copies of files lifted from the runtime crate, prior to their
modification for the new Leader / Shard runtime architecture. Splitting
the copy from the modifications makes the subsequent diff reviewable.

  runtime/src/container.rs            => runtime-next/src/container.rs
  runtime/src/tokio_context.rs        => runtime-next/src/tokio_context.rs
  runtime/src/local_connector.rs      => runtime-next/src/local_connector.rs
  runtime/src/image_connector.rs      => runtime-next/src/image_connector.rs
  runtime/src/task_service.rs         => runtime-next/src/task_service.rs
  runtime/src/materialize/triggers.rs => runtime-next/src/leader/materialize/triggers.rs
  runtime/src/materialize/connector.rs => runtime-next/src/shard/materialize/connector.rs
  runtime/src/rocksdb.rs              => runtime-next/src/shard/rocksdb.rs
  runtime/src/materialize/task.rs     => runtime-next/src/shard/materialize/task.rs
Define the bidirectional Leader and Shard RPCs in runtime.proto and
regenerate the Rust + Go bindings. Include the runtime package in
pbjson serde generation, re-export proto_gazette::consumer so generated
paths resolve, and have rust-protobufs run cargo fmt afterward.
Introduce the runtime-next crate housing both the Shuffle Leader and
per-shard TaskService implementations behind the bidirectional Leader
and Shard RPCs defined in runtime.proto. Materialization is implemented
end-to-end (open / commit / acknowledge / trigger plus recovery and
Frontier↔Checkpoint mapping); derivations and captures land in
follow-on work.

Small supporting bits land in the runtime, doc, and ops crates:
TaskServiceConfig destructuring tolerates new fields,
combine::Accumulator exposes spill-segment ranges, and ops re-exports
proto_flow::ops as ops::proto.

See plans/runtime-v2/plan.md for the architecture and rollout plan.
Replace the now-removed "raw shuffle" subcommand with "raw preview-
next", which drives a local materialization end-to-end against the
runtime-next Shuffle Leader and per-shard TaskService for ad-hoc
preview and integration testing. See plans/runtime-v2/preview-harness.md
for the harness design.

Drop a stale "panics if ..." note from
flow_client_next::workflows::user_collection_auth::new_journal_client_factory
which the new caller in services.rs runs into; the function no longer
panics on the documented case.
@jgraettinger jgraettinger requested a review from williamhbaker May 7, 2026 17:48
Comment thread crates/runtime-next/src/leader/materialize/startup.rs Dismissed
Comment thread crates/runtime-next/src/leader/materialize/startup.rs Dismissed
Comment thread crates/runtime-next/src/leader/materialize/startup.rs Dismissed
Comment thread crates/runtime-next/src/leader/materialize/startup.rs Dismissed
Comment thread crates/runtime-next/src/leader/materialize/startup.rs Dismissed
Copy link
Copy Markdown
Member

@williamhbaker williamhbaker left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few comments below. Overall looks good; there's a lot here but the individual pieces were relatively easy to understand. How everything fits together in the complete end-to-end workings was more difficult to really internalize, and I'm not confident I 100% got there on everything, so let me know if any of these are off the mark.

Comment thread AGENTS.md
@@ -0,0 +1,133 @@
# CLAUDE.md
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe have this file as a symlink to CLAUDE.md?

message Task {
// Task specification (protobuf-encoded bytes).
bytes spec = 1;
// Collection journal partition to which task states are written.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo? states -> stats

//! `Publisher` is the unified entry point. Two variants:
//!
//! - `Publisher::Real` wraps a real `publisher::Publisher` and performs
//! Gazette journal IO (stats / logs / ACK intents / future capture &
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be a future thing, but I was looking for logs publishing here and didn't see it, in relation to #2831

// files may grow to 4GB, but they are typically written very slowly and thus
// artificially inflate the recovery log horizon. We use a much smaller limit
// to encourage more frequent compactions into new files.
opts.set_max_manifest_file_size(1 << 17); // 131072 bytes
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is removing set_max_manifest_file_size intentional?

let extent = self.extents.bindings.entry(index).or_default();

if !max_key_delta.is_empty() {
extent.max_key_delta = max_key_delta;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need a max(extent.max_key_delta vs. max_key_delta) here, otherwise keys for multiple shards getting recorded out of order would be a problem.

policy_close |= close_requested;

let may_close = policy_close && !unresolved_hints && tail_done;
let may_extend =
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we prevent extension (exception being: unresolved hints, I think) if close_requested? That way an extended stream of ready frontiers doesn't stop the shard from closing when requested.

.copied()
.unwrap_or_default(),
);
entry.last_source_published_at = extents.max_source_clock.to_pb_json_timestamp();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also for the case of multiple bindings sourcing from the same collection: I think it would be technically more correct to take the maximum of entry.last_source_published_at vs. extents.max_source_clock. Perhaps most significantly this would prevent any cases of the lastSourcePublishedAt from going backwards in subsequent transactions.

committed_close = hinted_close;
committed_frontier = committed_frontier.reduce(std::mem::take(&mut hinted_frontier));

pending_ack_intents = connector_checkpoint.ack_intents;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking through a couple of scenarios here, for remote authoritative materializations:

  • We get the pending ack intents here from the connector's persisted checkpoint, but not the trigger params. Since the persist at fsm.rs:828 didn't land (or we wouldn't be on this branch?) there won't be any trigger parameters loaded, and so that trigger firing would get skipped.
  • Max keys is similar, but I think doesn't have the same problem, since it is persisted at fsm.rs:523 (hinted), before StartCommit, so maybe trigger params could do the same?

}

// Compose the trigger payload now that we have a complete txn-wide view.
if task.triggers.is_some() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we gate this to compute trigger_params only if the transaction contained Stored docs? That's something I remember from the original implementation, to avoid firing triggers on empty transactions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants