Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions docker/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import uuid
from aiohttp import web
from cellpack.autopack.DBRecipeHandler import DataDoc, DBUploader
from cellpack.autopack.interface_objects.database_ids import DATABASE_IDS
Expand All @@ -25,19 +26,19 @@ def job_exists(self, dedup_hash):
job_status, _ = db.get_doc_by_id("job_status", dedup_hash)
return job_status is not None

async def run_packing(self, job_id, recipe=None, config=None, body=None):
    """Execute a packing job and record its status in the job database.

    Marks the job RUNNING, then runs ``pack`` with either the JSON recipe
    supplied in ``body`` (preferred) or the ``recipe`` path. On any failure
    the job is marked FAILED with the exception message; the error is not
    re-raised because this runs as a fire-and-forget background task.

    :param job_id: identifier under which status updates are stored
        (a dedup hash when a body was posted, otherwise a random UUID)
    :param recipe: path to a recipe file, used only when ``body`` is falsy
    :param config: optional config path forwarded to ``pack``
    :param body: optional JSON recipe payload from the request
    """
    self.update_job_status(job_id, "RUNNING")
    try:
        # Pack JSON recipe in body if provided, otherwise use recipe path
        pack(recipe=(body if body else recipe), config_path=config, docker=True, hash=job_id)
    except Exception as e:
        # NOTE(review): success-path status is presumably written by pack()
        # itself — confirm, since no "DONE" update is issued here.
        self.update_job_status(job_id, "FAILED", error_message=str(e))

def update_job_status(self, job_id, status, result_path=None, error_message=None):
    """Persist a job-status record via the Firebase handler, if configured.

    Silently does nothing when no database handler is available — status
    tracking is best-effort and must not break packing itself.

    :param job_id: job identifier used as the status document key
    :param status: status string (e.g. "RUNNING", "FAILED")
    :param result_path: optional path to the packing result
    :param error_message: optional failure description
    """
    db = self._get_firebase_handler()
    if db:
        db_uploader = DBUploader(db)
        db_uploader.upload_job_status(job_id, status, result_path, error_message)

async def hello_world(self, request: web.Request) -> web.Response:
    """Liveness endpoint: reply with a fixed greeting string."""
    greeting = "Hello from the cellPACK server"
    return web.Response(text=greeting)
Expand All @@ -58,13 +59,18 @@ async def pack_handler(self, request: web.Request) -> web.Response:
)
config = request.rel_url.query.get("config")

dedup_hash = DataDoc.generate_hash(body)

if self.job_exists(dedup_hash):
return web.json_response({"jobId": dedup_hash})
if body:
dedup_hash = DataDoc.generate_hash(body)
if self.job_exists(dedup_hash):
return web.json_response({"jobId": dedup_hash})
job_id = dedup_hash
else:
job_id = str(uuid.uuid4())
Comment on lines +62 to +68
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only key lines


# Initiate packing task to run in background
packing_task = asyncio.create_task(self.run_packing(dedup_hash, recipe, config, body))
packing_task = asyncio.create_task(
self.run_packing(job_id, recipe, config, body)
)

# Keep track of task references to prevent them from being garbage
# collected, then discard after task completion
Expand All @@ -73,7 +79,7 @@ async def pack_handler(self, request: web.Request) -> web.Response:

# return job id immediately, rather than wait for task to complete,
# to avoid timeout issues with API gateway
return web.json_response({"jobId": dedup_hash})
return web.json_response({"jobId": job_id})


async def init_app() -> web.Application:
Expand Down