From 36fbfc580f373e064cf9050a1cbcc37a64302a94 Mon Sep 17 00:00:00 2001 From: wuyangfan Date: Mon, 18 May 2026 11:12:44 +0800 Subject: [PATCH 1/2] fix: avoid blocking async agent handlers --- Rag-agent/ango/agent.py | 11 ++-- image-agent-payment-protocol/payment_proto.py | 17 +++--- image-agent-payment-protocol/requirements.txt | 3 -- web3/internet-computer/fetch/agent.py | 52 +++++++++++-------- web3/internet-computer/fetch/requirements.txt | 3 ++ 5 files changed, 50 insertions(+), 36 deletions(-) create mode 100644 web3/internet-computer/fetch/requirements.txt diff --git a/Rag-agent/ango/agent.py b/Rag-agent/ango/agent.py index 0d3f29f7..8353d1ae 100644 --- a/Rag-agent/ango/agent.py +++ b/Rag-agent/ango/agent.py @@ -1,5 +1,6 @@ -import os +import asyncio import logging +import os from datetime import datetime from uuid import uuid4 from dotenv import load_dotenv @@ -184,7 +185,11 @@ async def generate_rag_response(self, query: str) -> str: try: # First, check if we have documents in the knowledge base try: - search_results = self.knowledge.search(query, num_documents=3) + search_results = await asyncio.to_thread( + self.knowledge.search, + query, + num_documents=3, + ) logger.info(f"RAG Search found {len(search_results)} relevant documents for query: '{query}'") if len(search_results) == 0: @@ -196,7 +201,7 @@ async def generate_rag_response(self, query: str) -> str: return "I encountered an error while searching my knowledge base. Please try again." # Use the Agno agent to generate a response - response = self.rag_agent.run(query) + response = await asyncio.to_thread(self.rag_agent.run, query) response_content = response.content if hasattr(response, 'content') else str(response) # Add a note if the response seems generic diff --git a/image-agent-payment-protocol/payment_proto.py b/image-agent-payment-protocol/payment_proto.py index 0423e128..fe9ec697 100644 --- a/image-agent-payment-protocol/payment_proto.py +++ b/image-agent-payment-protocol/payment_proto.py @@ -1,5 +1,7 @@ import os from uuid import uuid4 + +import aiohttp from uagents import Context, Protocol from uagents_core.contrib.protocols.payment import ( Funds, @@ -116,15 +118,16 @@ def _sanitize_prompt(raw: str) -> str: ctx.logger.info(f"Generating image via Pollinations for prompt: {clean_prompt}") try: - import requests as _r pollinations_url = f"https://image.pollinations.ai/prompt/{quote(clean_prompt)}?width=512&height=512" - resp = _r.get(pollinations_url, timeout=90) - ctype = resp.headers.get("Content-Type", "") - if resp.status_code != 200 or not resp.content or not ctype.startswith("image/"): - await ctx.send(user_address, create_text_chat("Image generation failed")) - return + timeout = aiohttp.ClientTimeout(total=90) + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.get(pollinations_url) as resp: + ctype = resp.headers.get("Content-Type", "") + image_bytes = await resp.read() + if resp.status != 200 or not image_bytes or not ctype.startswith("image/"): + await ctx.send(user_address, create_text_chat("Image generation failed")) + return - image_bytes: bytes = resp.content mime_type: str = ctype or "image/png" # Upload to Agentverse External Storage and send as resource diff --git a/image-agent-payment-protocol/requirements.txt b/image-agent-payment-protocol/requirements.txt index 22fee6a7..a020940a 100644 --- a/image-agent-payment-protocol/requirements.txt +++ b/image-agent-payment-protocol/requirements.txt @@ -8,6 +8,3 @@ python-jose[cryptography]==3.3.0 # Environment variables python-dotenv==1.0.1 - -# HTTP client -requests diff --git a/web3/internet-computer/fetch/agent.py b/web3/internet-computer/fetch/agent.py index fa148c50..86c29125 100644 --- a/web3/internet-computer/fetch/agent.py +++ b/web3/internet-computer/fetch/agent.py @@ -1,5 +1,8 @@ -import requests import json +from datetime import datetime, timezone, timedelta +from uuid import uuid4 + +import aiohttp from uagents_core.contrib.protocols.chat import ( chat_protocol_spec, ChatMessage, @@ -8,8 +11,6 @@ StartSessionContent, ) from uagents import Agent, Context, Protocol -from datetime import datetime, timezone, timedelta -from uuid import uuid4 # ASI1 API settings ASI1_API_KEY = "your_asi1_api_key" # Replace with your ASI1 key @@ -109,20 +110,23 @@ async def call_icp_endpoint(func_name: str, args: dict): if func_name == "get_current_fee_percentiles": url = f"{BASE_URL}/get-current-fee-percentiles" - response = requests.post(url, headers=HEADERS, json={}) + payload = {} elif func_name == "get_balance": url = f"{BASE_URL}/get-balance" - response = requests.post(url, headers=HEADERS, json={"address": args["address"]}) + payload = {"address": args["address"]} elif func_name == "get_utxos": url = f"{BASE_URL}/get-utxos" - response = requests.post(url, headers=HEADERS, json={"address": args["address"]}) + payload = {"address": args["address"]} elif func_name == "send": url = f"{BASE_URL}/send" - response = requests.post(url, headers=HEADERS, json=args) + payload = args else: raise ValueError(f"Unsupported function call: {func_name}") - response.raise_for_status() - return response.json() + + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=HEADERS, json=payload) as response: + response.raise_for_status() + return await response.json() async def process_query(query: str, ctx: Context) -> str: try: @@ -138,13 +142,14 @@ async def process_query(query: str, ctx: Context) -> str: "temperature": 0.7, "max_tokens": 1024 } - response = requests.post( - f"{ASI1_BASE_URL}/chat/completions", - headers=ASI1_HEADERS, - json=payload - ) - response.raise_for_status() - response_json = response.json() + async with aiohttp.ClientSession() as session: + async with session.post( + f"{ASI1_BASE_URL}/chat/completions", + headers=ASI1_HEADERS, + json=payload + ) as response: + response.raise_for_status() + response_json = await response.json() # Step 2: Parse tool calls from response tool_calls = response_json["choices"][0]["message"].get("tool_calls", []) @@ -185,13 +190,14 @@ async def process_query(query: str, ctx: Context) -> str: "temperature": 0.7, "max_tokens": 1024 } - final_response = requests.post( - f"{ASI1_BASE_URL}/chat/completions", - headers=ASI1_HEADERS, - json=final_payload - ) - final_response.raise_for_status() - final_response_json = final_response.json() + async with aiohttp.ClientSession() as session: + async with session.post( + f"{ASI1_BASE_URL}/chat/completions", + headers=ASI1_HEADERS, + json=final_payload + ) as final_response: + final_response.raise_for_status() + final_response_json = await final_response.json() # Step 5: Return the model's final answer return final_response_json["choices"][0]["message"]["content"] diff --git a/web3/internet-computer/fetch/requirements.txt b/web3/internet-computer/fetch/requirements.txt new file mode 100644 index 00000000..98109e1b --- /dev/null +++ b/web3/internet-computer/fetch/requirements.txt @@ -0,0 +1,3 @@ +uagents +uagents-core +aiohttp>=3.8.0 From 0163ea35e7143673b9c8468804ad3d8f8058c4ce Mon Sep 17 00:00:00 2001 From: wuyangfan Date: Wed, 17 Jun 2026 11:39:00 +0800 Subject: [PATCH 2/2] fix: satisfy async handler CI checks --- Rag-agent/ango/__init__.py | 0 Rag-agent/ango/agent.py | 156 ++++++++++-------- image-agent-payment-protocol/payment_proto.py | 70 +++++--- web3/internet-computer/fetch/__init__.py | 0 web3/internet-computer/fetch/agent.py | 93 +++++------ 5 files changed, 183 insertions(+), 136 deletions(-) create mode 100644 Rag-agent/ango/__init__.py create mode 100644 web3/internet-computer/fetch/__init__.py diff --git a/Rag-agent/ango/__init__.py b/Rag-agent/ango/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/Rag-agent/ango/agent.py b/Rag-agent/ango/agent.py index 8353d1ae..16a95e12 100644 --- a/Rag-agent/ango/agent.py +++ b/Rag-agent/ango/agent.py @@ -3,16 +3,13 @@ import os from datetime import datetime from uuid import uuid4 -from dotenv import load_dotenv - -# Load environment variables -load_dotenv() # Import Agno components for RAG functionality from agno.agent import Agent as AgnoAgent, AgentKnowledge from agno.embedder.openai import OpenAIEmbedder from agno.models.openai import OpenAIChat from agno.vectordb.pgvector import PgVector, SearchType +from dotenv import load_dotenv # Import uAgent components for chat protocol from uagents import Context, Protocol, Agent @@ -23,13 +20,17 @@ chat_protocol_spec, ) +# Load environment variables +load_dotenv() + # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) + class RAGChatAgent: """Integrated RAG Chat Agent combining Agno RAG with uAgent chat protocol""" - + def __init__(self): # Initialize the knowledge base with PgVector self.db_url = "postgresql+psycopg://ai:ai@localhost:5532/ai" @@ -39,44 +40,44 @@ def __init__(self): db_url=self.db_url, search_type=SearchType.hybrid, embedder=OpenAIEmbedder( - id="text-embedding-3-small", - api_key=os.getenv("OPENAI_API_KEY") + id="text-embedding-3-small", api_key=os.getenv("OPENAI_API_KEY") ), ), ) - + # Initialize the knowledge base first try: self.knowledge.load() logger.info("Knowledge base initialized successfully") except Exception as init_error: logger.warning(f"Could not initialize knowledge base: {init_error}") - + # Add PDF content to knowledge base try: from agno.document.reader.pdf_reader import PDFReader - from agno.document.base import Document import uuid - + pdf_reader = PDFReader() documents = pdf_reader.read("ai.pdf") - + # Ensure each document has a proper ID and fix serialization issue for i, document in enumerate(documents): # Always set a unique ID - document.id = f"ai_doc_{i+1}_{uuid.uuid4().hex[:8]}" - + document.id = f"ai_doc_{i + 1}_{uuid.uuid4().hex[:8]}" + # Set document name - document.name = f"ai_document_{i+1}" - + document.name = f"ai_document_{i + 1}" + # Ensure the document has all required fields - document.meta_data = {'source': 'ai.pdf', 'page': i+1} - + document.meta_data = {"source": "ai.pdf", "page": i + 1} + # Load documents using load_documents method try: self.knowledge.load_documents(documents, upsert=True) loaded_count = len(documents) - logger.info(f"Successfully loaded {loaded_count} documents from ai.pdf to knowledge base") + logger.info( + f"Successfully loaded {loaded_count} documents from ai.pdf to knowledge base" + ) except Exception as load_error: logger.error(f"Failed to load documents: {load_error}") # Try loading documents one by one as fallback @@ -86,31 +87,33 @@ def __init__(self): self.knowledge.load_document(document, upsert=True) loaded_count += 1 except Exception as doc_error: - logger.error(f"Failed to load document {document.id}: {doc_error}") - + logger.error( + f"Failed to load document {document.id}: {doc_error}" + ) + # Verify documents are in the knowledge base try: test_results = self.knowledge.search("AI", num_documents=1) - logger.info(f"Knowledge base verification: Found {len(test_results)} documents for test query") + logger.info( + f"Knowledge base verification: Found {len(test_results)} documents for test query" + ) except Exception as verify_error: logger.warning(f"Could not verify knowledge base: {verify_error}") - + except Exception as e: logger.error(f"Failed to add PDF to knowledge base: {e}") import traceback + traceback.print_exc() - + # Initialize Agno RAG agent self.rag_agent = AgnoAgent( - model=OpenAIChat( - id="gpt-4o-mini", - api_key=os.getenv("OPENAI_API_KEY") - ), + model=OpenAIChat(id="gpt-4o-mini", api_key=os.getenv("OPENAI_API_KEY")), knowledge=self.knowledge, search_knowledge=True, markdown=True, ) - + # Initialize uAgent self.uagent = Agent( name="RAG-Chat-Agent", @@ -118,14 +121,14 @@ def __init__(self): port=8001, mailbox=True, ) - + # Set up chat protocol self.setup_protocol() - + def setup_protocol(self): """Set up the chat protocol for the uAgent""" protocol = Protocol(spec=chat_protocol_spec) - + @protocol.on_message(ChatMessage) async def handle_message(ctx: Context, sender: str, msg: ChatMessage): """Handle incoming chat messages and generate RAG-powered responses""" @@ -134,52 +137,60 @@ async def handle_message(ctx: Context, sender: str, msg: ChatMessage): await ctx.send( sender, ChatAcknowledgement( - timestamp=datetime.now(), - acknowledged_msg_id=msg.msg_id + timestamp=datetime.now(), acknowledged_msg_id=msg.msg_id ), ) - + # Extract text from message user_text = "" for item in msg.content: if isinstance(item, TextContent): user_text += item.text - + logger.info(f"Received message from {sender}: {user_text}") - + # Generate response using RAG agent response = await self.generate_rag_response(user_text) - + # Send response back - await ctx.send(sender, ChatMessage( - timestamp=datetime.now(), - msg_id=uuid4(), - content=[ - TextContent(type="text", text=response), - ] - )) - + await ctx.send( + sender, + ChatMessage( + timestamp=datetime.now(), + msg_id=uuid4(), + content=[ + TextContent(type="text", text=response), + ], + ), + ) + logger.info(f"Sent response to {sender}") - + except Exception as e: logger.error(f"Error handling message: {e}") # Send error response - await ctx.send(sender, ChatMessage( - timestamp=datetime.now(), - msg_id=uuid4(), - content=[ - TextContent(type="text", text=f"Sorry, I encountered an error: {str(e)}"), - ] - )) - + await ctx.send( + sender, + ChatMessage( + timestamp=datetime.now(), + msg_id=uuid4(), + content=[ + TextContent( + type="text", + text=f"Sorry, I encountered an error: {str(e)}", + ), + ], + ), + ) + @protocol.on_message(ChatAcknowledgement) async def handle_ack(ctx: Context, sender: str, msg: ChatAcknowledgement): """Handle chat acknowledgements""" logger.info(f"Received acknowledgement from {sender}") - + # Include protocol in uAgent self.uagent.include(protocol, publish_manifest=True) - + async def generate_rag_response(self, query: str) -> str: """Generate a response using the RAG agent""" try: @@ -190,38 +201,47 @@ async def generate_rag_response(self, query: str) -> str: query, num_documents=3, ) - logger.info(f"RAG Search found {len(search_results)} relevant documents for query: '{query}'") - + logger.info( + f"RAG Search found {len(search_results)} relevant documents for query: '{query}'" + ) + if len(search_results) == 0: logger.warning("No documents found in knowledge base for RAG query") return "I don't have any relevant information from the document to answer your question. Please make sure the PDF has been properly loaded into my knowledge base." - + except Exception as search_error: logger.error(f"Error searching knowledge base: {search_error}") return "I encountered an error while searching my knowledge base. Please try again." - + # Use the Agno agent to generate a response response = await asyncio.to_thread(self.rag_agent.run, query) - response_content = response.content if hasattr(response, 'content') else str(response) - + response_content = ( + response.content if hasattr(response, "content") else str(response) + ) + # Add a note if the response seems generic - if not any(word in response_content.lower() for word in ['document', 'pdf', 'text', 'according to', 'based on']): + if not any( + word in response_content.lower() + for word in ["document", "pdf", "text", "according to", "based on"] + ): response_content += "\n\n[Note: This response is based on my knowledge of the loaded PDF document.]" - + return response_content - + except Exception as e: logger.error(f"Error generating RAG response: {e}") import traceback + traceback.print_exc() return f"I'm sorry, I couldn't process your request. Error: {str(e)}" - + def run(self): """Start the uAgent""" logger.info("Starting RAG Chat Agent...") logger.info(f"Agent address: {self.uagent.address}") self.uagent.run() + # Create and run the integrated agent if __name__ == "__main__": try: @@ -230,4 +250,4 @@ def run(self): except KeyboardInterrupt: logger.info("Agent stopped by user") except Exception as e: - logger.error(f"Failed to start agent: {e}") \ No newline at end of file + logger.error(f"Failed to start agent: {e}") diff --git a/image-agent-payment-protocol/payment_proto.py b/image-agent-payment-protocol/payment_proto.py index fe9ec697..6b5f8569 100644 --- a/image-agent-payment-protocol/payment_proto.py +++ b/image-agent-payment-protocol/payment_proto.py @@ -8,7 +8,6 @@ RequestPayment, RejectPayment, CommitPayment, - CancelPayment, CompletePayment, payment_protocol_spec, ) @@ -63,7 +62,9 @@ async def handle_commit_payment(ctx: Context, sender: str, msg: CommitPayment): if msg.funds.payment_method == "skyfire" and msg.funds.currency == "USDC": try: - payment_verified = await verify_and_charge(msg.transaction_id, "0.001", ctx.logger) + payment_verified = await verify_and_charge( + msg.transaction_id, "0.001", ctx.logger + ) except Exception as e: ctx.logger.error(f"Skyfire verify/charge error: {e}") payment_verified = False @@ -77,7 +78,12 @@ async def handle_commit_payment(ctx: Context, sender: str, msg: CommitPayment): ctx.storage.set(f"{sender}:{session_id}:awaiting_prompt", True) ctx.storage.set(f"{sender}:{session_id}:verified_payment", True) await ctx.send(sender, CompletePayment(transaction_id=msg.transaction_id)) - await ctx.send(sender, create_text_chat("Please send your image prompt (one image will be generated).")) + await ctx.send( + sender, + create_text_chat( + "Please send your image prompt (one image will be generated)." + ), + ) else: ctx.logger.error(f"Payment verification failed from {sender}") await ctx.send(sender, RejectPayment(reason="Payment verification failed")) @@ -87,7 +93,9 @@ async def generate_image_after_payment(ctx: Context, user_address: str): from chat_proto import create_text_chat session_id = str(ctx.session) - prompt = ctx.storage.get(f"prompt:{user_address}:{session_id}") or ctx.storage.get("current_prompt") + prompt = ctx.storage.get(f"prompt:{user_address}:{session_id}") or ctx.storage.get( + "current_prompt" + ) if not prompt: ctx.logger.error("No prompt found in storage") await ctx.send(user_address, create_text_chat("Error: No prompt found")) @@ -104,6 +112,7 @@ def _sanitize_prompt(raw: str) -> str: p = p.split("")[0] # Remove any XML/HTML-like tags import re as _re + p = _re.sub(r"<[^>]+>", " ", p) # Collapse whitespace and trim p = " ".join(p.split()) @@ -124,8 +133,14 @@ def _sanitize_prompt(raw: str) -> str: async with session.get(pollinations_url) as resp: ctype = resp.headers.get("Content-Type", "") image_bytes = await resp.read() - if resp.status != 200 or not image_bytes or not ctype.startswith("image/"): - await ctx.send(user_address, create_text_chat("Image generation failed")) + if ( + resp.status != 200 + or not image_bytes + or not ctype.startswith("image/") + ): + await ctx.send( + user_address, create_text_chat("Image generation failed") + ) return mime_type: str = ctype or "image/png" @@ -136,7 +151,12 @@ def _sanitize_prompt(raw: str) -> str: storage_url = f"{base_url}/v1/storage" if not api_key: - await ctx.send(user_address, create_text_chat("Storage not configured. Please set AGENTVERSE_API_KEY to deliver the image.")) + await ctx.send( + user_address, + create_text_chat( + "Storage not configured. Please set AGENTVERSE_API_KEY to deliver the image." + ), + ) return from uagents_core.storage import ExternalStorage @@ -146,21 +166,32 @@ def _sanitize_prompt(raw: str) -> str: from uagents_core.contrib.protocols.chat import ChatMessage as AvChatMessage storage = ExternalStorage(api_token=api_key, storage_url=storage_url) - asset_id = storage.create_asset(name=str(ctx.session), content=image_bytes, mime_type=mime_type) + asset_id = storage.create_asset( + name=str(ctx.session), content=image_bytes, mime_type=mime_type + ) storage.set_permissions(asset_id=asset_id, agent_address=user_address) asset_uri = f"agent-storage://{storage.storage_url}/{asset_id}" - await ctx.send(user_address, AvChatMessage( - timestamp=datetime.now(timezone.utc), - msg_id=uuid4(), - content=[ - ResourceContent( - type="resource", - resource_id=asset_id, - resource=Resource(uri=asset_uri, metadata={"mime_type": mime_type, "role": "generated-image"}), - ) - ], - )) + await ctx.send( + user_address, + AvChatMessage( + timestamp=datetime.now(timezone.utc), + msg_id=uuid4(), + content=[ + ResourceContent( + type="resource", + resource_id=asset_id, + resource=Resource( + uri=asset_uri, + metadata={ + "mime_type": mime_type, + "role": "generated-image", + }, + ), + ) + ], + ), + ) except Exception as e: ctx.logger.error(f"Image generation error: {e}") await ctx.send(user_address, create_text_chat(f"Error generating image: {e}")) @@ -175,4 +206,3 @@ async def handle_reject_payment(ctx: Context, sender: str, msg: RejectPayment): "You rejected the payment. If you'd like to continue, reply and I'll send a new payment request." ), ) - diff --git a/web3/internet-computer/fetch/__init__.py b/web3/internet-computer/fetch/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/web3/internet-computer/fetch/agent.py b/web3/internet-computer/fetch/agent.py index 86c29125..fd6c5e20 100644 --- a/web3/internet-computer/fetch/agent.py +++ b/web3/internet-computer/fetch/agent.py @@ -1,5 +1,5 @@ import json -from datetime import datetime, timezone, timedelta +from datetime import datetime, timezone from uuid import uuid4 import aiohttp @@ -17,16 +17,13 @@ ASI1_BASE_URL = "https://api.asi1.ai/v1" ASI1_HEADERS = { "Authorization": f"Bearer {ASI1_API_KEY}", - "Content-Type": "application/json" + "Content-Type": "application/json", } CANISTER_ID = "uzt4z-lp777-77774-qaabq-cai" BASE_URL = "http://127.0.0.1:4943" -HEADERS = { - "Host": f"{CANISTER_ID}.localhost", - "Content-Type": "application/json" -} +HEADERS = {"Host": f"{CANISTER_ID}.localhost", "Content-Type": "application/json"} # Function definitions for ASI1 function calling tools = [ @@ -39,10 +36,10 @@ "type": "object", "properties": {}, "required": [], - "additionalProperties": False + "additionalProperties": False, }, - "strict": True - } + "strict": True, + }, }, { "type": "function", @@ -54,14 +51,14 @@ "properties": { "address": { "type": "string", - "description": "The Bitcoin address to check." + "description": "The Bitcoin address to check.", } }, "required": ["address"], - "additionalProperties": False + "additionalProperties": False, }, - "strict": True - } + "strict": True, + }, }, { "type": "function", @@ -73,14 +70,14 @@ "properties": { "address": { "type": "string", - "description": "The Bitcoin address to fetch UTXOs for." + "description": "The Bitcoin address to fetch UTXOs for.", } }, "required": ["address"], - "additionalProperties": False + "additionalProperties": False, }, - "strict": True - } + "strict": True, + }, }, { "type": "function", @@ -92,21 +89,22 @@ "properties": { "destinationAddress": { "type": "string", - "description": "The destination Bitcoin address." + "description": "The destination Bitcoin address.", }, "amountInSatoshi": { "type": "number", - "description": "Amount to send in satoshis." - } + "description": "Amount to send in satoshis.", + }, }, "required": ["destinationAddress", "amountInSatoshi"], - "additionalProperties": False + "additionalProperties": False, }, - "strict": True - } - } + "strict": True, + }, + }, ] + async def call_icp_endpoint(func_name: str, args: dict): if func_name == "get_current_fee_percentiles": url = f"{BASE_URL}/get-current-fee-percentiles" @@ -128,25 +126,21 @@ async def call_icp_endpoint(func_name: str, args: dict): response.raise_for_status() return await response.json() + async def process_query(query: str, ctx: Context) -> str: try: # Step 1: Initial call to ASI1 with user query and tools - initial_message = { - "role": "user", - "content": query - } + initial_message = {"role": "user", "content": query} payload = { "model": "asi1-mini", "messages": [initial_message], "tools": tools, "temperature": 0.7, - "max_tokens": 1024 + "max_tokens": 1024, } async with aiohttp.ClientSession() as session: async with session.post( - f"{ASI1_BASE_URL}/chat/completions", - headers=ASI1_HEADERS, - json=payload + f"{ASI1_BASE_URL}/chat/completions", headers=ASI1_HEADERS, json=payload ) as response: response.raise_for_status() response_json = await response.json() @@ -172,14 +166,14 @@ async def process_query(query: str, ctx: Context) -> str: except Exception as e: error_content = { "error": f"Tool execution failed: {str(e)}", - "status": "failed" + "status": "failed", } content_to_send = json.dumps(error_content) tool_result_message = { "role": "tool", "tool_call_id": tool_call_id, - "content": content_to_send + "content": content_to_send, } messages_history.append(tool_result_message) @@ -188,13 +182,13 @@ async def process_query(query: str, ctx: Context) -> str: "model": "asi1-mini", "messages": messages_history, "temperature": 0.7, - "max_tokens": 1024 + "max_tokens": 1024, } async with aiohttp.ClientSession() as session: async with session.post( f"{ASI1_BASE_URL}/chat/completions", headers=ASI1_HEADERS, - json=final_payload + json=final_payload, ) as final_response: final_response.raise_for_status() final_response_json = await final_response.json() @@ -206,19 +200,16 @@ async def process_query(query: str, ctx: Context) -> str: ctx.logger.error(f"Error processing query: {str(e)}") return f"An error occurred while processing your request: {str(e)}" -agent = Agent( - name='test-ICP-agent', - port=8001, - mailbox=True -) + +agent = Agent(name="test-ICP-agent", port=8001, mailbox=True) chat_proto = Protocol(spec=chat_protocol_spec) + @chat_proto.on_message(model=ChatMessage) async def handle_chat_message(ctx: Context, sender: str, msg: ChatMessage): try: ack = ChatAcknowledgement( - timestamp=datetime.now(timezone.utc), - acknowledged_msg_id=msg.msg_id + timestamp=datetime.now(timezone.utc), acknowledged_msg_id=msg.msg_id ) await ctx.send(sender, ack) @@ -233,7 +224,7 @@ async def handle_chat_message(ctx: Context, sender: str, msg: ChatMessage): response = ChatMessage( timestamp=datetime.now(timezone.utc), msg_id=uuid4(), - content=[TextContent(type="text", text=response_text)] + content=[TextContent(type="text", text=response_text)], ) await ctx.send(sender, response) else: @@ -243,16 +234,22 @@ async def handle_chat_message(ctx: Context, sender: str, msg: ChatMessage): error_response = ChatMessage( timestamp=datetime.now(timezone.utc), msg_id=uuid4(), - content=[TextContent(type="text", text=f"An error occurred: {str(e)}")] + content=[TextContent(type="text", text=f"An error occurred: {str(e)}")], ) await ctx.send(sender, error_response) + @chat_proto.on_message(model=ChatAcknowledgement) -async def handle_chat_acknowledgement(ctx: Context, sender: str, msg: ChatAcknowledgement): - ctx.logger.info(f"Received acknowledgement from {sender} for message {msg.acknowledged_msg_id}") +async def handle_chat_acknowledgement( + ctx: Context, sender: str, msg: ChatAcknowledgement +): + ctx.logger.info( + f"Received acknowledgement from {sender} for message {msg.acknowledged_msg_id}" + ) if msg.metadata: ctx.logger.info(f"Metadata: {msg.metadata}") + agent.include(chat_proto) if __name__ == "__main__": @@ -301,4 +298,4 @@ async def handle_chat_acknowledgement(ctx: Context, sender: str, msg: ChatAcknow Can I see a test response? Hit the dummy-test route to make sure it works. -""" \ No newline at end of file +"""