diff --git a/.gitignore b/.gitignore index 799e87c..96082d1 100644 --- a/.gitignore +++ b/.gitignore @@ -89,7 +89,7 @@ cover/ # Jupyter Notebook .ipynb_checkpoints -*.ipynb +# *.ipynb # IPython profile_default/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 498d1a1..029ff40 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,6 +2,10 @@ default_language_version: python: python3 repos: + - repo: https://github.com/psf/black-pre-commit-mirror + rev: 25.1.0 + hooks: + - id: black-jupyter - repo: https://github.com/pre-commit/pre-commit-hooks rev: v5.0.0 hooks: @@ -92,3 +96,7 @@ repos: - types-tqdm - typing-extensions - wandb + - repo: https://github.com/Yelp/detect-secrets + rev: v1.5.0 # Use the latest version + hooks: + - id: detect-secrets diff --git a/pyproject.toml b/pyproject.toml index a7e0d83..99c5501 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ dependencies = [ "google-auth==2.38.0", "google-cloud-storage==3.0.0", "google-cloud-secret-manager==2.23.0", - "futurehouse-client==0.3.18.dev25", + "futurehouse-client==0.3.18.dev80", "jupyter==1.1.1", "nbconvert==7.16.6", "notebook==7.3.2", diff --git a/src/fhda/tortoise.py b/src/fhda/tortoise.py new file mode 100644 index 0000000..0897073 --- /dev/null +++ b/src/fhda/tortoise.py @@ -0,0 +1,260 @@ +import os +import uuid +import asyncio +import copy +from typing import Any, Callable, Optional +from os import PathLike +import time +import json +from pydantic import BaseModel, Field +from . import prompts + +from futurehouse_client import FutureHouseClient +from futurehouse_client.models import TaskRequest, RuntimeConfig +from futurehouse_client.models.app import AuthType + + +class StepConfig(BaseModel): + """Configuration for a step in the pipeline.""" + + language: str = Field( + default="PYTHON", description="Language for execution environment" + ) + max_steps: int = Field( + default=30, description="Maximum number of steps for the agent" + ) + timeout: int = Field(default=15 * 60, description="Timeout for the step in seconds") + eval: bool = Field(default=True, description="Whether to use eval mode") + + +class Step(BaseModel): + """A step in the agent execution pipeline.""" + + name: str = Field( + description="Name of the job to run (e.g. 'job-futurehouse-data-analysis-crow-high')" + ) + prompt_template: str = Field(description="Prompt template to use for the step") + cot_prompt: bool = Field( + default=False, description="Whether to augment the query with COT prompting" + ) + prompt_args: dict[str, Any] = Field( + default_factory=dict, + description="Keyword arguments to format the prompt template.", + ) + input_files: dict[str, str] = Field( + default_factory=dict, description="Files to upload {'source_path': 'dest_name'}" + ) + output_files: dict[str, str] = Field( + default_factory=dict, + description="Files to download {'source_name': 'dest_path'}", + ) + step_id: str = Field( + default_factory=lambda: str(uuid.uuid4())[:8], + description="Small UID for the step", + ) + upload_id: Optional[str] = Field(default=None, description="Upload ID for GCS") + parallel: int = Field(default=1, description="Number of parallel tasks to run") + config: StepConfig = Field( + default_factory=StepConfig, description="Configuration for the step" + ) + post_process: Optional[Callable[[dict[str, Any], str], None]] = Field( + default=None, description="Function to run after step completion" + ) + prompt_generator: Optional[Callable[[], list[tuple[str, dict[str, Any]]]]] = Field( + default=None, + description="Function to generate prompts and args for parallel tasks based on previous results", + ) + + def cot_prompting(self, query: str, language: str) -> str: + """Apply chain-of-thought prompting to the query.""" + guidelines = prompts.GENERAL_NOTEBOOK_GUIDELINES.format(language=language) + if language == "R": + guidelines = prompts.R_SPECIFIC_GUIDELINES.format(language=language) + return ( + f"{prompts.CHAIN_OF_THOUGHT_AGNOSTIC.format(language=language)}\n" + f"{guidelines}" + f"Here is the research question to address:\n" + f"\n" + f"{query}\n" + f"\n" + ) + + def format_prompt(self) -> str: + """Format the prompt template with the provided arguments.""" + final_prompt = self.prompt_template.format(**self.prompt_args) + if self.cot_prompt: + final_prompt = self.cot_prompting(final_prompt, self.config.language) + return final_prompt + + +class Tortoise: + """Runner for multi-step agent pipelines.""" + + def __init__(self, api_key: str): + """Initialize the tortoise framework with FutureHouse API key.""" + self.client = FutureHouseClient(auth_type=AuthType.API_KEY, api_key=api_key) + self.steps: list[Step] = [] + self.results: dict[str, Any] = {} + + def add_step(self, step: Step) -> None: + """Add a step to the pipeline.""" + self.steps.append(step) + + def save_results(self, output_dir: str | PathLike = "output") -> None: + """Save the results to a JSON file.""" + results_path = f"{output_dir}/results_{time.strftime('%Y%m%d_%H%M%S')}.json" + print(f"Saving all results to {results_path}") + try: + os.makedirs(output_dir, exist_ok=True) + serializable_results = {} + for step_id, step_result in self.results.items(): + serializable_results[step_id] = dict(step_result) + + with open(results_path, "w") as f: + json.dump(serializable_results, f, indent=2) + print(f"Results successfully saved to {results_path}") + except Exception as e: + print(f"Error saving results to {results_path}: {e}") + + def _create_task_requests( + self, step: Step, runtime_config: RuntimeConfig + ) -> list[TaskRequest]: + """Create task requests with either identical or dynamic prompts. + + Args: + step: The step configuration + runtime_config: The runtime configuration for the task + + Returns: + List of task requests to be executed + """ + task_requests = [] + task_count = max(step.parallel, 1) + + if step.prompt_generator and task_count > 1: + # Generate dynamic prompts based on previous results + prompt_pairs = step.prompt_generator() + # Create a task request for each generated prompt + for prompt_text, prompt_args in prompt_pairs[ + :task_count + ]: # Limit to requested parallel count + step_copy = copy.deepcopy(step) + step_copy.prompt_template = prompt_text + step_copy.prompt_args = prompt_args + query = step_copy.format_prompt() + task_requests.append( + TaskRequest( + name=step.name, + query=query, + runtime_config=runtime_config, + ) + ) + else: + # Default behavior: use the same prompt for all tasks + query = step.format_prompt() + task_requests = [ + TaskRequest( + name=step.name, + query=query, + runtime_config=runtime_config, + ) + ] * task_count + + return task_requests + + async def run_pipeline( + self, output_dir: str | PathLike = "output" + ) -> dict[str, Any]: + """Run the entire pipeline of steps.""" + os.makedirs(output_dir, exist_ok=True) + + for i, step in enumerate(self.steps): + print(f"Running step {i + 1}/{len(self.steps)}: {step.name}") + if not step.upload_id: + step.upload_id = f"{step.name}_{step.step_id}" + + for source_path, dest_name in step.input_files.items(): + print(f"Uploading file {source_path} as {dest_name}") + self.client.upload_file( + step.name, file_path=source_path, upload_id=step.upload_id + ) + + if step.config: + runtime_config = RuntimeConfig( + max_steps=step.config.max_steps, + upload_id=step.upload_id, + environment_config={ + "eval": step.config.eval, + "language": step.config.language, + }, + ) + else: + runtime_config = None + + task_requests = self._create_task_requests(step, runtime_config) + + print( + f"Running {len(task_requests)} task{'s' if len(task_requests) > 1 else ''}" + ) + task_responses = await self.client.arun_tasks_until_done( + task_requests, + progress_bar=True, + verbose=True, + timeout=step.config.timeout, + ) + + task_ids = [str(task.task_id) for task in task_responses] + success_rate = sum( + [task.status == "success" for task in task_responses] + ) / len(task_responses) + print(f"Task success rate: {success_rate * 100}%") + + self.results[step.step_id] = { + "task_ids": task_ids, + "task_responses": task_responses, + "success_rate": success_rate, + } + + os.makedirs(f"{output_dir}/{step.step_id}", exist_ok=True) + + for idx, task_id in enumerate(task_ids): + for source_name, dest_path in step.output_files.items(): + try: + # Add index suffix only when there are multiple tasks + path_suffix = f"_{idx}" if len(task_ids) > 1 else "" + if "." in dest_path: + base, ext = os.path.splitext(dest_path) + dest_path_with_idx = f"{base}{path_suffix}{ext}" + else: + dest_path_with_idx = f"{dest_path}{path_suffix}" + + path = f"{output_dir}/{step.step_id}/{dest_path_with_idx}" + os.makedirs( + os.path.dirname(os.path.abspath(path)), exist_ok=True + ) + print(f"Downloading file {source_name} to {path}") + self.client.download_file( + step.name, + trajectory_id=task_id, + file_path=source_name, + destination_path=path, + ) + except Exception as e: + print( + f"Error downloading {source_name} from task {task_id}: {e}" + ) + + if step.post_process: + print(f"Running post-processing for step {step.step_id}") + step.post_process( + self.results[step.step_id], f"{output_dir}/{step.step_id}" + ) + + print(f"Completed step {i + 1}/{len(self.steps)}") + + self.save_results(output_dir) + return self.results + + def run(self, output_dir: str | PathLike = "output") -> dict[str, Any]: + """Synchronous version of run_pipeline.""" + return asyncio.run(self.run_pipeline(output_dir)) diff --git a/src/scripts/deploy.py b/src/scripts/deploy.py index bad9496..8b8a848 100644 --- a/src/scripts/deploy.py +++ b/src/scripts/deploy.py @@ -12,7 +12,7 @@ ) from futurehouse_client.models.app import TaskQueuesConfig -HIGH = False +HIGH = True ENVIRONMENT = "DEV" ENV_VARS = { @@ -32,9 +32,9 @@ FramePath(path="state.nb_state_html", type="notebook"), ] -MODEL = "claude-3-7-sonnet-latest" -TEMPERATURE = 1 -NUM_RETRIES = 3 +# MODEL = "claude-3-7-sonnet-latest" +# TEMPERATURE = 1 +# NUM_RETRIES = 3 # agent = AgentConfig( # agent_type="ReActAgent", diff --git a/tutorial/consensus.ipynb b/tutorial/consensus.ipynb new file mode 100644 index 0000000..f562ba9 --- /dev/null +++ b/tutorial/consensus.ipynb @@ -0,0 +1,353 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Multi-agent consensus tutorial\n", + "\n", + "In this tutorial, we will be using two different agents, Finch and Crow to do differential expression analysis on some RNASeq data from [here](https://www.ncbi.nlm.nih.gov/geo/query/acc.cgi?acc=GSE52778). Additionally, we do consensus sampling with Finch to improve reliability of the results.\n", + "\n", + "The process follows four steps:\n", + "1. Differential expression analysis: run 10 DEAs in parallel with Finch\n", + "2. Consensus sampling: Aggregate the results of the DEAs with Finch\n", + "3. Literature search: Use Crow to search the literature for the top differentially expressed genes\n", + "4. Visualization: Use Finch to create a final interactive volcano plot containing all differentially expressed genes, their evidence and the evidence score.\n", + "\n", + "Let's get started!" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "import pandas as pd\n", + "import uuid\n", + "\n", + "from futurehouse_client import FutureHouseClient\n", + "from futurehouse_client.models import TaskRequest, RuntimeConfig\n", + "from futurehouse_client.models.app import AuthType\n", + "import fhda.prompts as prompts" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Here are the prompts we'll be using\n", + "TREATMENT = \"dexamethasone\"\n", + "MECHANISM = \"airway smooth muscle cells\"\n", + "CONTEXT = \"asthma\"\n", + "N_TOP_GENES = 10\n", + "DEA_PROMPT = \"\"\"\n", + "Determine the effect of {treatment} on {mechanism} in {context}. \n", + "\n", + "Perform differential expression analysis and pathway analysis on relevant comparison groups. Map all gene IDs to gene symbols using annotation package such as ‘org.Hs.eg.db’.\n", + "\n", + "Generate volcano plots and heatmap of differentially expressed genes, and dot plots for enriched pathways, use gene symbols for labels where relevant.\n", + "\n", + "Output a single csv file named \"dea_results.csv\" with the results for all tested genes of the most relevant contrast, report both gene ID and gene symbol.\n", + "\n", + "If there is an error, keep trying, do not give up until you reach the end of the analysis. When mapping gene ID to gene symbol, consider all possible forms of gene IDs, keep trying until the gene symbols are obtained.\n", + "\"\"\"\n", + "\n", + "CONSENSUS_PROMPT = f\"\"\"\n", + "Combine these differential expression analysis results by calculating the mode of log2FC and adjusted p values. Output the results in a file named ‘consensus_results.csv’, include the columns gene_symbol, log2FC and adjusted P values. In a separate file named ‘top_genes.csv’, output the top {N_TOP_GENES} gene symbols of the consensus most significant genes with the column name “gene_symbol”. \n", + "\n", + "Create a stacked bar plot showing gene regulation consistency across all analyses. Plot regulation direction (up vs down) on x-axis and percentage of genes in each category on y-axis. Color-code by significance category: all analyses, >50% of analyses and <50% of analyses. Include percentages within each segment and a clear legend. Exclude genes that are non-significant across all analyses.\n", + "\"\"\"\n", + "\n", + "PQA_PROMPT = \"\"\"\n", + " What are the possible mechanisms for {gene} in the effect of {treatment} on {mechanism} in {context}?\n", + " From 1 to 5, with 1 being no evidence of association at all and 5 being strong association with supporting evidence, how strong is the evidence supporting this mechanism?\n", + " Give a concise summary for the evidence in up to 10 words, and a short summary of mechanisms in up to 20 words. Do not include references or links.\n", + " Please share this information in json format in the form of: `\"gene_symbol\": , \"association_evidence_score\":[1...5], \"evidence_summary\": , \"mechanism_summary\": `.\n", + " Share nothing else but the JSON output.\n", + " \"\"\"\n", + "\n", + "VOLCANO_PROMPT = \"\"\"\n", + "Make an interactive volcano plot. Colour-code by significance categories: top up-regulated genes, up-regulated genes, top down-regulated genes, down-regulated genes, and non-significant genes. Genes considered as top differentially expressed genes have extra annotation available in 'pqa_results.csv’.\n", + "\n", + "Include hover information according to the categories, for the top genes, on hover, show gene symbol, log2FC, adjusted p value, mechanism, evidence and evidence score. For up and down regulated genes that are not in top differentially expressed genes, show gene symbol, log2FC and adjusted p value. For non-significant genes, do not include hover information.\n", + "\n", + "For the annotations, remove all text in the brackets in the summary columns, and remove the fullstop at the end. For annotations with 6 words or more in a line, use text-wrap. Don’t include text on the plot itself. Include a legend explaining the color-codes.\n", + "\n", + "PLEASE USE TEXT WRAP FOR THE HOVER INFORMATION!\n", + "\"\"\"\n", + "\n", + "\n", + "def augment_query(query, language):\n", + " guidelines = prompts.GENERAL_NOTEBOOK_GUIDELINES.format(language=language)\n", + " if language == \"R\":\n", + " guidelines = prompts.R_SPECIFIC_GUIDELINES.format(language=language)\n", + " return (\n", + " f\"{prompts.CHAIN_OF_THOUGHT_AGNOSTIC.format(language=language)}\\n\"\n", + " f\"{guidelines}\"\n", + " f\"Here is the research question to address:\\n\"\n", + " f\"\\n\"\n", + " f\"{query}\\n\"\n", + " f\"\\n\"\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Here we instantiate the FutureHouse client and define the job names\n", + "FH_API_KEY = \"\" # Add your API key here\n", + "FINCH_JOB_NAME = \"job-futurehouse-data-analysis-crow-high\" # Don't change this\n", + "PQA_JOB_NAME = \"job-futurehouse-paperqa2\" # Don't change this\n", + "# We will be creating three folders in GCS to store the results of the three steps\n", + "DEA_UPLOAD_ID = f\"consensus_tutorial_dea_{str(uuid.uuid4())[:8]}\"\n", + "CONSENSUS_UPLOAD_ID = f\"consensus_tutorial_consensus_{str(uuid.uuid4())[:8]}\"\n", + "PQA_UPLOAD_ID = f\"consensus_tutorial_pqa_{str(uuid.uuid4())[:8]}\"\n", + "INITIAL_RNASEQ_FILE = \"datasets/GSE52778_All_Sample_FPKM_Matrix.txt.gz\"\n", + "client = FutureHouseClient(\n", + " auth_type=AuthType.API_KEY,\n", + " api_key=FH_API_KEY,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# First let's upload the dataset to GCS and check the files were uploaded correctly\n", + "client.upload_file(\n", + " FINCH_JOB_NAME, file_path=INITIAL_RNASEQ_FILE, upload_id=DEA_UPLOAD_ID\n", + ")\n", + "# Check what files were uploaded to your gcs folder\n", + "client.list_files(FINCH_JOB_NAME, upload_id=DEA_UPLOAD_ID)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Now let's run 5 Finch DEA tasks in parallel\n", + "NUM_DEA_TASKS = 5\n", + "TIMEOUT = 15 * 60\n", + "runtime_config = RuntimeConfig(\n", + " max_steps=30,\n", + " upload_id=DEA_UPLOAD_ID,\n", + " environment_config={\n", + " \"eval\": True, # DO NOT CHANGE THIS\n", + " \"language\": \"R\",\n", + " },\n", + ")\n", + "task_request = TaskRequest(\n", + " name=FINCH_JOB_NAME,\n", + " query=augment_query(\n", + " DEA_PROMPT.format(treatment=TREATMENT, mechanism=MECHANISM, context=CONTEXT),\n", + " \"R\",\n", + " ),\n", + " runtime_config=runtime_config,\n", + ")\n", + "dea_completed_tasks = await client.arun_tasks_until_done(\n", + " [task_request for i in range(NUM_DEA_TASKS)], progress_bar=True, timeout=TIMEOUT\n", + ")\n", + "dea_task_ids = [str(task.task_id) for task in dea_completed_tasks]\n", + "success = sum([task.status == \"success\" for task in dea_completed_tasks])\n", + "print(f\"Task success rate: {success / NUM_DEA_TASKS * 100}%\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# The Finch runs should take anywhere between 3-10 minutes to complete.\n", + "# Once the runs have completed, lets's download the results upload them to a new folder in GCS and run a consensus step\n", + "for c, task_id in enumerate(dea_task_ids):\n", + " try:\n", + " client.download_file(\n", + " FINCH_JOB_NAME,\n", + " trajectory_id=task_id,\n", + " file_path=\"dea_results.csv\",\n", + " destination_path=f\"output/dea_results/dea_results_{c}.csv\",\n", + " )\n", + " except Exception as e:\n", + " print(f\"Error downloading task results for task {task_id}: {e}\")\n", + "\n", + "# Now let's upload the whole directory of consensus results to GCS\n", + "client.upload_file(\n", + " FINCH_JOB_NAME, file_path=\"output/dea_results\", upload_id=CONSENSUS_UPLOAD_ID\n", + ")\n", + "\n", + "print(\"These files have been uploaded to GCS:\")\n", + "print(client.list_files(FINCH_JOB_NAME, upload_id=CONSENSUS_UPLOAD_ID))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Now lets's run a single consensus step\n", + "runtime_config = RuntimeConfig(\n", + " max_steps=30,\n", + " upload_id=CONSENSUS_UPLOAD_ID,\n", + " environment_config={\n", + " \"eval\": True, # DO NOT CHANGE THIS\n", + " \"language\": \"R\",\n", + " },\n", + ")\n", + "consensus_task_request = TaskRequest(\n", + " name=FINCH_JOB_NAME,\n", + " query=augment_query(CONSENSUS_PROMPT, \"R\"),\n", + " runtime_config=runtime_config,\n", + ")\n", + "consensus_task_response = client.run_tasks_until_done(\n", + " [consensus_task_request], progress_bar=True, timeout=TIMEOUT\n", + ")\n", + "consensus_task_id = consensus_task_response[0].task_id" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Once the consensus step is done, lets's download the results\n", + "client.download_file(\n", + " FINCH_JOB_NAME,\n", + " trajectory_id=consensus_task_id,\n", + " file_path=\"consensus_results.csv\",\n", + " destination_path=\"output/consensus_results.csv\",\n", + ")\n", + "client.download_file(\n", + " FINCH_JOB_NAME,\n", + " trajectory_id=consensus_task_id,\n", + " file_path=\"top_genes.csv\",\n", + " destination_path=\"output/top_genes.csv\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Let's use PaperQA to give us a summary of each gene\n", + "top_genes_df = pd.read_csv(\"output/top_genes.csv\")\n", + "display(top_genes_df.head())\n", + "gene_symbols = top_genes_df[\"gene_symbol\"].tolist()\n", + "pqa_tasks = [\n", + " {\n", + " \"name\": PQA_JOB_NAME,\n", + " \"query\": PQA_PROMPT.format(\n", + " gene=gene, treatment=TREATMENT, mechanism=MECHANISM, context=CONTEXT\n", + " ),\n", + " }\n", + " for gene in gene_symbols\n", + "]\n", + "pqa_task_list = await client.arun_tasks_until_done(\n", + " pqa_tasks, progress_bar=True, timeout=TIMEOUT, verbose=True\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# when PQAs are done, parse answers to csv\n", + "\n", + "answer_list = []\n", + "for task_response in pqa_task_list:\n", + " try:\n", + " answer = json.loads(\n", + " task_response.environment_frame[\"state\"][\"state\"][\"response\"][\"answer\"][\n", + " \"answer\"\n", + " ]\n", + " )\n", + " if isinstance(answer, list):\n", + " answer = answer[0]\n", + " answer_list.append(answer)\n", + " except Exception as e:\n", + " print(f\"Error parsing answer for task {task_response.task_id}: {e}\")\n", + "\n", + "pqa_df = pd.DataFrame(answer_list)\n", + "pqa_df.to_csv(\"output/pqa_results.csv\", index=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Finally let's create a beutiful interactive plotly plot that brings all the results together\n", + "# Now lets's run a single consensus step\n", + "client.upload_file(\n", + " FINCH_JOB_NAME, file_path=\"output/pqa_results.csv\", upload_id=PQA_UPLOAD_ID\n", + ")\n", + "client.upload_file(\n", + " FINCH_JOB_NAME, file_path=\"output/consensus_results.csv\", upload_id=PQA_UPLOAD_ID\n", + ")\n", + "runtime_config = RuntimeConfig(\n", + " max_steps=30,\n", + " upload_id=PQA_UPLOAD_ID,\n", + " environment_config={\n", + " \"eval\": True, # DO NOT CHANGE THIS\n", + " \"language\": \"PYTHON\",\n", + " },\n", + ")\n", + "volcano_task_request = TaskRequest(\n", + " name=FINCH_JOB_NAME,\n", + " query=augment_query(VOLCANO_PROMPT, \"PYTHON\"),\n", + " runtime_config=runtime_config,\n", + ")\n", + "volcano_task_id = client.create_task(volcano_task_request)\n", + "\n", + "print(\n", + " f\"Task running on platform, you can view progress live for our final results at:https://platform.futurehouse.org/trajectories/{volcano_task_id}\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The final trajectory will have the reliable results of our DEA analysis in an interactive volcano plotly plot containing the top differentially expressed genes, their evidence and the evidence score! All in about 20 minutes!" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/tutorial/datasets/GSE52778_All_Sample_FPKM_Matrix.txt.gz b/tutorial/datasets/GSE52778_All_Sample_FPKM_Matrix.txt.gz new file mode 100644 index 0000000..98db61d Binary files /dev/null and b/tutorial/datasets/GSE52778_All_Sample_FPKM_Matrix.txt.gz differ diff --git a/tutorial/multi_agent_orchestration.ipynb b/tutorial/multi_agent_orchestration.ipynb new file mode 100644 index 0000000..db36af9 --- /dev/null +++ b/tutorial/multi_agent_orchestration.ipynb @@ -0,0 +1,202 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from fhda.tortoise import Tortoise, Step, StepConfig\n", + "import pandas as pd\n", + "import json" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Define our parameters\n", + "TREATMENT = \"dexamethasone\"\n", + "MECHANISM = \"airway smooth muscle cells\"\n", + "CONTEXT = \"asthma\"\n", + "N_TOP_GENES = 10\n", + "PARALLEL_DEA = 5\n", + "FH_API_KEY = \"\" # Add your API key here\n", + "\n", + "# Define the prompts\n", + "DEA_PROMPT = \"\"\"\n", + "Determine the effect of {treatment} on {mechanism} in {context}. \n", + "\n", + "Perform differential expression analysis and pathway analysis on relevant comparison groups. Map all gene IDs to gene symbols using annotation package such as 'org.Hs.eg.db'.\n", + "\n", + "Generate volcano plots and heatmap of differentially expressed genes, and dot plots for enriched pathways, use gene symbols for labels where relevant.\n", + "\n", + "Output a single csv file named \"dea_results.csv\" with the results for all tested genes of the most relevant contrast, report both gene ID and gene symbol.\n", + "\n", + "If there is an error, keep trying, do not give up until you reach the end of the analysis. When mapping gene ID to gene symbol, consider all possible forms of gene IDs, keep trying until the gene symbols are obtained.\n", + "\"\"\"\n", + "\n", + "CONSENSUS_PROMPT = f\"\"\"\n", + "Combine these differential expression analysis results by calculating the mode of log2FC and adjusted p values. Output the results in a file named 'consensus_results.csv', include the columns gene_symbol, log2FC and adjusted P values. In a separate file named 'top{N_TOP_GENES}_genes.csv', output the gene symbols of the consensus most significant genes with the column name \"gene_symbol\". \n", + "\n", + "Create a stacked bar plot showing gene regulation consistency across all analyses. Plot regulation direction (up vs down) on x-axis and percentage of genes in each category on y-axis. Color-code by significance category: all analyses, >50% of analyses and <50% of analyses. Include percentages within each segment and a clear legend. Exclude genes that are non-significant across all analyses.\n", + "\"\"\"\n", + "\n", + "PQA_PROMPT = \"\"\"\n", + "What are the possible mechanisms for {gene} in the effect of {treatment} on {mechanism} in {context}?\n", + "From 1 to 5, with 1 being no evidence of association at all and 5 being strong association with supporting evidence, how strong is the evidence supporting this mechanism?\n", + "Give a concise summary for the evidence in up to 10 words, and a short summary of mechanisms in up to 20 words. Do not include references or links.\n", + "Please share this information in json format in the form of: `\"gene_symbol\": , \"association_evidence_score\":[1...5], \"evidence_summary\": , \"mechanism_summary\": `.\n", + "Share nothing else but the JSON output.\n", + "\"\"\"\n", + "\n", + "VOLCANO_PROMPT = f\"\"\"\n", + "Make an interactive volcano plot. Colour-code by significance categories: top up-regulated genes, up-regulated genes, top down-regulated genes, down-regulated genes, and non-significant genes. Genes considered as top have extra annotation available in 'pqa_results.csv'.\n", + "\n", + "Include hover information according to the categories, for the top genes, on hover, show gene symbol, log2FC, adjusted p value, mechanism, evidence and evidence score. For up and down regulated genes that are not in top {N_TOP_GENES}, show gene symbol, log2FC and adjusted p value. For non-significant genes, do not include hover information.\n", + "\n", + "For the annotations, remove all text in the brackets in the summary columns, and remove the fullstop at the end. For annotations with 6 words or more in a line, use text-wrap. Don't include text on the plot itself. Include a legend explaining the color-codes.\n", + "\n", + "PLEASE USE TEXT WRAP FOR THE HOVER INFORMATION!\n", + "\"\"\"\n", + "\n", + "# Initialize Tortoise\n", + "tortoise = Tortoise(api_key=FH_API_KEY)\n", + "\n", + "OUTPUT_DIR = \"output\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Step 1: Differential Expression Analysis (DEA)\n", + "dea_step = Step(\n", + " name=\"job-futurehouse-data-analysis-crow-high\",\n", + " prompt=DEA_PROMPT,\n", + " cot_prompt=True,\n", + " prompt_args={\"treatment\": TREATMENT, \"mechanism\": MECHANISM, \"context\": CONTEXT},\n", + " input_files={\n", + " \"datasets/GSE52778_All_Sample_FPKM_Matrix.txt.gz\": \"GSE52778_series_matrix.txt.gz\"\n", + " },\n", + " output_files={\"dea_results.csv\": \"dea_results/dea_results.csv\"},\n", + " parallel=PARALLEL_DEA,\n", + " config=StepConfig(language=\"R\", max_steps=30, timeout=15 * 60),\n", + ")\n", + "tortoise.add_step(dea_step)\n", + "\n", + "# Step 2: Consensus Analysis\n", + "consensus_step = Step(\n", + " name=\"job-futurehouse-data-analysis-crow-high\",\n", + " prompt=CONSENSUS_PROMPT,\n", + " cot_prompt=True,\n", + " input_files={f\"{OUTPUT_DIR}/{dea_step.step_id}/dea_results\": \"dea_results/\"},\n", + " output_files={\n", + " \"consensus_results.csv\": \"consensus_results.csv\",\n", + " f\"top{N_TOP_GENES}_genes.csv\": f\"top{N_TOP_GENES}_genes.csv\",\n", + " },\n", + " config=StepConfig(language=\"R\", max_steps=30, timeout=15 * 60),\n", + ")\n", + "tortoise.add_step(consensus_step)\n", + "\n", + "\n", + "# Step 3: Literature Search with PaperQA\n", + "def pqa_post_process(results, output_dir):\n", + " \"\"\"Process the results from multiple PQA tasks\"\"\"\n", + "\n", + " answer_list = []\n", + " for task_response in results.get(\"task_responses\", []):\n", + " try:\n", + " answer = json.loads(task_response.answer)\n", + " if isinstance(answer, list):\n", + " answer = answer[0]\n", + " answer_list.append(answer)\n", + " except Exception as e:\n", + " print(f\"Error parsing answer for task {task_response.task_id}: {e}\")\n", + "\n", + " # Create DataFrame and save\n", + " pqa_df = pd.DataFrame(answer_list)\n", + " pqa_df.to_csv(f\"{output_dir}/pqa_results.csv\", index=False)\n", + " return pqa_df\n", + "\n", + "\n", + "# Define a function to create multiple PQA prompts for genes\n", + "def pqa_prompt_generator():\n", + " \"\"\"Generate PQA prompts for each top gene\"\"\"\n", + " top_genes_df = pd.read_csv(\n", + " f\"{OUTPUT_DIR}/{consensus_step.step_id}/top{N_TOP_GENES}_genes.csv\"\n", + " )\n", + " gene_symbols = top_genes_df[\"gene_symbol\"].tolist()\n", + " prompt_pairs = []\n", + " for gene in gene_symbols:\n", + " prompt_pairs.append(\n", + " (\n", + " PQA_PROMPT,\n", + " {\n", + " \"gene\": gene,\n", + " \"treatment\": TREATMENT,\n", + " \"mechanism\": MECHANISM,\n", + " \"context\": CONTEXT,\n", + " },\n", + " )\n", + " )\n", + " return prompt_pairs\n", + "\n", + "\n", + "# Read top genes and create PQA steps\n", + "pqa_step = Step(\n", + " name=\"job-futurehouse-paperqa2\",\n", + " prompt=PQA_PROMPT,\n", + " prompt_generator=pqa_prompt_generator,\n", + " parallel=N_TOP_GENES, # Will process all top genes in parallel\n", + " post_process=pqa_post_process,\n", + ")\n", + "tortoise.add_step(pqa_step)\n", + "\n", + "# Step 4: Visualization with Volcano Plot\n", + "volcano_step = Step(\n", + " name=\"job-futurehouse-data-analysis-crow-high\",\n", + " prompt=VOLCANO_PROMPT,\n", + " cot_prompt=True,\n", + " input_files={\n", + " f\"{OUTPUT_DIR}/{consensus_step.step_id}/consensus_results.csv\": \"consensus_results.csv\",\n", + " f\"{OUTPUT_DIR}/{pqa_step.step_id}/pqa_results.csv\": \"pqa_results.csv\",\n", + " },\n", + " config=StepConfig(language=\"PYTHON\", max_steps=30, timeout=15 * 60),\n", + ")\n", + "tortoise.add_step(volcano_step)\n", + "\n", + "# Run the pipeline\n", + "results = await tortoise.run_pipeline(OUTPUT_DIR)\n", + "print(\"Pipeline execution completed with results:\", results)\n", + "print(\n", + " f\"View the final volcano plot at: https://platform.futurehouse.org/trajectories/{tortoise.results[volcano_step.step_id]['task_ids'][0]}\"\n", + ")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/uv.lock b/uv.lock index 5883556..07cfc05 100644 --- a/uv.lock +++ b/uv.lock @@ -709,7 +709,7 @@ requires-dist = [ { name = "aiofiles", specifier = "==24.1.0" }, { name = "black", marker = "extra == 'dev'" }, { name = "fhaviary", extras = ["server"], specifier = "==0.19.0" }, - { name = "futurehouse-client", specifier = "==0.3.18.dev25" }, + { name = "futurehouse-client", specifier = "==0.3.18.dev80" }, { name = "google-auth", specifier = "==2.38.0" }, { name = "google-cloud-secret-manager", specifier = "==2.23.0" }, { name = "google-cloud-storage", specifier = "==3.0.0" }, @@ -840,7 +840,7 @@ wheels = [ [[package]] name = "futurehouse-client" -version = "0.3.18.dev25" +version = "0.3.18.dev80" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "cloudpickle" }, @@ -854,9 +854,9 @@ dependencies = [ { name = "tenacity" }, { name = "tqdm" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/33/17/cc8bc7b5aa174d10d451a100681dc88ca4718eeb3058e0b023f0939d4358/futurehouse_client-0.3.18.dev25.tar.gz", hash = "sha256:0685836fe273999fe226204334d19d95a823a32e2beca54b1326f8161006f973", size = 145006 } +sdist = { url = "https://files.pythonhosted.org/packages/0e/d1/16829190aacd8cf7db4415ee953d1f181c7e272c82ead50446e36ead8935/futurehouse_client-0.3.18.dev80.tar.gz", hash = "sha256:d05ccdd837895b6c4a6333045027a7a801067047cf8a3792bb3499d6582a256d", size = 145143 } wheels = [ - { url = "https://files.pythonhosted.org/packages/e9/39/af2cebaaaf4255973b5b380728593bbf65ee60f10433c79dd532b94a27dc/futurehouse_client-0.3.18.dev25-py3-none-any.whl", hash = "sha256:eab284abb067edf1069c889a2d4b1e6628f926b30459ed3ab4fb80084a28ff9e", size = 31545 }, + { url = "https://files.pythonhosted.org/packages/19/cb/037609233b58708ad613c2592eca52d346a3c19f63965668f0ed8afb65f0/futurehouse_client-0.3.18.dev80-py3-none-any.whl", hash = "sha256:a9cdbd4a87de70ff889a7e63a49c386f22ca28101ce07ffed11baa8fe590d472", size = 31724 }, ] [[package]]