From 4822fd072994a6333e57b26663aeb8f29e2aa369 Mon Sep 17 00:00:00 2001 From: Mohamed Khairallah Gharbi Date: Tue, 11 Mar 2025 16:28:20 +0100 Subject: [PATCH 1/2] implement a client worker job to compute latency --- python/analysisCDF/.dockerignore | 1 + python/analysisCDF/.env.example | 2 + python/analysisCDF/Dockerfile | 17 + python/analysisCDF/README.md | 47 ++ python/analysisCDF/client/.dockerignore | 3 + python/analysisCDF/client/Dockerfile | 27 ++ python/analysisCDF/client/__init__.py | 33 ++ .../client/armonik_benchmark_cli.py | 18 + python/analysisCDF/client/benchmark.py | 313 +++++++++++++ python/analysisCDF/client/main.py | 62 +++ python/analysisCDF/client/reporting.py | 352 +++++++++++++++ python/analysisCDF/client/requirements.txt | 4 + python/analysisCDF/client/utils.py | 30 ++ python/analysisCDF/client/visualization.py | 422 ++++++++++++++++++ .../worker/worker-requirements.txt | 1 + python/analysisCDF/worker/worker.py | 75 ++++ 16 files changed, 1407 insertions(+) create mode 100644 python/analysisCDF/.dockerignore create mode 100644 python/analysisCDF/.env.example create mode 100644 python/analysisCDF/Dockerfile create mode 100644 python/analysisCDF/README.md create mode 100644 python/analysisCDF/client/.dockerignore create mode 100644 python/analysisCDF/client/Dockerfile create mode 100644 python/analysisCDF/client/__init__.py create mode 100644 python/analysisCDF/client/armonik_benchmark_cli.py create mode 100644 python/analysisCDF/client/benchmark.py create mode 100644 python/analysisCDF/client/main.py create mode 100644 python/analysisCDF/client/reporting.py create mode 100644 python/analysisCDF/client/requirements.txt create mode 100644 python/analysisCDF/client/utils.py create mode 100644 python/analysisCDF/client/visualization.py create mode 100644 python/analysisCDF/worker/worker-requirements.txt create mode 100644 python/analysisCDF/worker/worker.py diff --git a/python/analysisCDF/.dockerignore b/python/analysisCDF/.dockerignore new file 
mode 100644 index 00000000..7f269dec --- /dev/null +++ b/python/analysisCDF/.dockerignore @@ -0,0 +1 @@ +client/ \ No newline at end of file diff --git a/python/analysisCDF/.env.example b/python/analysisCDF/.env.example new file mode 100644 index 00000000..a6eed2b8 --- /dev/null +++ b/python/analysisCDF/.env.example @@ -0,0 +1,2 @@ +ARMONIK_ENDPOINT=http://localhost:3000 +ARMONIK_PARTITION=cdf \ No newline at end of file diff --git a/python/analysisCDF/Dockerfile b/python/analysisCDF/Dockerfile new file mode 100644 index 00000000..d0748da4 --- /dev/null +++ b/python/analysisCDF/Dockerfile @@ -0,0 +1,17 @@ +FROM python:3.10-slim AS builder +WORKDIR /app +RUN python -m venv .venv && .venv/bin/pip install --no-cache-dir -U pip setuptools +COPY worker/worker-requirements.txt ./ +RUN .venv/bin/pip install --no-cache-dir -r worker-requirements.txt +COPY ./worker ./worker + +FROM python:3.10-slim +WORKDIR /app +RUN groupadd --gid 5000 armonikuser \ + && useradd --home-dir /home/armonikuser --create-home --uid 5000 --gid 5000 --shell /bin/sh armonikuser \ + && mkdir /cache && chown armonikuser: /cache +USER armonikuser +ENV PATH="/app/.venv/bin:$PATH" \ + PYTHONUNBUFFERED=1 +COPY --from=builder /app /app +ENTRYPOINT ["python", "worker/worker.py"] diff --git a/python/analysisCDF/README.md b/python/analysisCDF/README.md new file mode 100644 index 00000000..b85266a6 --- /dev/null +++ b/python/analysisCDF/README.md @@ -0,0 +1,47 @@ +# A Python Hello World on ArmoniK + +## Description + +This project contains a worker and a client to interact with ArmoniK's Control Plane. The worker processes a simple task sent by the Agent. The worker does nothing and sends an empty string to the Client. + +## Prerequisites +Create a partition named "cdf" with the worker's image in the terraform configuration file. + +## Steps + +1. Build the Docker image for the worker: + ```bash + docker build -t cdf-worker -f Dockerfile . + ``` + +2.
Deploy ArmoniK locally by following the instructions at [ArmoniK Documentation](https://aneoconsulting.github.io/ArmoniK/). Ensure you create a new partition named "cdf" with the worker's image named "cdf-worker". + +3. Move to the `client` folder and create a virtual environment: + ```bash + cd client + python -m venv .venv + ``` + +4. Activate the virtual environment: + ```bash + source .venv/bin/activate + ``` + +5. Install the client dependencies: + ```bash + pip install -r requirements.txt + ``` + +## Usage + +Make sure the file "armonik_benchmark_cli.py" is executable: + +```bash +chmod +x armonik_benchmark_cli.py +``` + +Run the Client with the name of the partition: + +```bash +./armonik_benchmark_cli.py --partition cdf --endpoint localhost:5001 +``` \ No newline at end of file diff --git a/python/analysisCDF/client/.dockerignore b/python/analysisCDF/client/.dockerignore new file mode 100644 index 00000000..15477c7d --- /dev/null +++ b/python/analysisCDF/client/.dockerignore @@ -0,0 +1,3 @@ +.env +.venv/ +__pycache__/ \ No newline at end of file diff --git a/python/analysisCDF/client/Dockerfile b/python/analysisCDF/client/Dockerfile new file mode 100644 index 00000000..86dc2bbe --- /dev/null +++ b/python/analysisCDF/client/Dockerfile @@ -0,0 +1,27 @@ +# Stage 1: Build the client with a virtual environment and install dependencies +FROM python:3.10-slim AS builder +WORKDIR /app +# Create a virtual environment and update pip and setuptools +RUN python -m venv .venv && .venv/bin/pip install --no-cache-dir -U pip setuptools +# Copy your requirements (adjust name if needed) +COPY requirements.txt ./ +RUN .venv/bin/pip install --no-cache-dir -r requirements.txt +# Copy the whole source (including main.py) +COPY . .
+ +# Stage 2: Create the runtime image +FROM python:3.10-slim +WORKDIR /app +# Create a non-root user to run the application +RUN groupadd --gid 5000 armonikuser && \ + useradd --home-dir /home/armonikuser --create-home --uid 5000 --gid 5000 \ + --shell /bin/sh --skel /dev/null armonikuser && \ + mkdir -p /cache && chown armonikuser: /cache +USER armonikuser +# Set environment variables +ENV PATH="/app/.venv/bin:$PATH" \ + PYTHONUNBUFFERED=1 +# Copy the built app from the builder stage +COPY --from=builder /app /app +# Set the entrypoint to launch your benchmarking client +ENTRYPOINT ["python", "main.py"] \ No newline at end of file diff --git a/python/analysisCDF/client/__init__.py b/python/analysisCDF/client/__init__.py new file mode 100644 index 00000000..6266751c --- /dev/null +++ b/python/analysisCDF/client/__init__.py @@ -0,0 +1,33 @@ +""" +ArmoniK Benchmark Suite + +A toolkit for benchmarking ArmoniK task processing capabilities +with zero-work tasks to measure scheduling overhead. 
+""" + +__version__ = "1.0.0" + +# Import key components for easy access +from .benchmark import run_batch, run_benchmarks + +# Expose main entry point +from .main import main +from .reporting import print_summary, save_results_to_csv +from .visualization import ( + generate_latency_percentile_graph, + generate_matplotlib_latency_graph, + generate_matplotlib_throughput_graph, + generate_throughput_graph, +) + +__all__ = [ + "run_batch", + "run_benchmarks", + "generate_latency_percentile_graph", + "generate_throughput_graph", + "generate_matplotlib_latency_graph", + "generate_matplotlib_throughput_graph", + "save_results_to_csv", + "print_summary", + "main", +] diff --git a/python/analysisCDF/client/armonik_benchmark_cli.py b/python/analysisCDF/client/armonik_benchmark_cli.py new file mode 100644 index 00000000..a8e0a3f8 --- /dev/null +++ b/python/analysisCDF/client/armonik_benchmark_cli.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python3 +""" +ArmoniK Benchmark CLI + +Command-line tool for benchmarking ArmoniK scheduling overhead. 
+""" + +import os +import sys + +# Insert the current directory (where main.py is located) into the PYTHONPATH +current_dir = os.path.abspath(os.path.dirname(__file__)) +sys.path.insert(0, current_dir) + +from main import main + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:])) diff --git a/python/analysisCDF/client/benchmark.py b/python/analysisCDF/client/benchmark.py new file mode 100644 index 00000000..a2f7de41 --- /dev/null +++ b/python/analysisCDF/client/benchmark.py @@ -0,0 +1,313 @@ +import statistics +import time +from datetime import timedelta +from typing import Any, Dict, List + +import grpc +from armonik.client import ArmoniKEvents, ArmoniKResults, ArmoniKSessions, ArmoniKTasks +from armonik.common import TaskDefinition, TaskOptions +from reporting import generate_detailed_report, print_summary, save_results_to_csv +from utils import logger, retry_operation +from visualization import ( + generate_latency_percentile_graph, + generate_matplotlib_latency_graph, + generate_matplotlib_throughput_graph, + generate_throughput_graph, +) + + +def run_batch( + task_client: ArmoniKTasks, + result_client: ArmoniKResults, + session_client: ArmoniKSessions, + events_client: ArmoniKEvents, + partition: str, + batch_size: int, + iteration: int, + scenario: str, +) -> Dict[str, Any]: + """Run a batch of tasks and measure performance metrics, reusing the same clients.""" + metrics = { + "batch_size": batch_size, + "submission_time": 0.0, + "processing_time": 0.0, + "total_time": 0.0, + "scenario": scenario, + "iteration": iteration + 1, + } + start_total = time.time() + + task_options = TaskOptions( + max_duration=timedelta(hours=1), + max_retries=2, + priority=1, + partition_id=partition, + options={}, + ) + + # Create session with retry (you can also reuse a single session if desired) + session_id = retry_operation( + lambda: session_client.create_session(task_options, partition_ids=[partition]), + operation_name="Session creation", + ) + + task_definitions = 
[] + + # Create outputs metadata + outputs = result_client.create_results_metadata( + result_names=[f"output_{i}" for i in range(batch_size)], + session_id=session_id, + ) + + # Ensure result_ids is populated + result_ids = [outputs[f"output_{i}"].result_id for i in range(batch_size)] + + # Create payloads data + payloads_data = {f"payload_{i}": "".encode() for i in range(batch_size)} + + payloads = result_client.create_results( + results_data=payloads_data, session_id=session_id + ) + + # Ensure payloads are assigned correctly + for i in range(batch_size): + payload_id = payloads[f"payload_{i}"].result_id + output_id = outputs[f"output_{i}"].result_id + + task_definitions.append( + TaskDefinition(payload_id=payload_id, expected_output_ids=[output_id]) + ) + + submission_start = time.time() + task_client.submit_tasks(session_id, task_definitions) + submission_timestamp = time.time() # Record exactly when tasks were submitted + metrics["submission_time"] = submission_timestamp - submission_start + + processing_start = time.time() + try: + downloaded_results = events_client.wait_for_result_availability_and_download( + result_ids=result_ids, + session_id=session_id, + submission_timestamp=submission_timestamp, + ) + + # Verify we received all expected results + if len(downloaded_results) != len(result_ids): + logger.warning( + "Expected %d results but only received %d", + len(result_ids), + len(downloaded_results), + ) + + task_times = [] + task_download_times = [] + task_completion_timestamps = [] + earliest_completion = float("inf") + latest_completion = 0 + + for result_id, result_data in downloaded_results.items(): + if "processing_time" in result_data: + task_times.append(result_data["processing_time"]) + + if "download_time" in result_data: + task_download_times.append(result_data["download_time"]) + + if "completion_timestamp" in result_data: + completion_ts = result_data["completion_timestamp"] + task_completion_timestamps.append(completion_ts) + 
earliest_completion = min(earliest_completion, completion_ts) + latest_completion = max(latest_completion, completion_ts) + + if task_times: + metrics["min_task_time"] = min(task_times) + metrics["max_task_time"] = max(task_times) + metrics["avg_task_time"] = sum(task_times) / len(task_times) + metrics["median_task_time"] = statistics.median(task_times) + + sorted_times = sorted(task_times) + if len(sorted_times) >= 10: + metrics["p90_task_time"] = sorted_times[int(len(sorted_times) * 0.9)] + metrics["p95_task_time"] = sorted_times[int(len(sorted_times) * 0.95)] + metrics["p99_task_time"] = sorted_times[int(len(sorted_times) * 0.99)] + + if len(task_times) > 1: + metrics["stddev_task_time"] = statistics.stdev(task_times) + metrics["cv_task_time"] = ( + metrics["stddev_task_time"] / metrics["avg_task_time"] + if metrics["avg_task_time"] > 0 + else 0 + ) + + outlier_threshold = ( + metrics["avg_task_time"] + 2 * metrics["stddev_task_time"] + ) + outliers = [t for t in task_times if t > outlier_threshold] + metrics["outlier_count"] = len(outliers) + metrics["outlier_percentage"] = ( + (len(outliers) / len(task_times)) * 100 if task_times else 0 + ) + + logger.debug( + "Task timing stats - Min: %.3fs, Max: %.3fs, Avg: %.3fs, StdDev: %.3fs, Outliers: %d/%d", + metrics["min_task_time"], + metrics["max_task_time"], + metrics["avg_task_time"], + metrics.get("stddev_task_time", 0), + metrics.get("outlier_count", 0), + len(task_times), + ) + + # Add download time metrics if available + if task_download_times: + metrics["min_download_time"] = min(task_download_times) + metrics["max_download_time"] = max(task_download_times) + metrics["avg_download_time"] = sum(task_download_times) / len( + task_download_times + ) + metrics["total_download_time"] = sum(task_download_times) + + if len(task_download_times) > 1: + metrics["stddev_download_time"] = statistics.stdev(task_download_times) + + logger.debug( + "Download time stats - Min: %.3fs, Max: %.3fs, Avg: %.3fs, Total: %.3fs", + 
metrics["min_download_time"], + metrics["max_download_time"], + metrics["avg_download_time"], + metrics["total_download_time"], + ) + + if task_completion_timestamps and len(task_completion_timestamps) > 1: + metrics["completion_spread"] = latest_completion - earliest_completion + metrics["first_result_time"] = earliest_completion - submission_timestamp + metrics["last_result_time"] = latest_completion - submission_timestamp + + if metrics["completion_spread"] > 0: + metrics["completion_rate"] = ( + len(task_completion_timestamps) / metrics["completion_spread"] + ) + + logger.debug( + "Task completion spread: %.3fs (first: %.3fs, last: %.3fs, rate: %.1f results/sec)", + metrics["completion_spread"], + metrics["first_result_time"], + metrics["last_result_time"], + metrics.get("completion_rate", 0), + ) + + metrics["processing_time"] = time.time() - processing_start + + if "avg_task_time" in metrics and batch_size > 0: + metrics["total_task_time"] = sum(task_times) + metrics["ideal_parallel_time"] = metrics["avg_task_time"] + metrics["parallelization_efficiency"] = ( + metrics["ideal_parallel_time"] / metrics["processing_time"] + if metrics["processing_time"] > 0 + else 0 + ) + metrics["system_utilization"] = ( + metrics["total_task_time"] / (batch_size * metrics["processing_time"]) + if metrics["processing_time"] > 0 + else 0 + ) + + logger.debug( + "System efficiency - Utilization: %.2f%%, Parallelization efficiency: %.2f%%", + metrics["system_utilization"] * 100, + metrics["parallelization_efficiency"] * 100, + ) + + except Exception as e: + logger.error("Error waiting for results: %s", e) + + metrics["total_time"] = time.time() - start_total + metrics["tasks_per_second"] = ( + (batch_size / metrics["total_time"]) if metrics["total_time"] > 0 else 0 + ) + return metrics + + +def run_benchmarks(endpoint: str, partition: str) -> List[Dict[str, Any]]: + """Runs the benchmarking scenarios sequentially without parallelization, + but reuses the same gRPC channel and 
client objects across all runs. + """ + all_results = [] + + # Create one gRPC channel for all scenarios + with grpc.insecure_channel(endpoint) as channel: + task_client = ArmoniKTasks(channel) + result_client = ArmoniKResults(channel) + session_client = ArmoniKSessions(channel) + events_client = ArmoniKEvents(channel) + + scenarios = [ + {"name": "1x1000", "batch_size": 1, "iterations": 1000}, + {"name": "10x700", "batch_size": 10, "iterations": 700}, + {"name": "100x300", "batch_size": 100, "iterations": 300}, + ] + + for scenario in scenarios: + logger.info("Starting Scenario: %s", scenario["name"]) + scenario_results = [] + start_time = time.time() + successful_runs = 0 + failed_runs = 0 + + # Process iterations one by one + for i in range(scenario["iterations"]): + try: + result = run_batch( + task_client=task_client, + result_client=result_client, + session_client=session_client, + events_client=events_client, + partition=partition, + batch_size=scenario["batch_size"], + iteration=i, + scenario=scenario["name"], + ) + scenario_results.append(result) + successful_runs += 1 + + # Log progress every 10 iterations or at the end + if (i + 1) % 10 == 0 or (i + 1) == scenario["iterations"]: + elapsed = time.time() - start_time + progress = (i + 1) / scenario["iterations"] * 100 + est_remaining = ( + (elapsed / (i + 1)) * (scenario["iterations"] - i - 1) + if i > 0 + else 0 + ) + logger.info( + "Progress: %.1f%% - Completed %d/%d iterations (%d successful, %d failed) - Est. 
remaining time: %.1fs", + progress, + i + 1, + scenario["iterations"], + successful_runs, + failed_runs, + est_remaining, + ) + except Exception as e: + logger.error( + "Error in batch execution (iteration %d): %s", i + 1, str(e) + ) + failed_runs += 1 + + all_results.extend(scenario_results) + logger.info( + "Completed scenario %s with %d successful and %d failed runs in %.1fs", + scenario["name"], + successful_runs, + failed_runs, + time.time() - start_time, + ) + + save_results_to_csv(all_results) + generate_latency_percentile_graph(all_results) + generate_throughput_graph(all_results) + generate_matplotlib_latency_graph(all_results) + generate_matplotlib_throughput_graph(all_results) + print_summary(all_results) + generate_detailed_report(all_results) + + return all_results diff --git a/python/analysisCDF/client/main.py b/python/analysisCDF/client/main.py new file mode 100644 index 00000000..508cb7b5 --- /dev/null +++ b/python/analysisCDF/client/main.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 +# filepath: /home/mkgharbi/aneo/ArmoniK.Samples/python/analysisCDF/client/main.py +import argparse +import os +import sys +import time +from typing import List + +from benchmark import run_benchmarks +from dotenv import load_dotenv +from reporting import print_summary +from utils import logger + +# Load environment variables from a .env file if it exists +load_dotenv() + + +def main(args: List[str]) -> None: + """Parses command-line arguments and runs the benchmarking scenarios.""" + # Get defaults from environment variables if available. + default_endpoint = os.getenv("ARMONIK_ENDPOINT", "localhost:7001") + default_partition = os.getenv("ARMONIK_PARTITION", "cdf") + + parser = argparse.ArgumentParser( + description="Benchmark for scheduling overhead using zero-work tasks." 
+ ) + parser.add_argument( + "--endpoint", + type=str, + default=default_endpoint, + help="Endpoint for the connection to ArmoniK control plane.", + ) + parser.add_argument( + "--partition", + type=str, + default=default_partition, + help="Name of the partition to which tasks are submitted.", + ) + parser.add_argument( + "--scenarios", + type=str, + default="1x1000,10x700,100x300", + help="Comma-separated list of scenarios to run (format: batchsize x iterations)", + ) + + parsed_args = parser.parse_args(args) + + start_time = time.time() + logger.info("Starting benchmark at %s", time.strftime("%Y-%m-%d %H:%M:%S")) + + results = run_benchmarks(parsed_args.endpoint, parsed_args.partition) + + print_summary(results) + + total_duration = time.time() - start_time + print("\n" + "=" * 50) + print(f"Benchmark completed in {total_duration:.1f} seconds") + print("=" * 50) + + +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/python/analysisCDF/client/reporting.py b/python/analysisCDF/client/reporting.py new file mode 100644 index 00000000..5319f16a --- /dev/null +++ b/python/analysisCDF/client/reporting.py @@ -0,0 +1,352 @@ +import csv +import time +from typing import Any, Dict, List + +from utils import logger + + +def save_results_to_csv(results: List[Dict[str, Any]]) -> None: + """Save benchmark results to a CSV file with all captured metrics.""" + timestamp = time.strftime("%Y%m%d_%H%M%S") + filename = f"benchmark_results_{timestamp}.csv" + + # Get all possible field names from all results (metrics may vary between runs) + fieldnames = set() + for result in results: + fieldnames.update(result.keys()) + + # Ensure core fields come first in a specific order + core_fields = [ + "scenario", + "batch_size", + "iteration", + "submission_time", + "processing_time", + "total_time", + "tasks_per_second", + ] + + # Add task timing fields + task_timing_fields = [ + "min_task_time", + "max_task_time", + "avg_task_time", + "median_task_time", + "p90_task_time", + 
"p95_task_time", + "p99_task_time", + "stddev_task_time", + "cv_task_time", + "outlier_count", + "outlier_percentage", + ] + + # Add download metrics fields + download_fields = [ + "min_download_time", + "max_download_time", + "avg_download_time", + "total_download_time", + "stddev_download_time", + ] + + # Add completion timing fields + completion_fields = [ + "completion_spread", + "first_result_time", + "last_result_time", + "completion_rate", + ] + + # Add system efficiency fields + efficiency_fields = [ + "total_task_time", + "ideal_parallel_time", + "parallelization_efficiency", + "system_utilization", + ] + + # Sort the fields in a logical order + ordered_fieldnames = [] + for field_list in [ + core_fields, + task_timing_fields, + download_fields, + completion_fields, + efficiency_fields, + ]: + for field in field_list: + if field in fieldnames: + ordered_fieldnames.append(field) + fieldnames.discard(field) + + # Add any remaining fields + ordered_fieldnames.extend(sorted(fieldnames)) + + with open(filename, "w", newline="", encoding="utf-8") as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=ordered_fieldnames) + writer.writeheader() + for result in results: + writer.writerow(result) + + logger.info("Detailed results saved to %s", filename) + + # Also save a simplified version with just core metrics for easy analysis + save_simplified_results(results, timestamp) + + +def save_simplified_results(results: List[Dict[str, Any]], timestamp: str) -> None: + """Save a simplified version of the results with just the core metrics.""" + filename = f"simplified_results_{timestamp}.csv" + + core_fields = [ + "scenario", + "batch_size", + "iteration", + "submission_time", + "processing_time", + "total_time", + "tasks_per_second", + "avg_task_time", + "p95_task_time", + "system_utilization", + ] + + with open(filename, "w", newline="", encoding="utf-8") as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=core_fields) + writer.writeheader() + for result in 
results: + # Create a new dict with only the core fields + row = {field: result.get(field, "") for field in core_fields} + writer.writerow(row) + + logger.info("Simplified results saved to %s", filename) + + +def print_summary(results: List[Dict[str, Any]]) -> None: + """Print a summary of benchmark results with enhanced metrics.""" + print("\n" + "=" * 80) + print("BENCHMARK SUMMARY") + print("=" * 80) + + for scenario in ["1x1000", "10x700", "100x300"]: + scenario_results = [r for r in results if r.get("scenario") == scenario] + if not scenario_results: + continue + + batch_size = scenario_results[0]["batch_size"] + print(f"\n{'-' * 70}") + print( + f"SCENARIO: {scenario} (Batch Size: {batch_size}, Runs: {len(scenario_results)})" + ) + print(f"{'-' * 70}") + + # Basic timing metrics + avg_submission = sum( + r.get("submission_time", 0) for r in scenario_results + ) / len(scenario_results) + avg_processing = sum( + r.get("processing_time", 0) for r in scenario_results + ) / len(scenario_results) + avg_total = sum(r.get("total_time", 0) for r in scenario_results) / len( + scenario_results + ) + avg_throughput = sum( + r["batch_size"] / r["total_time"] + for r in scenario_results + if r.get("total_time", 0) > 0 + ) / len(scenario_results) + + print("TIMING METRICS:") + print(f" Submission time: {avg_submission:.4f}s") + print(f" Processing time: {avg_processing:.4f}s") + print(f" Total latency: {avg_total:.4f}s") + print(f" Throughput: {avg_throughput:.2f} tasks/sec") + + # Task processing metrics (if available) + if any("avg_task_time" in r for r in scenario_results): + task_times = [r for r in scenario_results if "avg_task_time" in r] + avg_task_time = sum(r.get("avg_task_time", 0) for r in task_times) / len( + task_times + ) + avg_p95 = ( + sum( + r.get("p95_task_time", 0) + for r in task_times + if "p95_task_time" in r + ) + / len([r for r in task_times if "p95_task_time" in r]) + if any("p95_task_time" in r for r in task_times) + else 0 + ) + + print("\nTASK 
METRICS:") + print(f" Avg task time: {avg_task_time:.4f}s") + print(f" P95 task time: {avg_p95:.4f}s") + + # Calculate average of min and max across all runs + if any("min_task_time" in r for r in scenario_results): + avg_min = sum( + r.get("min_task_time", 0) + for r in task_times + if "min_task_time" in r + ) / len([r for r in task_times if "min_task_time" in r]) + avg_max = sum( + r.get("max_task_time", 0) + for r in task_times + if "max_task_time" in r + ) / len([r for r in task_times if "max_task_time" in r]) + print(f" Min task time: {avg_min:.4f}s") + print(f" Max task time: {avg_max:.4f}s") + + # Download metrics (if available) + if any("avg_download_time" in r for r in scenario_results): + download_times = [r for r in scenario_results if "avg_download_time" in r] + avg_download = sum( + r.get("avg_download_time", 0) for r in download_times + ) / len(download_times) + + print("\nDOWNLOAD METRICS:") + print(f" Avg download: {avg_download:.4f}s") + + # System efficiency metrics (if available) + if any("system_utilization" in r for r in scenario_results): + efficiency_results = [ + r for r in scenario_results if "system_utilization" in r + ] + avg_utilization = sum( + r.get("system_utilization", 0) for r in efficiency_results + ) / len(efficiency_results) + avg_efficiency = ( + sum( + r.get("parallelization_efficiency", 0) + for r in efficiency_results + if "parallelization_efficiency" in r + ) + / len( + [r for r in efficiency_results if "parallelization_efficiency" in r] + ) + if any("parallelization_efficiency" in r for r in efficiency_results) + else 0 + ) + + print("\nEFFICIENCY METRICS:") + print(f" System utilization: {avg_utilization*100:.2f}%") + print(f" Parallelization eff.: {avg_efficiency*100:.2f}%") + + print("\n" + "=" * 80) + + +def generate_detailed_report(results: List[Dict[str, Any]]) -> None: + """Generate a detailed HTML report with metrics breakdown.""" + timestamp = time.strftime("%Y%m%d_%H%M%S") + filename = 
f"detailed_report_{timestamp}.html" + + # Simple HTML report template + html_content = f""" + + + + ArmoniK Benchmark Report {timestamp} + + + +

ArmoniK Benchmark Results - {timestamp}

+ """ + + # Add summary section for each scenario + for scenario in ["1x1000", "10x700", "100x300"]: + scenario_results = [r for r in results if r.get("scenario") == scenario] + if not scenario_results: + continue + + batch_size = scenario_results[0]["batch_size"] + + html_content += f""" +
+

Scenario: {scenario}

+

Batch Size: {batch_size}, Total Runs: {len(scenario_results)}

+ +

Performance Summary

+ + + + + + + + + """ + + # Add rows for key metrics + metrics = [ + ("Total Time (s)", "total_time"), + ("Processing Time (s)", "processing_time"), + ("Submission Time (s)", "submission_time"), + ("Tasks per Second", "tasks_per_second"), + ("Task Time (s)", "avg_task_time"), + ("Download Time (s)", "avg_download_time"), + ( + "System Utilization", + "system_utilization", + 100, + ), # Multiply by 100 for percentage + ( + "Parallelization Efficiency", + "parallelization_efficiency", + 100, + ), # Multiply by 100 for percentage + ] + + for label, key, *args in metrics: + multiplier = args[0] if args else 1 + if any(key in r for r in scenario_results): + valid_results = [r[key] for r in scenario_results if key in r] + if valid_results: + avg_val = sum(valid_results) / len(valid_results) * multiplier + min_val = min(valid_results) * multiplier + max_val = max(valid_results) * multiplier + + # Calculate P95 + sorted_vals = sorted(valid_results) + p95_idx = int(len(sorted_vals) * 0.95) + p95_val = ( + sorted_vals[p95_idx] * multiplier + if p95_idx < len(sorted_vals) + else sorted_vals[-1] * multiplier + ) + + html_content += f""" + + + + + + + + """ + + html_content += """ +
MetricAverageMinMaxP95
{label}{avg_val:.4f}{min_val:.4f}{max_val:.4f}{p95_val:.4f}
+
+ """ + + # Close HTML + html_content += """ + + + """ + + # Write HTML file + with open(filename, "w") as f: + f.write(html_content) + + logger.info("Detailed HTML report generated: %s", filename) diff --git a/python/analysisCDF/client/requirements.txt b/python/analysisCDF/client/requirements.txt new file mode 100644 index 00000000..72288417 --- /dev/null +++ b/python/analysisCDF/client/requirements.txt @@ -0,0 +1,4 @@ +armonik +matplotlib +plotly +dotenv \ No newline at end of file diff --git a/python/analysisCDF/client/utils.py b/python/analysisCDF/client/utils.py new file mode 100644 index 00000000..72e63474 --- /dev/null +++ b/python/analysisCDF/client/utils.py @@ -0,0 +1,30 @@ +import logging +import time +from typing import Any, Callable + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + +# Constants +MAX_ATTEMPTS = 3 +BATCH_SIZE_FOR_RESULTS = 1000 + + +def retry_operation( + operation: Callable, max_attempts=MAX_ATTEMPTS, operation_name="Operation" +) -> Any: + """Retry an operation with exponential backoff.""" + for attempt in range(1, max_attempts + 1): + try: + result = operation() + return result + except Exception as e: + logger.error("%s attempt %d failed: %s", operation_name, attempt, e) + if attempt == max_attempts: + raise ValueError(f"Max retries reached for {operation_name}") from e + sleep_time = 2**attempt + logger.info("Retrying in %d seconds...", sleep_time) + time.sleep(sleep_time) diff --git a/python/analysisCDF/client/visualization.py b/python/analysisCDF/client/visualization.py new file mode 100644 index 00000000..7533c93d --- /dev/null +++ b/python/analysisCDF/client/visualization.py @@ -0,0 +1,422 @@ +import time +from typing import Any, Dict, List + +import matplotlib.pyplot as plt +import numpy as np +import plotly.graph_objects as go +import plotly.io as pio +from utils import logger + + +def 
generate_latency_percentile_graph(results: List[Dict[str, Any]]) -> None: + """Generate an interactive graph showing the CDF of scheduling overhead.""" + timestamp = time.strftime("%Y%m%d_%H%M%S") + filename_e2e = f"latency_percentile_e2e_{timestamp}.html" + filename_task = f"latency_percentile_task_{timestamp}.html" + filename_normalized = f"latency_percentile_normalized_{timestamp}.html" + + # Create separate figures for different latency metrics + fig_e2e = go.Figure() + fig_task = go.Figure() + fig_normalized = go.Figure() + + colors = { + "1x1000": "blue", + "10x700": "red", + "100x300": "green", + } + + for scenario in ["1x1000", "10x700", "100x300"]: + scenario_results = [r for r in results if r["scenario"] == scenario] + if not scenario_results: + logger.warning("No results found for scenario %s", scenario) + continue + + batch_size = scenario_results[0]["batch_size"] + + # End-to-end latency (total time per batch) + e2e_latencies = sorted([r["total_time"] for r in scenario_results]) + e2e_percentiles = np.linspace(0, 100, len(e2e_latencies)) / 100 + avg_e2e_latency = sum(e2e_latencies) / len(e2e_latencies) + + # Task-level latency (avg_task_time - actual time each task took to process) + task_latencies = sorted( + [ + r.get("avg_task_time", 0) + for r in scenario_results + if "avg_task_time" in r + ] + ) + if task_latencies: + task_percentiles = np.linspace(0, 100, len(task_latencies)) / 100 + avg_task_latency = sum(task_latencies) / len(task_latencies) + else: + task_latencies = [0] + task_percentiles = [0] + avg_task_latency = 0 + + # Normalized latency (total_time/batch_size - fairer comparison across batch sizes) + normalized_latencies = sorted( + [r["total_time"] / r["batch_size"] for r in scenario_results] + ) + normalized_percentiles = np.linspace(0, 100, len(normalized_latencies)) / 100 + avg_normalized_latency = sum(normalized_latencies) / len(normalized_latencies) + + # Add traces to each figure + fig_e2e.add_trace( + go.Scatter( + 
x=e2e_latencies, + y=e2e_percentiles, + mode="lines", + name=f"{scenario} (avg: {avg_e2e_latency:.3f}s)", + line=dict(color=colors.get(scenario, "black"), width=2), + ) + ) + + if task_latencies[0] > 0: + fig_task.add_trace( + go.Scatter( + x=task_latencies, + y=task_percentiles, + mode="lines", + name=f"{scenario} (avg: {avg_task_latency:.3f}s)", + line=dict(color=colors.get(scenario, "black"), width=2), + ) + ) + + fig_normalized.add_trace( + go.Scatter( + x=normalized_latencies, + y=normalized_percentiles, + mode="lines", + name=f"{scenario} (avg: {avg_normalized_latency:.3f}s)", + line=dict(color=colors.get(scenario, "black"), width=2), + ) + ) + + # Configure layouts - make legends more prominent + def _configure_layout(fig, title, graph_type="e2e"): + # Set x-axis range based on graph type + if graph_type == "e2e": + x_range = [0, 3] + x_dtick = 0.25 + ref_line_x1 = 3 + else: + x_range = [0, 0.75] # Shorter x-axis for task and normalized graphs + x_dtick = 0.05 + ref_line_x1 = 0.75 + + fig.update_layout( + title=dict(text=title, font=dict(size=20)), + xaxis=dict(title="Latency (seconds)", range=x_range, dtick=x_dtick), + yaxis=dict(title="Percentile (CDF)", range=[0, 1]), + legend=dict( + yanchor="top", + y=0.99, + xanchor="right", + x=0.99, + font=dict(size=14), + bordercolor="Black", + borderwidth=1, + ), + grid=dict(rows=1, columns=1), + height=600, + width=900, + template="plotly_white", + margin=dict(l=80, r=80, t=100, b=80), + ) + # Add horizontal grid lines + fig.update_yaxes(gridcolor="lightgray", gridwidth=0.5) + # Add a reference line at the median + fig.add_shape( + type="line", + x0=0, + x1=ref_line_x1, + y0=0.5, + y1=0.5, + line=dict(color="black", width=1, dash="dash"), + ) + + _configure_layout(fig_e2e, "End-to-End Batch Latency Distribution", "e2e") + _configure_layout(fig_task, "Individual Task Latency Distribution", "task") + _configure_layout( + fig_normalized, + "Normalized Per-Task Latency Distribution (Total Time/Batch Size)", + 
"normalized", + ) + + # Save all figures (only once) + pio.write_html(fig_e2e, filename_e2e) + pio.write_html(fig_task, filename_task) + pio.write_html(fig_normalized, filename_normalized) + + logger.info("Interactive latency percentile graphs saved to:") + logger.info(" - End-to-end batch latency: %s", filename_e2e) + logger.info(" - Individual task latency: %s", filename_task) + logger.info(" - Normalized per-task latency: %s", filename_normalized) + + +def generate_matplotlib_latency_graph(results: List[Dict[str, Any]]) -> None: + """Generate latency percentile graphs using matplotlib.""" + timestamp = time.strftime("%Y%m%d_%H%M%S") + + # Create filenames for different metrics + filenames = { + "e2e": f"mpl_latency_e2e_{timestamp}.png", + "task": f"mpl_latency_task_{timestamp}.png", + "normalized": f"mpl_latency_normalized_{timestamp}.png", + } + + titles = { + "e2e": "End-to-End Batch Latency Distribution", + "task": "Individual Task Latency Distribution", + "normalized": "Normalized Per-Task Latency Distribution", + } + + # Set x-axis limits based on graph type + x_limits = { + "e2e": (0, 3), + "task": (0, 0.75), + "normalized": (0, 0.75), + } + + # Set tick spacing based on graph type + x_ticks = { + "e2e": np.arange(0, 3, 0.25), + "task": np.arange(0, 0.75, 0.05), + "normalized": np.arange(0, 0.75, 0.05), + } + + colors = { + "1x1000": "blue", + "10x700": "red", + "100x300": "green", + } + + # Create separate plots for each metric + for metric_type in ["e2e", "task", "normalized"]: + plt.figure(figsize=(10, 6)) + + for scenario in ["1x1000", "10x700", "100x300"]: + scenario_results = [r for r in results if r["scenario"] == scenario] + if not scenario_results: + logger.warning("No results found for scenario %s", scenario) + continue + + # Get appropriate latency metric based on type + if metric_type == "e2e": + latencies = sorted([r["total_time"] for r in scenario_results]) + elif metric_type == "task": + latencies = sorted( + [ + r.get("avg_task_time", 0) + 
for r in scenario_results + if "avg_task_time" in r + ] + ) + else: # normalized + latencies = sorted( + [r["total_time"] / r["batch_size"] for r in scenario_results] + ) + + if latencies: + percentiles = np.linspace(0, 100, len(latencies)) / 100 + avg_latency = sum(latencies) / len(latencies) + + plt.plot( + latencies, + percentiles, + "-", + color=colors.get(scenario, "black"), + linewidth=2, + label=f"{scenario} (avg: {avg_latency:.3f}s)", + ) + + # Set plot styling with optimized limits + plt.title(titles[metric_type], fontsize=16, fontweight="bold") + plt.xlabel("Latency (seconds)", fontsize=14) + plt.ylabel("Percentile (CDF)", fontsize=14) + plt.xlim(*x_limits[metric_type]) + plt.ylim(0, 1) + plt.xticks(x_ticks[metric_type]) + plt.grid(True, linestyle="--", alpha=0.7) + + # Add reference line at median + plt.axhline(y=0.5, color="black", linestyle="--", alpha=0.7) + + # Add legend with improved styling + plt.legend(loc="lower right", frameon=True, framealpha=1, fontsize=12) + plt.tight_layout() + + # Save figure + plt.savefig(filenames[metric_type], dpi=300) + plt.close() + + logger.info( + "Matplotlib %s graph saved to %s", + titles[metric_type], + filenames[metric_type], + ) + + +def generate_throughput_graph(results: List[Dict[str, Any]]) -> None: + """Generate an interactive graph showing the throughput (tasks per second).""" + timestamp = time.strftime("%Y%m%d_%H%M%S") + filename = f"throughput_{timestamp}.html" + + fig = go.Figure() + + scenarios = ["1x1000", "10x700", "100x300"] + colors = { + "1x1000": "blue", + "10x700": "red", + "100x300": "green", + } + + # Calculate all throughputs first to determine appropriate x-axis range + all_throughputs = [] + for scenario in scenarios: + scenario_results = [r for r in results if r["scenario"] == scenario] + if scenario_results: + throughputs = [ + r["batch_size"] / r["total_time"] + for r in scenario_results + if r["total_time"] > 0 + ] + all_throughputs.extend(throughputs) + + # Dynamically set bin range 
based on actual data + if all_throughputs: + max_throughput = max(all_throughputs) + # Round up to next integer and add a small buffer + max_bin = min(100, int(max_throughput * 1.1) + 1) + else: + max_bin = 10 + + # Create bins with appropriate range + bin_edges = np.linspace(0, max_bin, min(max_bin * 10 + 1, 101)) + bin_width = bin_edges[1] - bin_edges[0] + + for scenario in scenarios: + scenario_results = [r for r in results if r["scenario"] == scenario] + if not scenario_results: + continue + + throughputs = [ + r["batch_size"] / r["total_time"] + for r in scenario_results + if r["total_time"] > 0 + ] + + avg_throughput = sum(throughputs) / len(throughputs) + + hist, _ = np.histogram(throughputs, bins=bin_edges) + bin_centers = 0.5 * (bin_edges[:-1] + bin_edges[1:]) + + fig.add_trace( + go.Bar( + x=bin_centers, + y=hist, + name=f"{scenario} (avg: {avg_throughput:.2f} tasks/s)", + marker_color=colors.get(scenario, "gray"), + opacity=0.7, + width=bin_width * 0.9, + ) + ) + + # Configure axes with more prominence + fig.update_layout( + title={"text": "Task Processing Throughput Distribution", "font": {"size": 24}}, + xaxis=dict( + title={"text": "Throughput (tasks per second)", "font": {"size": 18}}, + tickfont={"size": 14}, + ), + yaxis=dict( + title={"text": "Frequency", "font": {"size": 18}}, tickfont={"size": 14} + ), + barmode="overlay", + legend=dict(yanchor="top", y=0.99, xanchor="right", x=0.01, font={"size": 14}), + height=600, + width=900, + ) + + # Enable better interactivity features + fig.update_layout( + hovermode="x unified", hoverlabel=dict(bgcolor="white", font_size=14) + ) + + pio.write_html(fig, filename) + logger.info("Interactive throughput graph saved to %s", filename) + + +def generate_matplotlib_throughput_graph(results: List[Dict[str, Any]]) -> None: + """Generate a throughput distribution graph using matplotlib.""" + timestamp = time.strftime("%Y%m%d_%H%M%S") + filename = f"mpl_throughput_{timestamp}.png" + + plt.figure(figsize=(10, 6)) + 
+ scenarios = ["1x1000", "10x700", "100x300"] + colors = { + "1x1000": "blue", + "10x700": "red", + "100x300": "green", + } + + # Calculate all throughputs first to determine appropriate x-axis range + all_throughputs = [] + for scenario in scenarios: + scenario_results = [r for r in results if r["scenario"] == scenario] + if scenario_results: + throughputs = [ + r["batch_size"] / r["total_time"] + for r in scenario_results + if r["total_time"] > 0 + ] + all_throughputs.extend(throughputs) + + # Dynamically set bin range based on actual data + if all_throughputs: + max_throughput = max(all_throughputs) + # Round up to next integer and add a small buffer + max_bin = min(100, int(max_throughput * 1.1) + 1) + else: + max_bin = 10 + + # Create bins with appropriate range + bin_edges = np.linspace(0, max_bin, min(max_bin * 10 + 1, 101)) + + for i, scenario in enumerate(scenarios): + scenario_results = [r for r in results if r["scenario"] == scenario] + if not scenario_results: + continue + + throughputs = [ + r["batch_size"] / r["total_time"] + for r in scenario_results + if r["total_time"] > 0 + ] + + avg_throughput = sum(throughputs) / len(throughputs) + + hist, _ = np.histogram(throughputs, bins=bin_edges) + bin_centers = 0.5 * (bin_edges[:-1] + bin_edges[1:]) + + plt.bar( + bin_centers, + hist, + width=bin_edges[1] - bin_edges[0], + color=colors.get(scenario, f"C{i}"), + alpha=0.7, + label=f"{scenario} (avg: {avg_throughput:.2f} tasks/s)", + ) + + plt.title("Task Processing Throughput Distribution", fontsize=14) + plt.xlabel("Throughput (tasks per second)") + plt.ylabel("Frequency") + plt.legend(loc="upper right") + plt.grid(axis="y", linestyle="--", alpha=0.7) + plt.tight_layout() + plt.savefig(filename, dpi=300) + plt.close() + logger.info("Matplotlib throughput graph saved to %s", filename) diff --git a/python/analysisCDF/worker/worker-requirements.txt b/python/analysisCDF/worker/worker-requirements.txt new file mode 100644 index 00000000..bf272cf7 --- /dev/null 
+++ b/python/analysisCDF/worker/worker-requirements.txt @@ -0,0 +1 @@ +armonik \ No newline at end of file diff --git a/python/analysisCDF/worker/worker.py b/python/analysisCDF/worker/worker.py new file mode 100644 index 00000000..61368487 --- /dev/null +++ b/python/analysisCDF/worker/worker.py @@ -0,0 +1,75 @@ +import logging +import os +import sys +from pathlib import Path + +import grpc +from armonik.common import Output +from armonik.worker import ArmoniKWorker, ClefLogger, TaskHandler + +# Add the common directory to the system path +common_path = Path(__file__).resolve().parent.parent / "common" +sys.path.append(str(common_path)) + + +ClefLogger.setup_logging(logging.INFO) + + +# Task processing +def processor(task_handler: TaskHandler) -> Output: + """ + Processes a task by doing nothing. + + Args: + task_handler: The handler for the current task. + + Returns: + Output: The result of the task processing. + """ + logger = ClefLogger.getLogger("ArmoniKWorker") + logger.info("Handling the Task") + + payload = task_handler.payload + logger.info(f"Received payload: {payload}") + result = "" + # Return the result as Output + task_handler.send_results({task_handler.expected_results[0]: result.encode()}) + return Output() + + +def main(): + # Create Seq compatible logger + logger = ClefLogger.getLogger("ArmoniKWorker") + # Define agent-worker communication endpoints + worker_scheme = ( + "unix://" + if os.getenv("ComputePlane__WorkerChannel__SocketType", "unixdomainsocket") + == "unixdomainsocket" + else "http://" + ) + agent_scheme = ( + "unix://" + if os.getenv("ComputePlane__AgentChannel__SocketType", "unixdomainsocket") + == "unixdomainsocket" + else "http://" + ) + worker_endpoint = worker_scheme + os.getenv( + "ComputePlane__WorkerChannel__Address", "/cache/armonik_worker.sock" + ) + agent_endpoint = agent_scheme + os.getenv( + "ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock" + ) + + # Start worker + logger.info("Worker Started") + # Use 
options to fix Unix socket connection on localhost + with grpc.insecure_channel( + agent_endpoint, options=(("grpc.default_authority", "localhost"),) + ) as agent_channel: + worker = ArmoniKWorker(agent_channel, processor, logger=logger) + logger.info("Worker Connected") + worker.start(worker_endpoint) + + +if __name__ == "__main__": + main() From 69d1958626b3d2383d45d3a5be87863de6aa4b56 Mon Sep 17 00:00:00 2001 From: Mohamed Khairallah Gharbi Date: Mon, 24 Mar 2025 18:10:59 +0100 Subject: [PATCH 2/2] get only important plots and correcting how to compute latency --- python/analysisCDF/client/benchmark.py | 555 ++++++++++++------ python/analysisCDF/client/reporting.py | 149 ++--- python/analysisCDF/client/requirements.txt | 6 +- python/analysisCDF/client/visualization.py | 618 ++++++++++----------- 4 files changed, 748 insertions(+), 580 deletions(-) diff --git a/python/analysisCDF/client/benchmark.py b/python/analysisCDF/client/benchmark.py index a2f7de41..a308c804 100644 --- a/python/analysisCDF/client/benchmark.py +++ b/python/analysisCDF/client/benchmark.py @@ -1,21 +1,173 @@ +import concurrent.futures import statistics import time from datetime import timedelta -from typing import Any, Dict, List +from typing import Any, Callable, Dict, List, Set, Tuple import grpc from armonik.client import ArmoniKEvents, ArmoniKResults, ArmoniKSessions, ArmoniKTasks from armonik.common import TaskDefinition, TaskOptions -from reporting import generate_detailed_report, print_summary, save_results_to_csv +from reporting import generate_detailed_report, print_summary, save_raw_results_csv from utils import logger, retry_operation from visualization import ( generate_latency_percentile_graph, generate_matplotlib_latency_graph, - generate_matplotlib_throughput_graph, - generate_throughput_graph, ) +def submit_tasks_in_parallel( + task_client: ArmoniKTasks, + session_id: str, + task_definitions: List[TaskDefinition], + batch_size: int, +) -> Tuple[float, float, float,
Dict[int, Tuple[float, float]], Dict[int, int]]: + """Submit tasks in parallel for optimal performance with enhanced timing tracking. + + Returns: + Tuple of ( + submission_start, + submission_end, + max_individual_submission_time, + chunk_times, # Maps chunk_idx to (start_time, end_time) + task_to_chunk # Maps task_idx to chunk_idx + ) + """ + task_to_chunk = {} # Maps task_idx to chunk_idx + + # Determine chunking strategy + if batch_size <= 10: + chunks = [task_definitions] + # All tasks are in chunk 0 + for i in range(batch_size): + task_to_chunk[i] = 0 + else: + chunk_size = max(1, min(10, batch_size // 10)) + chunks = [ + task_definitions[i : i + chunk_size] + for i in range(0, len(task_definitions), chunk_size) + ] + # Map each task to its chunk + for i in range(batch_size): + task_to_chunk[i] = i // chunk_size + + max_workers = min(len(chunks), 20) + chunk_end_times = [] + max_individual_time = 0 + chunk_times = {} + + submission_start = time.time() + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + + def submit_chunk(chunk_idx, chunk): + chunk_start = time.time() + try: + task_client.submit_tasks(session_id, chunk) + chunk_end = time.time() + return chunk_idx, chunk_start, chunk_end + except Exception as e: + logger.error("Error submitting chunk %s: %s", chunk_idx, e) + raise + + futures = { + executor.submit(submit_chunk, idx, chunk): idx + for idx, chunk in enumerate(chunks) + } + + for future in concurrent.futures.as_completed(futures): + try: + chunk_idx, chunk_start, chunk_end = future.result() + individual_time = chunk_end - chunk_start + chunk_end_times.append(chunk_end) + chunk_times[chunk_idx] = (chunk_start, chunk_end) + max_individual_time = max(max_individual_time, individual_time) + except Exception as e: + logger.error("Task submission failed: %s", e) + + if chunk_end_times: + submission_end = max(chunk_end_times) + else: + submission_end = time.time() + + return ( + submission_start, + submission_end, + 
max_individual_time, + chunk_times, + task_to_chunk, + ) + + +def process_results_in_parallel( + events_client: ArmoniKEvents, + result_ids: List[str], + session_id: str, + submission_timestamp: float, + batch_size: int, +) -> Dict[str, Dict]: + """Process and download results in parallel without needing to fix download times.""" + # For very small batches, use direct download + if batch_size <= 1: + results = events_client.wait_for_result_availability_and_download( + result_ids=result_ids, + session_id=session_id, + submission_timestamp=submission_timestamp, + ) + return results + # Configure chunk size and parallelism based on batch size + if batch_size <= 20: + chunk_size = 1 + max_workers = batch_size + parallelism = 1 + elif batch_size <= 100: + chunk_size = 5 + max_workers = min(20, batch_size // chunk_size + 1) + parallelism = 2 + else: + chunk_size = 20 + max_workers = 20 + parallelism = 4 + + chunks = [ + result_ids[i : i + chunk_size] for i in range(0, len(result_ids), chunk_size) + ] + all_results = {} + # Process chunks in parallel - download times are already correctly calculated by client + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + + def download_with_timestamp(chunk_idx, chunk): + try: + results = events_client.wait_for_result_availability_and_download( + result_ids=chunk, + session_id=session_id, + parallelism=parallelism, + submission_timestamp=submission_timestamp, + ) + + for result_id, result_data in results.items(): + result_data["chunk_idx"] = chunk_idx + + return results + except Exception as e: + logger.error("Error in chunk %s: %s", chunk_idx, e) + raise + + # Submit all chunks for parallel processing + future_to_chunk = { + executor.submit(download_with_timestamp, i, chunk): i + for i, chunk in enumerate(chunks) + } + + for future in concurrent.futures.as_completed(future_to_chunk): + try: + chunk_results = future.result() + all_results.update(chunk_results) + except Exception as e: + chunk_idx = 
future_to_chunk[future] + logger.error("Error processing result chunk %s: %s", chunk_idx, e) + + return all_results + + def run_batch( task_client: ArmoniKTasks, result_client: ArmoniKResults, @@ -26,17 +178,15 @@ def run_batch( iteration: int, scenario: str, ) -> Dict[str, Any]: - """Run a batch of tasks and measure performance metrics, reusing the same clients.""" + """Run a batch of tasks with optimized performance focusing on E2E metrics.""" metrics = { "batch_size": batch_size, - "submission_time": 0.0, - "processing_time": 0.0, - "total_time": 0.0, "scenario": scenario, "iteration": iteration + 1, } - start_total = time.time() - + # Start time for the entire operation + t1_start = time.time() + # Task preparation - create session, outputs, payloads, etc. task_options = TaskOptions( max_duration=timedelta(hours=1), max_retries=2, @@ -45,201 +195,259 @@ def run_batch( options={}, ) - # Create session with retry (you can also reuse a single session if desired) session_id = retry_operation( lambda: session_client.create_session(task_options, partition_ids=[partition]), operation_name="Session creation", ) - - task_definitions = [] - - # Create outputs metadata + t_end_create_session = time.time() + metrics["create_session_time"] = t_end_create_session - t1_start + t_start_create_results = time.time() outputs = result_client.create_results_metadata( result_names=[f"output_{i}" for i in range(batch_size)], session_id=session_id, ) - # Ensure result_ids is populated result_ids = [outputs[f"output_{i}"].result_id for i in range(batch_size)] - # Create payloads data - payloads_data = {f"payload_{i}": "".encode() for i in range(batch_size)} - + payloads_data = {f"payload_{i}": b"" for i in range(batch_size)} payloads = result_client.create_results( results_data=payloads_data, session_id=session_id ) + t_end_create_results = time.time() + metrics["create_results_time"] = t_end_create_results - t_start_create_results - # Ensure payloads are assigned correctly + 
task_definitions = [None] * batch_size for i in range(batch_size): payload_id = payloads[f"payload_{i}"].result_id output_id = outputs[f"output_{i}"].result_id - - task_definitions.append( - TaskDefinition(payload_id=payload_id, expected_output_ids=[output_id]) + task_definitions[i] = TaskDefinition( + payload_id=payload_id, expected_output_ids=[output_id] ) - submission_start = time.time() - task_client.submit_tasks(session_id, task_definitions) - submission_timestamp = time.time() # Record exactly when tasks were submitted - metrics["submission_time"] = submission_timestamp - submission_start - + preparation_time = time.time() - t1_start + metrics["preparation_time"] = preparation_time + # Submit tasks with detailed timing + ( + t2_submit_start, + t2_submit_end, + max_individual_submit, + chunk_times, + task_to_chunk, + ) = submit_tasks_in_parallel(task_client, session_id, task_definitions, batch_size) + + metrics["submission_time"] = t2_submit_end - t2_submit_start + # Process and download results with accurate timing processing_start = time.time() - try: - downloaded_results = events_client.wait_for_result_availability_and_download( - result_ids=result_ids, - session_id=session_id, - submission_timestamp=submission_timestamp, - ) - - # Verify we received all expected results - if len(downloaded_results) != len(result_ids): - logger.warning( - "Expected %d results but only received %d", - len(result_ids), - len(downloaded_results), - ) - - task_times = [] - task_download_times = [] - task_completion_timestamps = [] - earliest_completion = float("inf") - latest_completion = 0 - - for result_id, result_data in downloaded_results.items(): - if "processing_time" in result_data: - task_times.append(result_data["processing_time"]) - - if "download_time" in result_data: - task_download_times.append(result_data["download_time"]) - - if "completion_timestamp" in result_data: - completion_ts = result_data["completion_timestamp"] - 
task_completion_timestamps.append(completion_ts) - earliest_completion = min(earliest_completion, completion_ts) - latest_completion = max(latest_completion, completion_ts) - - if task_times: - metrics["min_task_time"] = min(task_times) - metrics["max_task_time"] = max(task_times) - metrics["avg_task_time"] = sum(task_times) / len(task_times) - metrics["median_task_time"] = statistics.median(task_times) - - sorted_times = sorted(task_times) - if len(sorted_times) >= 10: - metrics["p90_task_time"] = sorted_times[int(len(sorted_times) * 0.9)] - metrics["p95_task_time"] = sorted_times[int(len(sorted_times) * 0.95)] - metrics["p99_task_time"] = sorted_times[int(len(sorted_times) * 0.99)] - - if len(task_times) > 1: - metrics["stddev_task_time"] = statistics.stdev(task_times) - metrics["cv_task_time"] = ( - metrics["stddev_task_time"] / metrics["avg_task_time"] - if metrics["avg_task_time"] > 0 - else 0 - ) - - outlier_threshold = ( - metrics["avg_task_time"] + 2 * metrics["stddev_task_time"] - ) - outliers = [t for t in task_times if t > outlier_threshold] - metrics["outlier_count"] = len(outliers) - metrics["outlier_percentage"] = ( - (len(outliers) / len(task_times)) * 100 if task_times else 0 - ) - - logger.debug( - "Task timing stats - Min: %.3fs, Max: %.3fs, Avg: %.3fs, StdDev: %.3fs, Outliers: %d/%d", - metrics["min_task_time"], - metrics["max_task_time"], - metrics["avg_task_time"], - metrics.get("stddev_task_time", 0), - metrics.get("outlier_count", 0), - len(task_times), - ) - - # Add download time metrics if available - if task_download_times: - metrics["min_download_time"] = min(task_download_times) - metrics["max_download_time"] = max(task_download_times) - metrics["avg_download_time"] = sum(task_download_times) / len( - task_download_times - ) - metrics["total_download_time"] = sum(task_download_times) - - if len(task_download_times) > 1: - metrics["stddev_download_time"] = statistics.stdev(task_download_times) - - logger.debug( - "Download time stats - 
Min: %.3fs, Max: %.3fs, Avg: %.3fs, Total: %.3fs", - metrics["min_download_time"], - metrics["max_download_time"], - metrics["avg_download_time"], - metrics["total_download_time"], - ) - - if task_completion_timestamps and len(task_completion_timestamps) > 1: - metrics["completion_spread"] = latest_completion - earliest_completion - metrics["first_result_time"] = earliest_completion - submission_timestamp - metrics["last_result_time"] = latest_completion - submission_timestamp + first_result_time = None + last_result_time = None - if metrics["completion_spread"] > 0: - metrics["completion_rate"] = ( - len(task_completion_timestamps) / metrics["completion_spread"] - ) + all_results = process_results_in_parallel( + events_client, result_ids, session_id, t2_submit_end, batch_size + ) - logger.debug( - "Task completion spread: %.3fs (first: %.3fs, last: %.3fs, rate: %.1f results/sec)", - metrics["completion_spread"], - metrics["first_result_time"], - metrics["last_result_time"], - metrics.get("completion_rate", 0), + # Collection end time + t6_collection_end = time.time() + + # Calculate primary metrics + metrics["end_to_end_latency"] = t6_collection_end - t2_submit_start + metrics["total_time"] = t6_collection_end - t1_start + metrics["processing_time"] = t6_collection_end - processing_start + + if metrics["end_to_end_latency"] > 0: + metrics["throughput"] = batch_size / metrics["end_to_end_latency"] + else: + metrics["throughput"] = 0 + + # Extract timing information from results + completion_timestamps = [] + availability_times = [] + download_times = [] + download_complete_timestamps = [] + + # Process all result data + for result_id, result_data in all_results.items(): + # Track completion timestamps + if "completion_timestamp" in result_data: + completion_time = result_data["completion_timestamp"] + completion_timestamps.append(completion_time) + + # Track result availability relative to submission + availability_time = completion_time - t2_submit_end + 
availability_times.append(availability_time) + + # Track first and last result times + if first_result_time is None or completion_time < first_result_time: + first_result_time = completion_time + if last_result_time is None or completion_time > last_result_time: + last_result_time = completion_time + + # Track download times + if "download_time" in result_data: + download_times.append(result_data["download_time"]) + + # Track download completion timestamps + if "download_complete_timestamp" in result_data: + download_complete_timestamps.append( + result_data["download_complete_timestamp"] ) - metrics["processing_time"] = time.time() - processing_start + # Calculate task execution time (time to first result) + if first_result_time is not None: + metrics["task_execution_time"] = first_result_time - t2_submit_end + else: + metrics["task_execution_time"] = metrics["processing_time"] * 0.8 + + # Calculate average download time + if download_times: + metrics["download_time"] = sum(download_times) / len(download_times) + metrics["min_download_time"] = min(download_times) + metrics["max_download_time"] = max(download_times) + else: + metrics["download_time"] = metrics["processing_time"] * 0.2 + + # Calculate batch completion time (time from submission start to last download completion) + if download_complete_timestamps: + metrics["batch_completion_time"] = ( + max(download_complete_timestamps) - t2_submit_start + ) + metrics["download_completion_spread"] = max(download_complete_timestamps) - min( + download_complete_timestamps + ) + else: + # Fallback to end-to-end latency if download timestamps not available + metrics["batch_completion_time"] = metrics["end_to_end_latency"] - if "avg_task_time" in metrics and batch_size > 0: - metrics["total_task_time"] = sum(task_times) - metrics["ideal_parallel_time"] = metrics["avg_task_time"] - metrics["parallelization_efficiency"] = ( - metrics["ideal_parallel_time"] / metrics["processing_time"] - if metrics["processing_time"] > 0 - 
else 0 - ) - metrics["system_utilization"] = ( - metrics["total_task_time"] / (batch_size * metrics["processing_time"]) - if metrics["processing_time"] > 0 - else 0 - ) + # Calculate per-task statistics + per_task_stats = [] + per_task_e2e_times = [] - logger.debug( - "System efficiency - Utilization: %.2f%%, Parallelization efficiency: %.2f%%", - metrics["system_utilization"] * 100, - metrics["parallelization_efficiency"] * 100, - ) + for i in range(batch_size): + result_id = result_ids[i] + if result_id not in all_results: + continue + + result_data = all_results[result_id] + task_stat = { + "task_index": i, + "result_id": result_id, + "batch_size": batch_size, + "scenario": scenario, + } + + # Get the submission time for this specific task (or its chunk) + chunk_idx = task_to_chunk.get(i, 0) + chunk_start_time, chunk_end_time = chunk_times.get( + chunk_idx, (t2_submit_start, t2_submit_end) + ) + task_stat["submit_time"] = chunk_start_time + + # Track completion time (when result became available) + if "completion_timestamp" in result_data: + completion_timestamp = result_data["completion_timestamp"] + task_stat["completion_timestamp"] = completion_timestamp + task_stat["completion_time"] = completion_timestamp - chunk_start_time + task_stat["result_availability_time"] = completion_timestamp - t2_submit_end + + # Track download time and completion + if "download_time" in result_data: + task_stat["download_time"] = result_data["download_time"] + + if "download_complete_timestamp" in result_data: + download_complete_time = result_data["download_complete_timestamp"] + task_stat["download_complete_timestamp"] = download_complete_time + + # Calculate true end-to-end time: from task submission to download completion + task_e2e_time = download_complete_time - chunk_start_time + task_stat["e2e_latency"] = task_e2e_time + per_task_e2e_times.append(task_e2e_time) + + per_task_stats.append(task_stat) + + metrics["per_task_stats"] = per_task_stats + + # Calculate per-task 
E2E statistics + if per_task_e2e_times: + metrics["min_task_e2e"] = min(per_task_e2e_times) + metrics["max_task_e2e"] = max(per_task_e2e_times) + metrics["avg_task_e2e"] = statistics.fmean(per_task_e2e_times) + metrics["median_task_e2e"] = statistics.median(per_task_e2e_times) + metrics["task_e2e_spread"] = metrics["max_task_e2e"] - metrics["min_task_e2e"] + + # Calculate percentiles + sorted_e2e = sorted(per_task_e2e_times) + metrics["p50_task_e2e"] = sorted_e2e[int(len(sorted_e2e) * 0.5)] + metrics["p95_task_e2e"] = sorted_e2e[int(len(sorted_e2e) * 0.95)] + metrics["p99_task_e2e"] = ( + sorted_e2e[int(len(sorted_e2e) * 0.99)] + if len(sorted_e2e) >= 100 + else sorted_e2e[-1] + ) - except Exception as e: - logger.error("Error waiting for results: %s", e) + logger.debug( + "Per-task E2E times - Min: %.3fs, Avg: %.3fs, Max: %.3fs, Spread: %.3fs", + metrics["min_task_e2e"], + metrics["avg_task_e2e"], + metrics["max_task_e2e"], + metrics["task_e2e_spread"], + ) - metrics["total_time"] = time.time() - start_total - metrics["tasks_per_second"] = ( - (batch_size / metrics["total_time"]) if metrics["total_time"] > 0 else 0 + # Generate detailed log message with all metrics + log_msg = ( + f"Batch {metrics['iteration']} ({batch_size} tasks) - " + f"Preparation: {metrics['preparation_time']:.3f}s, " + f"Submission: {metrics['submission_time']:.3f}s, " + f"Task Exec: {metrics['task_execution_time']:.3f}s, " + f"Download: {metrics['download_time']:.3f}s, " + f"Batch E2E: {metrics['end_to_end_latency']:.3f}s, " + f"Batch Completion: {metrics.get('batch_completion_time', 0):.3f}s, " + f"Avg Task E2E: {metrics.get('avg_task_e2e', 0):.3f}s, " + f"Throughput: {metrics['throughput']:.2f} tasks/s" ) + logger.info(log_msg) + return metrics def run_benchmarks(endpoint: str, partition: str) -> List[Dict[str, Any]]: - """Runs the benchmarking scenarios sequentially without parallelization, - but reuses the same gRPC channel and client objects across all runs. 
- """ + """Run benchmark scenarios with optimized channel settings.""" all_results = [] - # Create one gRPC channel for all scenarios - with grpc.insecure_channel(endpoint) as channel: + channel_options = [ + ("grpc.max_receive_message_length", 100 * 1024 * 1024), + ("grpc.max_send_message_length", 100 * 1024 * 1024), + ("grpc.keepalive_time_ms", 30000), + ("grpc.keepalive_timeout_ms", 10000), + ("grpc.http2.max_pings_without_data", 0), + ("grpc.http2.min_time_between_pings_ms", 10000), + ("grpc.http2.min_ping_interval_without_data_ms", 5000), + ] + + with grpc.insecure_channel(endpoint, options=channel_options) as channel: task_client = ArmoniKTasks(channel) result_client = ArmoniKResults(channel) session_client = ArmoniKSessions(channel) events_client = ArmoniKEvents(channel) + logger.info("Starting warm-up phase (2 batches of 100 tasks)...") + for i in range(2): + try: + logger.info("Running warm-up batch %d/2...", i + 1) + run_batch( + task_client=task_client, + result_client=result_client, + session_client=session_client, + events_client=events_client, + partition=partition, + batch_size=100, + iteration=i, + scenario="warmup", + ) + logger.info("Warm-up batch %d completed", i + 1) + except Exception as e: + logger.warning("Error in warm-up batch %d: %s", i + 1, e) + + logger.info("Warm-up phase completed. 
Starting benchmark scenarios...") + scenarios = [ {"name": "1x1000", "batch_size": 1, "iterations": 1000}, {"name": "10x700", "batch_size": 10, "iterations": 700}, @@ -253,7 +461,6 @@ def run_benchmarks(endpoint: str, partition: str) -> List[Dict[str, Any]]: successful_runs = 0 failed_runs = 0 - # Process iterations one by one for i in range(scenario["iterations"]): try: result = run_batch( @@ -269,17 +476,19 @@ def run_benchmarks(endpoint: str, partition: str) -> List[Dict[str, Any]]: scenario_results.append(result) successful_runs += 1 - # Log progress every 10 iterations or at the end - if (i + 1) % 10 == 0 or (i + 1) == scenario["iterations"]: + if (i + 1) % max(1, min(50, scenario["iterations"] // 20)) == 0 or ( + i + 1 + ) == scenario["iterations"]: elapsed = time.time() - start_time progress = (i + 1) / scenario["iterations"] * 100 est_remaining = ( - (elapsed / (i + 1)) * (scenario["iterations"] - i - 1) + ((elapsed / (i + 1)) * (scenario["iterations"] - i - 1)) if i > 0 else 0 ) + logger.info( - "Progress: %.1f%% - Completed %d/%d iterations (%d successful, %d failed) - Est. remaining time: %.1fs", + "Progress: %.1f%% - Completed %d/%d iterations (%d successful, %d failed) - Est. 
remaining: %.1fs", progress, i + 1, scenario["iterations"], @@ -289,24 +498,22 @@ def run_benchmarks(endpoint: str, partition: str) -> List[Dict[str, Any]]: ) except Exception as e: logger.error( - "Error in batch execution (iteration %d): %s", i + 1, str(e) + "Error in batch execution (iteration %d): %s", i + 1, e ) failed_runs += 1 all_results.extend(scenario_results) logger.info( - "Completed scenario %s with %d successful and %d failed runs in %.1fs", + "Completed scenario %s with %d successful and %d failed runs in %.1f seconds", scenario["name"], successful_runs, failed_runs, time.time() - start_time, ) - save_results_to_csv(all_results) + save_raw_results_csv(all_results) generate_latency_percentile_graph(all_results) - generate_throughput_graph(all_results) generate_matplotlib_latency_graph(all_results) - generate_matplotlib_throughput_graph(all_results) print_summary(all_results) generate_detailed_report(all_results) diff --git a/python/analysisCDF/client/reporting.py b/python/analysisCDF/client/reporting.py index 5319f16a..35c372af 100644 --- a/python/analysisCDF/client/reporting.py +++ b/python/analysisCDF/client/reporting.py @@ -1,4 +1,5 @@ import csv +import os import time from typing import Any, Dict, List @@ -10,6 +11,10 @@ def save_results_to_csv(results: List[Dict[str, Any]]) -> None: timestamp = time.strftime("%Y%m%d_%H%M%S") filename = f"benchmark_results_{timestamp}.csv" + # Create directory for results if it doesn't exist + os.makedirs("benchmark_results", exist_ok=True) + filepath = os.path.join("benchmark_results", filename) + # Get all possible field names from all results (metrics may vary between runs) fieldnames = set() for result in results: @@ -21,9 +26,12 @@ def save_results_to_csv(results: List[Dict[str, Any]]) -> None: "batch_size", "iteration", "submission_time", + "task_execution_time", # Add task execution time to core fields + "download_time", "processing_time", "total_time", "tasks_per_second", + "true_end_to_end_latency", ] # 
Add task timing fields @@ -58,14 +66,6 @@ def save_results_to_csv(results: List[Dict[str, Any]]) -> None: "completion_rate", ] - # Add system efficiency fields - efficiency_fields = [ - "total_task_time", - "ideal_parallel_time", - "parallelization_efficiency", - "system_utilization", - ] - # Sort the fields in a logical order ordered_fieldnames = [] for field_list in [ @@ -73,7 +73,6 @@ def save_results_to_csv(results: List[Dict[str, Any]]) -> None: task_timing_fields, download_fields, completion_fields, - efficiency_fields, ]: for field in field_list: if field in fieldnames: @@ -83,13 +82,13 @@ def save_results_to_csv(results: List[Dict[str, Any]]) -> None: # Add any remaining fields ordered_fieldnames.extend(sorted(fieldnames)) - with open(filename, "w", newline="", encoding="utf-8") as csvfile: + with open(filepath, "w", newline="", encoding="utf-8") as csvfile: writer = csv.DictWriter(csvfile, fieldnames=ordered_fieldnames) writer.writeheader() for result in results: writer.writerow(result) - logger.info("Detailed results saved to %s", filename) + logger.info("Detailed results saved to %s", filepath) # Also save a simplified version with just core metrics for easy analysis save_simplified_results(results, timestamp) @@ -97,22 +96,25 @@ def save_results_to_csv(results: List[Dict[str, Any]]) -> None: def save_simplified_results(results: List[Dict[str, Any]], timestamp: str) -> None: """Save a simplified version of the results with just the core metrics.""" - filename = f"simplified_results_{timestamp}.csv" + os.makedirs("benchmark_results", exist_ok=True) + filepath = os.path.join("benchmark_results", f"simplified_results_{timestamp}.csv") core_fields = [ "scenario", "batch_size", "iteration", "submission_time", + "task_execution_time", # Include task execution time in simplified results + "download_time", "processing_time", "total_time", + "true_end_to_end_latency", "tasks_per_second", "avg_task_time", "p95_task_time", - "system_utilization", ] - with 
open(filename, "w", newline="", encoding="utf-8") as csvfile: + with open(filepath, "w", newline="", encoding="utf-8") as csvfile: writer = csv.DictWriter(csvfile, fieldnames=core_fields) writer.writeheader() for result in results: @@ -120,7 +122,7 @@ def save_simplified_results(results: List[Dict[str, Any]], timestamp: str) -> No row = {field: result.get(field, "") for field in core_fields} writer.writerow(row) - logger.info("Simplified results saved to %s", filename) + logger.info("Simplified results saved to %s", filepath) def print_summary(results: List[Dict[str, Any]]) -> None: @@ -145,12 +147,34 @@ def print_summary(results: List[Dict[str, Any]]) -> None: avg_submission = sum( r.get("submission_time", 0) for r in scenario_results ) / len(scenario_results) + + # Add task execution time to summary + avg_task_exec = sum( + r.get("task_execution_time", 0) for r in scenario_results + ) / len(scenario_results) + + # Add download time to summary + avg_download_time = sum( + r.get("download_time", 0) for r in scenario_results + ) / len(scenario_results) + avg_processing = sum( r.get("processing_time", 0) for r in scenario_results ) / len(scenario_results) + avg_total = sum(r.get("total_time", 0) for r in scenario_results) / len( scenario_results ) + + # True end-to-end latency if available + if any("true_end_to_end_latency" in r for r in scenario_results): + avg_e2e = sum( + r.get("true_end_to_end_latency", r.get("total_time", 0)) + for r in scenario_results + ) / len(scenario_results) + else: + avg_e2e = avg_total + avg_throughput = sum( r["batch_size"] / r["total_time"] for r in scenario_results @@ -158,10 +182,15 @@ def print_summary(results: List[Dict[str, Any]]) -> None: ) / len(scenario_results) print("TIMING METRICS:") - print(f" Submission time: {avg_submission:.4f}s") - print(f" Processing time: {avg_processing:.4f}s") - print(f" Total latency: {avg_total:.4f}s") - print(f" Throughput: {avg_throughput:.2f} tasks/sec") + print(f" Submission time: 
{avg_submission:.4f}s") + print( + f" Task execution time: {avg_task_exec:.4f}s (submission to results available)" + ) + print(f" Download time: {avg_download_time:.4f}s") + print(f" Processing time: {avg_processing:.4f}s") + print(f" Total latency: {avg_total:.4f}s") + print(f" End-to-end: {avg_e2e:.4f}s") + print(f" Throughput: {avg_throughput:.2f} tasks/sec") # Task processing metrics (if available) if any("avg_task_time" in r for r in scenario_results): @@ -199,7 +228,6 @@ def print_summary(results: List[Dict[str, Any]]) -> None: print(f" Min task time: {avg_min:.4f}s") print(f" Max task time: {avg_max:.4f}s") - # Download metrics (if available) if any("avg_download_time" in r for r in scenario_results): download_times = [r for r in scenario_results if "avg_download_time" in r] avg_download = sum( @@ -209,38 +237,14 @@ def print_summary(results: List[Dict[str, Any]]) -> None: print("\nDOWNLOAD METRICS:") print(f" Avg download: {avg_download:.4f}s") - # System efficiency metrics (if available) - if any("system_utilization" in r for r in scenario_results): - efficiency_results = [ - r for r in scenario_results if "system_utilization" in r - ] - avg_utilization = sum( - r.get("system_utilization", 0) for r in efficiency_results - ) / len(efficiency_results) - avg_efficiency = ( - sum( - r.get("parallelization_efficiency", 0) - for r in efficiency_results - if "parallelization_efficiency" in r - ) - / len( - [r for r in efficiency_results if "parallelization_efficiency" in r] - ) - if any("parallelization_efficiency" in r for r in efficiency_results) - else 0 - ) - - print("\nEFFICIENCY METRICS:") - print(f" System utilization: {avg_utilization*100:.2f}%") - print(f" Parallelization eff.: {avg_efficiency*100:.2f}%") - print("\n" + "=" * 80) def generate_detailed_report(results: List[Dict[str, Any]]) -> None: """Generate a detailed HTML report with metrics breakdown.""" timestamp = time.strftime("%Y%m%d_%H%M%S") - filename = f"detailed_report_{timestamp}.html" + 
os.makedirs("benchmark_results", exist_ok=True) + filepath = os.path.join("benchmark_results", f"detailed_report_{timestamp}.html") # Simple HTML report template html_content = f""" @@ -289,39 +293,34 @@ def generate_detailed_report(results: List[Dict[str, Any]]) -> None: # Add rows for key metrics metrics = [ ("Total Time (s)", "total_time"), + ("End-to-End Latency (s)", "true_end_to_end_latency"), + ( + "Task Execution Time (s)", + "task_execution_time", + ), # Add task execution time to report + ("Download Time (s)", "download_time"), ("Processing Time (s)", "processing_time"), ("Submission Time (s)", "submission_time"), ("Tasks per Second", "tasks_per_second"), ("Task Time (s)", "avg_task_time"), ("Download Time (s)", "avg_download_time"), - ( - "System Utilization", - "system_utilization", - 100, - ), # Multiply by 100 for percentage - ( - "Parallelization Efficiency", - "parallelization_efficiency", - 100, - ), # Multiply by 100 for percentage ] - for label, key, *args in metrics: - multiplier = args[0] if args else 1 + for label, key in metrics: if any(key in r for r in scenario_results): valid_results = [r[key] for r in scenario_results if key in r] if valid_results: - avg_val = sum(valid_results) / len(valid_results) * multiplier - min_val = min(valid_results) * multiplier - max_val = max(valid_results) * multiplier + avg_val = sum(valid_results) / len(valid_results) + min_val = min(valid_results) + max_val = max(valid_results) # Calculate P95 sorted_vals = sorted(valid_results) p95_idx = int(len(sorted_vals) * 0.95) p95_val = ( - sorted_vals[p95_idx] * multiplier + sorted_vals[p95_idx] if p95_idx < len(sorted_vals) - else sorted_vals[-1] * multiplier + else sorted_vals[-1] ) html_content += f""" @@ -346,7 +345,27 @@ def generate_detailed_report(results: List[Dict[str, Any]]) -> None: """ # Write HTML file - with open(filename, "w") as f: + with open(filepath, "w") as f: f.write(html_content) - logger.info("Detailed HTML report generated: %s", filename) + 
logger.info("Detailed HTML report generated: %s", filepath) + + +def save_raw_results_csv(results: List[Dict[str, Any]]) -> None: + """Save completely raw results with all fields included, for detailed analysis.""" + timestamp = time.strftime("%Y%m%d_%H%M%S") + os.makedirs("benchmark_results", exist_ok=True) + filepath = os.path.join("benchmark_results", f"raw_results_{timestamp}.csv") + + # Get all fields from all results + all_fields = set() + for result in results: + all_fields.update(result.keys()) + + with open(filepath, "w", newline="", encoding="utf-8") as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=sorted(all_fields)) + writer.writeheader() + for result in results: + writer.writerow(result) + + logger.info("Raw results saved to %s", filepath) diff --git a/python/analysisCDF/client/requirements.txt b/python/analysisCDF/client/requirements.txt index 72288417..09ba460d 100644 --- a/python/analysisCDF/client/requirements.txt +++ b/python/analysisCDF/client/requirements.txt @@ -1,4 +1,6 @@ -armonik matplotlib plotly -dotenv \ No newline at end of file +dotenv +--extra-index-url https://test.pypi.org/simple/ +--trusted-host test.pypi.org +armonik==3.25.1.dev3677 \ No newline at end of file diff --git a/python/analysisCDF/client/visualization.py b/python/analysisCDF/client/visualization.py index 7533c93d..17da9fff 100644 --- a/python/analysisCDF/client/visualization.py +++ b/python/analysisCDF/client/visualization.py @@ -9,16 +9,15 @@ def generate_latency_percentile_graph(results: List[Dict[str, Any]]) -> None: - """Generate an interactive graph showing the CDF of scheduling overhead.""" + """Generate interactive CDF graphs for per-task, batch E2E, and batch completion latencies.""" timestamp = time.strftime("%Y%m%d_%H%M%S") - filename_e2e = f"latency_percentile_e2e_{timestamp}.html" - filename_task = f"latency_percentile_task_{timestamp}.html" - filename_normalized = f"latency_percentile_normalized_{timestamp}.html" + filename_per_task_e2e = 
f"per_task_e2e_cdf_{timestamp}.html" + filename_batch_e2e = f"batch_e2e_cdf_{timestamp}.html" + filename_batch_completion = f"batch_completion_cdf_{timestamp}.html" - # Create separate figures for different latency metrics - fig_e2e = go.Figure() - fig_task = go.Figure() - fig_normalized = go.Figure() + fig_per_task_e2e = go.Figure() + fig_batch_e2e = go.Figure() + fig_batch_completion = go.Figure() colors = { "1x1000": "blue", @@ -26,166 +25,194 @@ def generate_latency_percentile_graph(results: List[Dict[str, Any]]) -> None: "100x300": "green", } + label_map = { + "1x1000": "1-task batch", + "10x700": "10-task batch", + "100x300": "100-task batch", + } + for scenario in ["1x1000", "10x700", "100x300"]: scenario_results = [r for r in results if r["scenario"] == scenario] if not scenario_results: logger.warning("No results found for scenario %s", scenario) continue - batch_size = scenario_results[0]["batch_size"] + # 1. Per-task end-to-end latencies (from submission to download) + all_per_task_e2e_times = [] + for result in scenario_results: + if "per_task_stats" in result: + for task_stat in result["per_task_stats"]: + if "e2e_latency" in task_stat: + all_per_task_e2e_times.append(task_stat["e2e_latency"]) + + if all_per_task_e2e_times: + all_per_task_e2e_times.sort() + percentiles = np.linspace(0, 100, len(all_per_task_e2e_times)) / 100 + avg_e2e_time = sum(all_per_task_e2e_times) / len(all_per_task_e2e_times) + display_name = label_map.get(scenario, scenario) + + fig_per_task_e2e.add_trace( + go.Scatter( + x=all_per_task_e2e_times, + y=percentiles, + mode="lines", + name=f"{display_name} (avg: {avg_e2e_time:.3f}s)", + line=dict(color=colors.get(scenario, "black"), width=2), + ) + ) - # End-to-end latency (total time per batch) - e2e_latencies = sorted([r["total_time"] for r in scenario_results]) - e2e_percentiles = np.linspace(0, 100, len(e2e_latencies)) / 100 - avg_e2e_latency = sum(e2e_latencies) / len(e2e_latencies) + # 2. 
Batch end-to-end latencies (submission to all results collected) + batch_e2e_times = [r.get("end_to_end_latency", 0) for r in scenario_results] - # Task-level latency (avg_task_time - actual time each task took to process) - task_latencies = sorted( - [ - r.get("avg_task_time", 0) - for r in scenario_results - if "avg_task_time" in r - ] - ) - if task_latencies: - task_percentiles = np.linspace(0, 100, len(task_latencies)) / 100 - avg_task_latency = sum(task_latencies) / len(task_latencies) - else: - task_latencies = [0] - task_percentiles = [0] - avg_task_latency = 0 - - # Normalized latency (total_time/batch_size - fairer comparison across batch sizes) - normalized_latencies = sorted( - [r["total_time"] / r["batch_size"] for r in scenario_results] - ) - normalized_percentiles = np.linspace(0, 100, len(normalized_latencies)) / 100 - avg_normalized_latency = sum(normalized_latencies) / len(normalized_latencies) - - # Add traces to each figure - fig_e2e.add_trace( - go.Scatter( - x=e2e_latencies, - y=e2e_percentiles, - mode="lines", - name=f"{scenario} (avg: {avg_e2e_latency:.3f}s)", - line=dict(color=colors.get(scenario, "black"), width=2), - ) - ) + if batch_e2e_times: + batch_e2e_times.sort() + percentiles = np.linspace(0, 100, len(batch_e2e_times)) / 100 + avg_batch_e2e = sum(batch_e2e_times) / len(batch_e2e_times) + display_name = label_map.get(scenario, scenario) - if task_latencies[0] > 0: - fig_task.add_trace( + fig_batch_e2e.add_trace( go.Scatter( - x=task_latencies, - y=task_percentiles, + x=batch_e2e_times, + y=percentiles, mode="lines", - name=f"{scenario} (avg: {avg_task_latency:.3f}s)", + name=f"{display_name} (avg: {avg_batch_e2e:.3f}s)", line=dict(color=colors.get(scenario, "black"), width=2), ) ) - fig_normalized.add_trace( - go.Scatter( - x=normalized_latencies, - y=normalized_percentiles, - mode="lines", - name=f"{scenario} (avg: {avg_normalized_latency:.3f}s)", - line=dict(color=colors.get(scenario, "black"), width=2), + # 3. 
Full batch completion times (submission to last task download completed) + batch_completion_times = [ + r.get("batch_completion_time", 0) + for r in scenario_results + if "batch_completion_time" in r + ] + + if batch_completion_times: + batch_completion_times.sort() + percentiles = np.linspace(0, 100, len(batch_completion_times)) / 100 + avg_completion = sum(batch_completion_times) / len(batch_completion_times) + display_name = label_map.get(scenario, scenario) + + fig_batch_completion.add_trace( + go.Scatter( + x=batch_completion_times, + y=percentiles, + mode="lines", + name=f"{display_name} (avg: {avg_completion:.3f}s)", + line=dict(color=colors.get(scenario, "black"), width=2), + ) ) - ) - # Configure layouts - make legends more prominent - def _configure_layout(fig, title, graph_type="e2e"): - # Set x-axis range based on graph type - if graph_type == "e2e": - x_range = [0, 3] - x_dtick = 0.25 - ref_line_x1 = 3 - else: - x_range = [0, 0.75] # Shorter x-axis for task and normalized graphs - x_dtick = 0.05 - ref_line_x1 = 0.75 - - fig.update_layout( - title=dict(text=title, font=dict(size=20)), - xaxis=dict(title="Latency (seconds)", range=x_range, dtick=x_dtick), - yaxis=dict(title="Percentile (CDF)", range=[0, 1]), - legend=dict( - yanchor="top", - y=0.99, - xanchor="right", - x=0.99, - font=dict(size=14), - bordercolor="Black", - borderwidth=1, - ), - grid=dict(rows=1, columns=1), - height=600, - width=900, - template="plotly_white", - margin=dict(l=80, r=80, t=100, b=80), - ) - # Add horizontal grid lines + # Configure layout for all figures + x_label = "Latency (seconds)" + y_label = "Percentile (CDF)" + + # Configure layout for per-task E2E figure + fig_per_task_e2e.update_layout( + title=dict( + text="Per-Task End-to-End Latency Distribution (submission to download)", + font=dict(size=20), + ), + xaxis=dict( + title=dict(text=x_label, font=dict(size=16)), + tickfont=dict(size=14), + range=[0, 0.35], # Set x-axis range to 0-0.35 seconds + dtick=0.05, # Set 
tick spacing to 0.05 seconds + ), + yaxis=dict( + title=dict(text=y_label, font=dict(size=16)), + range=[0, 1], + tickfont=dict(size=14), + ), + legend=dict(yanchor="top", y=0.99, xanchor="right", x=0.99, font=dict(size=14)), + height=600, + width=900, + template="plotly_white", + margin=dict(l=80, r=80, t=100, b=80), + ) + + # Configure layout for batch E2E figure + fig_batch_e2e.update_layout( + title=dict( + text="Batch End-to-End Latency Distribution (submission to all results collected)", + font=dict(size=20), + ), + xaxis=dict( + title=dict(text=x_label, font=dict(size=16)), + tickfont=dict(size=14), + range=[0, 0.35], # Set x-axis range to 0-0.35 seconds + dtick=0.05, # Set tick spacing to 0.05 seconds + ), + yaxis=dict( + title=dict(text=y_label, font=dict(size=16)), + range=[0, 1], + tickfont=dict(size=14), + ), + legend=dict(yanchor="top", y=0.99, xanchor="right", x=0.99, font=dict(size=14)), + height=600, + width=900, + template="plotly_white", + margin=dict(l=80, r=80, t=100, b=80), + ) + + # Configure layout for batch completion figure + fig_batch_completion.update_layout( + title=dict( + text="Full Batch Completion Time Distribution (submission to last task download)", + font=dict(size=20), + ), + xaxis=dict( + title=dict(text=x_label, font=dict(size=16)), + tickfont=dict(size=14), + range=[0, 0.35], # Set x-axis range to 0-0.35 seconds + dtick=0.05, # Set tick spacing to 0.05 seconds + ), + yaxis=dict( + title=dict(text=y_label, font=dict(size=16)), + range=[0, 1], + tickfont=dict(size=14), + ), + legend=dict(yanchor="top", y=0.99, xanchor="right", x=0.99, font=dict(size=14)), + height=600, + width=900, + template="plotly_white", + margin=dict(l=80, r=80, t=100, b=80), + ) + + # Add grid lines and median line to all figures + for fig in [fig_per_task_e2e, fig_batch_e2e, fig_batch_completion]: + fig.update_xaxes(gridcolor="lightgray", gridwidth=0.5) fig.update_yaxes(gridcolor="lightgray", gridwidth=0.5) - # Add a reference line at the median 
fig.add_shape( type="line", x0=0, - x1=ref_line_x1, + x1=0.35, # Match x-axis range y0=0.5, y1=0.5, line=dict(color="black", width=1, dash="dash"), ) - _configure_layout(fig_e2e, "End-to-End Batch Latency Distribution", "e2e") - _configure_layout(fig_task, "Individual Task Latency Distribution", "task") - _configure_layout( - fig_normalized, - "Normalized Per-Task Latency Distribution (Total Time/Batch Size)", - "normalized", - ) - - # Save all figures (only once) - pio.write_html(fig_e2e, filename_e2e) - pio.write_html(fig_task, filename_task) - pio.write_html(fig_normalized, filename_normalized) + # Save the figures + pio.write_html(fig_per_task_e2e, filename_per_task_e2e) + pio.write_html(fig_batch_e2e, filename_batch_e2e) + pio.write_html(fig_batch_completion, filename_batch_completion) - logger.info("Interactive latency percentile graphs saved to:") - logger.info(" - End-to-end batch latency: %s", filename_e2e) - logger.info(" - Individual task latency: %s", filename_task) - logger.info(" - Normalized per-task latency: %s", filename_normalized) + logger.info( + "Interactive per-task E2E latency graph saved to: %s", filename_per_task_e2e + ) + logger.info("Interactive batch E2E latency graph saved to: %s", filename_batch_e2e) + logger.info( + "Interactive batch completion time graph saved to: %s", + filename_batch_completion, + ) def generate_matplotlib_latency_graph(results: List[Dict[str, Any]]) -> None: """Generate latency percentile graphs using matplotlib.""" timestamp = time.strftime("%Y%m%d_%H%M%S") - - # Create filenames for different metrics - filenames = { - "e2e": f"mpl_latency_e2e_{timestamp}.png", - "task": f"mpl_latency_task_{timestamp}.png", - "normalized": f"mpl_latency_normalized_{timestamp}.png", - } - - titles = { - "e2e": "End-to-End Batch Latency Distribution", - "task": "Individual Task Latency Distribution", - "normalized": "Normalized Per-Task Latency Distribution", - } - - # Set x-axis limits based on graph type - x_limits = { - "e2e": 
(0, 3), - "task": (0, 0.75), - "normalized": (0, 0.75), - } - - # Set tick spacing based on graph type - x_ticks = { - "e2e": np.arange(0, 3, 0.25), - "task": np.arange(0, 0.75, 0.05), - "normalized": np.arange(0, 0.75, 0.05), - } + filename_per_task_e2e = f"mpl_per_task_e2e_{timestamp}.png" + filename_batch_e2e = f"mpl_batch_e2e_{timestamp}.png" + filename_batch_completion = f"mpl_batch_completion_{timestamp}.png" # New file colors = { "1x1000": "blue", @@ -193,230 +220,143 @@ def generate_matplotlib_latency_graph(results: List[Dict[str, Any]]) -> None: "100x300": "green", } - # Create separate plots for each metric - for metric_type in ["e2e", "task", "normalized"]: - plt.figure(figsize=(10, 6)) - - for scenario in ["1x1000", "10x700", "100x300"]: - scenario_results = [r for r in results if r["scenario"] == scenario] - if not scenario_results: - logger.warning("No results found for scenario %s", scenario) - continue - - # Get appropriate latency metric based on type - if metric_type == "e2e": - latencies = sorted([r["total_time"] for r in scenario_results]) - elif metric_type == "task": - latencies = sorted( - [ - r.get("avg_task_time", 0) - for r in scenario_results - if "avg_task_time" in r - ] - ) - else: # normalized - latencies = sorted( - [r["total_time"] / r["batch_size"] for r in scenario_results] - ) - - if latencies: - percentiles = np.linspace(0, 100, len(latencies)) / 100 - avg_latency = sum(latencies) / len(latencies) - - plt.plot( - latencies, - percentiles, - "-", - color=colors.get(scenario, "black"), - linewidth=2, - label=f"{scenario} (avg: {avg_latency:.3f}s)", - ) - - # Set plot styling with optimized limits - plt.title(titles[metric_type], fontsize=16, fontweight="bold") - plt.xlabel("Latency (seconds)", fontsize=14) - plt.ylabel("Percentile (CDF)", fontsize=14) - plt.xlim(*x_limits[metric_type]) - plt.ylim(0, 1) - plt.xticks(x_ticks[metric_type]) - plt.grid(True, linestyle="--", alpha=0.7) - - # Add reference line at median - 
plt.axhline(y=0.5, color="black", linestyle="--", alpha=0.7) - - # Add legend with improved styling - plt.legend(loc="lower right", frameon=True, framealpha=1, fontsize=12) - plt.tight_layout() - - # Save figure - plt.savefig(filenames[metric_type], dpi=300) - plt.close() - - logger.info( - "Matplotlib %s graph saved to %s", - titles[metric_type], - filenames[metric_type], - ) - - -def generate_throughput_graph(results: List[Dict[str, Any]]) -> None: - """Generate an interactive graph showing the throughput (tasks per second).""" - timestamp = time.strftime("%Y%m%d_%H%M%S") - filename = f"throughput_{timestamp}.html" - - fig = go.Figure() - - scenarios = ["1x1000", "10x700", "100x300"] - colors = { - "1x1000": "blue", - "10x700": "red", - "100x300": "green", + # Map for improved labels + label_map = { + "1x1000": "1-task batch", + "10x700": "10-task batch", + "100x300": "100-task batch", } - # Calculate all throughputs first to determine appropriate x-axis range - all_throughputs = [] - for scenario in scenarios: - scenario_results = [r for r in results if r["scenario"] == scenario] - if scenario_results: - throughputs = [ - r["batch_size"] / r["total_time"] - for r in scenario_results - if r["total_time"] > 0 - ] - all_throughputs.extend(throughputs) - - # Dynamically set bin range based on actual data - if all_throughputs: - max_throughput = max(all_throughputs) - # Round up to next integer and add a small buffer - max_bin = min(100, int(max_throughput * 1.1) + 1) - else: - max_bin = 10 - - # Create bins with appropriate range - bin_edges = np.linspace(0, max_bin, min(max_bin * 10 + 1, 101)) - bin_width = bin_edges[1] - bin_edges[0] - - for scenario in scenarios: + # Generate per-task E2E CDF + plt.figure(figsize=(10, 6)) + for scenario in ["1x1000", "10x700", "100x300"]: scenario_results = [r for r in results if r["scenario"] == scenario] if not scenario_results: continue - throughputs = [ - r["batch_size"] / r["total_time"] - for r in scenario_results - if 
r["total_time"] > 0 - ] - - avg_throughput = sum(throughputs) / len(throughputs) - - hist, _ = np.histogram(throughputs, bins=bin_edges) - bin_centers = 0.5 * (bin_edges[:-1] + bin_edges[1:]) - - fig.add_trace( - go.Bar( - x=bin_centers, - y=hist, - name=f"{scenario} (avg: {avg_throughput:.2f} tasks/s)", - marker_color=colors.get(scenario, "gray"), - opacity=0.7, - width=bin_width * 0.9, + # Collect all per-task E2E latencies + latencies = [] + for result in scenario_results: + if "per_task_stats" in result: + for task_stat in result["per_task_stats"]: + if "e2e_latency" in task_stat: + latencies.append(task_stat["e2e_latency"]) + + if latencies: + latencies.sort() + percentiles = np.linspace(0, 100, len(latencies)) / 100 + avg_latency = sum(latencies) / len(latencies) + display_name = label_map.get(scenario, scenario) + + plt.plot( + latencies, + percentiles, + "-", + color=colors.get(scenario, "black"), + linewidth=2, + label=f"{display_name} (avg: {avg_latency:.3f}s)", ) - ) - # Configure axes with more prominence - fig.update_layout( - title={"text": "Task Processing Throughput Distribution", "font": {"size": 24}}, - xaxis=dict( - title={"text": "Throughput (tasks per second)", "font": {"size": 18}}, - tickfont={"size": 14}, - ), - yaxis=dict( - title={"text": "Frequency", "font": {"size": 18}}, tickfont={"size": 14} - ), - barmode="overlay", - legend=dict(yanchor="top", y=0.99, xanchor="right", x=0.01, font={"size": 14}), - height=600, - width=900, - ) - - # Enable better interactivity features - fig.update_layout( - hovermode="x unified", hoverlabel=dict(bgcolor="white", font_size=14) + # Configure per-task E2E plot + plt.title( + "Per-Task End-to-End Latency Distribution", fontsize=16, fontweight="bold" ) + plt.xlabel("Latency (seconds)", fontsize=14) + plt.ylabel("Percentile (CDF)", fontsize=14) + plt.xlim(0, 0.35) # Set x-axis range to 0-0.35 seconds + plt.ylim(0, 1) + plt.xticks(np.arange(0, 0.36, 0.05)) # Set tick spacing to 0.05 seconds + plt.grid(True, 
linestyle="--", alpha=0.7) + plt.axhline(y=0.5, color="black", linestyle="--", alpha=0.7) + plt.legend(loc="lower right", frameon=True, framealpha=1, fontsize=12) + plt.tight_layout() + plt.savefig(filename_per_task_e2e, dpi=300) + plt.close() - pio.write_html(fig, filename) - logger.info("Interactive throughput graph saved to %s", filename) + # Generate batch E2E CDF + plt.figure(figsize=(10, 6)) + for scenario in ["1x1000", "10x700", "100x300"]: + scenario_results = [r for r in results if r["scenario"] == scenario] + if not scenario_results: + continue + # Collect batch E2E latencies + batch_e2e_times = [r.get("end_to_end_latency", 0) for r in scenario_results] + + if batch_e2e_times: + batch_e2e_times.sort() + percentiles = np.linspace(0, 100, len(batch_e2e_times)) / 100 + avg_latency = sum(batch_e2e_times) / len(batch_e2e_times) + display_name = label_map.get(scenario, scenario) + + plt.plot( + batch_e2e_times, + percentiles, + "-", + color=colors.get(scenario, "black"), + linewidth=2, + label=f"{display_name} (avg: {avg_latency:.3f}s)", + ) -def generate_matplotlib_throughput_graph(results: List[Dict[str, Any]]) -> None: - """Generate a throughput distribution graph using matplotlib.""" - timestamp = time.strftime("%Y%m%d_%H%M%S") - filename = f"mpl_throughput_{timestamp}.png" + # Configure batch E2E plot + plt.title("Batch End-to-End Latency Distribution", fontsize=16, fontweight="bold") + plt.xlabel("Latency (seconds)", fontsize=14) + plt.ylabel("Percentile (CDF)", fontsize=14) + plt.xlim(0, 0.35) # Set x-axis range to 0-0.35 seconds + plt.ylim(0, 1) + plt.xticks(np.arange(0, 0.36, 0.05)) # Set tick spacing to 0.05 seconds + plt.grid(True, linestyle="--", alpha=0.7) + plt.axhline(y=0.5, color="black", linestyle="--", alpha=0.7) + plt.legend(loc="lower right", frameon=True, framealpha=1, fontsize=12) + plt.tight_layout() + plt.savefig(filename_batch_e2e, dpi=300) + plt.close() + # Generate batch completion CDF plt.figure(figsize=(10, 6)) - - scenarios = 
["1x1000", "10x700", "100x300"] - colors = { - "1x1000": "blue", - "10x700": "red", - "100x300": "green", - } - - # Calculate all throughputs first to determine appropriate x-axis range - all_throughputs = [] - for scenario in scenarios: - scenario_results = [r for r in results if r["scenario"] == scenario] - if scenario_results: - throughputs = [ - r["batch_size"] / r["total_time"] - for r in scenario_results - if r["total_time"] > 0 - ] - all_throughputs.extend(throughputs) - - # Dynamically set bin range based on actual data - if all_throughputs: - max_throughput = max(all_throughputs) - # Round up to next integer and add a small buffer - max_bin = min(100, int(max_throughput * 1.1) + 1) - else: - max_bin = 10 - - # Create bins with appropriate range - bin_edges = np.linspace(0, max_bin, min(max_bin * 10 + 1, 101)) - - for i, scenario in enumerate(scenarios): + for scenario in ["1x1000", "10x700", "100x300"]: scenario_results = [r for r in results if r["scenario"] == scenario] if not scenario_results: continue - throughputs = [ - r["batch_size"] / r["total_time"] + # Collect batch completion times + completion_times = [ + r.get("batch_completion_time", 0) for r in scenario_results - if r["total_time"] > 0 + if "batch_completion_time" in r ] - avg_throughput = sum(throughputs) / len(throughputs) - - hist, _ = np.histogram(throughputs, bins=bin_edges) - bin_centers = 0.5 * (bin_edges[:-1] + bin_edges[1:]) - - plt.bar( - bin_centers, - hist, - width=bin_edges[1] - bin_edges[0], - color=colors.get(scenario, f"C{i}"), - alpha=0.7, - label=f"{scenario} (avg: {avg_throughput:.2f} tasks/s)", - ) + if completion_times: + completion_times.sort() + percentiles = np.linspace(0, 100, len(completion_times)) / 100 + avg_completion = sum(completion_times) / len(completion_times) + display_name = label_map.get(scenario, scenario) + + plt.plot( + completion_times, + percentiles, + "-", + color=colors.get(scenario, "black"), + linewidth=2, + label=f"{display_name} (avg: 
{avg_completion:.3f}s)", + ) - plt.title("Task Processing Throughput Distribution", fontsize=14) - plt.xlabel("Throughput (tasks per second)") - plt.ylabel("Frequency") - plt.legend(loc="upper right") - plt.grid(axis="y", linestyle="--", alpha=0.7) + # Configure batch completion plot + plt.title("Full Batch Completion Time Distribution", fontsize=16, fontweight="bold") + plt.xlabel("Latency (seconds)", fontsize=14) + plt.ylabel("Percentile (CDF)", fontsize=14) + plt.xlim(0, 0.35) # Set x-axis range to 0-0.35 seconds + plt.ylim(0, 1) + plt.xticks(np.arange(0, 0.36, 0.05)) # Set tick spacing to 0.05 seconds + plt.grid(True, linestyle="--", alpha=0.7) + plt.axhline(y=0.5, color="black", linestyle="--", alpha=0.7) + plt.legend(loc="lower right", frameon=True, framealpha=1, fontsize=12) plt.tight_layout() - plt.savefig(filename, dpi=300) + plt.savefig(filename_batch_completion, dpi=300) plt.close() - logger.info("Matplotlib throughput graph saved to %s", filename) + + logger.info("Matplotlib per-task E2E graph saved to %s", filename_per_task_e2e) + logger.info("Matplotlib batch E2E graph saved to %s", filename_batch_e2e) + logger.info( + "Matplotlib batch completion graph saved to %s", filename_batch_completion + )