diff --git a/build/COPY_ROOT_1/opt/ai-dock/api-wrapper/payloads/imgsave.json b/build/COPY_ROOT_1/opt/ai-dock/api-wrapper/payloads/imgsave.json index 8135e181..73f103a5 100644 --- a/build/COPY_ROOT_1/opt/ai-dock/api-wrapper/payloads/imgsave.json +++ b/build/COPY_ROOT_1/opt/ai-dock/api-wrapper/payloads/imgsave.json @@ -8,8 +8,8 @@ "bucket_name": "your-bucket" }, "webhook": { - "webhook_url": "your-webhook-url", - "webhook_extra_params": {} + "url": "your-webhook-url", + "extra_params": {} }, "modifiers": { diff --git a/build/COPY_ROOT_1/opt/ai-dock/api-wrapper/requestmodels/models.py b/build/COPY_ROOT_1/opt/ai-dock/api-wrapper/requestmodels/models.py index bd95a7a8..38519fb6 100644 --- a/build/COPY_ROOT_1/opt/ai-dock/api-wrapper/requestmodels/models.py +++ b/build/COPY_ROOT_1/opt/ai-dock/api-wrapper/requestmodels/models.py @@ -4,10 +4,10 @@ import json class S3Config(BaseModel): - access_key_id: str = Field(default="") - secret_access_key: str = Field(default="") - endpoint_url: str = Field(default="") - bucket_name: str = Field(default="") + access_key_id: str = Field(default=os.environ.get("S3_ACCESS_KEY_ID", "")) + secret_access_key: str = Field(default=os.environ.get("S3_SECRET_ACCESS_KEY", "")) + endpoint_url: str = Field(default=os.environ.get("S3_ENDPOINT_URL", "")) + bucket_name: str = Field(default=os.environ.get("S3_BUCKET_NAME", "")) connect_timeout: int = Field(default=5) connect_attempts: int = Field(default=1) diff --git a/build/COPY_ROOT_1/opt/ai-dock/api-wrapper/responses/result.py b/build/COPY_ROOT_1/opt/ai-dock/api-wrapper/responses/result.py index 3a752ac4..eb69227b 100644 --- a/build/COPY_ROOT_1/opt/ai-dock/api-wrapper/responses/result.py +++ b/build/COPY_ROOT_1/opt/ai-dock/api-wrapper/responses/result.py @@ -1,11 +1,15 @@ from pydantic import BaseModel, Field from typing import Dict +class Output(BaseModel): + local_path: str = Field(default='') + url: str = Field(default='') + class Result(BaseModel): id: str message: str = Field(default='Request accepted') status: str = Field(default='pending') comfyui_response: Dict = Field(default={}) - output: list = Field(default=[]) + output: list[Output] = Field(default=[]) timings: Dict = Field(default={}) diff --git a/build/COPY_ROOT_1/opt/ai-dock/api-wrapper/workers/postprocess_worker.py b/build/COPY_ROOT_1/opt/ai-dock/api-wrapper/workers/postprocess_worker.py index a2e38b9c..6384ab7c 100644 --- a/build/COPY_ROOT_1/opt/ai-dock/api-wrapper/workers/postprocess_worker.py +++ b/build/COPY_ROOT_1/opt/ai-dock/api-wrapper/workers/postprocess_worker.py @@ -2,6 +2,8 @@ import aiobotocore.session import aiofiles import aiofiles.os +import aiohttp +import json from config import config from pathlib import Path @@ -28,10 +30,10 @@ async def work(self): # Process the job print(f"PostprocessWorker {self.worker_id} processing job: {request_id}") + request = await self.request_store.get(request_id) + result = await self.response_store.get(request_id) + try: - request = await self.request_store.get(request_id) - result = await self.response_store.get(request_id) - await self.move_assets(request_id, result) await self.upload_assets(request_id, request.input.s3.get_config(), result) @@ -43,7 +45,8 @@ async def work(self): result.status = "failed" result.message = f"Postprocessing failed: {e}" await self.response_store.set(request_id, result) - + + await self.invoke_webhook(request_id, request, result) await self.response_store.set(request_id, result) # Mark the job as complete @@ -105,7 +108,7 @@ async def upload_assets(self, request_id, s3_config, result): async def upload_file_and_get_url(self, requst_id, s3_client, bucket_name, local_path): # Get the file name from the local path file_name = f"{requst_id}/{Path(local_path).name}" - print (f"uploading {file_name}") + print(f"uploading {file_name}") try: # Upload the file @@ -121,4 +124,30 @@ async def upload_file_and_get_url(self, requst_id, s3_client, bucket_name, local return presigned_url except Exception as e: print(f"Error uploading {local_path}: {e}") - return None \ No newline at end of file + return None + + async def invoke_webhook(self, request_id, request, result): + if not (request.input.webhook and request.input.webhook.url): + return + + async with aiohttp.ClientSession() as session: + try: + print(f"Calling webhook {request.input.webhook.url} ...") + data = json.dumps({ + "result": { + "images": result.output + }, + "extra_params": request.input.webhook.extra_params, + "request_id": request_id, + "success": result.status == "success", + "message": result.message + }).encode('utf-8') + headers = { + "Content-Type": "application/json" + } + async with session.post(request.input.webhook.url, data=data, headers=headers) as response: + response_data = await response.text() + + print(f"Executed webhook with response: {response_data}") + except Exception as e: + raise aiohttp.ClientError(f"Failed to execute webhook: {e}")