diff --git a/actions.yaml b/actions.yaml index 6dfbb12269d..f71068be0b2 100644 --- a/actions.yaml +++ b/actions.yaml @@ -97,3 +97,20 @@ restore: restore-to-time: type: string description: Point-in-time-recovery target in PSQL format. +get-cluster-status: + description: Display cluster topology, PostgreSQL units health status, and Raft cluster state. + Only available when role=watcher. + params: + cluster-name: + type: string + description: | + The name of the cluster to filter the output by. + Useful in async-replication (Disaster Recovery) setups where multiple clusters are related. + standby-clusters: + type: boolean + default: false + description: | + Show status information including linked standby clusters (async replication). +trigger-health-check: + description: Manually trigger health checks on PostgreSQL endpoints and return results. + Only available when role=watcher. diff --git a/src/relations/watcher_requirer.py b/src/relations/watcher_requirer.py index 59217d34424..b9556dd225f 100644 --- a/src/relations/watcher_requirer.py +++ b/src/relations/watcher_requirer.py @@ -20,9 +20,11 @@ import os import typing from datetime import datetime +from typing import Any, Literal from charmlibs.systemd import service_running from ops import ( + ActionEvent, ActiveStatus, BlockedStatus, InstallEvent, @@ -40,7 +42,7 @@ ) from constants import RAFT_PORT, WATCHER_RELATION -from raft_controller import RaftController, install_service +from raft_controller import ClusterStatus, RaftController, install_service if typing.TYPE_CHECKING: from charm import PostgresqlOperatorCharm @@ -82,6 +84,14 @@ def __init__(self, charm: "PostgresqlOperatorCharm"): self._on_watcher_relation_broken, ) + # Actions + self.framework.observe( + self.charm.on.get_cluster_status_action, self._on_get_cluster_status + ) + self.framework.observe( + self.charm.on.trigger_health_check_action, self._on_trigger_health_check + ) + @property def unit_ip(self) -> str | None: """Return this unit's IP 
address.""" @@ -522,3 +532,284 @@ def _on_watcher_relation_broken(self, event: RelationBrokenEvent) -> None: ] if not remaining: self.charm.unit.status = WaitingStatus("Waiting for relation to PostgreSQL") + + # -- Actions -- + + def _build_ip_maps(self, relation: Relation) -> tuple[dict[str, str], dict[str, str]]: + """Build IP-to-AZ and IP-to-unit-name maps from relation data. + + Returns: + Tuple of (ip_to_az, ip_to_unit) dictionaries. + """ + ip_to_az: dict[str, str] = {} + ip_to_unit: dict[str, str] = {} + for unit in relation.units: + if unit_ip := relation.data[unit].get("unit-address"): + ip_to_unit[unit_ip] = unit.name + if unit_az := relation.data[unit].get("unit-az"): + ip_to_az[unit_ip] = unit_az + if watcher_ip := self.unit_ip: + ip_to_unit[watcher_ip] = self.charm.unit.name + return ip_to_az, ip_to_unit + + def _resolve_raft_members( + self, raft_status: ClusterStatus, ip_to_unit: dict[str, str] + ) -> None: + """Resolve Raft member IPs to unit names in-place.""" + resolved = [] + for member_addr in raft_status.get("members", []): + member_ip = member_addr.split(":")[0] + resolved.append(ip_to_unit.get(member_ip, member_addr)) + raft_status["members"] = sorted(resolved) + + def _on_get_cluster_status(self, event: ActionEvent) -> None: + """Handle get-cluster-status action.""" + cluster_name_filter = event.params.get("cluster-name") + cluster_set_mode = event.params.get("standby-clusters", False) + + relations = self.model.relations.get(WATCHER_RELATION, []) + clusters_data: dict[str, dict[str, Any]] = {} + standby_clusters_map: dict[str, list[str]] = {} + for relation in relations: + cluster_name = self._get_cluster_name(relation) + if cluster_name_filter and cluster_name != cluster_name_filter: + continue + clusters_data[cluster_name] = self._format_cluster_status(relation) + standby_clusters_map[cluster_name] = self._get_standby_clusters(relation) + + if not clusters_data: + if cluster_name_filter: + event.fail(f"Cluster '{cluster_name_filter}' not 
found among related clusters.") + else: + event.set_results({"success": "True", "status": json.dumps({})}) + return + + if cluster_set_mode: + result_status = self._format_cluster_set_status(clusters_data, standby_clusters_map) + elif len(clusters_data) == 1: + # Single cluster: return the cluster status directly + result_status = next(iter(clusters_data.values())) + else: + # Multi-cluster: return list with watcher summary + result_status = { + "clusters": list(clusters_data.values()), + "watcher": { + "unit": self.charm.unit.name, + "address": self.unit_ip, + "clusters_monitored": len(clusters_data), + }, + } + + event.set_results({"success": "True", "status": json.dumps(result_status)}) + + def _get_pg_version(self, relation: Relation) -> str: + """Return the PostgreSQL version of the related cluster ("unknown" if unset).""" + if not relation.app: + return "unknown" + + return relation.data[relation.app].get("version", "unknown") + + def _build_postgresql_topology( + self, + relation: Relation, + pg_endpoints: list[str], + ip_to_unit: dict[str, str], + ) -> tuple[ + dict[str, Any], + str | None, + Literal["primary", "standby", "unknown"], + int | Literal["unknown"], + ]: + """Build PostgreSQL topology entries and infer the cluster role.""" + topology: dict[str, Any] = {} + primary_endpoint = None + cluster_role = "unknown" + version = self._get_pg_version(relation) + timeline = "unknown" + + if not pg_endpoints: + return topology, primary_endpoint, cluster_role, timeline + + raft_controller = RaftController(self.charm, f"rel{relation.id}") + # TODO figure out how to share the password for async clusters + health_results = ( + raft_controller.check_all_endpoints(pg_endpoints, password) + if (password := self.get_watcher_password(relation)) + else dict.fromkeys(pg_endpoints, False) + ) + cluster_status = raft_controller.cluster_status(pg_endpoints) + patroni_members = {} + for member in cluster_status: + patroni_members[member["host"]] = member + + for endpoint in pg_endpoints: + unit_name = 
ip_to_unit.get(endpoint, endpoint) + patroni_member = patroni_members.get(endpoint, {}) + is_healthy = health_results.get(endpoint, False) + + if is_primary := patroni_member.get("role") == "leader": + primary_endpoint = f"{endpoint}:5432" + + role = patroni_member.get("role", "unknown") + lag = patroni_member.get("lag", "unknown") + if role == "leader": + role = "primary" + timeline = patroni_member.get("timeline", "unknown") + cluster_role = "primary" + lag = 0 + elif role == "standby_leader": + role = "standby" + cluster_role = "standby" + timeline = patroni_member.get("timeline", "unknown") + lag = 0 + + topology[unit_name] = { + "address": f"{endpoint}:5432", + "memberrole": role, + "mode": "r/w" if is_primary else "r/o", + "status": "online" if is_healthy else "offline", + "version": version, + "lag": lag, + } + return topology, primary_endpoint, cluster_role, timeline + + def _is_tls_enabled(self, relation: Relation) -> bool: + """Return whether TLS is enabled for the related PostgreSQL cluster.""" + if not relation.app: + return False + return relation.data[relation.app].get("tls-enabled", "false") == "true" + + def _format_cluster_status(self, relation: Relation) -> dict[str, Any]: + """Format cluster status for a single cluster relation.""" + cluster_name = self._get_cluster_name(relation) + pg_endpoints = self._get_raft_partner_addrs(relation) + _ip_to_az, ip_to_unit = self._build_ip_maps(relation) + + # Get Raft status + port = self._get_port_for_relation(relation.id) + password = self._get_raft_password(relation) + raft_controller = RaftController(self.charm, instance_id=f"rel{relation.id}") + raft_status = raft_controller.get_status(port, password) + self._resolve_raft_members(raft_status, ip_to_unit) + has_quorum = raft_status.get("has_quorum", False) + watcher_voting = self._should_watcher_vote(pg_endpoints) and not self._is_disabled( + relation + ) + topology, primary_endpoint, cluster_role, timeline = self._build_postgresql_topology( + relation, 
pg_endpoints, ip_to_unit + ) + + # Add watcher entry to topology + watcher_port = self._get_port_for_relation(relation.id) + watcher_ip = self.unit_ip or relation.data[self.charm.unit].get("unit-address") + watcher_address = f"{watcher_ip}:{watcher_port}" if watcher_ip else None + topology[self.charm.unit.name] = { + "address": watcher_address, + "memberrole": "watcher", + "mode": "n/a", + "status": "online" if raft_status.get("running", False) else "offline", + "version": "n/a", + "voting": watcher_voting, + } + + status_text = ( + "cluster is tolerant to failures." + if has_quorum + else "cluster is not tolerant to any failures." + ) + + return { + "clustername": cluster_name, + "clusterrole": cluster_role, + "primary": primary_endpoint, + "ssl": "required" if self._is_tls_enabled(relation) else "disabled", + "status": "ok" if has_quorum else "ok_no_tolerance", + "statustext": status_text, + "timeline": timeline, + "topology": topology, + "raft": { + "has_quorum": has_quorum, + "leader": raft_status.get("leader"), + "members": raft_status.get("members", []), + }, + } + + def _format_cluster_set_status( + self, + clusters_data: dict[str, dict[str, Any]], + standby_clusters_map: dict[str, list[str]], + ) -> dict[str, Any]: + """Format cluster-set status for async replication view.""" + clusters_summary: dict[str, Any] = {} + # TODO No way to have multiple primaries + primary_cluster_name = None + + for name, data in clusters_data.items(): + cluster_role = data.get("clusterrole", "unknown") + is_primary = cluster_role == "primary" + summary: dict[str, Any] = { + "clusterrole": cluster_role, + "status": data.get("status", "unknown"), + "primary": data.get("primary"), + "linked_standby_clusters": standby_clusters_map.get(name, []), + } + if is_primary and primary_cluster_name is None: + primary_cluster_name = name + elif cluster_role == "standby": + summary["replication_status"] = "streaming" + summary["replication_lag"] = 0 + summary["timeline"] = 
data.get("timeline", 0) + clusters_summary[name] = summary + + all_healthy = all(c.get("status") == "ok" for c in clusters_data.values()) + + return { + "clusters": clusters_summary, + "primary_cluster": primary_cluster_name, + "status": "healthy" if all_healthy else "degraded", + "statustext": ("all clusters available." if all_healthy else "some clusters at risk."), + } + + def _on_trigger_health_check(self, event: ActionEvent) -> None: + """Handle trigger-health-check action.""" + clusters: list[dict[str, Any]] = [] + total_healthy = 0 + total_count = 0 + + for relation in self.model.relations.get(WATCHER_RELATION, []): + pg_endpoints = self._get_raft_partner_addrs(relation) + if not pg_endpoints or not (password := self.get_watcher_password(relation)): + continue + + raft_controller = RaftController(self.charm, f"rel{relation.id}") + health_results = raft_controller.check_all_endpoints(pg_endpoints, password) + + _ip_to_az, ip_to_unit = self._build_ip_maps(relation) + + cluster_name = self._get_cluster_name(relation) + endpoint_statuses: dict[str, str] = {} + for endpoint in health_results: + unit_name = ip_to_unit.get(endpoint) + label = unit_name if unit_name else f"{cluster_name}/{endpoint}" + is_healthy = health_results.get(endpoint, False) + endpoint_statuses[label] = "healthy" if is_healthy else "unhealthy" + if is_healthy: + total_healthy += 1 + total_count += 1 + + clusters.append({ + "cluster_name": cluster_name, + "endpoints": endpoint_statuses, + }) + + if total_count == 0: + event.fail("No PostgreSQL endpoints available") + return + + output: dict[str, Any] = { + "clusters": clusters, + "healthy-count": total_healthy, + "total-count": total_count, + } + + event.set_results({"health-check": json.dumps(output)}) diff --git a/tests/integration/ha_tests/test_async_replication_stereo_mode.py b/tests/integration/ha_tests/test_async_replication_stereo_mode.py new file mode 100644 index 00000000000..142b072aa16 --- /dev/null +++ 
b/tests/integration/ha_tests/test_async_replication_stereo_mode.py @@ -0,0 +1,259 @@ +#!/usr/bin/env python3 +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Integration tests for async replication with stereo mode watcher. + +Verifies that a single watcher can serve as the third Raft node for both +a primary and a standby PostgreSQL cluster simultaneously, while async +replication is active between them. +""" + +import logging + +import pytest +from pytest_operator.plugin import OpsTest +from tenacity import Retrying, stop_after_delay, wait_fixed + +from ..helpers import ( + CHARM_BASE, + DATABASE_APP_NAME, +) +from .test_stereo_mode import ( + verify_raft_cluster_health, +) + +logger = logging.getLogger(__name__) + +PRIMARY_APP = DATABASE_APP_NAME # "postgresql" +STANDBY_APP = "postgresql-standby" +WATCHER_APP = "pg-watcher" + + +@pytest.mark.abort_on_fail +async def test_deploy_async_replication_with_watcher(ops_test: OpsTest, charm) -> None: + """Deploy two PG clusters with a shared watcher and async replication. 
+ + Architecture: + - Primary cluster (2 units) + watcher = 3 Raft members + - Standby cluster (2 units) + watcher = 3 Raft members + - Async replication: primary → standby + """ + async with ops_test.fast_forward(): + # Deploy primary cluster + logger.info("Deploying primary cluster (2 units)...") + await ops_test.model.deploy( + charm, + application_name=PRIMARY_APP, + num_units=2, + base=CHARM_BASE, + config={"profile": "testing"}, + ) + + # Deploy standby cluster + logger.info("Deploying standby cluster (2 units)...") + await ops_test.model.deploy( + charm, + application_name=STANDBY_APP, + num_units=2, + base=CHARM_BASE, + config={"profile": "testing"}, + ) + + # Deploy watcher (single instance for both clusters) + logger.info("Deploying watcher (shared by both clusters)...") + await ops_test.model.deploy( + charm, + application_name=WATCHER_APP, + num_units=1, + base=CHARM_BASE, + config={"role": "watcher", "profile": "testing"}, + ) + + # Wait for all apps to settle + await ops_test.model.wait_for_idle( + apps=[PRIMARY_APP, STANDBY_APP, WATCHER_APP], + timeout=1200, + raise_on_error=False, + ) + + # Relate watcher to primary cluster + logger.info("Relating watcher to primary cluster") + await ops_test.model.integrate(f"{PRIMARY_APP}:watcher-offer", f"{WATCHER_APP}:watcher") + + # Relate watcher to standby cluster + logger.info("Relating watcher to standby cluster") + await ops_test.model.integrate(f"{STANDBY_APP}:watcher-offer", f"{WATCHER_APP}:watcher") + + # Wait for watcher to join both Raft clusters + await ops_test.model.wait_for_idle( + apps=[PRIMARY_APP, STANDBY_APP, WATCHER_APP], + status="active", + timeout=600, + ) + + # Verify deployment + assert len(ops_test.model.applications[PRIMARY_APP].units) == 2 + assert len(ops_test.model.applications[STANDBY_APP].units) == 2 + assert len(ops_test.model.applications[WATCHER_APP].units) == 1 + + +@pytest.mark.abort_on_fail +async def test_watcher_raft_quorum_both_clusters(ops_test: OpsTest) -> None: + 
"""Verify the watcher has Raft quorum in both clusters.""" + # Check primary cluster Raft + logger.info("Verifying Raft quorum in primary cluster") + await verify_raft_cluster_health(ops_test, PRIMARY_APP, WATCHER_APP) + + # Check standby cluster Raft + logger.info("Verifying Raft quorum in standby cluster") + await verify_raft_cluster_health(ops_test, STANDBY_APP, WATCHER_APP) + + +@pytest.mark.abort_on_fail +async def test_watcher_topology_shows_both_clusters(ops_test: OpsTest) -> None: + """Verify get-cluster-status action reports both clusters.""" + import json + + watcher_unit = ops_test.model.applications[WATCHER_APP].units[0] + action = await watcher_unit.run_action("get-cluster-status") + action = await action.wait() + + assert action.status == "completed" + status = json.loads(action.results["status"]) + # Multi-cluster: status has a "clusters" list + assert "clusters" in status + assert len(status["clusters"]) == 2, f"Expected 2 clusters, got {len(status['clusters'])}" + + cluster_names = sorted(c["clustername"] for c in status["clusters"]) + logger.info(f"Watcher sees clusters: {cluster_names}") + + # Each cluster should have topology entries (PG units + watcher) + for cluster in status["clusters"]: + assert len(cluster["topology"]) >= 2, ( + f"Cluster {cluster['clustername']} should have topology entries" + ) + + +@pytest.mark.abort_on_fail +async def test_setup_async_replication(ops_test: OpsTest) -> None: + """Set up async replication from primary to standby cluster.""" + # Relate the two clusters for async replication + logger.info("Setting up async replication: primary → standby") + await ops_test.model.integrate( + f"{PRIMARY_APP}:replication-offer", f"{STANDBY_APP}:replication" + ) + + # Wait for relation to be established + await ops_test.model.wait_for_idle( + apps=[PRIMARY_APP, STANDBY_APP], + timeout=600, + raise_on_error=False, + ) + + # Run create-replication action on primary leader + primary_leader = None + for unit in 
ops_test.model.applications[PRIMARY_APP].units: + if await unit.is_leader_from_status(): + primary_leader = unit + break + assert primary_leader is not None, "Could not find primary cluster leader" + + logger.info(f"Running create-replication on {primary_leader.name}") + action = await primary_leader.run_action("create-replication") + action = await action.wait() + logger.info(f"create-replication result: {action.status} - {action.results}") + + # Wait for replication to be established + # The standby cluster should transition to standby mode + await ops_test.model.wait_for_idle( + apps=[PRIMARY_APP, STANDBY_APP], + timeout=900, + raise_on_error=False, + ) + + # Poll until the standby application reports "active" status + for attempt in Retrying(stop=stop_after_delay(300), wait=wait_fixed(15), reraise=True): + with attempt: + standby_status = ops_test.model.applications[STANDBY_APP].status + logger.info(f"Standby cluster status: {standby_status}") + # Standby should be active (as a standby cluster) + assert standby_status == "active", ( + f"Standby cluster should be active, got {standby_status}" + ) + + +@pytest.mark.abort_on_fail +async def test_watcher_quorum_after_replication(ops_test: OpsTest) -> None: + """Verify watcher maintains Raft quorum in the primary cluster after replication. + + After create-replication, the standby cluster's Patroni restarts to + follow the primary, which temporarily disrupts its Raft cluster. + We verify the primary cluster's Raft is unaffected and that the + watcher still reports both clusters in its topology. 
+ """ + # Give the standby cluster time to stabilize after replication setup + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle( + apps=[PRIMARY_APP, STANDBY_APP, WATCHER_APP], + timeout=600, + raise_on_error=False, + ) + + # Primary cluster Raft should be unaffected by standby replication setup + logger.info("Verifying Raft quorum in primary cluster (post-replication)") + await verify_raft_cluster_health(ops_test, PRIMARY_APP, WATCHER_APP) + + # Verify the watcher still reports both clusters in topology + import json + + watcher_unit = ops_test.model.applications[WATCHER_APP].units[0] + action = await watcher_unit.run_action("get-cluster-status") + action = await action.wait() + assert action.status == "completed" + status = json.loads(action.results["status"]) + assert "clusters" in status + assert len(status["clusters"]) == 2, ( + f"Watcher should still see 2 clusters after replication, got {len(status['clusters'])}" + ) + logger.info("Watcher still monitors both clusters after replication setup") + + +@pytest.mark.abort_on_fail +async def test_health_check_both_clusters(ops_test: OpsTest) -> None: + """Verify health check action reports both clusters. + + After create-replication, the standby cluster runs in standby mode. + The watcher health check connects to all endpoints, but standby + endpoints may have different connection behavior. We verify the + action completes and reports both clusters with at least the + primary cluster's endpoints healthy. 
+ """ + import json + + watcher_unit = ops_test.model.applications[WATCHER_APP].units[0] + + for attempt in Retrying(stop=stop_after_delay(360), wait=wait_fixed(10), reraise=True): + with attempt: + action = await watcher_unit.run_action("trigger-health-check") + action = await action.wait() + + assert action.status == "completed", f"Action failed: {action.results}" + health = json.loads(action.results["health-check"]) + assert len(health["clusters"]) == 2, ( + f"Expected 2 clusters in health check, got {len(health['clusters'])}" + ) + assert int(health["total-count"]) == 4, ( + f"Expected 4 total endpoints, got {health['total-count']}" + ) + # Primary cluster (2 endpoints) should be healthy; + # standby cluster may or may not respond to SELECT 1 + assert int(health["healthy-count"]) >= 2, ( + f"Expected at least 2 healthy endpoints (primary cluster), " + f"got {health['healthy-count']}" + ) + + logger.info( + f"Health check: {health['healthy-count']}/{health['total-count']} " + f"endpoints healthy across 2 clusters" + ) diff --git a/tests/integration/ha_tests/test_stereo_mode.py b/tests/integration/ha_tests/test_stereo_mode.py index 9cd5473911e..099eaaea062 100644 --- a/tests/integration/ha_tests/test_stereo_mode.py +++ b/tests/integration/ha_tests/test_stereo_mode.py @@ -15,6 +15,7 @@ """ import asyncio +import json import logging import pytest @@ -246,6 +247,27 @@ async def test_build_and_deploy_stereo_mode(ops_test: OpsTest, charm) -> None: assert len(ops_test.model.applications[WATCHER_APP_NAME].units) == 1 +@pytest.mark.abort_on_fail +async def test_watcher_topology_action(ops_test: OpsTest) -> None: + """Test the get-cluster-status action on the watcher.""" + watcher_unit = ops_test.model.applications[WATCHER_APP_NAME].units[0] + + action = await watcher_unit.run_action("get-cluster-status") + action = await action.wait() + + assert action.status == "completed" + assert "status" in action.results + + status = json.loads(action.results["status"]) + # Single 
cluster: status is the cluster dict directly + assert "clustername" in status + assert "topology" in status + # Topology should have 2 PG units + 1 watcher = 3 entries + assert len(status["topology"]) == 3 + assert "raft" in status + assert status["raft"]["has_quorum"] is True + + @pytest.mark.abort_on_fail async def test_replica_shutdown_with_watcher(ops_test: OpsTest, continuous_writes) -> None: """Test replica shutdown with watcher providing quorum. @@ -727,6 +749,26 @@ async def test_multi_cluster_watcher(ops_test: OpsTest, charm) -> None: ops_test, second_pg_app, WATCHER_APP_NAME, expected_members=3 ) + # Run get-cluster-status and verify both clusters appear + watcher_unit = ops_test.model.applications[WATCHER_APP_NAME].units[0] + action = await watcher_unit.run_action("get-cluster-status") + action = await action.wait() + assert action.status == "completed" + assert "status" in action.results + + status = json.loads(action.results["status"]) + # Multi-cluster: status has a "clusters" list + assert "clusters" in status, "Status should contain clusters list" + assert len(status["clusters"]) == 2, ( + f"Expected 2 clusters in status, got {len(status['clusters'])}" + ) + + # Verify each cluster has topology entries (PG units + watcher) + for cluster in status["clusters"]: + assert len(cluster["topology"]) >= 2, ( + f"Cluster {cluster.get('clustername')} should have topology entries" + ) + finally: # Clean up the second cluster relation and app if second_pg_app in ops_test.model.applications: @@ -742,6 +784,45 @@ async def test_multi_cluster_watcher(ops_test: OpsTest, charm) -> None: ) +@pytest.mark.abort_on_fail +async def test_health_check_action(ops_test: OpsTest) -> None: + """Test the trigger-health-check action on the watcher.""" + # Wait for the cluster to fully stabilize after previous network tests + # The watcher may need time to reconnect and receive endpoint data after network manipulation + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, 
WATCHER_APP_NAME], + status="active", + timeout=300, + idle_period=30, + ) + + # Also verify Raft cluster health to ensure watcher is fully connected + # After network isolation tests, the watcher may have been redeployed with a new IP + # that isn't in the Raft configuration yet, so we skip the watcher IP check + await verify_raft_cluster_health( + ops_test, DATABASE_APP_NAME, WATCHER_APP_NAME, expected_members=3, check_watcher_ip=False + ) + + watcher_unit = ops_test.model.applications[WATCHER_APP_NAME].units[0] + + # Retry the action multiple times as the watcher needs to receive fresh endpoint data + # from the relation after reconnecting. The pg-endpoints are updated by the PostgreSQL + # leader in update_status (runs every 5 minutes), so we need to wait long enough for + # at least one update_status cycle to complete. + for attempt in Retrying(stop=stop_after_delay(360), wait=wait_fixed(10), reraise=True): + with attempt: + action = await watcher_unit.run_action("trigger-health-check") + action = await action.wait() + + assert action.status == "completed", f"Action failed: {action.results}" + assert "health-check" in action.results + + health = json.loads(action.results["health-check"]) + assert "clusters" in health + assert int(health["healthy-count"]) == 2 + assert int(health["total-count"]) == 2 + + @pytest.mark.abort_on_fail async def test_watcher_production_profile_az_blocked(ops_test: OpsTest, charm) -> None: """Test watcher with profile=production blocks on AZ co-location. 
@@ -846,3 +927,73 @@ async def test_watcher_production_profile_az_blocked(ops_test: OpsTest, charm) - status="active", timeout=600, ) + + +@pytest.mark.abort_on_fail +async def test_odd_count_raft_exclusion(ops_test: OpsTest, continuous_writes) -> None: + """Test watcher gracefully yields quorum/voting if database scales to an odd count.""" + db_app = ops_test.model.applications[DATABASE_APP_NAME] + + # Ensure starting condition: 2 units (Even) + unit_count = len(db_app.units) + assert unit_count == 2, f"Test requires 2 DB units initially, found {unit_count}." + + # Validate watcher is voting initially + watcher_unit = ops_test.model.applications[WATCHER_APP_NAME].units[0] + action = await watcher_unit.run_action("get-cluster-status") + action = await action.wait() + + status = json.loads(action.results["status"]) + watcher_topology = status["topology"].get(watcher_unit.name) + assert watcher_topology["voting"] is True, "Watcher should be voting when PG is 2 units" + + logger.info("Scaling DB to 3 units to verify watcher Raft eviction") + await db_app.add_unit(count=1) + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, WATCHER_APP_NAME], + status="active", + timeout=1800, + idle_period=30, + ) + + # Validate watcher stepped down from voting + action = await watcher_unit.run_action("get-cluster-status") + action = await action.wait() + status = json.loads(action.results["status"]) + watcher_topology = status["topology"].get(watcher_unit.name) + assert watcher_topology["voting"] is False, "Watcher should NOT vote when PG is an odd count" + + logger.info("Scaling DB back to 2 units") + await ops_test.model.destroy_unit(db_app.units[-1].name) + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, WATCHER_APP_NAME], + status="active", + timeout=1800, + idle_period=30, + ) + + # Validate watcher resumed voting + action = await watcher_unit.run_action("get-cluster-status") + action = await action.wait() + status = json.loads(action.results["status"]) + 
watcher_topology = status["topology"].get(watcher_unit.name) + assert watcher_topology["voting"] is True, ( + "Watcher should resume voting when PG drops to 2 units" + ) + + +@pytest.mark.abort_on_fail +async def test_action_blocking_for_watcher_role(ops_test: OpsTest) -> None: + """Test that PostgreSQL specific actions are blocked dynamically on watcher role.""" + watcher_unit = ops_test.model.applications[WATCHER_APP_NAME].units[0] + + # Execute a database-specific action + logger.info("Triggering PG-only action 'create-backup' on watcher unit") + action = await watcher_unit.run_action("create-backup") + action = await action.wait() + + assert action.status == "failed", "Action should have failed cleanly" + assert ( + "this action is not available for the role assigned to this application" + in action.message.lower() + ), f"Incorrect failure string: {action.message}" diff --git a/tests/integration/spaces/test_spaced_stereo_mode.py b/tests/integration/spaces/test_spaced_stereo_mode.py new file mode 100644 index 00000000000..96045e2360a --- /dev/null +++ b/tests/integration/spaces/test_spaced_stereo_mode.py @@ -0,0 +1,448 @@ +#!/usr/bin/env python3 +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Integration tests for PostgreSQL stereo mode with Juju spaces. + +Verifies that stereo mode works when PostgreSQL and the watcher are +deployed in separate Juju spaces. The watcher-offer/watcher relation +must work across space boundaries for Raft consensus. + +Sets up its own LXD networks and Juju spaces (does not depend on the +jubilant-based conftest fixtures). 
+""" + +import logging +import subprocess + +import pytest +from pytest_operator.plugin import OpsTest +from tenacity import Retrying, stop_after_delay, wait_fixed + +from ..ha_tests.helpers import APPLICATION_NAME as TEST_APP_NAME +from ..ha_tests.test_stereo_mode import ( + start_writes, + verify_raft_cluster_health, +) +from ..helpers import ( + APPLICATION_NAME, + CHARM_BASE, + DATABASE_APP_NAME, +) + +logger = logging.getLogger(__name__) + + +async def get_cluster_roles_via_exec(ops_test: OpsTest, unit_name: str) -> dict[str, list[str]]: + """Get Patroni cluster roles by querying the API from inside the unit. + + Uses the Patroni REST API address from the Patroni config file, since + with Juju spaces Patroni binds to a space-specific IP (not localhost). + """ + import json + + # Get the Patroni REST API address from config (bound to pg-space IP) + return_code, stdout, _ = await ops_test.juju( + "exec", + "--unit", + unit_name, + "--", + "bash", + "-c", + "grep 'connect_address' /var/snap/charmed-postgresql/current/etc/patroni/patroni.yaml" + " | head -1 | awk '{print $2}' | tr -d \"'\"", + ) + assert return_code == 0, f"Failed to get Patroni REST address on {unit_name}" + patroni_addr = stdout.strip() + logger.info(f"Patroni REST API on {unit_name}: {patroni_addr}") + + return_code, stdout, stderr = await ops_test.juju( + "exec", + "--unit", + unit_name, + "--", + "curl", + "-sk", + f"https://{patroni_addr}/cluster", + ) + assert return_code == 0, ( + f"Failed to query Patroni cluster on {unit_name}: " + f"rc={return_code}, stdout={stdout!r}, stderr={stderr!r}" + ) + + members: dict[str, list[str]] = {"replicas": [], "primaries": [], "sync_standbys": []} + cluster_info = json.loads(stdout) + logger.info(f"Cluster members on {unit_name}: {cluster_info.get('members', [])}") + for member in cluster_info["members"]: + role = member["role"] + name = "/".join(member["name"].rsplit("-", 1)) + if role == "leader": + members["primaries"].append(name) + elif role == 
"sync_standby": + members["sync_standbys"].append(name) + else: + members["replicas"].append(name) + return members + + +WATCHER_APP_NAME = "pg-watcher" + +# LXD networks: pg-space for PostgreSQL, watcher-space for the watcher +NETWORKS = { + "pg-space": "10.40.40.1/24", + "watcher-space": "10.50.50.1/24", +} + +DEFAULT_LXD_NETWORK = "lxdbr0" + + +def _create_lxd_network(name: str, subnet: str) -> None: + """Create an LXD bridge network.""" + try: + subprocess.run( + [ + "sudo", + "lxc", + "network", + "create", + name, + "--type=bridge", + f"ipv4.address={subnet}", + "ipv4.nat=true", + "ipv6.address=none", + "dns.mode=none", + ], + capture_output=True, + check=True, + encoding="utf-8", + ) + subprocess.check_output(f"sudo ip link set up dev {name}".split()) + logger.info(f"Created LXD network {name} with subnet {subnet}") + except subprocess.CalledProcessError as e: + if "The network already exists" in (e.stderr or ""): + logger.warning(f"LXD network {name} already exists") + else: + raise + + +@pytest.fixture(scope="module") +def lxd_networks(): + """Create LXD networks for the two spaces.""" + # Set dns.mode=none on default network to avoid DNS conflicts + subprocess.run( + ["sudo", "lxc", "network", "set", DEFAULT_LXD_NETWORK, "dns.mode=none"], + check=True, + ) + + for name, subnet in NETWORKS.items(): + _create_lxd_network(name, subnet) + + yield + + for name in NETWORKS: + try: + subprocess.check_output(f"sudo lxc network delete {name}".split()) + except subprocess.CalledProcessError: + logger.warning(f"Failed to delete LXD network {name}") + + try: + subprocess.check_output(f"sudo lxc network unset {DEFAULT_LXD_NETWORK} dns.mode".split()) + except subprocess.CalledProcessError: + logger.warning("Failed to restore dns.mode on default network") + + +@pytest.fixture(scope="module") +async def spaced_model(ops_test: OpsTest, lxd_networks): + """Set up Juju spaces for the test model.""" + await ops_test.juju("reload-spaces") + + for name, subnet in 
NETWORKS.items(): + try: + await ops_test.juju("add-space", name, subnet) + except Exception as e: + if "already exists" in str(e): + logger.info(f"Space {name} already exists") + else: + raise + + logger.info(f"Juju spaces configured: {', '.join(NETWORKS)}") + + +@pytest.fixture() +async def continuous_writes(ops_test: OpsTest) -> None: + """Fixture to clean up continuous writes after each test.""" + yield + for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3), reraise=True): + with attempt: + action = ( + await ops_test.model + .applications[TEST_APP_NAME] + .units[0] + .run_action("clear-continuous-writes") + ) + await action.wait() + assert action.results["result"] == "True", "Unable to clear up continuous_writes table" + + +@pytest.mark.abort_on_fail +async def test_deploy_stereo_mode_with_spaces(ops_test: OpsTest, charm, spaced_model) -> None: + """Deploy stereo mode with PostgreSQL and watcher in separate Juju spaces. + + - PostgreSQL units: deployed with spaces=pg-space + - Watcher unit: deployed with spaces=watcher-space + - The watcher-offer/watcher relation bridges the two spaces + """ + if DATABASE_APP_NAME in ops_test.model.applications: + pg_units = len(ops_test.model.applications[DATABASE_APP_NAME].units) + watcher_deployed = WATCHER_APP_NAME in ops_test.model.applications + test_app_deployed = APPLICATION_NAME in ops_test.model.applications + + if pg_units == 2 and watcher_deployed and test_app_deployed: + logger.info("Stereo mode already deployed, verifying...") + await ops_test.model.wait_for_idle(status="active", timeout=300) + return + + for app in [DATABASE_APP_NAME, WATCHER_APP_NAME, APPLICATION_NAME]: + if app in ops_test.model.applications: + await ops_test.model.remove_application(app, block_until_done=True) + + async with ops_test.fast_forward(): + # Deploy PostgreSQL: peers + database on pg-space, watcher relation on watcher-space + logger.info("Deploying PostgreSQL with pg-space + watcher-space...") + await 
ops_test.model.deploy( + charm, + application_name=DATABASE_APP_NAME, + num_units=2, + base=CHARM_BASE, + config={"profile": "testing"}, + constraints={"spaces": ["pg-space", "watcher-space"]}, + bind={ + "database-peers": "pg-space", + "database": "pg-space", + "watcher-offer": "watcher-space", + }, + ) + + # Deploy watcher: all traffic on watcher-space + logger.info("Deploying watcher with spaces=watcher-space...") + await ops_test.model.deploy( + charm, + application_name=WATCHER_APP_NAME, + num_units=1, + base=CHARM_BASE, + config={"role": "watcher", "profile": "testing"}, + constraints={"spaces": ["watcher-space"]}, + bind={"watcher": "watcher-space"}, + ) + + # Deploy test app in pg-space + logger.info("Deploying test application with spaces=pg-space...") + await ops_test.model.deploy( + APPLICATION_NAME, + application_name=APPLICATION_NAME, + base=CHARM_BASE, + channel="edge", + constraints={"spaces": ["pg-space"]}, + bind={"database": "pg-space"}, + ) + + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, WATCHER_APP_NAME], + timeout=1200, + raise_on_error=False, + ) + + # Relate PostgreSQL to watcher across spaces + logger.info("Relating PostgreSQL to watcher (cross-space)") + try: + await ops_test.model.integrate( + f"{DATABASE_APP_NAME}:watcher-offer", f"{WATCHER_APP_NAME}:watcher" + ) + except Exception as e: + if "already exists" in str(e): + logger.info(f"Watcher relation already exists: {e}") + else: + raise + + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, WATCHER_APP_NAME], + status="active", + timeout=600, + ) + + # Relate PostgreSQL to test app + try: + await ops_test.model.integrate(DATABASE_APP_NAME, f"{APPLICATION_NAME}:database") + except Exception as e: + if "already exists" in str(e): + logger.info(f"Database relation already exists: {e}") + else: + raise + + await ops_test.model.wait_for_idle(status="active", timeout=1800) + + assert len(ops_test.model.applications[DATABASE_APP_NAME].units) == 2 + assert 
len(ops_test.model.applications[WATCHER_APP_NAME].units) == 1 + + +@pytest.mark.abort_on_fail +async def test_raft_quorum_across_spaces(ops_test: OpsTest) -> None: + """Verify Raft quorum is established across spaces.""" + # check_watcher_ip=False because the watcher's Raft address is on + # watcher-space, not the default address returned by unit-get private-address + await verify_raft_cluster_health( + ops_test, DATABASE_APP_NAME, WATCHER_APP_NAME, check_watcher_ip=False + ) + + +@pytest.mark.abort_on_fail +async def test_topology_action_with_spaces(ops_test: OpsTest) -> None: + """Test get-cluster-status action returns correct cross-space topology.""" + watcher_unit = ops_test.model.applications[WATCHER_APP_NAME].units[0] + + action = await watcher_unit.run_action("get-cluster-status") + action = await action.wait() + + assert action.status == "completed" + assert "status" in action.results + + import json + + status = json.loads(action.results["status"]) + # Single cluster: status is the cluster dict directly + assert "clustername" in status + assert "topology" in status + # Topology should have 2 PG units + 1 watcher = 3 entries + assert len(status["topology"]) == 3 + + +@pytest.mark.abort_on_fail +async def test_primary_shutdown_failover_across_spaces( + ops_test: OpsTest, continuous_writes +) -> None: + """Test primary shutdown triggers failover with watcher in a separate space. + + This is the critical test: the watcher must provide the Raft vote + across the space boundary for failover to succeed. 
+ """ + await start_writes(ops_test) + + # because Patroni API is bound to pg-space, + # not the default address that python-libjuju returns + any_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + original_roles = await get_cluster_roles_via_exec(ops_test, any_unit) + original_primary = original_roles["primaries"][0] + + if original_roles["sync_standbys"]: + original_replica = original_roles["sync_standbys"][0] + else: + original_replica = None + for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + if unit.name != original_primary: + original_replica = unit.name + break + assert original_replica is not None + + logger.info(f"Shutting down primary: {original_primary}") + + await ops_test.model.destroy_unit( + original_primary, force=True, destroy_storage=False, max_wait=1500 + ) + + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="active", + timeout=600, + idle_period=30, + ) + + # Verify failover happened — watcher's Raft vote across spaces enabled this + remaining_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + for attempt in Retrying(stop=stop_after_delay(180), wait=wait_fixed(10), reraise=True): + with attempt: + new_roles = await get_cluster_roles_via_exec(ops_test, remaining_unit) + logger.info(f"Post-failover roles: {new_roles}") + assert len(new_roles["primaries"]) == 1 + assert new_roles["primaries"][0] == original_replica + + # Scale back up + logger.info("Scaling back up after primary shutdown") + await ops_test.model.applications[DATABASE_APP_NAME].add_unit(count=1) + await ops_test.model.wait_for_idle(status="active", timeout=1800, idle_period=60) + + for attempt in Retrying(stop=stop_after_delay(300), wait=wait_fixed(15), reraise=True): + with attempt: + final_roles = await get_cluster_roles_via_exec( + ops_test, + ops_test.model.applications[DATABASE_APP_NAME].units[0].name, + ) + assert len(final_roles["primaries"]) == 1 + assert len(final_roles["sync_standbys"]) == 1 + 
+ logger.info("Failover verified — watcher Raft vote worked across spaces") + + +@pytest.mark.abort_on_fail +async def test_watcher_shutdown_across_spaces(ops_test: OpsTest, continuous_writes) -> None: + """Test watcher shutdown — no outage even when watcher is in a different space.""" + any_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + original_roles = await get_cluster_roles_via_exec(ops_test, any_unit) + + logger.info("Removing watcher unit (separate space)") + watcher_unit = ops_test.model.applications[WATCHER_APP_NAME].units[0] + await ops_test.model.destroy_unit(watcher_unit.name, force=True, max_wait=300) + + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="active", + timeout=300, + idle_period=30, + ) + + new_roles = await get_cluster_roles_via_exec(ops_test, any_unit) + assert new_roles["primaries"] == original_roles["primaries"] + + # Re-deploy watcher in the watcher space + logger.info("Re-deploying watcher in watcher-space") + await ops_test.model.applications[WATCHER_APP_NAME].add_unit(count=1) + await ops_test.model.wait_for_idle(status="active", timeout=600) + + await verify_raft_cluster_health( + ops_test, DATABASE_APP_NAME, WATCHER_APP_NAME, check_watcher_ip=False + ) + + +@pytest.mark.abort_on_fail +async def test_health_check_across_spaces(ops_test: OpsTest) -> None: + """Test health check action works across space boundaries.""" + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, WATCHER_APP_NAME], + status="active", + timeout=300, + idle_period=30, + ) + + await verify_raft_cluster_health( + ops_test, + DATABASE_APP_NAME, + WATCHER_APP_NAME, + expected_members=3, + check_watcher_ip=False, + ) + + watcher_unit = ops_test.model.applications[WATCHER_APP_NAME].units[0] + + for attempt in Retrying(stop=stop_after_delay(360), wait=wait_fixed(10), reraise=True): + with attempt: + action = await watcher_unit.run_action("trigger-health-check") + action = await action.wait() + + assert 
action.status == "completed", f"Action failed: {action.results}" + assert "health-check" in action.results + + import json + + health = json.loads(action.results["health-check"]) + assert "clusters" in health + assert int(health["healthy-count"]) == 2 + assert int(health["total-count"]) == 2 diff --git a/tests/unit/test_watcher_requirer.py b/tests/unit/test_watcher_requirer.py index 4e36ee4271a..0f7f70d48bf 100644 --- a/tests/unit/test_watcher_requirer.py +++ b/tests/unit/test_watcher_requirer.py @@ -3,6 +3,7 @@ """Unit tests for the watcher requirer relation handler (AZ co-location logic).""" +import json from unittest.mock import MagicMock, patch from ops import ActiveStatus, BlockedStatus, WaitingStatus @@ -304,3 +305,243 @@ def test_relation_broken_removes_port(self): _remove_service.assert_called_once_with() handler._release_port_for_relation.assert_called_once_with(42) + + +class TestWatcherActions: + """Tests for watcher actions output formatting.""" + + def _build_handler(self): + mock_charm = create_mock_charm() + mock_framework = MagicMock() + mock_framework.model = mock_charm.model + with patch.object(WatcherRequirerHandler, "__init__", return_value=None): + handler = WatcherRequirerHandler.__new__(WatcherRequirerHandler) + handler.charm = mock_charm + handler.framework = mock_framework + handler._get_standby_clusters = MagicMock(return_value=[]) + return handler, mock_charm + + def test_get_cluster_status_serializes_json_result(self): + """Action output is a JSON string in the `status` key.""" + handler, mock_charm = self._build_handler() + relation = MagicMock() + relation.id = 1 + mock_charm.model.relations.get.return_value = [relation] + handler._get_cluster_name = MagicMock(return_value="cluster-a") + handler._format_cluster_status = MagicMock(return_value={"raft": {"has_quorum": True}}) + + event = MagicMock() + event.params = {"standby-clusters": False} + + handler._on_get_cluster_status(event) + + event.set_results.assert_called_once() + results 
= event.set_results.call_args.args[0] + assert results["success"] == "True" + parsed = json.loads(results["status"]) + assert parsed["raft"]["has_quorum"] is True + + def test_get_cluster_status_no_relations_returns_empty_json(self): + """No-related-cluster response returns an empty JSON object string.""" + handler, mock_charm = self._build_handler() + mock_charm.model.relations.get.return_value = [] + + event = MagicMock() + event.params = {} + + handler._on_get_cluster_status(event) + + event.set_results.assert_called_once_with({"success": "True", "status": "{}"}) + + def test_get_cluster_status_cluster_filter_not_found_fails(self): + """Unknown cluster filter fails instead of returning status.""" + handler, mock_charm = self._build_handler() + relation = MagicMock() + relation.id = 1 + mock_charm.model.relations.get.return_value = [relation] + handler._get_cluster_name = MagicMock(return_value="cluster-a") + + event = MagicMock() + event.params = {"cluster-name": "cluster-missing"} + + handler._on_get_cluster_status(event) + + event.fail.assert_called_once() + event.set_results.assert_not_called() + + def test_get_cluster_status_cluster_set_uses_role_and_links(self): + """Cluster-set output honors role and includes linked standby clusters.""" + handler, mock_charm = self._build_handler() + rel_primary = MagicMock() + rel_primary.id = 1 + rel_standby = MagicMock() + rel_standby.id = 2 + mock_charm.model.relations.get.return_value = [rel_primary, rel_standby] + handler._get_cluster_name = MagicMock(side_effect=["cluster-a", "cluster-b"]) + handler._format_cluster_status = MagicMock( + side_effect=[ + { + "clusterrole": "primary", + "status": "ok", + "primary": "10.0.0.1:5432", + "timeline": 1, + }, + { + "clusterrole": "standby", + "status": "ok", + "primary": None, + "timeline": 1, + }, + ] + ) + handler._get_standby_clusters = MagicMock(side_effect=[["cluster-b"], ["cluster-a"]]) + + event = MagicMock() + event.params = {"standby-clusters": True} + + 
handler._on_get_cluster_status(event) + + results = event.set_results.call_args.args[0] + payload = json.loads(results["status"]) + assert payload["primary_cluster"] == "cluster-a" + assert payload["clusters"]["cluster-a"]["linked_standby_clusters"] == ["cluster-b"] + assert payload["clusters"]["cluster-b"]["replication_status"] == "streaming" + + def test_trigger_health_check_marks_non_dict_result_unhealthy(self): + """Non-dict health results are treated as unhealthy values.""" + handler, mock_charm = self._build_handler() + relation = MagicMock() + relation.id = 1 + mock_charm.model.relations.get.return_value = [relation] + handler._get_raft_partner_addrs = MagicMock(return_value=["10.0.0.1"]) + handler._build_ip_maps = MagicMock(return_value=({}, {"10.0.0.1": "postgresql/0"})) + handler._get_cluster_name = MagicMock(return_value="cluster-a") + + event = MagicMock() + + with patch( + "relations.watcher_requirer.RaftController.check_all_endpoints", + return_value={"10.0.0.1": False}, + ): + handler._on_trigger_health_check(event) + + event.set_results.assert_called_once_with({ + "health-check": json.dumps({ + "clusters": [ + {"cluster_name": "cluster-a", "endpoints": {"postgresql/0": "unhealthy"}} + ], + "healthy-count": 0, + "total-count": 1, + }) + }) + results = event.set_results.call_args.args[0] + payload = json.loads(results["health-check"]) + assert payload["healthy-count"] == 0 + assert payload["total-count"] == 1 + assert payload["clusters"][0]["endpoints"]["postgresql/0"] == "unhealthy" + + def test_format_cluster_status_marks_standby_when_recovery_only(self): + """Cluster role becomes standby when healthy members are in recovery.""" + handler, _ = self._build_handler() + relation = MagicMock() + relation.id = 7 + + handler._get_cluster_name = MagicMock(return_value="cluster-a") + handler._get_raft_partner_addrs = MagicMock(return_value=["10.0.0.1"]) + handler._build_ip_maps = MagicMock(return_value=({}, {"10.0.0.1": "postgresql/0"})) + 
handler._get_port_for_relation = MagicMock(return_value=2222) + handler._get_pg_version = MagicMock(return_value="16") + + with ( + patch( + "relations.watcher_requirer.RaftController.check_all_endpoints", + return_value={"10.0.0.1": True}, + ), + patch( + "relations.watcher_requirer.RaftController.cluster_status", + return_value=[{"role": "standby_leader", "host": "10.0.0.1"}], + ), + patch("relations.watcher_requirer.RaftController.get_status") as _get_status, + ): + _get_status.return_value = { + "running": True, + "connected": True, + "has_quorum": True, + "leader": "10.0.0.1:2222", + "members": ["10.0.0.1:2222"], + } + status = handler._format_cluster_status(relation) + + assert status["clusterrole"] == "standby" + assert status["primary"] is None + + def test_format_cluster_status_uses_unit_address_when_binding_missing(self): + """Watcher topology address falls back to relation unit-address.""" + handler, mock_charm = self._build_handler() + relation = MagicMock() + relation.id = 7 + relation.app = MagicMock() + relation.data = {mock_charm.unit: {"unit-address": "10.1.1.7"}, relation.app: {}} + mock_charm.model.get_binding.return_value = None + + handler._get_cluster_name = MagicMock(return_value="cluster-a") + handler._get_raft_partner_addrs = MagicMock(return_value=[]) + handler._build_ip_maps = MagicMock(return_value=({}, {})) + handler._get_port_for_relation = MagicMock(return_value=2222) + + with ( + patch( + "relations.watcher_requirer.RaftController.check_all_endpoints", + return_value={"10.0.0.1": True}, + ), + patch( + "relations.watcher_requirer.RaftController.cluster_status", + return_value=[{"role": "standby_leader", "host": "10.0.0.1"}], + ), + patch("relations.watcher_requirer.RaftController.get_status") as _get_status, + ): + _get_status.return_value = { + "running": True, + "connected": True, + "has_quorum": True, + "leader": None, + "members": [], + } + status = handler._format_cluster_status(relation) + assert 
status["topology"]["pg-watcher/0"]["address"] == "10.1.1.7:2222" + + def test_format_cluster_status_does_not_emit_none_port_address(self): + """Watcher topology address is None when no IP source is available.""" + handler, mock_charm = self._build_handler() + relation = MagicMock() + relation.id = 7 + relation.app = MagicMock() + relation.data = {mock_charm.unit: {}, relation.app: {}} + mock_charm.model.get_binding.return_value = None + + handler._get_cluster_name = MagicMock(return_value="cluster-a") + handler._get_raft_partner_addrs = MagicMock(return_value=[]) + handler._build_ip_maps = MagicMock(return_value=({}, {})) + handler._get_port_for_relation = MagicMock(return_value=2222) + + with ( + patch( + "relations.watcher_requirer.RaftController.check_all_endpoints", + return_value={"10.0.0.1": True}, + ), + patch( + "relations.watcher_requirer.RaftController.cluster_status", + return_value=[{"role": "standby_leader", "host": "10.0.0.1"}], + ), + patch("relations.watcher_requirer.RaftController.get_status") as _get_status, + ): + _get_status.return_value = { + "running": True, + "connected": True, + "has_quorum": True, + "leader": None, + "members": [], + } + + status = handler._format_cluster_status(relation) + assert status["topology"]["pg-watcher/0"]["address"] is None