Add dynamic workflow tool #3426
Merged
31 commits
d91a5e8
Add dynamic workflow tool
openhands-agent 181521b
Update dynamic workflow example
openhands-agent 70d18ca
Address dynamic workflow review feedback
openhands-agent 3b947f9
Improve workflow failure diagnostics
openhands-agent 41bef42
Tighten workflow script boundaries
openhands-agent 83a85ee
Merge branch 'main' into redo-dynamic-workflow-mvp
neubig 5222816
chore: address PR review feedback (#3426)
openhands-agent c35e9d5
chore: address PR review feedback (#3426)
openhands-agent cd0aeb1
chore: address PR review feedback (#3426)
openhands-agent d100fd8
chore: address PR review feedback (#3426)
openhands-agent ab75043
chore: restore type() to _safe_globals (#3426)
openhands-agent 9597084
chore: address PR review feedback (#3426)
openhands-agent bb8d812
chore: address PR review feedback (#3426)
openhands-agent 2515653
chore: address PR review feedback (#3426)
openhands-agent 76f4ee5
chore: address PR review feedback (#3426)
openhands-agent c8c50ee
chore: address PR review feedback (#3426)
openhands-agent 3a480eb
chore: address PR review feedback (#3426)
openhands-agent aeafbcb
chore: address PR review feedback (#3426)
openhands-agent e33dd39
chore: address PR review feedback (#3426)
openhands-agent b722988
chore: address PR review feedback (#3426)
openhands-agent 05b78ee
chore: address PR review feedback (#3426)
openhands-agent da12413
chore: address PR review feedback (#3426)
openhands-agent fe4a34e
chore: address PR review feedback (#3426)
openhands-agent 97ec7a2
chore: address PR review feedback (#3426)
openhands-agent 4949144
chore: address PR review feedback (#3426)
openhands-agent a60cdbe
chore: address PR review feedback (#3426)
openhands-agent eb267fa
chore: address PR review feedback (#3426)
openhands-agent 5bc40d2
chore: address PR review feedback (#3426)
openhands-agent 8a484b7
chore: address PR review feedback (#3426)
openhands-agent fa05bbc
chore: address workflow review feedback
openhands-agent 16f873b
chore: address workflow code review style nits
openhands-agent
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| """Dynamic workflow tool example. | ||
|
|
||
| This example demonstrates the intended workflow shape: | ||
|
|
||
| 1. The parent agent writes a Python workflow script. | ||
| 2. The parent agent calls the workflow tool with that generated script. | ||
| 3. The workflow fans out sub-agents to audit test coverage by project area. | ||
| 4. A reducer sub-agent summarizes the repo-wide coverage risks. | ||
| """ | ||
|
|
||
| import os | ||
| from pathlib import Path | ||
|
|
||
| from openhands.sdk import LLM, Agent, AgentContext, Conversation, Tool | ||
| from openhands.sdk.context import Skill | ||
| from openhands.sdk.subagent import register_agent_if_absent | ||
| from openhands.tools.delegate import DelegationVisualizer | ||
| from openhands.tools.file_editor import FileEditorTool | ||
| from openhands.tools.terminal import TerminalTool | ||
| from openhands.tools.workflow import WorkflowToolSet | ||
|
|
||
|
|
||
| llm = LLM( | ||
| model=os.getenv("LLM_MODEL", "gpt-5.5"), | ||
| api_key=os.getenv("LLM_API_KEY"), | ||
| base_url=os.getenv("LLM_BASE_URL"), | ||
| usage_id="dynamic-workflow-demo", | ||
| ) | ||
|
|
||
|
|
||
| # Sub-agent used by the generated workflow. | ||
| def create_coverage_auditor(llm: LLM) -> Agent: | ||
| return Agent( | ||
| llm=llm, | ||
| tools=[ | ||
| Tool(name=TerminalTool.name), | ||
| Tool(name=FileEditorTool.name), | ||
| ], | ||
| agent_context=AgentContext( | ||
| skills=[ | ||
| Skill( | ||
| name="coverage_audit", | ||
| content=( | ||
| "You audit whether source code has meaningful test " | ||
| "coverage. Use read-only inspection commands and file " | ||
| "views. Compare source modules against the matching " | ||
| "tests under tests/sdk, tests/tools, tests/workspace, " | ||
| "or tests/agent_server. Identify risky untested " | ||
| "behavior, and recommend the " | ||
| "next tests to add. Use at most three tool calls, " | ||
| "avoid broad dumps, and do not edit files." | ||
| ), | ||
| trigger=None, | ||
| ) | ||
| ], | ||
| system_message_suffix=( | ||
| "Return a concise coverage assessment with evidence, gaps, " | ||
| "and recommended tests. Keep command output under 200 lines " | ||
| "and do not modify the repository." | ||
| ), | ||
| ), | ||
| ) | ||
|
|
||
|
|
||
| register_agent_if_absent( | ||
| name="coverage_auditor", | ||
| factory_func=create_coverage_auditor, | ||
| description="Audits test coverage quality for one project area.", | ||
| ) | ||
|
|
||
| # The parent agent has the workflow tool. It is responsible for writing the | ||
| # workflow script and then calling the tool with that generated Python code. | ||
| parent_agent = Agent( | ||
| llm=llm, | ||
| tools=[Tool(name=WorkflowToolSet.name)], | ||
| agent_context=AgentContext( | ||
| skills=[ | ||
| Skill( | ||
| name="workflow_author", | ||
| content=( | ||
| "When a task benefits from parallel sub-agents, write a " | ||
| "Python workflow script with `async def main(wf):` and call " | ||
| "the workflow tool. Keep intermediate findings inside the " | ||
| "workflow and return only the reducer's final report. " | ||
| "Prefer bounded prompts and `max_concurrency=2` for " | ||
| "examples that inspect repositories." | ||
| ), | ||
| trigger=None, | ||
| ) | ||
| ] | ||
| ), | ||
| ) | ||
|
|
||
| conversation = Conversation( | ||
| agent=parent_agent, | ||
| workspace=Path.cwd(), | ||
| visualizer=DelegationVisualizer(name="CoverageWorkflow"), | ||
| max_iteration_per_run=6, # increase if more turns needed to write the script | ||
| ) | ||
|
|
||
| conversation.send_message( | ||
| "Write and run a dynamic workflow that audits whether test coverage is " | ||
| "good across this repository. In the workflow code you generate, create " | ||
| "one item for each project area: `openhands-sdk/openhands/sdk`, " | ||
| "`openhands-tools/openhands/tools`, " | ||
| "`openhands-workspace/openhands/workspace`, and " | ||
| "`openhands-agent-server/openhands/agent_server`. Use `wf.map_agents` " | ||
| "with `max_concurrency=2` to fan out one `coverage_auditor` sub-agent " | ||
| "per area. Each sub-agent should inspect source files and matching tests " | ||
| "under `tests/sdk`, `tests/tools`, `tests/workspace`, or " | ||
| "`tests/agent_server` with at most three read-only commands or file views, " | ||
| "avoid running the full test suite, and report coverage strengths, risky " | ||
| "gaps, and the " | ||
| "next tests to add. Finally use `wf.reduce_agent` with " | ||
| "`coverage_auditor` to synthesize a " | ||
| "repo-wide coverage report with the highest-priority gaps. Return the " | ||
| "final report to me." | ||
| ) | ||
| conversation.run() | ||
|
|
||
| cost = conversation.conversation_stats.get_combined_metrics().accumulated_cost | ||
| print(f"EXAMPLE_COST: {cost}") |
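
For orientation, the script the parent agent is asked to generate might look roughly like the sketch below. It uses only the `wf.map_agents` and `wf.reduce_agent` signatures listed in the tool description; the prompt wording is illustrative, and `StubWorkflow` is a hypothetical stand-in (not part of this PR) that echoes prompts so the sketch runs without an LLM. Only `main` would be passed to the workflow tool; the `asyncio` import and the stub exist solely to run it locally.

```python
import asyncio


async def main(wf):
    # One item per project area named in the example prompt.
    areas = [
        "openhands-sdk/openhands/sdk",
        "openhands-tools/openhands/tools",
        "openhands-workspace/openhands/workspace",
        "openhands-agent-server/openhands/agent_server",
    ]
    # Fan out one coverage_auditor sub-agent per area, two at a time.
    findings = await wf.map_agents(
        items=areas,
        prompt=lambda area: (
            f"Audit test coverage for `{area}`. Use at most three read-only "
            "commands or file views. Report strengths, risky gaps, and the "
            "next tests to add."
        ),
        subagent_type="coverage_auditor",
        max_concurrency=2,
    )
    # Reduce the per-area findings into a single repo-wide report.
    return await wf.reduce_agent(
        items=findings,
        prompt=(
            "Synthesize a repo-wide coverage report with the "
            "highest-priority gaps."
        ),
        subagent_type="coverage_auditor",
    )


class StubWorkflow:
    """Hypothetical stand-in for `wf`: echoes prompts, runs no agents."""

    async def map_agents(
        self,
        items,
        prompt,
        subagent_type="general-purpose",
        max_concurrency=None,
        description=None,
    ):
        return [f"[{subagent_type}] {prompt(item)}" for item in items]

    async def reduce_agent(
        self, items, prompt, subagent_type="general-purpose", description=None
    ):
        return f"[{subagent_type}] {prompt} ({len(items)} inputs)"


if __name__ == "__main__":
    print(asyncio.run(main(StubWorkflow())))
```

Keeping the per-area findings inside `main` and returning only the reducer output matches the `workflow_author` skill's instruction to surface just the final report.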
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| """Dynamic workflow tool for sub-agent orchestration.""" | ||
|
|
||
| from openhands.tools.workflow.definition import ( | ||
| WorkflowAction, | ||
| WorkflowObservation, | ||
| WorkflowTool, | ||
| WorkflowToolSet, | ||
| ) | ||
| from openhands.tools.workflow.impl import ( | ||
| WorkflowContext, | ||
| WorkflowExecutor, | ||
| WorkflowScriptError, | ||
| ) | ||
|
|
||
|
|
||
| __all__ = [ | ||
| "WorkflowAction", | ||
| "WorkflowContext", | ||
| "WorkflowExecutor", | ||
| "WorkflowObservation", | ||
| "WorkflowScriptError", | ||
| "WorkflowTool", | ||
| "WorkflowToolSet", | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,179 @@ | ||
| """Dynamic workflow tool definitions.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Sequence | ||
| from typing import TYPE_CHECKING, Final, Literal | ||
|
|
||
| from pydantic import Field | ||
|
|
||
| from openhands.sdk.tool import ( | ||
| Action, | ||
| Observation, | ||
| ToolAnnotations, | ||
| ToolDefinition, | ||
| register_tool, | ||
| ) | ||
|
|
||
|
|
||
| if TYPE_CHECKING: | ||
| from openhands.sdk.conversation.state import ConversationState | ||
| from openhands.tools.workflow.impl import WorkflowExecutor | ||
|
|
||
|
|
||
| class WorkflowAction(Action): | ||
| """Schema for running a Python dynamic workflow script.""" | ||
|
|
||
| name: str = Field(description="A short name for this workflow run.") | ||
| script: str = Field( | ||
| description=( | ||
| "Python workflow script to run. It must define `async def main(wf):` " | ||
| "and coordinate work only through the provided `wf` object." | ||
| ) | ||
| ) | ||
| max_concurrency: int = Field( | ||
| default=8, | ||
| ge=1, | ||
| le=64, | ||
| description=( | ||
| "Maximum number of sub-agent tasks to run concurrently. " | ||
| "Consider 2–4 for LLM-heavy workflows to avoid hitting API rate limits." | ||
| ), | ||
| ) | ||
|
|
||
|
|
||
| class WorkflowObservation(Observation): | ||
| """Observation from a dynamic workflow run.""" | ||
|
|
||
| name: str = Field(description="The workflow name that was executed.") | ||
| status: Literal["completed", "error"] = Field( | ||
| description="The workflow execution status." | ||
| ) | ||
|
|
||
|
|
||
| _WORKFLOW_DESCRIPTION: Final[ | ||
| str | ||
| ] = """Run a dynamic workflow written as Python orchestration code. | ||
|
|
||
| Use this tool for large tasks that benefit from parallel sub-agents, such as | ||
| codebase-wide audits, independent plan reviews, security sweeps, or discovery | ||
| work where intermediate results should stay outside the main conversation. | ||
|
|
||
| Provide a Python script that defines exactly this entry point: | ||
|
|
||
| ```python | ||
| async def main(wf): | ||
| ... | ||
| ``` | ||
|
|
||
| The script coordinates sub-agents through the `wf` object. It should not read or | ||
| write files, run shell commands, or perform the engineering work directly. | ||
| Sub-agents should do that work through their normal OpenHands tools and security | ||
| policy. Scripts should use only the documented `wf` methods; private `wf` | ||
| attributes are rejected. Large reducer inputs may be truncated before being sent | ||
| to the reducer sub-agent. | ||
|
|
||
| Available `wf` methods: | ||
| - `await wf.run_agent(prompt, subagent_type="general-purpose", description=None)` | ||
| - `await wf.map_agents(items, prompt, subagent_type="general-purpose", | ||
| max_concurrency=None, description=None)` | ||
| - `await wf.reduce_agent(items, prompt, subagent_type="general-purpose", | ||
| description=None)` | ||
| - `wf.flatten(values)` — flatten one level of nesting (not recursive) | ||
|
|
||
| `subagent_type` must be a sub-agent type registered in the parent application. | ||
| Use the same type names you registered when building your agent. | ||
|
|
||
| Scripts must use only the documented `wf` methods listed above; calling | ||
| `wf.close()` or any other undocumented attribute is not supported. | ||
|
|
||
| `print()` is available for debugging but writes to the server logs, not to | ||
| the workflow observation seen by the LLM; use the return value of `main()` to | ||
| surface results. | ||
|
|
||
| If one or more `map_agents` items fail, the whole call raises an | ||
| `ExceptionGroup`. The name `ExceptionGroup` is not available by name in the | ||
| workflow sandbox, so scripts cannot use `except*` for selective group handling. | ||
| A plain `except Exception` will still catch the entire group. To handle partial | ||
| failures and collect all results, design sub-agent prompts to return an error | ||
| sentinel value instead of raising. | ||
|
|
||
| `map_agents` accepts either a callable prompt, such as | ||
| `lambda item: f"Review this finding: {item}"`, or a string template containing | ||
| `{item}`. | ||
|
|
||
| Example: | ||
| ```python | ||
| async def main(wf): | ||
| strategies = ["minimal fix", "test-first", "security-focused"] | ||
| plans = await wf.map_agents( | ||
| items=strategies, | ||
| subagent_type="general-purpose", | ||
| max_concurrency=3, | ||
| prompt=lambda strategy: f"Create a plan using this strategy: {strategy}", | ||
| ) | ||
| critiques = await wf.map_agents( | ||
| items=plans, | ||
| subagent_type="code-reviewer", | ||
| prompt=lambda plan: f"Adversarially critique this plan: {plan}", | ||
| ) | ||
| return await wf.reduce_agent( | ||
| items={"plans": plans, "critiques": critiques}, | ||
| prompt="Synthesize the safest and simplest final plan.", | ||
| ) | ||
| ``` | ||
|
|
||
| This MVP executes generated Python in-process after best-effort validation. Treat | ||
| running a workflow as approving generated code execution. | ||
| """ | ||
|
|
||
|
|
||
| class WorkflowTool(ToolDefinition[WorkflowAction, WorkflowObservation]): | ||
| """Low-level tool for explicit executor injection. | ||
|
|
||
| Prefer ``WorkflowToolSet`` for standard SDK auto-create usage. | ||
| Use ``WorkflowTool`` when you need to inject a custom executor | ||
| (e.g., in tests or extensions). | ||
| """ | ||
|
|
||
| @classmethod | ||
| def create( | ||
| cls, | ||
| conv_state: ConversationState | None = None, # noqa: ARG003 | ||
| executor: WorkflowExecutor | None = None, | ||
| description: str = _WORKFLOW_DESCRIPTION, | ||
| ) -> Sequence[WorkflowTool]: | ||
| from openhands.tools.workflow.impl import WorkflowExecutor | ||
|
|
||
| return [ | ||
| cls( | ||
| action_type=WorkflowAction, | ||
| observation_type=WorkflowObservation, | ||
| description=description, | ||
| annotations=ToolAnnotations( | ||
| title="workflow", | ||
| readOnlyHint=False, | ||
| destructiveHint=True, | ||
| idempotentHint=False, | ||
| openWorldHint=True, | ||
| ), | ||
| executor=executor if executor is not None else WorkflowExecutor(), | ||
| ) | ||
| ] | ||
|
|
||
|
|
||
| class WorkflowToolSet(ToolDefinition[WorkflowAction, WorkflowObservation]): | ||
| """Tool set that creates the dynamic workflow tool.""" | ||
|
|
||
| @classmethod | ||
| def create( | ||
| cls, | ||
| conv_state: ConversationState, # noqa: ARG003 | ||
| ) -> Sequence[WorkflowTool]: | ||
| from openhands.tools.workflow.impl import WorkflowExecutor | ||
|
|
||
| return WorkflowTool.create(executor=WorkflowExecutor()) | ||
|
|
||
|
|
||
| register_tool(WorkflowToolSet.name, WorkflowToolSet) | ||
| register_tool(WorkflowTool.name, WorkflowTool) | ||
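
The tool description says `map_agents` accepts either a callable prompt or a string template containing `{item}`, caps parallelism with `max_concurrency`, and that `wf.flatten` removes one level of nesting. The executor (`impl.py`) is not shown in this diff, so the following is only a minimal sketch of those semantics using the standard library. `render_prompt`, `map_bounded`, `flatten`, and `echo_worker` are hypothetical names, and treating tuples like lists in `flatten` is an assumption. Unlike the described tool, which raises an `ExceptionGroup` when items fail, `asyncio.gather` here propagates only the first failure.

```python
import asyncio


def render_prompt(prompt, item):
    """Build one sub-agent prompt from a callable or a `{item}` template."""
    if callable(prompt):
        return prompt(item)
    # Plain replacement, so any other braces in the template are left alone.
    return prompt.replace("{item}", str(item))


async def map_bounded(items, prompt, worker, max_concurrency=8):
    """Run `worker` on each rendered prompt, at most `max_concurrency` at once."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item):
        async with semaphore:
            return await worker(render_prompt(prompt, item))

    # gather() returns results in the same order as `items`.
    return await asyncio.gather(*(run_one(item) for item in items))


def flatten(values):
    """Flatten exactly one level of nesting; deeper lists are kept as-is."""
    flat = []
    for value in values:
        if isinstance(value, (list, tuple)):
            flat.extend(value)
        else:
            flat.append(value)
    return flat


async def echo_worker(prompt):
    # Hypothetical stand-in for a sub-agent call.
    await asyncio.sleep(0)
    return f"done: {prompt}"


async def demo():
    items = ["a", "b"]
    by_template = await map_bounded(
        items, "Review this finding: {item}", echo_worker, max_concurrency=2
    )
    by_callable = await map_bounded(
        items, lambda item: f"Review this finding: {item}", echo_worker, 2
    )
    return by_template, by_callable


if __name__ == "__main__":
    print(asyncio.run(demo()))
    print(flatten([[1, 2], [3, [4]], 5]))
```

A semaphore keeps the number of in-flight sub-agent calls bounded, which is the same concern behind the description's advice to consider 2 to 4 concurrent tasks for LLM-heavy workflows.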