From 946c83303d4aad8323969a9994d325513d77e5d5 Mon Sep 17 00:00:00 2001 From: Moshi Wei Date: Wed, 24 Sep 2025 20:52:44 +0800 Subject: [PATCH 1/3] add basic a2a --- examples/Agent_client/a2a_client.py | 95 ++++++++ .../Agent_servers/Pydantic/openai_agent.py | 155 +++++++++++++ isek/utils/common.py | 217 ++++++++++++++++++ 3 files changed, 467 insertions(+) create mode 100644 examples/Agent_client/a2a_client.py create mode 100644 examples/Agent_servers/Pydantic/openai_agent.py create mode 100644 isek/utils/common.py diff --git a/examples/Agent_client/a2a_client.py b/examples/Agent_client/a2a_client.py new file mode 100644 index 0000000..f05ac1f --- /dev/null +++ b/examples/Agent_client/a2a_client.py @@ -0,0 +1,95 @@ +import httpx +import json +from pathlib import Path +import asyncio +from a2a.client import A2AClient +from a2a.types import ( + AgentCard, + Message, + MessageSendParams, + Part, + Role, + SendMessageRequest, + TextPart, + JSONRPCErrorResponse, +) +from mcp.server.fastmcp.utilities.logging import get_logger +from uuid import uuid4 +import dotenv + +dotenv.load_dotenv() + +logger = get_logger(__name__) +AGENT_CARDS_DIR = 'agent_cards' +MODEL = 'text-embedding-ada-002' +agent_urls = ['http://localhost:9999', # openai agent + 'http://localhost:10020', # trending agent + 'http://localhost:10021' # analyzer agent + ] +AGENT_CARD_WELL_KNOWN_PATH = "/.well-known/agent.json" + +async def get_agent_card_by_url(agent_url: str) -> dict: + """Fetch and cache agent cards from all configured agent URLs. + + The function uses a simple in-memory cache (``_agent_info_cache``) to avoid + fetching the ­same agent card repeatedly. If a card is not cached, it is + retrieved from the agent’s “well-known” endpoint and stored in the cache. + + Args: + agent_url: The URL of the agent to fetch the agent card from. + + Returns: + dict: ``AgentCard`` fully JSON-serialisable object for interoperability with the rest of the MCP pipeline. + """ + timeout_config = httpx.Timeout(10.0) # seconds + logger.debug("[get_agent_card_by_url],Fetching agent card for %s", agent_url) + async with httpx.AsyncClient(timeout=timeout_config) as httpx_client: + response = await httpx_client.get(f"{agent_url}{AGENT_CARD_WELL_KNOWN_PATH}") + response.raise_for_status() + card_data = response.json() + return card_data + +async def main(query: str) -> str: + """Execute a task on a remote agent and return the aggregated response. + + Args: + query: The query to send to the agent. + + Returns: + str: The content of the task result. + """ + # Fetch the agent-card data and build a proper ``AgentCard`` instance. + agent_card_data = await get_agent_card_by_url(agent_urls[0]) + agent_card = AgentCard(**agent_card_data) + + logger.info("[execute_task],Executing task on agent %s with query: %s", agent_card.name, query) + + # Build request params + msg_params = MessageSendParams( + message=Message( + role=Role.user, + parts=[Part(TextPart(text=query))], + messageId=uuid4().hex # Add this line to include the required messageId field + ) + ) + + logger.debug("[execute_task] Sending non-streaming request …") + timeout_config = httpx.Timeout(10.0) + async with httpx.AsyncClient(timeout=timeout_config) as httpx_client: + client = A2AClient(httpx_client, agent_card=agent_card) + response = await client.send_message( + SendMessageRequest(id=str(uuid4().hex), params=msg_params) + ) + + if isinstance(response, JSONRPCErrorResponse): + logger.error("[execute_task] Error response received: %s", response) + return "Error: Unable to execute task" + + message_content = response.root.result.status.message + + logger.info("[execute_task] Task result content: %s", message_content) + + return message_content + +# Example usage +print(asyncio.run(main("Hello, how are you?"))) diff --git a/examples/Agent_servers/Pydantic/openai_agent.py b/examples/Agent_servers/Pydantic/openai_agent.py new file mode 100644 index 0000000..418857d --- /dev/null +++ b/examples/Agent_servers/Pydantic/openai_agent.py @@ -0,0 +1,155 @@ + +import asyncio +from typing import Any, AsyncGenerator +import os +import dotenv +from pydantic_ai import Agent +from a2a.server.tasks import TaskUpdater +from a2a.types import TaskState, AgentCard, AgentCapabilities, AgentSkill +from a2a.utils import new_agent_text_message, new_task +from isek.utils.common import ( + create_agent_a2a_server, + run_server, + log_agent_start, + log_agent_activity, + log_agent_request, + log_agent_response, + log_error, + log_system_event +) + +dotenv.load_dotenv() + +class OpenAIAgent: + """Simple OpenAI wrapper agent.""" + + def __init__(self): + """Initialize the OpenAI agent with a basic configuration.""" + self.agent = Agent( + model="gpt-4", + system_prompt="You are a helpful AI assistant that provides clear and concise responses." + ) + log_agent_activity("OpenAI Agent", "Initialized with GPT-4 model") + + async def stream(self, query: str, contextId: str) -> AsyncGenerator[dict[str, Any], None]: + """Stream the agent response.""" + try: + log_agent_request("OpenAI Agent", query, contextId) + + # Initial message + log_agent_activity("OpenAI Agent", "Starting request processing") + yield { + "is_task_complete": False, + "require_user_input": False, + "content": "Processing your request...", + } + + # Get response from OpenAI + log_agent_activity("OpenAI Agent", "Sending request to OpenAI") + response = await self.agent.run(query) + log_agent_activity("OpenAI Agent", "Received response from OpenAI") + + # Return final response + log_agent_response("OpenAI Agent", "Task completed successfully", contextId) + log_agent_response("OpenAI Agent", "content", response.output) + yield { + "is_task_complete": True, + "require_user_input": False, + "content": response.output, + } + + except Exception as e: + error_msg = f"Error during processing: {str(e)}" + log_error(error_msg) + yield { + "is_task_complete": False, + "require_user_input": True, + "content": f"Error: {str(e)}", + } + +class OpenAIAgentExecutor: + """Simple executor for the OpenAI Agent.""" + + def __init__(self): + self.agent = OpenAIAgent() + log_agent_activity("OpenAI Agent Executor", "Initialized") + + async def execute(self, context, event_queue): + """Execute the agent.""" + log_agent_activity("OpenAI Agent Executor", "Starting execution") + query = context.get_user_input() + log_agent_activity("OpenAI Agent Executor", f"Received execution request for context: {context.message.contextId}") + log_agent_activity("OpenAI Agent Executor", f"Query: {query}") + + task = context.current_task or new_task(context.message) + await event_queue.enqueue_event(task) + log_agent_activity("OpenAI Agent Executor", f"Created new task: {task.id}") + + updater = TaskUpdater(event_queue, task.id, task.contextId) + log_agent_activity("OpenAI Agent Executor", "Created task updater") + + try: + log_agent_activity("OpenAI Agent Executor", "Starting agent stream") + async for item in self.agent.stream(query, task.contextId): + log_agent_activity("OpenAI Agent Executor", f"Received stream item: {item}") + is_task_complete = item["is_task_complete"] + require_user_input = item["require_user_input"] + content = item["content"] + + message = new_agent_text_message(content, task.contextId, task.id) + + if is_task_complete: + log_agent_activity("OpenAI Agent Executor", f"Task {task.id} completed") + await updater.complete(message) + elif require_user_input: + log_agent_activity("OpenAI Agent Executor", f"Task {task.id} requires user input") + await updater.update_status(TaskState.input_required, message, final=True) + else: + log_agent_activity("OpenAI Agent Executor", f"Task {task.id} in progress") + await updater.update_status(TaskState.working, message) + + except Exception as e: + from a2a.utils.errors import ServerError + from a2a.types import InternalError + log_error(f"Error in executor: {str(e)}") + log_error(f"Error details: {type(e).__name__}") + raise ServerError(error=InternalError()) from e + +def create_agent(): + """Create and configure the OpenAI agent server.""" + log_system_event("Creating OpenAI Agent server") + agent_card = AgentCard( + name="OpenAI Agent", + url="http://localhost:9999", + description="Simple OpenAI GPT-4 wrapper agent", + version="1.0", + capabilities=AgentCapabilities( + streaming=True, + tools=True, # Enable tools support + task_execution=True # Enable task execution + ), + defaultInputModes=["text/plain"], + defaultOutputModes=["text/plain"], + skills=[ + AgentSkill( + id="general_assistant", + name="General Assistant", + description="Provides helpful responses to general queries using GPT-4", + tags=["general", "assistant", "gpt4"], + examples=[ + "What is machine learning?", + "How do I write a Python function?", + "Explain quantum computing" + ] + ) + ] + ) + return create_agent_a2a_server(OpenAIAgentExecutor(), agent_card) + +def main(): + """Run the OpenAI agent server.""" + log_agent_start("OpenAI Agent", 9999) + asyncio.run(run_server(create_agent, 9999, "OpenAI Agent")) + +if __name__ == "__main__": + main() diff --git a/isek/utils/common.py b/isek/utils/common.py new file mode 100644 index 0000000..68eba99 --- /dev/null +++ b/isek/utils/common.py @@ -0,0 +1,217 @@ +import uvicorn +import socket + +from a2a.server.apps import A2AStarletteApplication +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.tasks import InMemoryTaskStore +from a2a.types import AgentCard + + +# Color codes for colorful logging +class Colors: + HEADER = "\033[95m" + OKBLUE = "\033[94m" + OKCYAN = "\033[96m" + OKGREEN = "\033[92m" + WARNING = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + PURPLE = "\033[35m" + ORANGE = "\033[33m" + PINK = "\033[95m" + # Additional colors for more variety + LIGHT_BLUE = "\033[94m" + LIGHT_GREEN = "\033[92m" + LIGHT_RED = "\033[91m" + LIGHT_YELLOW = "\033[93m" + LIGHT_MAGENTA = "\033[95m" + DARK_GRAY = "\033[90m" + LIGHT_GRAY = "\033[37m" + + +def create_agent_a2a_server( + agent_executor, agent_card: AgentCard +) -> A2AStarletteApplication: + request_handler = DefaultRequestHandler( + agent_executor=agent_executor, task_store=InMemoryTaskStore() + ) + + app = A2AStarletteApplication(agent_card=agent_card, http_handler=request_handler) + return app + + +async def run_server(create_agent_function, port: int, name: str): + try: + app = create_agent_function() + + config = uvicorn.Config( + app.build(), host="127.0.0.1", port=port, log_level="error", loop="asyncio" + ) + + server = uvicorn.Server(config) + + log_a2a_api_call( + "server.serve()", f"server: {name}, port: {port}, host: 127.0.0.1" + ) + await server.serve() + except Exception as e: + log_error(f"run_server() error: {e} - name: {name}, port: {port}") + + +def is_port_in_use(port: int, host: str = "127.0.0.1") -> bool: + """Return True if a TCP port is already bound on the given host.""" + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.settimeout(0.2) + return sock.connect_ex((host, port)) == 0 + except Exception: + return False + + +def log_a2a_protocol( + message: str, direction: str = "→", sender: str = "", receiver: str = "" +): + """Log A2A protocol messages with special formatting.""" + import traceback + + caller = traceback.extract_stack()[-2] + caller_info = ( + f"{Colors.OKCYAN}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" + ) + + if direction == "→": + if sender and receiver: + print( + f"{Colors.LIGHT_MAGENTA}[A2A OUT]{Colors.ENDC} {caller_info} | from {Colors.LIGHT_GREEN}{sender}{Colors.ENDC} to {Colors.LIGHT_GREEN}{receiver}{Colors.ENDC} : {Colors.LIGHT_BLUE}{message}{Colors.ENDC}" + ) + else: + print( + f"{Colors.LIGHT_MAGENTA}[A2A OUT]{Colors.ENDC} {caller_info} | {Colors.LIGHT_BLUE}{message}{Colors.ENDC}" + ) + elif direction == "←": + if sender and receiver: + print( + f"{Colors.LIGHT_YELLOW}[A2A IN]{Colors.ENDC} {caller_info} | from {Colors.LIGHT_GREEN}{sender}{Colors.ENDC} to {Colors.LIGHT_GREEN}{receiver}{Colors.ENDC} : {Colors.LIGHT_BLUE}{message}{Colors.ENDC}" + ) + else: + print( + f"{Colors.LIGHT_YELLOW}[A2A IN]{Colors.ENDC} {caller_info} | {Colors.LIGHT_BLUE}{message}{Colors.ENDC}" + ) + else: + if sender and receiver: + print( + f"{Colors.LIGHT_MAGENTA}[A2A]{Colors.ENDC} {caller_info} | from {Colors.LIGHT_GREEN}{sender}{Colors.ENDC} to {Colors.LIGHT_GREEN}{receiver}{Colors.ENDC} : {Colors.LIGHT_BLUE}{message}{Colors.ENDC}" + ) + else: + print( + f"{Colors.LIGHT_MAGENTA}[A2A]{Colors.ENDC} {caller_info} | {Colors.LIGHT_BLUE}{message}{Colors.ENDC}" + ) + + +def log_a2a_api_call(api_name: str, details: str = ""): + """Log A2A API calls specifically.""" + import traceback + + caller = traceback.extract_stack()[-2] + caller_info = ( + f"{Colors.OKCYAN}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" + ) + print( + f"{Colors.OKCYAN}[A2A API]{Colors.ENDC} {caller_info} | {Colors.HEADER}{api_name}{Colors.ENDC} | {Colors.OKBLUE}{details}{Colors.ENDC}" + ) + + +def log_a2a_function_call(function_name: str, details: str = ""): + """Log A2A function calls specifically.""" + import traceback + + caller = traceback.extract_stack()[-2] + caller_info = ( + f"{Colors.OKCYAN}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" + ) + print( + f"{Colors.HEADER}[A2A FUNC]{Colors.ENDC} {caller_info} | {Colors.HEADER}{function_name}{Colors.ENDC} | {Colors.OKBLUE}{details}{Colors.ENDC}" + ) + + +def log_error(message: str): + """Log error message with red color.""" + import traceback + + caller = traceback.extract_stack()[-2] + caller_info = f"{Colors.LIGHT_GRAY}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" + print( + f"{Colors.LIGHT_RED}[ERROR]{Colors.ENDC} {caller_info} | {Colors.LIGHT_RED}{message}{Colors.ENDC}" + ) + + +def log_agent_start(agent_name: str, port: int = None): + """Log when an agent starts.""" + import traceback + + caller = traceback.extract_stack()[-2] + caller_info = ( + f"{Colors.OKCYAN}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" + ) + port_info = f" on port {port}" if port else "" + print( + f"{Colors.OKGREEN}[AGENT START]{Colors.ENDC} {caller_info} | {Colors.BOLD}{agent_name}{Colors.ENDC}{port_info}" + ) + + +def log_agent_activity(agent_name: str, activity: str): + """Log agent activity/status updates.""" + import traceback + + caller = traceback.extract_stack()[-2] + caller_info = ( + f"{Colors.OKCYAN}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" + ) + print( + f"{Colors.PURPLE}[AGENT]{Colors.ENDC} {caller_info} | {Colors.BOLD}{agent_name}{Colors.ENDC}: {Colors.OKBLUE}{activity}{Colors.ENDC}" + ) + + +def log_agent_request(agent_name: str, query: str, context_id: str = None): + """Log when an agent receives a request.""" + import traceback + + caller = traceback.extract_stack()[-2] + caller_info = ( + f"{Colors.OKCYAN}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" + ) + context_info = f" [ctx:{context_id}]" if context_id else "" + query_preview = query[:50] + "..." if len(query) > 50 else query + print( + f"{Colors.LIGHT_BLUE}[AGENT REQ]{Colors.ENDC} {caller_info} | {Colors.BOLD}{agent_name}{Colors.ENDC}{context_info}: {Colors.LIGHT_GRAY}{query_preview}{Colors.ENDC}" + ) + + +def log_agent_response(agent_name: str, status: str, context_id: str = None): + """Log agent response status.""" + import traceback + + caller = traceback.extract_stack()[-2] + caller_info = ( + f"{Colors.OKCYAN}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" + ) + context_info = f" [ctx:{context_id}]" if context_id else "" + print( + f"{Colors.LIGHT_GREEN}[AGENT RESP]{Colors.ENDC} {caller_info} | {Colors.BOLD}{agent_name}{Colors.ENDC}{context_info}: {Colors.WARNING}{status}{Colors.ENDC}" + ) + + +def log_system_event(event: str, details: str = ""): + """Log system-level events.""" + import traceback + + caller = traceback.extract_stack()[-2] + caller_info = ( + f"{Colors.OKCYAN}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" + ) + details_info = f" | {details}" if details else "" + print( + f"{Colors.HEADER}[SYSTEM]{Colors.ENDC} {caller_info} | {Colors.BOLD}{event}{Colors.ENDC}{details_info}" + ) From f998373ec233d9b76a26f27db0b68a093bd0500e Mon Sep 17 00:00:00 2001 From: Moshi Wei Date: Thu, 25 Sep 2025 22:10:20 +0800 Subject: [PATCH 2/3] update node v3 --- examples/Agent_client/a2a_client.py | 105 ++++------ .../{openai_agent.py => openai_agent_a2a.py} | 72 +++---- isek/node/node_v3_a2a.py | 191 ++++++++++++++++++ isek/utils/common.py | 119 +++-------- pyproject.toml | 1 + 5 files changed, 288 insertions(+), 200 deletions(-) rename examples/Agent_servers/Pydantic/{openai_agent.py => openai_agent_a2a.py} (76%) create mode 100644 isek/node/node_v3_a2a.py diff --git a/examples/Agent_client/a2a_client.py b/examples/Agent_client/a2a_client.py index f05ac1f..e411eab 100644 --- a/examples/Agent_client/a2a_client.py +++ b/examples/Agent_client/a2a_client.py @@ -1,95 +1,58 @@ -import httpx -import json -from pathlib import Path import asyncio -from a2a.client import A2AClient -from a2a.types import ( - AgentCard, - Message, - MessageSendParams, - Part, - Role, - SendMessageRequest, - TextPart, - JSONRPCErrorResponse, -) -from mcp.server.fastmcp.utilities.logging import get_logger from uuid import uuid4 + import dotenv +from isek.node.node_v3_a2a import Node +from mcp.server.fastmcp.utilities.logging import get_logger +from isek.utils.log import log + dotenv.load_dotenv() logger = get_logger(__name__) AGENT_CARDS_DIR = 'agent_cards' MODEL = 'text-embedding-ada-002' + agent_urls = ['http://localhost:9999', # openai agent - 'http://localhost:10020', # trending agent - 'http://localhost:10021' # analyzer agent ] -AGENT_CARD_WELL_KNOWN_PATH = "/.well-known/agent.json" -async def get_agent_card_by_url(agent_url: str) -> dict: - """Fetch and cache agent cards from all configured agent URLs. +AGENT_CARD_WELL_KNOWN_PATH = "/.well-known/agent.json" # kept for compatibility + +async def query_agent(query: str) -> str: + """Execute a task on a remote agent and return the aggregated response. - The function uses a simple in-memory cache (``_agent_info_cache``) to avoid - fetching the ­same agent card repeatedly. If a card is not cached, it is - retrieved from the agent’s “well-known” endpoint and stored in the cache. - Args: - agent_url: The URL of the agent to fetch the agent card from. + query: The query to send to the agent. Returns: - dict: ``AgentCard`` fully JSON-serialisable object for interoperability with the rest of the MCP pipeline. + str: The content of the task result. """ - timeout_config = httpx.Timeout(10.0) # seconds - logger.debug("[get_agent_card_by_url],Fetching agent card for %s", agent_url) - async with httpx.AsyncClient(timeout=timeout_config) as httpx_client: - response = await httpx_client.get(f"{agent_url}{AGENT_CARD_WELL_KNOWN_PATH}") - response.raise_for_status() - card_data = response.json() - return card_data - -async def main(query: str) -> str: - """Execute a task on a remote agent and return the aggregated response. + # Instantiate a lightweight Node (acts as a client here) + node = Node(host="127.0.0.1", port=uuid4().int >> 112, node_id="a2a-client") + logger.info("[execute_task] Executing task on %s with query: %s", agent_urls[0], query) + message_content = await node.send_message(agent_urls[0], query) + logger.info("[execute_task] Task result content: %s", message_content) + + return message_content +async def get_agent_card(agent_url: str) -> dict: + """ + Fetch the agent card from the given agent URL. Args: - query: The query to send to the agent. + agent_url (str): The base URL of the agent. Returns: - str: The content of the task result. + dict: The agent card as a dictionary. """ - # Fetch the agent-card data and build a proper ``AgentCard`` instance. - agent_card_data = await get_agent_card_by_url(agent_urls[0]) - agent_card = AgentCard(**agent_card_data) - - logger.info("[execute_task],Executing task on agent %s with query: %s", agent_card.name, query) - - # Build request params - msg_params = MessageSendParams( - message=Message( - role=Role.user, - parts=[Part(TextPart(text=query))], - messageId=uuid4().hex # Add this line to include the required messageId field - ) - ) - - logger.debug("[execute_task] Sending non-streaming request …") - timeout_config = httpx.Timeout(10.0) - async with httpx.AsyncClient(timeout=timeout_config) as httpx_client: - client = A2AClient(httpx_client, agent_card=agent_card) - response = await client.send_message( - SendMessageRequest(id=str(uuid4().hex), params=msg_params) - ) - - if isinstance(response, JSONRPCErrorResponse): - logger.error("[execute_task] Error response received: %s", response) - return "Error: Unable to execute task" - - message_content = response.root.result.status.message - - logger.info("[execute_task] Task result content: %s", message_content) - - return message_content + node = Node(host="127.0.0.1", port=uuid4().int >> 112, node_id="a2a-client") + logger.info("[get_agent_card] Fetching agent card from %s", agent_url) + card = await node.get_agent_card_by_url(agent_url) + logger.info("[get_agent_card] Received agent card: %s", card) + return card + # Example usage -print(asyncio.run(main("Hello, how are you?"))) +# print(asyncio.run(query_agent("Hello, how are you?"))) +print(asyncio.run(get_agent_card(agent_urls[0]))) + + diff --git a/examples/Agent_servers/Pydantic/openai_agent.py b/examples/Agent_servers/Pydantic/openai_agent_a2a.py similarity index 76% rename from examples/Agent_servers/Pydantic/openai_agent.py rename to examples/Agent_servers/Pydantic/openai_agent_a2a.py index 418857d..8b26bb4 100644 --- a/examples/Agent_servers/Pydantic/openai_agent.py +++ b/examples/Agent_servers/Pydantic/openai_agent_a2a.py @@ -7,9 +7,9 @@ from a2a.server.tasks import TaskUpdater from a2a.types import TaskState, AgentCard, AgentCapabilities, AgentSkill from a2a.utils import new_agent_text_message, new_task +from a2a.server.agent_execution.context import RequestContext +from a2a.server.events.event_queue import EventQueue from isek.utils.common import ( - create_agent_a2a_server, - run_server, log_agent_start, log_agent_activity, log_agent_request, @@ -17,9 +17,38 @@ log_error, log_system_event ) +from isek.node.node_v3_a2a import Node +from a2a.server.agent_execution.agent_executor import AgentExecutor dotenv.load_dotenv() +agent_card = AgentCard( + name="OpenAI Agent", + url="http://localhost:9999", + description="Simple OpenAI GPT-4 wrapper agent", + version="1.0", + capabilities=AgentCapabilities( + streaming=True, + tools=True, # Enable tools support + task_execution=True # Enable task execution + ), + defaultInputModes=["text/plain"], + defaultOutputModes=["text/plain"], + skills=[ + AgentSkill( + id="general_assistant", + name="General Assistant", + description="Provides helpful responses to general queries using GPT-4", + tags=["general", "assistant", "gpt4"], + examples=[ + "What is machine learning?", + "How do I write a Python function?", + "Explain quantum computing" + ] + ) + ] +) + class OpenAIAgent: """Simple OpenAI wrapper agent.""" @@ -67,7 +96,7 @@ async def stream(self, query: str, contextId: str) -> AsyncGenerator[dict[str, A "content": f"Error: {str(e)}", } -class OpenAIAgentExecutor: +class OpenAIAgentExecutor(AgentExecutor): """Simple executor for the OpenAI Agent.""" def __init__(self): @@ -114,42 +143,15 @@ async def execute(self, context, event_queue): log_error(f"Error in executor: {str(e)}") log_error(f"Error details: {type(e).__name__}") raise ServerError(error=InternalError()) from e - -def create_agent(): - """Create and configure the OpenAI agent server.""" - log_system_event("Creating OpenAI Agent server") - agent_card = AgentCard( - name="OpenAI Agent", - url="http://localhost:9999", - description="Simple OpenAI GPT-4 wrapper agent", - version="1.0", - capabilities=AgentCapabilities( - streaming=True, - tools=True, # Enable tools support - task_execution=True # Enable task execution - ), - defaultInputModes=["text/plain"], - defaultOutputModes=["text/plain"], - skills=[ - AgentSkill( - id="general_assistant", - name="General Assistant", - description="Provides helpful responses to general queries using GPT-4", - tags=["general", "assistant", "gpt4"], - examples=[ - "What is machine learning?", - "How do I write a Python function?", - "Explain quantum computing" - ] - ) - ] - ) - return create_agent_a2a_server(OpenAIAgentExecutor(), agent_card) + async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: + raise Exception("cancel not supported") def main(): """Run the OpenAI agent server.""" log_agent_start("OpenAI Agent", 9999) - asyncio.run(run_server(create_agent, 9999, "OpenAI Agent")) + node = Node(host="127.0.0.1", port=9999, node_id="openai-agent") + app = Node.create_server(OpenAIAgentExecutor(), agent_card) + node.build_server(app, name="OpenAI Agent", daemon=False) if __name__ == "__main__": main() diff --git a/isek/node/node_v3_a2a.py b/isek/node/node_v3_a2a.py new file mode 100644 index 0000000..8dcb17c --- /dev/null +++ b/isek/node/node_v3_a2a.py @@ -0,0 +1,191 @@ +import threading +import uuid +from abc import ABC +from typing import Dict, Any +from isek.utils.log import log +import httpx +import uvicorn +from a2a.server.apps import A2AStarletteApplication +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.tasks import InMemoryTaskStore +from a2a.types import AgentCard +from a2a.types import MessageSendParams, SendMessageRequest +from a2a.client import A2AClient +from a2a.types import JSONRPCErrorResponse +from isek.utils.common import log_a2a_api_call, log_error +from uuid import uuid4 +from a2a.types import Message, Part, Role, TextPart +import asyncio + +# Alias for consistency with other modules +logger = log + +NodeDetails = Dict[str, Any] +AGENT_CARD_WELL_KNOWN_PATH = "/.well-known/agent.json" + + +class Node(ABC): + def __init__( + self, + host: str, + port: int, + node_id: str, + **kwargs: Any, # To absorb any extra arguments + ): + if not host: + raise ValueError("Node host cannot be empty.") + if not isinstance(port, int) or not (0 < port < 65536): + raise ValueError(f"Invalid port number for Node: {port}") + if not node_id: + node_id = uuid.uuid4().hex + + self.host: str = host + self.port: int = port + self.node_id: str = node_id + self.all_nodes: Dict[str, NodeDetails] = {} + + async def get_agent_card_by_url(self, agent_url: str) -> dict: + """Fetch and cache agent cards from all configured agent URLs. + + The function uses a simple in-memory cache (``_agent_info_cache``) to avoid + fetching the ­same agent card repeatedly. If a card is not cached, it is + retrieved from the agent’s “well-known” endpoint and stored in the cache. + + Args: + agent_url: The URL of the agent to fetch the agent card from. + + Returns: + dict: ``AgentCard`` fully JSON-serialisable object for interoperability with the rest of the MCP pipeline. + """ + timeout_config = httpx.Timeout(10.0) # seconds + log_a2a_api_call( + "get_agent_card_by_url", f"Fetching agent card for {agent_url}" + ) + + async with httpx.AsyncClient(timeout=timeout_config) as httpx_client: + response = await httpx_client.get( + f"{agent_url}{AGENT_CARD_WELL_KNOWN_PATH}" + ) + response.raise_for_status() + card_data = response.json() + return card_data + + async def send_message(self, agent_url: str, query: str) -> str: + """Execute a task on a remote agent and return the aggregated response. + + Args: + query: The query to send to the agent. + + Returns: + str: The content of the task result. + """ + # Fetch the agent-card data and build a proper ``AgentCard`` instance. + agent_card_data = await self.get_agent_card_by_url(agent_url) + agent_card = AgentCard(**agent_card_data) + + logger.info( + "[send_message] Executing task on agent %s with query: %s", + agent_card.name, + query, + ) + + # Build request params + msg_params = MessageSendParams( + message=Message( + role=Role.user, + parts=[Part(TextPart(text=query))], + messageId=uuid4().hex, # Include required messageId field + ) + ) + + logger.debug("[execute_task] Sending non-streaming request …") + timeout_config = httpx.Timeout(10.0) + async with httpx.AsyncClient(timeout=timeout_config) as httpx_client: + client = A2AClient(httpx_client, agent_card=agent_card) + response = await client.send_message( + SendMessageRequest(id=uuid4().hex, params=msg_params) + ) + + if isinstance(response, JSONRPCErrorResponse): + logger.error("[execute_task] Error response received: %s", response) + return "Error: Unable to execute task" + + message_content = response.root.result.status.message + + logger.info("[execute_task] Task result content: %s", message_content) + + return message_content + + def build_server( + self, + app: A2AStarletteApplication, + name: str = "A2A-Agent", + daemon: bool = False, + ): + """Bootstrap the A2A HTTP server. + + If *daemon* is ``True`` the server will be started in a background thread, + allowing the current process to continue executing (e.g. to send outbound + messages) while still accepting inbound HTTP requests. + + Parameters + ---------- + app : A2AStarletteApplication + The Starlette application returned from + ``Node.create_agent_a2a_server``. + name : str, optional + A human-readable name for the server, only used for logging. + daemon : bool, default ``False`` + Whether to start the server in a daemon thread (non-blocking) or run + it in the foreground (blocking call). + """ + + async def _runner(): + await self.run_server(app, port=self.port, name=name) + + if not daemon: + # Blocking – run the server in the current thread. + asyncio.run(_runner()) + else: + # Non-blocking – run the server in a daemonised background thread + # so that the main thread can still send outbound messages. + server_thread = threading.Thread( + target=lambda: asyncio.run(_runner()), daemon=True + ) + server_thread.start() + logger.info( + "A2A server started in daemon thread (name=%s, port=%s)", + name, + self.port, + ) + + @staticmethod + def create_server(agent_executor, agent_card: AgentCard) -> A2AStarletteApplication: + request_handler = DefaultRequestHandler( + agent_executor=agent_executor, task_store=InMemoryTaskStore() + ) + + app = A2AStarletteApplication( + agent_card=agent_card, http_handler=request_handler + ) + return app + + @staticmethod + async def run_server(app: A2AStarletteApplication, port: int, name: str): + try: + config = uvicorn.Config( + app.build(), + host="127.0.0.1", + port=port, + log_level="error", + loop="asyncio", + ) + + server = uvicorn.Server(config) + + log_a2a_api_call( + "server.serve()", f"server: {name}, port: {port}, host: 127.0.0.1" + ) + await server.serve() + except Exception as e: + log_error(f"run_server() error: {e} - name: {name}, port: {port}") diff --git a/isek/utils/common.py b/isek/utils/common.py index 68eba99..cec3fc5 100644 --- a/isek/utils/common.py +++ b/isek/utils/common.py @@ -1,12 +1,3 @@ -import uvicorn -import socket - -from a2a.server.apps import A2AStarletteApplication -from a2a.server.request_handlers import DefaultRequestHandler -from a2a.server.tasks import InMemoryTaskStore -from a2a.types import AgentCard - - # Color codes for colorful logging class Colors: HEADER = "\033[95m" @@ -31,55 +22,33 @@ class Colors: LIGHT_GRAY = "\033[37m" -def create_agent_a2a_server( - agent_executor, agent_card: AgentCard -) -> A2AStarletteApplication: - request_handler = DefaultRequestHandler( - agent_executor=agent_executor, task_store=InMemoryTaskStore() - ) - - app = A2AStarletteApplication(agent_card=agent_card, http_handler=request_handler) - return app +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- +# NOTE: Keep these small utilities above the public logging helpers so they +# are available everywhere below. -async def run_server(create_agent_function, port: int, name: str): - try: - app = create_agent_function() - - config = uvicorn.Config( - app.build(), host="127.0.0.1", port=port, log_level="error", loop="asyncio" - ) - - server = uvicorn.Server(config) - - log_a2a_api_call( - "server.serve()", f"server: {name}, port: {port}, host: 127.0.0.1" - ) - await server.serve() - except Exception as e: - log_error(f"run_server() error: {e} - name: {name}, port: {port}") +def _caller_info() -> str: + """Return the *caller* filename and line-number in unified colored format. + We step three frames up the stack so that the reported location always + corresponds to the original call-site and not to the internals of the + logging helpers themselves. + """ + import traceback + import os -def is_port_in_use(port: int, host: str = "127.0.0.1") -> bool: - """Return True if a TCP port is already bound on the given host.""" - try: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - sock.settimeout(0.2) - return sock.connect_ex((host, port)) == 0 - except Exception: - return False + frame = traceback.extract_stack()[-4] # see docstring for frame math + filename = os.path.basename(frame.filename) + return f"{Colors.OKCYAN}{filename}:{frame.lineno}{Colors.ENDC}" def log_a2a_protocol( message: str, direction: str = "→", sender: str = "", receiver: str = "" ): """Log A2A protocol messages with special formatting.""" - import traceback - - caller = traceback.extract_stack()[-2] - caller_info = ( - f"{Colors.OKCYAN}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" - ) + caller_info = _caller_info() if direction == "→": if sender and receiver: @@ -112,12 +81,7 @@ def log_a2a_protocol( def log_a2a_api_call(api_name: str, details: str = ""): """Log A2A API calls specifically.""" - import traceback - - caller = traceback.extract_stack()[-2] - caller_info = ( - f"{Colors.OKCYAN}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" - ) + caller_info = _caller_info() print( f"{Colors.OKCYAN}[A2A API]{Colors.ENDC} {caller_info} | {Colors.HEADER}{api_name}{Colors.ENDC} | {Colors.OKBLUE}{details}{Colors.ENDC}" ) @@ -125,12 +89,7 @@ def log_a2a_api_call(api_name: str, details: str = ""): def log_a2a_function_call(function_name: str, details: str = ""): """Log A2A function calls specifically.""" - import traceback - - caller = traceback.extract_stack()[-2] - caller_info = ( - f"{Colors.OKCYAN}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" - ) + caller_info = _caller_info() print( f"{Colors.HEADER}[A2A FUNC]{Colors.ENDC} {caller_info} | {Colors.HEADER}{function_name}{Colors.ENDC} | {Colors.OKBLUE}{details}{Colors.ENDC}" ) @@ -138,10 +97,7 @@ def log_a2a_function_call(function_name: str, details: str = ""): def log_error(message: str): """Log error message with red color.""" - import traceback - - caller = traceback.extract_stack()[-2] - caller_info = f"{Colors.LIGHT_GRAY}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" + caller_info = f"{Colors.LIGHT_GRAY}{_caller_info()}{Colors.ENDC}" print( f"{Colors.LIGHT_RED}[ERROR]{Colors.ENDC} {caller_info} | {Colors.LIGHT_RED}{message}{Colors.ENDC}" ) @@ -149,12 +105,7 @@ def log_error(message: str): def log_agent_start(agent_name: str, port: int = None): """Log when an agent starts.""" - import traceback - - caller = traceback.extract_stack()[-2] - caller_info = ( - f"{Colors.OKCYAN}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" - ) + caller_info = _caller_info() port_info = f" on port {port}" if port else "" print( f"{Colors.OKGREEN}[AGENT START]{Colors.ENDC} {caller_info} | {Colors.BOLD}{agent_name}{Colors.ENDC}{port_info}" @@ -163,12 +114,7 @@ def log_agent_start(agent_name: str, port: int = None): def log_agent_activity(agent_name: str, activity: str): """Log agent activity/status updates.""" - import traceback - - caller = traceback.extract_stack()[-2] - caller_info = ( - f"{Colors.OKCYAN}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" - ) + caller_info = _caller_info() print( f"{Colors.PURPLE}[AGENT]{Colors.ENDC} {caller_info} | {Colors.BOLD}{agent_name}{Colors.ENDC}: {Colors.OKBLUE}{activity}{Colors.ENDC}" ) @@ -176,12 +122,7 @@ def log_agent_activity(agent_name: str, activity: str): def log_agent_request(agent_name: str, query: str, context_id: str = None): """Log when an agent receives a request.""" - import traceback - - caller = traceback.extract_stack()[-2] - caller_info = ( - f"{Colors.OKCYAN}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" - ) + caller_info = _caller_info() context_info = f" [ctx:{context_id}]" if context_id else "" query_preview = query[:50] + "..." if len(query) > 50 else query print( @@ -191,12 +132,7 @@ def log_agent_request(agent_name: str, query: str, context_id: str = None): def log_agent_response(agent_name: str, status: str, context_id: str = None): """Log agent response status.""" - import traceback - - caller = traceback.extract_stack()[-2] - caller_info = ( - f"{Colors.OKCYAN}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" - ) + caller_info = _caller_info() context_info = f" [ctx:{context_id}]" if context_id else "" print( f"{Colors.LIGHT_GREEN}[AGENT RESP]{Colors.ENDC} {caller_info} | {Colors.BOLD}{agent_name}{Colors.ENDC}{context_info}: {Colors.WARNING}{status}{Colors.ENDC}" @@ -205,12 +141,7 @@ def log_agent_response(agent_name: str, status: str, context_id: str = None): def log_system_event(event: str, details: str = ""): """Log system-level events.""" - import traceback - - caller = traceback.extract_stack()[-2] - caller_info = ( - f"{Colors.OKCYAN}{caller.filename.split('/')[-1]}:{caller.lineno}{Colors.ENDC}" - ) + caller_info = _caller_info() details_info = f" | {details}" if details else "" print( f"{Colors.HEADER}[SYSTEM]{Colors.ENDC} {caller_info} | {Colors.BOLD}{event}{Colors.ENDC}{details_info}" diff --git a/pyproject.toml b/pyproject.toml index cfc8bf9..f59bf55 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ "requests>=2.28.0", # Core dependencies "openai>=0.27.0", + "protobuf==5.29.5", "flask>=2.0.0", "ecdsa", "numpy>=1.23,<2.0", From d3b5be38df3972f91370e6c0fd71e303143331ca Mon Sep 17 00:00:00 2001 From: Moshi Wei Date: Fri, 26 Sep 2025 01:50:50 +0800 Subject: [PATCH 3/3] update pydantic adapter --- examples/Agent_client/a2a_client.py | 4 +- .../Pydantic/openai_agent_a2a.py | 124 +----------- isek/adapter/pydantic_ai_adapter.py | 177 ++++++++++++++++++ 3 files changed, 189 insertions(+), 116 deletions(-) create mode 100644 isek/adapter/pydantic_ai_adapter.py diff --git a/examples/Agent_client/a2a_client.py b/examples/Agent_client/a2a_client.py index e411eab..67dfebf 100644 --- a/examples/Agent_client/a2a_client.py +++ b/examples/Agent_client/a2a_client.py @@ -52,7 +52,7 @@ async def get_agent_card(agent_url: str) -> dict: # Example usage -# print(asyncio.run(query_agent("Hello, how are you?"))) -print(asyncio.run(get_agent_card(agent_urls[0]))) +print(asyncio.run(query_agent("Hello, how are you?"))) +# print(asyncio.run(get_agent_card(agent_urls[0]))) diff --git a/examples/Agent_servers/Pydantic/openai_agent_a2a.py b/examples/Agent_servers/Pydantic/openai_agent_a2a.py index 8b26bb4..6117f20 100644 --- a/examples/Agent_servers/Pydantic/openai_agent_a2a.py +++ b/examples/Agent_servers/Pydantic/openai_agent_a2a.py @@ -1,25 +1,10 @@ -import asyncio -from typing import Any, AsyncGenerator -import os import dotenv from pydantic_ai import Agent -from a2a.server.tasks import TaskUpdater -from a2a.types import TaskState, AgentCard, AgentCapabilities, AgentSkill -from a2a.utils import new_agent_text_message, new_task -from a2a.server.agent_execution.context import RequestContext -from a2a.server.events.event_queue import EventQueue -from isek.utils.common import ( - log_agent_start, - log_agent_activity, - log_agent_request, - log_agent_response, - log_error, - log_system_event -) +from a2a.types import AgentCard, AgentCapabilities, AgentSkill +from isek.utils.common import log_agent_start from isek.node.node_v3_a2a import Node -from a2a.server.agent_execution.agent_executor import AgentExecutor - +from isek.adapter.pydantic_ai_adapter import PydanticAIAgentWrapper,PydanticAIAgentExecutor dotenv.load_dotenv() agent_card = AgentCard( @@ -49,108 +34,19 @@ ] ) -class OpenAIAgent: - """Simple OpenAI wrapper agent.""" - - def __init__(self): - """Initialize the OpenAI agent with a basic configuration.""" - self.agent = Agent( - model="gpt-4", - system_prompt="You are a helpful AI assistant that provides clear and concise responses." - ) - log_agent_activity("OpenAI Agent", "Initialized with GPT-4 model") - - async def stream(self, query: str, contextId: str) -> AsyncGenerator[dict[str, Any], None]: - """Stream the agent response.""" - try: - log_agent_request("OpenAI Agent", query, contextId) - - # Initial message - log_agent_activity("OpenAI Agent", "Starting request processing") - yield { - "is_task_complete": False, - "require_user_input": False, - "content": "Processing your request...", - } - - # Get response from OpenAI - log_agent_activity("OpenAI Agent", "Sending request to OpenAI") - response = await self.agent.run(query) - log_agent_activity("OpenAI Agent", "Received response from OpenAI") - - # Return final response - log_agent_response("OpenAI Agent", "Task completed successfully", contextId) - log_agent_response("OpenAI Agent", "content", response.output) - yield { - "is_task_complete": True, - "require_user_input": False, - "content": response.output, - } - - except Exception as e: - error_msg = f"Error during processing: {str(e)}" - log_error(error_msg) - yield { - "is_task_complete": False, - "require_user_input": True, - "content": f"Error: {str(e)}", - } - -class OpenAIAgentExecutor(AgentExecutor): - """Simple executor for the OpenAI Agent.""" - - def __init__(self): - self.agent = OpenAIAgent() - log_agent_activity("OpenAI Agent Executor", "Initialized") - - async def execute(self, context, event_queue): - """Execute the agent.""" - log_agent_activity("OpenAI Agent Executor", "Starting execution") - query = context.get_user_input() - log_agent_activity("OpenAI Agent Executor", f"Received execution request for context: {context.message.contextId}") - log_agent_activity("OpenAI Agent Executor", f"Query: {query}") - - task = context.current_task or new_task(context.message) - await event_queue.enqueue_event(task) - log_agent_activity("OpenAI Agent Executor", f"Created new task: {task.id}") - - updater = TaskUpdater(event_queue, task.id, task.contextId) - log_agent_activity("OpenAI Agent Executor", "Created task updater") - - try: - log_agent_activity("OpenAI Agent Executor", "Starting agent stream") - async for item in self.agent.stream(query, task.contextId): - log_agent_activity("OpenAI Agent Executor", f"Received stream item: {item}") - is_task_complete = item["is_task_complete"] - require_user_input = item["require_user_input"] - content = item["content"] - - message = new_agent_text_message(content, task.contextId, task.id) - - if is_task_complete: - log_agent_activity("OpenAI Agent Executor", f"Task {task.id} completed") - await updater.complete(message) - elif require_user_input: - log_agent_activity("OpenAI Agent Executor", f"Task {task.id} requires user input") - await updater.update_status(TaskState.input_required, message, final=True) - else: - log_agent_activity("OpenAI Agent Executor", f"Task {task.id} in progress") - await updater.update_status(TaskState.working, message) +my_agent=Agent( + model="gpt-4", + system_prompt="You are a helpful AI assistant that provides clear and concise responses." +) - except Exception as e: - from a2a.utils.errors import ServerError - from a2a.types import InternalError - log_error(f"Error in executor: {str(e)}") - log_error(f"Error details: {type(e).__name__}") - raise ServerError(error=InternalError()) from e - async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: - raise Exception("cancel not supported") +wrapper = PydanticAIAgentWrapper(my_agent, agent_card) +agent_executor = PydanticAIAgentExecutor(wrapper) def main(): """Run the OpenAI agent server.""" log_agent_start("OpenAI Agent", 9999) node = Node(host="127.0.0.1", port=9999, node_id="openai-agent") - app = Node.create_server(OpenAIAgentExecutor(), agent_card) + app = Node.create_server(agent_executor, agent_card) node.build_server(app, name="OpenAI Agent", daemon=False) if __name__ == "__main__": diff --git a/isek/adapter/pydantic_ai_adapter.py b/isek/adapter/pydantic_ai_adapter.py new file mode 100644 index 0000000..69ade07 --- /dev/null +++ b/isek/adapter/pydantic_ai_adapter.py @@ -0,0 +1,177 @@ +from typing import Any, AsyncGenerator, Dict +from pydantic_ai import Agent +from a2a.server.tasks import TaskUpdater +from a2a.types import TaskState, AgentCard +from a2a.utils import new_agent_text_message, new_task +from a2a.server.agent_execution.context import RequestContext +from a2a.server.events.event_queue import EventQueue +from isek.utils.common import ( + log_agent_activity, + log_agent_request, + log_agent_response, + log_error, +) +from a2a.server.agent_execution.agent_executor import AgentExecutor + + +# --- Revised Imports --- + +ResponsePayload = Dict[str, Any] + + +class PydanticAIAgentWrapper: + """Wrap a :class:`pydantic_ai.Agent` instance with a uniform streaming interface. + + The wrapper standardises the input/output contract for use inside the ISEK + ecosystem and adds rich logging for observability. + """ + + def __init__(self, agent: Agent, agent_card: AgentCard) -> None: + """Create a new wrapper around *agent*. + + Parameters + ---------- + agent: + The underlying **pydantic-ai** agent to delegate the actual reasoning + work to. + """ + self._agent: Agent = agent + self._agent_card: AgentCard = agent_card + + log_agent_activity(self._agent_card.name, "Initialized with GPT-4 model") + + async def invoke(self, query: str, context_id: str) -> ResponsePayload: + """Run the agent and return the *final* response. + + This convenience wrapper is useful when the caller is not interested in + the intermediate streaming messages produced by :meth:`stream`. + """ + log_agent_request(self._agent_card.name, query, context_id) + + try: + log_agent_activity( + self._agent_card.name, f"Invoking agent with query: {query}" + ) + response = await self._agent.run(query) + log_agent_response( + self._agent_card.name, "Task completed successfully", context_id + ) + log_agent_response(self._agent_card.name, "content", response.output) + + return { + "is_task_complete": True, + "require_user_input": False, + "content": response.output, + } + except Exception as exc: # noqa: BLE001 + error_msg = f"Error during invoke: {exc}" + log_error(error_msg) + return { + "is_task_complete": False, + "require_user_input": True, + "content": f"Error: {exc}", + } + + async def stream( + self, query: str, context_id: str + ) -> AsyncGenerator[ResponsePayload, None]: + """Yield incremental updates while the agent processes *query*.""" + + try: + log_agent_request(self._agent_card.name, query, context_id) + + # Initial placeholder so the caller can display progress feedback + log_agent_activity(self._agent_card.name, "Starting request processing") + yield { + "is_task_complete": False, + "require_user_input": False, + "content": "Processing your request...", + } + log_agent_activity(self._agent_card.name, "Sending request to OpenAI") + response = await self._agent.run(query) + log_agent_activity(self._agent_card.name, "Received response from OpenAI") + + # Final message + log_agent_response( + self._agent_card.name, "Task completed successfully", context_id + ) + log_agent_response(self._agent_card.name, "content", response.output) + yield { + "is_task_complete": True, + "require_user_input": False, + "content": response.output, + } + + except Exception as exc: + error_msg = f"Error during processing: {exc}" + log_error(error_msg) + yield { + "is_task_complete": False, + "require_user_input": True, + "content": f"Error: {exc}", + } + + +class PydanticAIAgentExecutor(AgentExecutor): + """Simple executor for the OpenAI Agent.""" + + def __init__(self, pydantic_ai_agent: PydanticAIAgentWrapper): + self.agent = pydantic_ai_agent + log_agent_activity(self.agent._agent_card.name, "Initialized") + + async def execute(self, context, event_queue): + """Execute the agent.""" + log_agent_activity(self.agent._agent_card.name, "Starting execution") + query = context.get_user_input() + log_agent_activity( + self.agent._agent_card.name, + f"Received execution request for context: {context.message.contextId}", + ) + log_agent_activity(self.agent._agent_card.name, f"Query: {query}") + + task = context.current_task or new_task(context.message) + await event_queue.enqueue_event(task) + log_agent_activity(self.agent._agent_card.name, f"Created new task: {task.id}") + + updater = TaskUpdater(event_queue, task.id, task.contextId) + log_agent_activity(self.agent._agent_card.name, "Created task updater") + + try: + log_agent_activity(self.agent._agent_card.name, "Starting agent stream") + async for item in self.agent.stream(query, task.contextId): + log_agent_activity( + self.agent._agent_card.name, f"Received stream item: {item}" + ) + is_task_complete = item["is_task_complete"] + require_user_input = item["require_user_input"] + content = item["content"] + + message = new_agent_text_message(content, task.contextId, task.id) + + if is_task_complete: + log_agent_activity( + self.agent._agent_card.name, f"Task {task.id} completed" + ) + await updater.complete(message) + elif require_user_input: + log_agent_activity( + self.agent._agent_card.name, + f"Task {task.id} requires user input", + ) + await updater.update_status(TaskState.user_input_required, message) + else: + log_agent_activity( + self.agent._agent_card.name, f"Task {task.id} in progress" + ) + await updater.update_status(TaskState.working, message) + + except Exception as e: + from a2a.utils.errors import ServerError + from a2a.types import InternalError + + log_error(f"Error in executor: {str(e)}") + log_error(f"Error details: {type(e).__name__}") + raise ServerError(error=InternalError()) from e + + async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: + raise Exception("cancel not supported")