diff --git a/AGENTS.md b/AGENTS.md index ffd1842c..06ff1783 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -50,6 +50,60 @@ The operating model intentionally follows current agent practice: - `docker-compose/debug-docker-compose.yaml` expects `local_edge_node` - Kubernetes manifests are not self-consistent enough to treat as production-safe without review; validate names, namespaces, PVCs, and mount paths before use. +### Active Refactors +- CAR / Deeploy networking refactor planning: + - Local working plan lives at `xperimental/container_apps_deeploy_refactor_plan.md`. + - Approved sequencing is test-first: + 1. Add CAR compatibility baseline under `extensions/business/container_apps/tests/`. + 2. Add Deeploy compatibility baseline under `extensions/business/deeploy/tests/`. + 3. Introduce normalized CAR `EXPOSED_PORTS` model with backward-compatible mappers. + 4. Refactor CAR runtime and Deeploy request handling around the normalized model. + - Compatibility rules locked during planning: + - Keep legacy semaphore exports `HOST`, `PORT`, `URL` unchanged. + - Add explicit CAR exports `HOST_IP`, `HOST_PORT`, `CONTAINER_IP`, `CONTAINER_PORT`. + - Keep `shmem` backend-only for normal UI; UI should model dynamic env as `static`, `host_ip`, or `container_ip` with provider selection. + - Current status: + - Phase 0 completed on `2026-03-17`: focused CAR compatibility baseline added under `extensions/business/container_apps/tests/`. + - Verified passing command: `python3 -m unittest discover -s extensions/business/container_apps/tests -p "test_*.py"`. + - Note: existing top-level module `extensions.business.container_apps.test_worker_app_runner` currently has pre-existing failures on this branch and is not the Phase 0 compatibility gate. + - Phase 1 completed on `2026-03-17`: focused Deeploy compatibility baseline added under `extensions/business/deeploy/tests/`. + - Verified passing command: `python3 -m unittest discover -s extensions/business/deeploy/tests -p "test_*.py"`. 
+ - Note: existing top-level module `extensions.business.deeploy.test_deeploy` currently is not a reliable gate on this branch because importing `DeeployManagerApiPlugin` hits an environment/import-time circular dependency in the installed `naeural_core`. + - Phase 2 completed on `2026-03-17`: CAR now parses and validates normalized `EXPOSED_PORTS` config and caches normalized state internally. + - Verified passing commands: + - `python3 -m unittest discover -s extensions/business/container_apps/tests -p "test_*.py"` + - `python3 -m unittest extensions.business.container_apps.tests.test_exposed_ports_model` + - Note: runtime port allocation and tunnel orchestration still follow legacy CAR paths; Phase 2 only adds normalized parsing/validation, preserving existing behavior unless `EXPOSED_PORTS` is explicitly provided. + - Phase 3 completed on `2026-03-17`: CAR now synthesizes normalized `EXPOSED_PORTS` state from legacy `PORT`, legacy port mappings, and legacy tunnel fields, with explicit precedence and conflict handling. + - Verified passing command: `python3 -m unittest discover -s extensions/business/container_apps/tests -p "test_*.py"`. + - Note: legacy runtime allocation and tunnel orchestration are still the active execution paths; Phase 3 changes only the internal normalized view used for later refactor steps. + - Phase 4 completed on `2026-03-17`: CAR runtime now allocates host ports, derives extra tunnels, and resolves health defaults from normalized `EXPOSED_PORTS` state while preserving `self.port` and `extra_ports_mapping` as compatibility outputs. 
+ - Verified passing commands: + - `python3 -m unittest discover -s extensions/business/container_apps/tests -p "test_*.py"` + - `python3 -m unittest extensions.business.container_apps.tests.test_tunnel_runtime_behavior extensions.business.container_apps.tests.test_health_check_behavior` + - Note: main-tunnel and extra-tunnel processes remain separate for compatibility, but both are now driven from the normalized per-port model instead of raw legacy config fields. + - Phase 5 completed on `2026-03-17`: CAR now exports explicit semaphore networking keys `HOST_IP`, `HOST_PORT`, `CONTAINER_IP`, and `CONTAINER_PORT` while keeping legacy `HOST`, `PORT`, and `URL` semantics unchanged. + - Verified passing commands: + - `python3 -m unittest discover -s extensions/business/container_apps/tests -p "test_*.py"` + - `python3 -m unittest extensions.business.container_apps.tests.test_semaphore_exports` + - Note: explicit semaphore keys resolve from the normalized main port, so they work even when CAR is configured only through `EXPOSED_PORTS`. + - Phase 6 completed on `2026-03-17`: Deeploy create/update preparation now explicitly preserves normalized CAR `EXPOSED_PORTS` payloads, and containerized request validation rejects malformed non-dict `EXPOSED_PORTS` before the payload reaches CAR. + - Verified passing commands: + - `python3 -m unittest discover -s extensions/business/deeploy/tests -p "test_*.py"` + - `python3 -m unittest extensions.business.deeploy.tests.test_create_requests extensions.business.deeploy.tests.test_update_requests` + - Note: no Deeploy config flattening was needed; the main change was test coverage plus shallow request-shape validation at the translation boundary. + - Phase 7 completed on `2026-03-17`: Deeploy now translates a UI-friendly `DYNAMIC_ENV_UI` model into backend `DYNAMIC_ENV`, compiling `container_ip` providers to `shmem(..., CONTAINER_IP)` while keeping explicit raw `DYNAMIC_ENV` as the advanced-precedence path. 
+ - Verified passing commands: + - `python3 -m unittest discover -s extensions/business/deeploy/tests -p "test_*.py"` + - `python3 -m unittest extensions.business.deeploy.tests.test_create_requests extensions.business.deeploy.tests.test_update_requests extensions.business.deeploy.tests.test_dynamic_env_resolution` + - Note: the frontend contract for this translation is now defined in backend code, but the actual UI implementation is still outside this repo. + - Phase 8 completed on `2026-03-17`: the `deeploy-dapp` frontend now edits generic CAR ports through `exposedPorts` rows, uses `container_ip` provider selection for dynamic env, serializes `EXPOSED_PORTS` / `DYNAMIC_ENV_UI`, and recovers/edits/displays these values with legacy fallbacks. + - Verified passing commands: + - `npm run lint` (in `/home/vi/work/ratio1/repos/deeploy-dapp`) + - `npx tsc --noEmit --incremental false` (in `/home/vi/work/ratio1/repos/deeploy-dapp`) + - `google-chrome --headless=new --disable-gpu --window-size=1440,2200 --screenshot=.playwright/container-networking-preview.png http://127.0.0.1:3005/playwright-preview` + - Note: the frontend preview screenshot confirmed the new exposed-ports and dynamic-env sections; legacy raw `shmem` references outside `CONTAINER_IP` are still not expressible in the simplified UI and remain an advanced compatibility edge case. + ### How To Run - Local dev (single node, local image): - `./debug.sh` @@ -571,6 +625,15 @@ Entry format: - Verification: Not run (not requested). - Links: `extensions/business/deeploy/deeploy_manager_api.py` +- ID: `ML-20260317-001` +- Timestamp: `2026-03-17T15:30:21Z` +- Type: `decision` +- Summary: Approved phased CAR/Deeploy networking refactor plan centered on a normalized `EXPOSED_PORTS` model and test-first compatibility baselines. +- Criticality: Cross-cutting architecture and migration plan affecting container apps, Deeploy request handling, semaphore exports, and future UI contracts.
+- Details: Planning locked the sequence to separate CAR and Deeploy test baselines before code changes; the target CAR model is a dict-based `EXPOSED_PORTS` keyed by container port with per-port tunnel config and one `is_main_port`; legacy CAR config and legacy semaphore keys remain backward-compatible through translation; new explicit CAR semaphore env keys will be added for unambiguous host/container addressing; UI-facing dynamic env stays higher-level and compiles `container_ip` to backend `shmem(..., CONTAINER_IP)`. +- Verification: `date -u +"%Y-%m-%dT%H:%M:%SZ"`; `sed -n '1,260p' AGENTS.md`; local plan written to `xperimental/container_apps_deeploy_refactor_plan.md` +- Links: `AGENTS.md`, `xperimental/container_apps_deeploy_refactor_plan.md`, `extensions/business/container_apps/container_app_runner.py`, `extensions/business/deeploy/deeploy_mixin.py` + - ID: `ML-20260317-001` - Timestamp: `2026-03-17T21:14:35Z` - Type: `change` diff --git a/extensions/business/container_apps/README.md b/extensions/business/container_apps/README.md index 22e9fb36..107cbab5 100644 --- a/extensions/business/container_apps/README.md +++ b/extensions/business/container_apps/README.md @@ -37,6 +37,10 @@ The Container Apps module provides plugins for managing Docker containers with i ## Features +- **Normalized exposed ports**: `EXPOSED_PORTS` is the forward-looking config surface for container port exposure and per-port tunnel settings. +- **Explicit semaphore networking keys**: container apps now publish `HOST_IP`, `HOST_PORT`, `CONTAINER_IP`, and `CONTAINER_PORT` in addition to legacy `HOST`, `PORT`, and `URL`. +- **UI-friendly dynamic env support**: Deeploy can compile `DYNAMIC_ENV_UI` fragments to backend `DYNAMIC_ENV` entries, including generic plugin semaphore lookups. + ### Health Check Configuration The plugin uses a consolidated `HEALTH_CHECK` configuration dict to determine when the application is ready before starting tunnels. 
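The compatibility rule recorded above (legacy CAR config stays backward-compatible through translation into the normalized model) can be sketched as follows. This is an illustrative sketch only; `legacy_to_exposed_ports` is a hypothetical helper, not code from this diff, and it ignores runtime host-port allocation:

```python
# Hypothetical sketch: translate legacy CAR fields (PORT,
# CONTAINER_RESOURCES["ports"] as a list, CLOUDFLARE_TOKEN) into the
# normalized EXPOSED_PORTS shape keyed by container port.
def legacy_to_exposed_ports(port=None, resource_ports=None, cloudflare_token=None):
  exposed = {}
  for container_port in resource_ports or []:
    exposed[str(int(container_port))] = {
      # The legacy main PORT becomes the single is_main_port entry.
      "is_main_port": port is not None and int(container_port) == int(port),
      "host_port": None,  # host port is allocated dynamically at runtime
    }
  if port is not None:
    entry = exposed.setdefault(str(int(port)), {"is_main_port": True, "host_port": None})
    entry["is_main_port"] = True
    if cloudflare_token:
      # The legacy main Cloudflare token becomes the main port's tunnel config.
      entry["tunnel"] = {"enabled": True, "engine": "cloudflare", "token": cloudflare_token}
  return exposed
```

The real normalization in `container_utils.py` additionally validates port ranges, rejects duplicate host ports, and merges `EXTRA_TUNNELS`; this sketch only shows the basic precedence.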
@@ -144,6 +148,55 @@ The plugin uses a consolidated `HEALTH_CHECK` configuration dict to determine wh See `ContainerAppRunnerPlugin.CONFIG` for full configuration options. +### Exposed Ports + +`EXPOSED_PORTS` is a dictionary keyed by internal container port: + +```python +"EXPOSED_PORTS": { + "3000": { + "is_main_port": True, + "host_port": None, + "tunnel": { + "enabled": True, + "engine": "cloudflare", + "token": "cf_token_for_3000", + }, + }, + "3001": { + "host_port": None, + "tunnel": { + "enabled": False, + }, + }, +} +``` + +Notes: + +- `is_main_port` marks the port used for default health checks, semaphore networking exports, and the primary app URL. +- `host_port` is optional; when omitted, CAR allocates a host port automatically. +- `tunnel.enabled=true` currently requires `engine="cloudflare"` and a token. +- Legacy `PORT`, `CONTAINER_RESOURCES["ports"]`, `CLOUDFLARE_TOKEN`, and `EXTRA_TUNNELS` are still accepted as compatibility inputs and normalized internally to `EXPOSED_PORTS`. + +### Dynamic Env and Semaphore Values + +Deeploy accepts a UI-facing `DYNAMIC_ENV_UI` payload and compiles it to backend `DYNAMIC_ENV` entries: + +- `static` -> `{"type": "static", "value": "..."}` +- `host_ip` -> `{"type": "host_ip"}` +- `container_ip(provider)` -> `{"type": "shmem", "path": [provider, "CONTAINER_IP"]}` +- `plugin_value(provider, key)` -> `{"type": "shmem", "path": [provider, key]}` + +When a container app acts as the provider, new consumers should prefer these explicit exported keys: + +- `HOST_IP` +- `HOST_PORT` +- `CONTAINER_IP` +- `CONTAINER_PORT` + +Legacy `HOST`, `PORT`, and `URL` remain available for backward compatibility. 
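The four `DYNAMIC_ENV_UI` mapping rules listed above can be expressed as a small compiler. This is a minimal illustrative sketch; `compile_dynamic_env_ui` is a hypothetical helper name, not the actual Deeploy implementation:

```python
# Hypothetical sketch of the DYNAMIC_ENV_UI -> DYNAMIC_ENV translation rules.
def compile_dynamic_env_ui(fragment):
  kind = fragment["type"]
  if kind == "static":
    return {"type": "static", "value": fragment["value"]}
  if kind == "host_ip":
    return {"type": "host_ip"}
  if kind == "container_ip":
    # container_ip(provider) compiles to a shmem lookup of CONTAINER_IP
    return {"type": "shmem", "path": [fragment["provider"], "CONTAINER_IP"]}
  if kind == "plugin_value":
    # plugin_value(provider, key) is a generic semaphore lookup
    return {"type": "shmem", "path": [fragment["provider"], fragment["key"]]}
  raise ValueError(f"Unknown DYNAMIC_ENV_UI type: {kind}")
```

Note that explicit raw `DYNAMIC_ENV` remains the advanced-precedence path; the compiled form above only covers the simplified UI surface.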
+ --- ## Future Enhancements diff --git a/extensions/business/container_apps/container_app_runner.py b/extensions/business/container_apps/container_app_runner.py index 5eb6d339..59607ebc 100644 --- a/extensions/business/container_apps/container_app_runner.py +++ b/extensions/business/container_apps/container_app_runner.py @@ -270,8 +270,9 @@ def from_dict(cls, config_dict: dict) -> "HealthCheckConfig": "PASSWORD": None, # Optional registry password or token }, "ENV": {}, # dict of env vars for the container - "DYNAMIC_ENV": {}, # dict of dynamic env vars for the container - "PORT": None, # internal container port if it's a web app (int) + "DYNAMIC_ENV": {}, # backend dynamic env definition; Deeploy may compile DYNAMIC_ENV_UI into this + "EXPOSED_PORTS": {}, # normalized container-port config keyed by internal container port + "PORT": None, # legacy main internal container port if it's a web app (int) "CONTAINER_RESOURCES" : { "cpu": 1, # e.g. "0.5" for half a CPU, or "1.0" for one CPU core "gpu": 0, # 0 - no GPU, 1 - use GPU @@ -434,6 +435,8 @@ def __reset_vars(self): self.volumes = {} self.env = {} self.dynamic_env = {} + self._normalized_exposed_ports = {} + self._normalized_main_exposed_port = None # Container state machine self.container_state = ContainerState.UNINITIALIZED @@ -935,7 +938,7 @@ def _get_effective_health_mode(self, health_config: HealthCheckConfig = None) -> "Falling back to 'tcp' mode.", color='y' ) - return HealthCheckMode.TCP if self.cfg_port else HealthCheckMode.DELAY + return HealthCheckMode.TCP if self._get_main_container_port() else HealthCheckMode.DELAY return HealthCheckMode.ENDPOINT # Direct modes pass through @@ -945,7 +948,7 @@ def _get_effective_health_mode(self, health_config: HealthCheckConfig = None) -> # Auto mode: smart detection if health_config.path: return HealthCheckMode.ENDPOINT - elif self.cfg_port: + elif self._get_main_container_port(): return HealthCheckMode.TCP return HealthCheckMode.DELAY @@ -1226,6 +1229,8 @@ def 
_validate_runner_config(self): field_name='BUILD_AND_RUN_COMMANDS', ) + self._refresh_normalized_exposed_ports_state() + # Validate health endpoint port (soft error - disables health probing if invalid) self._validate_health_endpoint_port() @@ -1496,6 +1501,28 @@ def start_tunnel_engine(self): # end if return + def _get_main_tunnel_config(self): + """ + Return the normalized tunnel config for the main exposed port, if enabled. + """ + normalized_exposed_ports = self._normalized_exposed_ports or self._refresh_normalized_exposed_ports_state() + main_exposed_port = self._get_main_exposed_port(normalized_exposed_ports) + if not main_exposed_port: + return None + tunnel_config = main_exposed_port.get("tunnel") + if not tunnel_config or tunnel_config.get("enabled") is not True: + return None + return tunnel_config + + def get_cloudflare_token(self): + """ + Prefer the normalized main-port tunnel token over legacy config fields. + """ + tunnel_config = self._get_main_tunnel_config() + if tunnel_config and tunnel_config.get("engine") == "cloudflare": + return tunnel_config.get("token") + return super(ContainerAppRunnerPlugin, self).get_cloudflare_token() + def stop_tunnel_engine(self): """ @@ -1616,11 +1643,6 @@ def _get_host_port_for_container_port(self, container_port): int or None Host port if found, None otherwise """ - # Check main port first - if container_port == self.cfg_port: - return self.port - - # Check extra ports mapping for host_port, c_port in self.extra_ports_mapping.items(): if c_port == container_port: return host_port @@ -1640,16 +1662,10 @@ def _get_valid_container_ports(self): set of int Set of valid container ports """ - valid_ports = set() - - # Main port - if self.cfg_port: - valid_ports.add(self.cfg_port) - - # Extra ports (container ports from mapping) - for container_port in self.extra_ports_mapping.values(): - valid_ports.add(container_port) - + valid_ports = set(self.extra_ports_mapping.values()) + normalized_exposed_ports = 
self._normalized_exposed_ports or self._normalize_exposed_ports_config() + if normalized_exposed_ports: + valid_ports.update(normalized_exposed_ports.keys()) return valid_ports @@ -1732,26 +1748,17 @@ def _should_start_main_tunnel(self): bool True if main tunnel should start """ - # Check if we have a token (backward compatibility) - has_cloudflare_token = bool(getattr(self, 'cfg_cloudflare_token', None)) - has_params_token = bool( - self.cfg_tunnel_engine_parameters and - self.cfg_tunnel_engine_parameters.get("CLOUDFLARE_TOKEN") + main_container_port = self._get_main_container_port( + self._normalized_exposed_ports or self._normalize_exposed_ports_config() ) - - has_main_token = has_cloudflare_token or has_params_token - - # If no token at all, no main tunnel - if not has_main_token: - # self.Pd("No main tunnel token configured, skipping main tunnel") + if main_container_port is None: return False - # If PORT is defined and in EXTRA_TUNNELS, skip main tunnel - if self.cfg_port and self.cfg_port in self.extra_tunnel_configs: - self.P(f"Main PORT {self.cfg_port} is defined in EXTRA_TUNNELS, using extra tunnel instead") + if main_container_port in self.extra_tunnel_configs: + self.P(f"Main PORT {main_container_port} is defined in EXTRA_TUNNELS, using extra tunnel instead") return False - return True + return self._get_main_tunnel_config() is not None def _start_extra_tunnel(self, container_port, token): @@ -2391,7 +2398,7 @@ def _get_health_check_url(self): path = health.path if health.path.startswith('/') else '/' + health.path # Get container port (default to main port) - container_port = health.port or self.cfg_port + container_port = health.port or self._get_main_container_port() if not container_port: self.Pd("Health URL: no container port (HEALTH_CHECK.PORT or PORT not set)") return None @@ -2450,7 +2457,7 @@ def _get_health_check_port(self): Host port for health checking, or None if not configured """ health = self._get_health_config() - container_port = 
health.port or self.cfg_port + container_port = health.port or self._get_main_container_port() if not container_port: self.Pd("Health check port: no container port configured (HEALTH_CHECK.PORT or PORT)") return None @@ -2613,13 +2620,24 @@ def _is_app_ready(self): return self._app_ready def _setup_semaphore_env(self): - """Set semaphore environment variables for bundled plugins.""" + """ + Set semaphore environment variables for bundled plugins. + + Legacy keys stay unchanged for compatibility. New code should prefer the + explicit host/container keys added here. + """ localhost_ip = self.log.get_localhost_ip() - port = self.cfg_port + container_port = self._get_main_container_port() + host_port = self._get_host_port_for_container_port(container_port) if container_port else None self.semaphore_set_env('HOST', localhost_ip) - if port: - self.semaphore_set_env('PORT', str(port)) - self.semaphore_set_env('URL', 'http://{}:{}'.format(localhost_ip, port)) + self.semaphore_set_env('HOST_IP', localhost_ip) + if container_port: + # Keep legacy PORT/URL semantics unchanged. + self.semaphore_set_env('PORT', str(container_port)) + self.semaphore_set_env('URL', 'http://{}:{}'.format(localhost_ip, container_port)) + self.semaphore_set_env('CONTAINER_PORT', str(container_port)) + if host_port: + self.semaphore_set_env('HOST_PORT', str(host_port)) container_ip = self._get_container_ip() self.Pd(f"Container IP address: {container_ip}") if container_ip: diff --git a/extensions/business/container_apps/container_utils.py b/extensions/business/container_apps/container_utils.py index 47d501cd..8ac42e63 100644 --- a/extensions/business/container_apps/container_utils.py +++ b/extensions/business/container_apps/container_utils.py @@ -163,6 +163,359 @@ def _get_chainstore_response_data(self): return data + def _normalize_exposed_ports_value(self, exposed_ports): + """ + Normalize and validate a raw EXPOSED_PORTS-style dictionary. 
+ + Parameters + ---------- + exposed_ports : dict + Raw exposed ports config keyed by container port. + """ + if not isinstance(exposed_ports, dict): + raise ValueError("EXPOSED_PORTS must be a dictionary keyed by container port") + + normalized = {} + main_ports = [] + used_host_ports = {} + + for raw_container_port, raw_config in exposed_ports.items(): + try: + container_port = int(raw_container_port) + except (TypeError, ValueError): + raise ValueError(f"EXPOSED_PORTS key must be an integer port, got: {raw_container_port}") + + if container_port < 1 or container_port > 65535: + raise ValueError(f"EXPOSED_PORTS key must be a valid port number, got: {container_port}") + + if raw_config is None: + raw_config = {} + + if not isinstance(raw_config, dict): + raise ValueError( + f"EXPOSED_PORTS[{container_port}] must be a dictionary, got: {type(raw_config)}" + ) + + config = self.deepcopy(raw_config) + is_main_port = config.get("is_main_port", False) + if not isinstance(is_main_port, bool): + raise ValueError(f"EXPOSED_PORTS[{container_port}].is_main_port must be a boolean") + if is_main_port: + main_ports.append(container_port) + + host_port = config.get("host_port") + if host_port is not None: + try: + host_port = int(host_port) + except (TypeError, ValueError): + raise ValueError( + f"EXPOSED_PORTS[{container_port}].host_port must be an integer or null" + ) + if host_port < 1 or host_port > 65535: + raise ValueError( + f"EXPOSED_PORTS[{container_port}].host_port must be a valid port number" + ) + if host_port in used_host_ports: + raise ValueError( + "EXPOSED_PORTS defines duplicate host_port {} for container ports {} and {}".format( + host_port, used_host_ports[host_port], container_port + ) + ) + used_host_ports[host_port] = container_port + + tunnel = config.get("tunnel") + if tunnel is None: + normalized_tunnel = None + else: + if not isinstance(tunnel, dict): + raise ValueError(f"EXPOSED_PORTS[{container_port}].tunnel must be a dictionary") + normalized_tunnel 
= self.deepcopy(tunnel) + enabled = normalized_tunnel.get("enabled", False) + if not isinstance(enabled, bool): + raise ValueError(f"EXPOSED_PORTS[{container_port}].tunnel.enabled must be a boolean") + engine = normalized_tunnel.get("engine", "cloudflare") + if engine is not None: + engine = str(engine).strip().lower() + if enabled and engine not in ("cloudflare",): + raise ValueError( + f"EXPOSED_PORTS[{container_port}].tunnel.engine must be 'cloudflare' when enabled" + ) + token = normalized_tunnel.get("token") + if enabled and engine == "cloudflare": + if not isinstance(token, str) or not token.strip(): + raise ValueError( + f"EXPOSED_PORTS[{container_port}].tunnel.token is required for enabled cloudflare tunnels" + ) + token = token.strip() + normalized_tunnel = { + "enabled": enabled, + "engine": engine, + "token": token, + } + + normalized[container_port] = { + "container_port": container_port, + "is_main_port": is_main_port, + "host_port": host_port, + "tunnel": normalized_tunnel, + } + + if len(main_ports) > 1: + raise ValueError(f"EXPOSED_PORTS defines multiple main ports: {sorted(main_ports)}") + + return normalized + + def _get_legacy_main_tunnel_token(self): + """ + Return the legacy main Cloudflare token, if configured. + """ + if isinstance(getattr(self, 'cfg_cloudflare_token', None), str): + token = self.cfg_cloudflare_token.strip() + if token: + return token + + params = getattr(self, 'cfg_tunnel_engine_parameters', None) or {} + if isinstance(params, dict): + token = params.get("CLOUDFLARE_TOKEN") + if isinstance(token, str): + token = token.strip() + if token: + return token + return None + + def _merge_legacy_exposed_port_entry( + self, exposed_ports, container_port, is_main_port=None, host_port=None, tunnel=None + ): + """ + Merge legacy CAR fields into a raw EXPOSED_PORTS-style entry. 
+ """ + try: + container_port = int(container_port) + except (TypeError, ValueError): + raise ValueError(f"Legacy container port must be an integer, got: {container_port}") + + if container_port < 1 or container_port > 65535: + raise ValueError(f"Legacy container port must be a valid port number, got: {container_port}") + + entry = exposed_ports.setdefault(str(container_port), {}) + + if is_main_port is True: + existing_is_main = entry.get("is_main_port", False) + if existing_is_main is False: + entry["is_main_port"] = True + elif "is_main_port" not in entry: + entry["is_main_port"] = False + + if host_port is not None: + try: + host_port = int(host_port) + except (TypeError, ValueError): + raise ValueError(f"Legacy host port must be an integer, got: {host_port}") + existing_host_port = entry.get("host_port") + if existing_host_port is not None and int(existing_host_port) != host_port: + raise ValueError( + "Legacy config maps container port {} to conflicting host ports {} and {}".format( + container_port, existing_host_port, host_port + ) + ) + entry["host_port"] = host_port + + if tunnel is not None: + existing_tunnel = entry.get("tunnel") + if existing_tunnel is not None and existing_tunnel != tunnel: + raise ValueError( + "Legacy config defines conflicting tunnel settings for container port {}".format( + container_port + ) + ) + entry["tunnel"] = self.deepcopy(tunnel) + + return entry + + def _build_exposed_ports_config_from_legacy(self): + """ + Build a raw EXPOSED_PORTS-style config from legacy CAR fields. 
+ """ + exposed_ports = {} + main_port = getattr(self, 'cfg_port', None) + container_resources = getattr(self, 'cfg_container_resources', None) or {} + legacy_ports = container_resources.get("ports", []) + + if isinstance(legacy_ports, list): + for container_port in legacy_ports: + self._merge_legacy_exposed_port_entry( + exposed_ports, + container_port=container_port, + is_main_port=(main_port is not None and int(container_port) == int(main_port)), + ) + elif isinstance(legacy_ports, dict): + for raw_host_port, raw_container_port in legacy_ports.items(): + self._merge_legacy_exposed_port_entry( + exposed_ports, + container_port=raw_container_port, + is_main_port=(main_port is not None and int(raw_container_port) == int(main_port)), + host_port=raw_host_port, + ) + + if main_port is not None and str(int(main_port)) not in exposed_ports: + self._merge_legacy_exposed_port_entry( + exposed_ports, + container_port=main_port, + is_main_port=True, + ) + + main_tunnel_token = self._get_legacy_main_tunnel_token() + if main_tunnel_token and main_port is not None: + self._merge_legacy_exposed_port_entry( + exposed_ports, + container_port=main_port, + is_main_port=True, + tunnel={ + "enabled": True, + "engine": "cloudflare", + "token": main_tunnel_token, + }, + ) + + legacy_extra_tunnels = getattr(self, 'cfg_extra_tunnels', None) or {} + if not isinstance(legacy_extra_tunnels, dict): + raise ValueError("EXTRA_TUNNELS must be a dictionary {container_port: token}") + + for raw_container_port, raw_tunnel_config in legacy_extra_tunnels.items(): + normalized_token = self._normalize_extra_tunnel_config(raw_container_port, raw_tunnel_config) + if not normalized_token: + raise ValueError(f"EXTRA_TUNNELS[{raw_container_port}] token is empty") + try: + container_port = int(raw_container_port) + except (TypeError, ValueError): + raise ValueError(f"EXTRA_TUNNELS key must be integer port, got: {raw_container_port}") + + tunnel = { + "enabled": True, + "engine": "cloudflare", + "token": 
normalized_token, + } + + entry = exposed_ports.get(str(container_port)) + if ( + entry is not None and + entry.get("tunnel") is not None and + main_port is not None and + container_port == int(main_port) and + entry.get("tunnel", {}).get("token") != normalized_token + ): + self.P( + "Legacy tunnel config for main PORT {} is overridden by EXTRA_TUNNELS entry".format( + container_port + ), + color='y' + ) + entry["tunnel"] = tunnel + entry["is_main_port"] = True + continue + + self._merge_legacy_exposed_port_entry( + exposed_ports, + container_port=container_port, + is_main_port=(main_port is not None and container_port == int(main_port)), + tunnel=tunnel, + ) + + return exposed_ports + + def _normalize_exposed_ports_config(self): + """ + Normalize and validate EXPOSED_PORTS configuration or synthesize it + from legacy CAR fields when the new config is absent. + + Returns + ------- + dict + Normalized exposed ports keyed by container port integer. + """ + exposed_ports = getattr(self, 'cfg_exposed_ports', None) + if isinstance(exposed_ports, dict) and len(exposed_ports) > 0: + return self._normalize_exposed_ports_value(exposed_ports) + + if exposed_ports in (None, {}): + legacy_exposed_ports = self._build_exposed_ports_config_from_legacy() + return self._normalize_exposed_ports_value(legacy_exposed_ports) + + raise ValueError("EXPOSED_PORTS must be a dictionary keyed by container port") + + def _refresh_normalized_exposed_ports_state(self): + """ + Recompute and cache normalized exposed ports from the current config. + """ + self._normalized_exposed_ports = self._normalize_exposed_ports_config() + self._normalized_main_exposed_port = self._get_main_exposed_port(self._normalized_exposed_ports) + return self._normalized_exposed_ports + + def _get_main_container_port(self, normalized_exposed_ports=None): + """ + Return the container port marked as main, if any. 
+ """ + if normalized_exposed_ports is None: + normalized_exposed_ports = getattr(self, '_normalized_exposed_ports', {}) + if not normalized_exposed_ports: + normalized_exposed_ports = self._normalize_exposed_ports_config() + main_exposed_port = self._get_main_exposed_port(normalized_exposed_ports) + if not main_exposed_port: + return None + return main_exposed_port.get("container_port") + + def _get_main_exposed_port(self, normalized_exposed_ports=None): + """ + Return the normalized EXPOSED_PORTS entry marked as main. + + Parameters + ---------- + normalized_exposed_ports : dict, optional + Normalized exposed ports mapping. Uses cached value when omitted. + + Returns + ------- + dict or None + Main exposed port entry or None if not configured. + """ + if normalized_exposed_ports is None: + normalized_exposed_ports = getattr(self, '_normalized_exposed_ports', {}) + + for config in normalized_exposed_ports.values(): + if config.get("is_main_port") == True: + return config + return None + + def _get_container_to_host_port_map_from_exposed_ports(self, normalized_exposed_ports=None): + """ + Build container->host mapping from normalized EXPOSED_PORTS for entries + that already define host_port. + """ + if normalized_exposed_ports is None: + normalized_exposed_ports = getattr(self, '_normalized_exposed_ports', {}) + + result = {} + for container_port, config in normalized_exposed_ports.items(): + host_port = config.get("host_port") + if host_port is not None: + result[int(container_port)] = int(host_port) + return result + + def _get_host_to_container_port_map_from_exposed_ports(self, normalized_exposed_ports=None): + """ + Build host->container mapping from normalized EXPOSED_PORTS for entries + that already define host_port. 
+ """ + if normalized_exposed_ports is None: + normalized_exposed_ports = getattr(self, '_normalized_exposed_ports', {}) + + result = {} + for container_port, config in normalized_exposed_ports.items(): + host_port = config.get("host_port") + if host_port is not None: + result[int(host_port)] = int(container_port) + return result + def _get_container_ip(self): """ Get the container's IP address from Docker network settings. @@ -369,147 +722,46 @@ def _allocate_port(self, required_port=0, allow_dynamic=False, sleep_time=5): def _setup_resource_limits_and_ports(self): """ - Sets up resource limits and port mappings for the container based on configuration. - - Port Handling Logic: - 1. Process all ports from CONTAINER_RESOURCES["ports"] first - 2. If main PORT exists and not in ports mapping, allocate it - 3. All ports (including main PORT) go into extra_ports_mapping - 4. Validate no duplicate container ports - - Priority: - - Explicit mappings in CONTAINER_RESOURCES["ports"] take precedence - - Main PORT is allocated dynamically if not explicitly mapped + Sets up resource limits and runtime port mappings from normalized config. 
""" DEFAULT_CPU_LIMIT = 1 DEFAULT_GPU_LIMIT = 0 DEFAULT_MEM_LIMIT = "512m" - DEFAULT_PORTS = [] - - container_resources = self.cfg_container_resources - if isinstance(container_resources, dict) and len(container_resources) > 0: - self._cpu_limit = float(container_resources.get("cpu", DEFAULT_CPU_LIMIT)) - self._gpu_limit = container_resources.get("gpu", DEFAULT_GPU_LIMIT) - self._mem_limit = container_resources.get("memory", DEFAULT_MEM_LIMIT) - - ports = container_resources.get("ports", DEFAULT_PORTS) - - # Track which container ports have been mapped to avoid duplicates - mapped_container_ports = set() - main_port_mapped = False - - if len(ports) > 0: - if isinstance(ports, list): - # Handle list of container ports - allocate dynamic host ports - self.P("Processing container ports list...") - for container_port in ports: - if container_port in mapped_container_ports: - self.P(f"Warning: Container port {container_port} already mapped, skipping duplicate", color='y') - continue - - self.P(f"Container port {container_port} specified. Finding available host port...") - host_port = self._allocate_port() - self.extra_ports_mapping[host_port] = container_port - mapped_container_ports.add(container_port) - self.P(f"Allocated host port {host_port} -> container port {container_port}") - - # Check if this is the main port - if self.cfg_port and container_port == self.cfg_port: - self.port = host_port - main_port_mapped = True - self.P(f"Main PORT {self.cfg_port} mapped to host port {host_port}", color='g') - - elif isinstance(ports, dict): - # Handle dict of explicit host_port -> container_port mappings - self.P("Processing explicit port mappings...") - - # First, validate for duplicate container ports - container_ports_in_dict = list(ports.values()) - if len(container_ports_in_dict) != len(set(container_ports_in_dict)): - raise ValueError( - f"Duplicate container ports found in CONTAINER_RESOURCES['ports']: {ports}. " - "Each container port can only be mapped once." 
-            )
+    container_resources = self.cfg_container_resources if isinstance(self.cfg_container_resources, dict) else {}
+    self._cpu_limit = float(container_resources.get("cpu", DEFAULT_CPU_LIMIT))
+    self._gpu_limit = container_resources.get("gpu", DEFAULT_GPU_LIMIT)
+    self._mem_limit = container_resources.get("memory", DEFAULT_MEM_LIMIT)
+
+    normalized_exposed_ports = self._refresh_normalized_exposed_ports_state()
+
+    self.extra_ports_mapping = {}
+    self.inverted_ports_mapping = {}
+    self.port = None
+
+    if normalized_exposed_ports:
+      self.P(f"Processing {len(normalized_exposed_ports)} normalized exposed port(s)...")
+
+      for container_port, port_config in normalized_exposed_ports.items():
+        requested_host_port = port_config.get("host_port")
+        if requested_host_port is not None:
+          self.P(f"Allocating requested host port {requested_host_port} for container port {container_port}...")
+          host_port = self._allocate_port(requested_host_port, allow_dynamic=False)
+          if host_port != requested_host_port:
+            raise RuntimeError(
+              f"Failed to allocate requested host port {requested_host_port}. "
+              f"Port may be in use by another process."
+            )
+        else:
+          self.P(f"Container port {container_port} specified. Finding available host port...")
+          host_port = self._allocate_port(allow_dynamic=True)
-
-          # Process all explicit mappings
-          for host_port, container_port in ports.items():
-            try:
-              host_port = int(host_port)
-              container_port = int(container_port)
-
-              # Check if this mapping was already processed
-              if host_port in self.extra_ports_mapping:
-                existing_container_port = self.extra_ports_mapping[host_port]
-                if existing_container_port == container_port:
-                  self.Pd(f"Port mapping {host_port}->{container_port} already exists, skipping")
-                  continue
-                else:
-                  raise ValueError(
-                    f"Host port {host_port} is already mapped to container port {existing_container_port}. "
-                    f"Cannot map it to {container_port}"
-                  )
-
-              # Allocate the requested host port
-              self.P(f"Allocating requested host port {host_port} for container port {container_port}...")
-              allocated_port = self._allocate_port(host_port, allow_dynamic=False)
-
-              if allocated_port != host_port:
-                raise RuntimeError(
-                  f"Failed to allocate requested host port {host_port}. "
-                  f"Port may be in use by another process."
-                )
-
-              self.extra_ports_mapping[host_port] = container_port
-              mapped_container_ports.add(container_port)
-              self.P(f"Allocated host port {host_port} -> container port {container_port}", color='g')
-
-              # Check if this is the main port
-              if self.cfg_port and container_port == self.cfg_port:
-                self.port = host_port
-                main_port_mapped = True
-                self.P(f"Main PORT {self.cfg_port} mapped to host port {host_port} (from explicit mapping)", color='g')
-
-            except ValueError as e:
-              raise ValueError(f"Invalid port mapping {host_port}:{container_port} - {e}")
-            except Exception as e:
-              self.P(f"Failed to allocate port {host_port}: {e}", color='r')
-              raise RuntimeError(f"Port allocation failed for {host_port}:{container_port}")
-        else:
-          self.P(f"Invalid ports configuration type: {type(ports)}. Expected list or dict.", color='r')
-
-      # Handle main PORT if it exists and wasn't mapped yet
-      if self.cfg_port and not main_port_mapped:
-        if self.cfg_port in mapped_container_ports:
-          # Main PORT was mapped to a different host port in the loop above
-          # Find which host port it was mapped to
-          for h_port, c_port in self.extra_ports_mapping.items():
-            if c_port == self.cfg_port:
-              self.port = h_port
-              self.P(f"Main PORT {self.cfg_port} already mapped to host port {h_port}", color='d')
-              break
-        else:
-          # Allocate a dynamic host port for the main PORT
-          self.P(f"Main PORT {self.cfg_port} not in explicit mappings. Allocating dynamic host port...")
-          self.port = self._allocate_port(allow_dynamic=True)
-          self.extra_ports_mapping[self.port] = self.cfg_port
-          mapped_container_ports.add(self.cfg_port)
-          self.P(f"Allocated host port {self.port} -> main PORT {self.cfg_port}", color='g')
-        # endif main PORT
-      # endif main_port_mapped
-    else:
-      # No container resources specified, use defaults
-      self._cpu_limit = float(DEFAULT_CPU_LIMIT)
-      self._gpu_limit = DEFAULT_GPU_LIMIT
-      self._mem_limit = DEFAULT_MEM_LIMIT
-
-      # Still handle main PORT if specified
-      if self.cfg_port:
-        self.P(f"No CONTAINER_RESOURCES specified. Allocating dynamic host port for main PORT {self.cfg_port}...")
-        self.port = self._allocate_port(allow_dynamic=True)
-        self.extra_ports_mapping[self.port] = self.cfg_port
-        self.P(f"Allocated host port {self.port} -> main PORT {self.cfg_port}", color='g')
-      # endif main PORT
-    # endif container_resources
+        self.extra_ports_mapping[host_port] = container_port
+        self.P(f"Allocated host port {host_port} -> container port {container_port}")
+
+        if port_config.get("is_main_port") is True:
+          self.port = host_port
+          self.P(f"Main PORT {container_port} mapped to host port {host_port}", color='g')
+      # endfor container_port

     return

   def _set_directory_permissions(self, path, mode=0o777):
@@ -1084,17 +1336,13 @@ def _allocate_extra_tunnel_ports(self, container_ports):
   def _validate_extra_tunnels_config(self):
     """
-    Validate EXTRA_TUNNELS configuration.
-
-    Key behaviors:
-    1. If TUNNEL_ENGINE_ENABLED=False, EXTRA_TUNNELS are IGNORED
-    2. Container ports can be defined only in EXTRA_TUNNELS (not in CONTAINER_RESOURCES)
-    3. Ports from EXTRA_TUNNELS will be allocated dynamically if needed
-    4. Dict keys can be strings or integers
+    Validate and derive runtime extra tunnel config from normalized ports.
     Returns:
       bool: True if valid
     """
+    self.extra_tunnel_configs = {}
+
     # Master switch check
     if not self.cfg_tunnel_engine_enabled:
       if self.cfg_extra_tunnels:
@@ -1104,48 +1352,35 @@ def _validate_extra_tunnels_config(self):
        )
       return True

-    if not self.cfg_extra_tunnels:
-      self.Pd("No EXTRA_TUNNELS configured")
-      return True
+    normalized_exposed_ports = self._refresh_normalized_exposed_ports_state()
+    main_container_port = self._get_main_container_port(normalized_exposed_ports)

-    if not isinstance(self.cfg_extra_tunnels, dict):
-      raise ValueError("EXTRA_TUNNELS must be a dictionary {container_port: token}")
+    for container_port, port_config in normalized_exposed_ports.items():
+      tunnel_config = port_config.get("tunnel")
+      if not tunnel_config or tunnel_config.get("enabled") is not True:
+        continue

-    # Track which ports need to be allocated
-    ports_to_allocate = []
+      if container_port not in self.extra_ports_mapping.values():
+        requested_host_port = port_config.get("host_port")
+        if requested_host_port is not None:
+          host_port = self._allocate_port(requested_host_port, allow_dynamic=False)
+        else:
+          host_port = self._allocate_port(allow_dynamic=True)
+        self.extra_ports_mapping[host_port] = container_port

-    for port_key, tunnel_config in self.cfg_extra_tunnels.items():
-      # Convert port key to integer (handle both string and int keys)
-      try:
-        container_port = int(port_key)
-      except (ValueError, TypeError):
-        raise ValueError(f"EXTRA_TUNNELS key must be integer port, got: {port_key}")
-
-      # Check if port is already allocated
-      is_already_mapped = container_port in self.extra_ports_mapping.values()
-
-      if not is_already_mapped:
-        # Port not in CONTAINER_RESOURCES["ports"], will need to allocate
-        self.Pd(
-          f"EXTRA_TUNNELS port {container_port} not in CONTAINER_RESOURCES['ports'], "
-          f"will allocate dynamically"
-        )
-        ports_to_allocate.append(container_port)
+      if main_container_port is not None and container_port == main_container_port:
+        continue

-      # Normalize and validate tunnel config
-      try:
-        normalized = self._normalize_extra_tunnel_config(container_port, tunnel_config)
-        if not normalized:
-          raise ValueError(f"EXTRA_TUNNELS[{container_port}] token is empty")
-        self.extra_tunnel_configs[container_port] = normalized
-      except Exception as e:
-        raise ValueError(f"EXTRA_TUNNELS[{container_port}] validation failed: {e}")
-
-    # Allocate ports for EXTRA_TUNNELS not in CONTAINER_RESOURCES
-    if ports_to_allocate:
-      self._allocate_extra_tunnel_ports(ports_to_allocate)
-
-    self.P(f"EXTRA_TUNNELS validated: {len(self.extra_tunnel_configs)} tunnel(s) configured", color='g')
+      token = tunnel_config.get("token")
+      if not token:
+        raise ValueError(f"Normalized tunnel for port {container_port} is missing a token")
+      self.extra_tunnel_configs[container_port] = token
+
+    self.inverted_ports_mapping = {
+      f"{container_port}/tcp": str(host_port)
+      for host_port, container_port in self.extra_ports_mapping.items()
+    }
+    self.P(f"Extra tunnels derived from normalized config: {len(self.extra_tunnel_configs)} tunnel(s)", color='g')
     return True

   ### END EXTRA TUNNELS METHODS ###
diff --git a/extensions/business/container_apps/tests/__init__.py b/extensions/business/container_apps/tests/__init__.py
new file mode 100644
index 00000000..8b137891
--- /dev/null
+++ b/extensions/business/container_apps/tests/__init__.py
@@ -0,0 +1 @@
+
diff --git a/extensions/business/container_apps/tests/support.py b/extensions/business/container_apps/tests/support.py
new file mode 100644
index 00000000..3576e3ce
--- /dev/null
+++ b/extensions/business/container_apps/tests/support.py
@@ -0,0 +1,157 @@
+import os
+import sys
+import types
+from collections import deque
+
+
+class _DummyBasePlugin:
+  CONFIG = {'VALIDATION_RULES': {}}
+
+  def __init__(self, *args, **kwargs):
+    pass
+
+  def on_init(self):
+    return
+
+  def reset_tunnel_engine(self):
+    return
+
+  def maybe_init_tunnel_engine(self):
+    return
+
+  def maybe_start_tunnel_engine(self):
+    return
+
+  def maybe_tunnel_engine_ping(self):
+    return
+
+  def diskapi_save_pickle_to_output(self, *args, **kwargs):
+    return
+
+  def chainstore_set(self, *args, **kwargs):
+    return
+
+  def use_cloudflare(self):
+    return True
+
+  def stop_tunnel_command(self, *args, **kwargs):
+    return
+
+  def run_tunnel_engine(self):
+    return None
+
+  def get_cloudflare_token(self):
+    params = getattr(self, 'cfg_tunnel_engine_parameters', None) or {}
+    return getattr(self, 'cfg_cloudflare_token', None) or params.get("CLOUDFLARE_TOKEN")
+
+  def time(self):
+    return 0
+
+  def sleep(self, *args, **kwargs):
+    return
+
+  def deepcopy(self, obj):
+    return obj
+
+  def json_dumps(self, obj):
+    return str(obj)
+
+  def sanitize_name(self, name):
+    return name.replace('/', '_')
+
+
+def install_dummy_base_plugin():
+  module_hierarchy = [
+    ('naeural_core', types.ModuleType('naeural_core')),
+    ('naeural_core.business', types.ModuleType('naeural_core.business')),
+    ('naeural_core.business.base', types.ModuleType('naeural_core.business.base')),
+    ('naeural_core.business.base.web_app', types.ModuleType('naeural_core.business.base.web_app')),
+  ]
+
+  for name, module in module_hierarchy:
+    sys.modules.setdefault(name, module)
+
+  base_tunnel_mod = types.ModuleType('naeural_core.business.base.web_app.base_tunnel_engine_plugin')
+  base_tunnel_mod.BaseTunnelEnginePlugin = _DummyBasePlugin
+  sys.modules['naeural_core.business.base.web_app.base_tunnel_engine_plugin'] = base_tunnel_mod
+
+
+install_dummy_base_plugin()
+
+from extensions.business.container_apps.container_app_runner import ContainerAppRunnerPlugin
+
+
+def make_container_app_runner():
+  plugin = ContainerAppRunnerPlugin.__new__(ContainerAppRunnerPlugin)
+  plugin.logged_messages = []
+
+  def _log(*args, **kwargs):
+    if args:
+      plugin.logged_messages.append(str(args[0]))
+    return
+
+  plugin.P = _log
+  plugin.Pd = _log
+  plugin.deque = deque
+  plugin.os_path = os.path
+  plugin.os = os
+  plugin.cfg_instance_id = "car_instance"
+  plugin.uuid = lambda *a, **k: "efgh"
+  plugin.time = lambda: 0
+  plugin.cfg_max_log_lines = 10
+  plugin.cfg_env = {}
+  plugin.cfg_dynamic_env = {}
+  plugin.cfg_exposed_ports = {}
+  plugin.cfg_container_resources = {}
+  plugin.cfg_volumes = {}
+  plugin.cfg_file_volumes = {}
+  plugin.cfg_port = None
+  plugin.cfg_autoupdate = True
+  plugin.cfg_autoupdate_interval = 10
+  plugin.cfg_image_poll_interval = 10
+  plugin.cfg_chainstore_response_key = None
+  plugin.cfg_chainstore_peers = []
+  plugin.cfg_car_verbose = 10
+  plugin.cfg_cloudflare_token = None
+  plugin.cfg_tunnel_engine_enabled = True
+  plugin.cfg_tunnel_engine_parameters = {}
+  plugin.cfg_extra_tunnels = {}
+  plugin.cfg_extra_tunnels_ping_interval = 30
+  plugin.cfg_health_check = {}
+  plugin.cfg_restart_policy = "always"
+  plugin.volumes = {}
+  plugin.extra_ports_mapping = {}
+  plugin.inverted_ports_mapping = {}
+  plugin.extra_tunnel_configs = {}
+  plugin.extra_tunnel_processes = {}
+  plugin.extra_tunnel_urls = {}
+  plugin.extra_tunnel_log_readers = {}
+  plugin.extra_tunnel_start_times = {}
+  plugin._tunnel_consecutive_failures = {}
+  plugin._tunnel_last_failure_time = {}
+  plugin._tunnel_next_restart_time = {}
+  plugin._tunnel_last_successful_start = {}
+  plugin._health_probing_disabled = False
+  plugin._normalized_exposed_ports = {}
+  plugin._normalized_main_exposed_port = None
+  plugin.container = object()
+  plugin.container_name = "car_instance_efgh"
+  plugin.log = types.SimpleNamespace(get_localhost_ip=lambda: "127.0.0.1")
+  plugin.bc = types.SimpleNamespace(eth_address="0x0", get_evm_network=lambda: "testnet")
+  plugin.re = __import__("re")
+  plugin.json_dumps = lambda obj: str(obj)
+  plugin.deepcopy = lambda obj: obj
+  plugin.semaphore_env = {}
+  plugin.semaphore_set_env = lambda key, value: plugin.semaphore_env.__setitem__(key, str(value))
+  plugin._get_container_ip = lambda: "172.18.0.5"
+
+  next_dynamic_port = {'value': 20000}
+
+  def allocate_port(required_port=0, allow_dynamic=False, sleep_time=5):
+    if required_port:
+      return required_port
+    next_dynamic_port['value'] += 1
+    return next_dynamic_port['value']
+
+  plugin._allocate_port = allocate_port
+  return plugin
diff --git a/extensions/business/container_apps/tests/test_exposed_ports_model.py b/extensions/business/container_apps/tests/test_exposed_ports_model.py
new file mode 100644
index 00000000..8f207fbc
--- /dev/null
+++ b/extensions/business/container_apps/tests/test_exposed_ports_model.py
@@ -0,0 +1,217 @@
+import unittest
+
+from extensions.business.container_apps.tests.support import make_container_app_runner
+
+
+class ContainerAppRunnerExposedPortsModelTests(unittest.TestCase):
+
+  def test_normalize_exposed_ports_accepts_minimal_valid_config(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_exposed_ports = {
+      "3000": {
+        "is_main_port": True,
+        "host_port": None,
+        "tunnel": {
+          "enabled": True,
+          "engine": "cloudflare",
+          "token": "cf-token",
+        },
+      },
+      "3001": {
+        "tunnel": {
+          "enabled": False,
+        },
+      },
+    }
+
+    normalized = plugin._normalize_exposed_ports_config()
+
+    self.assertEqual(sorted(normalized.keys()), [3000, 3001])
+    self.assertEqual(normalized[3000]["container_port"], 3000)
+    self.assertTrue(normalized[3000]["is_main_port"])
+    self.assertIsNone(normalized[3000]["host_port"])
+    self.assertEqual(normalized[3000]["tunnel"]["token"], "cf-token")
+    self.assertFalse(normalized[3001]["is_main_port"])
+    self.assertIsNone(normalized[3001]["host_port"])
+    self.assertEqual(normalized[3001]["tunnel"]["enabled"], False)
+
+  def test_validate_runner_config_caches_normalized_exposed_ports(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_exposed_ports = {
+      "3000": {
+        "is_main_port": True,
+      }
+    }
+    plugin.cfg_container_entrypoint = None
+    plugin.cfg_container_start_command = None
+    plugin.cfg_build_and_run_commands = []
+
+    plugin._validate_runner_config()
+
+    self.assertEqual(plugin._normalized_exposed_ports[3000]["container_port"], 3000)
+    self.assertEqual(plugin._normalized_main_exposed_port["container_port"], 3000)
+
+  def test_normalize_exposed_ports_rejects_multiple_main_ports(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_exposed_ports = {
+      "3000": {"is_main_port": True},
+      "3001": {"is_main_port": True},
+    }
+
+    with self.assertRaisesRegex(ValueError, "multiple main ports"):
+      plugin._normalize_exposed_ports_config()
+
+  def test_normalize_exposed_ports_rejects_duplicate_host_ports(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_exposed_ports = {
+      "3000": {"host_port": 18080},
+      "3001": {"host_port": 18080},
+    }
+
+    with self.assertRaisesRegex(ValueError, "duplicate host_port"):
+      plugin._normalize_exposed_ports_config()
+
+  def test_normalize_exposed_ports_rejects_invalid_tunnel_config(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_exposed_ports = {
+      "3000": {
+        "tunnel": {
+          "enabled": True,
+          "engine": "cloudflare",
+        }
+      }
+    }
+
+    with self.assertRaisesRegex(ValueError, "tunnel.token is required"):
+      plugin._normalize_exposed_ports_config()
+
+  def test_normalize_exposed_ports_requires_dict_keyed_by_valid_ports(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_exposed_ports = {
+      "not-a-port": {}
+    }
+
+    with self.assertRaisesRegex(ValueError, "key must be an integer port"):
+      plugin._normalize_exposed_ports_config()
+
+  def test_legacy_port_list_normalizes_into_exposed_ports(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_port = 3001
+    plugin.cfg_container_resources = {
+      "ports": [3000, 3001, 3002]
+    }
+
+    normalized = plugin._normalize_exposed_ports_config()
+
+    self.assertEqual(sorted(normalized.keys()), [3000, 3001, 3002])
+    self.assertTrue(normalized[3001]["is_main_port"])
+    self.assertIsNone(normalized[3000]["host_port"])
+    self.assertIsNone(normalized[3002]["tunnel"])
+
+  def test_legacy_explicit_host_port_mapping_normalizes_into_exposed_ports(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_port = 3001
+    plugin.cfg_container_resources = {
+      "ports": {
+        "18080": 3000,
+        "18081": 3001,
+      }
+    }
+
+    normalized = plugin._normalize_exposed_ports_config()
+
+    self.assertEqual(normalized[3000]["host_port"], 18080)
+    self.assertEqual(normalized[3001]["host_port"], 18081)
+    self.assertTrue(normalized[3001]["is_main_port"])
+
+  def test_legacy_main_tunnel_token_attaches_to_main_port(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_port = 3000
+    plugin.cfg_cloudflare_token = "main-token"
+
+    normalized = plugin._normalize_exposed_ports_config()
+
+    self.assertEqual(normalized[3000]["tunnel"], {
+      "enabled": True,
+      "engine": "cloudflare",
+      "token": "main-token",
+    })
+
+  def test_legacy_tunnel_engine_parameters_token_falls_back_for_main_port(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_port = 3000
+    plugin.cfg_tunnel_engine_parameters = {
+      "CLOUDFLARE_TOKEN": "params-token"
+    }
+
+    normalized = plugin._normalize_exposed_ports_config()
+
+    self.assertEqual(normalized[3000]["tunnel"]["token"], "params-token")
+
+  def test_legacy_extra_tunnels_create_ports_and_attach_tunnel(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_port = 3000
+    plugin.cfg_extra_tunnels = {
+      "3002": "extra-token"
+    }
+
+    normalized = plugin._normalize_exposed_ports_config()
+
+    self.assertTrue(normalized[3000]["is_main_port"])
+    self.assertEqual(normalized[3002]["tunnel"], {
+      "enabled": True,
+      "engine": "cloudflare",
+      "token": "extra-token",
+    })
+
+  def test_legacy_main_extra_tunnel_override_prefers_explicit_per_port_token(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_port = 3000
+    plugin.cfg_cloudflare_token = "main-token"
+    plugin.cfg_extra_tunnels = {
+      "3000": "extra-token"
+    }
+
+    normalized = plugin._normalize_exposed_ports_config()
+
+    self.assertEqual(normalized[3000]["tunnel"]["token"], "extra-token")
+    self.assertTrue(any("overridden by EXTRA_TUNNELS" in msg for msg in plugin.logged_messages))
+
+  def test_legacy_conflicting_explicit_host_ports_are_rejected(self):
+    plugin = make_container_app_runner()
+    exposed_ports = {
+      "3000": {
+        "host_port": 18080,
+      }
+    }
+
+    with self.assertRaisesRegex(ValueError, "conflicting host ports"):
+      plugin._merge_legacy_exposed_port_entry(
+        exposed_ports,
+        container_port=3000,
+        host_port=18081,
+      )
+
+  def test_explicit_exposed_ports_win_over_legacy_fields(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_exposed_ports = {
+      "3005": {
+        "is_main_port": True,
+      }
+    }
+    plugin.cfg_port = 3000
+    plugin.cfg_container_resources = {
+      "ports": [3000, 3001]
+    }
+    plugin.cfg_extra_tunnels = {
+      "3001": "extra-token"
+    }
+
+    normalized = plugin._normalize_exposed_ports_config()
+
+    self.assertEqual(sorted(normalized.keys()), [3005])
+    self.assertTrue(normalized[3005]["is_main_port"])
+
+
+if __name__ == "__main__":
+  unittest.main()
diff --git a/extensions/business/container_apps/tests/test_health_check_behavior.py b/extensions/business/container_apps/tests/test_health_check_behavior.py
new file mode 100644
index 00000000..ced0d010
--- /dev/null
+++ b/extensions/business/container_apps/tests/test_health_check_behavior.py
@@ -0,0 +1,70 @@
+import types
+import unittest
+
+from extensions.business.container_apps.tests.support import make_container_app_runner
+from extensions.business.container_apps.container_app_runner import HealthCheckMode
+
+
+class ContainerAppRunnerHealthCheckTests(unittest.TestCase):
+
+  def test_valid_container_ports_include_main_and_extra_ports(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_port = 3000
+    plugin.extra_ports_mapping = {
+      20001: 3000,
+      20002: 3002,
+      20003: 3003,
+    }
+
+    self.assertEqual(plugin._get_valid_container_ports(), {3000, 3002, 3003})
+
+  def test_invalid_health_port_disables_probing(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_port = 3000
+    plugin.extra_ports_mapping = {
+      20001: 3000,
+      20002: 3002,
+    }
+    plugin._get_health_config = lambda: types.SimpleNamespace(port=3005)
+
+    is_valid = plugin._validate_health_endpoint_port()
+
+    self.assertFalse(is_valid)
+    self.assertTrue(plugin._health_probing_disabled)
+
+  def test_valid_health_port_keeps_probing_enabled(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_port = 3000
+    plugin.extra_ports_mapping = {
+      20001: 3000,
+      20002: 3002,
+    }
+    plugin._get_health_config = lambda: types.SimpleNamespace(port=3002)
+
+    is_valid = plugin._validate_health_endpoint_port()
+
+    self.assertTrue(is_valid)
+    self.assertFalse(plugin._health_probing_disabled)
+
+  def test_health_defaults_to_main_exposed_port_without_legacy_port(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_exposed_ports = {
+      "3000": {
+        "is_main_port": True,
+      },
+      "3002": {},
+    }
+    plugin.extra_ports_mapping = {
+      20001: 3000,
+      20002: 3002,
+    }
+    plugin._get_health_config = lambda: types.SimpleNamespace(port=None, path=None, mode="auto")
+    plugin._refresh_normalized_exposed_ports_state()
+
+    self.assertEqual(plugin._get_valid_container_ports(), {3000, 3002})
+    self.assertEqual(plugin._get_health_check_port(), 20001)
+    self.assertEqual(plugin._get_effective_health_mode(), HealthCheckMode.TCP)
+
+
+if __name__ == "__main__":
+  unittest.main()
diff --git a/extensions/business/container_apps/tests/test_legacy_config_mapping.py b/extensions/business/container_apps/tests/test_legacy_config_mapping.py
new file mode 100644
index 00000000..f6b40109
--- /dev/null
+++ b/extensions/business/container_apps/tests/test_legacy_config_mapping.py
@@ -0,0 +1,37 @@
+import unittest
+
+from extensions.business.container_apps.tests.support import make_container_app_runner
+
+
+class ContainerAppRunnerLegacyConfigMappingTests(unittest.TestCase):
+
+  def test_get_host_port_for_container_port_prefers_main_port_alias(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_port = 3000
+    plugin.port = 20001
+    plugin.extra_ports_mapping = {
+      20001: 3000,
+      20002: 3002,
+    }
+
+    self.assertEqual(plugin._get_host_port_for_container_port(3000), 20001)
+    self.assertEqual(plugin._get_host_port_for_container_port(3002), 20002)
+
+  def test_extra_tunnels_can_define_exposed_port_without_legacy_ports_config(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_extra_tunnels = {
+      "3005": "token-3005"
+    }
+
+    plugin._validate_extra_tunnels_config()
+
+    self.assertEqual(plugin.extra_tunnel_configs, {
+      3005: "token-3005"
+    })
+    self.assertEqual(plugin.extra_ports_mapping, {
+      20001: 3005
+    })
+
+
+if __name__ == "__main__":
+  unittest.main()
diff --git a/extensions/business/container_apps/tests/test_port_runtime_behavior.py b/extensions/business/container_apps/tests/test_port_runtime_behavior.py
new file mode 100644
index 00000000..c7daaea0
--- /dev/null
+++ b/extensions/business/container_apps/tests/test_port_runtime_behavior.py
@@ -0,0 +1,87 @@
+import unittest
+
+from extensions.business.container_apps.tests.support import make_container_app_runner
+
+
+class ContainerAppRunnerPortRuntimeTests(unittest.TestCase):
+
+  def test_main_port_only_gets_dynamic_host_port(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_port = 3000
+
+    plugin._setup_resource_limits_and_ports()
+
+    self.assertEqual(plugin.port, 20001)
+    self.assertEqual(plugin.extra_ports_mapping, {20001: 3000})
+
+  def test_list_ports_map_all_container_ports_and_track_main(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_port = 3001
+    plugin.cfg_container_resources = {
+      "ports": [3000, 3001, 3002]
+    }
+
+    plugin._setup_resource_limits_and_ports()
+
+    self.assertEqual(plugin.extra_ports_mapping, {
+      20001: 3000,
+      20002: 3001,
+      20003: 3002,
+    })
+    self.assertEqual(plugin.port, 20002)
+
+  def test_dict_ports_preserve_requested_host_ports(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_port = 3001
+    plugin.cfg_container_resources = {
+      "ports": {
+        "18080": 3000,
+        "18081": 3001,
+      }
+    }
+
+    plugin._setup_resource_limits_and_ports()
+
+    self.assertEqual(plugin.extra_ports_mapping, {
+      18080: 3000,
+      18081: 3001,
+    })
+    self.assertEqual(plugin.port, 18081)
+
+  def test_main_port_is_added_when_missing_from_legacy_ports_list(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_port = 3001
+    plugin.cfg_container_resources = {
+      "ports": [3000]
+    }
+
+    plugin._setup_resource_limits_and_ports()
+
+    self.assertEqual(plugin.extra_ports_mapping, {
+      20001: 3000,
+      20002: 3001,
+    })
+    self.assertEqual(plugin.port, 20002)
+
+  def test_explicit_exposed_ports_drive_runtime_allocation(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_exposed_ports = {
+      "3000": {
+        "is_main_port": True,
+      },
+      "3002": {
+        "host_port": 18082,
+      },
+    }
+
+    plugin._setup_resource_limits_and_ports()
+
+    self.assertEqual(plugin.extra_ports_mapping, {
+      20001: 3000,
+      18082: 3002,
+    })
+    self.assertEqual(plugin.port, 20001)
+
+
+if __name__ == "__main__":
+  unittest.main()
diff --git a/extensions/business/container_apps/tests/test_semaphore_exports.py b/extensions/business/container_apps/tests/test_semaphore_exports.py
new file mode 100644
index 00000000..f4dcd142
--- /dev/null
+++ b/extensions/business/container_apps/tests/test_semaphore_exports.py
@@ -0,0 +1,52 @@
+import unittest
+
+from extensions.business.container_apps.tests.support import make_container_app_runner
+
+
+class ContainerAppRunnerSemaphoreExportTests(unittest.TestCase):
+
+  def test_setup_semaphore_env_keeps_current_legacy_port_behavior(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_port = 3000
+    plugin.port = 20001
+    plugin.extra_ports_mapping = {
+      20001: 3000,
+    }
+
+    plugin._setup_semaphore_env()
+
+    self.assertEqual(plugin.semaphore_env["HOST"], "127.0.0.1")
+    self.assertEqual(plugin.semaphore_env["HOST_IP"], "127.0.0.1")
+    self.assertEqual(plugin.semaphore_env["PORT"], "3000")
+    self.assertEqual(plugin.semaphore_env["URL"], "http://127.0.0.1:3000")
+    self.assertEqual(plugin.semaphore_env["HOST_PORT"], "20001")
+    self.assertEqual(plugin.semaphore_env["CONTAINER_PORT"], "3000")
+    self.assertEqual(plugin.semaphore_env["CONTAINER_IP"], "172.18.0.5")
+
+  def test_setup_semaphore_env_exports_explicit_main_port_keys_from_normalized_config(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_exposed_ports = {
+      "3005": {
+        "is_main_port": True,
+      },
+      "3006": {},
+    }
+    plugin.extra_ports_mapping = {
+      21001: 3005,
+      21002: 3006,
+    }
+    plugin.port = 21001
+    plugin._refresh_normalized_exposed_ports_state()
+
+    plugin._setup_semaphore_env()
+
+    self.assertEqual(plugin.semaphore_env["HOST_IP"], "127.0.0.1")
+    self.assertEqual(plugin.semaphore_env["HOST_PORT"], "21001")
+    self.assertEqual(plugin.semaphore_env["CONTAINER_PORT"], "3005")
+    self.assertEqual(plugin.semaphore_env["CONTAINER_IP"], "172.18.0.5")
+    self.assertEqual(plugin.semaphore_env["PORT"], "3005")
+    self.assertEqual(plugin.semaphore_env["URL"], "http://127.0.0.1:3005")
+
+
+if __name__ == "__main__":
+  unittest.main()
diff --git a/extensions/business/container_apps/tests/test_tunnel_runtime_behavior.py b/extensions/business/container_apps/tests/test_tunnel_runtime_behavior.py
new file mode 100644
index 00000000..7e9ce118
--- /dev/null
+++ b/extensions/business/container_apps/tests/test_tunnel_runtime_behavior.py
@@ -0,0 +1,109 @@
+import unittest
+
+from extensions.business.container_apps.tests.support import make_container_app_runner
+
+
+class ContainerAppRunnerTunnelRuntimeTests(unittest.TestCase):
+
+  def test_extra_tunnels_allocate_host_ports_for_unmapped_ports(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_port = 3000
+    plugin.cfg_container_resources = {
+      "ports": [3000]
+    }
+    plugin._setup_resource_limits_and_ports()
+    plugin.cfg_extra_tunnels = {
+      "3002": "token-3002"
+    }
+
+    plugin._validate_extra_tunnels_config()
+
+    self.assertEqual(plugin.extra_tunnel_configs, {
+      3002: "token-3002"
+    })
+    self.assertEqual(plugin.extra_ports_mapping, {
+      20001: 3000,
+      20002: 3002,
+    })
+
+  def test_main_tunnel_is_skipped_when_main_port_is_in_extra_tunnels(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_port = 3000
+    plugin.cfg_cloudflare_token = "main-token"
+    plugin.extra_tunnel_configs = {
+      3000: "extra-token"
+    }
+
+    self.assertFalse(plugin._should_start_main_tunnel())
+
+  def test_build_tunnel_command_uses_host_port_mapping(self):
+    plugin = make_container_app_runner()
+    plugin.extra_ports_mapping = {
+      20005: 3002
+    }
+
+    command = plugin._build_tunnel_command(3002, "cf-token")
+
+    self.assertEqual(command, [
+      "cloudflared",
+      "tunnel",
+      "--no-autoupdate",
+      "run",
+      "--token",
+      "cf-token",
+      "--url",
+      "http://127.0.0.1:20005",
+    ])
+
+  def test_normalized_main_tunnel_drives_cloudflare_token(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_exposed_ports = {
+      "3000": {
+        "is_main_port": True,
+        "tunnel": {
+          "enabled": True,
+          "engine": "cloudflare",
+          "token": "normalized-main-token",
+        },
+      }
+    }
+
+    plugin._refresh_normalized_exposed_ports_state()
+
+    self.assertTrue(plugin._should_start_main_tunnel())
+    self.assertEqual(plugin.get_cloudflare_token(), "normalized-main-token")
+
+  def test_validate_extra_tunnels_config_uses_normalized_non_main_tunnels(self):
+    plugin = make_container_app_runner()
+    plugin.cfg_exposed_ports = {
+      "3000": {
+        "is_main_port": True,
+        "tunnel": {
+          "enabled": True,
+          "engine": "cloudflare",
+          "token": "main-token",
+        },
+      },
+      "3002": {
+        "tunnel": {
+          "enabled": True,
+          "engine": "cloudflare",
+          "token": "extra-token",
+        },
+      },
+    }
+
+    plugin._setup_resource_limits_and_ports()
+    plugin._validate_extra_tunnels_config()
+
+    self.assertEqual(plugin.extra_ports_mapping, {
+      20001: 3000,
+      20002: 3002,
+    })
+    self.assertEqual(plugin.extra_tunnel_configs, {
+      3002: "extra-token",
+    })
+
+
+if __name__ == "__main__":
+  unittest.main()
diff --git a/extensions/business/deeploy/deeploy_job_mixin.py b/extensions/business/deeploy/deeploy_job_mixin.py
index 917765bf..69b39f3e 100644
--- a/extensions/business/deeploy/deeploy_job_mixin.py
+++ b/extensions/business/deeploy/deeploy_job_mixin.py
@@ -127,7 +127,14 @@ def list_all_deployed_jobs_from_cstore(self):
     """
     return self.chainstore_hgetall(hkey=DEEPLOY_JOBS_CSTORE_HKEY)

-  def get_job_pipeline_from_cstore(self, job_id: int):
+  def get_job_pipeline_from_cstore(
+    self,
+    job_id: int,
+    timeout: int = None,
+    pin: bool = True,
+    raise_on_error: bool = False,
+    show_logs: bool = True,
+  ):
     """
     Get the pipeline from CSTORE and download it from R1FS.
     """
@@ -135,7 +142,13 @@ def get_job_pipeline_from_cstore(self, job_id: int):
     if not cid:
       return None

-    return self.get_pipeline_from_r1fs(cid)
+    return self.get_pipeline_from_r1fs(
+      cid,
+      timeout=timeout,
+      pin=pin,
+      raise_on_error=raise_on_error,
+      show_logs=show_logs,
+    )

   def _get_pipeline_from_cstore(self, job_id: int):
     """
@@ -143,11 +156,24 @@ def _get_pipeline_from_cstore(self, job_id: int):
     """
     return self.chainstore_hget(hkey=DEEPLOY_JOBS_CSTORE_HKEY, key=str(job_id))

-  def get_pipeline_from_r1fs(self, cid: str):
+  def get_pipeline_from_r1fs(
+    self,
+    cid: str,
+    timeout: int = None,
+    pin: bool = True,
+    raise_on_error: bool = False,
+    show_logs: bool = True,
+  ):
     """
     Get the pipeline from R1FS.
""" - return self.r1fs.get_json(cid, show_logs=True) + return self.r1fs.get_json( + cid, + timeout=timeout, + pin=pin, + raise_on_error=raise_on_error, + show_logs=show_logs, + ) def _save_pipeline_to_r1fs(self, pipeline: dict): """ diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index 5b577297..c58e589c 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -751,7 +751,7 @@ def _validate_plugin_instance_for_signature(self, signature: str, plugin_instanc index_str = f" at index {index}" if index is not None else "" # Type-specific validation - if signature == CONTAINER_APP_RUNNER_SIGNATURE: + if signature in CONTAINERIZED_APPS_SIGNATURES: # Check IMAGE field if not plugin_instance.get(DEEPLOY_KEYS.APP_PARAMS_IMAGE): raise ValueError( @@ -789,6 +789,12 @@ def _validate_plugin_instance_for_signature(self, signature: str, plugin_instanc f"{DEEPLOY_ERRORS.REQUEST6}. Plugin instance{index_str} with signature '{signature}': 'CONTAINER_RESOURCES.memory' is required." ) + exposed_ports = plugin_instance.get("EXPOSED_PORTS") + if exposed_ports is not None and not isinstance(exposed_ports, dict): + raise ValueError( + f"{DEEPLOY_ERRORS.REQUEST6}. Plugin instance{index_str} with signature '{signature}': 'EXPOSED_PORTS' must be a dictionary." + ) + # Add validation for other plugin types here as needed # elif signature == "SOME_OTHER_PLUGIN": # ... @@ -1510,6 +1516,94 @@ def _has_shmem_dynamic_env(self, plugins): return True return False + def _compile_dynamic_env_ui(self, dynamic_env_ui): + """ + Translate a UI-friendly dynamic env model into backend DYNAMIC_ENV entries. 
+
+    Supported UI sources:
+      - static -> {"type": "static", "value": "..."}
+      - host_ip -> {"type": "host_ip"}
+      - container_ip(provider) -> {"type": "shmem", "path": [provider, "CONTAINER_IP"]}
+      - plugin_value(provider, key) -> {"type": "shmem", "path": [provider, key]}
+
+    Explicit raw DYNAMIC_ENV remains the advanced path and should take
+    precedence over DYNAMIC_ENV_UI when both are provided.
+    """
+    if not isinstance(dynamic_env_ui, dict):
+      raise ValueError("DYNAMIC_ENV_UI must be a dictionary")
+
+    compiled = {}
+    for env_name, entries in dynamic_env_ui.items():
+      if not isinstance(entries, list):
+        raise ValueError(f"DYNAMIC_ENV_UI[{env_name}] must be a list")
+
+      compiled_entries = []
+      for entry in entries:
+        if not isinstance(entry, dict):
+          raise ValueError(f"DYNAMIC_ENV_UI[{env_name}] entries must be dictionaries")
+
+        source = entry.get("source")
+        if source == "static":
+          value = entry.get("value", "")
+          if not isinstance(value, str):
+            raise ValueError(f"DYNAMIC_ENV_UI[{env_name}] static value must be a string")
+          compiled_entries.append({
+            "type": "static",
+            "value": value,
+          })
+        elif source == "host_ip":
+          compiled_entries.append({
+            "type": "host_ip",
+          })
+        elif source == "container_ip":
+          provider = entry.get("provider")
+          if not isinstance(provider, str) or not provider.strip():
+            raise ValueError(f"DYNAMIC_ENV_UI[{env_name}] container_ip requires a provider")
+          compiled_entries.append({
+            "type": "shmem",
+            "path": [provider.strip(), "CONTAINER_IP"],
+          })
+        elif source == "plugin_value":
+          provider = entry.get("provider")
+          key = entry.get("key")
+          if not isinstance(provider, str) or not provider.strip():
+            raise ValueError(f"DYNAMIC_ENV_UI[{env_name}] plugin_value requires a provider")
+          if not isinstance(key, str) or not key.strip():
+            raise ValueError(f"DYNAMIC_ENV_UI[{env_name}] plugin_value requires a key")
+          compiled_entries.append({
+            "type": "shmem",
+            "path": [provider.strip(), key.strip()],
+          })
+        else:
+          raise ValueError(f"DYNAMIC_ENV_UI[{env_name}] has unsupported source '{source}'")
+
+      compiled[env_name] = compiled_entries
+
+    return compiled
+
+  def _translate_dynamic_env_ui_in_instance_payload(self, instance_payload):
+    """
+    Compile DYNAMIC_ENV_UI into DYNAMIC_ENV while preserving explicit DYNAMIC_ENV.
+
+    This keeps the request boundary UI-friendly while allowing advanced
+    callers to continue sending raw DYNAMIC_ENV definitions directly.
+    """
+    if not isinstance(instance_payload, dict):
+      return instance_payload
+
+    translated = self.deepcopy(instance_payload)
+    dynamic_env_ui = translated.pop("DYNAMIC_ENV_UI", None)
+    dynamic_env = translated.get("DYNAMIC_ENV")
+
+    if isinstance(dynamic_env, dict):
+      return translated
+
+    if dynamic_env_ui is None:
+      return translated
+
+    translated["DYNAMIC_ENV"] = self._compile_dynamic_env_ui(dynamic_env_ui)
+    return translated
+
   def _resolve_shmem_references(self, plugins, name_to_instance, app_id):
     """
     Resolve shmem-type DYNAMIC_ENV entries by replacing plugin names with semaphore keys.
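The DYNAMIC_ENV_UI compilation above boils down to a small translation table from UI `source` values to backend entry types. A minimal standalone sketch of that mapping, for review purposes only — `compile_ui_entry` is a hypothetical helper, not part of the patch; the shipped logic is `_compile_dynamic_env_ui` in the mixin:

```python
# Illustrative sketch of the DYNAMIC_ENV_UI -> DYNAMIC_ENV mapping described above.
# This is a simplified standalone version; validation and error wording differ
# from the real _compile_dynamic_env_ui.

def compile_ui_entry(entry: dict) -> dict:
  source = entry.get("source")
  if source == "static":
    return {"type": "static", "value": entry.get("value", "")}
  if source == "host_ip":
    return {"type": "host_ip"}
  if source == "container_ip":
    # Provider name resolves to a semaphore key later, during shmem resolution.
    return {"type": "shmem", "path": [entry["provider"].strip(), "CONTAINER_IP"]}
  if source == "plugin_value":
    return {"type": "shmem", "path": [entry["provider"].strip(), entry["key"].strip()]}
  raise ValueError(f"unsupported source '{source}'")


ui_model = {
  "API_URL": [
    {"source": "static", "value": "http://"},
    {"source": "container_ip", "provider": "backend"},
    {"source": "static", "value": ":3000"},
  ]
}
compiled = {name: [compile_ui_entry(e) for e in entries] for name, entries in ui_model.items()}
print(compiled["API_URL"][1])  # {'type': 'shmem', 'path': ['backend', 'CONTAINER_IP']}
```

The interleaving of `static` and `shmem` entries is what lets the UI express a full URL (`http://<container-ip>:3000`) without ever seeing the semaphore machinery.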
@@ -1800,7 +1894,7 @@ def deeploy_prepare_single_plugin_instance(self, inputs):
       self.ct.CONFIG_PLUGIN.K_INSTANCES : [
         {
           self.ct.CONFIG_INSTANCE.K_INSTANCE_ID : instance_id,
-          **inputs.app_params
+          **self._translate_dynamic_env_ui_in_instance_payload(inputs.app_params)
         }
       ]
     }
@@ -1873,6 +1967,8 @@ def deeploy_prepare_single_plugin_instance_update(self, inputs, instance_id, plu
     else:
       instance_payload = {}

+    instance_payload = self._translate_dynamic_env_ui_in_instance_payload(instance_payload)
+
     plugin = {
       self.ct.CONFIG_PLUGIN.K_SIGNATURE: signature,
       self.ct.CONFIG_PLUGIN.K_INSTANCES: [
@@ -1973,7 +2069,7 @@ def deeploy_prepare_plugins(self, inputs):
       # Prepare instance with INSTANCE_ID
       prepared_instance = {
         self.ct.CONFIG_INSTANCE.K_INSTANCE_ID: instance_id,
-        **instance_config
+        **self._translate_dynamic_env_ui_in_instance_payload(instance_config)
       }

       # Build name-to-instance mapping if plugin_name was provided
@@ -2837,7 +2933,9 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow, owner, project_id=None)
       }
     }
     """
+    get_apps_r1fs_timeout = 30
     result = {}
+    failed_pipeline_cids = {}
     active_jobs = self.bc.get_escrow_active_jobs(sender_escrow)
     active_job_ids = [int(job["jobId"]) for job in active_jobs]
@@ -2861,16 +2959,43 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow, owner, project_id=None)
       chain_job = self._serialize_chain_job(active_job)
       grouped_online_apps = online_apps_by_job_id.get(job_id, {})
       online_apps = {node: dict(apps) for node, apps in grouped_online_apps.items()}
+      pipeline_cid = self._get_pipeline_from_cstore(job_id)
       pipeline = None
-      try:
-        pipeline = self.get_job_pipeline_from_cstore(job_id)
-      except Exception as exc:
-        self.Pd(f"Failed to load R1FS payload for job {job_id}: {exc}", color='y')
-        pipeline = None
+      if pipeline_cid:
+        try:
+          pipeline = self.get_pipeline_from_r1fs(
+            pipeline_cid,
+            timeout=get_apps_r1fs_timeout,
+            pin=False,
+            raise_on_error=False,
+            show_logs=True,
+          )
+        except Exception as exc:
+          self.Pd(f"Failed to load R1FS payload for job {job_id}: {exc}", color='y')
+          pipeline = None

       if pipeline is None:
-        # If we don't have the details in R1FS, we skip the job. It can happen before the deploy, or for legacy jobs.
+        if pipeline_cid:
+          failed_pipeline_cids[str(job_id)] = pipeline_cid
+        if project_id is not None and chain_job.get("projectHash") != project_id:
+          self.Pd(
+            f"Skipping job {job_id}: project_id mismatch and pipeline payload is unavailable.",
+            color='y'
+          )
+          continue
+
+        self.Pd(
+          f"Returning partial get_apps data for job {job_id}: pipeline payload unavailable after "
+          f"{get_apps_r1fs_timeout}s R1FS fetch timeout.",
+          color='y'
+        )
+        result[str(job_id)] = {
+          DEEPLOY_KEYS.JOB_ID: job_id,
+          DEEPLOY_KEYS.PIPELINE: None,
+          DEEPLOY_KEYS.ONLINE: online_apps,
+          DEEPLOY_KEYS.CHAIN_JOB: chain_job,
+        }
         continue

       pipeline_owner = pipeline[NetMonCt.OWNER.upper()]
@@ -2893,6 +3018,12 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow, owner, project_id=None)
         DEEPLOY_KEYS.CHAIN_JOB: chain_job,
       }

+    if failed_pipeline_cids:
+      self.P(
+        f"These job pipeline fetches failed during get_apps: {failed_pipeline_cids}",
+        color='r'
+      )
+
     return result

   # TODO: REMOVE THIS, once instance_id is coming from ui for instances that have to be updated
diff --git a/extensions/business/deeploy/test_deeploy.py b/extensions/business/deeploy/test_deeploy.py
index 1f15b5c2..e67831c7 100644
--- a/extensions/business/deeploy/test_deeploy.py
+++ b/extensions/business/deeploy/test_deeploy.py
@@ -12,6 +12,7 @@ class _BCStub:
   """
   def __init__(self):
     self.submitted = []
+    self.active_jobs = []

   def node_addr_to_eth_addr(self, node):
     """
@@ -42,6 +43,22 @@ def submit_node_update(self, job_id, nodes):
     """
     self.submitted.append((job_id, list(nodes)))

+  def get_escrow_active_jobs(self, sender_escrow):
+    """
+    Return active jobs configured for the test.
+
+    Parameters
+    ----------
+    sender_escrow : str
+      Escrow address.
+
+    Returns
+    -------
+    list[dict]
+      Active jobs for the escrow.
+    """
+    return list(self.active_jobs)
+

 class _InputsStub(dict):
   """
@@ -137,6 +154,24 @@ def chainstore_get(self, key):
     """
     return self._chainstore.get(key)

+  def chainstore_hget(self, hkey, key):
+    """
+    Look up a hash key in local stub storage.
+
+    Parameters
+    ----------
+    hkey : str
+      Hash key namespace.
+    key : str
+      Entry key.
+
+    Returns
+    -------
+    Any
+      Stored value or None.
+    """
+    return self._chainstore.get((hkey, key))
+
   def P(self, *args, **kwargs):
     """
     No-op logger.
@@ -213,7 +248,10 @@ def deeploy_get_auth_result(self, inputs):
       dict
         Auth result with escrow owner.
     """
-    return {DEEPLOY_KEYS.ESCROW_OWNER: "owner"}
+    return {
+      DEEPLOY_KEYS.ESCROW_OWNER: "owner",
+      DEEPLOY_KEYS.SENDER_ESCROW: "escrow1",
+    }

   def deeploy_check_payment_and_job_owner(self, inputs, owner, is_create=False, debug=False):
     """
@@ -237,7 +275,7 @@ def deeploy_check_payment_and_job_owner(self, inputs, owner, is_create=False, de
     """
     return True

-  def _get_online_apps(self, job_id=None, owner=None):
+  def _get_online_apps(self, job_id=None, owner=None, project_id=None):
     """
     Stub online app discovery.

@@ -519,6 +557,89 @@ def scale_up_job(new_nodes, update_nodes, job_id, owner, running_apps_for_job, w
     res = self.plugin.scale_up_job_workers(req)
     self.assertEqual(res[DEEPLOY_KEYS.STATUS], DEEPLOY_STATUS.COMMAND_DELIVERED)

+  def test_get_apps_uses_fast_r1fs_fetch_without_pin(self):
+    """
+    Ensure get_apps uses the shorter non-pinning R1FS fetch path.
+    """
+    self.plugin.bc.active_jobs = [{
+      "jobId": 10,
+      "projectHash": "project-1",
+      "requestTimestamp": 1,
+      "startTimestamp": 2,
+      "lastNodesChangeTimestamp": 3,
+      "jobType": 4,
+      "pricePerEpoch": 5,
+      "lastExecutionEpoch": 6,
+      "numberOfNodesRequested": 1,
+      "balance": 7,
+      "lastAllocatedEpoch": 8,
+      "activeNodes": ["0xabc"],
+      "escrowAddress": "escrow1",
+    }]
+    self.plugin._get_online_apps = lambda owner=None, job_id=None, project_id=None: {}
+    self.plugin._chainstore[("DEEPLOY_DEPLOYED_JOBS", "10")] = "cid-10"
+
+    captured = {}
+
+    def _get_pipeline_from_r1fs(cid, timeout=None, pin=True, raise_on_error=False, show_logs=True):
+      captured.update({
+        "cid": cid,
+        "timeout": timeout,
+        "pin": pin,
+        "raise_on_error": raise_on_error,
+        "show_logs": show_logs,
+      })
+      return {
+        "OWNER": "owner",
+        "DEEPLOY_SPECS": {"project_id": "project-1"},
+      }
+
+    self.plugin.get_pipeline_from_r1fs = _get_pipeline_from_r1fs
+
+    apps = self.plugin._get_apps_by_escrow_active_jobs("escrow1", "owner")
+
+    self.assertIn("10", apps)
+    self.assertEqual(captured["cid"], "cid-10")
+    self.assertEqual(captured["timeout"], 30)
+    self.assertFalse(captured["pin"])
+    self.assertFalse(captured["raise_on_error"])
+    self.assertTrue(captured["show_logs"])
+
+  def test_get_apps_returns_partial_job_when_pipeline_fetch_fails(self):
+    """
+    Ensure get_apps keeps chain and online data when the pipeline cannot be loaded.
+    """
+    self.plugin.bc.active_jobs = [{
+      "jobId": 11,
+      "projectHash": "project-2",
+      "requestTimestamp": 10,
+      "startTimestamp": 20,
+      "lastNodesChangeTimestamp": 30,
+      "jobType": 40,
+      "pricePerEpoch": 50,
+      "lastExecutionEpoch": 60,
+      "numberOfNodesRequested": 2,
+      "balance": 70,
+      "lastAllocatedEpoch": 80,
+      "activeNodes": ["0xdef"],
+      "escrowAddress": "escrow1",
+    }]
+    self.plugin._get_online_apps = lambda owner=None, job_id=None, project_id=None: {}
+    self.plugin._chainstore[("DEEPLOY_DEPLOYED_JOBS", "11")] = "cid-11"
+
+    def _get_pipeline_from_r1fs(cid, timeout=None, pin=True, raise_on_error=False, show_logs=True):
+      return None
+
+    self.plugin.get_pipeline_from_r1fs = _get_pipeline_from_r1fs
+
+    apps = self.plugin._get_apps_by_escrow_active_jobs("escrow1", "owner")
+
+    self.assertIn("11", apps)
+    self.assertEqual(apps["11"][DEEPLOY_KEYS.JOB_ID], 11)
+    self.assertIsNone(apps["11"][DEEPLOY_KEYS.PIPELINE])
+    self.assertEqual(apps["11"][DEEPLOY_KEYS.CHAIN_JOB]["projectHash"], "project-2")
+    self.assertEqual(apps["11"][DEEPLOY_KEYS.ONLINE], {})
+

 if __name__ == "__main__":
   unittest.main()
diff --git a/extensions/business/deeploy/tests/__init__.py b/extensions/business/deeploy/tests/__init__.py
new file mode 100644
index 00000000..8b137891
--- /dev/null
+++ b/extensions/business/deeploy/tests/__init__.py
@@ -0,0 +1 @@
+
diff --git a/extensions/business/deeploy/tests/support.py b/extensions/business/deeploy/tests/support.py
new file mode 100644
index 00000000..69c03a3d
--- /dev/null
+++ b/extensions/business/deeploy/tests/support.py
@@ -0,0 +1,54 @@
+import copy
+from types import SimpleNamespace
+
+from naeural_core import constants as ct
+
+from extensions.business.deeploy.deeploy_const import DEEPLOY_KEYS
+from extensions.business.deeploy.deeploy_mixin import _DeeployMixin
+
+
+class InputsStub(dict):
+  def __getattr__(self, item):
+    try:
+      return self[item]
+    except KeyError as exc:
+      raise AttributeError(item) from exc
+
+
+class _TestDeeployPlugin(_DeeployMixin):
+  pass
+
+
+def make_deeploy_plugin():
+  plugin = _TestDeeployPlugin.__new__(_TestDeeployPlugin)
+  plugin.ct = ct
+  plugin.cfg_deeploy_verbose = 10
+  plugin.deepcopy = copy.deepcopy
+  plugin.sanitize_name = lambda value: str(value).replace("/", "_").replace(" ", "_")
+  plugin.P = lambda *args, **kwargs: None
+  plugin.Pd = lambda *args, **kwargs: None
+  plugin.json_dumps = lambda obj, **kwargs: str(obj)
+
+  uuid_counter = {'value': 0}
+
+  def uuid(size=6):
+    uuid_counter['value'] += 1
+    return f"{uuid_counter['value']:0{size}d}"[-size:]
+
+  plugin.uuid = uuid
+  plugin.const = SimpleNamespace()
+  return plugin
+
+
+def make_inputs(**kwargs):
+  return InputsStub(kwargs)
+
+
+def make_plugin_entry(signature, instance_id=None, **config):
+  entry = {
+    DEEPLOY_KEYS.PLUGIN_SIGNATURE: signature,
+  }
+  if instance_id is not None:
+    entry[DEEPLOY_KEYS.PLUGIN_INSTANCE_ID] = instance_id
+  entry.update(config)
+  return entry
diff --git a/extensions/business/deeploy/tests/test_create_requests.py b/extensions/business/deeploy/tests/test_create_requests.py
new file mode 100644
index 00000000..9b980515
--- /dev/null
+++ b/extensions/business/deeploy/tests/test_create_requests.py
@@ -0,0 +1,164 @@
+import unittest
+
+from extensions.business.deeploy.deeploy_const import DEEPLOY_KEYS
+from extensions.business.deeploy.tests.support import make_deeploy_plugin, make_inputs, make_plugin_entry
+
+
+class DeeployCreateRequestPreparationTests(unittest.TestCase):
+
+  def test_prepare_single_plugin_instance_uses_signature_and_app_params(self):
+    plugin = make_deeploy_plugin()
+    inputs = make_inputs(
+      plugin_signature="CONTAINER_APP_RUNNER",
+      app_params={"IMAGE": "repo/app:latest", "PORT": 3000},
+    )
+
+    prepared = plugin.deeploy_prepare_single_plugin_instance(inputs)
+
+    self.assertEqual(prepared[plugin.ct.CONFIG_PLUGIN.K_SIGNATURE], "CONTAINER_APP_RUNNER")
+    instance = prepared[plugin.ct.CONFIG_PLUGIN.K_INSTANCES][0]
+    self.assertTrue(instance[plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID].startswith("CONTAINER_APP_"))
+    self.assertEqual(instance["IMAGE"], "repo/app:latest")
+    self.assertEqual(instance["PORT"], 3000)
+
+  def test_prepare_plugins_groups_instances_by_signature_and_tracks_names(self):
+    plugin = make_deeploy_plugin()
+    inputs = make_inputs(
+      plugins=[
+        make_plugin_entry("CONTAINER_APP_RUNNER", plugin_name="frontend", PORT=3000),
+        make_plugin_entry("CONTAINER_APP_RUNNER", plugin_name="worker", PORT=3001),
+        make_plugin_entry("A_SIMPLE_PLUGIN", plugin_name="native", PROCESS_DELAY=5),
+      ]
+    )
+
+    prepared_plugins, name_to_instance = plugin.deeploy_prepare_plugins(inputs)
+
+    self.assertEqual(len(prepared_plugins), 2)
+    grouped = {
+      item[plugin.ct.CONFIG_PLUGIN.K_SIGNATURE]: item[plugin.ct.CONFIG_PLUGIN.K_INSTANCES]
+      for item in prepared_plugins
+    }
+    self.assertEqual(len(grouped["CONTAINER_APP_RUNNER"]), 2)
+    self.assertEqual(grouped["CONTAINER_APP_RUNNER"][0]["PORT"], 3000)
+    self.assertEqual(grouped["CONTAINER_APP_RUNNER"][1]["PORT"], 3001)
+    self.assertEqual(name_to_instance["frontend"]["signature"], "CONTAINER_APP_RUNNER")
+    self.assertEqual(name_to_instance["worker"]["signature"], "CONTAINER_APP_RUNNER")
+    self.assertEqual(name_to_instance["native"]["signature"], "A_SIMPLE_PLUGIN")
+
+  def test_prepare_plugins_regenerates_duplicate_instance_ids(self):
+    plugin = make_deeploy_plugin()
+    inputs = make_inputs(
+      plugins=[
+        make_plugin_entry("CONTAINER_APP_RUNNER", instance_id="dup", PORT=3000),
+        make_plugin_entry("CONTAINER_APP_RUNNER", instance_id="dup", PORT=3001),
+      ]
+    )
+
+    prepared_plugins, _ = plugin.deeploy_prepare_plugins(inputs)
+
+    instances = prepared_plugins[0][plugin.ct.CONFIG_PLUGIN.K_INSTANCES]
+    self.assertEqual(instances[0][plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID], "dup")
+    self.assertNotEqual(instances[1][plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID], "dup")
+
+  def test_prepare_single_plugin_instance_preserves_exposed_ports(self):
+    plugin = make_deeploy_plugin()
+    inputs = make_inputs(
+      plugin_signature="CONTAINER_APP_RUNNER",
+      app_params={
+        "IMAGE": "repo/app:latest",
+        "CONTAINER_RESOURCES": {"cpu": 1, "memory": "256m"},
+        "EXPOSED_PORTS": {
+          "3000": {"is_main_port": True},
+          "3001": {"tunnel": {"enabled": True, "engine": "cloudflare", "token": "cf-token"}},
+        },
+      },
+    )
+
+    prepared = plugin.deeploy_prepare_single_plugin_instance(inputs)
+
+    instance = prepared[plugin.ct.CONFIG_PLUGIN.K_INSTANCES][0]
+    self.assertTrue(instance["EXPOSED_PORTS"]["3000"]["is_main_port"])
+    self.assertEqual(instance["EXPOSED_PORTS"]["3001"]["tunnel"]["token"], "cf-token")
+
+  def test_validate_plugins_array_accepts_container_runner_with_exposed_ports(self):
+    plugin = make_deeploy_plugin()
+    plugins = [
+      make_plugin_entry(
+        "CONTAINER_APP_RUNNER",
+        IMAGE="repo/app:latest",
+        CONTAINER_RESOURCES={"cpu": 1, "memory": "128m"},
+        EXPOSED_PORTS={
+          "3000": {"is_main_port": True},
+        },
+      )
+    ]
+
+    self.assertTrue(plugin._validate_plugins_array(plugins))
+
+  def test_validate_plugins_array_rejects_non_dict_exposed_ports(self):
+    plugin = make_deeploy_plugin()
+    plugins = [
+      make_plugin_entry(
+        "CONTAINER_APP_RUNNER",
+        IMAGE="repo/app:latest",
+        CONTAINER_RESOURCES={"cpu": 1, "memory": "128m"},
+        EXPOSED_PORTS=["3000"],
+      )
+    ]
+
+    with self.assertRaisesRegex(ValueError, "EXPOSED_PORTS"):
+      plugin._validate_plugins_array(plugins)
+
+  def test_prepare_single_plugin_instance_translates_dynamic_env_ui(self):
+    plugin = make_deeploy_plugin()
+    inputs = make_inputs(
+      plugin_signature="CONTAINER_APP_RUNNER",
+      app_params={
+        "IMAGE": "repo/app:latest",
+        "CONTAINER_RESOURCES": {"cpu": 1, "memory": "256m"},
+        "DYNAMIC_ENV_UI": {
+          "API_URL": [
+            {"source": "static", "value": "http://"},
+            {"source": "container_ip", "provider": "backend"},
+            {"source": "static", "value": ":3000"},
+          ]
+        },
+      },
+    )
+
+    prepared = plugin.deeploy_prepare_single_plugin_instance(inputs)
+
+    instance = prepared[plugin.ct.CONFIG_PLUGIN.K_INSTANCES][0]
+    self.assertNotIn("DYNAMIC_ENV_UI", instance)
+    self.assertEqual(instance["DYNAMIC_ENV"]["API_URL"], [
+      {"type": "static", "value": "http://"},
+      {"type": "shmem", "path": ["backend", "CONTAINER_IP"]},
+      {"type": "static", "value": ":3000"},
+    ])
+
+  def test_prepare_single_plugin_instance_translates_plugin_value_dynamic_env_ui(self):
+    plugin = make_deeploy_plugin()
+    inputs = make_inputs(
+      plugin_signature="CONTAINER_APP_RUNNER",
+      app_params={
+        "IMAGE": "repo/app:latest",
+        "CONTAINER_RESOURCES": {"cpu": 1, "memory": "256m"},
+        "DYNAMIC_ENV_UI": {
+          "UPSTREAM_PORT": [
+            {"source": "plugin_value", "provider": "native-agent", "key": "PORT"}
+          ]
+        },
+      },
+    )
+
+    prepared = plugin.deeploy_prepare_single_plugin_instance(inputs)
+
+    instance = prepared[plugin.ct.CONFIG_PLUGIN.K_INSTANCES][0]
+    self.assertNotIn("DYNAMIC_ENV_UI", instance)
+    self.assertEqual(instance["DYNAMIC_ENV"]["UPSTREAM_PORT"], [
+      {"type": "shmem", "path": ["native-agent", "PORT"]},
+    ])
+
+
+if __name__ == "__main__":
+  unittest.main()
diff --git a/extensions/business/deeploy/tests/test_dynamic_env_resolution.py b/extensions/business/deeploy/tests/test_dynamic_env_resolution.py
new file mode 100644
index 00000000..95fe4766
--- /dev/null
+++ b/extensions/business/deeploy/tests/test_dynamic_env_resolution.py
@@ -0,0 +1,128 @@
+import unittest
+
+from extensions.business.deeploy.tests.support import make_deeploy_plugin
+
+
+class DeeployDynamicEnvResolutionTests(unittest.TestCase):
+
+  def test_has_shmem_dynamic_env_detects_explicit_shmem_entries(self):
+    plugin = make_deeploy_plugin()
+    plugins = [
+      {
+        plugin.ct.CONFIG_PLUGIN.K_SIGNATURE: "CONTAINER_APP_RUNNER",
+        plugin.ct.CONFIG_PLUGIN.K_INSTANCES: [
+          {
+            plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID: "car-1",
+            "DYNAMIC_ENV": {
+              "API_URL": [
+                {"type": "static", "value": "http://"},
+                {"type": "shmem", "path": ["provider", "CONTAINER_IP"]},
+              ]
+            },
+          }
+        ],
+      }
+    ]
+
+    self.assertTrue(plugin._has_shmem_dynamic_env(plugins))
+
+  def test_resolve_shmem_references_rewrites_paths_and_sets_semaphores(self):
+    plugin = make_deeploy_plugin()
+    plugins = [
+      {
+        plugin.ct.CONFIG_PLUGIN.K_SIGNATURE: "A_SIMPLE_PLUGIN",
+        plugin.ct.CONFIG_PLUGIN.K_INSTANCES: [
+          {
+            plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID: "native-1",
+          }
+        ],
+      },
+      {
+        plugin.ct.CONFIG_PLUGIN.K_SIGNATURE: "CONTAINER_APP_RUNNER",
+        plugin.ct.CONFIG_PLUGIN.K_INSTANCES: [
+          {
+            plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID: "car-1",
+            "DYNAMIC_ENV": {
+              "API_HOST": [
+                {"type": "shmem", "path": ["provider-ui-name", "CONTAINER_IP"]}
+              ]
+            },
+          }
+        ],
+      },
+    ]
+    name_to_instance = {
+      "provider-ui-name": {"instance_id": "native-1", "signature": "A_SIMPLE_PLUGIN"}
+    }
+
+    resolved = plugin._resolve_shmem_references(plugins, name_to_instance, "app-123")
+
+    provider_instance = resolved[0][plugin.ct.CONFIG_PLUGIN.K_INSTANCES][0]
+    consumer_instance = resolved[1][plugin.ct.CONFIG_PLUGIN.K_INSTANCES][0]
+    sem_key = "app-123__native-1"
+
+    self.assertEqual(
+      consumer_instance["DYNAMIC_ENV"]["API_HOST"][0]["path"],
+      [sem_key, "CONTAINER_IP"],
+    )
+    self.assertEqual(provider_instance["SEMAPHORE"], sem_key)
+    self.assertEqual(consumer_instance["SEMAPHORED_KEYS"], [sem_key])
+
+  def test_translate_dynamic_env_ui_keeps_explicit_dynamic_env_precedence(self):
+    plugin = make_deeploy_plugin()
+    translated = plugin._translate_dynamic_env_ui_in_instance_payload({
+      "DYNAMIC_ENV": {
+        "API_HOST": [{"type": "host_ip"}]
+      },
+      "DYNAMIC_ENV_UI": {
+        "API_HOST": [{"source": "container_ip", "provider": "backend"}]
+      },
+    })
+
+    self.assertEqual(translated["DYNAMIC_ENV"], {
+      "API_HOST": [{"type": "host_ip"}]
+    })
+    self.assertNotIn("DYNAMIC_ENV_UI", translated)
+
+  def test_translate_dynamic_env_ui_rejects_missing_container_provider(self):
+    plugin = make_deeploy_plugin()
+
+    with self.assertRaisesRegex(ValueError, "requires a provider"):
+      plugin._compile_dynamic_env_ui({
+        "API_HOST": [{"source": "container_ip"}]
+      })
+
+  def test_compile_dynamic_env_ui_supports_plugin_value(self):
+    plugin = make_deeploy_plugin()
+
+    compiled = plugin._compile_dynamic_env_ui({
+      "UPSTREAM_PORT": [
+        {"source": "plugin_value", "provider": "native-agent", "key": "PORT"}
+      ]
+    })
+
+    self.assertEqual(compiled, {
+      "UPSTREAM_PORT": [
+        {"type": "shmem", "path": ["native-agent", "PORT"]}
+      ]
+    })
+
+  def test_compile_dynamic_env_ui_rejects_plugin_value_without_provider(self):
+    plugin = make_deeploy_plugin()
+
+    with self.assertRaisesRegex(ValueError, "plugin_value requires a provider"):
+      plugin._compile_dynamic_env_ui({
+        "UPSTREAM_PORT": [{"source": "plugin_value", "key": "PORT"}]
+      })
+
+  def test_compile_dynamic_env_ui_rejects_plugin_value_without_key(self):
+    plugin = make_deeploy_plugin()
+
+    with self.assertRaisesRegex(ValueError, "plugin_value requires a key"):
+      plugin._compile_dynamic_env_ui({
+        "UPSTREAM_PORT": [{"source": "plugin_value", "provider": "native-agent"}]
+      })
+
+
+if __name__ == "__main__":
+  unittest.main()
diff --git a/extensions/business/deeploy/tests/test_semaphore_wiring.py b/extensions/business/deeploy/tests/test_semaphore_wiring.py
new file mode 100644
index 00000000..65edd5f7
--- /dev/null
+++ b/extensions/business/deeploy/tests/test_semaphore_wiring.py
@@ -0,0 +1,94 @@
+import unittest
+
+from extensions.business.deeploy.deeploy_const import JOB_APP_TYPES
+from extensions.business.deeploy.tests.support import make_deeploy_plugin
+
+
+class DeeploySemaphoreWiringTests(unittest.TestCase):
+
+  def test_autowire_native_container_semaphore_sets_provider_and_consumer_keys(self):
+    plugin = make_deeploy_plugin()
+    plugins = [
+      {
+        plugin.ct.CONFIG_PLUGIN.K_SIGNATURE: "A_SIMPLE_PLUGIN",
+        plugin.ct.CONFIG_PLUGIN.K_INSTANCES: [
+          {plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID: "native-1"}
+        ],
+      },
+      {
+        plugin.ct.CONFIG_PLUGIN.K_SIGNATURE: "CONTAINER_APP_RUNNER",
+        plugin.ct.CONFIG_PLUGIN.K_INSTANCES: [
+          {plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID: "car-1"}
+        ],
+      },
+    ]
+
+    wired = plugin._autowire_native_container_semaphore("job-001", plugins, JOB_APP_TYPES.NATIVE)
+
+    native_instance = wired[0][plugin.ct.CONFIG_PLUGIN.K_INSTANCES][0]
+    car_instance = wired[1][plugin.ct.CONFIG_PLUGIN.K_INSTANCES][0]
+
+    self.assertEqual(native_instance["SEMAPHORE"], "job-001__native-1")
+    self.assertEqual(car_instance["SEMAPHORED_KEYS"], ["job-001__native-1"])
+
+  def test_autowire_skips_when_explicit_shmem_dynamic_env_exists(self):
+    plugin = make_deeploy_plugin()
+    plugins = [
+      {
+        plugin.ct.CONFIG_PLUGIN.K_SIGNATURE: "A_SIMPLE_PLUGIN",
+        plugin.ct.CONFIG_PLUGIN.K_INSTANCES: [
+          {plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID: "native-1"}
+        ],
+      },
+      {
+        plugin.ct.CONFIG_PLUGIN.K_SIGNATURE: "CONTAINER_APP_RUNNER",
+        plugin.ct.CONFIG_PLUGIN.K_INSTANCES: [
+          {
+            plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID: "car-1",
+            "DYNAMIC_ENV": {
+              "API_HOST": [{"type": "shmem", "path": ["provider", "CONTAINER_IP"]}]
+            },
+          }
+        ],
+      },
+    ]
+
+    wired = plugin._autowire_native_container_semaphore("job-001", plugins, JOB_APP_TYPES.NATIVE)
+
+    native_instance = wired[0][plugin.ct.CONFIG_PLUGIN.K_INSTANCES][0]
+    car_instance = wired[1][plugin.ct.CONFIG_PLUGIN.K_INSTANCES][0]
+
+    self.assertNotIn("SEMAPHORE", native_instance)
+    self.assertNotIn("SEMAPHORED_KEYS", car_instance)
+
+  def test_autowire_skips_when_manual_semaphore_config_already_present(self):
+    plugin = make_deeploy_plugin()
+    plugins = [
+      {
+        plugin.ct.CONFIG_PLUGIN.K_SIGNATURE: "A_SIMPLE_PLUGIN",
+        plugin.ct.CONFIG_PLUGIN.K_INSTANCES: [
+          {
+            plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID: "native-1",
+            "SEMAPHORE": "manual-key",
+          }
+        ],
+      },
+      {
+        plugin.ct.CONFIG_PLUGIN.K_SIGNATURE: "CONTAINER_APP_RUNNER",
+        plugin.ct.CONFIG_PLUGIN.K_INSTANCES: [
+          {plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID: "car-1"}
+        ],
+      },
+    ]
+
+    wired = plugin._autowire_native_container_semaphore("job-001", plugins, JOB_APP_TYPES.NATIVE)
+
+    native_instance = wired[0][plugin.ct.CONFIG_PLUGIN.K_INSTANCES][0]
+    car_instance = wired[1][plugin.ct.CONFIG_PLUGIN.K_INSTANCES][0]
+
+    self.assertEqual(native_instance["SEMAPHORE"], "manual-key")
+    self.assertNotIn("SEMAPHORED_KEYS", car_instance)
+
+
+if __name__ == "__main__":
+  unittest.main()
diff --git a/extensions/business/deeploy/tests/test_update_requests.py b/extensions/business/deeploy/tests/test_update_requests.py
new file mode 100644
index 00000000..a0664251
--- /dev/null
+++ b/extensions/business/deeploy/tests/test_update_requests.py
@@ -0,0 +1,177 @@
+import unittest
+
+from extensions.business.deeploy.deeploy_const import DEEPLOY_KEYS
+from extensions.business.deeploy.tests.support import make_deeploy_plugin, make_inputs
+
+
+class DeeployUpdateRequestPreparationTests(unittest.TestCase):
+
+  def test_prepare_single_plugin_instance_update_uses_plugin_config_and_strips_signature_fields(self):
+    plugin = make_deeploy_plugin()
+
+    prepared = plugin.deeploy_prepare_single_plugin_instance_update(
+      inputs=make_inputs(),
+      instance_id="instance-1",
+      plugin_config={
+        DEEPLOY_KEYS.PLUGIN_SIGNATURE: "CONTAINER_APP_RUNNER",
+        "signature": "IGNORED",
+        "IMAGE": "repo/app:latest",
+        "PORT": 3000,
+      },
+    )
+
+    self.assertEqual(prepared[plugin.ct.CONFIG_PLUGIN.K_SIGNATURE], "CONTAINER_APP_RUNNER")
+    instance = prepared[plugin.ct.CONFIG_PLUGIN.K_INSTANCES][0]
+    self.assertEqual(instance[plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID], "instance-1")
+    self.assertEqual(instance["IMAGE"], "repo/app:latest")
+    self.assertEqual(instance["PORT"], 3000)
+    self.assertNotIn(DEEPLOY_KEYS.PLUGIN_SIGNATURE, instance)
+    self.assertNotIn("signature", instance)
+
+  def test_prepare_single_plugin_instance_update_falls_back_to_instance_conf(self):
+    plugin = make_deeploy_plugin()
+    fallback_instance = {
+      plugin.ct.CONFIG_PLUGIN.K_SIGNATURE: "CONTAINER_APP_RUNNER",
+      "instance_conf": {
+        plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID: "old-instance",
+        "IMAGE": "repo/old:1.0",
+        "PORT": 3002,
+      },
+    }
+
+    prepared = plugin.deeploy_prepare_single_plugin_instance_update(
+      inputs=make_inputs(),
+      instance_id="instance-2",
+      fallback_instance=fallback_instance,
+    )
+
+    self.assertEqual(prepared[plugin.ct.CONFIG_PLUGIN.K_SIGNATURE], "CONTAINER_APP_RUNNER")
+    instance = prepared[plugin.ct.CONFIG_PLUGIN.K_INSTANCES][0]
+    self.assertEqual(instance[plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID], "instance-2")
+    self.assertEqual(instance["IMAGE"], "repo/old:1.0")
+    self.assertEqual(instance["PORT"], 3002)
+
+  def test_extract_plugin_request_conf_removes_update_metadata_fields(self):
+    plugin = make_deeploy_plugin()
+    result = plugin._extract_plugin_request_conf(
+      plugin_entry={
+        DEEPLOY_KEYS.PLUGIN_SIGNATURE: "CONTAINER_APP_RUNNER",
+        DEEPLOY_KEYS.PLUGIN_INSTANCE_ID: "instance-1",
+        "instance_id": "instance-1",
+        plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID: "instance-1",
+        "CHAINSTORE_RESPONSE_KEY": "resp-key",
+        "CHAINSTORE_PEERS": ["peer-a"],
+        "IMAGE": "repo/app:latest",
+        "PORT": 3000,
+      },
+      instance_id_key=plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID,
+      chainstore_response_key="CHAINSTORE_RESPONSE_KEY",
+      chainstore_peers_key="CHAINSTORE_PEERS",
+    )
+
+    self.assertEqual(result, {
+      "IMAGE": "repo/app:latest",
+      "PORT": 3000,
+    })
+
+  def test_prepare_single_plugin_instance_update_preserves_exposed_ports(self):
+    plugin = make_deeploy_plugin()
+
+    prepared = plugin.deeploy_prepare_single_plugin_instance_update(
+      inputs=make_inputs(),
+      instance_id="instance-3",
+      plugin_config={
+        DEEPLOY_KEYS.PLUGIN_SIGNATURE: "CONTAINER_APP_RUNNER",
+        "IMAGE": "repo/app:latest",
+        "CONTAINER_RESOURCES": {"cpu": 1, "memory": "256m"},
+        "EXPOSED_PORTS": {
+          "3005": {"is_main_port": True},
+          "3006": {"tunnel": {"enabled": True, "engine": "cloudflare", "token": "upd-token"}},
+        },
+      },
+    )
+
+    instance = prepared[plugin.ct.CONFIG_PLUGIN.K_INSTANCES][0]
+    self.assertTrue(instance["EXPOSED_PORTS"]["3005"]["is_main_port"])
+    self.assertEqual(instance["EXPOSED_PORTS"]["3006"]["tunnel"]["token"], "upd-token")
+
+  def test_extract_plugin_request_conf_keeps_exposed_ports(self):
+    plugin = make_deeploy_plugin()
+    result = plugin._extract_plugin_request_conf(
+      plugin_entry={
+        DEEPLOY_KEYS.PLUGIN_SIGNATURE: "CONTAINER_APP_RUNNER",
+        DEEPLOY_KEYS.PLUGIN_INSTANCE_ID: "instance-1",
+        "instance_id": "instance-1",
+        plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID: "instance-1",
+        "CHAINSTORE_RESPONSE_KEY": "resp-key",
+        "CHAINSTORE_PEERS": ["peer-a"],
+        "IMAGE": "repo/app:latest",
+        "EXPOSED_PORTS": {
+          "3000": {"is_main_port": True},
+        },
+      },
+      instance_id_key=plugin.ct.CONFIG_INSTANCE.K_INSTANCE_ID,
+      chainstore_response_key="CHAINSTORE_RESPONSE_KEY",
+      chainstore_peers_key="CHAINSTORE_PEERS",
+    )
+
+    self.assertEqual(result, {
+      "IMAGE": "repo/app:latest",
+      "EXPOSED_PORTS": {
+        "3000": {"is_main_port": True},
+      },
+    })
+
+  def test_prepare_single_plugin_instance_update_translates_dynamic_env_ui(self):
+    plugin = make_deeploy_plugin()
+
+    prepared = plugin.deeploy_prepare_single_plugin_instance_update(
+      inputs=make_inputs(),
+      instance_id="instance-4",
+      plugin_config={
+        DEEPLOY_KEYS.PLUGIN_SIGNATURE: "CONTAINER_APP_RUNNER",
+        "IMAGE": "repo/app:latest",
+        "CONTAINER_RESOURCES": {"cpu": 1, "memory": "256m"},
+        "DYNAMIC_ENV_UI": {
+          "API_HOST": [
+            {"source": "host_ip"},
+            {"source": "static", "value": ":3000"},
+          ]
+        },
+      },
+    )
+
+    instance = prepared[plugin.ct.CONFIG_PLUGIN.K_INSTANCES][0]
+    self.assertNotIn("DYNAMIC_ENV_UI", instance)
+    self.assertEqual(instance["DYNAMIC_ENV"]["API_HOST"], [
+      {"type": "host_ip"},
+      {"type": "static", "value": ":3000"},
+    ])
+
+  def test_prepare_single_plugin_instance_update_translates_plugin_value_dynamic_env_ui(self):
+    plugin = make_deeploy_plugin()
+
+    prepared = plugin.deeploy_prepare_single_plugin_instance_update(
+      inputs=make_inputs(),
+      instance_id="instance-5",
+      plugin_config={
+        DEEPLOY_KEYS.PLUGIN_SIGNATURE: "CONTAINER_APP_RUNNER",
+        "IMAGE": "repo/app:latest",
+        "CONTAINER_RESOURCES": {"cpu": 1, "memory": "256m"},
+        "DYNAMIC_ENV_UI": {
+          "UPSTREAM_PORT": [
+            {"source": "plugin_value", "provider": "native-agent", "key": "PORT"},
+          ]
+        },
+      },
+    )
+
+    instance = prepared[plugin.ct.CONFIG_PLUGIN.K_INSTANCES][0]
+    self.assertNotIn("DYNAMIC_ENV_UI", instance)
+    self.assertEqual(instance["DYNAMIC_ENV"]["UPSTREAM_PORT"], [
+      {"type": "shmem", "path": ["native-agent", "PORT"]},
+    ])
+
+
+if __name__ == "__main__":
+  unittest.main()
diff --git a/ver.py b/ver.py
index b9e1954c..7f2a0656 100644
--- a/ver.py
+++ b/ver.py
@@ -1 +1 @@
-__VER__ = '2.10.99'
+__VER__ = '2.10.100'
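The get_apps changes in deeploy_mixin.py reduce to a simple pattern: a bounded, non-pinning R1FS fetch, then a partial job record (chain and online data kept, pipeline set to None) when the payload is unavailable. A minimal sketch of that flow under stated assumptions — `fetch_pipeline` and the plain string keys are illustrative stand-ins, not the shipped `get_pipeline_from_r1fs` API or `DEEPLOY_KEYS` constants:

```python
# Sketch of the get_apps fallback: bounded fetch, partial record on miss.
GET_APPS_R1FS_TIMEOUT = 30  # seconds; mirrors get_apps_r1fs_timeout in the patch


def fetch_pipeline(cid, timeout, pin):
  # Hypothetical stub standing in for get_pipeline_from_r1fs.
  # Returns None when the payload cannot be fetched within the timeout.
  return None


def build_job_record(job_id, chain_job, online_apps, pipeline_cid):
  pipeline = None
  if pipeline_cid:
    pipeline = fetch_pipeline(pipeline_cid, timeout=GET_APPS_R1FS_TIMEOUT, pin=False)
  # On fetch failure, keep chain and online data instead of dropping the job,
  # so callers still see the job exists even without its pipeline details.
  return {
    "JOB_ID": job_id,
    "PIPELINE": pipeline,
    "ONLINE": online_apps,
    "CHAIN_JOB": chain_job,
  }


record = build_job_record(11, {"projectHash": "project-2"}, {}, "cid-11")
print(record["PIPELINE"])  # None
```

This is the same trade-off the second new test in test_deeploy.py locks in: availability of partial chain data over all-or-nothing pipeline hydration.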