diff --git a/Roost-README.md b/Roost-README.md new file mode 100644 index 000000000..dd5bcc8d8 --- /dev/null +++ b/Roost-README.md @@ -0,0 +1,25 @@ + +# RoostGPT generated pytest code for API Testing + +RoostGPT generats code in `tests` folder within given project path. +Dependency file i.e. `requirements-roost.txt` is also created in the given project path + +Below are the sample steps to run the generated tests. Sample commands contains use of package manager i.e. `uv`. Alternatively python and pip can be used directly. +1. ( Optional ) Create virtual Env . +2. Install dependencies +``` +uv venv // Create virtual Env +uv pip install -r requirements-roost.txt // Install all dependencies + +``` + +Test configurations and test_data is loaded from config.yml. e.g. API HOST, auth, common path parameters of endpoint. +Either set defalt value in this config.yml file OR use ENV. e.g. export API_HOST="https://example.com/api/v2" + +Once configuration values are set, use below commands to run the tests. +``` +// Run generated tests +uv run pytest -m smoke // Run only smoke tests +uv run pytest -s tests/generated-test.py // Run specific test file +``` + \ No newline at end of file diff --git a/requirements-roost.txt b/requirements-roost.txt new file mode 100644 index 000000000..de7bd70f6 --- /dev/null +++ b/requirements-roost.txt @@ -0,0 +1,62 @@ + +agentocr +beautifulsoup4 +boto3 +botocore +chromedriver_binary +click +django_recaptcha +dlib +dnspython +emoji +exifread +ffmpeg +ffpyplayer +Flask +flask_sqlalchemy +geopy +googletrans +gtts +img2pdf +jsonschema +keras +lxml +matplotlib +nltk +numpy +pandas +pil +Pillow +psutil +pyautogui +pycryptodome +pycryptodomex +PyDictionary +pygame +pymysql +pynotifier +PyPDF2 +pytest +python-telegram-bot +pyttsx3 +pywhatkit +PyYAML +qrcode +referencing +Requests +rich +scikit_learn +selenium +sumeval +sumy +tensorflow +textblob +tqdm +tweepy +urllib3 +webdriver_manager +wechaty +wechaty_puppet +wikipedia +wordcloud +xmltodict \ No newline at end of file diff --git a/tests/TEST/api.json b/tests/TEST/api.json new file mode 100644 index 000000000..160d6d49b --- /dev/null +++ b/tests/TEST/api.json @@ -0,0 +1,84 @@ +{ + "info": { + "title": "test", + "version": "1.0.0" + }, + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "swagger": "2.0", + "securityDefinitions": { + "keystone": { + "description": "OpenStack Keystone Authentication", + "type": "apiKey", + "in": "header", + "name": "x-auth-token" + } + }, + "security": [], + "paths": { + "/api/v1/clusters/": { + "get": { + "operationId": "ListClusters", + "summary": "List available clusters", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/Cluster" + } + } + } + }, + "security": [ + { + "keystone": [] + } + ] + }, + "post": { + "operationId": "CreateCluster", + "summary": "Create a cluster", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/Cluster" + } + } + }, + "parameters": [ + { + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/Cluster" + } + } + ], + "security": [ + { + "keystone": [] + } + ] + } + } + }, + "definitions": { + "Cluster": { + "type": "object", + "properties": { + "name": { + "description": "name of the cluster", + "type": "string" + } + } + } + } +} \ No newline at end of file diff --git a/tests/TEST/api_v1_clusters.json b/tests/TEST/api_v1_clusters.json new file mode 100644 index 000000000..f4757ecb0 --- /dev/null +++ b/tests/TEST/api_v1_clusters.json @@ -0,0 +1,47 @@ +[ + { + "body": {}, + "statusCode": 200, + "scenario": "Successful responses: OK" + }, + { + "body": { + "id": 12345, + "name": "Test Resource", + "status": "active" + }, + "statusCode": 200, + "scenario": "Successful responses: OK" + }, + { + "body": { + "data": [ + "item1", + "item2", + "item3" + ], + "count": 3 + }, + "statusCode": 200, + "scenario": "Successful responses: OK" + }, + { + "body": { + "message": "Operation completed successfully", + "timestamp": "2024-01-15T10:30:00Z" + }, + "statusCode": 200, + "scenario": "Successful responses: OK" + }, + { + "body": { + "result": true, + "details": { + "processed": 100, + "failed": 0 + } + }, + "statusCode": 200, + "scenario": "Successful responses: OK" + } +] \ No newline at end of file diff --git a/tests/TEST/config.yml b/tests/TEST/config.yml new file mode 100644 index 000000000..a8c4e84e6 --- /dev/null +++ b/tests/TEST/config.yml @@ -0,0 +1,19 @@ + +# This config.yml contains user provided data for api testing. Allows to define values here or use ENV to load values. e.g. ENV[API_HOST] = "https://exampl2.com" +# api: +# host: "${API_HOST:-https://example.com/api/v2}" # includes base path +# auth: +# api_key: "${API_KEY:-}" +# api_key_header: "${KEYNAME:-DefaultValue}" # openapi.spec.security.KEY_NAME +# basic_auth: "${username:-}:${password:-}" +# test_data: +# id: "${TEST_ID:-282739-1238371-219393-2833}" # Any test data key value pair e.g. GET /api/v1/cart/:id +# context-id: "${TEST_context-id:-}" # GET /api/v1/{context-id}/summary + + + +api: + host: "${API_HOST:-ApiURL1}" +auth: + keystone: "${x-auth-token:-}" +test_data: {} diff --git a/tests/TEST/conftest.py b/tests/TEST/conftest.py new file mode 100644 index 000000000..584c991ef --- /dev/null +++ b/tests/TEST/conftest.py @@ -0,0 +1,173 @@ +import os +import re +import pytest +import requests +import yaml +from pathlib import Path +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + + +def _expand_env_in_string(value): + """Expand environment variables in string with format ${VAR:-default}""" + if not isinstance(value, str): + return value + + pattern = r'\$\{([^}:]+)(?::-([^}]*))?\}' + + def replace_env(match): + env_var = match.group(1) + default = match.group(2) if match.group(2) is not None else '' + return os.environ.get(env_var, default) + + return re.sub(pattern, replace_env, value) + + +def _expand_env_in_dict(data): + """Recursively expand environment variables in dictionary""" + if isinstance(data, dict): + return {key: _expand_env_in_dict(value) for key, value in data.items()} + elif isinstance(data, list): + return [_expand_env_in_dict(item) for item in data] + elif isinstance(data, str): + return _expand_env_in_string(data) + return data + + +class APIClient: + """Simple API client for making HTTP requests""" + + def __init__(self, host, auth=None, timeout=30, retries=3): + self.host = host.strip() if host else '' + self.auth = auth or {} + self.timeout = timeout + self.session = requests.Session() + + retry_strategy = Retry( + total=retries, + backoff_factor=1, + status_forcelist=[429, 500, 502, 503, 504], + allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE", "PATCH"] + ) + adapter = HTTPAdapter(max_retries=retry_strategy) + self.session.mount("http://", adapter) + self.session.mount("https://", adapter) + + def _build_url(self, endpoint): + """Build full URL from host and endpoint""" + host = self.host.rstrip('/') + endpoint = endpoint.lstrip('/') + return f"{host}/{endpoint}" + + def _get_default_headers(self): + """Get default headers including auth""" + headers = {} + if self.auth.get('keystone'): + headers['x-auth-token'] = self.auth['keystone'] + return headers + + def make_request(self, endpoint, params=None, headers=None, method='GET', json=None, data=None): + """Make HTTP request to the API""" + url = self._build_url(endpoint) + request_headers = self._get_default_headers() + if headers: + request_headers.update(headers) + + response = self.session.request( + method=method.upper(), + url=url, + params=params, + headers=request_headers, + json=json, + data=data, + timeout=self.timeout + ) + return response + + def get(self, endpoint, headers=None, params=None): + """Make GET request""" + return self.make_request(endpoint, params=params, headers=headers, method='GET') + + def post(self, endpoint, headers=None, params=None, json=None, data=None): + """Make POST request""" + return self.make_request(endpoint, params=params, headers=headers, method='POST', json=json, data=data) + + def put(self, endpoint, headers=None, params=None, json=None, data=None): + """Make PUT request""" + return self.make_request(endpoint, params=params, headers=headers, method='PUT', json=json, data=data) + + def delete(self, endpoint, headers=None, params=None): + """Make DELETE request""" + return self.make_request(endpoint, params=params, headers=headers, method='DELETE') + + def patch(self, endpoint, headers=None, params=None, json=None, data=None): + """Make PATCH request""" + return self.make_request(endpoint, params=params, headers=headers, method='PATCH', json=json, data=data) + + +def pytest_configure(config): + """Register custom markers""" + config.addinivalue_line( + "markers", "smoke: For all success scenarios" + ) + + +@pytest.fixture(scope="session") +def config(): + """Load configuration from config.yml file""" + config_dir = os.path.dirname(os.path.abspath(__file__)) + config_path = os.path.join(config_dir, 'config.yml') + + if not os.path.exists(config_path): + raise FileNotFoundError(f"Configuration file not found: {config_path}") + + with open(config_path, 'r') as f: + raw_config = yaml.safe_load(f) + + if raw_config is None: + raw_config = {} + + expanded_config = _expand_env_in_dict(raw_config) + return expanded_config + + +@pytest.fixture(scope="session") +def api_host(config): + """Get API host from configuration""" + api_config = config.get('api', {}) + host = api_config.get('host', '') + return host.strip() if host else '' + + +@pytest.fixture(scope="session") +def auth(config): + """Get authentication configuration""" + return config.get('auth', {}) + + +@pytest.fixture(scope="session") +def keystone_token(auth): + """Get keystone token from auth configuration""" + return auth.get('keystone', '') + + +@pytest.fixture(scope="session") +def config_test_data(config): + """Load test_data from config fixture""" + return config.get('test_data', {}) + + +@pytest.fixture(scope="session") +def api_client(config): + """Create API client instance using configuration""" + api_config = config.get('api', {}) + auth_config = config.get('auth', {}) + + host = api_config.get('host', '') + + return APIClient( + host=host, + auth=auth_config, + timeout=30, + retries=3 + ) diff --git a/tests/TEST/test_api_v1_clusters_get.py b/tests/TEST/test_api_v1_clusters_get.py new file mode 100644 index 000000000..05266a543 --- /dev/null +++ b/tests/TEST/test_api_v1_clusters_get.py @@ -0,0 +1,353 @@ +# ********RoostGPT******** + +# Test generated by RoostGPT for test API-test-yaml using AI Type Claude AI and AI Model claude-opus-4-5-20251101 +# +# Test file generated for /api/v1/clusters/_get for http method type GET +# RoostTestHash=30784b487f +# +# + +# ********RoostGPT******** +""" +API Test Suite for /api/v1/clusters/ endpoint + +This module contains pytest tests for the ListClusters API endpoint. +Tests cover successful responses, schema validation, and edge cases. + +Identified Endpoints: +- GET /api/v1/clusters/ - List available clusters + +Security Schema: +- keystone: Token-based authentication via x-auth-token header + +Response Status Codes: +- 200: OK - Returns array of cluster objects + +Setup: +1. Ensure config.yml is properly configured with API host and auth token +2. Set environment variables API_HOST and x-auth-token if needed +3. Run tests with: pytest test_api_v1_clusters.py -v + +To run smoke tests only: pytest test_api_v1_clusters.py -v -m smoke +""" + +import json +import os +import pytest +from pathlib import Path + +# Import validator +from validator import SwaggerSchemaValidator + + +# Constants +ENDPOINT = "/api/v1/clusters/" +HTTP_METHOD = "GET" +API_SPEC_PATH = "api.json" +TEST_DATA_FILE = "api_v1_clusters.json" + + +@pytest.fixture(scope="module") +def endpoint_test_data(): + """Load test data from JSON file for table-driven tests""" + test_data_path = Path(__file__).parent / TEST_DATA_FILE + if not test_data_path.exists(): + pytest.skip(f"Test data file not found: {TEST_DATA_FILE}") + + with open(test_data_path, 'r') as f: + return json.load(f) + + +@pytest.fixture(scope="module") +def schema_validator(): + """Initialize SwaggerSchemaValidator with API spec""" + spec_path = Path(__file__).parent / API_SPEC_PATH + if not spec_path.exists(): + pytest.skip(f"API spec file not found: {API_SPEC_PATH}") + + return SwaggerSchemaValidator(str(spec_path)) + + +class TestListClustersSuccess: + """Test class for successful ListClusters API responses""" + + @pytest.mark.smoke + def test_list_clusters_success(self, api_client, schema_validator): + """ + Test successful listing of clusters with valid authentication. + + This is a happy path test that validates: + - API returns 200 status code + - Response schema matches OpenAPI specification + """ + response = api_client.get(ENDPOINT) + + assert response.status_code == 200, ( + f"Expected status code 200, got {response.status_code}. " + f"Response: {response.text}" + ) + + # Validate response schema + validation_result = schema_validator.validate_schema_by_response( + ENDPOINT, HTTP_METHOD, "200", response + ) + assert validation_result["valid"], ( + f"Schema validation failed: {validation_result.get('message', 'Unknown error')}" + ) + + def test_list_clusters_response_is_array(self, api_client): + """Test that the response is an array as per specification""" + response = api_client.get(ENDPOINT) + + assert response.status_code == 200 + + response_data = response.json() + assert isinstance(response_data, list), ( + f"Expected response to be an array, got {type(response_data).__name__}" + ) + + def test_list_clusters_array_items_structure(self, api_client): + """Test that array items have expected structure with name property""" + response = api_client.get(ENDPOINT) + + assert response.status_code == 200 + + response_data = response.json() + + # If array is not empty, validate item structure + for idx, item in enumerate(response_data): + assert isinstance(item, dict), ( + f"Item at index {idx} should be an object, got {type(item).__name__}" + ) + # name is optional per spec, but if present should be string + if "name" in item: + assert isinstance(item["name"], str), ( + f"Item at index {idx}: 'name' should be string, " + f"got {type(item['name']).__name__}" + ) + + +class TestListClustersTableDriven: + """Table-driven tests using data from JSON file""" + + def test_list_clusters_scenarios(self, api_client, endpoint_test_data, schema_validator): + """ + Table-driven test that iterates through all test scenarios from JSON file. + + Each scenario in the JSON file contains: + - body: Request body (not used for GET requests) + - statusCode: Expected status code + - scenario: Description of the test scenario + """ + for test_case in endpoint_test_data: + scenario = test_case.get("scenario", "Unknown scenario") + expected_status = test_case.get("statusCode") + + # Make the API request + response = api_client.get(ENDPOINT) + + # Validate status code matches expected + assert response.status_code == expected_status, ( + f"Scenario: {scenario} - " + f"Expected status code {expected_status}, got {response.status_code}. " + f"Response: {response.text}" + ) + + # Validate response schema for successful responses + if expected_status == 200: + validation_result = schema_validator.validate_schema_by_response( + ENDPOINT, HTTP_METHOD, str(expected_status), response + ) + assert validation_result["valid"], ( + f"Scenario: {scenario} - " + f"Schema validation failed: {validation_result.get('message', 'Unknown error')}" + ) + + +class TestListClustersResponseValidation: + """Tests for response validation and edge cases""" + + def test_response_content_type(self, api_client): + """Test that response has appropriate content type""" + response = api_client.get(ENDPOINT) + + assert response.status_code == 200 + + content_type = response.headers.get("Content-Type", "") + assert "application/json" in content_type.lower() or "json" in content_type.lower(), ( + f"Expected JSON content type, got: {content_type}" + ) + + def test_empty_array_response_valid(self, api_client, schema_validator): + """Test that an empty array response is valid per schema""" + response = api_client.get(ENDPOINT) + + assert response.status_code == 200 + + # Empty array should be valid per the schema (array with no items) + validation_result = schema_validator.validate_schema_by_response( + ENDPOINT, HTTP_METHOD, "200", response + ) + assert validation_result["valid"], ( + f"Schema validation failed for response: {validation_result.get('message')}" + ) + + def test_response_headers_present(self, api_client): + """Test that response contains expected headers""" + response = api_client.get(ENDPOINT) + + assert response.status_code == 200 + assert "Content-Type" in response.headers, "Content-Type header should be present" + + +class TestListClustersMissingAuth: + """Tests for missing authentication scenarios""" + + def test_missing_auth_token(self, api_host): + """ + Test API behavior when authentication token is missing. + + Note: The actual response code depends on API implementation. + This test documents the behavior without the auth token. + """ + import requests + + url = f"{api_host.rstrip('/')}/{ENDPOINT.lstrip('/')}" + + # Make request without auth token + response = requests.get(url, timeout=30) + + # API should return an error status (typically 401 or 403) + # We don't assert specific code as it depends on implementation + # but we document the behavior + assert response.status_code in [200, 401, 403], ( + f"Unexpected status code {response.status_code} for missing auth. " + f"Response: {response.text}" + ) + + def test_invalid_auth_token(self, api_host): + """ + Test API behavior with invalid authentication token. + + Uses a clearly invalid token to test authentication handling. + """ + import requests + + url = f"{api_host.rstrip('/')}/{ENDPOINT.lstrip('/')}" + headers = {"x-auth-token": "invalid-token-12345"} + + response = requests.get(url, headers=headers, timeout=30) + + # API should reject invalid token (typically 401 or 403) + assert response.status_code in [200, 401, 403], ( + f"Unexpected status code {response.status_code} for invalid auth. " + f"Response: {response.text}" + ) + + def test_empty_auth_token(self, api_host): + """ + Test API behavior with empty authentication token. + """ + import requests + + url = f"{api_host.rstrip('/')}/{ENDPOINT.lstrip('/')}" + headers = {"x-auth-token": ""} + + response = requests.get(url, headers=headers, timeout=30) + + # API should handle empty token appropriately + assert response.status_code in [200, 401, 403], ( + f"Unexpected status code {response.status_code} for empty auth. " + f"Response: {response.text}" + ) + + +class TestListClustersEdgeCases: + """Edge case and boundary tests""" + + def test_trailing_slash_endpoint(self, api_client, schema_validator): + """Test endpoint with trailing slash""" + response = api_client.get("/api/v1/clusters/") + + assert response.status_code == 200 + + validation_result = schema_validator.validate_schema_by_response( + ENDPOINT, HTTP_METHOD, "200", response + ) + assert validation_result["valid"] + + def test_no_trailing_slash_endpoint(self, api_client): + """Test endpoint without trailing slash""" + response = api_client.get("/api/v1/clusters") + + # API may redirect or handle both formats + assert response.status_code in [200, 301, 302, 307, 308], ( + f"Unexpected status code {response.status_code} for endpoint without trailing slash" + ) + + def test_case_sensitivity(self, api_host, keystone_token): + """Test that endpoint path is case-sensitive""" + import requests + + url = f"{api_host.rstrip('/')}/api/v1/CLUSTERS/" + headers = {"x-auth-token": keystone_token} if keystone_token else {} + + response = requests.get(url, headers=headers, timeout=30) + + # Most REST APIs are case-sensitive, expect 404 for wrong case + # But some may be case-insensitive + assert response.status_code in [200, 404], ( + f"Unexpected status code {response.status_code} for case variation" + ) + + def test_extra_query_parameters_ignored(self, api_client, schema_validator): + """Test that extra query parameters are handled gracefully""" + response = api_client.get(ENDPOINT, params={"unknown_param": "value"}) + + # API should either ignore unknown params or return appropriate error + assert response.status_code in [200, 400], ( + f"Unexpected status code {response.status_code} for extra query params" + ) + + if response.status_code == 200: + validation_result = schema_validator.validate_schema_by_response( + ENDPOINT, HTTP_METHOD, "200", response + ) + assert validation_result["valid"] + + +class TestListClustersMethodValidation: + """Tests for HTTP method validation""" + + def test_post_method_not_allowed(self, api_client): + """Test that POST method returns appropriate error""" + response = api_client.post(ENDPOINT, json={}) + + # POST should not be allowed on this endpoint + assert response.status_code in [405, 404, 400, 401], ( + f"Expected method not allowed error, got {response.status_code}" + ) + + def test_put_method_not_allowed(self, api_client): + """Test that PUT method returns appropriate error""" + response = api_client.put(ENDPOINT, json={}) + + assert response.status_code in [405, 404, 400, 401], ( + f"Expected method not allowed error, got {response.status_code}" + ) + + def test_delete_method_not_allowed(self, api_client): + """Test that DELETE method returns appropriate error""" + response = api_client.delete(ENDPOINT) + + assert response.status_code in [405, 404, 400, 401], ( + f"Expected method not allowed error, got {response.status_code}" + ) + + def test_patch_method_not_allowed(self, api_client): + """Test that PATCH method returns appropriate error""" + response = api_client.patch(ENDPOINT, json={}) + + assert response.status_code in [405, 404, 400, 401], ( + f"Expected method not allowed error, got {response.status_code}" + ) diff --git a/tests/TEST/test_api_v1_clusters_post.py b/tests/TEST/test_api_v1_clusters_post.py new file mode 100644 index 000000000..bf6bf5359 --- /dev/null +++ b/tests/TEST/test_api_v1_clusters_post.py @@ -0,0 +1,404 @@ +# ********RoostGPT******** + +# Test generated by RoostGPT for test API-test-yaml using AI Type Claude AI and AI Model claude-opus-4-5-20251101 +# +# Test file generated for /api/v1/clusters/_post for http method type POST +# RoostTestHash=c655209eb9 +# +# + +# ********RoostGPT******** +""" +Test Suite for POST /api/v1/clusters/ - Create Cluster API + +This module contains comprehensive pytest tests for the CreateCluster endpoint. +Tests cover successful cluster creation, validation errors, and edge cases. + +Setup: + 1. Ensure config.yml is properly configured with API host and auth token + 2. Set environment variables: API_HOST, x-auth-token + 3. Run tests: pytest test_api_v1_clusters.py -v + +Endpoints Tested: + - POST /api/v1/clusters/ - Create a cluster + +Security Schema: + - keystone: Token-based authentication via x-auth-token header +""" + +import json +import os +from pathlib import Path + +import pytest + +from validator import SwaggerSchemaValidator + + +# Load test data from JSON file +def load_endpoint_test_data(): + """Load test data from api_v1_clusters.json file""" + test_file = Path(__file__).parent / "api_v1_clusters.json" + if test_file.exists(): + with open(test_file, "r") as f: + return json.load(f) + return [] + + +ENDPOINT_TEST_DATA = load_endpoint_test_data() +ENDPOINT = "/api/v1/clusters/" +METHOD = "post" + + +@pytest.fixture(scope="module") +def schema_validator(): + """Initialize SwaggerSchemaValidator with API spec""" + spec_path = Path(__file__).parent / "api.json" + return SwaggerSchemaValidator(str(spec_path)) + + +@pytest.fixture(scope="function") +def merged_test_data(config_test_data): + """ + Fixture to provide merged test data. + Combines config_test_data with scenario-specific data. + """ + def _merge(scenario_data): + """Merge config test data with scenario-specific data""" + merged = dict(config_test_data) + if scenario_data: + merged.update(scenario_data) + return merged + return _merge + + +class TestCreateClusterSuccess: + """Test cases for successful cluster creation scenarios""" + + @pytest.mark.smoke + def test_create_cluster_with_required_fields( + self, api_client, config_test_data, schema_validator + ): + """ + Test successful cluster creation with only required fields. + + This is a happy path test that validates the API accepts + a valid request body and returns 200 OK. + """ + # Build request body from config test data (required fields only) + request_body = {} + if config_test_data.get("name"): + request_body["name"] = config_test_data.get("name") + + # Make API request + response = api_client.post(ENDPOINT, json=request_body) + + # Assert status code + assert response.status_code == 200, ( + f"Expected 200, got {response.status_code}. Response: {response.text}" + ) + + # Validate response schema + validation_result = schema_validator.validate_schema_by_response( + ENDPOINT, METHOD, "200", response + ) + assert validation_result["valid"], ( + f"Response schema validation failed: {validation_result.get('message')}" + ) + + @pytest.mark.smoke + @pytest.mark.parametrize( + "test_case", + [tc for tc in ENDPOINT_TEST_DATA if tc.get("statusCode") == 200], + ids=lambda tc: tc.get("scenario", "unknown_scenario") + ) + def test_create_cluster_success_scenarios( + self, api_client, config_test_data, schema_validator, test_case, merged_test_data + ): + """ + Test successful cluster creation with various valid payloads. + + Uses table-driven testing from api_v1_clusters.json for 200 status scenarios. + """ + # Merge config test data with scenario-specific body + scenario_body = test_case.get("body", {}) + merged_data = merged_test_data(scenario_body) + + # Build request body - use merged data + request_body = {} + if merged_data.get("name"): + request_body["name"] = merged_data.get("name") + elif scenario_body.get("name"): + request_body["name"] = scenario_body.get("name") + + # Make API request + response = api_client.post(ENDPOINT, json=request_body) + + expected_status = test_case.get("statusCode") + assert response.status_code == expected_status, ( + f"Scenario: {test_case.get('scenario')}. " + f"Expected {expected_status}, got {response.status_code}. " + f"Response: {response.text}" + ) + + # Validate response schema for success responses + validation_result = schema_validator.validate_schema_by_response( + ENDPOINT, METHOD, str(expected_status), response + ) + assert validation_result["valid"], ( + f"Response schema validation failed for scenario '{test_case.get('scenario')}': " + f"{validation_result.get('message')}" + ) + + +class TestCreateClusterTableDriven: + """Table-driven tests using data from api_v1_clusters.json""" + + @pytest.mark.parametrize( + "test_case", + ENDPOINT_TEST_DATA, + ids=lambda tc: f"{tc.get('scenario', 'unknown')}_{tc.get('statusCode', 'unknown')}" + ) + def test_create_cluster_all_scenarios( + self, api_client, config_test_data, schema_validator, test_case, merged_test_data + ): + """ + Test cluster creation with all scenarios from test data file. + + Iterates through all test cases in api_v1_clusters.json and validates + each scenario against expected status code and response schema. + """ + scenario_body = test_case.get("body", {}) + merged_data = merged_test_data(scenario_body) + + # Build request body from merged data + request_body = {} + if merged_data.get("name"): + request_body["name"] = merged_data.get("name") + elif scenario_body.get("name"): + request_body["name"] = scenario_body.get("name") + # Include other fields from scenario body + for key, value in scenario_body.items(): + if key not in request_body: + request_body[key] = value + + # Make API request + response = api_client.post(ENDPOINT, json=request_body) + + expected_status = test_case.get("statusCode") + assert response.status_code == expected_status, ( + f"Scenario: {test_case.get('scenario')}. " + f"Expected {expected_status}, got {response.status_code}. " + f"Response: {response.text}" + ) + + # Validate response schema + validation_result = schema_validator.validate_schema_by_response( + ENDPOINT, METHOD, str(expected_status), response + ) + assert validation_result["valid"], ( + f"Response schema validation failed for scenario '{test_case.get('scenario')}': " + f"{validation_result.get('message')}" + ) + + +class TestCreateClusterValidation: + """Test cases for request validation and edge cases""" + + def test_create_cluster_empty_body(self, api_client, schema_validator): + """ + Test cluster creation with empty request body. + + The API spec defines body as required, so empty body should + still be accepted as the schema allows empty object. + """ + request_body = {} + + response = api_client.post(ENDPOINT, json=request_body) + + # Empty body may be valid since 'name' is not marked as required in schema + # The API should return 200 or a validation error + assert response.status_code in [200, 400], ( + f"Expected 200 or 400 for empty body, got {response.status_code}. " + f"Response: {response.text}" + ) + + if response.status_code == 200: + validation_result = schema_validator.validate_schema_by_response( + ENDPOINT, METHOD, "200", response + ) + assert validation_result["valid"], ( + f"Response schema validation failed: {validation_result.get('message')}" + ) + + def test_create_cluster_with_null_name(self, api_client, schema_validator): + """ + Test cluster creation with null name value. + + Tests edge case where name field is explicitly set to null. + """ + request_body = {"name": None} + + response = api_client.post(ENDPOINT, json=request_body) + + # Null value may be rejected or accepted depending on API implementation + assert response.status_code in [200, 400], ( + f"Expected 200 or 400 for null name, got {response.status_code}. " + f"Response: {response.text}" + ) + + def test_create_cluster_with_empty_string_name(self, api_client, schema_validator): + """ + Test cluster creation with empty string name. + + Tests boundary condition for name field. + """ + request_body = {"name": ""} + + response = api_client.post(ENDPOINT, json=request_body) + + # Empty string may be valid or invalid depending on API validation + assert response.status_code in [200, 400], ( + f"Expected 200 or 400 for empty string name, got {response.status_code}. " + f"Response: {response.text}" + ) + + def test_create_cluster_with_special_characters_name( + self, api_client, config_test_data, schema_validator + ): + """ + Test cluster creation with special characters in name. + + Tests edge case for name field with special characters. + """ + request_body = {"name": "test-cluster_123!@#"} + + response = api_client.post(ENDPOINT, json=request_body) + + # Special characters may or may not be allowed + assert response.status_code in [200, 400], ( + f"Expected 200 or 400, got {response.status_code}. " + f"Response: {response.text}" + ) + + def test_create_cluster_with_very_long_name(self, api_client, schema_validator): + """ + Test cluster creation with very long name string. + + Tests boundary condition for maximum name length. + """ + # Generate a very long name (1000 characters) + long_name = "a" * 1000 + request_body = {"name": long_name} + + response = api_client.post(ENDPOINT, json=request_body) + + # Long name may be truncated, rejected, or accepted + assert response.status_code in [200, 400, 413], ( + f"Expected 200, 400, or 413 for long name, got {response.status_code}. " + f"Response: {response.text}" + ) + + def test_create_cluster_with_unicode_name(self, api_client, schema_validator): + """ + Test cluster creation with unicode characters in name. + + Tests internationalization support for name field. + """ + request_body = {"name": "集群测试-クラスター-кластер"} + + response = api_client.post(ENDPOINT, json=request_body) + + # Unicode may or may not be supported + assert response.status_code in [200, 400], ( + f"Expected 200 or 400 for unicode name, got {response.status_code}. " + f"Response: {response.text}" + ) + + def test_create_cluster_with_extra_fields( + self, api_client, config_test_data, schema_validator + ): + """ + Test cluster creation with additional unexpected fields. + + Tests API behavior when extra fields are included in request. + """ + request_body = { + "name": config_test_data.get("name", "test-cluster"), + "unexpected_field": "unexpected_value", + "another_extra": 12345 + } + + response = api_client.post(ENDPOINT, json=request_body) + + # Extra fields may be ignored or cause an error + assert response.status_code in [200, 400], ( + f"Expected 200 or 400 for extra fields, got {response.status_code}. " + f"Response: {response.text}" + ) + + def test_create_cluster_with_wrong_type_name(self, api_client, schema_validator): + """ + Test cluster creation with wrong type for name field. + + Tests type validation - name should be string but sending integer. + """ + request_body = {"name": 12345} + + response = api_client.post(ENDPOINT, json=request_body) + + # Wrong type should be rejected + assert response.status_code in [200, 400], ( + f"Expected 200 or 400 for wrong type, got {response.status_code}. " + f"Response: {response.text}" + ) + + +class TestCreateClusterResponseValidation: + """Test cases focusing on response validation""" + + def test_response_contains_name_field( + self, api_client, config_test_data, schema_validator + ): + """ + Test that successful response contains name field as per schema. + + Validates response structure matches OpenAPI specification. + """ + request_body = {} + if config_test_data.get("name"): + request_body["name"] = config_test_data.get("name") + + response = api_client.post(ENDPOINT, json=request_body) + + if response.status_code == 200: + # Validate response schema + validation_result = schema_validator.validate_schema_by_response( + ENDPOINT, METHOD, "200", response + ) + assert validation_result["valid"], ( + f"Response schema validation failed: {validation_result.get('message')}" + ) + + # Check response is valid JSON + try: + response_json = response.json() + assert isinstance(response_json, dict), "Response should be an object" + except json.JSONDecodeError: + pytest.fail("Response is not valid JSON") + + def test_response_content_type(self, api_client, config_test_data): + """ + Test that response has correct content type header. + """ + request_body = {} + if config_test_data.get("name"): + request_body["name"] = config_test_data.get("name") + + response = api_client.post(ENDPOINT, json=request_body) + + if response.status_code == 200: + content_type = response.headers.get("Content-Type", "") + assert "application/json" in content_type or response.text == "", ( + f"Expected JSON content type, got: {content_type}" + ) diff --git a/tests/TEST/validator.py b/tests/TEST/validator.py new file mode 100644 index 000000000..2f965738e --- /dev/null +++ b/tests/TEST/validator.py @@ -0,0 +1,202 @@ + +import json +import yaml +from jsonschema import ( + Draft202012Validator, + Draft7Validator, + Draft4Validator, + ValidationError, +) +from referencing import Registry, Resource +from typing import Dict, Any +import requests + + +class SwaggerSchemaValidator: + """ + Validates JSON, XML, and text responses + """ + + def __init__(self, swagger_source: str): + self.spec = self._load_spec(swagger_source) + self.is_swagger2 = False + self.schemas = self._extract_schemas() + self.registry = Registry() + + for name, schema in self.schemas.items(): + pointer = ( + f"#/definitions/{name}" if self.is_swagger2 + else f"#/components/schemas/{name}" + ) + + wrapped = { + "$schema": "https://json-schema.org/draft/2020-12/schema", + **schema, + } + self.registry = self.registry.with_resource( + pointer, + Resource.from_contents(wrapped) + ) + + def _load_spec(self, source: str) -> Dict[str, Any]: + if source.startswith(("http://", "https://")): + resp = requests.get(source) + resp.raise_for_status() + text = resp.text + + try: + return yaml.safe_load(text) + except yaml.YAMLError: + try: + return json.loads(text) + except json.JSONDecodeError: + raise ValueError("URL does not contain valid YAML or JSON") + + with open(source, "r") as f: + text = f.read() + + if source.endswith((".yaml", ".yml")): + return yaml.safe_load(text) + if source.endswith(".json"): + return json.loads(text) + + raise ValueError("File must be YAML or JSON") + + def _extract_schemas(self): + if "components" in self.spec and "schemas" in self.spec["components"]: + self.is_swagger2 = False + return self.spec["components"]["schemas"] + + if "definitions" in self.spec: + self.is_swagger2 = True + return self.spec["definitions"] + + raise ValueError("No schemas found under components/schemas or definitions") + + def get_version(self): + return self.spec.get("openapi") or self.spec.get("swagger") or "" + + def select_validator(self): + v = self.get_version() + + if v.startswith("2."): + return Draft4Validator + if v.startswith("3.0"): + return Draft7Validator + if v.startswith("3.1"): + return Draft202012Validator + + return Draft202012Validator + + def resolve_ref(self, ref): + if ref.startswith("#/"): + parts = ref.lstrip("#/").split("/") + node = self.spec + for p in parts: + node = node[p] + return node + + raise ValueError(f"External refs not supported: {ref}") + + def deref(self, schema): + if isinstance(schema, dict): + if "$ref" in schema: + resolved = self.resolve_ref(schema["$ref"]) + return self.deref(resolved) + return {k: self.deref(v) for k, v in schema.items()} + + if isinstance(schema, list): + return [self.deref(v) for v in schema] + + return schema + + def detect_format(self, response): + ctype = response.headers.get("Content-Type", "").lower() + if "json" in ctype: + return "json" + if "xml" in ctype: + return "xml" + if "text" in ctype: + return "text" + return "binary" + + def parse_body(self, response, fmt): + if fmt == "json": + return json.loads(response.text) + + if fmt == "xml": + import xmltodict + return xmltodict.parse(response.text) + + if fmt == "text": + return response.text + + return response.content + + def extract_schema_for_media_type(self, response_block, content_type): + content = response_block.get("content", {}) + + if content_type in content: + return content[content_type].get("schema") + + if "json" in content_type: + for k, v in content.items(): + if k == "application/json" or k.endswith("+json"): + return v.get("schema") + + if "xml" in content_type: + for k, v in content.items(): + if "xml" in k: + return v.get("schema") + + if "text/plain" in content: + return content["text/plain"].get("schema") + + return None + + + def validate_schema_by_response(self, endpoint, method, status_code, response): + fmt = self.detect_format(response) + + paths = self.spec.get("paths", {}) + op = paths.get(endpoint, {}).get(method.lower()) + + if not op: + return {"valid": False, "message": f"Method {method} not found at path {endpoint}"} + + responses = op.get("responses", {}) + response_block = responses.get(status_code) + + if not response_block: + return {"valid": False, "message": f"No response block for {status_code}"} + + ctype = response.headers.get("Content-Type", "").split(";")[0].strip() + + if "content" in response_block: + schema = self.extract_schema_for_media_type(response_block, ctype) + else: + schema = response_block.get("schema") + + if schema is None: + return {"valid": True, "message": "No schema defined for this content type"} + + try: + data = self.parse_body(response, fmt) + except Exception as e: + return {"valid": False, "message": f"Body parsing failed: {e}"} + + schema = self.deref(schema) + + validator_cls = self.select_validator() + validator = validator_cls(schema, registry=self.registry) + + try: + validator.validate(data) + return {"valid": True} + except ValidationError as e: + return { + "valid": False, + "message": e.message, + "path": list(e.path), + "schema_path": list(e.schema_path), + }