diff --git a/.yamllint b/.yamllint index 5381e7989c..edd827bea1 100644 --- a/.yamllint +++ b/.yamllint @@ -3,7 +3,6 @@ # Default configuration: https://yamllint.readthedocs.io/en/stable/configuration.html#default-configuration extends: default - rules: document-start: disable document-end: disable @@ -20,3 +19,4 @@ rules: ignore: | modules/python/clusterloader2/**/*.yaml modules/python/clusterloader2/**/*.yml + scenarios/perf-eval/job-scheduling/**/*.yaml diff --git a/modules/python/clients/kubernetes_client.py b/modules/python/clients/kubernetes_client.py index 8f59ec2f83..760af295f5 100644 --- a/modules/python/clients/kubernetes_client.py +++ b/modules/python/clients/kubernetes_client.py @@ -8,16 +8,17 @@ # https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/#taint-based-evictions # https://kubernetes.io/docs/reference/labels-annotations-taints/ builtin_taints_keys = [ - "node.kubernetes.io/not-ready", - "node.kubernetes.io/unreachable", - "node.kubernetes.io/pid-pressure", - "node.kubernetes.io/out-of-disk", - "node.kubernetes.io/memory-pressure", - "node.kubernetes.io/disk-pressure", - "node.kubernetes.io/network-unavailable", - "node.kubernetes.io/unschedulable", - "node.cloudprovider.kubernetes.io/uninitialized", - "node.cloudprovider.kubernetes.io/shutdown", + "node.kubernetes.io/not-ready", + "node.kubernetes.io/unreachable", + "node.kubernetes.io/pid-pressure", + "node.kubernetes.io/out-of-disk", + "node.kubernetes.io/memory-pressure", + "node.kubernetes.io/disk-pressure", + "node.kubernetes.io/network-unavailable", + "node.kubernetes.io/unschedulable", + "node.cloudprovider.kubernetes.io/uninitialized", + "node.cloudprovider.kubernetes.io/shutdown", + "kwok.x-k8s.io/kwok", ] # Configure logging diff --git a/modules/python/clusterloader2/default/cli.py b/modules/python/clusterloader2/default/cli.py new file mode 100644 index 0000000000..2093cb4c11 --- /dev/null +++ b/modules/python/clusterloader2/default/cli.py @@ -0,0 +1,394 @@ +import argparse +import json +import os +import time +from datetime import datetime, timezone +from typing import Tuple + +from clients.kubernetes_client import KubernetesClient +from clusterloader2.utils import (get_measurement, parse_xml_to_json, + run_cl2_command, str2bool) + +DEFAULT_PODS_PER_NODE = 40 + +DEFAULT_NODES_PER_NAMESPACE = 100 +CPU_REQUEST_LIMIT_MILLI = 1 +DAEMONSETS_PER_NODE = {"aws": 2, "azure": 6, "aks": 6} +CPU_CAPACITY = {"aws": 0.94, "azure": 0.87, "aks": 0.87} +# TODO: Remove aks once CL2 update provider name to be azure + + +def calculate_config( + cpu_per_node: int, + node_count: int, + max_pods: int, + provider: str, + service_test: bool, + cnp_test: bool, + ccnp_test: bool, +) -> Tuple[int, int, int, int]: + throughput = 100 + nodes_per_namespace = min(node_count, DEFAULT_NODES_PER_NAMESPACE) + + pods_per_node = DEFAULT_PODS_PER_NODE + if service_test: + pods_per_node = max_pods + + if cnp_test or ccnp_test: + pods_per_node = max_pods + # Different cloud has different reserved values and number of daemonsets + # Using the same percentage will lead to incorrect nodes number as the number of nodes grow + # For AWS, see: https://github.com/awslabs/amazon-eks-ami/blob/main/templates/al2/runtime/bootstrap.sh#L290 + # For Azure, see: https://learn.microsoft.com/en-us/azure/aks/node-resource-reservations#cpu-reservations + capacity = CPU_CAPACITY[provider] + cpu_request = (cpu_per_node * 1000 * capacity) // pods_per_node + cpu_request = max(cpu_request, CPU_REQUEST_LIMIT_MILLI) + + return throughput, nodes_per_namespace, pods_per_node, cpu_request + + +def configure_clusterloader2( + cpu_per_node: int, + node_count: int, + node_per_step: int, + max_pods: int, + repeats: int, + operation_timeout: str, + provider: str, + cilium_enabled: bool, + scrape_containerd: bool, + service_test: bool, + cnp_test: bool, + ccnp_test: bool, + num_cnps: int, + num_ccnps: int, + dualstack: bool, + cl2_override_file: str, + workload_type: str, + job_count: int, + job_parallelism: int, + job_completions: int, + job_throughput: int, +) -> None: + + # Calculate steps + steps = node_count // node_per_step + + # Initialize throughput and workload-specific configurations + if workload_type == "job": + throughput = job_count // repeats if job_throughput == -1 else job_throughput + elif workload_type == "pod": + throughput, nodes_per_namespace, pods_per_node, cpu_request = calculate_config( + cpu_per_node, + node_count, + max_pods, + provider, + service_test, + cnp_test, + ccnp_test, + ) + else: + raise ValueError("Invalid workload_type. Must be 'pod' or 'job'.") + + # Write configurations to the override file + with open(cl2_override_file, "w", encoding="utf-8") as file: + file.write(f"CL2_NODES: {node_count}\n") + file.write(f"CL2_NODES_PER_STEP: {node_per_step}\n") + file.write(f"CL2_OPERATION_TIMEOUT: {operation_timeout}\n") + file.write(f"CL2_REPEATS: {repeats}\n") + file.write(f"CL2_STEPS: {steps}\n") + + if workload_type == "job": + # Job-specific configurations + file.write(f"CL2_JOBS: {job_count}\n") + file.write(f"CL2_JOB_PARALLELISM: {job_parallelism}\n") + file.write(f"CL2_JOB_COMPLETIONS: {job_completions}\n") + file.write(f"CL2_LOAD_TEST_THROUGHPUT: {throughput}\n") + elif workload_type == "pod": + # Pod-specific configurations + file.write(f"CL2_LOAD_TEST_THROUGHPUT: {throughput}\n") + file.write(f"CL2_NODES_PER_NAMESPACE: {nodes_per_namespace}\n") + file.write(f"CL2_PODS_PER_NODE: {pods_per_node}\n") + file.write(f"CL2_DEPLOYMENT_SIZE: {pods_per_node}\n") + file.write(f"CL2_LATENCY_POD_CPU: {cpu_request}\n") + + if scrape_containerd: + file.write(f"CL2_SCRAPE_CONTAINERD: {str(scrape_containerd).lower()}\n") + file.write("CONTAINERD_SCRAPE_INTERVAL: 5m\n") + + if cilium_enabled: + file.write("CL2_CILIUM_METRICS_ENABLED: true\n") + file.write("CL2_PROMETHEUS_SCRAPE_CILIUM_OPERATOR: true\n") + file.write("CL2_PROMETHEUS_SCRAPE_CILIUM_AGENT: true\n") + file.write("CL2_PROMETHEUS_SCRAPE_CILIUM_AGENT_INTERVAL: 30s\n") + + if service_test: + file.write("CL2_SERVICE_TEST: true\n") + else: + file.write("CL2_SERVICE_TEST: false\n") + + if cnp_test: + file.write("CL2_CNP_TEST: true\n") + file.write(f"CL2_CNPS_PER_NAMESPACE: {num_cnps}\n") + file.write(f"CL2_DUALSTACK: {dualstack}\n") + file.write("CL2_GROUP_NAME: cnp-ccnp\n") + + if ccnp_test: + file.write("CL2_CCNP_TEST: true\n") + file.write(f"CL2_CCNPS: {num_ccnps}\n") + file.write(f"CL2_DUALSTACK: {dualstack}\n") + file.write("CL2_GROUP_NAME: cnp-ccnp\n") + + # Print the generated configuration for debugging + with open(cl2_override_file, "r", encoding="utf-8") as file: + print(f"Content of file {cl2_override_file}:\n{file.read()}") + + +def validate_clusterloader2(node_count: int, operation_timeout_in_minutes: int = 10) -> None: + kube_client = KubernetesClient() + ready_node_count = 0 + timeout = time.time() + (operation_timeout_in_minutes * 60) + while time.time() < timeout: + ready_nodes = kube_client.get_ready_nodes() + ready_node_count = len(ready_nodes) + print(f"Currently {ready_node_count} nodes are ready.") + if ready_node_count >= node_count: + print(f"All {node_count} nodes are ready.") + break + print(f"Waiting for {node_count} nodes to be ready.") + time.sleep(10) + if ready_node_count < node_count: + raise Exception(f"Only {ready_node_count} nodes are ready, expected {node_count} nodes!") + + +def execute_clusterloader2( + cl2_image: str, + cl2_config_dir: str, + cl2_report_dir: str, + cl2_config_file: str, + kubeconfig: str, + prometheus_enabled: bool, + provider: str, + scrape_containerd: bool, +) -> None: + run_cl2_command( + kubeconfig, + cl2_image, + cl2_config_dir, + cl2_report_dir, + provider, + cl2_config_file=cl2_config_file, + overrides=True, + enable_prometheus=prometheus_enabled, + scrape_containerd=scrape_containerd, + ) + + +def process_pod_workload(template, cpu_per_node, node_count, max_pods, provider, service_test, cnp_test, ccnp_test): + _, _, pods_per_node, _ = calculate_config(cpu_per_node, node_count, max_pods, provider, service_test, cnp_test, ccnp_test) + pod_count = node_count * pods_per_node + template["pod_count"] = pod_count + return template + + +def process_job_workload(template, job_count, job_parallelism, job_completions, job_throughput): + if job_count is None or job_parallelism is None or job_completions is None or job_throughput is None: + raise ValueError("For job workloads, job_count, job_parallelism, job_completions, and job_throughput must be provided.") + template.update( + { + "job_count": job_count, + "job_parallelism": job_parallelism, + "job_completions": job_completions, + "job_throughput": job_throughput, + } + ) + return template + + +def process_cl2_reports(cl2_report_dir, template): + content = "" + for f in os.listdir(cl2_report_dir): + file_path = os.path.join(cl2_report_dir, f) + with open(file_path, "r", encoding="utf-8") as file: + print(f"Processing {file_path}") + measurement, group_name = get_measurement(file_path) + if not measurement: + continue + print(measurement, group_name) + data = json.loads(file.read()) + + if "dataItems" in data: + items = data["dataItems"] + if not items: + print(f"No data items found in {file_path}") + print(f"Data:\n{data}") + continue + for item in items: + result = template.copy() + result["group"] = group_name + result["measurement"] = measurement + result["result"] = item + content += json.dumps(result) + "\n" + else: + result = template.copy() + result["group"] = group_name + result["measurement"] = measurement + result["result"] = data + content += json.dumps(result) + "\n" + return content + + +def collect_clusterloader2( + cpu_per_node: int, + node_count: int, + max_pods: int, + repeats: int, + cl2_report_dir: str, + cloud_info: str, + run_id: str, + run_url: str, + service_test: bool, + cnp_test: bool, + ccnp_test: bool, + result_file: str, + test_type: str, + start_timestamp: str, + workload_type: str, + job_count: int, + job_parallelism: int, + job_completions: int, + job_throughput: int, +) -> None: + + details = parse_xml_to_json(os.path.join(cl2_report_dir, "junit.xml"), indent=2) + json_data = json.loads(details) + testsuites = json_data["testsuites"] + provider = json.loads(cloud_info)["cloud"] + + if testsuites: + status = "success" if testsuites[0]["failures"] == 0 else "failure" + else: + raise Exception(f"No testsuites found in the report! Raw data: {details}") + + # Initialize the template + template = { + "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + "node_count": node_count, + "churn_rate": repeats, + "status": status, + "group": None, + "measurement": None, + "result": None, + "cloud_info": cloud_info, + "run_id": run_id, + "run_url": run_url, + "test_type": test_type, + "start_timestamp": start_timestamp, + } + + # Conditionally include cpu_per_node if provided + if cpu_per_node is not None: + template["cpu_per_node"] = cpu_per_node + + # Process workload type + if workload_type == "pod": + template = process_pod_workload(template, cpu_per_node, node_count, max_pods, provider, service_test, cnp_test, ccnp_test) + elif workload_type == "job": + template = process_job_workload(template, job_count, job_parallelism, job_completions, job_throughput) + else: + raise ValueError("Invalid workload_type. Must be 'pod' or 'job'.") + + # Process CL2 report files + content = process_cl2_reports(cl2_report_dir, template) + + # Write results to the result file + os.makedirs(os.path.dirname(result_file), exist_ok=True) + with open(result_file, "w", encoding="utf-8") as file: + file.write(content) + + +def main(): + parser = argparse.ArgumentParser(description="CLI Kubernetes resources.") + subparsers = parser.add_subparsers(dest="command") + + # Sub-command for configure_clusterloader2 + parser_configure = subparsers.add_parser("configure", help="Override CL2 config file") + parser_configure.add_argument("--cpu_per_node", type=int, help="CPU per node") + parser_configure.add_argument("--node_count", type=int, help="Number of nodes") + parser_configure.add_argument("--node_per_step", type=int, default=1, help="Number of nodes per scaling step") + parser_configure.add_argument("--max_pods", type=int, nargs="?", default=0, help="Maximum number of pods per node") + parser_configure.add_argument("--repeats", type=int, default=1, help="Number of times to repeat the deployment churn") + parser_configure.add_argument("--operation_timeout", type=str, help="Timeout before failing the scale up test") + parser_configure.add_argument("--provider", type=str, help="Cloud provider name") + parser_configure.add_argument("--cilium_enabled", type=str2bool, choices=[True, False], default=False, help="Whether cilium is enabled. Must be either True or False") + parser_configure.add_argument("--scrape_containerd", type=str2bool, choices=[True, False], default=False, help="Whether to scrape containerd metrics. Must be either True or False") + parser_configure.add_argument("--service_test", type=str2bool, choices=[True, False], default=False, help="Whether service test is running. Must be either True or False") + parser_configure.add_argument("--cnp_test", type=str2bool, choices=[True, False], nargs="?", default=False, help="Whether cnp test is running. Must be either True or False") + parser_configure.add_argument("--ccnp_test", type=str2bool, choices=[True, False], nargs="?", default=False, help="Whether ccnp test is running. Must be either True or False") + parser_configure.add_argument("--num_cnps", type=int, nargs="?", default=0, help="Number of cnps") + parser_configure.add_argument("--num_ccnps", type=int, nargs="?", default=0, help="Number of ccnps") + parser_configure.add_argument("--dualstack", type=str2bool, choices=[True, False], nargs="?", default=False, help="Whether cluster is dualstack. Must be either True or False") + parser_configure.add_argument("--cl2_override_file", type=str, help="Path to the overrides of CL2 config file") + parser_configure.add_argument("--workload_type", type=str, choices=["pod", "job"], default="pod", help="Type of workload to run") + parser_configure.add_argument("--job_count", type=int, default=1000, help="Number of jobs to run") + parser_configure.add_argument("--job_parallelism", type=int, default=1, help="Number of jobs to run in parallel") + parser_configure.add_argument("--job_completions", type=int, default=1, help="Number of job completions") + parser_configure.add_argument("--job_throughput", type=int, default=-1, help="Job throughput") + + # Sub-command for validate_clusterloader2 + parser_validate = subparsers.add_parser("validate", help="Validate cluster setup") + parser_validate.add_argument("--node_count", type=int, help="Number of desired nodes") + parser_validate.add_argument("--operation_timeout_in_minutes", type=int, default=600, help="Operation timeout to wait for nodes to be ready") + + # Sub-command for execute_clusterloader2 + parser_execute = subparsers.add_parser("execute", help="Execute scale up operation") + parser_execute.add_argument("--cl2_image", type=str, help="Name of the CL2 image") + parser_execute.add_argument("--cl2_config_dir", type=str, help="Path to the CL2 config directory") + parser_execute.add_argument("--cl2_report_dir", type=str, help="Path to the CL2 report directory") + parser_execute.add_argument("--cl2_config_file", type=str, help="Path to the CL2 config file") + parser_execute.add_argument("--kubeconfig", type=str, help="Path to the kubeconfig file") + parser_execute.add_argument("--provider", type=str, help="Cloud provider name") + parser_execute.add_argument("--prometheus_enabled", type=str2bool, choices=[True, False], default=False, help="Whether to enable Prometheus scraping. Must be either True or False") + parser_execute.add_argument("--scrape_containerd", type=str2bool, choices=[True, False], default=False, help="Whether to scrape containerd metrics. Must be either True or False") + + # Sub-command for collect_clusterloader2 + parser_collect = subparsers.add_parser("collect", help="Collect scale-up data") + parser_collect.add_argument("--cpu_per_node", type=int, help="CPU per node") + parser_collect.add_argument("--node_count", type=int, help="Number of nodes") + parser_collect.add_argument("--max_pods", type=int, nargs="?", default=0, help="Maximum number of pods per node") + parser_collect.add_argument("--repeats", type=int, default=1, help="Number of times to repeat the deployment churn") + parser_collect.add_argument("--cl2_report_dir", type=str, help="Path to the CL2 report directory") + parser_collect.add_argument("--cloud_info", type=str, help="Cloud information") + parser_collect.add_argument("--run_id", type=str, help="Run ID") + parser_collect.add_argument("--run_url", type=str, help="Run URL") + parser_collect.add_argument("--service_test", type=str2bool, choices=[True, False], default=False, help="Whether service test is running. Must be either True or False") + parser_collect.add_argument("--cnp_test", type=str2bool, choices=[True, False], nargs="?", default=False, help="Whether cnp test is running. Must be either True or False") + parser_collect.add_argument("--ccnp_test", type=str2bool, choices=[True, False], nargs="?", default=False, help="Whether ccnp test is running. Must be either True or False") + parser_collect.add_argument("--result_file", type=str, help="Path to the result file") + parser_collect.add_argument("--test_type", type=str, nargs="?", default="default-config", help="Description of test type") + parser_collect.add_argument("--start_timestamp", type=str, help="Test start timestamp") + parser_collect.add_argument("--workload_type", type=str, choices=["pod", "job"], default="pod", help="Type of workload to run") + parser_collect.add_argument("--job_count", type=int, default=1000, help="Number of jobs to run") + parser_collect.add_argument("--job_parallelism", type=int, default=1, help="Number of jobs to run in parallel") + parser_collect.add_argument("--job_completions", type=int, default=1, help="Number of job completions") + parser_collect.add_argument("--job_throughput", type=int, default=-1, help="Job throughput") + + args = parser.parse_args() + args_dict = vars(args) + + command = args_dict.pop("command") + + if command == "configure": + configure_clusterloader2(**args_dict) + elif command == "validate": + validate_clusterloader2(**args_dict) + elif command == "execute": + execute_clusterloader2(**args_dict) + elif command == "collect": + collect_clusterloader2(**args_dict) + else: + raise ValueError(f"Unknown command: {command}") + + +if __name__ == "__main__": + main() diff --git a/modules/python/clusterloader2/utils.py b/modules/python/clusterloader2/utils.py index 05b81aa553..b45313d0ff 100644 --- a/modules/python/clusterloader2/utils.py +++ b/modules/python/clusterloader2/utils.py @@ -13,6 +13,7 @@ NETWORK_METRIC_PREFIXES = ["APIResponsivenessPrometheus", "InClusterNetworkLatency", "NetworkProgrammingLatency"] PROM_QUERY_PREFIX = "GenericPrometheusQuery" RESOURCE_USAGE_SUMMARY_PREFIX = "ResourceUsageSummary" +JOB_LIFECYCLE_LATENCY_PREFIX = {"JobLifecycleLatency_JobLifecycleLatency_":"JobLifecycleLatency_JobLifecycleLatency"} NETWORK_POLICY_SOAK_MEASUREMENT_PREFIX = "NetworkPolicySoakMeasurement" def run_cl2_command(kubeconfig, cl2_image, cl2_config_dir, cl2_report_dir, provider, cl2_config_file="config.yaml", overrides=False, enable_prometheus=False, tear_down_prometheus=True, @@ -70,6 +71,10 @@ def get_measurement(file_path): if file_name.startswith(RESOURCE_USAGE_SUMMARY_PREFIX): group_name = file_name.split("_")[1] return RESOURCE_USAGE_SUMMARY_PREFIX, group_name + for file_prefix, measurement in JOB_LIFECYCLE_LATENCY_PREFIX.items(): + if file_name.startswith(file_prefix): + group_name = file_name.split("_")[2] + return measurement,group_name if file_name.startswith(NETWORK_POLICY_SOAK_MEASUREMENT_PREFIX): group_name = file_name.split("_")[1] return NETWORK_POLICY_SOAK_MEASUREMENT_PREFIX, group_name diff --git a/modules/python/tests/mock_data/default/report/JobLifecycleLatency_JobLifecycleLatency_job-scheduling b/modules/python/tests/mock_data/default/report/JobLifecycleLatency_JobLifecycleLatency_job-scheduling new file mode 100644 index 0000000000..92342d108b --- /dev/null +++ b/modules/python/tests/mock_data/default/report/JobLifecycleLatency_JobLifecycleLatency_job-scheduling @@ -0,0 +1,11 @@ +{ + "data": { + "Perc50": 78000, + "Perc90": 141000, + "Perc99": 155000 + }, + "unit": "ms", + "labels": { + "Metric": "create_to_start" + } +} \ No newline at end of file diff --git a/modules/python/tests/mock_data/default/report/junit.xml b/modules/python/tests/mock_data/default/report/junit.xml new file mode 100644 index 0000000000..a7eb38b4db --- /dev/null +++ b/modules/python/tests/mock_data/default/report/junit.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/modules/python/tests/test_default_cli.py b/modules/python/tests/test_default_cli.py new file mode 100644 index 0000000000..9b6c7c21b4 --- /dev/null +++ b/modules/python/tests/test_default_cli.py @@ -0,0 +1,183 @@ +import json +import os +import tempfile +import unittest +from unittest.mock import patch + +from clusterloader2.default.cli import ( + collect_clusterloader2, + configure_clusterloader2, + validate_clusterloader2, +) + + +class TestConfigureClusterLoader2(unittest.TestCase): + def test_configure_clusterloader2(self): + # Create a temporary file for the override file + fd, tmp_path = tempfile.mkstemp() + + try: + # Call the function with test data + configure_clusterloader2( + cpu_per_node=2, + node_count=100, + node_per_step=10, + max_pods=40, + repeats=1, + operation_timeout="15m", + provider="azure", + cilium_enabled=False, + scrape_containerd=False, + service_test=True, + cnp_test=False, + ccnp_test=False, + num_cnps=0, + num_ccnps=0, + dualstack=False, + cl2_override_file=tmp_path, + workload_type="job", + job_count=1000, + job_parallelism=1, + job_completions=1, + job_throughput=1000, + ) + + # Verify the content of the override file + with open(tmp_path, "r", encoding="utf-8") as f: + content = f.read() + + # Assert each key-value pair + self.assertIn("CL2_NODES: 100", content) + self.assertIn("CL2_NODES_PER_STEP: 10", content) + self.assertIn("CL2_OPERATION_TIMEOUT: 15m", content) + self.assertIn("CL2_REPEATS: 1", content) + self.assertIn("CL2_STEPS: 10", content) + self.assertIn("CL2_JOBS: 1000", content) + self.assertIn("CL2_JOB_PARALLELISM: 1", content) + self.assertIn("CL2_JOB_COMPLETIONS: 1", content) + self.assertIn("CL2_LOAD_TEST_THROUGHPUT: 1000", content) + self.assertIn("CL2_SERVICE_TEST: true", content) + finally: + os.close(fd) + + +class TestValidateClusterLoader2(unittest.TestCase): + + @patch("clients.kubernetes_client.config.load_kube_config") + @patch("clients.kubernetes_client.KubernetesClient.get_ready_nodes") + def test_validate_clusterloader2_timeout( + self, mock_get_ready_nodes, mock_load_kube_config + ): + + # kubeconfig is not needed for this test but it has to be loaded to run KubernetesClient + mock_load_kube_config.return_value = None + # Mock the KubernetesClient and its get_ready_nodes method + mock_get_ready_nodes.return_value = ["node1"] # Only 1 node ready + + # Call the function and expect an exception due to timeout + with self.assertRaises(Exception) as context: + validate_clusterloader2(node_count=2, operation_timeout_in_minutes=1) + + # Verify the exception message + self.assertIn( + "Only 1 nodes are ready, expected 2 nodes!", str(context.exception) + ) + + @patch("clients.kubernetes_client.config.load_kube_config") + @patch("clients.kubernetes_client.KubernetesClient.get_ready_nodes") + def test_validate_clusterloader2_success( + self, mock_get_ready_nodes, mock_load_kube_config + ): + mock_load_kube_config.return_value = None + # Mock the KubernetesClient and its get_ready_nodes method + mock_get_ready_nodes.side_effect = [ + ["node1"], # First call: 1 node ready + ["node1", "node2"], # Second call: 2 nodes ready + ] + + # Call the function with test data + try: + validate_clusterloader2(node_count=2, operation_timeout_in_minutes=1) + except Exception as e: + self.fail(f"validate_clusterloader2 raised an exception unexpectedly: {e}") + + # Verify that get_ready_nodes was at least 2 calls + # The first call should return 1 node, and the second call should return 2 nodes + self.assertGreaterEqual(mock_get_ready_nodes.call_count, 2) + + +class TestCollectClusterLoader2(unittest.TestCase): + def test_collect_clusterloader2(self): + # Create a temporary directory for the report + cl2_report_dir = os.path.join( + os.path.dirname(__file__), "mock_data", "default", "report" + ) + # Create a temporary file for result output + fd, result_file = tempfile.mkstemp() + + try: + # Call the function with test data + collect_clusterloader2( + cpu_per_node=2, + node_count=100, + max_pods=40, + repeats=1, + cl2_report_dir=cl2_report_dir, + cloud_info=json.dumps({"cloud": "aws"}), + run_id="run123", + run_url="http://example.com/run123", + service_test=True, + cnp_test=False, + ccnp_test=False, + result_file=result_file, + test_type="unit-test", + start_timestamp=None, + workload_type="pod", + job_count=None, + job_parallelism=None, + job_completions=None, + job_throughput=None, + ) + + # Verify the content of the result file + if os.path.exists(result_file): + with open(result_file, "r", encoding="utf-8") as f: + content = f.read() + + # Parse the content as JSON + result_data = json.loads(content) + + # Assert each key-value pair + self.assertEqual(result_data["node_count"], 100) + self.assertEqual(result_data["churn_rate"], 1) + self.assertEqual(result_data["status"], "success") + self.assertEqual(result_data["group"], "job-scheduling") + self.assertEqual( + result_data["measurement"], + "JobLifecycleLatency_JobLifecycleLatency", + ) + + # Assert nested result data + self.assertEqual(result_data["result"]["data"]["Perc50"], 78000) + self.assertEqual(result_data["result"]["data"]["Perc90"], 141000) + self.assertEqual(result_data["result"]["data"]["Perc99"], 155000) + self.assertEqual(result_data["result"]["unit"], "ms") + self.assertEqual( + result_data["result"]["labels"]["Metric"], "create_to_start" + ) + + # Assert other fields + self.assertEqual(result_data["cloud_info"], '{"cloud": "aws"}') + self.assertEqual(result_data["run_id"], "run123") + self.assertEqual(result_data["run_url"], "http://example.com/run123") + self.assertEqual(result_data["test_type"], "unit-test") + self.assertEqual(result_data["cpu_per_node"], 2) + self.assertEqual(result_data["pod_count"], 4000) + else: + self.fail("Result file does not exist or is empty.") + finally: + os.close(fd) + + +if __name__ == "__main__": + unittest.main() diff --git a/pipelines/perf-eval/Controller/job-scheduling.yml b/pipelines/perf-eval/Controller/job-scheduling.yml new file mode 100644 index 0000000000..4e257bc634 --- /dev/null +++ b/pipelines/perf-eval/Controller/job-scheduling.yml @@ -0,0 +1,66 @@ +trigger: none +schedules: + - cron: "30 1 */2 * *" # Every 2 days at 1:30 AM + displayName: "1:30 AM every 2 days" + branches: + include: + - main + - vitto/kwok-cl2 # to be removed after the PR is merged + always: true +variables: + SCENARIO_TYPE: perf-eval + SCENARIO_NAME: job-scheduling + SCENARIO_VERSION: main +stages: + - stage: azure_eastus2 + dependsOn: [] + jobs: + - template: /jobs/competitive-test.yml + parameters: + cloud: azure + regions: + - eastus2 + engine: clusterloader2 + engine_input: + image: "ghcr.io/azure/clusterloader2:v20250423" + topology: kwok + matrix: + default: + node_count: 2000 # 2k kwok nodes + job_throughput: 800 # qps + job_count: 20000 + cilium_enabled: False + scale_timeout: "1h" + service_test: False + workload_type: "job" + cl2_config_file: config.yaml + max_parallel: 1 + timeout_in_minutes: 360 + credential_type: service_connection + ssh_key_enabled: false + - stage: aws_eastus2 + dependsOn: [] + jobs: + - template: /jobs/competitive-test.yml + parameters: + cloud: aws + regions: + - us-east-2 + engine: clusterloader2 + engine_input: + image: "ghcr.io/azure/clusterloader2:v20250423" + topology: kwok + matrix: + default: + node_count: 2000 + job_throughput: 800 + job_count: 20000 + cilium_enabled: False + scale_timeout: "1h" + service_test: False + workload_type: "job" + cl2_config_file: config.yaml + max_parallel: 1 + timeout_in_minutes: 360 + credential_type: service_connection + ssh_key_enabled: false diff --git a/scenarios/perf-eval/job-scheduling/config/config.yaml b/scenarios/perf-eval/job-scheduling/config/config.yaml new file mode 100644 index 0000000000..51515cf7a0 --- /dev/null +++ b/scenarios/perf-eval/job-scheduling/config/config.yaml @@ -0,0 +1,62 @@ +name: job-scheduling + +{{$job_count := DefaultParam .CL2_JOBS 20000}} +{{$qps := DefaultParam .CL2_LOAD_TEST_THROUGHPUT 800}} + +namespace: + number: 1 + prefix: job-scheduling + deleteStaleNamespaces: true + deleteAutomanagedNamespaces: true + enableExistingNamespaces: false + +tuningSets: +- name: Uniform{{$qps}}qps + qpsLoad: + qps: {{$qps}} + +steps: + - name: Start measurements + measurements: + - Identifier: JobLifecycleLatency + Method: JobLifecycleLatency + Params: + action: start + labelSelector: group=job-scheduling + timeout: 3h + - Identifier: WaitForFinishedJobs + Method: WaitForFinishedJobs + Params: + action: start + labelSelector: group=job-scheduling + timeout: 3h + +{{range $i := Loop $job_count}} + - name: Create job {{$i}} + phases: + - namespaceRange: + min: 1 + max: 1 + replicasPerNamespace: 1 + tuningSet: Uniform{{$qps}}qps + objectBundle: + - basename: test-job-{{$i}} + objectTemplatePath: job_template.yaml + templateFillMap: + Group: job-scheduling +{{end}} + + - name: Waiting for jobs to be finished + measurements: + - Identifier: WaitForFinishedJobs + Method: WaitForFinishedJobs + Params: + action: gather + timeout: 3h + - name: Collect measurements + measurements: + - Identifier: JobLifecycleLatency + Method: JobLifecycleLatency + Params: + action: gather + timeout: 3h diff --git a/scenarios/perf-eval/job-scheduling/config/job_template.yaml b/scenarios/perf-eval/job-scheduling/config/job_template.yaml new file mode 100644 index 0000000000..6cd286155d --- /dev/null +++ b/scenarios/perf-eval/job-scheduling/config/job_template.yaml @@ -0,0 +1,29 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: {{.Name}} + labels: + group: {{.Group}} +spec: + template: + spec: + restartPolicy: Never + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: type + operator: In + values: + - kwok + # A taint was added to an automatically created Node. + # You can remove taints of Node or add this tolerations. + tolerations: + - key: "kwok.x-k8s.io/node" + operator: "Exists" + effect: "NoSchedule" + containers: + - image: busybox + name: {{.Name}} + command: ["sh", "-c", "echo Job is running; sleep 5; echo Job done!"] diff --git a/scenarios/perf-eval/job-scheduling/terraform-inputs/aws.tfvars b/scenarios/perf-eval/job-scheduling/terraform-inputs/aws.tfvars new file mode 100644 index 0000000000..494df1d9de --- /dev/null +++ b/scenarios/perf-eval/job-scheduling/terraform-inputs/aws.tfvars @@ -0,0 +1,92 @@ +scenario_type = "perf-eval" +scenario_name = "job-scheduling" +deletion_delay = "3h" +owner = "aks" + +network_config_list = [ + { + role = "client" + vpc_name = "client-vpc" + vpc_cidr_block = "10.0.0.0/16" + subnet = [ + { + name = "client-subnet" + cidr_block = "10.0.0.0/24" + zone_suffix = "a" + map_public_ip_on_launch = true + }, + { + name = "client-subnet-2" + cidr_block = "10.0.1.0/24" + zone_suffix = "b" + map_public_ip_on_launch = true + } + ] + security_group_name = "client-sg" + route_tables = [ + { + name = "internet-rt" + cidr_block = "0.0.0.0/0" + } + ], + route_table_associations = [ + { + name = "client-subnet-rt-assoc" + subnet_name = "client-subnet" + route_table_name = "internet-rt" + }, + { + name = "client-subnet-rt-assoc-2" + subnet_name = "client-subnet-2" + route_table_name = "internet-rt" + } + ] + sg_rules = { + ingress = [] + egress = [ + { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_block = "0.0.0.0/0" + } + ] + } + } +] + +eks_config_list = [{ + role = "client" + eks_name = "job-scheduling" + vpc_name = "client-vpc" + policy_arns = ["AmazonEKSClusterPolicy", "AmazonEKSVPCResourceController", "AmazonEKSWorkerNodePolicy", "AmazonEKS_CNI_Policy", "AmazonEC2ContainerRegistryReadOnly"] + eks_managed_node_groups = [ + { + name = "idle" + ami_type = "AL2_x86_64" + instance_types = ["m4.large"] + min_size = 2 + max_size = 2 + desired_size = 2 + capacity_type = "ON_DEMAND" + labels = { terraform = "true", k8s = "true" } + }, + { + name = "virtualnodes" + ami_type = "AL2_x86_64" + instance_types = ["m4.2xlarge"] + min_size = 3 + max_size = 3 + desired_size = 3 + capacity_type = "ON_DEMAND" + labels = { terraform = "true", k8s = "true" } + } + ] + + eks_addons = [ + { + name = "coredns" + } + ] +}] + diff --git a/scenarios/perf-eval/job-scheduling/terraform-inputs/azure.tfvars b/scenarios/perf-eval/job-scheduling/terraform-inputs/azure.tfvars new file mode 100644 index 0000000000..5210da93b6 --- /dev/null +++ b/scenarios/perf-eval/job-scheduling/terraform-inputs/azure.tfvars @@ -0,0 +1,34 @@ +scenario_type = "perf-eval" +scenario_name = "job-scheduling" +deletion_delay = "3h" +owner = "aks" + +aks_config_list = [ + { + role = "client" + aks_name = "job-scheduling" + dns_prefix = "job-scheduling" + subnet_name = "aks-network" + sku_tier = "Standard" + network_profile = { + network_plugin = "azure" + network_plugin_mode = "overlay" + } + default_node_pool = { + name = "default" + node_count = 2 + vm_size = "Standard_D2_v3" + os_disk_type = "Managed" + only_critical_addons_enabled = true + temporary_name_for_rotation = "defaulttmp" + } + extra_node_pool = [ + { + name = "virtualnodes" + node_count = 3 + vm_size = "Standard_D8_v3" + node_labels = { "nosch" = "true" } + } + ] + } +] diff --git a/scenarios/perf-eval/job-scheduling/terraform-test-inputs/aws.json b/scenarios/perf-eval/job-scheduling/terraform-test-inputs/aws.json new file mode 100644 index 0000000000..cb30052b14 --- /dev/null +++ b/scenarios/perf-eval/job-scheduling/terraform-test-inputs/aws.json @@ -0,0 +1,4 @@ +{ + "run_id" : "123456789", + "region" : "us-east-2" +} diff --git a/scenarios/perf-eval/job-scheduling/terraform-test-inputs/azure.json b/scenarios/perf-eval/job-scheduling/terraform-test-inputs/azure.json new file mode 100644 index 0000000000..ea27a572c6 --- /dev/null +++ b/scenarios/perf-eval/job-scheduling/terraform-test-inputs/azure.json @@ -0,0 +1,4 @@ +{ + "run_id" : "123456789", + "region" : "eastus" +} diff --git a/steps/engine/clusterloader2/default/cleanup.yml b/steps/engine/clusterloader2/default/cleanup.yml new file mode 100644 index 0000000000..eeacfb4cb7 --- /dev/null +++ b/steps/engine/clusterloader2/default/cleanup.yml @@ -0,0 +1,6 @@ +steps: +- script: | + kubectl delete --all nodeclaim --ignore-not-found + kubectl delete --all nodepool --ignore-not-found + displayName: "Delete NodeClaim and NodePool" + condition: always() diff --git a/steps/engine/clusterloader2/default/collect.yml b/steps/engine/clusterloader2/default/collect.yml new file mode 100644 index 0000000000..e51b673ca5 --- /dev/null +++ b/steps/engine/clusterloader2/default/collect.yml @@ -0,0 +1,38 @@ +parameters: + - name: cloud + type: string + default: '' + - name: engine_input + type: object + default: {} + - name: region + type: string +steps: + - template: /steps/cloud/${{ parameters.cloud }}/collect-cloud-info.yml + parameters: + region: ${{ parameters.region }} + - script: | + echo "Collecting results -- " + set -eo pipefail + + PYTHONPATH=$PYTHONPATH:$(pwd) python3 $PYTHON_SCRIPT_FILE collect \ + --node_count $NODE_COUNT \ + --job_throughput $JOB_THROUGHPUT \ + --job_count $JOB_COUNT \ + --cl2_report_dir $CL2_REPORT_DIR \ + --cloud_info "$CLOUD_INFO" \ + --run_id $RUN_ID \ + --run_url $RUN_URL \ + --service_test $SERVICE_TEST \ + --cnp_test ${CNP_TEST:-False} \ + --ccnp_test ${CCNP_TEST:-False} \ + --result_file $TEST_RESULTS_FILE \ + --test_type $TEST_TYPE \ + --workload_type $WORKLOAD_TYPE \ + workingDirectory: modules/python + env: + CLOUD: ${{ parameters.cloud }} + RUN_URL: $(RUN_URL) + PYTHON_SCRIPT_FILE: $(Pipeline.Workspace)/s/modules/python/clusterloader2/default/cli.py + CL2_REPORT_DIR: $(Pipeline.Workspace)/s/scenarios/perf-eval/job-scheduling/results + displayName: "Collect Results" diff --git a/steps/engine/clusterloader2/default/execute.yml b/steps/engine/clusterloader2/default/execute.yml new file mode 100644 index 0000000000..e3d36ffdd3 --- /dev/null +++ b/steps/engine/clusterloader2/default/execute.yml @@ -0,0 +1,50 @@ +parameters: + - name: cloud + type: string + default: "" + - name: engine_input + type: object + default: {} + - name: region + type: string +steps: + - script: | + set -eo pipefail + + PYTHONPATH=$PYTHONPATH:$(pwd) python3 $PYTHON_SCRIPT_FILE configure \ + --node_count $NODE_COUNT \ + --job_throughput $JOB_THROUGHPUT \ + --job_count $JOB_COUNT \ + --workload_type $WORKLOAD_TYPE \ + --operation_timeout $SCALE_TIMEOUT \ + --cilium_enabled $CILIUM_ENABLED \ + --provider $CLOUD \ + --service_test $SERVICE_TEST \ + --cnp_test ${CNP_TEST:-False} \ + --ccnp_test ${CCNP_TEST:-False} \ + --num_cnps ${NUM_CNPS:-0} \ + --num_ccnps ${NUM_CCNPS:-0} \ + --dualstack ${DUALSTACK:-False} \ + --cl2_override_file ${CL2_CONFIG_DIR}/overrides.yaml + + PYTHONPATH=$PYTHONPATH:$(pwd) python3 $PYTHON_SCRIPT_FILE execute \ + --cl2_image ${CL2_IMAGE} \ + --cl2_config_dir ${CL2_CONFIG_DIR} \ + --cl2_report_dir $CL2_REPORT_DIR \ + --cl2_config_file $CL2_CONFIG_FILE \ + --prometheus_enabled ${PROMETHEUS_ENABLED:-False} \ + --kubeconfig ${HOME}/.kube/config \ + --provider $CLOUD + workingDirectory: modules/python + env: + ${{ if eq(parameters.cloud, 'azure') }}: + CLOUD: aks + ${{ else }}: + CLOUD: ${{ parameters.cloud }} + REGION: ${{ parameters.region }} + PYTHON_SCRIPT_FILE: $(Pipeline.Workspace)/s/modules/python/clusterloader2/default/cli.py + CL2_IMAGE: ${{ parameters.engine_input.image }} + CL2_CONFIG_DIR: $(Pipeline.Workspace)/s/scenarios/perf-eval/job-scheduling/config + CL2_CONFIG_FILE: ${CL2_CONFIG_DIR}/config.yaml + CL2_REPORT_DIR: $(Pipeline.Workspace)/s/scenarios/perf-eval/job-scheduling/results + displayName: "Run Benchmark" diff --git a/steps/engine/clusterloader2/default/validate.yml b/steps/engine/clusterloader2/default/validate.yml new file mode 100644 index 0000000000..85652fe9f4 --- /dev/null +++ b/steps/engine/clusterloader2/default/validate.yml @@ -0,0 +1,20 @@ +parameters: + - name: desired_nodes + type: number + - name: operation_timeout_in_minutes + type: number + default: 20 +steps: + - script: | + set -eo pipefail + + PYTHONPATH=$PYTHONPATH:$(pwd) python3 $PYTHON_SCRIPT_FILE validate \ + --node_count $DESIRED_NODES \ + --operation_timeout_in_minutes $VALIDATION_TIMEOUT + workingDirectory: modules/python + timeoutInMinutes: ${{ parameters.operation_timeout_in_minutes }} + displayName: "Validate node count" + env: + DESIRED_NODES: ${{ parameters.desired_nodes }} + VALIDATION_TIMEOUT: ${{ parameters.operation_timeout_in_minutes }} + PYTHON_SCRIPT_FILE: $(Pipeline.Workspace)/s/modules/python/clusterloader2/default/cli.py diff --git a/steps/topology/kwok/collect-clusterloader2.yml b/steps/topology/kwok/collect-clusterloader2.yml new file mode 100644 index 0000000000..693f105f15 --- /dev/null +++ b/steps/topology/kwok/collect-clusterloader2.yml @@ -0,0 +1,16 @@ +parameters: + - name: cloud + type: string + default: '' + - name: engine_input + type: object + default: {} + - name: regions + type: object + default: {} +steps: + - template: /steps/engine/clusterloader2/default/collect.yml + parameters: + cloud: ${{ parameters.cloud }} + engine_input: ${{ parameters.engine_input }} + region: ${{ parameters.regions[0] }} diff --git a/steps/topology/kwok/execute-clusterloader2.yml b/steps/topology/kwok/execute-clusterloader2.yml new file mode 100644 index 0000000000..d4c3f85555 --- /dev/null +++ b/steps/topology/kwok/execute-clusterloader2.yml @@ -0,0 +1,16 @@ +parameters: + - name: cloud + type: string + default: '' + - name: engine_input + type: object + default: {} + - name: regions + type: object + default: {} +steps: + - template: /steps/engine/clusterloader2/default/execute.yml + parameters: + cloud: ${{ parameters.cloud }} + engine_input: ${{ parameters.engine_input }} + region: ${{ parameters.regions[0] }} diff --git a/steps/topology/kwok/start-kwok.yml b/steps/topology/kwok/start-kwok.yml new file mode 100644 index 0000000000..316bd12c31 --- /dev/null +++ b/steps/topology/kwok/start-kwok.yml @@ -0,0 +1,68 @@ +parameters: + - name: kwok_repo + type: string + default: "kubernetes-sigs/kwok" + - name: kwok_release + type: string + default: "v0.6.1" +steps: + - script: | + kubectl apply -f "https://github.com/${KWOK_REPO}/releases/download/${KWOK_RELEASE}/kwok.yaml" + kubectl apply -f "https://github.com/${KWOK_REPO}/releases/download/${KWOK_RELEASE}/stage-fast.yaml" + for ((i=0; i