
feat(engine): data classification and trace redaction #23

Open
albertgwo wants to merge 9 commits into feat/engine-pr5-scoped-compensation from feat/engine-pr11-data-classification

Conversation

@albertgwo
Contributor

@albertgwo albertgwo commented Mar 19, 2026

User description

Summary

  • Add data classification levels (`public`, `internal`, `confidential`, `restricted`)
  • Add trace redaction that strips sensitive data based on classification level
  • Classification propagates through execution context
  • Merges wait/signal (PR 6) and backpressure (PR 7) branches

Dependency

Base: `feat/engine-pr5-scoped-compensation`
Also merges: PR 6 (wait/signal), PR 7 (backpressure)

PR 15 of 16 in the execution engine implementation.

Test plan

  • Data classification levels are correctly assigned
  • Trace redaction strips fields above configured threshold
  • Classification propagates through nested execution
  • Redaction handles edge cases (missing fields, null values)

Generated description

Below is a concise technical summary of the changes proposed in this PR:
Orchestrate wait-node lifecycle, Execution handles, TTL enforcement, and semaphore-backed concurrency limits so flows can pause, resume, and be signaled without leaking timers or deadlocking. Enable configurable trace capture by wiring TraceLevel/RedactionPolicy through classification resolution, redaction helpers, and CompiledFlow so sensitive outputs can be masked before users consume a trace.

Trace redaction: Ensure trace capture honors TraceLevel choices and the new RedactionPolicy by resolving node/lane data classifications, redacting outputs when necessary, and propagating these rules through CompiledFlow so the observable trace respects sensitive data boundaries. Validate the behavior with classification/redaction unit tests that cover inheritance, policy combinations, and custom hooks.
Modified files (5)
  • packages/engine/src/__tests__/engine/classification.test.ts
  • packages/engine/src/engine/classification.ts
  • packages/engine/src/engine/compiled-flow.ts
  • packages/engine/src/engine/redaction.ts
  • packages/engine/src/engine/types.ts
Engine exports: Publish the new execution and observability primitives at the public surface so downstream callers can construct Execution handles, Clock implementations, Semaphore, and trace helpers directly from the engine package along with the adapter errors they rely on.
Modified files (2)
  • packages/engine/src/engine/index.ts
  • packages/engine/src/index.ts
Latest Contributors (1)
User | Commit | Date
albertgwo@gmail.com | feat-complete-the-loop... | March 02, 2026
Wait flow control: Coordinate wait-node lifecycle, PlainAdapter.waitForEvent, Execution, TestClock, and the new semaphore/TTL controls so flows started via CompiledFlow.start() can pause, resume, validate signals, and honor timeouts without blocking execute() callers. Include the relevant timing helpers (parseDuration), adapter typings, and tests that prove signal delivery, clock determinism, and backpressure semantics.
Modified files (11)
  • packages/engine/src/__tests__/engine/clock.test.ts
  • packages/engine/src/__tests__/engine/semaphore.test.ts
  • packages/engine/src/__tests__/engine/wait-signal.test.ts
  • packages/engine/src/adapters/index.ts
  • packages/engine/src/adapters/plain.ts
  • packages/engine/src/adapters/types.ts
  • packages/engine/src/engine/clock.ts
  • packages/engine/src/engine/compiled-flow.ts
  • packages/engine/src/engine/duration.ts
  • packages/engine/src/engine/execution.ts
  • packages/engine/src/engine/semaphore.ts

Add an optional maxConcurrency option to EngineOptions that limits
concurrent execute() calls on a CompiledFlow via a counting semaphore.
When all slots are taken, subsequent callers wait until a slot frees up.
Zero overhead when omitted (no semaphore created).
Unit tests for the Semaphore class (acquire/release, FIFO ordering,
blocking beyond max). Integration tests verifying backpressure behavior:
unlimited default, maxConcurrency limiting concurrent execute() calls,
and slot release allowing queued executions to proceed.
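The counting semaphore described in this commit message could look roughly like the sketch below. It is not the PR's Semaphore class: the name `CountingSemaphore` and its exact behavior are assumptions made for illustration, based only on the acquire/release and FIFO wording above.

```typescript
// Hypothetical sketch of a counting semaphore with FIFO waiters.
// Not the engine's Semaphore; names and details are illustrative assumptions.
class CountingSemaphore {
  private available: number
  private readonly waiters: Array<() => void> = []

  constructor(max: number) {
    if (!Number.isInteger(max) || max < 1) {
      throw new RangeError('max must be a positive integer')
    }
    this.available = max
  }

  // Resolves immediately when a slot is free, otherwise queues the caller.
  acquire(): Promise<void> {
    if (this.available > 0) {
      this.available -= 1
      return Promise.resolve()
    }
    return new Promise<void>((resolve) => {
      this.waiters.push(resolve)
    })
  }

  // Hands the slot to the oldest waiter, or returns it to the pool.
  release(): void {
    const next = this.waiters.shift()
    if (next) {
      next()
    } else {
      this.available += 1
    }
  }
}
```

Passing the slot directly to the oldest waiter in `release()` keeps ordering FIFO and avoids a window in which a late caller could take the slot ahead of the queue.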
Add Clock abstraction for deterministic time control in tests:
- RealClock wraps Date.now() and native setTimeout/clearTimeout
- TestClock with advance() for synchronous timer triggering
- ISO 8601 duration parser (PT format + legacy shorthand)
- ValidateSignalFn type and new EngineOptions: clock, validateSignal, pausedExecutionTTL
- Execution class: status tracking, signal delivery, TestClock time advancement, TTL
- PlainAdapter.waitForEvent(): promise-based suspension with timeout support
- PlainAdapter.deliverSignal(): external signal delivery to waiting nodes
- WaitTimeoutError for timeout expiration
- CompiledFlow.start(): async flow execution that handles wait nodes
- resolveWaitNext callback routes to timeout_next on timeout
- execute() still throws "use start()" for flows with wait nodes
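To illustrate the deterministic clock idea from the list above, here is a minimal sketch of a test clock whose `advance()` fires due timers synchronously. `ClockLike` and `FakeClock` are hypothetical names, and the behavior (start at 0, fire in due-time order, fire on the exact boundary) is inferred from the test list in this PR rather than taken from the real TestClock.

```typescript
// Hypothetical clock interface; the engine's real Clock type may differ.
interface ClockLike {
  now(): number
  setTimeout(fn: () => void, ms: number): number
  clearTimeout(id: number): void
}

type PendingTimer = { id: number; due: number; fn: () => void }

class FakeClock implements ClockLike {
  private current = 0
  private nextId = 1
  private timers: PendingTimer[] = []

  now(): number {
    return this.current
  }

  setTimeout(fn: () => void, ms: number): number {
    const id = this.nextId
    this.nextId += 1
    this.timers.push({ id, due: this.current + Math.max(0, ms), fn })
    return id
  }

  clearTimeout(id: number): void {
    this.timers = this.timers.filter((t) => t.id !== id)
  }

  // Moves time forward and synchronously fires every timer due by then,
  // earliest first; ties fire in registration order.
  advance(ms: number): void {
    const target = this.current + ms
    for (;;) {
      let next: PendingTimer | undefined
      for (const t of this.timers) {
        if (t.due <= target && (next === undefined || t.due < next.due)) next = t
      }
      if (next === undefined) break
      const fire = next
      this.timers = this.timers.filter((t) => t.id !== fire.id)
      this.current = fire.due
      fire.fn()
    }
    this.current = target
  }
}
```

Re-scanning the timer list after each callback means a timer scheduled from inside a callback still fires within the same `advance()` call if it falls inside the window.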
Clock tests (21):
- RealClock delegates to Date.now() and native timers
- TestClock: starts at 0, advance fires timers in order, boundary, clearTimeout
- parseDuration: PT format, legacy shorthand, error cases

Wait/signal tests (17):
- Wait node pauses execution with correct status and waitingFor
- Signal resumes execution and data merges into state
- Multiple sequential waits
- Timeout routes to timeout_next via TestClock.advance
- Wrong signal name / non-waiting state / validateSignal hook rejection
- execute() throws on wait nodes
- Result promise resolves, completedResult getter
- Paused execution TTL expiration and non-interference
- advanceTime() with RealClock throws, ISO 8601 parsing
- Trace recording includes wait nodes
…1-data-classification

# Conflicts:
#	packages/engine/src/adapters/plain.ts
#	packages/engine/src/adapters/types.ts
#	packages/engine/src/engine/compiled-flow.ts
#	packages/engine/src/engine/index.ts
#	packages/engine/src/index.ts
…a-classification

# Conflicts:
#	packages/engine/src/engine/compiled-flow.ts
#	packages/engine/src/engine/index.ts
#	packages/engine/src/engine/types.ts
Implement runtime data classification resolution and trace redaction
based on node/lane data_class fields from the JSON Schema.

- Add DataClassification, RedactionPolicy, TraceLevel types
- Add resolveClassifications() for node/lane classification inheritance
- Add redactRecord() for policy-driven output redaction
- Integrate into CompiledFlow's recordStep with three trace levels:
  full (default), policy (apply redaction), none (no trace)
- Support custom redactTrace hook for user-defined redaction
33 tests covering:
- Classification resolution (node-level, lane inheritance, override)
- Redaction policy (single/multiple classifications, visible/redact)
- Trace-level integration (full, policy, none)
- Custom redactTrace hook behavior
- Lane inheritance integration with engine
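As a rough illustration of the lane inheritance and node-level override mentioned in this commit, the sketch below resolves a tag list per node. The tag values come from the review findings on this PR; the document shape (`lanes`, `nodes`, `lane`, array-valued `data_class`) and the function name `resolveTags` are assumptions, not the actual resolveClassifications() signature.

```typescript
// Tag names as listed in the review findings; everything else is assumed.
type DataTag = 'pii' | 'financial' | 'credentials' | 'internal'

interface LaneDef {
  data_class?: DataTag[]
}

interface FlowNodeDef {
  lane?: string
  data_class?: DataTag[]
}

interface FlowDoc {
  lanes: Record<string, LaneDef>
  nodes: Record<string, FlowNodeDef>
}

// A node's own data_class overrides its lane's; without one, the lane's
// data_class is inherited; with neither, the node has no tags.
function resolveTags(doc: FlowDoc): Record<string, DataTag[]> {
  const resolved: Record<string, DataTag[]> = {}
  for (const nodeId of Object.keys(doc.nodes)) {
    const node = doc.nodes[nodeId]
    if (!node) continue
    const lane = node.lane !== undefined ? doc.lanes[node.lane] : undefined
    resolved[nodeId] = [...(node.data_class ?? lane?.data_class ?? [])]
  }
  return resolved
}
```

Override (rather than merging node and lane tags) is one reading of the "override" test case above; a merging variant would be equally easy to write if that is the intended semantics.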
@cloudflare-workers-and-pages

Deploying flowprint with Cloudflare Pages

Latest commit: d34f40d
Status: ✅  Deploy successful!
Preview URL: https://c7ac7fa6.flowprint.pages.dev
Branch Preview URL: https://feat-engine-pr11-data-classi.flowprint.pages.dev


Comment on lines +127 to +138
start(input: Record<string, unknown>): Execution {
  const adapter = this.adapter
  if (!(adapter instanceof PlainAdapter)) {
    throw new Error('start() requires PlainAdapter (or a subclass)')
  }

  const ttl = this.options.pausedExecutionTTL ?? DEFAULT_PAUSED_TTL
  const execution = new Execution(adapter, this.clock, this.options.validateSignal, ttl)

  // Run the flow asynchronously
  this.runAsync(input, execution, adapter).then(
    (result) => execution.complete(result),

start()/runAsync bypass the semaphore so maxConcurrency is ignored — should we acquire the same semaphore before starting and release it on completion/failure?

await this.semaphore?.acquire(); try { await this.runAsync(...); } finally { this.semaphore?.release(); }

Finding type: Logical Bugs | Severity: 🔴 High



Prompt for AI Agents:

In packages/engine/src/engine/compiled-flow.ts around lines 127 to 145, the start()
method launches runAsync(...) but never acquires/releases this.semaphore, so
wait-enabled flows bypass maxConcurrency. Modify start() so that if this.semaphore
exists you call this.semaphore.acquire() before invoking runAsync and ensure
this.semaphore.release() is called in a finally block after runAsync completes or fails.
Implement this by creating an async wrapper (e.g. startExecution async function) that
awaits acquire(), calls runAsync(...) and in a finally releases the semaphore, then
invoke that wrapper without awaiting so start() remains synchronous; leave runAsync()
unchanged.
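The wrapper suggested in the prompt above might be sketched as follows. `SlotLimiter` and `startLimited` are hypothetical names standing in for the engine's semaphore and for start(); the point is only that the slot is acquired inside an un-awaited async wrapper, so the caller still gets its handle back synchronously.

```typescript
// Anything with acquire/release fits here (hypothetical interface).
interface SlotLimiter {
  acquire(): Promise<void>
  release(): void
}

// Stand-in for start(): returns synchronously, runs `run` under the limiter.
function startLimited<T>(
  limiter: SlotLimiter | undefined,
  run: () => Promise<T>,
): { result: Promise<T> } {
  const guarded = async (): Promise<T> => {
    // With no limiter configured this awaits `undefined` and proceeds.
    await limiter?.acquire()
    try {
      return await run()
    } finally {
      // Released on success and on failure alike.
      limiter?.release()
    }
  }
  // Deliberately not awaited: the caller receives the handle immediately.
  return { result: guarded() }
}
```

One consequence worth weighing: under this scheme an execution that is paused on a wait node keeps holding its slot until it completes, which may or may not be the backpressure semantics the engine wants.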


Comment on lines +127 to +138
start(input: Record<string, unknown>): Execution {
  const adapter = this.adapter
  if (!(adapter instanceof PlainAdapter)) {
    throw new Error('start() requires PlainAdapter (or a subclass)')
  }

  const ttl = this.options.pausedExecutionTTL ?? DEFAULT_PAUSED_TTL
  const execution = new Execution(adapter, this.clock, this.options.validateSignal, ttl)

  // Run the flow asynchronously
  this.runAsync(input, execution, adapter).then(
    (result) => execution.complete(result),

CompiledFlow.start() reuses a single adapter, so pendingWaits keyed only by nodeId collide across concurrent executions. Should we scope waits by execution (add executionId to the key or give each execution its own adapter) and update Execution.signal/adapter.deliverSignal?

Finding type: Logical Bugs | Severity: 🔴 High



Prompt for AI Agents:

In packages/engine/src/engine/compiled-flow.ts around lines 127 to 170, the start()
method creates an Execution and passes the shared this.adapter into runAsync/onWait,
which causes pending waits to collide across concurrent executions. Refactor so waits
are scoped per-execution by including the execution id in adapter wait keys (or by
creating a thin per-execution adapter wrapper). Specifically: (1) change
adapter.waitForEvent(nodeId, ...) and adapter.deliverSignal(nodeId, ...) signatures to
accept an executionId (e.g. waitForEvent(executionId, nodeId, ...)), (2) update the
onWait callback in compiled-flow.ts to call waitForEvent with execution.id and to call
execution.setWaiting/setRunning as before, and (3) update Execution.signal in
packages/engine/src/engine/execution.ts to pass its execution.id to
adapter.deliverSignal. Also update packages/engine/src/adapters/plain.ts to key
pendingWaits by (executionId, nodeId) instead of just nodeId and adjust types/tests
accordingly so concurrent starts do not clobber each other.
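A minimal sketch of the per-execution keying proposed above, assuming string ids. `WaitRegistry` is a hypothetical stand-in for the pendingWaits bookkeeping inside PlainAdapter; timeouts and signal validation are left out to keep the keying itself visible.

```typescript
type SignalPayload = Record<string, unknown>

// Hypothetical registry keyed by (executionId, nodeId) instead of nodeId alone.
class WaitRegistry {
  private readonly pending = new Map<string, (data: SignalPayload) => void>()

  // JSON-encoding the pair avoids ambiguity if an id contains a separator.
  private key(executionId: string, nodeId: string): string {
    return JSON.stringify([executionId, nodeId])
  }

  waitForEvent(executionId: string, nodeId: string): Promise<SignalPayload> {
    const k = this.key(executionId, nodeId)
    if (this.pending.has(k)) {
      throw new Error(`already waiting: ${executionId}/${nodeId}`)
    }
    return new Promise<SignalPayload>((resolve) => {
      this.pending.set(k, resolve)
    })
  }

  // Returns false when nothing is waiting under that exact pair.
  deliverSignal(executionId: string, nodeId: string, data: SignalPayload): boolean {
    const k = this.key(executionId, nodeId)
    const resolve = this.pending.get(k)
    if (!resolve) return false
    this.pending.delete(k)
    resolve(data)
    return true
  }
}
```

With this shape, two executions paused on the same node id hold separate entries, so a signal for one cannot resume or overwrite the other.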


Comment on lines 179 to 183
} catch (err: unknown) {
  const error = err instanceof Error ? err : new Error(String(err))
  safeCallHook(() => hooks?.onFlowError?.(error))

  // Extract compensation result attached by walkGraph
  const compResult = (err as { __compensationResult?: CompensationResult })
    ?.__compensationResult

  // Find the failed node from the trace
  const failedRecord = [...externalTrace].reverse().find((r) => r.error)
  const failedNode = failedRecord?.nodeId ?? 'unknown'

  throw new ExecutionError(
    error.message,
    externalTrace,
    failedNode,
    compResult?.compensated ?? [],
    compResult?.compensationErrors ?? [],
  )
  throw err
}

Should runAsync mirror execute()'s catch and throw an ExecutionError including trace, failedNode, compensated and compensationErrors instead of rethrowing the raw error?

Finding type: Breaking Changes | Severity: 🔴 High



Prompt for AI Agents:

In packages/engine/src/engine/compiled-flow.ts around lines 179 to 183, the private
method runAsync catches errors and simply rethrows the raw error. This loses the richer
ExecutionError constructed by execute() and breaks the error contract for asynchronous
runs. Change the catch to mirror the execute() catch block: after calling safeCallHook,
extract the attached __compensationResult from the thrown error, determine the failed
node from externalTrace, and throw a new ExecutionError(error.message, externalTrace,
failedNode, compResult?.compensated ?? [], compResult?.compensationErrors ?? []). Ensure
Execution.fail will thus receive an ExecutionError with trace, failedNode, compensated,
and compensationErrors populated.
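One way to keep execute() and runAsync() on the same error contract is to move the conversion into a shared helper, sketched below. `FlowExecutionError`, `StepRecord`, `CompensationOutcome` and `toExecutionError` are hypothetical stand-ins; the element types of `compensated` and `compensationErrors` are not shown in this PR and are assumed to be strings here.

```typescript
// Hypothetical shapes; the engine's NodeExecutionRecord and
// CompensationResult may carry more (or different) fields.
interface StepRecord {
  nodeId: string
  error?: string
}

interface CompensationOutcome {
  compensated: string[]
  compensationErrors: string[]
}

class FlowExecutionError extends Error {
  readonly trace: StepRecord[]
  readonly failedNode: string
  readonly compensated: string[]
  readonly compensationErrors: string[]

  constructor(
    message: string,
    trace: StepRecord[],
    failedNode: string,
    compensated: string[],
    compensationErrors: string[],
  ) {
    super(message)
    this.name = 'FlowExecutionError'
    this.trace = trace
    this.failedNode = failedNode
    this.compensated = compensated
    this.compensationErrors = compensationErrors
  }
}

// Shared conversion: both the sync and async paths could call this in catch.
function toExecutionError(err: unknown, trace: StepRecord[]): FlowExecutionError {
  const error = err instanceof Error ? err : new Error(String(err))
  const comp = (err as { __compensationResult?: CompensationOutcome } | null | undefined)
    ?.__compensationResult
  // The most recent record carrying an error identifies the failed node.
  let failedNode = 'unknown'
  for (let i = trace.length - 1; i >= 0; i -= 1) {
    const rec = trace[i]
    if (rec?.error) {
      failedNode = rec.nodeId
      break
    }
  }
  return new FlowExecutionError(
    error.message,
    trace,
    failedNode,
    comp?.compensated ?? [],
    comp?.compensationErrors ?? [],
  )
}
```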


Comment on lines 217 to +223
const recordStep = (record: NodeExecutionRecord): void => {
  externalTrace.push(record)
  callbacks.onStep(record)
  if (traceLevel === 'none') return

  let finalRecord = record
  if (traceLevel === 'policy') {
    if (customRedact) {
      finalRecord = customRedact(record)

With traceLevel === 'none' externalTrace stays empty so ExecutionError.failedNode falls back to 'unknown' — should we still record at least the last failed node/error when trace recording is disabled?

Finding type: Breaking Changes | Severity: 🔴 High



Prompt for AI Agents:

In packages/engine/src/engine/compiled-flow.ts around lines 217 to 223, the recordStep
function returns immediately when traceLevel === 'none', so externalTrace is never
populated and ExecutionError.failedNode becomes 'unknown'. Change recordStep so that
even if traceLevel === 'none' it still records at least the last failed node/error:
either (1) maintain a small internal variable (e.g. lastFailedRecord or lastFailedNode +
lastError) and update it whenever a record with record.error is passed, or (2) push a
minimal failure-only NodeExecutionRecord (nodeId, error, timestamps) into externalTrace
when record.error is present while continuing to suppress non-error records. Ensure the
ExecutionError construction later uses that stored last-failure info so failedNode is
accurate even with traceLevel 'none'.
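Option (1) from the prompt above, tracking the last failure separately from the trace, could be sketched like this. `createRecorder`, `TraceMode` and `StepRec` are hypothetical names; redaction for the 'policy' level is omitted so the failure tracking stays in focus.

```typescript
type TraceMode = 'full' | 'policy' | 'none'

interface StepRec {
  nodeId: string
  output?: unknown
  error?: string
}

// Hypothetical recorder: the trace honors the mode, but the last failure is
// always remembered so error reporting does not depend on trace capture.
function createRecorder(mode: TraceMode) {
  const trace: StepRec[] = []
  let lastFailure: StepRec | undefined
  return {
    trace,
    record(rec: StepRec): void {
      if (rec.error !== undefined) {
        // Keep only the id and error, not the (possibly sensitive) output.
        lastFailure = { nodeId: rec.nodeId, error: rec.error }
      }
      if (mode === 'none') return
      trace.push(rec)
    },
    failedNode(): string {
      return lastFailure?.nodeId ?? 'unknown'
    },
  }
}
```

Storing only `nodeId` and `error` keeps the 'none' level from retaining node outputs, which seems to be the point of disabling the trace in the first place.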


Comment on lines +524 to +528
// Parse timeout from the node's duration string
let timeoutMs: number | undefined
if (node.timeout) {
  timeoutMs = parseDuration(node.timeout)
}

The schema allows any non-empty timeout string while parseDuration only accepts ISO/PT/#sh formats — should we tighten the timeout schema/type or validate/parse timeouts in FlowprintEngine.load()?

Finding type: Type Inconsistency | Severity: 🔴 High



Prompt for AI Agents:

In packages/engine/src/engine/compiled-flow.ts around lines 524 to 528, the onWait
handler unconditionally calls parseDuration(node.timeout) which throws for timeout
strings that don't match the parser's supported ISO/PT formats. Fix this by adding
load-time validation in the flow loader (e.g. FlowprintEngine.load or
packages/engine/src/engine.ts where documents are loaded) to validate every Wait node's
timeout string: call parseDuration inside a try/catch (or validate with the same
regex/logic) and if it throws, reject loading with a clear, descriptive error that
includes the nodeId and the invalid timeout value. This ensures invalid timeout formats
are rejected at load time rather than causing runtime failures in onWait. If you prefer
schema-level enforcement instead, update packages/schema/flowprint.schema.json to
constrain wait.timeout to the exact duration pattern supported by parseDuration and add
migration notes in the loader to surface schema validation errors.
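The load-time check described above might look like the sketch below. `parsePtDuration` is a stand-in that accepts only time-based ISO 8601 durations (hours, minutes, seconds after `PT`); the engine's parseDuration also accepts a legacy shorthand whose grammar is not shown in this PR, so it is left out. The node shape and the `'wait'` type value are assumptions.

```typescript
// Stand-in parser: PT30S, PT5M, PT1H30M, PT1.5S. Not the engine's parseDuration.
function parsePtDuration(value: string): number {
  const m = /^PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+(?:\.\d+)?)S)?$/.exec(value)
  if (!m || value === 'PT') {
    throw new Error(`invalid duration "${value}"`)
  }
  const hours = Number(m[1] ?? 0)
  const minutes = Number(m[2] ?? 0)
  const seconds = Number(m[3] ?? 0)
  return ((hours * 60 + minutes) * 60 + seconds) * 1000
}

// Hypothetical node shape for illustration.
interface WaitNodeDef {
  type: string
  timeout?: string
}

// Collects every bad timeout so a single load error reports all of them.
function validateWaitTimeouts(nodes: Record<string, WaitNodeDef>): void {
  const problems: string[] = []
  for (const nodeId of Object.keys(nodes)) {
    const node = nodes[nodeId]
    if (!node || node.type !== 'wait' || node.timeout === undefined) continue
    try {
      parsePtDuration(node.timeout)
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err)
      problems.push(`node "${nodeId}": ${reason}`)
    }
  }
  if (problems.length > 0) {
    throw new Error(`Invalid wait timeouts:\n${problems.join('\n')}`)
  }
}
```

Reusing the runtime parser for validation, rather than a separate schema pattern, keeps the two from drifting apart; a schema constraint could still be added on top for editor feedback.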


Comment on lines +63 to +74
export {
  FlowprintEngine,
  CompiledFlow,
  ExecutionError,
  Execution,
  RealClock,
  TestClock,
  parseDuration,
  Semaphore,
  resolveClassifications,
  redactRecord,
} from './engine/index.js'

packages/engine adds new public exports but the PR doesn't include a packages/engine/.changeset file — should we add a .changeset/.md per CLAUDE.md?

Finding type: AI Coding Guidelines | Severity: 🟠 Medium



Prompt for AI Agents:

In packages/engine/src/index.ts around lines 63 to 91, new public exports were added
(Execution, clocks, parseDuration, Semaphore, resolveClassifications, redactRecord,
types, and adapter exports) but no changeset entry was added for the engine package. Add
a new changeset file under packages/engine/.changeset with a short filename (e.g.,
<your-pr-number>-engine.md). The changeset should state that the engine package has a
public API change, list the added exports, and specify an appropriate release level (use
minor for new public exports). Follow this repo's changeset format and include a concise
summary and the package-to-release mapping for packages/engine. Ensure the file is
committed in this PR so the release tooling captures the API change.


@albertgwo
Contributor Author

Code Review Findings

Important (80-89 confidence)

1. PR description fundamentally misrepresents the implementation (85)

  • Description says: "classification levels (public, internal, confidential, restricted)" + "strips fields above configured threshold"
  • Code has: category tags 'pii' | 'financial' | 'credentials' | 'internal' + per-tag 'redact' | 'visible' policy
  • These are completely different models (hierarchical levels vs flat tags). Values in description (public, confidential, restricted) don't even exist in code.
  • Fix: Update PR description to match tag-based implementation.

2. start() bypasses maxConcurrency semaphore (82)

  • File: packages/engine/src/engine/compiled-flow.ts
  • execute() acquires/releases semaphore but start() doesn't interact with it. Long-running signal flows are exactly where backpressure matters most. Three start() calls with maxConcurrency: 2 all run concurrently.
  • Fix: Wrap runAsync in start() with semaphore acquire/release, or document limitation.

3. No test coverage for trace redaction via start() path (80)

  • All 608 lines of tests use execute(). Zero tests combine traceLevel/redactionPolicy with start().
  • Fix: Add integration test using start() with traceLevel: 'policy'.

Positive notes

  • Clean classification.ts (32 lines) and redaction.ts (29 lines)
  • Correct fail-closed security model (any redact triggers full output redaction)
  • Proper separation: hooks get unredacted data, trace gets redacted data
  • Custom redactTrace exceptions not caught (fail-closed)
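The fail-closed behavior noted above (any tag mapped to 'redact' hides the whole output) can be sketched as below. The tag and policy values come from finding 1; `redactOutput`, the `[REDACTED]` placeholder, and the treatment of tags with no policy entry as visible are assumptions, not the actual redactRecord() behavior.

```typescript
type Tag = 'pii' | 'financial' | 'credentials' | 'internal'
type TagPolicy = Partial<Record<Tag, 'redact' | 'visible'>>

interface TraceRecord {
  nodeId: string
  output?: unknown
}

// Placeholder value is an assumption; the engine may use something else.
const REDACTED = '[REDACTED]'

// If any of the node's tags is marked 'redact', the entire output is
// replaced. A fresh object is returned so hooks that received the original
// record still see unredacted data.
function redactOutput(record: TraceRecord, tags: Tag[], policy: TagPolicy): TraceRecord {
  const mustRedact = tags.some((tag) => policy[tag] === 'redact')
  if (!mustRedact) return record
  return { ...record, output: REDACTED }
}
```

Redacting the whole output instead of individual fields avoids having to know which field carries which tag, at the cost of losing non-sensitive detail for that step.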
