Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
"bucket_name": "your-bucket"
},
"webhook": {
"webhook_url": "your-webhook-url",
"webhook_extra_params": {}
"url": "your-webhook-url",
"extra_params": {}
},
"modifiers": {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import json

class S3Config(BaseModel):
access_key_id: str = Field(default="")
secret_access_key: str = Field(default="")
endpoint_url: str = Field(default="")
bucket_name: str = Field(default="")
access_key_id: str = Field(default=os.environ.get("S3_ACCESS_KEY_ID", ""))
secret_access_key: str = Field(default=os.environ.get("S3_SECRET_ACCESS_KEY", ""))
endpoint_url: str = Field(default=os.environ.get("S3_ENDPOINT_URL", ""))
bucket_name: str = Field(default=os.environ.get("S3_BUCKET_NAME", ""))
connect_timeout: int = Field(default=5)
connect_attempts: int = Field(default=1)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from pydantic import BaseModel, Field
from typing import Dict

class Output(BaseModel):
local_path: str = Field(default='')
url: str = Field(default='')

class Result(BaseModel):
id: str
message: str = Field(default='Request accepted')
status: str = Field(default='pending')
comfyui_response: Dict = Field(default={})
output: list = Field(default=[])
output: list[Output] = Field(default=[])
timings: Dict = Field(default={})

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import aiobotocore.session
import aiofiles
import aiofiles.os
import aiohttp
import json
from config import config
from pathlib import Path

Expand All @@ -28,10 +30,10 @@ async def work(self):

# Process the job
print(f"PostprocessWorker {self.worker_id} processing job: {request_id}")
request = await self.request_store.get(request_id)
result = await self.response_store.get(request_id)

try:
request = await self.request_store.get(request_id)
result = await self.response_store.get(request_id)

await self.move_assets(request_id, result)
await self.upload_assets(request_id, request.input.s3.get_config(), result)

Expand All @@ -43,7 +45,8 @@ async def work(self):
result.status = "failed"
result.message = f"Postprocessing failed: {e}"
await self.response_store.set(request_id, result)


await self.invoke_webhook(request_id, request, result)
await self.response_store.set(request_id, result)

# Mark the job as complete
Expand Down Expand Up @@ -105,7 +108,7 @@ async def upload_assets(self, request_id, s3_config, result):
async def upload_file_and_get_url(self, requst_id, s3_client, bucket_name, local_path):
# Get the file name from the local path
file_name = f"{requst_id}/{Path(local_path).name}"
print (f"uploading {file_name}")
print(f"uploading {file_name}")

try:
# Upload the file
Expand All @@ -121,4 +124,30 @@ async def upload_file_and_get_url(self, requst_id, s3_client, bucket_name, local
return presigned_url
except Exception as e:
print(f"Error uploading {local_path}: {e}")
return None
return None

async def invoke_webhook(self, request_id, request, result):
if not (request.input.webhook and request.input.webhook.url):
return

async with aiohttp.ClientSession() as session:
try:
print(f"Calling webhook {request.input.webhook.url} ...")
data = json.dumps({
"result": {
"images": result.output
},
"extra_params": request.input.webhook.extra_params,
"request_id": request_id,
"success": result.status == "success",
"message": result.message
}).encode('utf-8')
headers = {
"Content-Type": "application/json"
}
async with session.post(request.input.webhook.url, data=data, headers=headers) as response:
response_data = await response.text()

print(f"Executed webhook with response: {response_data}")
except Exception as e:
raise aiohttp.ClientError(f"Failed to execute webhook: {e}")