Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions autoresearch/results.tsv
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,19 @@ experiment_id timestamp description total_time_s peak_memory_mb correctness kept
013 2026-03-21T00:13:00 cache lazy Workflow import in Node.__call__ 0.0389 0.06 8/8 yes
014 2026-03-21T00:14:00 optimize AddNode.run with __dict__ access 0.0374 0.06 8/8 yes
015 2026-03-21T00:15:00 add large_100, foreach_expand, retry_chain, yield_resume benchmarks 0.4151 1.06 16/16 yes
016 2026-03-21T01:00:00 skip DAG validation in with_namespace/expand_node/ForEach via _construct_trusted 0.4001 0.41 16/16 yes
017 2026-03-21T01:01:00 optimize Edge/Node with_namespace to skip re-validation 0.3971 0.41 16/16 yes
018 2026-03-21T01:02:00 optimize ForEachNode.run loop: skip with_namespace, Edge.model_construct, pre-compute templates 0.3732 0.39 16/16 yes
019 2026-03-21T01:03:00 use Edge.model_construct for all edge creation in ForEachNode.run 0.3717 0.39 16/16 yes
020 2026-03-21T01:04:00 preserve id-independent cached properties in Node.with_namespace 0.3217 0.31 16/16 yes
021 2026-03-21T01:05:00 successor-based incremental get_ready_nodes 0.3217 0.31 failed no
022 2026-03-21T01:06:00 use model_construct in _cast_input fast path 0.3231 0.31 16/16 yes
023 2026-03-21T01:07:00 guard logger.info with isEnabledFor in __call__ 0.3327 0.31 16/16 no
024 2026-03-21T01:08:00 use Edge.model_construct in expand_node 0.3230 0.31 16/16 yes
025 2026-03-21T01:09:00 incremental ready-node tracking in topological executor via successor map 0.2624 0.32 16/16 yes
026 2026-03-21T01:10:00 incremental ready-node tracking in parallel executor (regressed parallel) 0.2638 0.32 16/16 no
027 2026-03-21T01:11:00 use model_construct in AddNode.run return 0.2748 0.32 16/16 no
028 2026-03-21T01:12:00 fast-path ShouldRetry in Node.__call__ to skip traceback formatting 0.2445 0.32 16/16 yes
029 2026-03-21T01:13:00 dispatch all initially-ready nodes in parallel executor 0.2423 0.32 16/16 yes
030 2026-03-21T01:14:00 use defaultdict in edges_by_target 0.2405 0.32 16/16 yes
031 2026-03-21T01:15:00 cache AddNode input_type by arity (module-level) 0.2450 0.33 16/16 no
4 changes: 3 additions & 1 deletion src/workflow_engine/core/edge.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ def validate_types(self, source: Node, target: Node):
)

