diff --git a/autoresearch/results.tsv b/autoresearch/results.tsv index 0fda31c..00bb360 100644 --- a/autoresearch/results.tsv +++ b/autoresearch/results.tsv @@ -15,3 +15,19 @@ experiment_id timestamp description total_time_s peak_memory_mb correctness kept 013 2026-03-21T00:13:00 cache lazy Workflow import in Node.__call__ 0.0389 0.06 8/8 yes 014 2026-03-21T00:14:00 optimize AddNode.run with __dict__ access 0.0374 0.06 8/8 yes 015 2026-03-21T00:15:00 add large_100, foreach_expand, retry_chain, yield_resume benchmarks 0.4151 1.06 16/16 yes +016 2026-03-21T01:00:00 skip DAG validation in with_namespace/expand_node/ForEach via _construct_trusted 0.4001 0.41 16/16 yes +017 2026-03-21T01:01:00 optimize Edge/Node with_namespace to skip re-validation 0.3971 0.41 16/16 yes +018 2026-03-21T01:02:00 optimize ForEachNode.run loop: skip with_namespace, Edge.model_construct, pre-compute templates 0.3732 0.39 16/16 yes +019 2026-03-21T01:03:00 use Edge.model_construct for all edge creation in ForEachNode.run 0.3717 0.39 16/16 yes +020 2026-03-21T01:04:00 preserve id-independent cached properties in Node.with_namespace 0.3217 0.31 16/16 yes +021 2026-03-21T01:05:00 successor-based incremental get_ready_nodes 0.3217 0.31 failed no +022 2026-03-21T01:06:00 use model_construct in _cast_input fast path 0.3231 0.31 16/16 yes +023 2026-03-21T01:07:00 guard logger.info with isEnabledFor in __call__ 0.3327 0.31 16/16 no +024 2026-03-21T01:08:00 use Edge.model_construct in expand_node 0.3230 0.31 16/16 yes +025 2026-03-21T01:09:00 incremental ready-node tracking in topological executor via successor map 0.2624 0.32 16/16 yes +026 2026-03-21T01:10:00 incremental ready-node tracking in parallel executor (regressed parallel) 0.2638 0.32 16/16 no +027 2026-03-21T01:11:00 use model_construct in AddNode.run return 0.2748 0.32 16/16 no +028 2026-03-21T01:12:00 fast-path ShouldRetry in Node.__call__ to skip traceback formatting 0.2445 0.32 16/16 yes +029 2026-03-21T01:13:00 dispatch all initially-ready nodes in parallel executor 0.2423 0.32 16/16 yes +030 2026-03-21T01:14:00 use defaultdict in edges_by_target 0.2405 0.32 16/16 yes +031 2026-03-21T01:15:00 cache AddNode input_type by arity (module-level) 0.2450 0.33 16/16 no diff --git a/src/workflow_engine/core/edge.py b/src/workflow_engine/core/edge.py index 8c141b9..2911ae5 100644 --- a/src/workflow_engine/core/edge.py +++ b/src/workflow_engine/core/edge.py @@ -60,9 +60,11 @@ def validate_types(self, source: Node, target: Node): ) def with_namespace(self, namespace: str) -> Self: - return self.model_update( + return self.__class__.model_construct( source_id=f"{namespace}/{self.source_id}", + source_key=self.source_key, target_id=f"{namespace}/{self.target_id}", + target_key=self.target_key, ) diff --git a/src/workflow_engine/core/node.py b/src/workflow_engine/core/node.py index ccc182f..03c7ed3 100644 --- a/src/workflow_engine/core/node.py +++ b/src/workflow_engine/core/node.py @@ -33,7 +33,7 @@ SEMANTIC_VERSION_PATTERN, parse_semantic_version, ) -from .error import NodeException, ShouldYield, UserException +from .error import NodeException, ShouldRetry, ShouldYield, UserException from .values import ( Data, DataMapping, @@ -241,6 +241,13 @@ async def display_name(self, context: "Context") -> str: """ return self.TYPE_INFO.display_name + # Cached properties that do NOT depend on node ID and should be + # preserved across with_namespace copies. + _ID_INDEPENDENT_CACHED = frozenset({ + "input_type", "output_type", "input_fields", "output_fields", + "input_schema", "output_schema", + }) + def with_namespace(self, namespace: str) -> Self: """ Create a copy of this node with a namespaced ID. @@ -251,7 +258,22 @@ def with_namespace(self, namespace: str) -> Self: Returns: A new Node with ID '{namespace}/{self.id}' """ - return self.model_update(id=f"{namespace}/{self.id}") + # Fast path: use model_copy and directly set the new id, + # skipping full re-validation since only the id changes. + copy = self.model_copy(update={"id": f"{namespace}/{self.id}"}) + # Clear cached properties that depend on the id, but keep + # id-independent ones (input_type, output_type, etc.) to avoid + # expensive re-computation (e.g., create_model calls). + model_field_names = set(self.__class__.model_fields.keys()) + id_independent = self._ID_INDEPENDENT_CACHED + copy_dict = copy.__dict__ + cached_keys = [ + k for k in copy_dict + if k not in model_field_names and k not in id_independent + ] + for key in cached_keys: + del copy_dict[key] + return copy # -------------------------------------------------------------------------- # VERSIONING @@ -373,6 +395,10 @@ async def _cast_input( casted_input[key] = casted_value try: + # Fast path: if all input fields are present and type-checked, + # skip full model_validate and use model_construct directly. + if not cast_tasks and len(casted_input) == len(input_fields): + return self.input_type.model_construct(**casted_input) return self.input_type.model_validate(casted_input) except ValidationError as e: raise UserException( @@ -433,6 +459,10 @@ async def __call__( return output except ShouldYield: raise + except ShouldRetry as e: + # ShouldRetry is an expected transient failure handled by the + # execution loop — skip expensive traceback formatting. + raise NodeException(self.id) from e except Exception as e: # In subclasses, you don't have to worry about logging the error, # since it'll be logged here. diff --git a/src/workflow_engine/core/workflow.py b/src/workflow_engine/core/workflow.py index 31b6e9e..4afa8f7 100644 --- a/src/workflow_engine/core/workflow.py +++ b/src/workflow_engine/core/workflow.py @@ -1,5 +1,6 @@ # workflow_engine/core/workflow.py import asyncio +from collections import defaultdict from collections.abc import Mapping, Sequence from functools import cached_property from typing import TYPE_CHECKING, Type @@ -46,15 +47,14 @@ def edges_by_target(self) -> Mapping[str, Mapping[str, Edge]]: A mapping from each node and input key to the (unique) edge that targets the node at that key. """ - edges_by_target: dict[str, dict[str, Edge]] = { - node.id: {} for node in self.nodes - } + edges_by_target: defaultdict[str, dict[str, Edge]] = defaultdict(dict) for edge in self.edges: - if edge.target_key in edges_by_target[edge.target_id]: + target_edges = edges_by_target[edge.target_id] + if edge.target_key in target_edges: raise ValueError( f"In-edge to {edge.target_id}.{edge.target_key} is already in the graph" ) - edges_by_target[edge.target_id][edge.target_key] = edge + target_edges[edge.target_key] = edge return edges_by_target @cached_property @@ -142,6 +142,31 @@ def _validate_no_id_prefix_collisions(self): ) return self + @classmethod + def _construct_trusted( + cls, + *, + input_node: "InputNode", + inner_nodes: "Sequence[Node]", + output_node: "OutputNode", + edges: "Sequence[Edge]", + ) -> "Workflow": + """ + Construct a Workflow without running model validators (DAG check, + prefix collision check, nx_graph construction). + + Only use this when the inputs are known to form a valid DAG — e.g., + when the workflow is derived from an already-validated workflow via + namespace prefixing or structural expansion. + """ + obj = cls.model_construct( + input_node=input_node, + inner_nodes=inner_nodes, + output_node=output_node, + edges=edges, + ) + return obj + def get_ready_nodes( self, node_outputs: Mapping[str, DataMapping] | None = None, @@ -301,15 +326,20 @@ def expand_node( ] + list(subgraph.nodes) new_edges: list[Edge] = list(subgraph.edges) + subgraph_input_id = subgraph.input_node.id + subgraph_output_id = subgraph.output_node.id + subgraph_input_fields = subgraph.input_node.input_fields + + _edge = Edge.model_construct for edge in self.edges: if edge.target_id == node_id: # Only create the edge if the subgraph's input_node has this field - if edge.target_key in subgraph.input_node.input_fields: + if edge.target_key in subgraph_input_fields: new_edges.append( - Edge( + _edge( source_id=edge.source_id, source_key=edge.source_key, - target_id=subgraph.input_node.id, + target_id=subgraph_input_id, target_key=edge.target_key, ) ) @@ -317,8 +347,8 @@ def expand_node( # (e.g., control fields like 'condition' for IfElseNode) elif edge.source_id == node_id: new_edges.append( - Edge( - source_id=subgraph.output_node.id, + _edge( + source_id=subgraph_output_id, source_key=edge.source_key, target_id=edge.target_id, target_key=edge.target_key, @@ -327,7 +357,7 @@ def expand_node( else: new_edges.append(edge) - return Workflow( + return Workflow._construct_trusted( inner_nodes=new_inner_nodes, input_node=self.input_node, output_node=self.output_node, @@ -346,8 +376,9 @@ def with_namespace(self, namespace: str) -> "Workflow": Returns: A new Workflow with all node IDs prefixed with '{namespace}/' """ - # Create namespaced nodes - return Workflow( + # Create namespaced nodes — skip validation since we're just + # prefixing IDs on a known-valid workflow + return Workflow._construct_trusted( input_node=self.input_node.with_namespace(namespace), inner_nodes=[node.with_namespace(namespace) for node in self.inner_nodes], output_node=self.output_node.with_namespace(namespace), diff --git a/src/workflow_engine/execution/parallel.py b/src/workflow_engine/execution/parallel.py index 9ae1a5f..7d7a62e 100644 --- a/src/workflow_engine/execution/parallel.py +++ b/src/workflow_engine/execution/parallel.py @@ -110,7 +110,11 @@ async def execute( try: # Initial dispatch - start all initially ready nodes - ready_nodes = {workflow.input_node.id: input} + # (input node + nodes with no incoming edges like constants) + ready_nodes = workflow.get_ready_nodes(node_outputs={}) + ready_nodes = dict(ready_nodes) + # Override input node's data with the actual input + ready_nodes[workflow.input_node.id] = input for node_id, node_input in ready_nodes.items(): task = asyncio.create_task( self._execute_node( diff --git a/src/workflow_engine/execution/topological.py b/src/workflow_engine/execution/topological.py index b45ef77..640180a 100644 --- a/src/workflow_engine/execution/topological.py +++ b/src/workflow_engine/execution/topological.py @@ -50,6 +50,34 @@ def _get_node_max_retries(self, node) -> int | None: return node.TYPE_INFO.max_retries return None + @staticmethod + def _build_successor_map( + workflow: Workflow, + ) -> dict[str, set[str]]: + """Build a mapping from each node to the set of nodes that depend on it.""" + successors: dict[str, set[str]] = {node.id: set() for node in workflow.nodes} + for edge in workflow.edges: + successors[edge.source_id].add(edge.target_id) + return successors + + @staticmethod + def _check_node_ready( + node_id: str, + edges_by_target: dict[str, dict[str, "Edge"]], + node_outputs: dict[str, DataMapping], + ) -> DataMapping | None: + """Check if a node is ready and return its input, or None if not ready.""" + node_edges = edges_by_target.get(node_id) + if not node_edges: + return {} + node_input: dict[str, object] = {} + for target_key, edge in node_edges.items(): + source_output = node_outputs.get(edge.source_id) + if source_output is None: + return None + node_input[target_key] = source_output[edge.source_key] + return node_input # type: ignore + @override async def execute( self, @@ -71,10 +99,22 @@ async def execute( # Track nodes that yielded (node_id -> yield message) node_yields: dict[str, str] = {} - try: - ready_nodes = {workflow.input_node.id: input} + # Build successor map for incremental ready-node tracking + successors = self._build_successor_map(workflow) + edges_by_target = workflow.edges_by_target + _check_ready = self._check_node_ready - while len(ready_nodes) > 0 or len(pending_retry) > 0: + try: + # Seed with ALL initially ready nodes (input node + nodes with no incoming edges) + ready_nodes: dict[str, DataMapping] = {} + for node in workflow.nodes: + nid = node.id + if nid == workflow.input_node.id: + ready_nodes[nid] = input + elif not edges_by_target.get(nid): + ready_nodes[nid] = {} + + while ready_nodes or pending_retry: # Check if any pending retries are now ready for node_id in list(pending_retry.keys()): state = retry_tracker.get_state(node_id) @@ -88,7 +128,7 @@ async def execute( await asyncio.sleep(wait_time.total_seconds()) continue - if len(ready_nodes) == 0: + if not ready_nodes: break node_id, node_input = ready_nodes.popitem() @@ -99,11 +139,13 @@ async def execute( if limiter is not None: await limiter.acquire() + expanded = False try: node_result = await node(context, node_input) if isinstance(node_result, Workflow): workflow = workflow.expand_node(node_id, node_result) + expanded = True else: node_outputs[node.id] = node_result @@ -145,14 +187,26 @@ async def execute( if limiter is not None: limiter.release() - ready_nodes = { - node_id: node_input - for node_id, node_input in workflow.get_ready_nodes( - node_outputs=node_outputs, - partial_results=ready_nodes, - ).items() - if node_id not in node_yields - } + if expanded: + # After expansion, rebuild maps and do a full scan + successors = self._build_successor_map(workflow) + edges_by_target = workflow.edges_by_target + ready_nodes = { + node_id: node_input + for node_id, node_input in workflow.get_ready_nodes( + node_outputs=node_outputs, + partial_results=ready_nodes, + ).items() + if node_id not in node_yields + } + else: + # Incremental: only check successors of the just-completed node + for succ_id in successors.get(node_id, ()): + if succ_id in node_outputs or succ_id in ready_nodes or succ_id in node_yields: + continue + succ_input = _check_ready(succ_id, edges_by_target, node_outputs) + if succ_input is not None: + ready_nodes[succ_id] = succ_input if len(node_yields) > 0: partial_output = await workflow.get_output( diff --git a/src/workflow_engine/nodes/iteration.py b/src/workflow_engine/nodes/iteration.py index 3da3f31..cdce97f 100644 --- a/src/workflow_engine/nodes/iteration.py +++ b/src/workflow_engine/nodes/iteration.py @@ -140,6 +140,8 @@ async def run(self, context: Context, input: SequenceData) -> Workflow: n = len(input.sequence) input_node, output_node = self._build_input_output_nodes(n) has_no_output = self._has_no_output() + has_single_input = self._has_single_input() + has_single_output = self._has_single_output() expand_element_type = self._input_element_type() expand = ExpandSequenceNode.from_length( @@ -148,12 +150,13 @@ async def run(self, context: Context, input: SequenceData) -> Workflow: element_type=expand_element_type, ) + _edge = Edge.model_construct # local alias for speed inner_nodes: list[Node] = [expand] edges: list[Edge] = [ - Edge.from_nodes( - source=input_node, + _edge( + source_id=input_node.id, source_key="sequence", - target=expand, + target_id=expand.id, target_key="sequence", ), ] @@ -167,77 +170,105 @@ async def run(self, context: Context, input: SequenceData) -> Workflow: ) inner_nodes.append(gather) edges.append( - Edge.from_nodes( - source=gather, + _edge( + source_id=gather.id, source_key="sequence", - target=output_node, + target_id=output_node.id, target_key="sequence", ) ) + # Pre-compute adapter templates (created once, namespaced per iteration) + needs_input_adapter = not has_single_input + needs_output_adapter = not has_single_output and not has_no_output + input_adapter_template = ( + ExpandDataNode.from_data_type( + id="input_adapter", + data_type=self.workflow.input_type, + ) + if needs_input_adapter + else None + ) + output_adapter_template = ( + GatherDataNode.from_data_type( + id="output_adapter", + data_type=self.workflow.output_type, + ) + if needs_output_adapter + else None + ) + + # Cache expand/gather IDs + expand_id = expand.id + gather_id = gather.id if gather is not None else None + + # Cache the base workflow's input/output node IDs (before namespacing) + base_input_id = self.workflow.input_node.id + base_output_id = self.workflow.output_node.id + base_edges = self.workflow.edges + base_inner_nodes = self.workflow.inner_nodes + for i in range(n): namespace = f"element_{i}" - item_workflow = self.workflow.with_namespace(namespace) + prefix = f"{namespace}/" + + # Namespace inner nodes directly + namespaced_inner = [node.with_namespace(namespace) for node in base_inner_nodes] + inner_nodes.extend(namespaced_inner) + + # Namespace the input/output node IDs + ns_input_id = f"{prefix}{base_input_id}" + ns_output_id = f"{prefix}{base_output_id}" input_adapter = ( - ExpandDataNode.from_data_type( - id="input_adapter", - data_type=self.workflow.input_type, - ).with_namespace(namespace) - if not self._has_single_input() + input_adapter_template.with_namespace(namespace) + if input_adapter_template is not None else None ) output_adapter = ( - GatherDataNode.from_data_type( - id="output_adapter", - data_type=self.workflow.output_type, - ).with_namespace(namespace) - if self._has_single_output() is False and has_no_output is False + output_adapter_template.with_namespace(namespace) + if output_adapter_template is not None else None ) if input_adapter is not None: inner_nodes.append(input_adapter) - inner_nodes.extend(item_workflow.inner_nodes) - if output_adapter is not None: - inner_nodes.append(output_adapter) - - if input_adapter is not None: edges.append( - Edge.from_nodes( - source=expand, + _edge( + source_id=expand_id, source_key=expand.key(i), - target=input_adapter, + target_id=input_adapter.id, target_key="data", ) ) + if output_adapter is not None: + inner_nodes.append(output_adapter) - for edge in item_workflow.edges: - if edge.source_id == item_workflow.input_node.id: - source_id = ( - input_adapter.id if input_adapter is not None else expand.id - ) - source_key = ( - edge.source_key if input_adapter is not None else expand.key(i) - ) + # Rewire edges from the base workflow + input_adapter_id = input_adapter.id if input_adapter is not None else None + output_adapter_id = output_adapter.id if output_adapter is not None else None + + for edge in base_edges: + # Determine source + if edge.source_id == base_input_id: + source_id = input_adapter_id if input_adapter_id is not None else expand_id + source_key = edge.source_key if input_adapter_id is not None else expand.key(i) else: - source_id = edge.source_id + source_id = f"{prefix}{edge.source_id}" source_key = edge.source_key - if edge.target_id == item_workflow.output_node.id: + + # Determine target + if edge.target_id == base_output_id: if has_no_output: continue - assert gather is not None - target_id = ( - output_adapter.id if output_adapter is not None else gather.id - ) - target_key = ( - edge.target_key if output_adapter is not None else gather.key(i) - ) + target_id = output_adapter_id if output_adapter_id is not None else gather_id + target_key = edge.target_key if output_adapter_id is not None else gather.key(i) else: - target_id = edge.target_id + target_id = f"{prefix}{edge.target_id}" target_key = edge.target_key + edges.append( - Edge( + _edge( source_id=source_id, source_key=source_key, target_id=target_id, @@ -248,15 +279,15 @@ async def run(self, context: Context, input: SequenceData) -> Workflow: if output_adapter is not None: assert gather is not None edges.append( - Edge.from_nodes( - source=output_adapter, + _edge( + source_id=output_adapter.id, source_key="data", - target=gather, + target_id=gather_id, target_key=gather.key(i), ) ) - return Workflow( + return Workflow._construct_trusted( input_node=input_node, inner_nodes=inner_nodes, output_node=output_node,