From 603ba57052a5e5ea0058e1f11cd692ecf370616e Mon Sep 17 00:00:00 2001 From: namratam Date: Sun, 20 Aug 2023 11:19:17 -0700 Subject: [PATCH 1/4] #155: Added jobs db update for job exec endpoint --- flask_ades_wpst/ades_base.py | 44 +++++++++++++++++++++++++++++++++++ flask_ades_wpst/ades_hysds.py | 30 +----------------------- requirements.txt | 1 + 3 files changed, 46 insertions(+), 29 deletions(-) diff --git a/flask_ades_wpst/ades_base.py b/flask_ades_wpst/ades_base.py index bfc3c34..75d01bf 100644 --- a/flask_ades_wpst/ades_base.py +++ b/flask_ades_wpst/ades_base.py @@ -4,6 +4,7 @@ from jinja2 import Template import logging import json +import boto3 import hashlib from flask_ades_wpst.sqlite_connector import sqlite_get_procs, sqlite_get_proc, sqlite_deploy_proc, \ sqlite_undeploy_proc, sqlite_get_jobs, sqlite_get_job, sqlite_exec_job, sqlite_dismiss_job, \ @@ -34,6 +35,47 @@ def __init__(self, app_config): raise ValueError("Platform {} not implemented.".\ format(self._platform)) self._ades = ADES_Platform() + + def get_sts_and_sns_clients(aws_auth_method): + if aws_auth_method == "keys": + sts_client = boto3.client( + "sts", + region_name="us-west-2", + aws_access_key_id=os.getenv("ACCESS_KEY"), + aws_secret_access_key=os.getenv("SECRET_KEY"), + aws_session_token=os.getenv("SESSION_TOKEN"), + ) + print(sts_client.get_caller_identity()) + client = boto3.client( + "sns", + region_name="us-west-2", + aws_access_key_id=os.getenv("ACCESS_KEY"), + aws_secret_access_key=os.getenv("SECRET_KEY"), + aws_session_token=os.getenv("SESSION_TOKEN"), + ) + + elif aws_auth_method == "iam": + sts_client = boto3.client("sts", region_name="us-west-2") + print(sts_client.get_caller_identity()) + client = boto3.client("sns", region_name="us-west-2") + + else: + print(f"Invalid aws_auth_method: {aws_auth_method}") + print(f"Supported methods: iam, keys") + exit() + + return sts_client, client + + def _update_jobs_database(self, job_id, proc_id, job_inputs={}, job_tags=[]): + sts_client, sns_client = self.get_sts_and_sns_clients(aws_auth_method) + job_data = {"id": job_id, "process": proc_id, "status": "Accepted", "inputs": job_inputs, "tags": job_tags} + + print( + sns_client.publish( + TopicArn=topic_arn, Message=json.dumps(job_data), MessageGroupId=job_id + ) + ) + def proc_dict(self, proc): return {"id": proc[0], @@ -152,6 +194,8 @@ def exec_job(self, proc_id, job_inputs): } ades_resp = self._ades.exec_job(job_spec) job_id = ades_resp.get("job_id") + # Update jobs database + self._update_jobs_database(job_id, proc_id, job_inputs) # ades_resp will return platform specific information that should be # kept in the database with the job ID record sqlite_exec_job(proc_id, job_id, job_inputs, ades_resp) diff --git a/flask_ades_wpst/ades_hysds.py b/flask_ades_wpst/ades_hysds.py index ffaeba2..9e3f855 100644 --- a/flask_ades_wpst/ades_hysds.py +++ b/flask_ades_wpst/ades_hysds.py @@ -50,34 +50,6 @@ def __init__( def _generate_job_id_stub(self, qsub_stdout): return ".".join(qsub_stdout.strip().split(".")[:2]) - def _pbs_job_state_to_status_str(self, work_dir, job_state): - pbs_job_state_to_status = { - "Q": "accepted", - "R": "running", - "E": "running", - } - if job_state in pbs_job_state_to_status: - status = pbs_job_state_to_status[job_state] - elif job_state == "F": - # Job finished; need to check cwl-runner exit-code to determine - # if the job succeeded or failed. In the auto-generated, PBS job - # submission script, the exit code is saved to a file. - exit_code_fname = os.path.join(work_dir, self._exit_code_fname) - try: - with open(exit_code_fname, "r") as f: - d = json.loads(f.read()) - exit_code = d["exit_code"] - if exit_code == 0: - status = "successful" - else: - status = "failed" - except: - status = "unknown-not-qref" - else: - # Encountered a PBS job state that is not supported. - status = "unknown-no-exit-code" - return status - def _construct_job_spec(self, cwl_wfl, wfl_inputs): """ create the job spec for a process to deploy @@ -359,7 +331,7 @@ def exec_job(self, job_spec): "error": None, } except Exception as ex: - error = ex + error = str(ex) return {"job_id": hysds_job.job_id, "error": error} def dismiss_job(self, proc_id, job_id): diff --git a/requirements.txt b/requirements.txt index 9714e8c..373eb63 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,4 @@ cwl-runner==1.0 docker==6.0.0 jsonschema==4.5.1 GitPython==3.1.29 +boto3==1.26.97 From 4ceb30be286106fbdc3593b990618f9dff413eac Mon Sep 17 00:00:00 2001 From: namratam Date: Sun, 20 Aug 2023 14:29:08 -0700 Subject: [PATCH 2/4] #155: Updates to publish job info to jobs db --- flask_ades_wpst/ades_base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flask_ades_wpst/ades_base.py b/flask_ades_wpst/ades_base.py index 75d01bf..3de9f4c 100644 --- a/flask_ades_wpst/ades_base.py +++ b/flask_ades_wpst/ades_base.py @@ -67,9 +67,9 @@ def get_sts_and_sns_clients(aws_auth_method): return sts_client, client def _update_jobs_database(self, job_id, proc_id, job_inputs={}, job_tags=[]): - sts_client, sns_client = self.get_sts_and_sns_clients(aws_auth_method) + sts_client, sns_client = self.get_sts_and_sns_clients(aws_auth_method="iam") job_data = {"id": job_id, "process": proc_id, "status": "Accepted", "inputs": job_inputs, "tags": job_tags} - + # TOOD: Figure out how to get topic_arn print( sns_client.publish( TopicArn=topic_arn, Message=json.dumps(job_data), MessageGroupId=job_id From f6d8fd2576d6842b033a36fb235ba332901d2497 Mon Sep 17 00:00:00 2001 From: namratam Date: Sun, 20 Aug 2023 16:54:10 -0700 Subject: [PATCH 3/4] #155: Updated jobs result endpoint --- flask_ades_wpst/ades_base.py | 57 ++++++++++++++++++++++++++---------- 1 file changed, 41 insertions(+), 16 deletions(-) diff --git a/flask_ades_wpst/ades_base.py b/flask_ades_wpst/ades_base.py index 3de9f4c..c235dd9 100644 --- a/flask_ades_wpst/ades_base.py +++ b/flask_ades_wpst/ades_base.py @@ -76,6 +76,30 @@ def _update_jobs_database(self, job_id, proc_id, job_inputs={}, job_tags=[]): ) ) + def _get_jobs_doc(self, job_id): + """ + This function retrieves the ES document for a given job ID from the Jobs DB + :param job_id: + :return: + """ + # Create an Elasticsearch client + es_client = boto3.client('es') + # TODO: Get Elasticsearch domain name and index name + domain_name = '' + index_name = "" + document_id = job_id + + # Query the document + try: + response = es_client.get_source(index=index_name, id=document_id, DomainName=domain_name) + document = json.loads(response.get('Body').read().decode('utf-8')) + print(f"Retrieved Document: {json.dumps(document, indent=4)}") + return document + except es_client.exceptions.ResourceNotFoundException as e: + raise RuntimeError("Document not found:", e) + except Exception as e: + raise RuntimeError("An error occurred:", e) + def proc_dict(self, proc): return {"id": proc[0], @@ -216,23 +240,24 @@ def dismiss_job(self, proc_id, job_id): return job_spec def get_job_results(self, proc_id, job_id): - # job_spec = self.get_job(proc_id, job_id) - products = self._ades.get_job_results(job_id=job_id) + job_doc = self._get_jobs_doc(job_id=job_id) job_result = dict() outputs = list() - for product in products: - id = product.get("id") - location = None - locations = product.get("browse_urls") - for loc in locations: - if loc.startswith("s3://"): - location = loc - # create output blocks and append - output = { - "mimeType": "tbd", - "href": location, - "id": id - } - outputs.append(output) + #TODO: Add verification to check if job_id corresponds to a job of process type - proc_id + if "outputs" in job_doc: + job_outputs = job_doc.get["outputs"] + print(f"Retrieved Output Field: {json.dumps(outputs)}") + for product in job_outputs: + prod_id = product + prod_location = job_outputs.get(product).get("location") + file_type = job_outputs.get(product).get("class") + output = { + "mimeType": file_type, + "href": prod_location, + "id": prod_id + } + outputs.append(output) + else: + print("Output field not found in the document.") job_result["outputs"] = outputs return job_result From 3075d492c9bbf6bdbe0e8f4fe51cfa2fce84e540 Mon Sep 17 00:00:00 2001 From: namratam Date: Mon, 21 Aug 2023 12:51:56 -0700 Subject: [PATCH 4/4] #155: Updated to use elasticsearchpy --- flask_ades_wpst/ades_base.py | 25 ++++++++++--------------- requirements.txt | 1 + 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/flask_ades_wpst/ades_base.py b/flask_ades_wpst/ades_base.py index c235dd9..0524ce8 100644 --- a/flask_ades_wpst/ades_base.py +++ b/flask_ades_wpst/ades_base.py @@ -1,11 +1,8 @@ -import sys -import requests -from flask import Response -from jinja2 import Template +import os +from elasticsearch import Elasticsearch import logging import json import boto3 -import hashlib from flask_ades_wpst.sqlite_connector import sqlite_get_procs, sqlite_get_proc, sqlite_deploy_proc, \ sqlite_undeploy_proc, sqlite_get_jobs, sqlite_get_job, sqlite_exec_job, sqlite_dismiss_job, \ sqlite_update_job_status @@ -69,7 +66,7 @@ def get_sts_and_sns_clients(aws_auth_method): def _update_jobs_database(self, job_id, proc_id, job_inputs={}, job_tags=[]): sts_client, sns_client = self.get_sts_and_sns_clients(aws_auth_method="iam") job_data = {"id": job_id, "process": proc_id, "status": "Accepted", "inputs": job_inputs, "tags": job_tags} - # TOOD: Figure out how to get topic_arn + topic_arn = os.environ["JOBS_DATA_SNS_TOPIC_ARN"] print( sns_client.publish( TopicArn=topic_arn, Message=json.dumps(job_data), MessageGroupId=job_id @@ -83,22 +80,20 @@ def _get_jobs_doc(self, job_id): :return: """ # Create an Elasticsearch client - es_client = boto3.client('es') - # TODO: Get Elasticsearch domain name and index name - domain_name = '' + # Initialize the Elasticsearch client + # TODO: Change to use environment variables + es = Elasticsearch([{'host': os.environ["ES_URL"], 'port': 9200}]) index_name = "" document_id = job_id # Query the document try: - response = es_client.get_source(index=index_name, id=document_id, DomainName=domain_name) - document = json.loads(response.get('Body').read().decode('utf-8')) - print(f"Retrieved Document: {json.dumps(document, indent=4)}") + result = es.get(index=index_name, id=document_id) + document = result.get('_source', {}) + print(f"Retrieved Document:\n {document}") return document - except es_client.exceptions.ResourceNotFoundException as e: - raise RuntimeError("Document not found:", e) except Exception as e: - raise RuntimeError("An error occurred:", e) + print("An error occurred:", e) def proc_dict(self, proc): diff --git a/requirements.txt b/requirements.txt index 373eb63..c047254 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ docker==6.0.0 jsonschema==4.5.1 GitPython==3.1.29 boto3==1.26.97 +elasticsearch