def with_namespace(self, namespace: str) -> Self:
return self.model_update(
return self.__class__.model_construct(
source_id=f"{namespace}/{self.source_id}",
source_key=self.source_key,
target_id=f"{namespace}/{self.target_id}",
target_key=self.target_key,
)


Expand Down
34 changes: 32 additions & 2 deletions src/workflow_engine/core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
SEMANTIC_VERSION_PATTERN,
parse_semantic_version,
)
from .error import NodeException, ShouldYield, UserException
from .error import NodeException, ShouldRetry, ShouldYield, UserException
from .values import (
Data,
DataMapping,
Expand Down Expand Up @@ -241,6 +241,13 @@ async def display_name(self, context: "Context") -> str:
"""
return self.TYPE_INFO.display_name

# Cached properties that do NOT depend on node ID and should be
# preserved across with_namespace copies.
_ID_INDEPENDENT_CACHED = frozenset({
"input_type", "output_type", "input_fields", "output_fields",
"input_schema", "output_schema",
})

def with_namespace(self, namespace: str) -> Self:
"""
Create a copy of this node with a namespaced ID.
Expand All @@ -251,7 +258,22 @@ def with_namespace(self, namespace: str) -> Self:
Returns:
A new Node with ID '{namespace}/{self.id}'
"""
return self.model_update(id=f"{namespace}/{self.id}")
# Fast path: use model_copy and directly set the new id,
# skipping full re-validation since only the id changes.
copy = self.model_copy(update={"id": f"{namespace}/{self.id}"})
# Clear cached properties that depend on the id, but keep
# id-independent ones (input_type, output_type, etc.) to avoid
# expensive re-computation (e.g., create_model calls).
model_field_names = set(self.__class__.model_fields.keys())
id_independent = self._ID_INDEPENDENT_CACHED
copy_dict = copy.__dict__
cached_keys = [
k for k in copy_dict
if k not in model_field_names and k not in id_independent
]
for key in cached_keys:
del copy_dict[key]
return copy

# --------------------------------------------------------------------------
# VERSIONING
Expand Down Expand Up @@ -373,6 +395,10 @@ async def _cast_input(
casted_input[key] = casted_value

try:
# Fast path: if all input fields are present and type-checked,
# skip full model_validate and use model_construct directly.
if not cast_tasks and len(casted_input) == len(input_fields):
return self.input_type.model_construct(**casted_input)
return self.input_type.model_validate(casted_input)
except ValidationError as e:
raise UserException(
Expand Down Expand Up @@ -433,6 +459,10 @@ async def __call__(
return output
except ShouldYield:
raise
except ShouldRetry as e:
# ShouldRetry is an expected transient failure handled by the
# execution loop — skip expensive traceback formatting.
raise NodeException(self.id) from e
except Exception as e:
# In subclasses, you don't have to worry about logging the error,
# since it'll be logged here.
Expand Down
57 changes: 44 additions & 13 deletions src/workflow_engine/core/workflow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# workflow_engine/core/workflow.py
import asyncio
from collections import defaultdict
from collections.abc import Mapping, Sequence
from functools import cached_property
from typing import TYPE_CHECKING, Type
Expand Down Expand Up @@ -46,15 +47,14 @@ def edges_by_target(self) -> Mapping[str, Mapping[str, Edge]]:
A mapping from each node and input key to the (unique) edge that targets
the node at that key.
"""
edges_by_target: dict[str, dict[str, Edge]] = {
node.id: {} for node in self.nodes
}
edges_by_target: defaultdict[str, dict[str, Edge]] = defaultdict(dict)
for edge in self.edges:
if edge.target_key in edges_by_target[edge.target_id]:
target_edges = edges_by_target[edge.target_id]
if edge.target_key in target_edges:
raise ValueError(
f"In-edge to {edge.target_id}.{edge.target_key} is already in the graph"
)
edges_by_target[edge.target_id][edge.target_key] = edge
target_edges[edge.target_key] = edge
return edges_by_target

@cached_property
Expand Down Expand Up @@ -142,6 +142,31 @@ def _validate_no_id_prefix_collisions(self):
)
return self

@classmethod
def _construct_trusted(
    cls,
    *,
    input_node: "InputNode",
    inner_nodes: "Sequence[Node]",
    output_node: "OutputNode",
    edges: "Sequence[Edge]",
) -> "Workflow":
    """
    Assemble a Workflow via ``model_construct``, bypassing validation.

    Because ``model_construct`` does not run the model's validators, the
    DAG check, the prefix collision check and the nx_graph construction
    are all skipped.

    Callers are responsible for correctness: only pass pieces that are
    already known to form a valid DAG, such as the result of namespacing
    or structurally expanding a workflow that was validated earlier.

    Args:
        input_node: The workflow's input node.
        inner_nodes: The nodes between the input and output nodes.
        output_node: The workflow's output node.
        edges: The edges connecting the nodes.

    Returns:
        The unvalidated instance produced by ``cls.model_construct``.
    """
    trusted_fields = {
        "input_node": input_node,
        "inner_nodes": inner_nodes,
        "output_node": output_node,
        "edges": edges,
    }
    return cls.model_construct(**trusted_fields)

