diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ad509d6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,132 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# .vscode +.vscode/ \ No newline at end of file diff --git a/README.md b/README.md index 4ddc6cf..5567f02 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,210 @@ -# workflows-cdk -Workflows Connector Development Kit +# Workflows CDK + +A powerful CDK (Connector Development Kit) for building Stacksync Workflows Connectors with Python and Flask. + +## Features + +- 🚀 Automatic route discovery and registration file based (like in Next.js!) +- 🔒 Built-in error handling and Sentry integration +- 📦 Standardized request/response handling +- 🛠️ Error management with standardized error handling +- 🔄 Environment-aware configuration + +## Installation + +```bash +pip install workflows-cdk +``` + +## Quick Start + +1. Create a new project directory: + +```bash +mkdir my-workflow-connector +cd my-workflow-connector +``` + +2. Install the required dependencies: + +```bash +pip install workflows-cdk flask pyyaml +``` + +3. Create the basic project structure: + +``` +my-workflow-connector/ +├── main.py +├── app_config.yaml +├── requirements.txt +└── routes/ + └── hello/ + └── v1/ + └── route.py +``` + +4. Set up your `app_config.yaml`: + +```yaml +app_settings: + app_type: "example" + app_name: "My Workflow Connector" + app_description: "A simple workflow connector" + sentry_dsn: "your-sentry-dsn" # Optional + cors_origins: ["*"] + routes_directory: "routes" + debug: true + host: "0.0.0.0" + port: 2005 +``` + +5. 
Create your `main.py`: + +```python +from flask import Flask +from workflows_cdk import Router + +# Create Flask app +app = Flask("my-workflow-connector") + +# Initialize router with configuration +router = Router(app) + +# Run the app +if __name__ == "__main__": + router.run_app(app) +``` + +6. Create your first route in `routes/send_message/v1/route.py`: + +```python +from workflows_cdk import Request, Response, ManagedError +from main import router + +@router.route("/execute", methods=["POST"]) +def execute(): + """Execute the send message action.""" + request = Request(flask_request) + data = request.data + + name = data.get("name", "World") + return Response.success(data={ + "message": f"Hello, {name}!" + }) +``` + +## Core Components + +### Router + +The `Router` class is the heart of the CDK, providing: + +- Automatic route discovery based on file system structure +- Built-in error handling and Sentry integration +- CORS configuration +- Health check endpoints +- API documentation + +### Request + +The `Request` class wraps Flask's request object, providing: + +- Easy access to request data and credentials +- Automatic JSON parsing +- Type-safe access to common properties + +### Response + +The `Response` class provides standardized response formatting: + +- Success responses with optional metadata +- Error responses with appropriate status codes +- Environment-aware error details +- Sentry integration + +### ManagedError + +The `ManagedError` class provides structured error handling: + +- Type-safe error creation +- Automatic Sentry reporting +- Environment-aware error details +- Common error types (validation, not found, unauthorized, etc.) 
+ +## Project Structure + +Recommended project structure for a workflow connector: + +``` +my-workflow-connector/ +├── main.py # Application entry point +├── app_config.yaml # Application configuration +├── requirements.txt # Python dependencies +├── README.md # Project documentation +├── Dockerfile # Container configuration +├── .env # Environment variables +└── routes/ # Route modules + └── action_name/ # Group routes by action + ├── v1/ # Version 1 of the action + │ ├── route.py # Route implementation + │ └── schema.json # JSON Schema for validation + └── v2/ # Version 2 of the action + ├── route.py + └── schema.json +``` + +## Error Handling + +The CDK provides comprehensive error handling: + +```python +from workflows_cdk import ManagedError + +# Validation error +raise ManagedError.validation_error( + error="Invalid input", + data={"field": "email"} +) + +# Not found error +raise ManagedError.not_found( + resource="User", + identifier="123" +) + +# Authorization error +raise ManagedError.unauthorized( + message="Invalid API key" +) + +# Server error +raise ManagedError.server_error( + error="Database connection failed" +) +``` + +## Response Formatting + +Standardized response formatting: + +```python +from workflows_cdk import Response + +# Success response +return Response.success( + data={"result": "ok"}, + message="Operation completed", + metadata={"timestamp": "2024-02-17"} +) + +# Error response +return Response.error( + error="Something went wrong", + status_code=400 +) +``` + +## License + +This project is licensed under the Stacksync Connector License (SCL) v1.0. 
diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..51a7c35 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,10 @@ +flask +werkzeug +pyopenssl==24.1.0 +flask-cors>=4.0.0 +python-dotenv>=1.0.0 +gunicorn==22.0.0 +authlib==1.1.0 +sentry-sdk[Flask] +pydantic>=2.0.0 +pyyaml>=6.0.0 \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..6a61628 --- /dev/null +++ b/setup.py @@ -0,0 +1,30 @@ +""" +Setup configuration for workflows-cdk package. +""" + +from setuptools import find_packages, setup + + +setup( + name="workflows_cdk", + version="0.1.0", + description="A CDK for developing Stacksync Workflows Connectors", + author="Stacksync", + author_email="oliviero@stacksync.com", + install_requires=[ + # Core dependencies + "flask", + "werkzeug==2.2", + "pyopenssl==24.1.0", + "flask-cors>=4.0.0", + "python-dotenv>=1.0.0", + "gunicorn==22.0.0", + "sentry-sdk[Flask]", + "pydantic>=2.0.0", + "pyyaml>=6.0.0" + ], + python_requires=">=3.10", + packages=find_packages(where="src"), + package_dir={"": "src"}, + include_package_data=True, +) diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..101c2a3 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,2 @@ +from .workflows_cdk import * + diff --git a/src/workflows_cdk/__init__.py b/src/workflows_cdk/__init__.py new file mode 100644 index 0000000..3e633e2 --- /dev/null +++ b/src/workflows_cdk/__init__.py @@ -0,0 +1,8 @@ +""" +Workflows CDK - A CDK for developing Stacksync Workflows Connectors +""" + +__author__ = "Stacksync" +__license__ = "Stacksync Connector License (SCL) v1.0" + +from .core import * diff --git a/src/workflows_cdk/core/__init__.py b/src/workflows_cdk/core/__init__.py new file mode 100644 index 0000000..d618631 --- /dev/null +++ b/src/workflows_cdk/core/__init__.py @@ -0,0 +1,37 @@ +""" +Core functionality for Stacksync Workflows CDK. +Provides base classes and utilities for connector development. 
+""" + +# import logging + +# # Configure logging for the CDK +# logger = logging.getLogger('workflows_cdk') +# logger.setLevel(logging.DEBUG) + +# # Add a stream handler if none exists +# if not logger.handlers: +# handler = logging.StreamHandler() +# handler.setFormatter(logging.Formatter('Workflows CDK | %(levelname)s - %(message)s')) +# logger.addHandler(handler) + +from .errors import ManagedError +from .responses import Response +from .request import Request +from .dynamic_routing import Router +from .validation import validate_and_parse_json, validate_array, validate_object, parse_str_to_json + +# Create the single global router instance + + +__all__ = [ + 'Request', + 'Response', + 'ManagedError', + 'Router', + 'validate_and_parse_json', + 'validate_array', + 'validate_object', + 'parse_str_to_json' +] + \ No newline at end of file diff --git a/src/workflows_cdk/core/cdk_version.py b/src/workflows_cdk/core/cdk_version.py new file mode 100644 index 0000000..7c02ec7 --- /dev/null +++ b/src/workflows_cdk/core/cdk_version.py @@ -0,0 +1,3 @@ +import pkg_resources +# Get CDK version +CDK_VERSION = pkg_resources.get_distribution("workflows-cdk").version diff --git a/src/workflows_cdk/core/dynamic_routing.py b/src/workflows_cdk/core/dynamic_routing.py new file mode 100644 index 0000000..41174f6 --- /dev/null +++ b/src/workflows_cdk/core/dynamic_routing.py @@ -0,0 +1,956 @@ +""" +Dynamic routing implementation for Flask applications. +Automatically discovers and registers routes based on your file system structure. 
+""" +from functools import wraps +from typing import Any, Callable, Dict, List, Optional, Union, cast +from flask import Flask, request as flask_request, current_app, Response as FlaskResponse +from flask_cors import CORS +import inspect +import os +import sys +import types +import importlib.util +import logging +import yaml +import json +from pathlib import Path +from sentry_sdk.integrations.flask import FlaskIntegration +from .errors import ManagedError +from .responses import Response +from .sentry import init_sentry +from .validation import validate_request +from .module_metadata import generate_module_metadata +from .get_environment import get_environment +import sentry_sdk +import traceback +from .homepage_template import get_homepage_template +from contextlib import contextmanager +import gc +import re + + + +def load_app_config(app_dir: str) -> Dict[str, Any]: + """Load app config with proper file handle cleanup.""" + config_path = os.path.join(app_dir, "app_config.yaml") + if not os.path.exists(config_path): + return {} + + try: + with open(config_path, "r") as f: + return yaml.safe_load(f) or {} + except (yaml.YAMLError, OSError, ValueError): + return {} + +def load_schema_file(schema_path: str) -> Optional[Dict[str, Any]]: + """Load schema file with explicit resource cleanup. + + """ + if not os.path.exists(schema_path): + return None + + try: + with open(schema_path, 'r') as f: + schema_data = json.load(f) + + if not isinstance(schema_data, dict): + return None + + return schema_data + except json.JSONDecodeError as e: + logging.warning(f"Failed to parse schema file {schema_path}: {e}") + return None + except Exception as e: + logging.warning(f"Error loading schema file {schema_path}: {e}") + return None + +def find_schema_paths(directory: str) -> List[str]: + """Find schema.json file paths without loading content. 
+ + + Args: + directory: Root directory to search from + + Returns: + List[str]: List of route paths that have schema files + """ + schema_paths = [] + try: + for root, _, files in os.walk(directory): + if 'schema.json' in files: + # Calculate route path without loading the file + rel_path = os.path.relpath(root, directory) + path_parts = rel_path.split(os.sep) + path_parts = [part.replace(" ", "_") for part in path_parts] + route_path = '/' + '/'.join(path_parts) + if route_path == '/.': + route_path = '' + schema_paths.append(route_path) + + except Exception as e: + logging.error(f"Error scanning for schema paths: {e}") + + return schema_paths + + + +def is_production_environment() -> bool: + """Check if current environment is production. + + Returns: + bool: True if environment is production, False otherwise + """ + return get_environment() in ["prod", "production"] + +def print_error(message: str) -> None: + """Log error messages in non-production environments only. + + Args: + message: The error message to log + """ + if not is_production_environment(): + print(message) + +def log_error_details(app: Flask, error: Union[ManagedError, Exception], is_managed: bool = False) -> Optional[str]: + """Centralized error logging function.""" + tb_string = "" + + # Only capture in Sentry once and handle gracefully + try: + sentry_sdk.capture_exception(error) + except Exception as sentry_error: + print_error(f"Error capturing exception in Sentry: {sentry_error}") + + # Only log errors in non-production environments + if not is_production_environment(): + try: + exc_info = sys.exc_info() + if exc_info[0] is None: # If no current exception context, create one from the error + exc_info = (type(error), error, error.__traceback__) + + tb_string = ''.join(traceback.format_exception(*exc_info)) + # Log the full traceback + app.logger.error("Traceback: %s", tb_string) + print_error(f"Error: {error}") + except Exception as log_error: + print_error(f"Error during logging: 
{log_error}") + + return tb_string + +def wrap_route_handler(handler: Callable) -> Callable: + """Wrap route handler with error handling and request context.""" + @wraps(handler) + def wrapped_handler(*args: Any, **kwargs: Any) -> Any: + try: + # Get required fields from route info if it exists + required_fields = getattr(handler, "__route_info__", {}).get("required_fields", []) + + # Execute handler and get response + validate_request(flask_request, required_fields) + response = handler(*args, **kwargs) + + # If response is a dict, convert to JSON response + if isinstance(response, dict): + return Response.success(data=response) + return response + + except ManagedError as managed_error: + # Log error details and let it propagate to the error handler + log_error_details(current_app, managed_error, is_managed=True) + raise + except ValueError as validation_error: + # Log validation error details with specific context + log_error_details(current_app, validation_error, is_managed=False) + # Re-raise as ManagedError to ensure consistent error handling + raise ManagedError( + error=validation_error, + metadata={ + "type": "validation_error", + "original_error": str(validation_error) + }, + status_code=400 + ) from validation_error + except Exception as unhandled_error: + # Log unexpected errors and convert to ManagedError + log_error_details(current_app, unhandled_error) + raise ManagedError( + error=f"Internal server error: {str(unhandled_error)}", + metadata={ + "type": "internal_error", + "original_error": str(unhandled_error) + }, + status_code=500 + ) from unhandled_error + + return wrapped_handler + +class Router: + """Flask File System Router that enables automatic route path detection based on file location.""" + + def __init__(self, app: Optional[Flask] = None, *, + config: Optional[Dict[str, Any]] = None, + sentry_dsn: Optional[str] = None, + cors_origins: Optional[List[str]] = None) -> None: + """Initialize router with storage for routes. 
+ + Args: + app: Optional Flask application instance + config: Optional configuration dictionary + sentry_dsn: Optional Sentry DSN for error tracking + cors_origins: Optional list of allowed CORS origins + """ + # List to store all discovered routes + self.routes: List[Dict[str, Any]] = [] + # List to store module metadata + self.modules_list: List[Dict[str, Any]] = [] + # Flask application instance + self.app: Optional[Flask] = None + self._router_instance = self + self.environment = get_environment() + + # Initial app config loading + self.app_config = load_app_config(os.getcwd()) + self.config = config or {} + self.sentry_dsn = sentry_dsn + self.cors_origins = cors_origins or ["*"] + + # Apply configuration settings + self.refresh_app_config_variables(self.app_config) + + if app is not None: + self.init_app(app) + + def refresh_app_config_variables(self, app_config: Dict[str, Any]) -> None: + """Apply configuration settings to the router instance. + + This function centralizes the logic for applying configuration settings, + making it reusable across initialization and configuration refreshes. 
+ + Args: + app_config: The application configuration dictionary + """ + # Extract key settings sections + self.app_config = app_config + self.app_settings = app_config.get("app_settings", {}) + self.local_development_settings = app_config.get("local_development_settings", {}) + + # Apply core settings + self.app_type = self.app_settings.get("app_type", "unknown_app") + self.port = self.local_development_settings.get("port") or self.app_settings.get("port") or 2003 + self.debug = self.local_development_settings.get("debug", True) or self.app_settings.get("debug", True) + + # Apply Sentry and CORS settings with override priority + self.sentry_dsn = self.app_settings.get("sentry_dsn") or self.local_development_settings.get("sentry_dsn") + self.cors_origins = self.local_development_settings.get("cors_origins") or ["*"] + + # Determine routes directory + routes_directory_possible_key_names = ["routes_directory_path", "routes_directory", "routes_dir", "routes_path"] + self.routes_directory = next( + (self.app_settings[key] for key in routes_directory_possible_key_names if key in self.app_settings), + "src/routes" + ) + + def run_app(self, app: Flask) -> None: + """Run the app.""" + # Enable debug mode + app.debug = True + # Run with output unbuffered + port = self.port + logger = logging.getLogger(__name__) + debug_mode = self.debug + + app.run(host="0.0.0.0", port=port, debug=debug_mode, use_reloader=debug_mode, use_debugger=debug_mode) + + + def configure_logging(self, app: Flask) -> None: + """Configure application logging.""" + logging.basicConfig( + level=logging.INFO, + format='%(levelname)s - %(message)s' + ) + + def configure_sentry(self, app: Flask) -> None: + """Configure Sentry error tracking.""" + dsn = self.sentry_dsn + init_sentry(app, dsn) + + def configure_cors(self, app: Flask) -> None: + """Configure CORS settings.""" + origins = self.cors_origins + if origins: + CORS(app, origins=origins) + + def _create_route_info(self, function: Callable, rule: 
Optional[str] = None, options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + """Create route information dictionary from a function and options. + + Args: + function: The route handler function + rule: Optional URL rule + options: Optional route options + + Returns: + Dictionary containing route information + + Raises: + ValueError: If module path cannot be determined + """ + options = options or {} + function_module = inspect.getmodule(function) + if not function_module or not function_module.__file__: + raise ValueError("Could not determine the module path for the function") + + + routes_directory = Path(os.path.join(os.getcwd(), self.routes_directory)) + module_file_path = Path(function_module.__file__).resolve() + + # Check if the module is in the routes directory and generate metadata + try: + relative_path = module_file_path.relative_to(routes_directory) + # Generate base path from routes directory structure + path_parts = list(relative_path.parent.parts) + # Replace spaces with underscores in each path part + path_parts = [part.replace(" ", "_") for part in path_parts] + base_path = "/" + "/".join(path_parts) + + # --- Module Metadata Generation --- + module_dir_rel_str = str(relative_path.parent) # Path relative to routes_directory + routes_dir_abs_str = str(routes_directory.resolve()) + module_metadata = generate_module_metadata( + module_dir_rel_str, routes_dir_abs_str, self.app_type + ) + if module_metadata: + # Add metadata if not already present (check by module_id) + if not any(m["module_id"] == module_metadata["module_id"] for m in self.modules_list): + self.modules_list.append(module_metadata) + # --- End Module Metadata Generation --- + + except ValueError: + # Module is not in routes directory, likely a core or manually placed route. + # Use provided rule as is, no automatic metadata generation. 
+ base_path = "" + path_parts = [] + + # Generate the full URL path + if rule: + endpoint_path = rule if rule.startswith("/") else f"/{rule}" + else: + endpoint_path = f"/{function.__name__}" + + full_url_path = f"{base_path}{endpoint_path}" + + # Set default HTTP methods to POST if not specified + http_methods = options.get("methods", ["POST"]) + + # Generate unique endpoint name + endpoint_name = f"{'.'.join(path_parts)}.{function.__name__}" if path_parts else function.__name__ + + # Wrap the function with error handling and response formatting + wrapped_function = wrap_route_handler(function) + # Create route information dictionary + route_info = { + "path": full_url_path, + "endpoint": endpoint_name, + "view_func": wrapped_function, + "methods": http_methods + } + # Add any additional options + for key, value in options.items(): + if key != "methods": # Skip methods as we've already handled it + route_info[key] = value + + return route_info + + + + def _scan_routes_directory(self) -> list: + """Scan the routes directory and return route file paths. + + Returns: + list: List of route file paths (Path objects) + """ + current_working_directory = os.getcwd() + routes_directory = Path(os.path.join(current_working_directory, self.routes_directory)) + + if not routes_directory.exists(): + print_error(f"Routes directory not found at: {routes_directory}") + return [] + + # Find all Python files in routes directory and its subdirectories + # Only include .py files that are directly inside a version directory (v1, v2, vX, etc.) 
+ version_dir_pattern = re.compile(r"^v[\w\d]+$") + route_files = [] + for path in routes_directory.rglob("*.py"): + if path.name == "__init__.py": + continue + + # Get the relative parts from routes directory + path_parts = path.parts + routes_dir_parts = routes_directory.parts + relative_parts = path_parts[len(routes_dir_parts):] + + # We want: ...///file.py (relative_parts = [module, version, file.py]) + # So only include if len(relative_parts) == 3 and relative_parts[1] matches version pattern + if len(relative_parts) == 3 and version_dir_pattern.match(relative_parts[1]): + route_files.append(path) + # Otherwise, skip (this includes any file in subdirs of version dirs) + return route_files + + def _collect_route_information(self) -> tuple: + """Collect information about routes and modules without importing modules. + + Returns: + tuple: (routes_info, modules_list) containing metadata + """ + current_working_directory = os.getcwd() + routes_directory = Path(os.path.join(current_working_directory, self.routes_directory)) + + # Lists to store collected data + routes_info = [] + modules_list = [] + + # Get all route files + route_files = self._scan_routes_directory() + if not route_files: + return routes_info, modules_list + + # Process each route file path to extract information + for route_file_path in route_files: + try: + # Convert file path to a module path + relative_path = route_file_path.relative_to(current_working_directory) + module_name = str(relative_path.with_suffix("")).replace(os.sep, ".") + + # Get route path from directory structure + rel_to_routes_dir = route_file_path.parent.relative_to(routes_directory) + route_path = "/" + str(rel_to_routes_dir).replace(os.sep, "/") + if route_path == "/.": + route_path = "/" + + # Add route info without importing + route_info = { + "path": route_path, + "file": str(route_file_path), + "module": module_name, + "route_file_path": route_file_path # Keep the original Path object for discover_routes + } + 
routes_info.append(route_info) + + # Generate module metadata + module_dir_rel_str = str(relative_path.parent.relative_to(self.routes_directory)) if str(relative_path.parent) != self.routes_directory else "" + routes_dir_abs_str = str(routes_directory.resolve()) + + module_metadata = generate_module_metadata( + module_dir_rel_str, routes_dir_abs_str, self.app_type + ) + + if module_metadata and not any(m.get("module_id") == module_metadata["module_id"] for m in modules_list): + modules_list.append(module_metadata) + + except Exception as e: + print_error(f"Error collecting info for route file {route_file_path}: {e}") + + return routes_info, modules_list + + def _route_exists(self, path: str) -> bool: + """Check if a route with the given path already exists.""" + return any(r.get('path') == path for r in self.routes) + + def _add_route_if_not_exists(self, route_info: Dict[str, Any]) -> bool: + """Add route only if it doesn't already exist. Returns True if added.""" + if not self._route_exists(route_info['path']): + self.routes.append(route_info) + return True + return False + + def clear_accumulated_data(self) -> None: + """ + Clear accumulated data to prevent memory accumulation in serverless environments. 
+ """ + # Keep only essential routes (everything else can be rediscovered) + core_paths = ['/health', '/app-config', '/modules-list', '/routes', '/'] + + # Filter out schema routes and user routes (they end with /schema or are user-defined) + essential_routes = [] + for route in self.routes: + path = route.get('path', '') + # Keep core routes only + if path in core_paths: + essential_routes.append(route) + # Remove schema routes (they'll be re-registered) + # Remove user routes (they'll be rediscovered) + + self.routes = essential_routes + + # Clear module metadata (it holds file paths and other references) + self.modules_list.clear() + + # Force Python to clean up now (don't wait for automatic GC) + gc.collect() + + def discover_routes(self) -> None: + """ + Discover routes while preventing memory accumulation. + + Key issue: Each discovery loads Python modules that stay in memory forever. + In serverless, this grows with each container reuse. + """ + current_working_directory = os.getcwd() + + # Collect route info without any imports first + routes_info, module_list = self._collect_route_information() + + # Add module metadata (but clear old references first) + for module in module_list: + if not any(m.get("module_id") == module["module_id"] for m in self.modules_list): + self.modules_list.append(module) + + if not routes_info: + return + + # Add working directory to path for imports + if current_working_directory not in sys.path: + sys.path.insert(0, current_working_directory) + + # Use clean import context to prevent module accumulation + with clean_module_import(): + # Set up temporary main module for route discovery + original_router_module = sys.modules.get('main', None) + temporary_main_module = types.ModuleType('main') + setattr(temporary_main_module, 'router', self) + setattr(temporary_main_module, '__file__', os.path.join(current_working_directory, 'main.py')) + sys.modules['main'] = temporary_main_module + + try: + # Process each route file to import 
modules and register routes + for route_info in routes_info: + try: + route_file_path = route_info["route_file_path"] + module_name = route_info["module"] + + # Create a module specification for importing + module_spec = importlib.util.spec_from_file_location(module_name, str(route_file_path)) + if module_spec is None or module_spec.loader is None: + continue + + # Create the module and set up its environment + route_module = importlib.util.module_from_spec(module_spec) + sys.modules[module_name] = route_module + + # Add the route file's parent directory to path for relative imports + route_parent_directory = str(route_file_path.parent) + if route_parent_directory not in sys.path: + sys.path.insert(0, route_parent_directory) + + # Execute the module to process its contents + module_spec.loader.exec_module(route_module) + + # Find all functions that have been decorated with our route decorator + for function_name, function_object in inspect.getmembers(route_module): + if inspect.isfunction(function_object): + # Check if this function has route information attached + if hasattr(function_object, "__route_info__"): + route_info = getattr(function_object, "__route_info__") + self._add_route_if_not_exists(route_info) + + # Clean up by removing the temporary path addition + if route_parent_directory in sys.path: + sys.path.remove(route_parent_directory) + + except Exception as error: + print_error(f"Error while processing route file {route_file_path}: {error}") + + finally: + # Restore the original state + if original_router_module is not None: + sys.modules['main'] = original_router_module + else: + sys.modules.pop('main', None) + + # Remove the project root from sys.path + if current_working_directory in sys.path: + sys.path.remove(current_working_directory) + + def register_error_handlers(self, app: Flask) -> None: + """Register error handlers for the application.""" + + @app.errorhandler(ManagedError) + def handle_managed_error(error: ManagedError): + return 
Response.error(error) + + @app.errorhandler(Exception) + def handle_unhandled_error(error: Exception): + return Response.error(error, status_code=500) + + def _get_serializable_routes(self) -> list: + """Create a JSON-serializable version of the routes list.""" + serializable_routes = [] + + for route in self.routes: + serializable_route = { + "path": route.get("path", ""), + "endpoint": route.get("endpoint", ""), + "methods": route.get("methods", ["GET"]), + } + serializable_routes.append(serializable_route) + return serializable_routes + + def _register_core_routes(self, app: Flask) -> None: + """Register core routes.""" + @app.route("/", methods=["GET"]) + def root(): + # Get the connector name from the app settings + connector_name = self.app_settings.get("app_name", "Stacksync Connector") + # Get modules and sort by module_name + _, modules_list = self._collect_route_information() + module_names = sorted([m.get("module_name", "") for m in modules_list if m.get("module_name")]) + # HTML template with Stacksync logo and connector name + html = get_homepage_template(connector_name, self.app_type, self.environment, module_names) + return html + + @app.route("/health", methods=["GET"]) + def health_check(): + return Response.success(data={"status": "healthy"}) + + @app.route("/app-config", methods=["GET"]) + def app_config(): + try: + # Reload app config to get fresh settings + fresh_app_config = load_app_config(os.getcwd()) + + # Apply the fresh configuration + self.refresh_app_config_variables(fresh_app_config) + + # Refresh modules list + _, modules_list = self._collect_route_information() + + # Return updated config + return Response.success(data={ + "app_settings": self.app_settings, + "modules": modules_list + }) + except Exception as error: + import traceback + print(traceback.format_exc()) + print_error(f"Error retrieving app config: {error}") + return Response.error(error) + + @app.route("/modules-list", methods=["GET"]) + def modules_list(): + try: + # 
Collect module information without importing modules + _, modules_list_data = self._collect_route_information() + + # Ensure we return a clean response without potential circular references + clean_modules = [] + for module in modules_list_data: + if isinstance(module, dict): + # Only include serializable data + clean_module = { + key: value for key, value in module.items() + if isinstance(value, (str, int, float, bool, list, dict, type(None))) + } + clean_modules.append(clean_module) + + return Response.success(data={"modules": clean_modules}) + except Exception as error: + print_error(f"Error retrieving modules list: {error}") + return Response.error( + error=f"Failed to retrieve modules list: {str(error)}", + status_code=500 + ) + + @app.route("/routes", methods=["GET"]) + def routes(): + try: + # Get route file information without importing + routes_info, _ = self._collect_route_information() + # Get serializable version of registered routes + serializable_routes = self._get_serializable_routes() + + # Return both discovered file paths and registered routes + return Response.success(data={ + "endpoints": serializable_routes, + "route_files": routes_info + }) + except Exception as error: + import traceback + print(traceback.format_exc()) + print_error(f"Error retrieving routes list: {error}") + return Response.error(error) + + def register_schema_routes(self, app: Flask) -> None: + """Register schema routes without loading schema content into memory.""" + routes_path = os.path.join(os.getcwd(), self.routes_directory) + + # Only proceed if auto-registration is enabled + if self.app_settings.get("automatically_register_schema_routes", True): + schema_paths = find_schema_paths(routes_path) + + # Register each schema route + for route_path in schema_paths: + self._register_schema_route(route_path, app) + + def _handle_dynamic_schema_request(self, dynamic_path: str) -> FlaskResponse: + """Handle schema requests for paths that might not have been registered at 
    def _handle_dynamic_schema_request(self, dynamic_path: str) -> FlaskResponse:
        """Handle schema requests for paths that might not have been registered at startup.

        This is a catch-all handler that will check if a schema.json file exists for the
        requested path and return it if found, even if it was added after startup.

        Args:
            dynamic_path: The dynamic part of the path (everything before /schema)

        Returns:
            FlaskResponse: The schema response (404 if the file is missing,
            400 if it is not valid JSON, 500 on unexpected failures).
        """
        try:
            # Construct the full path to the schema file
            current_working_directory = os.getcwd()
            routes_directory = os.path.join(current_working_directory, self.routes_directory)
            schema_file_path = os.path.join(routes_directory, dynamic_path, 'schema.json')

            # Check if the schema file exists
            if not os.path.exists(schema_file_path):
                return Response(
                    data={"schema": {}, "error": f"Schema not found for {dynamic_path}"},
                    status_code=404
                )

            # Load the schema file
            try:
                with open(schema_file_path, 'r') as f:
                    schema_data = json.load(f)

                # Log discovery of new schema
                # (only in dev, and only for schemas that were not registered at startup)
                if self.environment in ["dev", "development"]:
                    if not any(r.get('path') == f"/{dynamic_path}/schema" for r in self.routes):
                        print(f"Dynamically served schema for new path: /{dynamic_path}/schema")

                return Response(data={"schema": schema_data})
            except json.JSONDecodeError:
                return Response(
                    data={"schema": {}, "error": f"Invalid schema format for {dynamic_path}"},
                    status_code=400
                )
        except Exception as e:
            # Log and return error
            # NOTE(review): wrapping in ManagedError carries the offending
            # path into the error metadata for Sentry/logging.
            print_error(f"Error handling dynamic schema for {dynamic_path}: {e}")
            return Response.error(
                error=ManagedError(
                    error=e,
                    metadata={"dynamic_path": dynamic_path},
                    status_code=500
                )
            )
+ + Args: + route_path: The route path to load schema for + + Returns: + Callable: A handler function that loads and returns schema data + """ + def dynamic_schema_handler() -> FlaskResponse: + try: + # Get absolute path to schema file + current_working_directory = os.getcwd() + routes_directory = os.path.join(current_working_directory, self.routes_directory) + schema_file_path = os.path.join(routes_directory, route_path.lstrip('/'), 'schema.json') + + # Check if file exists + if not os.path.exists(schema_file_path): + return Response( + data={"schema": {}, "error": "Schema not found"}, + status_code=404 + ) + + # Read and parse schema file + try: + with open(schema_file_path, 'r') as f: + schema_data = json.load(f) + + return Response(data={"schema": schema_data}) + except json.JSONDecodeError: + return Response( + data={"schema": {}, "error": "Invalid schema format"}, + status_code=400 + ) + except Exception as e: + # Log and return error + print_error(f"Error loading schema for {route_path}: {e}") + return Response.error( + error=ManagedError( + error=e, + metadata={"route_path": route_path}, + status_code=500 + ) + ) + return dynamic_schema_handler + + def _register_schema_route(self, route_path: str, app: Optional[Flask] = None) -> None: + """Register a schema route for the given path. 
+ + Args: + route_path: The route path to load schema for + app: The Flask app to register with (if None, just adds to routes list) + """ + schema_route = f"{route_path}/schema" + + # Create route info + route_info = { + "path": schema_route, + "endpoint": f"schema_{route_path.replace('/', '_')}", + "view_func": self._create_dynamic_schema_handler(route_path), + "methods": ["GET", "POST"] + } + + # Add route only if it doesn't exist (consistent duplicate checking) + added = self._add_route_if_not_exists(route_info) + + # If app is provided and route was added, register immediately + if app is not None and added and hasattr(app, 'add_url_rule'): + app.add_url_rule( + route_info["path"], + endpoint=route_info["endpoint"], + view_func=route_info["view_func"], + methods=route_info["methods"] + ) + if self.environment in ["dev", "development"]: + print(f"Registered schema route: {route_info['path']} with methods {route_info['methods']}") + + def init_app(self, app: Flask) -> None: + """Initialize the router with a Flask app and register all discovered routes.""" + self.app = app + + self.clear_accumulated_data() + + self.app_config = load_app_config(os.getcwd()) + self.refresh_app_config_variables(self.app_config) + + # Update Flask configuration + self.configure_sentry(app) + app.config.update({ + "JSON_SORT_KEYS": False, + "PROPAGATE_EXCEPTIONS": True, + **self.config, + **self.app_settings + }) + + # Configure components + self.configure_logging(app) + self.configure_cors(app) + + # First discover all routes in the project + self.discover_routes() + + # Register schema routes + self.register_schema_routes(app) + + # Register a catch-all route for dynamic schema discovery + # This will handle any schema requests for paths that don't have explicit routes + app.add_url_rule( + "//schema", + endpoint="dynamic_schema_handler", + view_func=self._handle_dynamic_schema_request, + methods=["GET", "POST"] + ) + + # Register error handlers + self.register_error_handlers(app) + 
+ # Register core routes + self._register_core_routes(app) + + # Register all discovered routes + for route in self.routes: + app.add_url_rule( + route["path"], + endpoint=route["endpoint"], + view_func=route["view_func"], + methods=route.get("methods", ["POST"]), + **{k: v for k, v in route.items() if k not in ["path", "endpoint", "view_func", "methods"]} + ) + if self.environment in ["dev", "development"]: + print(f"Registered route: {route['path']} with methods {route['methods']}") + + def route(self, rule: Optional[str] = None, **options: Any) -> Callable: + """ + Route decorator that combines base path with endpoint path. + + Examples: + + 1. Simple route with parameter: + @router.route("/users/") + def get_user(user_id): + return f"User {user_id}" + + 2. Route with type-specific parameter: + @router.route("/users//posts/") + def get_user_post(user_id, post_id): + return f"User {user_id}, Post {post_id}" + + 3. Route with multiple parameters: + @router.route("/org//users//teams/") + def get_org_user_team(org_id, user_id, team_id): + return f"Org {org_id}, User {user_id}, Team {team_id}" + """ + + def decorator(function: Callable) -> Callable: + try: + # Create route information using helper method + route_info = self._create_route_info(function, rule, options) + + # Store route info on the function for later discovery + setattr(function, "__route_info__", route_info) + + # Add route only if it doesn't exist + self._add_route_if_not_exists(route_info) + + # If Flask app is already initialized, register the route immediately + if self.app: + self.app.add_url_rule( + route_info["path"], + endpoint=route_info["endpoint"], + view_func=route_info["view_func"], + methods=route_info["methods"], + **{k: v for k, v in route_info.items() if k not in ["path", "endpoint", "view_func", "methods"]} + ) + + except Exception as error: + print_error(f"Error registering route for {function.__name__}: {error}") + raise + + @wraps(function) + def wrapper(*args: Any, **kwargs: Any) 
@contextmanager
def clean_module_import():
    """Context manager that forgets modules imported inside its body.

    Python caches every imported module in ``sys.modules`` for the life of
    the process; in a serverless worker, repeated route discovery would
    therefore grow memory without bound.  On exit this drops any module
    that was not already loaded on entry, restores ``sys.path`` in place,
    and runs a GC pass to release the dropped module objects.
    """
    known_modules = set(sys.modules)
    saved_path = list(sys.path)

    try:
        yield
    finally:
        # Drop only modules first imported inside the block.
        for name in set(sys.modules) - known_modules:
            sys.modules.pop(name, None)

        # Restore the import path in place (other references stay valid).
        sys.path[:] = saved_path

        # Encourage collection of the now-unreferenced module objects.
        gc.collect()
+""" + +import logging +import traceback +from typing import Any, Dict, Optional, Union +from .cdk_version import CDK_VERSION +from dataclasses import dataclass, field + +import sentry_sdk + +logger = logging.getLogger(__name__) + + +@dataclass +class ManagedError(Exception): + """Base class for managed application errors.""" + error: Union[str, Exception] + data: Optional[Dict[str, Any]] = None + metadata: Optional[Dict[str, Any]] = None + status_code: int = 400 + + def __post_init__(self): + """Initialize default values for optional fields.""" + self.data = self.data if self.data is not None else {} + self.metadata = self.metadata if self.metadata is not None else {} + + def __str__(self) -> str: + """Return string representation of the error.""" + return str(self.error) + + @classmethod + def validation_error( + cls, + error: str, + data: Optional[Dict[str, Any]] = None, + metadata: Optional[Dict[str, Any]] = None + ) -> "ManagedError": + """Create a validation error.""" + return cls( + error=error, + data=data, + metadata=metadata, + status_code=400 + ) + + + @classmethod + def not_found( + cls, + resource: str, + identifier: Any, + metadata: Optional[Dict[str, Any]] = None + ) -> "ManagedError": + """Create a not found error.""" + return cls( + error=f"{resource} not found: {identifier}", + data={"resource": resource, "identifier": identifier}, + metadata=metadata, + status_code=404 + ) + + @classmethod + def unauthorized( + cls, + message: str = "Unauthorized access", + metadata: Optional[Dict[str, Any]] = None + ) -> "ManagedError": + """Create an unauthorized error.""" + return cls( + error=message, + metadata=metadata, + status_code=401 + ) + + @classmethod + def forbidden( + cls, + message: str = "Access forbidden", + metadata: Optional[Dict[str, Any]] = None + ) -> "ManagedError": + """Create a forbidden error.""" + return cls( + error=message, + metadata=metadata, + status_code=403 + ) + + @classmethod + def server_error( + cls, + error: Union[str, 
Exception], + metadata: Optional[Dict[str, Any]] = None + ) -> "ManagedError": + """Create a server error.""" + return cls( + error=error, + metadata=metadata, + status_code=500 + ) + + def _log_error(self): + """Log error locally and to Sentry.""" + # Log locally with all context + logger.error( + f"Managed error: {self.error}", + extra={ + "data": self.data, + "metadata": self.metadata + } + ) + + # Send to Sentry with full context + with sentry_sdk.push_scope() as scope: + scope.set_tag("cdk_version", CDK_VERSION) + + # Add data as extras + if self.data: + scope.set_extra("error_data", self.data) + + # Add metadata as tags and extras + if self.metadata: + for key, value in self.metadata.items(): + if isinstance(value, (str, int, float, bool)): + scope.set_tag(key, value) + else: + scope.set_extra(key, value) + + sentry_sdk.capture_exception(self) + + @classmethod + def service_error( + cls, + error: Any, + service: str, + data: Optional[Any] = None + ) -> "ManagedError": + """Create a service error.""" + return cls( + error=error, + data=data, + metadata={ + "error_type": "service", + "service": service + } + ) + + @classmethod + def not_found_error( + cls, + error: Any, + resource: str, + identifier: str + ) -> "ManagedError": + """Create a not found error.""" + return cls( + error=error, + data={"identifier": identifier}, + metadata={ + "error_type": "not_found", + "resource": resource + } + ) \ No newline at end of file diff --git a/src/workflows_cdk/core/get_environment.py b/src/workflows_cdk/core/get_environment.py new file mode 100644 index 0000000..dda00f9 --- /dev/null +++ b/src/workflows_cdk/core/get_environment.py @@ -0,0 +1,15 @@ +import os + +def get_environment(): + """ + Get the environment from the environment variable. 
def get_environment():
    """
    Resolve the deployment environment from the ENVIRONMENT variable.

    Returns "prod", "stage", or "dev"; anything unrecognized (including an
    unset variable) falls back to "dev".
    """
    environment = os.getenv("ENVIRONMENT", "").lower()
    if environment in ("prod", "production"):
        return "prod"
    if environment in ("stage", "staging"):
        return "stage"
    return "dev"
+

Available Modules

+
    + {} +
+
+ """.format("\n".join(f'
  • {name}
  • ' for name in module_names)) + return f""" + + + + + + {connector_name} + + + + + + +
    + +
    + {app_type.lower()} +
    +

    {connector_name}

    +
    Running
    +

    This app connector is up and running. You can now use it in your workflows through the Developer Studio.

    + + + Go to Stacksync Workflows + + + {module_list_html} + + +
    + + + """ \ No newline at end of file diff --git a/src/workflows_cdk/core/models/__init__.py b/src/workflows_cdk/core/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/workflows_cdk/core/models/content.py b/src/workflows_cdk/core/models/content.py new file mode 100644 index 0000000..0f85bf8 --- /dev/null +++ b/src/workflows_cdk/core/models/content.py @@ -0,0 +1,203 @@ +""" +Content object handling for Flask applications. +""" + +from typing import Any, Dict, List, Optional +from flask import jsonify, Response + + +class ContentObject: + """ + Class representing a content object. + + This class encapsulates the data for a content object and provides + methods for converting to the format expected by the frontend. + """ + + def __init__( + self, + id: str, + data: List[Dict[str, Any]], + has_next_page: bool = False, + next_cursor: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None + ): + """ + Initialize a ContentObject. + + Args: + id: The ID/name of the content object + data: The data for the content object + has_next_page: Whether there are more pages + next_cursor: Cursor for pagination + metadata: Optional metadata + """ + self.id = id + self.data = data + self.has_next_page = has_next_page + self.next_cursor = next_cursor + self.metadata = metadata + + def to_dict(self) -> Dict[str, Any]: + """ + Convert to the format expected by the frontend. + + Returns: + Dictionary representation of the ContentObject + """ + result = { + "id": self.id, + "content": self.data, + "pagination": { + "has_next_page": self.has_next_page, + "next_cursor": self.next_cursor + } + } + + if self.metadata: + result["metadata"] = self.metadata + + return result + + @classmethod + def from_dict(cls, obj: Dict[str, Any]) -> "ContentObject": + """ + Create a ContentObject from a dictionary. 
+ + Args: + obj: Dictionary containing content object data + + Returns: + ContentObject instance + """ + id_value = obj.get("id") or obj.get("content_object_name") or obj.get("name") + if not id_value: + raise ValueError("Content object must have an id, content_object_name, or name") + + data = obj.get("data") or obj.get("content") or [] + + return cls( + id=id_value, + data=data, + has_next_page=obj.get("has_next_page", False), + next_cursor=obj.get("next_cursor"), + metadata=obj.get("metadata") + ) + + +def create_content_object( + name: str, + data: List[Dict[str, Any]], + has_next_page: bool = False, + next_cursor: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None +) -> Dict[str, Any]: + """ + Create a content object dictionary. + + Args: + name: Name of the content object + data: List of data items + has_next_page: Whether there are more pages + next_cursor: Cursor for pagination + metadata: Optional metadata + + Returns: + Content object dictionary + """ + return { + "content_object_name": name, + "data": data, + "has_next_page": has_next_page, + "next_cursor": next_cursor, + "metadata": metadata + } + + +def content_response( + content_objects: List[Dict[str, Any]], + metadata: Optional[Dict[str, Any]] = None, + status_code: int = 200 +) -> Response: + """ + Create a Flask response with content objects. 
def content_response(
    content_objects: List[Dict[str, Any]],
    metadata: Optional[Dict[str, Any]] = None,
    status_code: int = 200
) -> Response:
    """
    Create a Flask response wrapping content objects.

    Args:
        content_objects: Dictionaries with a "content_object_name" (or
            "name") key and a "data" key, e.g. from create_content_object.
        metadata: Optional response metadata
        status_code: HTTP status code

    Returns:
        Flask response

    Example:
        ```python
        @app.route("/content", methods=["POST"])
        def content():
            users_object = create_content_object("users", users)
            return content_response([users_object])
        ```
    """
    processed_objects = []

    for candidate in content_objects:
        # Ignore anything that is not a dict.
        if not isinstance(candidate, dict):
            continue

        # A name is mandatory; entries without one are skipped.
        name = candidate.get("content_object_name") or candidate.get("name")
        if not name:
            continue

        entry = {
            "id": name,
            "content": candidate.get("data", []),
            "pagination": {
                "has_next_page": candidate.get("has_next_page", False),
                "next_cursor": candidate.get("next_cursor"),
            },
        }
        if candidate.get("metadata"):
            entry["metadata"] = candidate.get("metadata")

        processed_objects.append(entry)

    response_data = {
        "data": {
            "content_objects": processed_objects,
            "pagination": {
                "has_more": False,
                "next_cursor": None,
            },
        }
    }
    if metadata:
        response_data["metadata"] = metadata

    return jsonify(response_data), status_code
+""" +import os +import yaml +import logging +from pathlib import Path +from typing import Dict, Any, Optional + +def load_yaml_file(file_path: Path) -> Dict[str, Any]: + """Safely load a YAML file.""" + if not file_path.exists(): + return {} + try: + with open(file_path, "r") as f: + return yaml.safe_load(f) or {} + except yaml.YAMLError as e: + logging.warning(f"Failed to parse YAML file {file_path}: {e}") + return {} + except Exception as e: + logging.warning(f"Error loading YAML file {file_path}: {e}") + return {} + +def generate_module_metadata(module_path_rel_str: str, routes_dir_abs_str: str, app_type: str) -> Optional[Dict[str, Any]]: + """ + Generates information for a single module based on its relative path within the routes directory. + + Args: + module_path_rel_str: The relative path string of the module's directory from the routes root. + routes_dir_abs_str: The absolute path string of the routes directory. + app_type: The type/name of the application. + + Returns: + A dictionary containing module information, or None if generation fails. + """ + try: + + module_dir_abs = Path(routes_dir_abs_str) / module_path_rel_str + module_type_path = module_dir_abs.parent + + + config_path = module_dir_abs / "module_config.yaml" + module_config = load_yaml_file(config_path) + module_settings = module_config.get("module_settings", {}) + + module_type = module_settings.get("module_type") or module_type_path.name + if not module_type: + logging.warning(f"Could not determine module_type for path {module_dir_abs}. 
Skipping.") + return None + + + module_category = module_settings.get("module_category", "action") + module_name = module_settings.get("module_name") or module_type.replace("_", " ").title() + module_description = module_settings.get("module_description") or "" + + module_version = module_settings.get("module_version") or module_dir_abs.name + module_version = module_version.replace("v", "") + if not module_version: + logging.warning(f"Could not determine module_version for path {module_dir_abs}. Defaulting to 'latest'.") + module_version = "latest" + + + formatted_version = str(module_version).replace('.', '_') + + module_id = f"{app_type}-{module_type}-{formatted_version}" + + module_data = { + "module_id": module_id, + "app_type": app_type, + "module_type": module_type, + "module_category": module_category, + "module_name": module_name, + "module_version": str(module_version), + "module_description": module_description, + "module_path": module_path_rel_str + } + return module_data + + except Exception as e: + # Log error with the relative path as it's the primary identifier received + logging.error(f"Error generating module info for relative path {module_path_rel_str}: {e}") + return None diff --git a/src/workflows_cdk/core/request.py b/src/workflows_cdk/core/request.py new file mode 100644 index 0000000..a3f69f3 --- /dev/null +++ b/src/workflows_cdk/core/request.py @@ -0,0 +1,58 @@ +"""Request handling for the Workflows CDK.""" + +from typing import Any, Dict, Optional +from pydantic import BaseModel, Field +from flask import Request as FlaskRequest +from flask import request as flask_request +from werkzeug.local import LocalProxy + +class Request: + """Wrapper for Flask request that adds workflow-specific functionality. 
class Request:
    """Wrapper for Flask request that adds workflow-specific functionality.

    Usage:
        @router.route("/execute", methods=["POST"])
        def execute():
            request_data = Request(flask_request)
            data = request_data.data
            # NOTE: .credentials already unwraps credentials.connection_data.value
            credentials = request_data.credentials
    """

    def __init__(self, flask_request: FlaskRequest):
        """Initialize with a Flask request instance."""
        self._request = flask_request
        # Parsed JSON body; populated lazily by the `json` property.
        self._json_data = None

    @property
    def request_data(self) -> Dict[str, Any]:
        """Get the request data (alias for the `json` property)."""
        return self.json

    @property
    def json(self) -> Dict[str, Any]:
        """Get the cached JSON data from the request.

        Parsing is tolerant: a missing or malformed body yields {} instead
        of raising (get_json(silent=True)).
        """
        if self._json_data is None:
            self._json_data = self._request.get_json(silent=True) or {}
        return self._json_data

    @property
    def data(self) -> Dict[str, Any]:
        """Get the data portion of the request.

        Returns:
            Dict[str, Any]: The top-level "data" value of the body, or {}.
        """
        return self.json.get("data", {})

    @property
    def credentials(self) -> Dict[str, Any]:
        """Get the credentials from the request.

        Returns:
            Dict[str, Any]: The nested credentials.connection_data.value
            dict from the body, or {} if any level is missing.
        """
        return self.json.get("credentials", {}).get("connection_data", {}).get("value", {})

    def __getattr__(self, name: str) -> Any:
        """Delegate any unknown attributes to the underlying Flask request."""
        return getattr(self._request, name)
+""" + +from typing import Any, Dict, List, Optional, Union +from datetime import datetime +from flask import jsonify, make_response, Response as FlaskResponse +import os +from werkzeug.exceptions import HTTPException +from .errors import ManagedError +from .get_environment import get_environment + +class Response: + """Standardized response class for API endpoints.""" + + environment = get_environment() + # Cache environment check + _IS_PRODUCTION = environment == "prod" or environment == "stage" + + + @staticmethod + def create_response( + data: Any = None, + metadata: Optional[Dict[str, Any]] = None, + status_code: int = 200 + ) -> FlaskResponse: + """Create a standardized response.""" + response_data = {"data": data} + if metadata: + response_data["metadata"] = metadata + return make_response(jsonify(response_data), status_code) + + def __new__( + cls, + data: Any = None, + metadata: Optional[Dict[str, Any]] = None, + status_code: int = 200 + ) -> FlaskResponse: + """Create a new success response.""" + return cls.create_response(data, metadata, status_code) + + @classmethod + def success( + cls, + data: Any = None, + metadata: Optional[Dict[str, Any]] = None, + status_code: int = 200 + ) -> FlaskResponse: + """Create a success response.""" + return cls.create_response(data, metadata, status_code) + + @classmethod + def content( + cls, + content_objects: List[Any], + metadata: Optional[Dict[str, Any]] = None, + status_code: int = 200 + ) -> FlaskResponse: + """Create a response with content objects. 
+ + Args: + content_objects: List of ContentObject instances or dictionaries + metadata: Optional metadata for the response + status_code: HTTP status code + + Returns: + Flask response with content objects + + Example: + ```python + @app.route("/content", methods=["POST"]) + def content(): + users = [ + { + "id": user_id, + "label": user_name + } + ] + + content_objects = [ + ContentObject(id="users", data=users) + ] + + return Response.content(content_objects) + ``` + """ + # Import here to avoid circular imports + from workflows_cdk.core.models.content import ContentObject + + processed_objects = [] + + for obj in content_objects: + if isinstance(obj, ContentObject): + processed_objects.append(obj.to_dict()) + elif isinstance(obj, dict): + try: + content_obj = ContentObject.from_dict(obj) + processed_objects.append(content_obj.to_dict()) + except ValueError: + # If it's already in the right format, use it directly + if "id" in obj and "content" in obj: + processed_objects.append(obj) + + data = { + "content_objects": processed_objects, + "pagination": { + "has_more": False, + "next_cursor": None + } + } + + return cls.create_response(data, metadata, status_code) + + @classmethod + def error( + cls, + error: Union[ManagedError, Exception, str], + data: Optional[Dict[str, Any]] = None, + metadata: Optional[Dict[str, Any]] = None, + status_code: int = 400 + ) -> FlaskResponse: + """Create an error response with environment-appropriate detail level.""" + + + # Get stack trace for non-production environments + stack_trace = None + if not cls._IS_PRODUCTION and isinstance(error, Exception): + import traceback + stack_trace = traceback.format_exc() + + if data: + if isinstance(data, str): + data = {"error": data} + + if metadata: + if isinstance(metadata, str): + metadata = {"metadata": metadata} + + # Base metadata + base_metadata = { + "timestamp": datetime.now().isoformat(), + "environment": os.getenv("ENVIRONMENT", "development"), + # "event_id": event_id, + 
"stack_trace": stack_trace if not cls._IS_PRODUCTION else None, + **(metadata or {}) + } + + # Merge with error metadata if available + metadata = base_metadata + if isinstance(error, ManagedError) and error.metadata: + metadata = {**base_metadata, **(error.metadata or {})} + + if isinstance(error, ManagedError): + response_data: Dict[str, Any] = { + "error": str(error.error), + } + if error.data: + response_data["data"] = error.data + if metadata: + response_data["metadata"] = metadata + elif isinstance(error, HTTPException): + status_code = error.code or status_code + response_data: Dict[str, Any] = { + "error": error.description, + "data": {"code": error.code, "name": error.name}, + } + if metadata: + response_data["metadata"] = metadata + else: + response_data: Dict[str, Any] = { + "error": str(error), + "data": data if data else {}, + "metadata": metadata if metadata else {}, + "status_code": status_code if status_code else 400 + } + + # Override with provided data and metadata if present + if data: + response_data["data"] = data if isinstance(data, dict) else {"error": data} + elif "data" in response_data and not response_data.get("data"): + response_data["data"] = {} + + if metadata: + response_data["metadata"] = metadata if isinstance(metadata, dict) else {"metadata": metadata} + elif "metadata" in response_data and not response_data.get("metadata"): + response_data["metadata"] = {} + + return make_response(jsonify(response_data), status_code) diff --git a/src/workflows_cdk/core/sentry.py b/src/workflows_cdk/core/sentry.py new file mode 100644 index 0000000..34faf1d --- /dev/null +++ b/src/workflows_cdk/core/sentry.py @@ -0,0 +1,180 @@ +""" +Centralized Sentry error handling for the Workflows CDK. 
+""" + +import os +import traceback +import requests +from typing import Optional +from flask import Flask, request, current_app +import sentry_sdk +from sentry_sdk.integrations.flask import FlaskIntegration + + +def append_external_request_info(event, hint): + """Add external request information to the event.""" + try: + exc_info = hint.get("exc_info") + if exc_info: + exception_type, exception_value, _ = exc_info + if isinstance(exception_value, requests.exceptions.RequestException): + response = getattr(exception_value, 'response', None) + if response is not None: + event.setdefault("extra", {}) + event["extra"]["api_url"] = response.url if hasattr(response, 'url') else "Unknown URL" + event["extra"]["http_status"] = response.status_code + event["extra"]["response_body"] = response.text + event["extra"]["headers"] = dict(response.headers) + except Exception as e: + print(f"Error appending request info: {e}") + return event + + +def append_path_params(event, hint): + """Add path parameters to the event.""" + try: + event.setdefault("tags", {}) + request = event.get("request", {}) + path = request.get("url", "") + if path: + # Remove query parameters and protocol + path = path.split('?')[0] + path = path.split('//')[-1] if '//' in path else path + + # Split path into components + path_components = [p for p in path.split('/') if p][2:] # Skip domain and version + + # URL path to variable name mapping + url_mapping = { + "routes": "route_id", + "workflows": "workflow_id", + "modules": "module_id", + "schemas": "schema_id", + "connections": "connection_id", + "variables": "variable_id", + } + + # Create path dictionary + path_dict = {} + path_components = [c for c in path_components if c != "id"] + for i in range(len(path_components)-1): + if path_components[i] in url_mapping: + path_dict[url_mapping[path_components[i]]] = path_components[i+1] + + # Add path parameters as tags + for key, value in path_dict.items(): + event["tags"][key] = value + + except Exception as 
def append_path_params(event, hint):
    """Derive resource-id tags from the event's request URL path.

    Maps known path segments (e.g. /modules/<x>/routes/<y>) onto tags such
    as module_id and route_id.  Always returns the event; failures are
    printed and swallowed.
    """
    try:
        tags = event.setdefault("tags", {})
        path = event.get("request", {}).get("url", "")
        if path:
            # Drop the query string, then the scheme prefix.
            path = path.split('?')[0]
            if '//' in path:
                path = path.split('//')[-1]

            # Skip the domain and API-version segments.
            segments = [p for p in path.split('/') if p][2:]

            # URL path segment -> tag name
            url_mapping = {
                "routes": "route_id",
                "workflows": "workflow_id",
                "modules": "module_id",
                "schemas": "schema_id",
                "connections": "connection_id",
                "variables": "variable_id",
            }

            # "id" placeholder segments carry no information.
            segments = [s for s in segments if s != "id"]
            for i in range(len(segments) - 1):
                tag_name = url_mapping.get(segments[i])
                if tag_name:
                    tags[tag_name] = segments[i + 1]

    except Exception as e:
        print(f"Error appending path params: {e}")
    return event
information + if "/routes/" in file_path: + route_path = file_path.split("/routes/")[1].split(".")[0] + event["tags"]["route_path"] = route_path + + # Extract function information + if function_name: + event["tags"]["function"] = function_name + + # Add module information + if file_path: + event["tags"]["module"] = file_path.split("/")[-1] + + # Add external request information + event = append_external_request_info(event, hint) + + # Add path parameters + event = append_path_params(event, hint) + + except Exception as e: + # Log error but don't block event sending + print(f"Error in before_send: {e}") + + return event + + +def init_sentry(app: Flask, dsn: Optional[str] = None) -> None: + """Initialize Sentry with the given configuration.""" + sentry_dsn = dsn or app.config.get("sentry_dsn") + + if not sentry_dsn or not isinstance(sentry_dsn, str): + app.logger.info("Sentry disabled - no valid DSN") + return + + sentry_sdk.init( + dsn=sentry_dsn, + integrations=[FlaskIntegration()], + environment=os.getenv("ENVIRONMENT", "development"), + traces_sample_rate=1.0, + profiles_sample_rate=1.0, + before_send=before_send, + include_local_variables=True, + attach_stacktrace=True, + send_default_pii=False, + include_source_context=True, + debug=False, + ) + app.logger.info("Sentry initialized successfully") diff --git a/src/workflows_cdk/core/validation.py b/src/workflows_cdk/core/validation.py new file mode 100644 index 0000000..cb0c8c9 --- /dev/null +++ b/src/workflows_cdk/core/validation.py @@ -0,0 +1,84 @@ +from flask import request +from .errors import ManagedError +from typing import Any, List, Dict +import json +def validate_request(request: Any, required_fields: List[str]): + """Validate that the request contains all required fields. 
+ + Args: + request: The Flask request object + required_fields: List of field names that must be present in request.json + + Returns: + The validated request.json data + + Raises: + ManagedError: If validation fails + """ + # Skip validation for schema routes + if request.path.endswith('/schema'): + return + + # Skip validation if no required fields + if not required_fields: + return + + # Only validate if there's a JSON body + if request.is_json and request.json: + data = request.json.get("data") + credentials = request.json.get("credentials") + + if not data or not credentials: + raise ManagedError("Missing required fields: data or credentials") + + if required_fields: + missing_fields = [field for field in required_fields if field not in request.json] + if missing_fields: + raise ManagedError(f"Missing required fields: {', '.join(missing_fields)}") + + + +def parse_str_to_json(data): + try: + return json.loads(data) + except json.JSONDecodeError: + raise ValueError("Invalid JSON string") + +def validate_and_parse_json(data, field_name): + if isinstance(data, str): + return parse_str_to_json(data) + elif isinstance(data, (dict, list)): + return data + else: + raise ValueError(f"{field_name} must be a JSON string, dictionary, or list") + +def validate_array(data, field_name): + if isinstance(data, str): + json_data = parse_str_to_json(data) + if not isinstance(json_data, list): + raise ValueError(f"{field_name} must be an array") + return json_data + elif isinstance(data, list): + return data + else: + raise ValueError(f"{field_name} must be an array") + +def validate_object(data, field_name): + if isinstance(data, str): + json_data = parse_str_to_json(data) + if not isinstance(json_data, dict): + raise ValueError(f"{field_name} must be an object") + return json_data + elif isinstance(data, dict): + return data + else: + raise ValueError(f"{field_name} must be an object") + +def validate_string(data, field_name): + if not isinstance(data, str): + raise 
ValueError(f"{field_name} must be a string") + return data + +def validate_dict(data, field_name): + if not isinstance(data, dict): + raise ValueError(f"{field_name} must be a dictionary")