diff --git a/examples/Agent_client/a2a_client.py b/examples/Agent_client/a2a_client.py new file mode 100644 index 0000000..67dfebf --- /dev/null +++ b/examples/Agent_client/a2a_client.py @@ -0,0 +1,58 @@ +import asyncio +from uuid import uuid4 + +import dotenv + +from isek.node.node_v3_a2a import Node +from mcp.server.fastmcp.utilities.logging import get_logger +from isek.utils.log import log + +dotenv.load_dotenv() + +logger = get_logger(__name__) +AGENT_CARDS_DIR = 'agent_cards' +MODEL = 'text-embedding-ada-002' + +agent_urls = ['http://localhost:9999', # openai agent + ] + +AGENT_CARD_WELL_KNOWN_PATH = "/.well-known/agent.json" # kept for compatibility + +async def query_agent(query: str) -> str: + """Execute a task on a remote agent and return the aggregated response. + + Args: + query: The query to send to the agent. + + Returns: + str: The content of the task result. + """ + # Instantiate a lightweight Node (acts as a client here) + node = Node(host="127.0.0.1", port=uuid4().int >> 112, node_id="a2a-client") + logger.info("[execute_task] Executing task on %s with query: %s", agent_urls[0], query) + message_content = await node.send_message(agent_urls[0], query) + logger.info("[execute_task] Task result content: %s", message_content) + + return message_content +async def get_agent_card(agent_url: str) -> dict: + """ + Fetch the agent card from the given agent URL. + + Args: + agent_url (str): The base URL of the agent. + + Returns: + dict: The agent card as a dictionary. + """ + node = Node(host="127.0.0.1", port=uuid4().int >> 112, node_id="a2a-client") + logger.info("[get_agent_card] Fetching agent card from %s", agent_url) + card = await node.get_agent_card_by_url(agent_url) + logger.info("[get_agent_card] Received agent card: %s", card) + return card + + +# Example usage +print(asyncio.run(query_agent("Hello, how are you?"))) +# print(asyncio.run(get_agent_card(agent_urls[0]))) + + diff --git a/examples/Agent_servers/Pydantic/openai_agent_a2a.py b/examples/Agent_servers/Pydantic/openai_agent_a2a.py new file mode 100644 index 0000000..6117f20 --- /dev/null +++ b/examples/Agent_servers/Pydantic/openai_agent_a2a.py @@ -0,0 +1,53 @@ + +import dotenv +from pydantic_ai import Agent +from a2a.types import AgentCard, AgentCapabilities, AgentSkill +from isek.utils.common import log_agent_start +from isek.node.node_v3_a2a import Node +from isek.adapter.pydantic_ai_adapter import PydanticAIAgentWrapper,PydanticAIAgentExecutor +dotenv.load_dotenv() + +agent_card = AgentCard( + name="OpenAI Agent", + url="http://localhost:9999", + description="Simple OpenAI GPT-4 wrapper agent", + version="1.0", + capabilities=AgentCapabilities( + streaming=True, + tools=True, # Enable tools support + task_execution=True # Enable task execution + ), + defaultInputModes=["text/plain"], + defaultOutputModes=["text/plain"], + skills=[ + AgentSkill( + id="general_assistant", + name="General Assistant", + description="Provides helpful responses to general queries using GPT-4", + tags=["general", "assistant", "gpt4"], + examples=[ + "What is machine learning?", + "How do I write a Python function?", + "Explain quantum computing" + ] + ) + ] +) + +my_agent=Agent( + model="gpt-4", + system_prompt="You are a helpful AI assistant that provides clear and concise responses." +) + +wrapper = PydanticAIAgentWrapper(my_agent, agent_card) +agent_executor = PydanticAIAgentExecutor(wrapper) + +def main(): + """Run the OpenAI agent server.""" + log_agent_start("OpenAI Agent", 9999) + node = Node(host="127.0.0.1", port=9999, node_id="openai-agent") + app = Node.create_server(agent_executor, agent_card) + node.build_server(app, name="OpenAI Agent", daemon=False) + +if __name__ == "__main__": + main() diff --git a/isek/adapter/pydantic_ai_adapter.py b/isek/adapter/pydantic_ai_adapter.py new file mode 100644 index 0000000..69ade07 --- /dev/null +++ b/isek/adapter/pydantic_ai_adapter.py @@ -0,0 +1,177 @@ +from typing import Any, AsyncGenerator, Dict +from pydantic_ai import Agent +from a2a.server.tasks import TaskUpdater +from a2a.types import TaskState, AgentCard +from a2a.utils import new_agent_text_message, new_task +from a2a.server.agent_execution.context import RequestContext +from a2a.server.events.event_queue import EventQueue +from isek.utils.common import ( + log_agent_activity, + log_agent_request, + log_agent_response, + log_error, +) +from a2a.server.agent_execution.agent_executor import AgentExecutor + + +# --- Revised Imports --- + +ResponsePayload = Dict[str, Any] + + +class PydanticAIAgentWrapper: + """Wrap a :class:`pydantic_ai.Agent` instance with a uniform streaming interface. + + The wrapper standardises the input/output contract for use inside the ISEK + ecosystem and adds rich logging for observability. + """ + + def __init__(self, agent: Agent, agent_card: AgentCard) -> None: + """Create a new wrapper around *agent*. + + Parameters + ---------- + agent: + The underlying **pydantic-ai** agent to delegate the actual reasoning + work to. + """ + self._agent: Agent = agent + self._agent_card: AgentCard = agent_card + + log_agent_activity(self._agent_card.name, "Initialized with GPT-4 model") + + async def invoke(self, query: str, context_id: str) -> ResponsePayload: + """Run the agent and return the *final* response. + + This convenience wrapper is useful when the caller is not interested in + the intermediate streaming messages produced by :meth:`stream`. + """ + log_agent_request(self._agent_card.name, query, context_id) + + try: + log_agent_activity( + self._agent_card.name, f"Invoking agent with query: {query}" + ) + response = await self._agent.run(query) + log_agent_response( + self._agent_card.name, "Task completed successfully", context_id + ) + log_agent_response(self._agent_card.name, "content", response.output) + + return { + "is_task_complete": True, + "require_user_input": False, + "content": response.output, + } + except Exception as exc: # noqa: BLE001 + error_msg = f"Error during invoke: {exc}" + log_error(error_msg) + return { + "is_task_complete": False, + "require_user_input": True, + "content": f"Error: {exc}", + } + + async def stream( + self, query: str, context_id: str + ) -> AsyncGenerator[ResponsePayload, None]: + """Yield incremental updates while the agent processes *query*.""" + + try: + log_agent_request(self._agent_card.name, query, context_id) + + # Initial placeholder so the caller can display progress feedback + log_agent_activity(self._agent_card.name, "Starting request processing") + yield { + "is_task_complete": False, + "require_user_input": False, + "content": "Processing your request...", + } + log_agent_activity(self._agent_card.name, "Sending request to OpenAI") + response = await self._agent.run(query) + log_agent_activity(self._agent_card.name, "Received response from OpenAI") + + # Final message + log_agent_response( + self._agent_card.name, "Task completed successfully", context_id + ) + log_agent_response(self._agent_card.name, "content", response.output) + yield { + "is_task_complete": True, + "require_user_input": False, + "content": response.output, + } + + except Exception as exc: + error_msg = f"Error during processing: {exc}" + log_error(error_msg) + yield { + "is_task_complete": False, + "require_user_input": True, + "content": f"Error: {exc}", + } + + +class PydanticAIAgentExecutor(AgentExecutor): + """Simple executor for the OpenAI Agent.""" + + def __init__(self, pydantic_ai_agent: PydanticAIAgentWrapper): + self.agent = pydantic_ai_agent + log_agent_activity(self.agent._agent_card.name, "Initialized") + + async def execute(self, context, event_queue): + """Execute the agent.""" + log_agent_activity(self.agent._agent_card.name, "Starting execution") + query = context.get_user_input() + log_agent_activity( + self.agent._agent_card.name, + f"Received execution request for context: {context.message.contextId}", + ) + log_agent_activity(self.agent._agent_card.name, f"Query: {query}") + + task = context.current_task or new_task(context.message) + await event_queue.enqueue_event(task) + log_agent_activity(self.agent._agent_card.name, f"Created new task: {task.id}") + + updater = TaskUpdater(event_queue, task.id, task.contextId) + log_agent_activity(self.agent._agent_card.name, "Created task updater") + + try: + log_agent_activity(self.agent._agent_card.name, "Starting agent stream") + async for item in self.agent.stream(query, task.contextId): + log_agent_activity( + self.agent._agent_card.name, f"Received stream item: {item}" + ) + is_task_complete = item["is_task_complete"] + require_user_input = item["require_user_input"] + content = item["content"] + + message = new_agent_text_message(content, task.contextId, task.id) + + if is_task_complete: + log_agent_activity( + self.agent._agent_card.name, f"Task {task.id} completed" + ) + await updater.complete(message) + elif require_user_input: + log_agent_activity( + self.agent._agent_card.name, + f"Task {task.id} requires user input", + ) + await updater.update_status(TaskState.user_input_required, message) + else: + log_agent_activity( + self.agent._agent_card.name, f"Task {task.id} in progress" + ) + await updater.update_status(TaskState.working, message) + + except Exception as e: + from a2a.utils.errors import ServerError + from a2a.types import InternalError + + log_error(f"Error in executor: {str(e)}") + log_error(f"Error details: {type(e).__name__}") + raise ServerError(error=InternalError()) from e + + async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: + raise Exception("cancel not supported") diff --git a/isek/node/node_v3_a2a.py b/isek/node/node_v3_a2a.py new file mode 100644 index 0000000..8dcb17c --- /dev/null +++ b/isek/node/node_v3_a2a.py @@ -0,0 +1,191 @@ +import threading +import uuid +from abc import ABC +from typing import Dict, Any +from isek.utils.log import log +import httpx +import uvicorn +from a2a.server.apps import A2AStarletteApplication +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.tasks import InMemoryTaskStore +from a2a.types import AgentCard +from a2a.types import MessageSendParams, SendMessageRequest +from a2a.client import A2AClient +from a2a.types import JSONRPCErrorResponse +from isek.utils.common import log_a2a_api_call, log_error +from uuid import uuid4 +from a2a.types import Message, Part, Role, TextPart +import asyncio + +# Alias for consistency with other modules +logger = log + +NodeDetails = Dict[str, Any] +AGENT_CARD_WELL_KNOWN_PATH = "/.well-known/agent.json" + + +class Node(ABC): + def __init__( + self, + host: str, + port: int, + node_id: str, + **kwargs: Any, # To absorb any extra arguments + ): + if not host: + raise ValueError("Node host cannot be empty.") + if not isinstance(port, int) or not (0 < port < 65536): + raise ValueError(f"Invalid port number for Node: {port}") + if not node_id: + node_id = uuid.uuid4().hex + + self.host: str = host + self.port: int = port + self.node_id: str = node_id + self.all_nodes: Dict[str, NodeDetails] = {} + + async def get_agent_card_by_url(self, agent_url: str) -> dict: + """Fetch and cache agent cards from all configured agent URLs. + + The function uses a simple in-memory cache (``_agent_info_cache``) to avoid + fetching the ­same agent card repeatedly. If a card is not cached, it is + retrieved from the agent’s “well-known” endpoint and stored in the cache. + + Args: + agent_url: The URL of the agent to fetch the agent card from. + + Returns: + dict: ``AgentCard`` fully JSON-serialisable object for interoperability with the rest of the MCP pipeline. + """ + timeout_config = httpx.Timeout(10.0) # seconds + log_a2a_api_call( + "get_agent_card_by_url", f"Fetching agent card for {agent_url}" + ) + + async with httpx.AsyncClient(timeout=timeout_config) as httpx_client: + response = await httpx_client.get( + f"{agent_url}{AGENT_CARD_WELL_KNOWN_PATH}" + ) + response.raise_for_status() + card_data = response.json() + return card_data + + async def send_message(self, agent_url: str, query: str) -> str: + """Execute a task on a remote agent and return the aggregated response. + + Args: + query: The query to send to the agent. + + Returns: + str: The content of the task result. + """ + # Fetch the agent-card data and build a proper ``AgentCard`` instance. + agent_card_data = await self.get_agent_card_by_url(agent_url) + agent_card = AgentCard(**agent_card_data) + + logger.info( + "[send_message] Executing task on agent %s with query: %s", + agent_card.name, + query, + ) + + # Build request params + msg_params = MessageSendParams( + message=Message( + role=Role.user, + parts=[Part(TextPart(text=query))], + messageId=uuid4().hex, # Include required messageId field + ) + ) + + logger.debug("[execute_task] Sending non-streaming request …") + timeout_config = httpx.Timeout(10.0) + async with httpx.AsyncClient(timeout=timeout_config) as httpx_client: + client = A2AClient(httpx_client, agent_card=agent_card) + response = await client.send_message( + SendMessageRequest(id=uuid4().hex, params=msg_params) + ) + + if isinstance(response, JSONRPCErrorResponse): + logger.error("[execute_task] Error response received: %s", response) + return "Error: Unable to execute task" + + message_content = response.root.result.status.message + + logger.info("[execute_task] Task result content: %s", message_content) + + return message_content + + def build_server( + self, + app: A2AStarletteApplication, + name: str = "A2A-Agent", + daemon: bool = False, + ): + """Bootstrap the A2A HTTP server. + + If *daemon* is ``True`` the server will be started in a background thread, + allowing the current process to continue executing (e.g. to send outbound + messages) while still accepting inbound HTTP requests. + + Parameters + ---------- + app : A2AStarletteApplication + The Starlette application returned from + ``Node.create_agent_a2a_server``. + name : str, optional + A human-readable name for the server, only used for logging. + daemon : bool, default ``False`` + Whether to start the server in a daemon thread (non-blocking) or run + it in the foreground (blocking call). + """ + + async def _runner(): + await self.run_server(app, port=self.port, name=name) + + if not daemon: + # Blocking – run the server in the current thread. + asyncio.run(_runner()) + else: + # Non-blocking – run the server in a daemonised background thread + # so that the main thread can still send outbound messages. + server_thread = threading.Thread( + target=lambda: asyncio.run(_runner()), daemon=True + ) + server_thread.start() + logger.info( + "A2A server started in daemon thread (name=%s, port=%s)", + name, + self.port, + ) + + @staticmethod + def create_server(agent_executor, agent_card: AgentCard) -> A2AStarletteApplication: + request_handler = DefaultRequestHandler( + agent_executor=agent_executor, task_store=InMemoryTaskStore() + ) + + app = A2AStarletteApplication( + agent_card=agent_card, http_handler=request_handler + ) + return app + + @staticmethod + async def run_server(app: A2AStarletteApplication, port: int, name: str): + try: + config = uvicorn.Config( + app.build(), + host="127.0.0.1", + port=port, + log_level="error", + loop="asyncio", + ) + + server = uvicorn.Server(config) + + log_a2a_api_call( + "server.serve()", f"server: {name}, port: {port}, host: 127.0.0.1" + ) + await server.serve() + except Exception as e: + log_error(f"run_server() error: {e} - name: {name}, port: {port}") diff --git a/isek/utils/common.py b/isek/utils/common.py new file mode 100644 index 0000000..cec3fc5 --- /dev/null +++ b/isek/utils/common.py @@ -0,0 +1,148 @@ +# Color codes for colorful logging +class Colors: + HEADER = "\033[95m" + OKBLUE = "\033[94m" + OKCYAN = "\033[96m" + OKGREEN = "\033[92m" + WARNING = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + PURPLE = "\033[35m" + ORANGE = "\033[33m" + PINK = "\033[95m" + # Additional colors for more variety + LIGHT_BLUE = "\033[94m" + LIGHT_GREEN = "\033[92m" + LIGHT_RED = "\033[91m" + LIGHT_YELLOW = "\033[93m" + LIGHT_MAGENTA = "\033[95m" + DARK_GRAY = "\033[90m" + LIGHT_GRAY = "\033[37m" + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- +# NOTE: Keep these small utilities above the public logging helpers so they +# are available everywhere below. + + +def _caller_info() -> str: + """Return the *caller* filename and line-number in unified colored format. + + We step three frames up the stack so that the reported location always + corresponds to the original call-site and not to the internals of the + logging helpers themselves. + """ + import traceback + import os + + frame = traceback.extract_stack()[-4] # see docstring for frame math + filename = os.path.basename(frame.filename) + return f"{Colors.OKCYAN}{filename}:{frame.lineno}{Colors.ENDC}" + + +def log_a2a_protocol( + message: str, direction: str = "→", sender: str = "", receiver: str = "" +): + """Log A2A protocol messages with special formatting.""" + caller_info = _caller_info() + + if direction == "→": + if sender and receiver: + print( + f"{Colors.LIGHT_MAGENTA}[A2A OUT]{Colors.ENDC} {caller_info} | from {Colors.LIGHT_GREEN}{sender}{Colors.ENDC} to {Colors.LIGHT_GREEN}{receiver}{Colors.ENDC} : {Colors.LIGHT_BLUE}{message}{Colors.ENDC}" + ) + else: + print( + f"{Colors.LIGHT_MAGENTA}[A2A OUT]{Colors.ENDC} {caller_info} | {Colors.LIGHT_BLUE}{message}{Colors.ENDC}" + ) + elif direction == "←": + if sender and receiver: + print( + f"{Colors.LIGHT_YELLOW}[A2A IN]{Colors.ENDC} {caller_info} | from {Colors.LIGHT_GREEN}{sender}{Colors.ENDC} to {Colors.LIGHT_GREEN}{receiver}{Colors.ENDC} : {Colors.LIGHT_BLUE}{message}{Colors.ENDC}" + ) + else: + print( + f"{Colors.LIGHT_YELLOW}[A2A IN]{Colors.ENDC} {caller_info} | {Colors.LIGHT_BLUE}{message}{Colors.ENDC}" + ) + else: + if sender and receiver: + print( + f"{Colors.LIGHT_MAGENTA}[A2A]{Colors.ENDC} {caller_info} | from {Colors.LIGHT_GREEN}{sender}{Colors.ENDC} to {Colors.LIGHT_GREEN}{receiver}{Colors.ENDC} : {Colors.LIGHT_BLUE}{message}{Colors.ENDC}" + ) + else: + print( + f"{Colors.LIGHT_MAGENTA}[A2A]{Colors.ENDC} {caller_info} | {Colors.LIGHT_BLUE}{message}{Colors.ENDC}" + ) + + +def log_a2a_api_call(api_name: str, details: str = ""): + """Log A2A API calls specifically.""" + caller_info = _caller_info() + print( + f"{Colors.OKCYAN}[A2A API]{Colors.ENDC} {caller_info} | {Colors.HEADER}{api_name}{Colors.ENDC} | {Colors.OKBLUE}{details}{Colors.ENDC}" + ) + + +def log_a2a_function_call(function_name: str, details: str = ""): + """Log A2A function calls specifically.""" + caller_info = _caller_info() + print( + f"{Colors.HEADER}[A2A FUNC]{Colors.ENDC} {caller_info} | {Colors.HEADER}{function_name}{Colors.ENDC} | {Colors.OKBLUE}{details}{Colors.ENDC}" + ) + + +def log_error(message: str): + """Log error message with red color.""" + caller_info = f"{Colors.LIGHT_GRAY}{_caller_info()}{Colors.ENDC}" + print( + f"{Colors.LIGHT_RED}[ERROR]{Colors.ENDC} {caller_info} | {Colors.LIGHT_RED}{message}{Colors.ENDC}" + ) + + +def log_agent_start(agent_name: str, port: int = None): + """Log when an agent starts.""" + caller_info = _caller_info() + port_info = f" on port {port}" if port else "" + print( + f"{Colors.OKGREEN}[AGENT START]{Colors.ENDC} {caller_info} | {Colors.BOLD}{agent_name}{Colors.ENDC}{port_info}" + ) + + +def log_agent_activity(agent_name: str, activity: str): + """Log agent activity/status updates.""" + caller_info = _caller_info() + print( + f"{Colors.PURPLE}[AGENT]{Colors.ENDC} {caller_info} | {Colors.BOLD}{agent_name}{Colors.ENDC}: {Colors.OKBLUE}{activity}{Colors.ENDC}" + ) + + +def log_agent_request(agent_name: str, query: str, context_id: str = None): + """Log when an agent receives a request.""" + caller_info = _caller_info() + context_info = f" [ctx:{context_id}]" if context_id else "" + query_preview = query[:50] + "..." if len(query) > 50 else query + print( + f"{Colors.LIGHT_BLUE}[AGENT REQ]{Colors.ENDC} {caller_info} | {Colors.BOLD}{agent_name}{Colors.ENDC}{context_info}: {Colors.LIGHT_GRAY}{query_preview}{Colors.ENDC}" + ) + + +def log_agent_response(agent_name: str, status: str, context_id: str = None): + """Log agent response status.""" + caller_info = _caller_info() + context_info = f" [ctx:{context_id}]" if context_id else "" + print( + f"{Colors.LIGHT_GREEN}[AGENT RESP]{Colors.ENDC} {caller_info} | {Colors.BOLD}{agent_name}{Colors.ENDC}{context_info}: {Colors.WARNING}{status}{Colors.ENDC}" + ) + + +def log_system_event(event: str, details: str = ""): + """Log system-level events.""" + caller_info = _caller_info() + details_info = f" | {details}" if details else "" + print( + f"{Colors.HEADER}[SYSTEM]{Colors.ENDC} {caller_info} | {Colors.BOLD}{event}{Colors.ENDC}{details_info}" + ) diff --git a/pyproject.toml b/pyproject.toml index cfc8bf9..f59bf55 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ "requests>=2.28.0", # Core dependencies "openai>=0.27.0", + "protobuf==5.29.5", "flask>=2.0.0", "ecdsa", "numpy>=1.23,<2.0",