def get_ready_nodes(
self,
node_outputs: Mapping[str, DataMapping] | None = None,
Expand Down Expand Up @@ -301,24 +326,29 @@ def expand_node(
] + list(subgraph.nodes)
new_edges: list[Edge] = list(subgraph.edges)

subgraph_input_id = subgraph.input_node.id
subgraph_output_id = subgraph.output_node.id
subgraph_input_fields = subgraph.input_node.input_fields

_edge = Edge.model_construct
for edge in self.edges:
if edge.target_id == node_id:
# Only create the edge if the subgraph's input_node has this field
if edge.target_key in subgraph.input_node.input_fields:
if edge.target_key in subgraph_input_fields:
new_edges.append(
Edge(
_edge(
source_id=edge.source_id,
source_key=edge.source_key,
target_id=subgraph.input_node.id,
target_id=subgraph_input_id,
target_key=edge.target_key,
)
)
# Edges to fields not in the subgraph input are dropped
# (e.g., control fields like 'condition' for IfElseNode)
elif edge.source_id == node_id:
new_edges.append(
Edge(
source_id=subgraph.output_node.id,
_edge(
source_id=subgraph_output_id,
source_key=edge.source_key,
target_id=edge.target_id,
target_key=edge.target_key,
Expand All @@ -327,7 +357,7 @@ def expand_node(
else:
new_edges.append(edge)

return Workflow(
return Workflow._construct_trusted(
inner_nodes=new_inner_nodes,
input_node=self.input_node,
output_node=self.output_node,
Expand All @@ -346,8 +376,9 @@ def with_namespace(self, namespace: str) -> "Workflow":
Returns:
A new Workflow with all node IDs prefixed with '{namespace}/'
"""
# Create namespaced nodes
return Workflow(
# Create namespaced nodes — skip validation since we're just
# prefixing IDs on a known-valid workflow
return Workflow._construct_trusted(
input_node=self.input_node.with_namespace(namespace),
inner_nodes=[node.with_namespace(namespace) for node in self.inner_nodes],
output_node=self.output_node.with_namespace(namespace),
Expand Down
6 changes: 5 additions & 1 deletion src/workflow_engine/execution/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ async def execute(

try:
# Initial dispatch - start all initially ready nodes
ready_nodes = {workflow.input_node.id: input}
# (input node + nodes with no incoming edges like constants)
ready_nodes = workflow.get_ready_nodes(node_outputs={})
ready_nodes = dict(ready_nodes)
# Override input node's data with the actual input
ready_nodes[workflow.input_node.id] = input
for node_id, node_input in ready_nodes.items():
task = asyncio.create_task(
self._execute_node(
Expand Down
78 changes: 66 additions & 12 deletions src/workflow_engine/execution/topological.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,34 @@ def _get_node_max_retries(self, node) -> int | None:
return node.TYPE_INFO.max_retries
return None

@staticmethod
def _build_successor_map(
workflow: Workflow,
) -> dict[str, set[str]]:
"""Build a mapping from each node to the set of nodes that depend on it."""
successors: dict[str, set[str]] = {node.id: set() for node in workflow.nodes}
for edge in workflow.edges:
successors[edge.source_id].add(edge.target_id)
return successors

@staticmethod
def _check_node_ready(
node_id: str,
edges_by_target: dict[str, dict[str, "Edge"]],
node_outputs: dict[str, DataMapping],
) -> DataMapping | None:
"""Check if a node is ready and return its input, or None if not ready."""
node_edges = edges_by_target.get(node_id)
if not node_edges:
return {}
node_input: dict[str, object] = {}
for target_key, edge in node_edges.items():
source_output = node_outputs.get(edge.source_id)
if source_output is None:
return None
node_input[target_key] = source_output[edge.source_key]
return node_input # type: ignore

@override
async def execute(
self,
Expand All @@ -71,10 +99,22 @@ async def execute(
# Track nodes that yielded (node_id -> yield message)
node_yields: dict[str, str] = {}

try:
ready_nodes = {workflow.input_node.id: input}
# Build successor map for incremental ready-node tracking
successors = self._build_successor_map(workflow)
edges_by_target = workflow.edges_by_target
_check_ready = self._check_node_ready

while len(ready_nodes) > 0 or len(pending_retry) > 0:
try:
# Seed with ALL initially ready nodes (input node + nodes with no incoming edges)
ready_nodes: dict[str, DataMapping] = {}
for node in workflow.nodes:
nid = node.id
if nid == workflow.input_node.id:
ready_nodes[nid] = input
elif not edges_by_target.get(nid):
ready_nodes[nid] = {}

while ready_nodes or pending_retry:
# Check if any pending retries are now ready
for node_id in list(pending_retry.keys()):
state = retry_tracker.get_state(node_id)
Expand All @@ -88,7 +128,7 @@ async def execute(
await asyncio.sleep(wait_time.total_seconds())
continue

if len(ready_nodes) == 0:
if not ready_nodes:
break

node_id, node_input = ready_nodes.popitem()
Expand All @@ -99,11 +139,13 @@ async def execute(
if limiter is not None:
await limiter.acquire()

expanded = False
try:
node_result = await node(context, node_input)

if isinstance(node_result, Workflow):
workflow = workflow.expand_node(node_id, node_result)
expanded = True
else:
node_outputs[node.id] = node_result

Expand Down Expand Up @@ -145,14 +187,26 @@ async def execute(
if limiter is not None:
limiter.release()

ready_nodes = {
node_id: node_input
for node_id, node_input in workflow.get_ready_nodes(
node_outputs=node_outputs,
partial_results=ready_nodes,
).items()
if node_id not in node_yields
}
if expanded:
# After expansion, rebuild maps and do a full scan
successors = self._build_successor_map(workflow)
edges_by_target = workflow.edges_by_target
ready_nodes = {
node_id: node_input
for node_id, node_input in workflow.get_ready_nodes(
node_outputs=node_outputs,
partial_results=ready_nodes,
).items()
if node_id not in node_yields
}
else:
# Incremental: only check successors of the just-completed node
for succ_id in successors.get(node_id, ()):
if succ_id in node_outputs or succ_id in ready_nodes or succ_id in node_yields:
continue
succ_input = _check_ready(succ_id, edges_by_target, node_outputs)
if succ_input is not None:
ready_nodes[succ_id] = succ_input

if len(node_yields) > 0:
partial_output = await workflow.get_output(
Expand Down
Loading