From 7d044b5631413ae29888956dae454d414fad1b0d Mon Sep 17 00:00:00 2001 From: Mohamed Khairallah Gharbi Date: Tue, 22 Apr 2025 15:41:39 +0200 Subject: [PATCH] implement client worker poc to use bash commands --- python/commandLine/README.md | 194 ++++++++ python/commandLine/client/.dockerignore | 1 + python/commandLine/client/.gitignore | 3 + python/commandLine/client/armonik_config.sh | 5 + python/commandLine/client/main.py | 284 +++++++++++ python/commandLine/client/requirements.txt | 1 + .../commandLine/client/run_command_tests.sh | 246 ++++++++++ python/commandLine/client/test_client.py | 454 ++++++++++++++++++ python/commandLine/worker/.dockerignore | 1 + python/commandLine/worker/Dockerfile | 17 + python/commandLine/worker/main.py | 116 +++++ python/commandLine/worker/requirements.txt | 1 + 12 files changed, 1323 insertions(+) create mode 100644 python/commandLine/README.md create mode 100644 python/commandLine/client/.dockerignore create mode 100644 python/commandLine/client/.gitignore create mode 100644 python/commandLine/client/armonik_config.sh create mode 100644 python/commandLine/client/main.py create mode 100644 python/commandLine/client/requirements.txt create mode 100755 python/commandLine/client/run_command_tests.sh create mode 100644 python/commandLine/client/test_client.py create mode 100644 python/commandLine/worker/.dockerignore create mode 100644 python/commandLine/worker/Dockerfile create mode 100644 python/commandLine/worker/main.py create mode 100644 python/commandLine/worker/requirements.txt diff --git a/python/commandLine/README.md b/python/commandLine/README.md new file mode 100644 index 00000000..e029bdd6 --- /dev/null +++ b/python/commandLine/README.md @@ -0,0 +1,194 @@ +# ArmoniK Command Line Client + +A proof of concept for running shell commands and scripts through ArmoniK distributed computing platform. + +## Overview + +This project demonstrates how to submit shell commands and scripts to ArmoniK for remote execution. It enables: + +- Running bash commands and scripts on worker nodes +- Collecting and aggregating results +- Testing different command types in a distributed environment +- Generating detailed execution reports + +## Prerequisites + +- Python 3.7+ +- ArmoniK cluster running and accessible +- Required Python packages: + - `python-dotenv` + - `requests` (for the client) + - `ArmoniK` (both client and worker) + +## Installation + +1. Create virtual environment: + ``` + python -m venv venv + source venv/bin/activate # On Windows use: venv\Scripts\activate + ``` + +2. Install dependencies: + ``` + pip install -r requirements.txt + ``` + +3. Configure environment: + Create a `.env` file in the client directory with the following variables: + ``` + ARMONIK_ENDPOINT= # Example: localhost:5001 + ARMONIK_PARTITION= # Example: default + ``` + + Alternatively, configure ArmoniK using `client/armonik_config.sh`: + ``` + ARMONIK_ENDPOINT= + ARMONIK_PARTITION=cmdline # Default partition for command line tests + ``` + +## Project Structure + +``` +python/commandLine/ +├── client/ +│ ├── main.py # Main client application +│ ├── test_client.py # Test framework +│ ├── run_command_tests.sh # Shell script test runner +│ ├── armonik_config.sh # ArmoniK configuration +│ ├── test_commands/ # Directory for test command files +│ │ ├── data_processing.cmd +│ │ ├── file_operations.cmd +│ │ ├── multi_step.cmd +│ │ ├── network_diagnostics.cmd +│ │ └── system_info.cmd +│ ├── test_reports/ # Directory for test reports +│ └── test_outputs/ # Directory for test outputs +├── worker/ +│ └── worker.py # Worker implementation +└── README.md # This file +``` + +## Usage + +Partition `cmdline` is used for command line tests. The client submits commands to the ArmoniK worker, which executes them and returns the results. + +### Running the Client + +To submit a shell command for execution: + +```bash +python client/main.py --cmd "echo Hello from ArmoniK" +``` + +To submit a script file: + +```bash +python client/main.py --file my_script.sh +``` + +### Command Options + +``` +--cmd TEXT Shell command to execute +--file PATH Path to a shell script file +--partition TEXT Partition to use for execution +--timeout INTEGER Maximum execution time in seconds +--output PATH Path to save command output +--verbose Enable verbose output +``` + +## Running Tests + +### Using the Test Shell Script + +The included shell script provides a convenient way to run all test commands: + +```bash +cd client +./run_command_tests.sh +``` + +This script: +- Loads configuration from `armonik_config.sh` +- Ensures all test command files exist +- Executes all test commands sequentially +- Saves outputs to timestamped files +- Generates test reports and summaries + +### Using the Python Test Framework + +The test framework validates various command types with ArmoniK: + +```bash +cd client +python test_client.py +``` + +Test reports are generated in the `test_reports` directory. + +### Available Tests + +- **System Information**: Retrieves basic system details from the worker +- **Network Diagnostics**: Tests connectivity and network configuration +- **File Operations**: Creates, modifies, and manages files on the worker +- **Data Processing**: Demonstrates basic data manipulation capabilities +- **Multi-step Process**: Chains multiple commands in a workflow + +## Example Scripts + +### Basic Example + +```bash +# Save as example.sh +echo "=== Running on $(hostname) ===" +echo "Current time: $(date)" +echo "Working directory: $(pwd)" +echo "Hello from ArmoniK!" +``` + +Submit with: +```bash +python client/main.py --file example.sh +``` + +### Data Processing Example + +```bash +# Save as process_data.sh +echo "Processing data..." +echo "Name,Value" > results.csv +for i in {1..10}; do + echo "Item$i,$((RANDOM % 100))" >> results.csv +done +cat results.csv +echo "Processing complete!" +``` + +## Monitoring and Results + +The test framework provides comprehensive reporting: + +- **Test Outputs**: Individual command outputs are saved in the `test_outputs` directory with timestamps +- **Test Reports**: Full execution details are available in the `test_reports` directory + - `test_report_[timestamp].txt`: Detailed execution logs + - `test_summary_[timestamp].txt`: Execution summary with metrics + +Example test summary: +``` +=== Test Summary === +Total tests: 5 +Passed: 5 +Failed: 0 +Success rate: 100.00% +Total execution time: 2.27 seconds +``` + +## Contributing + +1. Create your test command files in the `test_commands` directory +2. Add test methods in `test_client.py` for new command types +3. Run the test suite to validate your commands + +## License + +This project is licensed under the terms of the GNU Affero General Public License as published by the Free Software Foundation. diff --git a/python/commandLine/client/.dockerignore b/python/commandLine/client/.dockerignore new file mode 100644 index 00000000..b694934f --- /dev/null +++ b/python/commandLine/client/.dockerignore @@ -0,0 +1 @@ +.venv \ No newline at end of file diff --git a/python/commandLine/client/.gitignore b/python/commandLine/client/.gitignore new file mode 100644 index 00000000..c3396095 --- /dev/null +++ b/python/commandLine/client/.gitignore @@ -0,0 +1,3 @@ +test_commands/ +test_outputs/ +test_reports/ diff --git a/python/commandLine/client/armonik_config.sh b/python/commandLine/client/armonik_config.sh new file mode 100644 index 00000000..9d1e96f5 --- /dev/null +++ b/python/commandLine/client/armonik_config.sh @@ -0,0 +1,5 @@ +ARMONIK_ENDPOINT="ENDPOINT" +ARMONIK_PARTITION="cmdline" + +VERBOSE=true +TIMEOUT=300 \ No newline at end of file diff --git a/python/commandLine/client/main.py b/python/commandLine/client/main.py new file mode 100644 index 00000000..41984635 --- /dev/null +++ b/python/commandLine/client/main.py @@ -0,0 +1,284 @@ +#!/usr/bin/env python3 + +import argparse +import logging +import sys +import time +from dataclasses import dataclass +from datetime import timedelta +from pathlib import Path +from typing import Optional + +import grpc +from armonik.client import ArmoniKEvents, ArmoniKResults, ArmoniKSessions, ArmoniKTasks +from armonik.common import TaskDefinition, TaskOptions + +# Add the common directory to the system path +common_path = Path(__file__).resolve().parent.parent / "common" +sys.path.append(str(common_path)) + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +) +logger = logging.getLogger("CommandLineClient") + + +@dataclass +class CommandResult: + """Container for command execution results""" + + success: bool + output: str + execution_time: float + task_id: Optional[str] = None + error_message: Optional[str] = None + + +class ArmoniKCommandClient: + """Client for submitting commands to ArmoniK and processing results""" + + def __init__( + self, + endpoint: str = "localhost:5001", + partition: str = "default", + timeout: int = 300, + verbose: bool = False, + ): + self.endpoint = endpoint + self.partition = partition + self.timeout = timeout + self.verbose = verbose + + self.channel = None + self.tasks_client = None + self.results_client = None + self.sessions_client = None + self.events_client = None + + # Initialize connections + self._connect() + + def _connect(self) -> None: + """Establish connection to ArmoniK services""" + try: + self.channel = grpc.insecure_channel(self.endpoint) + self.tasks_client = ArmoniKTasks(self.channel) + self.results_client = ArmoniKResults(self.channel) + self.sessions_client = ArmoniKSessions(self.channel) + self.events_client = ArmoniKEvents(self.channel) + + if self.verbose: + logger.info("Connected to ArmoniK at %s", self.endpoint) + except Exception as e: + logger.error("Failed to connect to ArmoniK: %s", str(e)) + raise + + def _create_session(self) -> str: + """Create a new ArmoniK session""" + task_options = TaskOptions( + max_duration=timedelta(hours=1), + max_retries=2, + priority=1, + partition_id=self.partition, + ) + + session_id = self.sessions_client.create_session( + task_options, partition_ids=[self.partition] + ) + + if self.verbose: + logger.info("Created session: %s", session_id) + + return session_id + + def run_commands(self, commands: str) -> CommandResult: + """ + Submit the commands to ArmoniK and wait for results. + + Args: + commands: Command string to execute (multiple commands separated by newlines) + + Returns: + CommandResult object containing execution results + """ + if self.verbose: + logger.info("Preparing to execute commands:\n%s", commands) + + start_time = time.time() + + try: + session_id = self._create_session() + + output_result = self.results_client.create_results_metadata( + result_names=["payload", "result"], session_id=session_id + ) + payload_id = output_result["payload"].result_id + result_id = output_result["result"].result_id + + payload_data = commands.encode("utf-8") + try: + self.results_client.upload_result_data( + payload_id, session_id, payload_data + ) + if self.verbose: + logger.info("Uploaded command payload with ID: %s", payload_id) + except Exception as e: + return CommandResult( + success=False, + output="", + execution_time=time.time() - start_time, + error_message=f"Error uploading payload: {str(e)}", + ) + + task_definition = TaskDefinition( + payload_id=payload_id, + expected_output_ids=[result_id], + ) + + task_id = self.tasks_client.submit_tasks(session_id, [task_definition])[0] + + if self.verbose: + logger.info("Task submitted with ID: %s", task_id) + logger.info("Waiting for results (timeout: %ss)...", self.timeout) + + t_wait_start = time.time() + + try: + self.events_client.wait_for_result_availability( + result_ids=[result_id], + session_id=session_id, + ) + + if self.verbose: + wait_time = time.time() - t_wait_start + logger.info("Results available in %.2f seconds", wait_time) + + result_data = self.results_client.download_result_data( + result_id, session_id + ).decode("utf-8") + + execution_time = time.time() - start_time + + return CommandResult( + success=True, + output=result_data, + execution_time=execution_time, + task_id=task_id, + ) + + except Exception as e: + return CommandResult( + success=False, + output="", + execution_time=time.time() - start_time, + task_id=task_id, + error_message=f"Error waiting for or downloading results: {str(e)}", + ) + + except Exception as e: + return CommandResult( + success=False, + output="", + execution_time=time.time() - start_time, + error_message=f"Error: {str(e)}", + ) + + def close(self) -> None: + """Close the client connections""" + if self.channel: + self.channel.close() + + +def main(): + """Main entry point for the command line client""" + parser = argparse.ArgumentParser( + description="ArmoniK Command Line Client", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + parser.add_argument( + "--endpoint", + type=str, + default="localhost:5001", + help="gRPC endpoint for ArmoniK connection", + ) + parser.add_argument( + "--partition", + type=str, + default="default", + help="Partition ID to use for task execution", + ) + parser.add_argument( + "--timeout", + type=int, + default=300, + help="Maximum time to wait for results (in seconds)", + ) + parser.add_argument( + "--verbose", action="store_true", help="Print detailed information" + ) + parser.add_argument("--file", type=str, help="File containing commands to execute") + parser.add_argument( + "--command", type=str, help="Command string to execute (use quotes)" + ) + parser.add_argument( + "--output", type=str, help="File to write command output (default: stdout)" + ) + + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + if args.file: + try: + with open(args.file, "r", encoding="utf-8") as f: + commands = f.read() + except Exception as e: + logger.error("Error reading command file: %s", str(e)) + return 1 + elif args.command: + commands = args.command + else: + print("Enter commands (Ctrl+D to finish):") + commands = sys.stdin.read() + + if not commands.strip(): + logger.error("Error: No commands provided") + return 1 + + client = ArmoniKCommandClient( + endpoint=args.endpoint, + partition=args.partition, + timeout=args.timeout, + verbose=args.verbose, + ) + + try: + result = client.run_commands(commands) + + if result.success: + if args.output: + with open(args.output, "w", encoding="utf-8") as f: + f.write(result.output) + print(f"Command output saved to {args.output}") + else: + print(result.output, end="") + if args.verbose: + print( + f"Command execution completed in {result.execution_time:.2f}s", + file=sys.stderr, + ) + else: + logger.error("Command execution failed: %s", result.error_message) + + return 0 if result.success else 1 + + finally: + client.close() + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/python/commandLine/client/requirements.txt b/python/commandLine/client/requirements.txt new file mode 100644 index 00000000..bf272cf7 --- /dev/null +++ b/python/commandLine/client/requirements.txt @@ -0,0 +1 @@ +armonik \ No newline at end of file diff --git a/python/commandLine/client/run_command_tests.sh b/python/commandLine/client/run_command_tests.sh new file mode 100755 index 00000000..53ad873a --- /dev/null +++ b/python/commandLine/client/run_command_tests.sh @@ -0,0 +1,246 @@ +#!/bin/bash + +# Colors for formatting +GREEN='\033[0;32m' +BLUE='\033[0;34m' +RED='\033[0;31m' +YELLOW='\033[0;33m' +CYAN='\033[0;36m' +NC='\033[0m' # No Color + +# Get the directory of this script +DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +CLIENT="${DIR}/main.py" +TEST_DIR="${DIR}/test_commands" +OUTPUT_DIR="${DIR}/test_outputs" + +REPORT_DIR="${DIR}/test_reports" +CONFIG_FILE="${DIR}/armonik_config.sh" + +# Load config file if exists +if [ -f "$CONFIG_FILE" ]; then + echo -e "${BLUE}Loading configuration from ${CONFIG_FILE}${NC}" + source "$CONFIG_FILE" +else + echo -e "${RED}Configuration file ${CONFIG_FILE} not found. Using default values.${NC}" +fi + +# Debug: Print loaded endpoint and partition +echo -e "${YELLOW}Loaded endpoint: ${ARMONIK_ENDPOINT}${NC}" +echo -e "${YELLOW}Loaded partition: ${ARMONIK_PARTITION}${NC}" + +# Default values +VERBOSE=false +RUN_ALL=false +SPECIFIC_TESTS=() + +# Show usage information +function show_usage { + echo "Usage: $0 [options] [test_files]" + echo "" + echo "Options:" + echo " -e ENDPOINT Specify ArmoniK endpoint" + echo " -p PARTITION Specify ArmoniK partition" + echo " -v Enable verbose mode" + echo " -a Run all tests" + echo " -h Show this help message" + echo "" + echo "Examples:" + echo " $0 -a # Run all tests" + echo " $0 system_info.cmd # Run specific test" + echo " $0 -v -e localhost:5001 # Run with custom endpoint" +} + +while getopts "e:p:vah" opt; do + case $opt in + e) armonik_endpoint="$OPTARG" ;; + p) armonik_partition="$OPTARG" ;; + v) VERBOSE=true ;; + a) RUN_ALL=true ;; + h) show_usage; exit 0 ;; + *) show_usage; exit 1 ;; + esac +done + +shift $((OPTIND-1)) +if [ $# -gt 0 ]; then + for arg in "$@"; do + [[ $arg != *.cmd ]] && arg="${arg}.cmd" + SPECIFIC_TESTS+=("$arg") + done +else + RUN_ALL=true +fi + +# Use the loaded or default values +export ARMONIK_ENDPOINT="${armonik_endpoint:-${ARMONIK_ENDPOINT:-localhost:5001}}" +export ARMONIK_PARTITION="${armonik_partition:-${ARMONIK_PARTITION:-default}}" + +# Create necessary directories +mkdir -p "$OUTPUT_DIR" "$REPORT_DIR" "$TEST_DIR" + +echo -e "${BLUE}=== ArmoniK Command Line Test Runner ===${NC}" +echo -e "${YELLOW}Using endpoint: ${ARMONIK_ENDPOINT}${NC}" +echo -e "${YELLOW}Using partition: ${ARMONIK_PARTITION}${NC}" + +# Function to create test files +function create_test_files { + echo -e "${BLUE}Ensuring test command files exist...${NC}" + python3 -c " +from pathlib import Path +from test_client import CommandLineClientTest +CommandLineClientTest.create_test_files() +print('Test files created successfully.') +" || { echo -e "${RED}Failed to create test command files.${NC}"; exit 1; } +} + +# Ensure test files exist +create_test_files + +# Create test timestamp for reports/outputs +TIMESTAMP=$(date +"%Y%m%d_%H%M%S") +REPORT_FILE="${REPORT_DIR}/test_report_${TIMESTAMP}.txt" +SUMMARY_FILE="${REPORT_DIR}/test_summary_${TIMESTAMP}.txt" + +# Initialize report header +cat > "$REPORT_FILE" << EOF +=== ArmoniK Command Line Test Report === +Date: $(date) +Endpoint: ${ARMONIK_ENDPOINT} +Partition: ${ARMONIK_PARTITION} + +EOF + +# Initialize summary statistics +TOTAL_TESTS=0 +PASSED_TESTS=0 +FAILED_TESTS=0 +TOTAL_TIME=0 + +# Run tests +echo -e "${BLUE}=== Running Tests ===${NC}" +if [ "$RUN_ALL" = true ]; then + # Run all tests if no specific tests are provided + SPECIFIC_TESTS=($(find "$TEST_DIR" -type f -name "*.cmd" | sort)) +fi + +# Print test list +echo -e "${BLUE}Tests to run (${#SPECIFIC_TESTS[@]} total):${NC}" +for test_file in "${SPECIFIC_TESTS[@]}"; do + echo -e " - $(basename "$test_file")" +done +echo "" + +for test_file in "${SPECIFIC_TESTS[@]}"; do + test_name=$(basename "$test_file") + ((TOTAL_TESTS++)) + + echo -e "${CYAN}Running test: ${test_name} (${TOTAL_TESTS}/${#SPECIFIC_TESTS[@]})${NC}" + echo "=== Test: ${test_name} ===" >> "$REPORT_FILE" + echo "Command file: ${test_file}" >> "$REPORT_FILE" + + # Record start time + start_time=$(date +%s.%N) + + # Create output file path + output_file="${OUTPUT_DIR}/${test_name%.cmd}_${TIMESTAMP}.output" + + # Run the command with our client and measure execution time + if [ "$VERBOSE" = true ]; then + echo -e "${YELLOW}Running with verbose output${NC}" + python3 "$CLIENT" --file "$test_file" --endpoint "$ARMONIK_ENDPOINT" \ + --partition "$ARMONIK_PARTITION" --output "$output_file" --verbose + exit_code=$? + else + python3 "$CLIENT" --file "$test_file" --endpoint "$ARMONIK_ENDPOINT" \ + --partition "$ARMONIK_PARTITION" --output "$output_file" + exit_code=$? + fi + + # Calculate duration + end_time=$(date +%s.%N) + duration=$(echo "$end_time - $start_time" | bc) + TOTAL_TIME=$(echo "$TOTAL_TIME + $duration" | bc) + + # Check result and update report + if [ $exit_code -eq 0 ]; then + echo -e "${GREEN}✓ Test ${test_name} passed (${duration} seconds)${NC}" + echo "Result: PASS" >> "$REPORT_FILE" + ((PASSED_TESTS++)) + else + echo -e "${RED}✗ Test ${test_name} failed (${duration} seconds)${NC}" + echo "Result: FAIL" >> "$REPORT_FILE" + ((FAILED_TESTS++)) + fi + + echo "Duration: ${duration} seconds" >> "$REPORT_FILE" + echo "Output file: ${output_file}" >> "$REPORT_FILE" + + # Add output summary to report if file exists + if [ -f "$output_file" ]; then + output_size=$(wc -l < "$output_file") + echo "Output size: ${output_size} lines" >> "$REPORT_FILE" + + echo "" >> "$REPORT_FILE" + echo "Output Preview (first 10 lines):" >> "$REPORT_FILE" + head -n 10 "$output_file" >> "$REPORT_FILE" + + if [ "$output_size" -gt 10 ]; then + echo "..." >> "$REPORT_FILE" + echo "(${output_size} total lines, see output file for complete output)" >> "$REPORT_FILE" + fi + else + echo "No output file generated" >> "$REPORT_FILE" + fi + + echo "" >> "$REPORT_FILE" + echo "-----------------------------------------" >> "$REPORT_FILE" + echo "" >> "$REPORT_FILE" +done + +# Calculate success rate +if [ "$TOTAL_TESTS" -gt 0 ]; then + SUCCESS_RATE=$(echo "scale=2; $PASSED_TESTS * 100 / $TOTAL_TESTS" | bc) +else + SUCCESS_RATE="N/A" +fi + +# Generate summary report +cat > "$SUMMARY_FILE" << EOF +=== ArmoniK Command Line Test Summary === +Date: $(date) +Endpoint: ${ARMONIK_ENDPOINT} +Partition: ${ARMONIK_PARTITION} + +Total tests: ${TOTAL_TESTS} +Passed: ${PASSED_TESTS} +Failed: ${FAILED_TESTS} +Success rate: ${SUCCESS_RATE}% +Total execution time: ${TOTAL_TIME} seconds + +Test output directory: ${OUTPUT_DIR} +Full report file: ${REPORT_FILE} +EOF + +# Append summary to main report +echo "" >> "$REPORT_FILE" +echo "=== Test Summary ===" >> "$REPORT_FILE" +echo "Total tests: ${TOTAL_TESTS}" >> "$REPORT_FILE" +echo "Passed: ${PASSED_TESTS}" >> "$REPORT_FILE" +echo "Failed: ${FAILED_TESTS}" >> "$REPORT_FILE" +echo "Success rate: ${SUCCESS_RATE}%" >> "$REPORT_FILE" +echo "Total execution time: ${TOTAL_TIME} seconds" >> "$REPORT_FILE" + +# Display summary +echo "" +echo -e "${BLUE}=== Test Summary ===${NC}" +echo -e "Total tests: ${CYAN}${TOTAL_TESTS}${NC}" +echo -e "Passed: ${GREEN}${PASSED_TESTS}${NC}" +echo -e "Failed: ${RED}${FAILED_TESTS}${NC}" +echo -e "Success rate: ${CYAN}${SUCCESS_RATE}%${NC}" +echo -e "Total execution time: ${CYAN}${TOTAL_TIME}${NC} seconds" + +echo -e "${BLUE}=== All Tests Completed ===${NC}" +echo -e "Full report saved to: ${YELLOW}${REPORT_FILE}${NC}" +echo -e "Summary report saved to: ${YELLOW}${SUMMARY_FILE}${NC}" +echo -e "Test outputs saved to: ${YELLOW}${OUTPUT_DIR}${NC}" \ No newline at end of file diff --git a/python/commandLine/client/test_client.py b/python/commandLine/client/test_client.py new file mode 100644 index 00000000..e99ea6aa --- /dev/null +++ b/python/commandLine/client/test_client.py @@ -0,0 +1,454 @@ +#!/usr/bin/env python3 +# filepath: /home/mkgharbi/aneo/ArmoniK.Samples/python/commandLine/client/test_client.py +import json +import logging +import os +import subprocess +import time +import unittest +from datetime import datetime +from pathlib import Path + +from dotenv import load_dotenv + +# Configure logging for tests +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger("TestClient") + +# Define paths +BASE_DIR = Path(__file__).resolve().parent +CLIENT_PATH = BASE_DIR / "main.py" +TEST_DIR = BASE_DIR / "test_commands" +REPORTS_DIR = BASE_DIR / "test_reports" +OUTPUTS_DIR = BASE_DIR / "test_outputs" # Default output folder +ENV_FILE = BASE_DIR / ".env" # Path to the .env file + +# Load environment variables from .env file +if ENV_FILE.exists(): + load_dotenv(dotenv_path=ENV_FILE) +else: + logging.warning(f"{ENV_FILE} not found. Using default values.") + +# Get endpoint and partition from the environment variables +ENDPOINT = os.getenv("ARMONIK_ENDPOINT", "localhost:5001") +PARTITION = os.getenv("ARMONIK_PARTITION", "default") + +# Create directories if they don't exist +TEST_DIR.mkdir(exist_ok=True) +REPORTS_DIR.mkdir(exist_ok=True) +OUTPUTS_DIR.mkdir(exist_ok=True) + + +class CommandLineClientTest(unittest.TestCase): + """Test suite for ArmoniK Command Line Client""" + + @classmethod + def setUpClass(cls): + """Create test command files once for the entire suite and prepare report""" + logging.info("Setting up test files...") + cls.test_results = [] # To store results for report + cls.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + cls.report_file = REPORTS_DIR / f"test_report_{cls.timestamp}.json" + cls.create_test_files() + + @classmethod + def tearDownClass(cls): + """Generate a detailed report after all tests""" + logging.info("Generating test report...") + + # Calculate summary statistics + total_tests = len(cls.test_results) + passed_tests = sum(1 for result in cls.test_results if result["passed"]) + failed_tests = total_tests - passed_tests + success_rate = (passed_tests / total_tests * 100) if total_tests > 0 else 0 + total_duration = sum(result["duration"] for result in cls.test_results) + + # Create report data + report_data = { + "timestamp": cls.timestamp, + "endpoint": ENDPOINT, + "partition": PARTITION, + "summary": { + "total_tests": total_tests, + "passed_tests": passed_tests, + "failed_tests": failed_tests, + "success_rate": success_rate, + "total_duration": total_duration, + }, + "test_results": cls.test_results, + } + + # Save report as JSON + with open(cls.report_file, "w") as f: + json.dump(report_data, f, indent=2) + + # Generate human-readable report + readable_report_file = REPORTS_DIR / f"test_report_{cls.timestamp}.txt" + with open(readable_report_file, "w") as f: + f.write("=== ArmoniK Command Line Client Test Report ===\n") + f.write(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + f.write(f"Endpoint: {ENDPOINT}\n") + f.write(f"Partition: {PARTITION}\n\n") + + f.write("--- Summary ---\n") + f.write(f"Total Tests: {total_tests}\n") + f.write(f"Passed: {passed_tests}\n") + f.write(f"Failed: {failed_tests}\n") + f.write(f"Success Rate: {success_rate:.2f}%\n") + f.write(f"Total Duration: {total_duration:.3f}s\n\n") + + f.write("--- Test Details ---\n") + for result in cls.test_results: + status = "PASS" if result["passed"] else "FAIL" + f.write( + f"Test: {result['name']} - {status} ({result['duration']:.3f}s)\n" + ) + if "output_file" in result: + f.write(f" Output: {result['output_file']}\n") + if not result["passed"] and "error" in result: + f.write(f" Error: {result['error']}\n") + f.write("\n") + + logging.info( + f"Test reports generated at {cls.report_file} and {readable_report_file}" + ) + + @classmethod + def create_test_files(cls): + """Create all test command files""" + # 1. System Information + system_info_content = """# System Information Commands +echo "=== System Information ===" +hostname +uname -a +cat /etc/os-release | grep PRETTY_NAME || echo "OS info not found" +echo "=== CPU Information ===" +lscpu | grep "Model name" || echo "CPU info not found" +echo "=== Memory Information ===" +free -h || echo "Memory info not found" +echo "=== Disk Usage ===" +df -h | grep -v tmpfs || echo "Disk info not found" +""" + (TEST_DIR / "system_info.cmd").write_text(system_info_content) + + # 2. Network Diagnostics + network_diagnostics_content = """# Network Diagnostics Commands +echo "=== Network Interfaces ===" +ip addr | grep -E "inet|ether" | grep -v "127.0.0.1" || echo "Network interfaces not found" +echo "=== Network Routes ===" +ip route || echo "Routes not found" +echo "=== DNS Configuration ===" +cat /etc/resolv.conf || echo "resolv.conf not found" +echo "=== Internet Connectivity ===" +ping -c 1 8.8.8.8 || echo "Ping failed" +echo "=== DNS Resolution ===" +nslookup google.com || echo "DNS resolution failed" +""" + (TEST_DIR / "network_diagnostics.cmd").write_text(network_diagnostics_content) + + # 3. File Operations - Enhanced version + file_operations_content = """#!/bin/bash +# File Operations Commands - Comprehensive test +# This script tests various file operations in ArmoniK + +# Generate a unique test directory using current PID +TEST_DIR="/tmp/armonik_test_$$" +echo "=== Creating Test Environment ===" +echo "Creating test directory: $TEST_DIR" +mkdir -p "$TEST_DIR" +cd "$TEST_DIR" +echo "Current working directory: $(pwd)" +echo + +# Creating various test files with different content +echo "=== Creating Test Files ===" +echo "Hello from ArmoniK! This is the first test file." > file1.txt +echo "This is the second test file with no special keywords." > file2.txt +echo "ArmoniK provides distributed computing capabilities." > file3.txt +echo "Line 1: Testing search capabilities" > search_file.txt +echo "Line 2: This line contains ArmoniK keyword" >> search_file.txt +echo "Line 3: This line doesn't contain any special keywords" >> search_file.txt +echo "Line 4: Another ArmoniK reference here" >> search_file.txt +echo "Line 5: Final line for testing" >> search_file.txt + +# Create a small CSV file for data processing tests +cat > data.csv << EOF +Name,Department,Salary +John,Engineering,75000 +Alice,Marketing,65000 +Bob,Engineering,72000 +Carol,HR,58000 +David,Marketing,68000 +EOF + +# Create a binary file +dd if=/dev/urandom of=binary_file.bin bs=1024 count=10 2>/dev/null + +# Create a symbolic link +ln -s file1.txt link_to_file1 + +# Create a subdirectory with more files +mkdir -p subdir/nested +echo "This is a file in a subdirectory" > subdir/subfile.txt +echo "This is a file in a nested subdirectory" > subdir/nested/nestedfile.txt + +# Display created file structure +echo "=== File Structure Created ===" +echo "Total files created: $(find . -type f | wc -l)" +echo "Total directories created: $(find . -type d | wc -l)" +echo + +# File contents display +echo "=== File Contents ===" +echo "----- file1.txt -----" +cat file1.txt +echo "----- file2.txt -----" +cat file2.txt +echo "----- file3.txt -----" +cat file3.txt +echo "----- search_file.txt -----" +cat search_file.txt +echo "----- data.csv -----" +cat data.csv +echo + +# Basic file operations +echo "=== Basic File Operations ===" +echo "File sizes:" +ls -lh file*.txt data.csv binary_file.bin | awk '{print $9 ": " $5}' +echo + +# Text processing +echo "=== Text Processing ===" +echo "Word count in all text files:" +wc -w file*.txt +echo +echo "Total word count: $(cat file*.txt | wc -w) words" +echo "Total line count: $(cat file*.txt | wc -l) lines" +echo "Total character count: $(cat file*.txt | wc -c) characters" +echo + +# Search operations +echo "=== Search Operations ===" +echo "Files containing 'ArmoniK':" +grep -l "ArmoniK" file*.txt search_file.txt +echo +echo "Instances of 'ArmoniK' in all files:" +grep -n "ArmoniK" file*.txt search_file.txt +echo +echo "Count of 'ArmoniK' occurrences: $(grep -c "ArmoniK" file*.txt search_file.txt)" +echo + +# File modification +echo "=== File Modification ===" +echo "Appending to file1.txt..." +echo "This line was appended for testing purposes." >> file1.txt +echo "New content of file1.txt:" +cat file1.txt +echo + +# File permissions +echo "=== File Permissions ===" +echo "Current permissions:" +ls -la +echo +echo "Changing permissions on file3.txt..." +chmod 600 file3.txt +echo "New permissions for file3.txt:" +ls -la file3.txt +echo + +# File transformation +echo "=== File Transformation ===" +echo "Converting data.csv to formatted table:" +column -t -s, data.csv +echo +echo "Extracting Engineering department from CSV:" +grep "Engineering" data.csv | column -t -s, +echo + +# File comparison +echo "=== File Comparison ===" +cp file1.txt file1_copy.txt +echo "Comparing original and copy:" +diff file1.txt file1_copy.txt && echo "Files are identical" +echo "Modifying copy and comparing again:" +echo "This makes the copy different." >> file1_copy.txt +diff file1.txt file1_copy.txt || echo "Files are now different" +echo + +# File archiving +echo "=== File Archiving ===" +echo "Creating archive of text files..." +tar -cf text_files.tar file*.txt +echo "Archive created: $(ls -lh text_files.tar | awk '{print $9 ": " $5}')" +echo + +# Cleanup +echo "=== Cleaning Up ===" +cd /tmp +echo "Removing test directory: $TEST_DIR" +rm -rf "$TEST_DIR" +echo "Cleanup complete for process $$" +""" + (TEST_DIR / "file_operations.cmd").write_text(file_operations_content) + + # 4. Data Processing + data_processing_content = """# Data Processing Commands +echo "=== Generating Sample Data ===" +echo "Name,Age,City" > /tmp/sample_$$.csv +echo "John,35,New York" >> /tmp/sample_$$.csv +echo "Alice,29,London" >> /tmp/sample_$$.csv +echo "Bob,42,Paris" >> /tmp/sample_$$.csv +echo "Jane,31,Tokyo" >> /tmp/sample_$$.csv +echo "=== Basic Processing ===" +cat /tmp/sample_$$.csv +echo "=== Filtering with grep ===" +grep "Alice" /tmp/sample_$$.csv +echo "=== Counting with wc ===" +wc -l /tmp/sample_$$.csv +echo "=== Field extraction with cut ===" +cut -d, -f2,3 /tmp/sample_$$.csv +echo "=== Complex processing with awk ===" +awk -F, 'NR>1{print $1 " from " $3 " is " $2 " years old"}' /tmp/sample_$$.csv +echo "=== Cleaning Up ===" +rm /tmp/sample_$$.csv +echo "Cleanup complete for $$" +""" + (TEST_DIR / "data_processing.cmd").write_text(data_processing_content) + + # 5. Multi-step Process + multi_step_content = """# Multi-step Process Commands +echo "=== Environment Information ===" +echo "Current directory: $(pwd)" +echo "User: $(whoami)" +echo "Shell: $SHELL" +echo "PATH: $PATH" + +echo "=== Date and Time Operations ===" +echo "Current date and time: $(date)" +cal || echo "Calendar command not available" + +echo "=== Process Information ===" +echo "Current running processes for this user:" +ps aux | grep $(whoami) | head -5 || echo "ps command failed" + +echo "=== Text Processing ===" +echo "Creating sample text file..." +cat > /tmp/sample_multi_$$.txt << 'EOF' +Line 1: This is a test file +Line 2: Created for ArmoniK testing +Line 3: It contains multiple lines +Line 4: To demonstrate text processing +Line 5: The End +EOF + +echo "Displaying file content with line numbers:" +cat -n /tmp/sample_multi_$$.txt + +echo "Searching for 'ArmoniK':" +grep "ArmoniK" /tmp/sample_multi_$$.txt + +echo "Counting words:" +wc -w /tmp/sample_multi_$$.txt + +echo "Cleaning up..." +rm /tmp/sample_multi_$$.txt +echo "Cleanup complete for $$" + +echo "=== Multi-step process completed ===" +""" + (TEST_DIR / "multi_step.cmd").write_text(multi_step_content) + + def run_test_with_reporting(self, test_name, command_file): + """Run a test and record results for reporting""" + start_time = time.time() + test_output_file = OUTPUTS_DIR / f"{test_name}_{self.timestamp}.output" + + # First run directly with bash to test the command itself + logging.info(f"Running test {test_name}...") + result = subprocess.run( + ["bash", str(command_file)], + capture_output=True, + text=True, + ) + + # Save output + with open(test_output_file, "w") as f: + f.write(result.stdout) + if result.stderr: + f.write("\n=== STDERR ===\n") + f.write(result.stderr) + + # Calculate duration + duration = time.time() - start_time + + # Prepare result data + test_result = { + "name": test_name, + "command_file": str(command_file), + "output_file": str(test_output_file), + "passed": result.returncode == 0, + "duration": duration, + "return_code": result.returncode, + } + + if result.returncode != 0: + test_result["error"] = result.stderr if result.stderr else "Unknown error" + + # Store result for report + self.__class__.test_results.append(test_result) + + # Log results + status = "PASSED" if result.returncode == 0 else "FAILED" + logging.info(f"Test {test_name} {status} in {duration:.3f}s") + logging.info(f"Output saved to {test_output_file}") + + return result + + def test_system_info(self): + """Test system information commands""" + test_name = "system_info" + command_file = TEST_DIR / f"{test_name}.cmd" + result = self.run_test_with_reporting(test_name, command_file) + self.assertEqual(result.returncode, 0) + self.assertIn("System Information", result.stdout) + + def test_network_diagnostics(self): + """Test network diagnostics commands""" + test_name = "network_diagnostics" + command_file = TEST_DIR / f"{test_name}.cmd" + result = self.run_test_with_reporting(test_name, command_file) + self.assertEqual(result.returncode, 0) + self.assertIn("Network Interfaces", result.stdout) + + def test_file_operations(self): + """Test file operations commands""" + test_name = "file_operations" + command_file = TEST_DIR / f"{test_name}.cmd" + result = self.run_test_with_reporting(test_name, command_file) + self.assertEqual(result.returncode, 0) + self.assertIn("Creating Test Files", result.stdout) + + def test_data_processing(self): + """Test data processing commands""" + test_name = "data_processing" + command_file = TEST_DIR / f"{test_name}.cmd" + result = self.run_test_with_reporting(test_name, command_file) + self.assertEqual(result.returncode, 0) + self.assertIn("Generating Sample Data", result.stdout) + + def test_multi_step_process(self): + """Test multi-step process commands""" + test_name = "multi_step" + command_file = TEST_DIR / f"{test_name}.cmd" + result = self.run_test_with_reporting(test_name, command_file) + self.assertEqual(result.returncode, 0) + self.assertIn("Multi-step process completed", result.stdout) + + +if __name__ == "__main__": + # Run tests + unittest.main() diff --git a/python/commandLine/worker/.dockerignore b/python/commandLine/worker/.dockerignore new file mode 100644 index 00000000..1d17dae1 --- /dev/null +++ b/python/commandLine/worker/.dockerignore @@ -0,0 +1 @@ +.venv diff --git a/python/commandLine/worker/Dockerfile b/python/commandLine/worker/Dockerfile new file mode 100644 index 00000000..804afe18 --- /dev/null +++ b/python/commandLine/worker/Dockerfile @@ -0,0 +1,17 @@ +FROM python:3.10-slim AS builder +WORKDIR /app +RUN python -m venv .venv && .venv/bin/pip install --no-cache-dir -U pip setuptools +COPY requirements.txt ./ +RUN .venv/bin/pip install --no-cache-dir -r requirements.txt +COPY . /app + +FROM python:3.10-slim +WORKDIR /app +RUN groupadd --gid 5000 armonikuser \ + && useradd --home-dir /home/armonikuser --create-home --uid 5000 --gid 5000 --shell /bin/sh armonikuser \ + && mkdir /cache && chown armonikuser: /cache +USER armonikuser +ENV PATH="/app/.venv/bin:$PATH" \ + PYTHONUNBUFFERED=1 +COPY --from=builder /app /app +ENTRYPOINT ["python", "main.py"] \ No newline at end of file diff --git a/python/commandLine/worker/main.py b/python/commandLine/worker/main.py new file mode 100644 index 00000000..091cc5c3 --- /dev/null +++ b/python/commandLine/worker/main.py @@ -0,0 +1,116 @@ +import logging +import os +import subprocess +import sys +from pathlib import Path + +import grpc +from armonik.common import Output +from armonik.worker import ArmoniKWorker, ClefLogger, TaskHandler + +# Add the common directory to the system path +common_path = Path(__file__).resolve().parent.parent / "common" +sys.path.append(str(common_path)) + + +ClefLogger.setup_logging(logging.INFO) + + +# Task processing +def processor(task_handler: TaskHandler) -> Output: + """ + Processes a task by executing a command script from the payload and returning only stdout. + + Args: + task_handler: The handler for the current task. + + Returns: + Output: The result of the task processing. + """ + logger = ClefLogger.getLogger("ArmoniKWorker") + logger.info("Handling the Task") + + # Get the payload and decode it to string + payload = task_handler.payload + logger.info(f"Received payload: {payload}") + + try: + # Decode the payload to get the complete script + command_script = payload.decode("utf-8") + logger.info(f"Executing script: {command_script}") + + try: + # Execute the entire script as a single command and capture the output + result = subprocess.run( + command_script, + shell=True, + capture_output=True, + text=True, + timeout=300, + check=False, + ) + + # Only use the stdout output + final_output = result.stdout + + # Log information for debugging but don't include in the output + logger.info(f"Script return code: {result.returncode}") + if result.stderr: + logger.info(f"Script stderr: {result.stderr}") + + except subprocess.TimeoutExpired: + logger.error("Script execution timed out") + final_output = "" + + except Exception as e: + logger.error(f"Error executing script: {str(e)}") + final_output = "" + + # Send only the stdout output back + task_handler.send_results( + {task_handler.expected_results[0]: final_output.encode()} + ) + return Output() + + except Exception as e: + logger.error(f"Error processing payload: {str(e)}") + task_handler.send_results({task_handler.expected_results[0]: b""}) + return Output() + + +def main(): + # Create Seq compatible logger + logger = ClefLogger.getLogger("ArmoniKWorker") + # Define agent-worker communication endpoints + worker_scheme = ( + "unix://" + if os.getenv("ComputePlane__WorkerChannel__SocketType", "unixdomainsocket") + == "unixdomainsocket" + else "http://" + ) + agent_scheme = ( + "unix://" + if os.getenv("ComputePlane__AgentChannel__SocketType", "unixdomainsocket") + == "unixdomainsocket" + else "http://" + ) + worker_endpoint = worker_scheme + os.getenv( + "ComputePlane__WorkerChannel__Address", "/cache/armonik_worker.sock" + ) + agent_endpoint = agent_scheme + os.getenv( + "ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock" + ) + + # Start worker + logger.info("Worker Started") + # Use options to fix Unix socket connection on localhost (cf: ) + with grpc.insecure_channel( + agent_endpoint, options=(("grpc.default_authority", "localhost"),) + ) as agent_channel: + worker = ArmoniKWorker(agent_channel, processor, logger=logger) + logger.info("Worker Connected") + worker.start(worker_endpoint) + + +if __name__ == "__main__": + main() diff --git a/python/commandLine/worker/requirements.txt b/python/commandLine/worker/requirements.txt new file mode 100644 index 00000000..bf272cf7 --- /dev/null +++ b/python/commandLine/worker/requirements.txt @@ -0,0 +1 @@ +armonik \ No newline at end of file