From 3c7fdcb5773a8e5aa4824e21fbe6d95d07b9b500 Mon Sep 17 00:00:00 2001 From: Dragomir Penev Date: Thu, 30 Apr 2026 03:15:42 +0300 Subject: [PATCH 1/4] Watcher role --- config.yaml | 8 + metadata.yaml | 6 + src/charm.py | 368 ++++++-- src/cluster.py | 39 +- src/constants.py | 1 + src/raft_controller.py | 113 ++- src/relations/async_replication.py | 9 + src/relations/watcher.py | 589 ++++++++++++ src/relations/watcher_requirer.py | 524 +++++++++++ templates/patroni.yml.j2 | 9 +- .../integration/ha_tests/test_stereo_mode.py | 848 ++++++++++++++++++ tests/spread/test_stereo_mode.py/task.yaml | 7 + tests/unit/test_charm.py | 39 +- tests/unit/test_cluster.py | 22 +- tests/unit/test_raft_controller.py | 16 +- tests/unit/test_watcher_relation.py | 288 ++++++ tests/unit/test_watcher_requirer.py | 306 +++++++ 17 files changed, 3049 insertions(+), 143 deletions(-) create mode 100644 src/relations/watcher.py create mode 100644 src/relations/watcher_requirer.py create mode 100644 tests/integration/ha_tests/test_stereo_mode.py create mode 100644 tests/spread/test_stereo_mode.py/task.yaml create mode 100644 tests/unit/test_watcher_relation.py create mode 100644 tests/unit/test_watcher_requirer.py diff --git a/config.yaml b/config.yaml index 594c1c60669..6a2e389f900 100644 --- a/config.yaml +++ b/config.yaml @@ -2,6 +2,14 @@ # See LICENSE file for licensing details. options: + role: + description: | + Deployment role for this application. Set at deploy time and cannot be changed afterwards. + "postgresql" (default) runs the full PostgreSQL database server with Patroni. + "watcher" runs a lightweight Raft witness for stereo mode (2-node clusters), + providing quorum without running PostgreSQL. + type: string + default: "postgresql" synchronous-node-count: description: | Sets the number of synchronous nodes to be maintained in the cluster. 
Should be diff --git a/metadata.yaml b/metadata.yaml index 36fff6de212..523b3a86c27 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -45,8 +45,14 @@ provides: interface: cos_agent limit: 1 optional: true + watcher-offer: + interface: postgresql_watcher + limit: 1 requires: + watcher: + interface: postgresql_watcher + optional: true replication: interface: postgresql_async limit: 1 diff --git a/src/charm.py b/src/charm.py index ff46a63386b..9dd9dad044e 100755 --- a/src/charm.py +++ b/src/charm.py @@ -113,6 +113,7 @@ PLUGIN_OVERRIDES, POSTGRESQL_DATA_PATH, RAFT_PASSWORD_KEY, + RAFT_PORT, REPLICATION_CONSUMER_RELATION, REPLICATION_OFFER_RELATION, REPLICATION_PASSWORD_KEY, @@ -132,9 +133,12 @@ USER_PASSWORD_KEY, ) from ldap import PostgreSQLLDAP +from raft_controller import install_service from relations.async_replication import PostgreSQLAsyncReplication from relations.postgresql_provider import PostgreSQLProvider from relations.tls import TLS +from relations.watcher import PostgreSQLWatcherRelation +from relations.watcher_requirer import WatcherRequirerHandler from rotate_logs import RotateLogs from utils import label2name, new_password, render_file @@ -239,18 +243,21 @@ def is_compatible( def refresh_snap( self, *, snap_name: str, snap_revision: str, refresh: charm_refresh.Machines ) -> None: - # Update the configuration. - self._charm.set_unit_status(MaintenanceStatus("updating configuration"), refresh=refresh) - self._charm.update_config(refresh=refresh) + if not self._charm.is_watcher_role: + # Update the configuration. + self._charm.set_unit_status( + MaintenanceStatus("updating configuration"), refresh=refresh + ) + self._charm.update_config(refresh=refresh) - # TODO add graceful shutdown before refreshing snap? - # TODO future improvement: if snap refresh fails (i.e. same snap revision installed) after - # graceful shutdown, restart workload + # TODO add graceful shutdown before refreshing snap? + # TODO future improvement: if snap refresh fails (i.e. 
same snap revision installed) after + # graceful shutdown, restart workload - self._charm.set_unit_status(MaintenanceStatus("refreshing the snap"), refresh=refresh) - self._charm._install_snap_package(revision=snap_revision, refresh=refresh) + self._charm.set_unit_status(MaintenanceStatus("refreshing the snap"), refresh=refresh) + self._charm._install_snap_package(revision=snap_revision, refresh=refresh) - self._charm._post_snap_refresh(refresh) + self._charm._post_snap_refresh(refresh) def charm_tracing_config(endpoint_requirer: COSAgentProvider) -> None: @@ -308,6 +315,155 @@ def __init__(self, *args): if isinstance(handler, ops.log.JujuLogHandler): handler.setFormatter(logging.Formatter("{name}:{message}", style="{")) + configured_role = self.model.config.get("role", "postgresql") + if not isinstance(configured_role, str) or configured_role not in ( + "postgresql", + "watcher", + ): + self.unit.status = BlockedStatus( + f"invalid role '{configured_role}' (must be 'postgresql' or 'watcher')" + ) + return + elif isinstance(self.unit.status, BlockedStatus) and self.unit.status.message.startswith( + "invalid role '" + ): + self.unit.status = ActiveStatus() + + if not self._validate_initial_role_unchanged(): + return + + # Watcher mode: lightweight Raft witness, no PostgreSQL + if self.is_watcher_role: + self._init_watcher_mode() + # Set tracing_endpoint for @trace_charm decorator compatibility + self.tracing_endpoint = None + else: + # PostgreSQL mode: full database server + self._init_postgresql_mode() + + self.refresh: charm_refresh.Machines | None + try: + self.refresh = charm_refresh.Machines( + _PostgreSQLRefresh( + workload_name="PostgreSQL", charm_name="postgresql", _charm=self + ) + ) + except (charm_refresh.UnitTearingDown, charm_refresh.PeerRelationNotReady): + self.refresh = None + self._reconcile_refresh_status() + + if self.refresh is not None and not self.refresh.next_unit_allowed_to_refresh: + if self.refresh.in_progress: + 
self._post_snap_refresh(self.refresh) + else: + self.refresh.next_unit_allowed_to_refresh = True + + @cached_property + def get_role(self) -> str: + """Get cached role if available or configured role if not.""" + configured_role = str(self.model.config.get("role", "postgresql")) + if not self._peers: + return configured_role + stored_role = self._peers.data[self.app].get("role") + if stored_role is None: + return configured_role + return stored_role + + @cached_property + def is_watcher_role(self) -> bool: + """Return True if this charm is deployed in watcher mode.""" + return self.get_role == "watcher" + + def _validate_initial_role_unchanged(self) -> bool: + """Validate configured role against persisted peer-role during startup.""" + if not self._peers: + return True + + configured_role = str(self.model.config.get("role", "postgresql")) + stored_role = self._peers.data[self.app].get("role") + if stored_role is None or stored_role == configured_role: + if isinstance(self.unit.status, BlockedStatus) and self.unit.status.message.startswith( + "role change not supported" + ): + self.unit.status = ActiveStatus() + return True + + logger.error( + f"Role change is not supported. Deployed as '{stored_role}', " + f"but config now says '{configured_role}'." + ) + self.unit.status = BlockedStatus( + f"role change not supported (deployed as '{stored_role}')" + ) + return False + + def _validate_role_unchanged(self) -> bool: + """Validate that the role has not changed since initial deployment. + + Persists the role to the peer databag on first leader election and checks + for changes on config-changed. Returns True if valid, False if blocked. 
+ """ + if not self._peers: + return True + configured_role = str(self.model.config.get("role", "postgresql")) + stored_role = self._peers.data[self.app].get("role") + if stored_role is None: + # First time — persist the role (leader only) + if self.unit.is_leader(): + self._peers.data[self.app]["role"] = configured_role + return True + if stored_role != configured_role: + logger.error( + f"Role change is not supported. Deployed as '{stored_role}', " + f"but config now says '{configured_role}'." + ) + self.unit.status = BlockedStatus( + f"role change not supported (deployed as '{stored_role}')" + ) + return False + return True + + def _init_watcher_mode(self): + """Initialize the charm in watcher mode (lightweight Raft witness).""" + self.watcher_requirer = WatcherRequirerHandler(self) + # Watcher mode delegates all event handling to WatcherRequirerHandler. + # We still observe leader_elected to persist the role in peer data. + self.framework.observe(self.on.leader_elected, self._on_watcher_leader_elected) + self.framework.observe(self.on.config_changed, self._on_watcher_config_changed) + + # Register handlers for PostgreSQL-specific actions so users get a + # clear message rather than a generic Juju "action not found" error. 
+ _pg_only_actions = [ + "create_backup", + "create_replication", + "get_primary", + "list_backups", + "pre_refresh_check", + "force_refresh_start", + "resume_refresh", + "promote_to_primary", + "restore", + ] + for action_name in _pg_only_actions: + self.framework.observe( + getattr(self.on, f"{action_name}_action"), + self._on_action_not_available_for_watcher, + ) + + def _on_action_not_available_for_watcher(self, event: ActionEvent) -> None: + """Fail any PG-specific action run against a watcher unit.""" + event.fail("this action is not available for the role assigned to this application") + + def _on_watcher_leader_elected(self, event): + """Persist the role in peer data on first leader election (watcher mode).""" + self._validate_role_unchanged() + + def _on_watcher_config_changed(self, event): + """Block if role was changed after deployment (watcher mode).""" + self._validate_role_unchanged() + + def _init_postgresql_mode(self): + """Initialize the charm in postgresql mode (full database server).""" self.peer_relation_app = DataPeerData( self.model, relation_name=PEER, @@ -353,22 +509,12 @@ def __init__(self, *args): self.tls = TLS(self, PEER) self.tls_transfer = TLSTransfer(self, PEER) self.async_replication = PostgreSQLAsyncReplication(self) + self.watcher_offer = PostgreSQLWatcherRelation(self) # self.logical_replication = PostgreSQLLogicalReplication(self) self.restart_manager = RollingOpsManager( charm=self, relation="restart", callback=self._restart ) - self.refresh: charm_refresh.Machines | None - try: - self.refresh = charm_refresh.Machines( - _PostgreSQLRefresh( - workload_name="PostgreSQL", charm_name="postgresql", _charm=self - ) - ) - except (charm_refresh.UnitTearingDown, charm_refresh.PeerRelationNotReady): - self.refresh = None - self._reconcile_refresh_status() - # Support for disabling the operator. 
disable_file = Path(f"{os.environ.get('CHARM_DIR')}/disable") if disable_file.exists(): @@ -379,12 +525,6 @@ def __init__(self, *args): self.unit.status = BlockedStatus("Disabled") sys.exit(0) - if self.refresh is not None and not self.refresh.next_unit_allowed_to_refresh: - if self.refresh.in_progress: - self._post_snap_refresh(self.refresh) - else: - self.refresh.next_unit_allowed_to_refresh = True - self._observer.start_observer() self._rotate_logs.start_log_rotation() self._grafana_agent = COSAgentProvider( @@ -410,52 +550,59 @@ def _post_snap_refresh(self, refresh: charm_refresh.Machines): Called after snap refresh """ - try: - if ( - (raw_cert := self.get_secret(UNIT_SCOPE, "internal-cert")) - and (cert := load_pem_x509_certificate(raw_cert.encode())) - and ( - cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value - != self._unit_ip - ) - ): - self.tls.generate_internal_peer_cert() - except Exception: - logger.exception("Unable to check or update internal cert") + if not self.is_watcher_role: + try: + if ( + (raw_cert := self.get_secret(UNIT_SCOPE, "internal-cert")) + and (cert := load_pem_x509_certificate(raw_cert.encode())) + and ( + cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + != self._unit_ip + ) + ): + self.tls.generate_internal_peer_cert() + except Exception: + logger.exception("Unable to check or update internal cert") - if not self._patroni.start_patroni(): - self.set_unit_status(ops.BlockedStatus("Failed to start PostgreSQL"), refresh=refresh) - return + if not self._patroni.start_patroni(): + self.set_unit_status(BlockedStatus("Failed to start PostgreSQL"), refresh=refresh) + return - self._setup_exporter() - self.backup.start_stop_pgbackrest_service() - self._setup_pgbackrest_exporter() + self._setup_exporter() + self.backup.start_stop_pgbackrest_service() + self._setup_pgbackrest_exporter() - # Wait until the database initialise. 
- self.set_unit_status(WaitingStatus("waiting for database initialisation"), refresh=refresh) - try: - for attempt in Retrying(stop=stop_after_attempt(6), wait=wait_fixed(10)): - with attempt: - # Check if the member hasn't started or hasn't joined the cluster yet. - if ( - not self._patroni.member_started - or self.unit.name.replace("/", "-") not in self._patroni.cluster_members - or not self._patroni.is_replication_healthy() - ): - logger.debug( - "Instance not yet back in the cluster." - f" Retry {attempt.retry_state.attempt_number}/6" - ) - raise Exception() - except RetryError: - logger.debug( - "Did not allow next unit to refresh: member not ready or not joined the cluster yet" + # Wait until the database initialise. + self.set_unit_status( + WaitingStatus("waiting for database initialisation"), refresh=refresh ) - else: try: - self._patroni.set_max_timelines_history() - except Exception: - logger.warning("Unable to patch in max_timelines_history") + for attempt in Retrying(stop=stop_after_attempt(6), wait=wait_fixed(10)): + with attempt: + # Check if the member hasn't started or hasn't joined the cluster yet. + if ( + not self._patroni.member_started + or self.unit.name.replace("/", "-") + not in self._patroni.cluster_members + or not self._patroni.is_replication_healthy() + ): + logger.debug( + "Instance not yet back in the cluster." 
+ f" Retry {attempt.retry_state.attempt_number}/6" + ) + raise Exception() + except RetryError: + logger.debug( + "Did not allow next unit to refresh: member not ready or not joined the cluster yet" + ) + else: + try: + self._patroni.set_max_timelines_history() + except Exception: + logger.warning("Unable to patch in max_timelines_history") + refresh.next_unit_allowed_to_refresh = True + else: + install_service() refresh.next_unit_allowed_to_refresh = True def set_unit_status( @@ -479,7 +626,7 @@ def set_unit_status( self.unit.status = status def _reconcile_refresh_status(self, _=None): - if self.unit.is_leader(): + if not self.is_watcher_role and self.unit.is_leader(): self.async_replication.set_app_status() # Workaround for other unit statuses being set in a stateful way (i.e. unable to recompute @@ -499,7 +646,7 @@ def _reconcile_refresh_status(self, _=None): ): self.unit.status = refresh_status new_refresh_unit_status = refresh_status.message - else: + elif not self.is_watcher_role: # Clear refresh status from unit status self._set_primary_status_message() elif ( @@ -756,7 +903,7 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: # checked for none in the early exit method departing_member = event.departing_unit.name.replace("/", "-") # type: ignore if member_ip := self._patroni.get_member_ip(departing_member): - self._patroni.remove_raft_member(member_ip) + self._patroni.remove_raft_member(f"{member_ip}:{RAFT_PORT}") except RemoveRaftMemberFailedError: logger.debug( "Deferring on_peer_relation_departed: Failed to remove member from raft cluster" @@ -900,6 +1047,10 @@ def _raft_reinitialisation(self) -> None: self._patroni.remove_raft_data() logger.info(f"Stopping {self.unit.name}") self.unit_peer_data["raft_stopped"] = "True" + self.watcher_offer.disable_watcher() + if self.watcher_offer.is_active: + logger.info("waiting for RAFT watcher to disconnect.") + return if self.unit.is_leader(): self._stuck_raft_cluster_stopped_check() @@ 
-1000,16 +1151,56 @@ def _on_peer_relation_changed(self, event: HookEvent): event.defer() return + # In Raft mode with a watcher, ensure this member is properly registered in the DCS. + # A new member may be running but not registered if it was added to Raft after starting. + if ( + self.watcher_offer.is_watcher_connected + and not self._patroni.is_member_registered_in_cluster() + ): + logger.info("Member running but not registered in Raft cluster - restarting Patroni") + self._patroni.restart_patroni() + event.defer() + return + self._start_stop_pgbackrest_service(event) - # This is intended to be executed only when leader is reinitializing S3 connection due to the leader change. + if not self._handle_s3_initialization(event): + return + + # Update watcher relation with fresh peer IPs when peer data changes + # This ensures pg-endpoints stay current when unit IPs change + if self.unit.is_leader(): + self.watcher_offer.update_endpoints() + + self._update_new_unit_status() + + def _on_secret_changed(self, event: SecretChangedEvent) -> None: + """Handle the secret_changed event.""" + if not self.unit.is_leader(): + return + + if (admin_secret_id := self.config.system_users) and admin_secret_id == event.secret.id: + try: + self._update_admin_password(admin_secret_id) + except PostgreSQLUpdateUserPasswordError: + event.defer() + + # Split off into separate function, because of complexity _on_peer_relation_changed + def _handle_s3_initialization(self, event: HookEvent) -> bool: + """Handle S3 initialization during peer relation changes. + + Returns: + True if processing should continue, False if we should return early. + """ + # This is intended to be executed only when leader is reinitializing S3 connection + # due to the leader change. 
if ( "s3-initialization-start" in self.app_peer_data and "s3-initialization-done" not in self.unit_peer_data and self.is_primary and not self.backup._on_s3_credential_changed_primary(event) ): - return + return False # Clean-up unit initialization data after successful sync to the leader. if "s3-initialization-done" in self.app_peer_data and not self.unit.is_leader(): @@ -1020,18 +1211,7 @@ def _on_peer_relation_changed(self, event: HookEvent): "s3-initialization-start": "", }) - self._update_new_unit_status() - - def _on_secret_changed(self, event: SecretChangedEvent) -> None: - """Handle the secret_changed event.""" - if not self.unit.is_leader(): - return - - if (admin_secret_id := self.config.system_users) and admin_secret_id == event.secret.id: - try: - self._update_admin_password(admin_secret_id) - except PostgreSQLUpdateUserPasswordError: - event.defer() + return True # Split off into separate function, because of complexity _on_peer_relation_changed def _start_stop_pgbackrest_service(self, event: HookEvent) -> None: @@ -1058,6 +1238,8 @@ def _update_new_unit_status(self) -> None: if self.primary_endpoint: self._update_relation_endpoints() self.async_replication.handle_read_only_mode() + # Update watcher relation with current cluster endpoints + self.watcher_offer.update_endpoints() else: self.set_unit_status(WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE)) @@ -1110,6 +1292,10 @@ def _update_member_ip(self) -> bool: self.unit_peer_data.update({"ip": current_ip}) self._patroni.stop_patroni() self._update_certificate() + # Update watcher relation - unit address for all units, endpoints only for leader + self.watcher_offer.update_unit_address() + if self.unit.is_leader(): + self.watcher_offer.update_endpoints() return True else: self.unit_peer_data.update({"ip-to-remove": ""}) @@ -1427,6 +1613,10 @@ def _on_install(self, event: InstallEvent) -> None: def _on_leader_elected(self, event: LeaderElectedEvent) -> None: # noqa: C901 """Handle the leader-elected 
event.""" + # Persist and validate role + if not self._validate_role_unchanged(): + return + # consider configured system user passwords system_user_passwords = {} if admin_secret_id := self.config.system_users: @@ -1492,6 +1682,10 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None: # noqa: C901 def _on_config_changed(self, event) -> None: # noqa: C901 """Handle configuration changes, like enabling plugins.""" + # Block if role was changed after deployment + if not self._validate_role_unchanged(): + return + if not self._peers: # update endpoint addresses logger.debug("Defer on_config_changed: no peer relation") @@ -2026,6 +2220,12 @@ def _on_update_status(self, _) -> None: # Restart topology observer if it is gone self._observer.start_observer() + # Keep this unit data current for watcher AZ/IP checks. + self.watcher_offer.update_unit_address() + + # Ensure watcher is in Raft cluster (handles cases where relation events weren't delivered) + self.watcher_offer.ensure_watcher_in_raft() + if self.unit.is_leader() and "refresh_remove_trigger" not in self.app_peer_data: self.postgresql.drop_hba_triggers() self.app_peer_data["refresh_remove_trigger"] = "True" diff --git a/src/cluster.py b/src/cluster.py index 24ffaaea1f6..2374b9f0d9c 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -53,6 +53,7 @@ POSTGRESQL_CONF_PATH, POSTGRESQL_DATA_PATH, POSTGRESQL_LOGS_PATH, + RAFT_PARTNER_PREFIX, TLS_CA_BUNDLE_FILE, ) from utils import _change_owner, label2name, parallel_patroni_get_request, render_file @@ -529,6 +530,28 @@ def is_member_isolated(self) -> bool: return len(r.json()["members"]) == 0 + def is_member_registered_in_cluster(self) -> bool: + """Check if this member is registered in the Raft DCS cluster. + + In Raft mode, a new member may be running and replicating but not yet + registered in the DCS if it hasn't been added to the Raft cluster. + + Returns: + True if this member appears in the /cluster endpoint, False otherwise. 
+ """ + try: + cluster_status = self.cluster_status() + except RetryError: + logger.debug("Could not get cluster status to check member registration") + return False + + if not cluster_status: + return False + + # Check if this member's name appears in the cluster members list + member_name = self.member_name + return any(member.get("name") == member_name for member in cluster_status) + def online_cluster_members(self) -> list[ClusterMember]: """Return list of online cluster members.""" try: @@ -681,6 +704,9 @@ def render_patroni_yml_file( user_databases_map=user_databases_map, slots=slots, instance_password_encryption=self.charm.config.instance_password_encryption, + watcher=self.charm.watcher_offer.watcher_raft_address + if self.charm.watcher_offer.is_active + else None, ) render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o600) @@ -880,7 +906,7 @@ def get_running_cluster_members(self) -> list[str]: except Exception: return [] - def remove_raft_member(self, member_ip: str) -> None: + def remove_raft_member(self, member_address: str | None) -> None: """Remove a member from the raft cluster. The raft cluster is a different cluster from the Patroni cluster. @@ -891,6 +917,10 @@ def remove_raft_member(self, member_ip: str) -> None: RaftMemberNotFoundError: if the member to be removed is not part of the raft cluster. """ + if not member_address: + logger.debug("Remove raft member: No address provided") + return + if self.charm.has_raft_keys(): logger.debug("Remove raft member: Raft already in recovery") return @@ -909,12 +939,12 @@ def remove_raft_member(self, member_ip: str) -> None: raise RemoveRaftMemberFailedError() from None # Check whether the member is still part of the raft cluster. 
- if not member_ip or f"partner_node_status_server_{member_ip}:2222" not in raft_status: + if f"{RAFT_PARTNER_PREFIX}{member_address}" not in raft_status: return # If there's no quorum and the leader left raft cluster is stuck if not raft_status["has_quorum"] and ( - not raft_status["leader"] or raft_status["leader"].host == member_ip + not raft_status["leader"] or raft_status["leader"].address == member_address ): self.charm.set_unit_status( BlockedStatus("Raft majority loss, run: promote-to-primary") @@ -932,10 +962,9 @@ def remove_raft_member(self, member_ip: str) -> None: ) return - # Suppressing since the call will be removed soon # Remove the member from the raft cluster. try: - result = syncobj_util.executeCommand(raft_host, ["remove", f"{member_ip}:2222"]) + result = syncobj_util.executeCommand(raft_host, ["remove", member_address]) except UtilityException: logger.debug("Remove raft member: Remove call failed") raise RemoveRaftMemberFailedError() from None diff --git a/src/constants.py b/src/constants.py index 2d0bd51d9ec..78c429f733d 100644 --- a/src/constants.py +++ b/src/constants.py @@ -92,6 +92,7 @@ WATCHER_SECRET_LABEL = "watcher-secret" # noqa: S105 RAFT_PORT = 2222 +RAFT_PARTNER_PREFIX = "partner_node_status_server_" BACKUP_TYPE_OVERRIDES = {"full": "full", "differential": "diff", "incremental": "incr"} PLUGIN_OVERRIDES = {"audit": "pgaudit", "uuid_ossp": '"uuid-ossp"'} diff --git a/src/raft_controller.py b/src/raft_controller.py index ad009d97ae0..33ed6af35ee 100644 --- a/src/raft_controller.py +++ b/src/raft_controller.py @@ -18,7 +18,7 @@ import logging from contextlib import suppress -from ipaddress import IPv4Address +from ipaddress import ip_address from shutil import rmtree from typing import TYPE_CHECKING, TypedDict @@ -38,7 +38,7 @@ from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed from cluster import ClusterMember -from constants import PATRONI_CLUSTER_STATUS_ENDPOINT +from constants import 
PATRONI_CLUSTER_STATUS_ENDPOINT, RAFT_PARTNER_PREFIX, RAFT_PORT from utils import create_directory, parallel_patroni_get_request, render_file if TYPE_CHECKING: @@ -74,7 +74,7 @@ class ClusterStatus(TypedDict): members: list[str] -def install_service() -> bool: +def install_service() -> None: """Install the systemd template service for the Raft controller. Returns: @@ -87,14 +87,8 @@ def install_service() -> bool: render_file(SERVICE_FILE, rendered, 0o644, change_owner=False) # Reload systemd to pick up the new service - try: - daemon_reload() - logger.info(f"Installed systemd service {SERVICE_FILE}") - except SystemdError as e: - logger.error(f"Failed to reload systemd: {e}") - return False - - return True + daemon_reload() + logger.info(f"Installed systemd service {SERVICE_FILE}") class RaftController: @@ -159,13 +153,13 @@ def configure( # Validate addresses to prevent injection into the systemd unit file try: - IPv4Address(self_addr) + ip_address(self_addr) except Exception: logger.error(f"Invalid self_addr format: {self_addr}") return False try: for addr in partner_addrs: - IPv4Address(addr) + ip_address(addr) except Exception: logger.error(f"Invalid partner address format: {addr}") return False @@ -238,7 +232,8 @@ def remove_service(self) -> bool: return False try: - rmtree(self.data_dir) + with suppress(FileNotFoundError): + rmtree(self.data_dir) except Exception as e: logger.error(f"Failed to remove Raft controller directory: {e}") return False @@ -252,6 +247,7 @@ def restart(self) -> bool: True if restarted successfully, False otherwise. 
""" try: + service_enable(self.service_name) service_restart(self.service_name) logger.info(f"Restarted Raft controller service {self.service_name}") return True @@ -259,6 +255,90 @@ def restart(self) -> bool: logger.error(f"Failed to restart Raft controller: {e}") return False + def get_stale_watchers( + self, member_address: str, raft_password: str, partner_addrs: list[str], port: int + ) -> list[str]: + """Collect stale watcher raft members.""" + port_postfix = str(port) + watcher_addr = f"{member_address}:{port}" + watcher_key = f"{RAFT_PARTNER_PREFIX}{watcher_addr}" + + # Get the status of the raft cluster. + syncobj_util = TcpUtility(password=raft_password, timeout=3) + + stale_addrs = [] + addrs = [watcher_addr, *[f"{addr}:{RAFT_PORT}" for addr in partner_addrs]] + for raft_host in addrs: + try: + raft_status = syncobj_util.executeCommand(raft_host, ["status"]) + except Exception as e: + logger.warning(f"Collect stale addrs: Cannot connect to raft cluster: {e}") + continue + if not raft_status: + logger.warning("Collect stale addrs: No raft status") + continue + for key in raft_status: + if ( + key.startswith(RAFT_PARTNER_PREFIX) + and key.endswith(port_postfix) + and key != watcher_key + ): + stale_addrs.append(key.split(RAFT_PARTNER_PREFIX)[-1]) + return stale_addrs + logger.warning("Collect stale addrs: No member available") + return stale_addrs + + def remove_raft_member( + self, member_address: str, raft_password: str, partner_addrs: list[str] + ) -> None: + """Remove a member from the raft cluster. + + The raft cluster is a different cluster from the Patroni cluster. + It is responsible for defining which Patroni member can update + the primary member in the DCS. + + Raises: + RaftMemberNotFoundError: if the member to be removed + is not part of the raft cluster. + """ + if not member_address: + logger.debug("Remove raft member: No address provided") + return + + # Get the status of the raft cluster. 
+ syncobj_util = TcpUtility(password=raft_password, timeout=3) + + for raft_host in [f"{addr}:{RAFT_PORT}" for addr in partner_addrs]: + try: + raft_status = syncobj_util.executeCommand(raft_host, ["status"]) + except Exception as e: + logger.warning(f"Remove raft watcher: Cannot connect to raft cluster: {e}") + continue + if not raft_status: + logger.warning("Remove raft watcher: No raft status") + continue + + # Check whether the member is still part of the raft cluster. + if f"{RAFT_PARTNER_PREFIX}{member_address}" not in raft_status: + return + + # If there's no quorum and the leader left raft cluster is stuck + if not raft_status["has_quorum"] or not raft_status["leader"]: + logger.warning("Remove raft watcher: No quorum or leader") + continue + + # Remove the member from the raft cluster. + try: + result = syncobj_util.executeCommand(raft_host, ["remove", member_address]) + except Exception as e: + logger.debug(f"Remove raft watcher: Remove call failed {e}") + continue + + if not result or not result.startswith("SUCCESS"): + logger.debug(f"Remove raft watcher: Remove call not successful with {result}") + continue + return + def get_status(self, self_port: int, password: str | None) -> ClusterStatus: """Get the Raft controller status. 
@@ -288,11 +368,10 @@ def get_status(self, self_port: int, password: str | None) -> ClusterStatus: ) # Extract member addresses from partner_node_status_server_* keys - prefix = "partner_node_status_server_" members: list[str] = [str(raft_status["self"])] for key in raft_status: - if key.startswith(prefix): - members.append(key[len(prefix) :]) + if key.startswith(RAFT_PARTNER_PREFIX): + members.append(key[len(RAFT_PARTNER_PREFIX) :]) status["members"] = sorted(members) return status except Exception as e: diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index b590f5442ee..f1e45230dc3 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -405,6 +405,7 @@ def _handle_database_start(self, event: RelationChangedEvent) -> None: for unit in {*self.charm._peers.units, self.charm.unit} # type: ignore ): self.charm.app_peer_data.update({"cluster_initialised": "True"}) + self.charm.watcher_offer.enable_watcher() elif self._is_following_promoted_cluster(): self.charm.set_unit_status( WaitingStatus("Waiting for the database to be started in all units") @@ -541,10 +542,14 @@ def _on_async_relation_broken(self, _) -> None: self.charm.app_peer_data.update({"promoted-cluster-counter": ""}) self.charm.update_config() + if self.charm.unit.is_leader(): + self.charm.watcher_offer.update_endpoints() + def _on_async_relation_changed(self, event: RelationChangedEvent) -> None: """Update the Patroni configuration if one of the clusters was already promoted.""" if self.charm.unit.is_leader(): self.set_app_status() + self.charm.watcher_offer.update_endpoints() primary_cluster = self._get_primary_cluster() logger.debug("Primary cluster: %s", primary_cluster) @@ -604,6 +609,9 @@ def _on_async_relation_joined(self, _) -> None: "unit-promoted-cluster-counter": highest_promoted_cluster_counter }) + if self.charm.unit.is_leader(): + self.charm.watcher_offer.update_endpoints() + def _on_create_replication(self, event: 
ActionEvent) -> None: """Set up asynchronous replication between two clusters.""" if self._get_primary_cluster() is not None: @@ -757,6 +765,7 @@ def _stop_database(self, event: RelationChangedEvent) -> bool: if not self.charm.unit.is_leader() and not os.path.exists(POSTGRESQL_DATA_PATH): logger.debug("Early exit on_async_relation_changed: following promoted cluster.") return False + self.charm.watcher_offer.disable_watcher() try: for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): diff --git a/src/relations/watcher.py b/src/relations/watcher.py new file mode 100644 index 00000000000..aae0d2b5f5f --- /dev/null +++ b/src/relations/watcher.py @@ -0,0 +1,589 @@ +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""PostgreSQL Watcher Relation implementation. + +This module handles the relation between the PostgreSQL charm and a watcher/witness charm +that participates in the Raft consensus for stereo mode (2-node PostgreSQL clusters). + +The watcher provides quorum without storing data, enabling automatic failover +when one of the two PostgreSQL nodes becomes unavailable. 
+""" + +import contextlib +import json +import logging +import os +from functools import cached_property +from typing import TYPE_CHECKING + +from ops import ( + Object, + Relation, + RelationBrokenEvent, + RelationChangedEvent, + RelationJoinedEvent, + Secret, + SecretNotFoundError, +) +from pysyncobj.utility import TcpUtility + +from constants import ( + RAFT_PARTNER_PREFIX, + RAFT_PASSWORD_KEY, + RAFT_PORT, + REPLICATION_CONSUMER_RELATION, + REPLICATION_OFFER_RELATION, + WATCHER_OFFER_RELATION, + WATCHER_PASSWORD_KEY, + WATCHER_SECRET_LABEL, + WATCHER_USER, +) +from utils import new_password + +if TYPE_CHECKING: + from charm import PostgresqlOperatorCharm + +logger = logging.getLogger(__name__) + + +class PostgreSQLWatcherRelation(Object): + """Handles the watcher relation for stereo mode support.""" + + def __init__(self, charm: "PostgresqlOperatorCharm"): + """Initialize the watcher relation handler. + + Args: + charm: The PostgreSQL operator charm instance. + """ + super().__init__(charm, WATCHER_OFFER_RELATION) + self.charm = charm + + self.framework.observe( + self.charm.on[WATCHER_OFFER_RELATION].relation_joined, + self._on_watcher_relation_joined, + ) + self.framework.observe( + self.charm.on[WATCHER_OFFER_RELATION].relation_changed, + self._on_watcher_relation_changed, + ) + self.framework.observe( + self.charm.on[WATCHER_OFFER_RELATION].relation_broken, + self._on_watcher_relation_broken, + ) + + @cached_property + def _relation(self) -> Relation | None: + """Return the watcher relation if it exists.""" + return self.model.get_relation(WATCHER_OFFER_RELATION) + + @property + def is_watcher_connected(self) -> bool: + """Check if a watcher is connected to this cluster. + + Returns: + True if a watcher is connected, False otherwise. 
@cached_property
def is_active(self) -> bool:
    """Report whether the related watcher has announced a live Raft connection.

    Returns:
        True only when the watcher relation exists, its remote application is
        known, and that application's databag has ``raft-status`` set to
        ``"connected"``. False in every other case.
    """
    relation = self._relation
    # The remote application may be unset on a relation; indexing the databag
    # with None would raise instead of answering "not active".
    if not relation or not relation.app:
        return False

    return relation.data[relation.app].get("raft-status") == "connected"
+ """ + if not self._relation: + return None + + unit_address = None + port = None + # Get the watcher unit address from the relation data + for unit in self._relation.units: + if unit_address := self._relation.data[unit].get("unit-address"): + break + port_str = self._relation.data[self._relation.app].get("watcher-raft-port") + if port_str: + try: + port = int(port_str) + except ValueError: + logger.warning(f"Invalid watcher-raft-port value: {port_str}") + + if unit_address and port is not None: + return f"{unit_address}:{port}" + return None + + def _on_watcher_relation_joined(self, event: RelationJoinedEvent) -> None: + """Handle a new watcher joining the relation. + + Shares cluster information including Raft password and PostgreSQL endpoints + with the watcher charm. + + Args: + event: The relation joined event. + """ + # Every unit should publish its own per-unit data. + self.update_unit_address(event.relation) + + if not self.charm.unit.is_leader(): + return + + logger.info("Watcher relation joined, sharing cluster information") + + # Ensure watcher user exists before creating the secret, + # so both raft-password and watcher-password are included from the start + watcher_pw = self._ensure_watcher_user() + + # Create or get the watcher secret containing Raft password and watcher password + secret = self._get_or_create_watcher_secret(watcher_password=watcher_pw) + if secret is None: + logger.warning("Failed to create watcher secret, deferring event") + event.defer() + return + + # Grant the secret to the watcher application + try: + secret.grant(event.relation) + except Exception as e: + logger.warning(f"Failed to grant secret to watcher: {e}") + + # Update relation data with cluster information + self._update_relation_data(event.relation) + + def _on_watcher_relation_changed(self, event: RelationChangedEvent) -> None: + """Handle watcher relation data changes. + + Updates Patroni configuration to include the watcher in the Raft cluster. 
+ + Args: + event: The relation changed event. + """ + # Keep this unit's relation data current on every relation-changed hook. + self.update_unit_address(event.relation) + + if not self.charm.is_cluster_initialised: + logger.debug("Cluster not initialized, deferring watcher relation changed") + event.defer() + return + + watcher_address = None + for unit in event.relation.units: + if unit_address := event.relation.data[unit].get("unit-address"): + watcher_address = unit_address + break + + if watcher_address: + logger.info(f"Watcher address updated: {watcher_address}") + # Only the leader handles Raft membership changes and user management + # to avoid race conditions between multiple PostgreSQL units + if self.charm.unit.is_leader(): + self._cleanup_old_watcher_from_raft() + self._ensure_watcher_user() + # Update Patroni configuration to include watcher in Raft + self.charm.update_config() + + # Update relation data for the watcher + if self.charm.unit.is_leader(): + self._update_relation_data(event.relation) + + def _cleanup_old_watcher_from_raft(self) -> None: + """Remove any old watcher IPs from Raft that differ from the current watcher. + + When a watcher unit is replaced (e.g., destroyed and re-deployed), it gets + a new IP address. The old IP remains in the Raft cluster membership, which + prevents the new watcher from being recognized as a valid cluster member. + This method finds and removes any such stale watcher entries. + + Args: + current_watcher_address: The current watcher's IP address. 
+ """ + # Get all PostgreSQL unit IPs (these should stay in the cluster) + # Use _units_ips for fresh IPs from unit relation data + pg_ips = set(self.charm._units_ips) + port_postfix = str(RAFT_PORT) + + # Get Raft cluster status to find all members + try: + syncobj_util = TcpUtility(password=self.charm._patroni.raft_password, timeout=3) + if raft_status := syncobj_util.executeCommand(f"127.0.0.1:{RAFT_PORT}", ["status"]): + # Find all partner nodes in the Raft cluster + # Keys look like: partner_node_status_server_10.131.50.142:2222 + stale_members: list[str] = [] + for key in raft_status: + if ( + key.startswith(RAFT_PARTNER_PREFIX) + and not key.endswith(port_postfix) + and raft_status[key] != 2 + ): + member_addr = key.replace(RAFT_PARTNER_PREFIX, "") + member_ip = member_addr.split(":")[0] + + # Check if this is a stale watcher (not a PostgreSQL node and not current watcher) + if member_ip not in pg_ips and member_addr != self.watcher_raft_address: + stale_members.append(member_addr) + + # Remove stale watcher members + for stale_addr in stale_members: + logger.info(f"Removing stale watcher from Raft cluster: {stale_addr}") + self._remove_watcher_from_raft(stale_addr) + except Exception as e: + logger.debug(f"Error during Raft cleanup: {e}") + + def _on_watcher_relation_broken(self, event: RelationBrokenEvent) -> None: + """Handle watcher relation being broken. + + Updates Patroni configuration to remove the watcher from the Raft cluster. + + Args: + event: The relation broken event. + """ + logger.info("Watcher relation broken, updating Patroni configuration") + + if not self.charm.is_cluster_initialised: + return + + self._cleanup_old_watcher_from_raft() + # Update Patroni configuration without the watcher + self.charm.update_config() + + def _remove_watcher_from_raft(self, watcher_address: str) -> None: + """Remove the watcher from the Raft cluster. + + This is critical for maintaining correct quorum calculations. 
If a dead + watcher remains in the cluster membership, it counts toward the total + node count, making it harder to achieve quorum. + + Args: + watcher_address: The watcher's IP address. + """ + if self.watcher_raft_address: + logger.info(f"Removing watcher from Raft cluster: {watcher_address}") + self.charm._patroni.remove_raft_member(watcher_address) + + if self.charm.is_cluster_initialised: + self.charm.update_config() + + def _ensure_watcher_user(self) -> str | None: + """Ensure the watcher PostgreSQL user exists for health checks. + + Creates the watcher user if it doesn't exist, and updates the watcher + secret with the password so the watcher charm can authenticate. + + Returns: + The watcher password, or None if user creation failed. + """ + if not self.charm.is_cluster_initialised: + logger.debug("Cluster not initialized, cannot create watcher user") + return None + + try: + users = self.charm.postgresql.list_users() + if WATCHER_USER in users: + logger.debug(f"User {WATCHER_USER} already exists") + # Get existing password from secret if available + try: + secret = self.charm.model.get_secret(label=WATCHER_SECRET_LABEL) + content = secret.get_content(refresh=True) + existing_pw = content.get(WATCHER_PASSWORD_KEY) + if existing_pw: + return existing_pw + # Password not in secret — fall through to regenerate + except SecretNotFoundError: + # Secret doesn't exist yet, will be created below with new password + pass + + # Generate a password for the watcher user + watcher_password = new_password() + + # Create the watcher user (minimal privileges - only needs to connect and run SELECT 1) + if WATCHER_USER not in users: + logger.info(f"Creating PostgreSQL user: {WATCHER_USER}") + self.charm.postgresql.create_user(WATCHER_USER, watcher_password) + else: + # User exists but we don't have the password, update it + logger.info(f"Updating password for PostgreSQL user: {WATCHER_USER}") + self.charm.postgresql.update_user_password(WATCHER_USER, watcher_password) + + # 
def _get_existing_watcher_password(self) -> str | None:
    """Look up the watcher user's password in the already-stored watcher secret.

    Returns:
        The value stored under the watcher password key, or None when the
        secret does not exist, holds no such key, or cannot be read.
    """
    stored_password = None
    try:
        secret_content = self.charm.model.get_secret(label=WATCHER_SECRET_LABEL).get_content(
            refresh=True
        )
        stored_password = secret_content.get(WATCHER_PASSWORD_KEY)
    except SecretNotFoundError:
        # No secret yet: there is simply nothing to report.
        pass
    except Exception as e:
        logger.debug(f"Failed to get existing watcher password: {e}")
    return stored_password
+ """ + try: + secret = self.charm.model.get_secret(label=WATCHER_SECRET_LABEL) + logger.debug("Found existing watcher secret") + return secret + except SecretNotFoundError: + logger.debug("No existing watcher secret found, creating new one") + + # Get the Raft password from the internal secret + try: + raft_password = self.charm._patroni.raft_password + except Exception as e: + logger.warning(f"Error getting raft_password: {e}") + raft_password = None + + if not raft_password: + logger.warning("Raft password not available, cannot create secret") + return None + + # Create a new secret with the Raft password (and watcher password if available) + try: + content = {RAFT_PASSWORD_KEY: raft_password} + # Include watcher password if provided, or look it up from existing secret + watcher_pw = watcher_password or self._get_existing_watcher_password() + if watcher_pw: + content[WATCHER_PASSWORD_KEY] = watcher_pw + secret = self.charm.model.app.add_secret( + content=content, + label=WATCHER_SECRET_LABEL, + ) + logger.info("Created watcher secret") + return secret + except Exception as e: + logger.error(f"Failed to create watcher secret: {e}") + return None + + def _update_relation_data(self, relation: Relation) -> None: + """Update the relation data with cluster information. + + Args: + relation: The watcher relation. + """ + if not self.charm.unit.is_leader(): + return + + # Get the secret ID for sharing + try: + secret = self.charm.model.get_secret(label=WATCHER_SECRET_LABEL) + secret_id = secret.id + if not secret_id: + # When a secret is retrieved by label, the ops library may lazily load the ID. + # Calling get_info() forces it to resolve. 
+ secret_id = secret.get_info().id + if secret_id is None: + logger.warning("Watcher secret has no ID") + return + # Ensure the secret is granted to the watcher relation (handles + # cases where the secret was recreated after initial relation_joined) + with contextlib.suppress(Exception): + secret.grant(relation) + except SecretNotFoundError: + logger.warning("Watcher secret not found") + return + except Exception as e: + logger.error(f"Error getting secret: {e}") + return + + # Collect PostgreSQL unit endpoints using fresh IPs from unit relation data. + # _units_ips reads directly from unit relation data (always fresh), while + # _peer_members_ips reads from app peer data (may be stale after network disruptions). + pg_endpoints: list[str] = sorted(self.charm._units_ips) + if not pg_endpoints: + logger.warning("No PostgreSQL endpoints available") + return + + # Update relation data + relation.data[self.charm.app].update({ + "cluster-name": self.charm.cluster_name, + "raft-secret-id": secret_id, + "version": self.charm._patroni.get_postgresql_version(), + "raft-partner-addrs": json.dumps(pg_endpoints), + "raft-port": str(RAFT_PORT), + "patroni-cas": self.charm.tls.get_peer_ca_bundle(), + "standby-clusters": json.dumps(self._get_standby_clusters()), + "tls-enabled": "true" if self.charm.is_tls_enabled else "false", + }) + self.update_watcher_secret() + + # Also share this unit's per-unit data. + self.update_unit_address(relation) + + def update_unit_address(self, relation: Relation | None = None) -> None: + """Update this unit's address in the watcher relation. + + Called when the unit's IP changes (e.g., after network isolation). + This updates unit-specific data in the relation, not application data. + Can be called by any unit, not just the leader. 
def _get_standby_clusters(self) -> list[str]:
    """Return the sorted, de-duplicated names of applications on the replication relations.

    Both the replication-offer and the replication-consumer relation are
    inspected. A remote application name is only included while this cluster
    reports itself as the primary cluster.
    """
    replication_relations = (
        self.model.get_relation(REPLICATION_OFFER_RELATION),
        self.model.get_relation(REPLICATION_CONSUMER_RELATION),
    )
    # Only the other side's application name is of interest.
    remote_names = {
        rel.app.name
        for rel in replication_relations
        if rel is not None and rel.app and self.charm.async_replication.is_primary_cluster()
    }
    return sorted(remote_names)
def ensure_watcher_in_raft(self) -> None:
    """Periodic reconciliation of the watcher's Raft membership and relation data.

    Nothing happens until the cluster is initialised and the watcher reports
    itself as active. After that, the leader:

    1. removes stale watcher entries from the Raft cluster, and
    2. rewrites the watcher relation data, so the watcher sees current
       PostgreSQL addresses even if a relation event was missed.

    This method does not itself add the watcher to the Raft cluster.
    """
    ready = self.charm.is_cluster_initialised and self.is_active
    if not ready:
        return

    # Membership changes and application data are handled by the leader only.
    if not self.charm.unit.is_leader():
        return

    self._cleanup_old_watcher_from_raft()

    relation = self._relation
    if relation:
        self._update_relation_data(relation)
It connects to one or more PostgreSQL +applications (which provide the watcher-offer relation) and participates in +Raft consensus as a lightweight witness for stereo mode (2-node clusters). + +Multi-cluster support: +- Each watcher relation gets its own RaftController instance +- Ports are assigned dynamically starting from RAFT_PORT (2223) and persisted + in a port allocation file at /var/snap/charmed-postgresql/common/watcher-raft/ports.json +- Each RaftController uses instance-specific data directories and systemd services +""" + +import json +import logging +import os +import typing +from datetime import datetime + +from charmlibs.systemd import service_running +from ops import ( + ActiveStatus, + BlockedStatus, + InstallEvent, + MaintenanceStatus, + Object, + Relation, + RelationBrokenEvent, + RelationChangedEvent, + RelationJoinedEvent, + SecretChangedEvent, + SecretNotFoundError, + StartEvent, + UpdateStatusEvent, + WaitingStatus, +) + +from constants import RAFT_PORT, WATCHER_RELATION +from raft_controller import RaftController, install_service + +if typing.TYPE_CHECKING: + from charm import PostgresqlOperatorCharm + +logger = logging.getLogger(__name__) + +SNAP_NAME = "charmed-postgresql" +SNAP_CHANNEL = "16/edge" + + +class WatcherRequirerHandler(Object): + """Handles the watcher requirer relation and watcher-mode lifecycle.""" + + def __init__(self, charm: "PostgresqlOperatorCharm"): + super().__init__(charm, WATCHER_RELATION) + self.charm = charm + + # Lifecycle events + self.framework.observe(self.charm.on.install, self._on_install) + self.framework.observe(self.charm.on.leader_elected, self._on_leader_elected) + self.framework.observe(self.charm.on.start, self._on_start) + self.framework.observe(self.charm.on.update_status, self._on_update_status) + + # Relation events + self.framework.observe( + self.charm.on[WATCHER_RELATION].relation_joined, + self._on_watcher_relation_joined, + ) + self.framework.observe( + 
self.charm.on[WATCHER_RELATION].relation_changed, + self._on_watcher_relation_changed, + ) + self.framework.observe( + self.charm.on.secret_changed, + self._on_watcher_relation_changed, + ) + self.framework.observe( + self.charm.on[WATCHER_RELATION].relation_broken, + self._on_watcher_relation_broken, + ) + + @property + def unit_ip(self) -> str | None: + """Return this unit's IP address.""" + if binding := self.model.get_binding(WATCHER_RELATION): + return str(binding.network.bind_address) + return None + + @property + def is_related(self) -> bool: + """Check if the watcher is related to any PostgreSQL cluster.""" + relations = self.model.relations.get(WATCHER_RELATION, []) + return len(relations) > 0 + + # -- Port allocation -- + + def _load_port_allocations(self) -> dict[str, int]: + """Load port allocations from persistent file. + + Returns: + Dictionary mapping relation_id (as string) to port number. + """ + if "port-allocations" in self.charm.app_peer_data: + return json.loads(self.charm.app_peer_data["port-allocations"]) + return {} + + def _save_port_allocations(self, allocations: dict[str, int]) -> None: + """Save port allocations to persistent file.""" + self.charm.app_peer_data["port-allocations"] = json.dumps(allocations) + + def _is_disabled(self, relation: Relation) -> bool: + """Is disabled flag set.""" + if not relation: + return False + return "disable-watcher" in relation.data[relation.app] + + def _get_port_for_relation(self, relation_id: int) -> int: + """Get or assign a port for a given relation ID. + + Args: + relation_id: The Juju relation ID. + + Returns: + The assigned port number. 
+ """ + allocations = self._load_port_allocations() + key = str(relation_id) + + if key in allocations: + return allocations[key] + + # Assign next available port starting from RAFT_PORT + used_ports = set(allocations.values()) + port = RAFT_PORT + 1 + while port in used_ports: + port += 1 + + allocations[key] = port + self._save_port_allocations(allocations) + logger.info(f"Assigned port {port} to relation {relation_id}") + return port + + def _release_port_for_relation(self, relation_id: int) -> None: + """Release the port allocated for a relation. + + Args: + relation_id: The Juju relation ID. + """ + allocations = self._load_port_allocations() + key = str(relation_id) + if key in allocations: + port = allocations.pop(key) + self._save_port_allocations(allocations) + logger.info(f"Released port {port} from relation {relation_id}") + + # -- Per-relation helpers -- + + def _get_raft_password(self, relation: Relation) -> str | None: + """Get the Raft password from the relation secret. + + Args: + relation: The specific watcher relation. + """ + if not relation.app or not ( + secret_id := relation.data[relation.app].get("raft-secret-id") + ): + return None + + try: + secret = self.model.get_secret(id=secret_id) + content = secret.get_content(refresh=True) + return content.get("raft-password") + except SecretNotFoundError: + logger.warning(f"Secret {secret_id} not found") + return None + + def get_watcher_password(self, relation: Relation) -> str | None: + """Get the watcher PostgreSQL user password from the relation secret. + + Args: + relation: The specific watcher relation. 
+ """ + if not relation.app or not ( + secret_id := relation.data[relation.app].get("raft-secret-id") + ): + return None + + try: + secret = self.model.get_secret(id=secret_id) + content = secret.get_content(refresh=True) + return content.get("watcher-password") + except SecretNotFoundError: + logger.warning(f"Secret {secret_id} not found") + return None + + def _get_raft_partner_addrs(self, relation: Relation) -> list[str]: + """Get Raft partner addresses from the relation. + + Args: + relation: The specific watcher relation. + """ + if not relation.app or not ( + raft_addrs_json := relation.data[relation.app].get("raft-partner-addrs") + ): + return [] + + try: + return json.loads(raft_addrs_json) + except json.JSONDecodeError: + logger.warning("Failed to parse raft-partner-addrs JSON") + return [] + + def _get_cluster_name(self, relation: Relation) -> str: + """Get the cluster name from the relation app data. + + Args: + relation: The specific watcher relation. + + Returns: + The cluster name, or a fallback label. + """ + if relation.app and (name := relation.data[relation.app].get("cluster-name")): + return name + return f"relation-{relation.id}" + + def _get_patroni_cas(self, relation: Relation) -> str | None: + if relation.app and (name := relation.data[relation.app].get("patroni-cas")): + return name + return f"relation-{relation.id}" + + def _get_standby_clusters(self, relation: Relation) -> list[str]: + """Get related standby clusters from the relation app data. + + Args: + relation: The specific watcher relation. + + Returns: + A list of standby cluster names. 
def _on_start(self, event: StartEvent) -> None:
    """Set the initial waiting status when the unit starts in watcher mode.

    The unit never becomes active here: promotion to ActiveStatus is left to
    ``_on_update_status``, once Raft is actually connected.
    """
    message = (
        "Starting Raft connection" if self.is_related else "Waiting for relation to PostgreSQL"
    )
    self.charm.unit.status = WaitingStatus(message)
relation.data[self.charm.app]["unit-az"] = str(unit_az) + + if ( + address_changed + and (raft_password := self._get_raft_password(relation)) + and (partner_addrs := self._get_raft_partner_addrs(relation)) + ): + port = self._get_port_for_relation(relation.id) + raft_controller = RaftController(self.charm, f"rel{relation.id}") + changed = raft_controller.configure( + port, + new_address, + partner_addrs, + raft_password, + self._get_patroni_cas(relation), + ) + if changed and service_running(raft_controller.service_name): + logger.info( + f"Restarting Raft controller for relation {relation.id} due to IP change" + ) + raft_controller.restart() + for stale_addr in raft_controller.get_stale_watchers( + new_address, raft_password, partner_addrs, port + ): + raft_controller.remove_raft_member(stale_addr, raft_password, partner_addrs) + + def _on_update_status(self, event: UpdateStatusEvent) -> None: + """Handle update status event in watcher mode.""" + if not self.charm.unit.is_leader(): + if self.charm._peers and len(self.charm._peers.units) > 0: + self.charm.unit.status = BlockedStatus("Multiple watcher units. 
One expected.") + event.defer() + return + + if not (relations := self.model.relations.get(WATCHER_RELATION, [])): + self.charm.unit.status = WaitingStatus("Waiting for relation to PostgreSQL") + return + + self._update_unit_address_if_changed() + + connected_count = 0 + disabled = False + total_endpoints = 0 + az_warnings: list[str] = [] + info_warnings: list[str] = [] + + for relation in relations: + port = self._get_port_for_relation(relation.id) + password = self._get_raft_password(relation) + raft_controller = RaftController(self.charm, instance_id=f"rel{relation.id}") + raft_status = raft_controller.get_status(port, password) + disabled = self._is_disabled(relation) + connected_count += 1 if raft_status.get("connected") else 0 + + pg_endpoints = self._get_raft_partner_addrs(relation) + total_endpoints += len(pg_endpoints) + partner_addrs = self._get_raft_partner_addrs(relation) + + if password and not self._should_watcher_vote(partner_addrs): + cluster_name = self._get_cluster_name(relation) + raft_controller.remove_raft_member( + f"{self.unit_ip}:{port}", password, pg_endpoints + ) + info_warnings.append( + f"WARNING: cluster '{cluster_name}' has odd number units;" + " adding a watcher creates even Raft membership," + " which degrades partition tolerance" + ) + raft_controller.remove_service() + disabled = True + + az_warning = self._check_az_colocation(relation) + if az_warning: + az_warnings.append(az_warning) + + if connected_count == 0 and not disabled: + self.charm.unit.status = WaitingStatus("Connecting to Raft cluster") + return + + cluster_count = len(relations) + msg = ( + f"Raft connected, monitoring {total_endpoints} PostgreSQL endpoints" + if cluster_count == 1 + else ( + f"Raft connected to {connected_count}/{cluster_count} clusters, " + f"monitoring {total_endpoints} PostgreSQL endpoints" + ) + ) + + # AZ co-location blocks in production; odd-count warnings never block + if az_warnings and self.charm.config.profile == "production": + 
self.charm.unit.status = BlockedStatus("AZ co-location: " + "; ".join(az_warnings)) + return + + if all_warnings := az_warnings + info_warnings: + msg += "; " + "; ".join(all_warnings) + + self.charm.unit.status = ActiveStatus(msg) + + def _check_az_colocation(self, relation: Relation) -> str | None: + """Check if the watcher is in the same AZ as any PostgreSQL unit. + + Args: + relation: The specific watcher relation. + + Returns: + Warning message if co-located, None otherwise. + """ + if not (watcher_az := os.environ.get("JUJU_AVAILABILITY_ZONE")): + return None + + colocated_units = [] + for unit in relation.units: + unit_az = relation.data[unit].get("unit-az") + if unit_az and unit_az == watcher_az: + colocated_units.append(unit.name) + + if colocated_units: + return f"WARNING: watcher shares AZ '{watcher_az}' with {', '.join(colocated_units)}" + return None + + # -- Relation events -- + + def _on_watcher_relation_joined(self, event: RelationJoinedEvent) -> None: + """Handle watcher relation joined event.""" + if not self.charm.unit.is_leader(): + if self.charm._peers and len(self.charm._peers.units) > 0: + self.charm.unit.status = BlockedStatus("Multiple watcher units. 
One expected.") + event.defer() + return + + logger.info(f"Joined watcher relation {event.relation.id} with PostgreSQL cluster") + if unit_ip := self.unit_ip: + event.relation.data[self.charm.unit]["unit-address"] = unit_ip + unit_az = os.environ.get("JUJU_AVAILABILITY_ZONE") + if unit_az: + event.relation.data[self.charm.app]["unit-az"] = unit_az + + def _should_watcher_vote(self, partner_addrs: list[str]) -> bool: + pg_num = len(partner_addrs) + return pg_num < 3 or pg_num % 2 == 0 + + def _on_watcher_relation_changed( + self, event: RelationChangedEvent | SecretChangedEvent + ) -> None: + """Handle watcher relation changed event.""" + if not self.charm.unit.is_leader(): + return + + if self.charm._peers is None or not (unit_ip := self.unit_ip): + logger.debug("Deferring watcher relation: Peer relation not yet joined") + event.defer() + return + + relations = ( + [event.relation] + if isinstance(event, RelationChangedEvent) + else self.model.relations.get(WATCHER_RELATION, []) + ) + for relation in relations: + logger.info(f"Watcher relation {relation.id} data changed") + + if not (raft_password := self._get_raft_password(relation)) or not ( + partner_addrs := self._get_raft_partner_addrs(relation) + ): + logger.debug("Raft details are not yet available") + return + + # Get or assign a port for this relation + port = self._get_port_for_relation(relation.id) + + raft_controller = RaftController(self.charm, f"rel{relation.id}") + if self._is_disabled(relation) or not self._should_watcher_vote(partner_addrs): + logger.debug("Disabling the watcher") + raft_controller.remove_service() + raft_controller.remove_raft_member( + f"{self.unit_ip}:{port}", raft_password, partner_addrs + ) + relation.data[self.charm.app]["raft-status"] = "disabled" + return + + if raft_controller.configure( + port, unit_ip, partner_addrs, raft_password, self._get_patroni_cas(relation) + ): + logger.info( + f"Restarting Raft controller for relation {relation.id} to apply config changes" + ) + 
raft_controller.restart() + + relation.data[self.charm.unit]["unit-address"] = unit_ip + relation.data[self.charm.app]["watcher-raft-port"] = str(port) + if unit_az := os.environ.get("JUJU_AVAILABILITY_ZONE"): + relation.data[self.charm.app]["unit-az"] = unit_az + # Only set raft-status and ActiveStatus after verifying the service is running + if service_running(raft_controller.service_name): + relation.data[self.charm.app]["raft-status"] = "connected" + # Check AZ co-location and enforce based on profile + if ( + az_warning := self._check_az_colocation(relation) + ) and self.charm.config.profile == "production": + self.charm.unit.status = BlockedStatus(f"AZ co-location: {az_warning}") + else: + self.charm.unit.status = ActiveStatus() + else: + self.charm.unit.status = WaitingStatus("Raft controller not running") + + def _on_watcher_relation_broken(self, event: RelationBrokenEvent) -> None: + """Handle watcher relation broken event.""" + relation_id = event.relation.id + logger.info(f"Watcher relation {relation_id} broken") + + # Stop and clean up the Raft controller for this relation + controller = RaftController(self.charm, instance_id=f"rel{relation_id}") + controller.remove_service() + + # Release the port allocation + self._release_port_for_relation(relation_id) + + # Check if any relations remain + remaining = [ + r for r in self.model.relations.get(WATCHER_RELATION, []) if r.id != relation_id + ] + if not remaining: + self.charm.unit.status = WaitingStatus("Waiting for relation to PostgreSQL") diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index d17fad8fe56..9414bb1cf5b 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -37,12 +37,15 @@ raft: data_dir: {{ conf_path }}/raft self_addr: '{{ self_ip }}:2222' password: {{ raft_password }} - {% if partner_addrs -%} + {% if partner_addrs or watcher -%} partner_addrs: {% endif -%} {% for partner_addr in partner_addrs -%} - {{ partner_addr }}:2222 {% endfor %} + {%- if watcher 
%} + - {{ watcher }} + {% endif %} bootstrap: dcs: @@ -200,6 +203,10 @@ postgresql: {%- endif %} {%- endfor %} {%- endif %} + {%- if watcher_addr %} + # Allow watcher to connect for health checks + - {{ 'hostssl' if enable_tls else 'host' }} postgres watcher {{ watcher_addr }}/32 scram-sha-256 + {%- endif %} - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.1/32 scram-sha-256 # Allow replications connections from other cluster members. {%- for endpoint in extra_replication_endpoints %} diff --git a/tests/integration/ha_tests/test_stereo_mode.py b/tests/integration/ha_tests/test_stereo_mode.py new file mode 100644 index 00000000000..0bb361e6bf6 --- /dev/null +++ b/tests/integration/ha_tests/test_stereo_mode.py @@ -0,0 +1,848 @@ +#!/usr/bin/env python3 +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Integration tests for PostgreSQL stereo mode with watcher. + +Tests the deployment and failover scenarios for 2-node PostgreSQL clusters +with a watcher/witness node for quorum. + +Test scenarios from acceptance criteria: +1. Replica shutdown: clients rerouted to primary, no significant outage +2. Primary shutdown: replica promoted, old primary becomes replica when healthy +3. Watcher shutdown: no service outage +4. 
Network isolation variants of above +""" + +import asyncio +import logging + +import pytest +from pytest_operator.plugin import OpsTest +from tenacity import Retrying, stop_after_delay, wait_fixed +from yaml import safe_load + +from ..helpers import ( + APPLICATION_NAME, + DATABASE_APP_NAME, +) +from .helpers import APPLICATION_NAME as TEST_APP_NAME +from .helpers import ( + are_writes_increasing, + check_writes, + cut_network_from_unit, + cut_network_from_unit_without_ip_change, + get_cluster_roles, + get_primary, + restore_network_for_unit, + restore_network_for_unit_without_ip_change, +) + + +async def start_writes(ops_test: OpsTest) -> None: + """Start continuous writes to PostgreSQL (assumes relation already exists).""" + for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3), reraise=True): + with attempt: + action = ( + await ops_test.model + .applications[TEST_APP_NAME] + .units[0] + .run_action("start-continuous-writes") + ) + await action.wait() + assert action.results["result"] == "True", "Unable to create continuous_writes table" + + +logger = logging.getLogger(__name__) + + +async def verify_raft_cluster_health( + ops_test: OpsTest, + db_app_name: str, + watcher_app_name: str, + expected_members: int = 3, + check_watcher_ip: bool = True, +) -> None: + """Verify that the Raft cluster has the expected number of members and quorum. + + This function checks that all PostgreSQL units see the expected number of + Raft members (including the watcher) and have quorum. This is critical + after watcher re-deployment to ensure the cluster is properly formed. + + Args: + ops_test: The OpsTest instance. + db_app_name: The PostgreSQL application name. + watcher_app_name: The watcher application name. + expected_members: Expected number of Raft members (default 3 for stereo mode). + check_watcher_ip: Whether to verify the watcher IP in Raft status (default True). 
+ Set to False after network isolation tests where watcher may have been + redeployed with a new IP that isn't yet in the Raft configuration. + + Raises: + AssertionError: If the Raft cluster is not healthy. + """ + logger.info(f"Verifying Raft cluster health with {expected_members} expected members") + + # Get watcher address for verification using juju exec to avoid cached IPs + watcher_unit = ops_test.model.applications[watcher_app_name].units[0] + return_code, watcher_ip, _ = await ops_test.juju( + "exec", "--unit", watcher_unit.name, "--", "unit-get", "private-address" + ) + assert return_code == 0, f"Failed to get watcher address from {watcher_unit.name}" + watcher_ip = watcher_ip.strip() + + for attempt in Retrying(stop=stop_after_delay(360), wait=wait_fixed(10), reraise=True): + with attempt: + for unit in ops_test.model.applications[db_app_name].units: + # Get the Raft password from Patroni config using juju exec directly + # We need to avoid shell interpretation issues with run_command_on_unit + complete_command = [ + "exec", + "--unit", + unit.name, + "--", + "cat", + "/var/snap/charmed-postgresql/current/etc/patroni/patroni.yaml", + ] + return_code, stdout, _ = await ops_test.juju(*complete_command) + assert return_code == 0, f"Failed to read patroni.yaml on {unit.name}" + + conf = safe_load(stdout) + password = conf.get("raft", {}).get("password") + assert password, f"Could not find Raft password in patroni.yaml on {unit.name}" + + # Check Raft status using the password via juju exec directly + complete_command = [ + "exec", + "--unit", + unit.name, + "--", + "charmed-postgresql.syncobj-admin", + "-conn", + conf["raft"]["self_addr"], + "-pass", + password, + "-status", + ] + return_code, output, _ = await ops_test.juju(*complete_command) + if return_code != 0: + logger.warning(f"Raft status check failed on {unit.name}: {output}") + raise AssertionError(f"Raft status check failed on {unit.name}") + logger.info(f"Raft status on {unit.name}: 
{output[:200]}...") + + # Verify quorum + assert "has_quorum: True" in output or "has_quorum:True" in output, ( + f"Unit {unit.name} does not have Raft quorum" + ) + + # Verify watcher is in the cluster (if requested) + # After network isolation tests, the watcher may have been redeployed + # with a new IP that isn't yet updated in the Raft configuration + if check_watcher_ip: + assert watcher_ip in output, ( + f"Watcher {watcher_ip} not found in Raft cluster on {unit.name}\n" + f"Raft output: {output}" + ) + + logger.info("Raft cluster health verified successfully") + + +WATCHER_APP_NAME = "pg-watcher" + + +@pytest.mark.abort_on_fail +async def test_build_and_deploy_stereo_mode(ops_test: OpsTest, charm) -> None: + """Build and deploy PostgreSQL in stereo mode with watcher. + + Deploys 2 PostgreSQL units and a watcher (same charm, role=watcher), + then relates them to form a 3-node Raft cluster for quorum. + """ + # Check if PostgreSQL is already deployed (e.g., from a previous test run) + # If so, verify it's in the expected state or skip deployment + if DATABASE_APP_NAME in ops_test.model.applications: + logger.info("PostgreSQL already deployed, checking state...") + pg_units = len(ops_test.model.applications[DATABASE_APP_NAME].units) + watcher_deployed = WATCHER_APP_NAME in ops_test.model.applications + test_app_deployed = APPLICATION_NAME in ops_test.model.applications + + if pg_units == 2 and watcher_deployed and test_app_deployed: + logger.info("Stereo mode already deployed with expected state, verifying...") + await ops_test.model.wait_for_idle(status="active", timeout=300) + return + + # If state is incorrect, we need to clean up and redeploy + logger.info(f"Unexpected state (pg_units={pg_units}), cleaning up...") + for app in [DATABASE_APP_NAME, WATCHER_APP_NAME, APPLICATION_NAME]: + if app in ops_test.model.applications: + await ops_test.model.remove_application(app, block_until_done=True) + + async with ops_test.fast_forward(): + # Deploy PostgreSQL with 
2 units from the start + logger.info("Deploying PostgreSQL charm with 2 units...") + await ops_test.model.deploy( + charm, + application_name=DATABASE_APP_NAME, + num_units=2, + series="noble", + config={"profile": "testing", "synchronous-mode-strict": False}, + ) + # Deploy watcher using the same charm with role=watcher + logger.info("Deploying watcher (same charm, role=watcher)...") + await ops_test.model.deploy( + charm, + application_name=WATCHER_APP_NAME, + num_units=1, + series="noble", + config={"role": "watcher", "profile": "testing"}, + ) + logger.info("Deploying test application...") + await ops_test.model.deploy( + APPLICATION_NAME, + application_name=APPLICATION_NAME, + series="noble", + channel="edge", + ) + + # Wait for initial deployment + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, WATCHER_APP_NAME], + timeout=1200, + raise_on_error=False, # Watcher may be waiting for relation + ) + + # Relate PostgreSQL (watcher-offer) to watcher (watcher) + # The relation may already exist if deploying into a model with prior state + logger.info("Relating PostgreSQL to watcher") + try: + await ops_test.model.integrate( + f"{DATABASE_APP_NAME}:watcher-offer", f"{WATCHER_APP_NAME}:watcher" + ) + except Exception as e: + if "already exists" in str(e) or "relation" in str(e).lower(): + logger.info(f"Watcher relation already exists: {e}") + else: + raise + + # Wait for watcher to join Raft cluster + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, WATCHER_APP_NAME], + status="active", + timeout=600, + ) + + # Relate PostgreSQL to test app + try: + await ops_test.model.integrate(DATABASE_APP_NAME, f"{APPLICATION_NAME}:database") + except Exception as e: + if "already exists" in str(e) or "relation" in str(e).lower(): + logger.info(f"Database relation already exists: {e}") + else: + raise + + await ops_test.model.wait_for_idle(status="active", timeout=1800) + + # Verify deployment + assert 
len(ops_test.model.applications[DATABASE_APP_NAME].units) == 2 + assert len(ops_test.model.applications[WATCHER_APP_NAME].units) == 1 + + +@pytest.mark.abort_on_fail +async def test_replica_shutdown_with_watcher(ops_test: OpsTest, continuous_writes) -> None: + """Test replica shutdown with watcher providing quorum. + + Expected behavior: + - All connected clients to the primary should not be interrupted + - Clients connected to replica should be re-routed to primary + - No significant outage (less than a minute) + """ + await start_writes(ops_test) + + # Get current cluster roles + any_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + original_roles = await get_cluster_roles(ops_test, any_unit) + primary = original_roles["primaries"][0] + + # Get the replica unit + replica = None + for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + if unit.name != primary: + replica = unit.name + break + + assert replica is not None, "Could not find replica unit" + logger.info(f"Shutting down replica: {replica}") + + # Shutdown the replica + await ops_test.model.destroy_unit(replica, force=True, destroy_storage=False, max_wait=1500) + + # Wait for the cluster to stabilize after unit removal + # The primary needs time to reconfigure the cluster and update secrets + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="active", + timeout=300, + idle_period=30, + ) + + # Verify writes continue (primary should still be available) + # With watcher, we should maintain quorum + await are_writes_increasing(ops_test, down_unit=replica) + + # Wait for cluster to stabilize + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="active", + timeout=600, + idle_period=30, + ) + + # Scale back up + logger.info("Scaling back up after replica shutdown") + await ops_test.model.applications[DATABASE_APP_NAME].add_unit(count=1) + await ops_test.model.wait_for_idle(status="active", timeout=1500) + + # Wait for the new replica 
to become a sync_standby + # This ensures the cluster is fully ready for the next test + for attempt in Retrying(stop=stop_after_delay(180), wait=wait_fixed(10), reraise=True): + with attempt: + new_roles = await get_cluster_roles( + ops_test, ops_test.model.applications[DATABASE_APP_NAME].units[0].name + ) + logger.info(f"Cluster roles: {new_roles}") + assert len(new_roles["primaries"]) == 1, "Should have exactly one primary" + assert new_roles["primaries"][0] == primary, "Primary should not have changed" + assert len(new_roles["sync_standbys"]) == 1, "New replica should become sync_standby" + + await check_writes(ops_test) + + +@pytest.mark.abort_on_fail +async def test_primary_shutdown_with_watcher(ops_test: OpsTest, continuous_writes) -> None: + """Test primary shutdown with watcher providing quorum. + + Expected behavior: + - Old primary should be network-isolated (Patroni handles this) + - Replica should be promoted to primary + - Clients re-routed to new primary + - When old primary is healthy, it should become a replica + """ + await start_writes(ops_test) + + # Get current cluster roles + any_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + original_roles = await get_cluster_roles(ops_test, any_unit) + original_primary = original_roles["primaries"][0] + + # Get the replica - prefer sync_standby if available, otherwise any replica + # After a previous test scales up, the new unit may not yet be a sync_standby + if original_roles["sync_standbys"]: + original_replica = original_roles["sync_standbys"][0] + elif original_roles["replicas"]: + original_replica = original_roles["replicas"][0] + else: + # Fall back to finding the other unit manually + original_replica = None + for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + if unit.name != original_primary: + original_replica = unit.name + break + assert original_replica is not None, "Could not find replica unit" + + logger.info(f"Shutting down primary: {original_primary}") 
+ + # Shutdown the primary + await ops_test.model.destroy_unit( + original_primary, force=True, destroy_storage=False, max_wait=1500 + ) + + # With watcher providing quorum, failover should happen automatically + # Wait for the model to stabilize first + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="active", + timeout=600, + idle_period=30, + ) + + # Wait for the replica to be promoted to primary + # Patroni needs time to detect leader failure and elect new leader (30-90s) + remaining_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + for attempt in Retrying(stop=stop_after_delay(180), wait=wait_fixed(10), reraise=True): + with attempt: + new_roles = await get_cluster_roles(ops_test, remaining_unit) + logger.info(f"Waiting for failover - current roles: {new_roles}") + assert len(new_roles["primaries"]) == 1, "Should have exactly one primary" + assert new_roles["primaries"][0] == original_replica, ( + f"Replica {original_replica} should have been promoted, " + f"but primary is {new_roles['primaries'][0]}" + ) + + # Wait for the charm to reconfigure after failover + # This ensures the relation endpoints are updated for the test app to reconnect + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="active", + timeout=300, + idle_period=30, + ) + + # Scale back up FIRST - with synchronous_mode_strict=true, the primary cannot + # accept writes when there's no sync_standby available. We need 2 units before + # we can verify writes are working. 
+ logger.info("Scaling back up after primary shutdown") + await ops_test.model.applications[DATABASE_APP_NAME].add_unit(count=1) + # Wait longer for the new unit to fully join the cluster + # The new unit needs to: start PostgreSQL, join Raft cluster, become sync_standby + await ops_test.model.wait_for_idle(status="active", timeout=1800, idle_period=60) + + # Wait for the new replica to become a sync_standby + # This can take a while as the new unit needs to fully sync and be recognized + for attempt in Retrying(stop=stop_after_delay(300), wait=wait_fixed(15), reraise=True): + with attempt: + final_roles = await get_cluster_roles( + ops_test, ops_test.model.applications[DATABASE_APP_NAME].units[0].name + ) + logger.info(f"Final cluster roles: {final_roles}") + assert len(final_roles["primaries"]) == 1, "Should have exactly one primary" + assert len(final_roles["sync_standbys"]) == 1, "New replica should become sync_standby" + + # Now that we have a sync_standby, restart continuous writes and verify + # The continuous writes app caches the connection string, so we need to clear + # and restart it after failover to pick up the new primary's address. + # First clear the old writes state + action = ( + await ops_test.model + .applications[TEST_APP_NAME] + .units[0] + .run_action("clear-continuous-writes") + ) + await action.wait() + + # Then start fresh writes + await start_writes(ops_test) + + # Verify writes continue on the new primary + await are_writes_increasing(ops_test, down_unit=original_primary) + + await check_writes(ops_test) + + +@pytest.mark.abort_on_fail +async def test_watcher_shutdown_no_outage(ops_test: OpsTest, continuous_writes) -> None: + """Test watcher shutdown - should not cause service outage. 
+ + Expected behavior: + - No outage experienced by either primary or replica + - Cluster continues to function (but loses quorum guarantee) + """ + await start_writes(ops_test) + + # Get current cluster state + any_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + original_roles = await get_cluster_roles(ops_test, any_unit) + + logger.info("Removing watcher unit") + + # Remove the watcher + watcher_unit = ops_test.model.applications[WATCHER_APP_NAME].units[0] + await ops_test.model.destroy_unit(watcher_unit.name, force=True, max_wait=300) + + # Verify writes continue without interruption + await are_writes_increasing(ops_test) + + # PostgreSQL cluster should remain active + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="active", + timeout=300, + idle_period=30, + ) + + # Verify cluster roles unchanged + new_roles = await get_cluster_roles(ops_test, any_unit) + assert new_roles["primaries"] == original_roles["primaries"] + + # Re-deploy watcher + logger.info("Re-deploying watcher") + await ops_test.model.applications[WATCHER_APP_NAME].add_unit(count=1) + await ops_test.model.wait_for_idle(status="active", timeout=600) + + # Verify the Raft cluster is properly formed with the new watcher + # This is critical - without this verification, subsequent tests might fail + # because the watcher is not actually participating in the Raft cluster + await verify_raft_cluster_health(ops_test, DATABASE_APP_NAME, WATCHER_APP_NAME) + + await check_writes(ops_test) + + +@pytest.mark.abort_on_fail +async def test_primary_network_isolation_with_watcher( + ops_test: OpsTest, continuous_writes +) -> None: + """Test network isolation of primary with watcher. 
+ + Expected behavior: + - Isolated primary's connections terminated + - Replica promoted to primary + - When network restored, old primary becomes replica + """ + await start_writes(ops_test) + + # Get current cluster state + any_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + original_roles = await get_cluster_roles(ops_test, any_unit) + primary = original_roles["primaries"][0] + replica = original_roles["sync_standbys"][0] + + # Get primary machine name for network manipulation + primary_unit = None + for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + if unit.name == primary: + primary_unit = unit + break + + assert primary_unit is not None + primary_machine = primary_unit.machine.hostname + + logger.info(f"Isolating primary network: {primary} on {primary_machine}") + + try: + # Cut network from primary (this removes the eth0 interface entirely) + cut_network_from_unit_without_ip_change(primary_machine) + + # Wait for failover to happen - Patroni needs time to detect leader failure + # and elect a new leader. This can take 30-90 seconds depending on TTL settings. + # Use explicit retry loop instead of just wait_for_idle. 
+ new_primary = None + for attempt in Retrying(stop=stop_after_delay(180), wait=wait_fixed(10), reraise=True): + with attempt: + new_primary = await get_primary(ops_test, DATABASE_APP_NAME, down_unit=primary) + logger.info(f"Current primary: {new_primary}, expected: {replica}") + assert new_primary == replica, ( + f"Waiting for failover: replica {replica} should be promoted, " + f"but primary is still {new_primary}" + ) + await are_writes_increasing(ops_test, down_unit=primary_unit.name) + finally: + # Restore network + logger.info(f"Restoring network for {primary_machine}") + restore_network_for_unit_without_ip_change(primary_machine) + + # Wait for cluster to stabilize with restored network + # The old primary may take time to rejoin after getting a new IP address, + # so we use raise_on_error=False and wait longer + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + timeout=900, + idle_period=30, + raise_on_error=False, # Old primary may be in error while rejoining + ) + + # Wait for the old primary to rejoin as replica + # This can take a while as it needs to recover with a new IP + for attempt in Retrying(stop=stop_after_delay(300), wait=wait_fixed(15), reraise=True): + with attempt: + final_roles = await get_cluster_roles(ops_test, replica) + logger.info(f"Final cluster roles: {final_roles}") + assert replica in final_roles["primaries"], ( + "Replica should remain primary after network restore" + ) + # Old primary should not be primary anymore + assert ( + primary not in final_roles["primaries"] and primary in final_roles["sync_standbys"] + ), "Old primary should now be a sync standby" + + # Use use_ip_from_inside=True because the old primary got a new IP after network restore + # and Juju's cached IP may be stale + await check_writes(ops_test, use_ip_from_inside=True) + + +@pytest.mark.abort_on_fail +async def test_replica_network_isolation_with_watcher( + ops_test: OpsTest, continuous_writes +) -> None: + """Test network isolation of replica 
with watcher. + + Expected behavior: + - Primary remains primary (doesn't failover) - Raft quorum maintained with watcher + - With synchronous_mode_strict=true, writes pause (no sync_standby available) + - After network restore, writes resume + - No data loss + + Note: This test uses iptables-based network isolation to preserve the replica's IP, + avoiding the complexity of IP changes when using eth0 device removal. + """ + await start_writes(ops_test) + + # Get current cluster state - use use_ip_from_inside=True because the previous test + # may have left units with stale IPs in Juju's cache after network restore + any_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + original_roles = await get_cluster_roles(ops_test, any_unit, use_ip_from_inside=True) + primary = original_roles["primaries"][0] + replica = original_roles["sync_standbys"][0] + + # Get replica machine for network manipulation + replica_unit = None + for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + if unit.name == replica: + replica_unit = unit + break + + assert replica_unit is not None + replica_machine = replica_unit.machine.hostname + + logger.info(f"Isolating replica network: {replica} on {replica_machine}") + + try: + # Cut network from replica using iptables (preserves IP) + cut_network_from_unit_without_ip_change(replica_machine) + + # Give Patroni time to detect the network isolation. 
+ await asyncio.sleep(30) + + # Primary should remain primary (no failover should happen) + # Raft quorum is maintained with primary + watcher (2 out of 3) + current_primary = await get_primary(ops_test, DATABASE_APP_NAME, down_unit=replica) + assert current_primary == primary, "Primary should not change during replica isolation" + await are_writes_increasing(ops_test, down_unit=replica) + finally: + # Restore network + logger.info(f"Restoring network for {replica_machine}") + restore_network_for_unit_without_ip_change(replica_machine) + + # Wait for cluster to stabilize - replica should rejoin + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="active", + timeout=600, + idle_period=30, + ) + + # Verify cluster has a primary after restore (may or may not be the same one, + # since Patroni can switchover during network restore/rejoin) + final_roles = await get_cluster_roles(ops_test, any_unit, use_ip_from_inside=True) + assert len(final_roles["primaries"]) == 1, ( + "Cluster should have exactly one primary after restore" + ) + + # Verify writes continue after network restore + # Use use_ip_from_inside=True because previous tests may have caused IP changes + await are_writes_increasing(ops_test, use_ip_from_inside=True) + await check_writes(ops_test, use_ip_from_inside=True) + + +@pytest.mark.abort_on_fail +async def test_watcher_network_isolation(ops_test: OpsTest, continuous_writes) -> None: + """Test network isolation of watcher. 
+ + Expected behavior: + - No service outage for PostgreSQL cluster + - Cluster loses quorum guarantee but continues operating + """ + await start_writes(ops_test) + + # Get watcher machine + watcher_unit = ops_test.model.applications[WATCHER_APP_NAME].units[0] + watcher_machine = watcher_unit.machine.hostname + + # Get current cluster state - use use_ip_from_inside=True because previous tests + # may have left units with stale IPs in Juju's cache after network manipulation + any_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + original_roles = await get_cluster_roles(ops_test, any_unit, use_ip_from_inside=True) + + logger.info(f"Isolating watcher network: {watcher_machine}") + + try: + # Cut network from watcher + cut_network_from_unit(watcher_machine) + + # Verify writes continue without interruption + await are_writes_increasing(ops_test, use_ip_from_inside=True) + + # Cluster roles should remain unchanged + current_roles = await get_cluster_roles(ops_test, any_unit, use_ip_from_inside=True) + assert current_roles["primaries"] == original_roles["primaries"] + + finally: + # Restore network + logger.info(f"Restoring watcher network: {watcher_machine}") + restore_network_for_unit(watcher_machine) + + # Wait for full recovery + await ops_test.model.wait_for_idle(status="active", timeout=600) + + # Use use_ip_from_inside=True because the watcher got a new IP after network restore + await check_writes(ops_test, use_ip_from_inside=True) + + +@pytest.mark.abort_on_fail +async def test_multi_cluster_watcher(ops_test: OpsTest, charm) -> None: + """Verify that a single watcher can monitor multiple PostgreSQL clusters. + + The watcher relation no longer has limit: 1, so the watcher can relate + to multiple PostgreSQL clusters simultaneously. Each relation gets its own + Raft instance with a dedicated port and data directory. 
+ """ + second_pg_app = "postgresql-b" + + try: + # Deploy a second PostgreSQL cluster + logger.info("Deploying second PostgreSQL cluster for multi-cluster watcher test") + await ops_test.model.deploy( + charm, + application_name=second_pg_app, + num_units=2, + series="noble", + config={"profile": "testing", "synchronous-mode-strict": False}, + ) + await ops_test.model.wait_for_idle( + apps=[second_pg_app], + status="active", + timeout=1200, + ) + + # Relate the watcher to the second cluster + logger.info("Relating watcher to second PostgreSQL cluster") + await ops_test.model.integrate( + f"{second_pg_app}:watcher-offer", f"{WATCHER_APP_NAME}:watcher" + ) + + # Use fast_forward to trigger update_status quickly, which runs + # ensure_watcher_in_raft to add the watcher to the second cluster's Raft + async with ops_test.fast_forward(): + # Wait for the watcher to connect to both clusters + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, second_pg_app, WATCHER_APP_NAME], + status="active", + timeout=600, + ) + + # Verify both Raft clusters have the watcher as a member + # Check first cluster + await verify_raft_cluster_health( + ops_test, DATABASE_APP_NAME, WATCHER_APP_NAME, expected_members=3 + ) + # Check second cluster + await verify_raft_cluster_health( + ops_test, second_pg_app, WATCHER_APP_NAME, expected_members=3 + ) + + finally: + # Clean up the second cluster relation and app + if second_pg_app in ops_test.model.applications: + await ops_test.model.remove_application( + second_pg_app, block_until_done=True, force=True + ) + + # Verify original watcher is still healthy after removing the second cluster + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, WATCHER_APP_NAME], + status="active", + timeout=300, + ) + + +@pytest.mark.abort_on_fail +async def test_watcher_production_profile_az_blocked(ops_test: OpsTest, charm) -> None: + """Test watcher with profile=production blocks on AZ co-location. 
+ + When all units are in the same availability zone (common on single-host + LXD deployments), a watcher with profile=production should enter + BlockedStatus because it shares an AZ with the PostgreSQL units. + This validates the AZ enforcement behavior. + + If JUJU_AVAILABILITY_ZONE is not set (some CI environments), the watcher + should reach active status since no AZ co-location can be detected. + + Since watcher-offer has limit: 1, we must remove the existing testing watcher + before deploying the production one, then restore it afterward. + """ + production_watcher = "pg-watcher-prod" + + # Remove existing watcher to free the watcher-offer relation slot + logger.info("Removing existing testing watcher to free relation slot") + if WATCHER_APP_NAME in ops_test.model.applications: + await ops_test.model.remove_application( + WATCHER_APP_NAME, block_until_done=True, force=True + ) + + try: + # Deploy a production-profile watcher + logger.info("Deploying watcher with profile=production") + await ops_test.model.deploy( + charm, + application_name=production_watcher, + num_units=1, + series="noble", + config={"role": "watcher", "profile": "production"}, + ) + + # Wait for initial install + await ops_test.model.wait_for_idle( + apps=[production_watcher], + timeout=600, + raise_on_error=False, + ) + + # Relate to the existing PostgreSQL cluster + await ops_test.model.integrate( + f"{DATABASE_APP_NAME}:watcher-offer", f"{production_watcher}:watcher" + ) + + # Wait for the watcher to settle (it may block or go active depending on AZ) + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle( + apps=[production_watcher], + timeout=600, + raise_on_error=False, + ) + + # Check the watcher's status + watcher_unit = ops_test.model.applications[production_watcher].units[0] + status = watcher_unit.workload_status + status_msg = watcher_unit.workload_status_message + + if status == "blocked": + # AZ is set and co-located — expected on single-host deployments + 
assert "AZ co-location" in status_msg, ( + f"Blocked status should mention AZ co-location, got: {status_msg}" + ) + logger.info(f"Production watcher correctly blocked: {status_msg}") + elif status == "active": + # AZ is not set — no co-location detected, watcher is active + logger.info("JUJU_AVAILABILITY_ZONE not set, watcher is active (no AZ enforcement)") + else: + pytest.fail( + f"Unexpected watcher status: {status} - {status_msg}. " + "Expected 'blocked' (AZ co-location) or 'active' (no AZ)." + ) + + finally: + # Clean up production watcher + if production_watcher in ops_test.model.applications: + await ops_test.model.remove_application( + production_watcher, block_until_done=True, force=True + ) + + # Restore the original testing watcher + logger.info("Restoring original testing watcher") + await ops_test.model.deploy( + charm, + application_name=WATCHER_APP_NAME, + num_units=1, + series="noble", + config={"role": "watcher", "profile": "testing"}, + ) + await ops_test.model.wait_for_idle( + apps=[WATCHER_APP_NAME], + timeout=600, + raise_on_error=False, + ) + await ops_test.model.integrate( + f"{DATABASE_APP_NAME}:watcher-offer", f"{WATCHER_APP_NAME}:watcher" + ) + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, WATCHER_APP_NAME], + status="active", + timeout=600, + ) diff --git a/tests/spread/test_stereo_mode.py/task.yaml b/tests/spread/test_stereo_mode.py/task.yaml new file mode 100644 index 00000000000..65ce3cff758 --- /dev/null +++ b/tests/spread/test_stereo_mode.py/task.yaml @@ -0,0 +1,7 @@ +summary: test_stereo_mode.py +environment: + TEST_MODULE: ha_tests/test_stereo_mode.py +execute: | + tox run -e integration -- "tests/integration/$TEST_MODULE" --model testing --alluredir="$SPREAD_TASK/allure-results" +artifacts: + - allure-results diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 9e042fef302..a742a27d35c 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -93,6 +93,23 @@ def 
test_config_fallback(harness): assert charm.config.connection_authentication_timeout == 120 +def test_validate_initial_role_unchanged_allows_matching_role(harness): + rel_id = harness.model.get_relation(PEER).id + with harness.hooks_disabled(): + harness.update_relation_data(rel_id, harness.charm.app.name, {"role": "postgresql"}) + + assert harness.charm._validate_initial_role_unchanged() + + +def test_validate_initial_role_unchanged_blocks_role_mismatch(harness): + rel_id = harness.model.get_relation(PEER).id + with harness.hooks_disabled(): + harness.update_relation_data(rel_id, harness.charm.app.name, {"role": "watcher"}) + + assert not harness.charm._validate_initial_role_unchanged() + assert isinstance(harness.model.unit.status, BlockedStatus) + + def test_on_install(harness): with ( patch("charm.snap.SnapCache") as _snap_cache, @@ -1967,6 +1984,10 @@ def test_update_member_ip(harness): with ( patch("charm.PostgresqlOperatorCharm._update_certificate") as _update_certificate, patch("charm.Patroni.stop_patroni") as _stop_patroni, + patch("charm.PostgresqlOperatorCharm.update_endpoint_addresses"), + patch("charm.PostgresqlOperatorCharm.update_config"), + patch.object(harness.charm.watcher_offer, "update_unit_address"), + patch.object(harness.charm.watcher_offer, "update_endpoints"), ): rel_id = harness.model.get_relation(PEER).id # Test when the IP address of the unit hasn't changed. 
@@ -2557,7 +2578,7 @@ def test_on_peer_relation_departed(harness): mock_ip_address = "1.1.1.1" _get_member_ip.return_value = mock_ip_address harness.charm._on_peer_relation_departed(event) - _remove_raft_member.assert_called_once_with(mock_ip_address) + _remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_called_once() _updated_synchronous_node_count.assert_not_called() _get_ips_to_remove.assert_not_called() @@ -2572,7 +2593,7 @@ def test_on_peer_relation_departed(harness): event.defer.reset_mock() _remove_raft_member.side_effect = None harness.charm._on_peer_relation_departed(event) - _remove_raft_member.assert_called_once_with(mock_ip_address) + _remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_not_called() _updated_synchronous_node_count.assert_not_called() _get_ips_to_remove.assert_not_called() @@ -2587,7 +2608,7 @@ def test_on_peer_relation_departed(harness): with harness.hooks_disabled(): harness.set_leader() harness.charm._on_peer_relation_departed(event) - _remove_raft_member.assert_called_once_with(mock_ip_address) + _remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_called_once() _updated_synchronous_node_count.assert_not_called() _get_ips_to_remove.assert_not_called() @@ -2604,7 +2625,7 @@ def test_on_peer_relation_departed(harness): rel_id, harness.charm.app.name, {"cluster_initialised": "True"} ) harness.charm._on_peer_relation_departed(event) - _remove_raft_member.assert_called_once_with(mock_ip_address) + _remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_called_once() _updated_synchronous_node_count.assert_called_once_with() _get_ips_to_remove.assert_not_called() @@ -2619,7 +2640,7 @@ def test_on_peer_relation_departed(harness): _updated_synchronous_node_count.reset_mock() harness.add_relation_unit(rel_id, f"{harness.charm.app.name}/2") harness.charm._on_peer_relation_departed(event) - 
_remove_raft_member.assert_called_once_with(mock_ip_address) + _remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_called_once() _updated_synchronous_node_count.assert_called_once_with() _get_ips_to_remove.assert_not_called() @@ -2635,7 +2656,7 @@ def test_on_peer_relation_departed(harness): _updated_synchronous_node_count.reset_mock() _updated_synchronous_node_count.return_value = True harness.charm._on_peer_relation_departed(event) - _remove_raft_member.assert_called_once_with(mock_ip_address) + _remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_not_called() _updated_synchronous_node_count.assert_called_once_with() _get_ips_to_remove.assert_called_once() @@ -2653,7 +2674,7 @@ def test_on_peer_relation_departed(harness): _get_ips_to_remove.return_value = ips_to_remove _are_all_members_ready.return_value = False harness.charm._on_peer_relation_departed(event) - _remove_raft_member.assert_called_once_with(mock_ip_address) + _remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_called_once() _updated_synchronous_node_count.assert_called_once_with() _get_ips_to_remove.assert_called_once() @@ -2669,7 +2690,7 @@ def test_on_peer_relation_departed(harness): _get_ips_to_remove.reset_mock() _are_all_members_ready.return_value = True harness.charm._on_peer_relation_departed(event) - _remove_raft_member.assert_called_once_with(mock_ip_address) + _remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_not_called() _updated_synchronous_node_count.assert_called_once_with() _get_ips_to_remove.assert_called_once() @@ -2688,7 +2709,7 @@ def test_on_peer_relation_departed(harness): _update_relation_endpoints.reset_mock() _primary_endpoint.return_value = None harness.charm._on_peer_relation_departed(event) - _remove_raft_member.assert_called_once_with(mock_ip_address) + 
_remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_not_called() _updated_synchronous_node_count.assert_called_once_with() _get_ips_to_remove.assert_called_once() diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py index 44ba0841275..22781b63d0e 100644 --- a/tests/unit/test_cluster.py +++ b/tests/unit/test_cluster.py @@ -672,7 +672,7 @@ def test_remove_raft_member(patroni): # Member already removed _tcp_utility.return_value.executeCommand.return_value = "Response message" - patroni.remove_raft_member("1.2.3.4") + patroni.remove_raft_member("1.2.3.4:2222") _tcp_utility.assert_called_once_with(password="fake-raft-password", timeout=3) _tcp_utility.return_value.executeCommand.assert_called_once_with( @@ -686,7 +686,7 @@ def test_remove_raft_member(patroni): "SUCCESS", ] - patroni.remove_raft_member("1.2.3.4") + patroni.remove_raft_member("1.2.3.4:2222") _tcp_utility.assert_called_once_with(password="fake-raft-password", timeout=3) assert _tcp_utility.return_value.executeCommand.call_count == 2 @@ -703,7 +703,7 @@ def test_remove_raft_member(patroni): ] with pytest.raises(RemoveRaftMemberFailedError): - patroni.remove_raft_member("1.2.3.4") + patroni.remove_raft_member("1.2.3.4:2222") assert False # Raises on remove error @@ -713,16 +713,14 @@ def test_remove_raft_member(patroni): ] with pytest.raises(RemoveRaftMemberFailedError): - patroni.remove_raft_member("1.2.3.4") + patroni.remove_raft_member("1.2.3.4:2222") assert False # Raises on status error - _tcp_utility.return_value.executeCommand.side_effect = [ - UtilityException, - ] + _tcp_utility.return_value.executeCommand.side_effect = [UtilityException] with pytest.raises(RemoveRaftMemberFailedError): - patroni.remove_raft_member("1.2.3.4") + patroni.remove_raft_member("1.2.3.4:2222") assert False @@ -745,7 +743,7 @@ def test_remove_raft_member_no_quorum(patroni, harness): "members": [{"role": "async_replica", "name": "postgresql-0"}] } - 
patroni.remove_raft_member("1.2.3.4") + patroni.remove_raft_member("1.2.3.4:2222") assert harness.charm.unit_peer_data == {"raft_stuck": "True"} # No health @@ -757,14 +755,14 @@ def test_remove_raft_member_no_quorum(patroni, harness): } _get.side_effect = Exception - patroni.remove_raft_member("1.2.3.4") + patroni.remove_raft_member("1.2.3.4:2222") assert harness.charm.unit_peer_data == {"raft_stuck": "True"} # Sync replica _unit_peer_data.return_value = {} leader_mock = Mock() - leader_mock.host = "1.2.3.4" + leader_mock.address = "1.2.3.4:2222" _tcp_utility.return_value.executeCommand.return_value = { "partner_node_status_server_1.2.3.4:2222": 0, "has_quorum": False, @@ -775,7 +773,7 @@ def test_remove_raft_member_no_quorum(patroni, harness): "members": [{"role": "sync_standby", "name": "postgresql-0"}] } - patroni.remove_raft_member("1.2.3.4") + patroni.remove_raft_member("1.2.3.4:2222") assert harness.charm.unit_peer_data == {"raft_stuck": "True"} diff --git a/tests/unit/test_raft_controller.py b/tests/unit/test_raft_controller.py index f167c6233df..3baa305a596 100644 --- a/tests/unit/test_raft_controller.py +++ b/tests/unit/test_raft_controller.py @@ -4,7 +4,6 @@ from pathlib import Path from unittest.mock import MagicMock, patch -from charmlibs.systemd import SystemdError from jinja2 import Template from pytest import fixture @@ -63,19 +62,6 @@ def test_remove_service_disables_unit_and_deletes_dir(tmp_path: Path, controller _rmtree.assert_called_once_with(controller.data_dir) -def test_install_service_returns_false_when_daemon_reload_fails( - tmp_path: Path, controller: RaftController -): - with ( - patch("raft_controller.daemon_reload") as _daemon_reload, - patch("raft_controller.render_file"), - patch("raft_controller.create_directory"), - ): - _daemon_reload.side_effect = SystemdError - - assert not install_service() - - def test_install_service_uses_patroni_profile_execstart( tmp_path: Path, controller: RaftController ): @@ -92,7 +78,7 @@ def 
test_install_service_uses_patroni_profile_execstart( patch("raft_controller.render_file") as _render_file, patch("raft_controller.create_directory"), ): - assert install_service() + install_service() _render_file.assert_called_once_with(SERVICE_FILE, expected_content, 0o644, change_owner=False) _daemon_reload.assert_called_once_with() diff --git a/tests/unit/test_watcher_relation.py b/tests/unit/test_watcher_relation.py new file mode 100644 index 00000000000..6daa309e149 --- /dev/null +++ b/tests/unit/test_watcher_relation.py @@ -0,0 +1,288 @@ +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Unit tests for the PostgreSQL watcher relation handler.""" + +from unittest.mock import MagicMock, PropertyMock, patch + +from src.relations.watcher import PostgreSQLWatcherRelation + + +def create_mock_charm(): + """Create a mock charm for testing.""" + mock_charm = MagicMock() + mock_charm.unit.is_leader.return_value = True + mock_charm.cluster_name = "postgresql" + mock_charm._unit_ip = "10.0.0.1" + mock_charm._patroni.unit_ip = "10.0.0.1" + mock_charm._patroni.peers_ips = {"10.0.0.2"} + mock_charm._patroni.raft_password = "test-raft-password" + mock_charm.is_cluster_initialised = True + mock_charm.update_config = MagicMock() + return mock_charm + + +def create_mock_relation(): + """Create a mock relation for testing.""" + mock_relation = MagicMock() + mock_relation.data = { + MagicMock(): {}, # app data + MagicMock(): {}, # unit data + } + mock_relation.units = set() + return mock_relation + + +class TestWatcherRelation: + """Tests for PostgreSQLWatcherRelation class.""" + + def test_watcher_address_no_relation(self): + """Test watcher_raft_address returns None when no relation exists.""" + mock_charm = create_mock_charm() + + with patch.object( + PostgreSQLWatcherRelation, + "_relation", + new_callable=PropertyMock, + return_value=None, + ): + relation = PostgreSQLWatcherRelation(mock_charm) + assert relation.watcher_raft_address is None + + def
test_watcher_address_with_relation(self): + """Test watcher_raft_address returns the watcher IP:port when available.""" + mock_charm = create_mock_charm() + mock_relation = MagicMock() + + # Create a mock unit with unit-address + mock_unit = MagicMock() + mock_relation.units = {mock_unit} + mock_relation.data = { + mock_unit: {"unit-address": "10.0.0.10"}, + mock_relation.app: {"watcher-raft-port": "2222"}, + } + + with patch.object( + PostgreSQLWatcherRelation, + "_relation", + new_callable=PropertyMock, + return_value=mock_relation, + ): + relation = PostgreSQLWatcherRelation(mock_charm) + assert relation.watcher_raft_address == "10.0.0.10:2222" + + def test_on_watcher_relation_joined_not_leader(self): + """Non-leader relation joined updates unit address but skips the secret.""" + mock_charm = create_mock_charm() + mock_charm.unit.is_leader.return_value = False + mock_event = MagicMock() + + relation = PostgreSQLWatcherRelation(mock_charm) + + with ( + patch.object(relation, "update_unit_address") as update_unit_address, + patch.object(relation, "_get_or_create_watcher_secret") as mock_secret, + ): + relation._on_watcher_relation_joined(mock_event) + update_unit_address.assert_called_once_with(mock_event.relation) + mock_secret.assert_not_called() + + def test_on_watcher_relation_joined_leader(self): + """Test relation joined event creates secret for leader.""" + mock_charm = create_mock_charm() + mock_event = MagicMock() + mock_secret = MagicMock() + mock_secret.id = "secret:abc123" + + relation = PostgreSQLWatcherRelation(mock_charm) + + with ( + patch.object(relation, "_get_or_create_watcher_secret", return_value=mock_secret), + patch.object(relation, "_update_relation_data") as mock_update, + ): + relation._on_watcher_relation_joined(mock_event) + mock_secret.grant.assert_called_once_with(mock_event.relation) + mock_update.assert_called_once_with(mock_event.relation) + + def test_on_watcher_relation_joined_no_secret(self): + """Test relation joined event defers when secret
creation fails.""" + mock_charm = create_mock_charm() + mock_event = MagicMock() + + relation = PostgreSQLWatcherRelation(mock_charm) + + with patch.object(relation, "_get_or_create_watcher_secret", return_value=None): + relation._on_watcher_relation_joined(mock_event) + mock_event.defer.assert_called_once() + + def test_on_watcher_relation_changed_not_initialized(self): + """Test relation changed event defers when cluster not initialized.""" + mock_charm = create_mock_charm() + mock_charm.is_cluster_initialised = False + mock_event = MagicMock() + + relation = PostgreSQLWatcherRelation(mock_charm) + relation._on_watcher_relation_changed(mock_event) + + mock_event.defer.assert_called_once() + + def test_on_watcher_relation_changed_updates_config(self): + """Test relation changed event updates Patroni config.""" + mock_charm = create_mock_charm() + mock_event = MagicMock() + + # Setup mock relation with watcher unit + mock_unit = MagicMock() + mock_event.relation.units = {mock_unit} + mock_event.relation.data = { + mock_unit: {"unit-address": "10.0.0.10"}, + mock_charm.unit: {}, + } + + relation = PostgreSQLWatcherRelation(mock_charm) + + with patch.object(relation, "_update_relation_data"): + relation._on_watcher_relation_changed(mock_event) + mock_charm.update_config.assert_called_once() + + def test_update_relation_data_not_leader(self): + """Test _update_relation_data does nothing for non-leader.""" + mock_charm = create_mock_charm() + mock_charm.unit.is_leader.return_value = False + mock_relation = MagicMock() + + relation = PostgreSQLWatcherRelation(mock_charm) + relation._update_relation_data(mock_relation) + + # Should not try to update relation data + assert not mock_relation.data[mock_charm.app].update.called + + def test_update_relation_data_leader(self): + """Test _update_relation_data populates relation data correctly.""" + mock_charm = create_mock_charm() + mock_charm._units_ips = ["10.0.0.1", "10.0.0.2"] # Mock PostgreSQL endpoints + 
mock_charm._unit_ip = "10.0.0.1" + mock_relation = MagicMock() + mock_relation.data = { + mock_charm.app: {}, + mock_charm.unit: {}, + } + + mock_secret = MagicMock() + mock_secret.id = "secret:abc123" + + relation = PostgreSQLWatcherRelation(mock_charm) + + with ( + patch.object(mock_charm.model, "get_secret", return_value=mock_secret), + patch.object(relation, "_get_standby_clusters", return_value=[]), + ): + relation._update_relation_data(mock_relation) + + # Verify app data was updated + app_data = mock_relation.data[mock_charm.app] + assert "cluster-name" in app_data + assert app_data["cluster-name"] == "postgresql" + assert "raft-secret-id" in app_data + assert "raft-partner-addrs" in app_data + assert "raft-port" in app_data + + # Verify unit data was updated + unit_data = mock_relation.data[mock_charm.unit] + assert "unit-address" in unit_data + + def test_update_unit_address_updates_az(self): + """Test update_unit_address also publishes unit AZ.""" + mock_charm = create_mock_charm() + mock_relation = MagicMock() + mock_relation.data = { + mock_charm.unit: { + "unit-address": "10.0.0.1", + } + } + + relation = PostgreSQLWatcherRelation(mock_charm) + + with patch.dict("os.environ", {"JUJU_AVAILABILITY_ZONE": "az1"}, clear=False): + relation.update_unit_address(mock_relation) + + assert mock_relation.data[mock_charm.unit]["unit-az"] == "az1" + + def test_update_watcher_secret_not_leader(self): + """Test update_watcher_secret does nothing for non-leader.""" + mock_charm = create_mock_charm() + mock_charm.unit.is_leader.return_value = False + + relation = PostgreSQLWatcherRelation(mock_charm) + + with patch.object(mock_charm.model, "get_secret") as mock_get: + relation.update_watcher_secret() + mock_get.assert_not_called() + + def test_update_watcher_secret_leader(self): + """Test update_watcher_secret updates secret content.""" + mock_charm = create_mock_charm() + mock_secret = MagicMock() + + relation = PostgreSQLWatcherRelation(mock_charm) + + with 
patch.object(mock_charm.model, "get_secret", return_value=mock_secret): + relation.update_watcher_secret() + mock_secret.set_content.assert_called_once() + + +class TestWatcherRelationSecrets: + """Tests for secret management in watcher relation.""" + + def test_get_or_create_watcher_secret_existing(self): + """Test _get_or_create_watcher_secret returns existing secret.""" + mock_charm = create_mock_charm() + mock_secret = MagicMock() + + relation = PostgreSQLWatcherRelation(mock_charm) + + with patch.object(mock_charm.model, "get_secret", return_value=mock_secret): + result = relation._get_or_create_watcher_secret() + assert result == mock_secret + + def test_get_or_create_watcher_secret_creates_new(self): + """Test _get_or_create_watcher_secret creates new secret.""" + mock_charm = create_mock_charm() + mock_secret = MagicMock() + + from ops import SecretNotFoundError + + relation = PostgreSQLWatcherRelation(mock_charm) + + with ( + patch.object( + mock_charm.model, + "get_secret", + side_effect=SecretNotFoundError("not found"), + ), + patch.object( + mock_charm.model.app, + "add_secret", + return_value=mock_secret, + ), + ): + result = relation._get_or_create_watcher_secret() + assert result == mock_secret + mock_charm.model.app.add_secret.assert_called_once() + + def test_get_or_create_watcher_secret_no_raft_password(self): + """Test _get_or_create_watcher_secret returns None without password.""" + mock_charm = create_mock_charm() + mock_charm._patroni.raft_password = None + + from ops import SecretNotFoundError + + relation = PostgreSQLWatcherRelation(mock_charm) + + with patch.object( + mock_charm.model, + "get_secret", + side_effect=SecretNotFoundError("not found"), + ): + result = relation._get_or_create_watcher_secret() + assert result is None diff --git a/tests/unit/test_watcher_requirer.py b/tests/unit/test_watcher_requirer.py new file mode 100644 index 00000000000..4e36ee4271a --- /dev/null +++ b/tests/unit/test_watcher_requirer.py @@ -0,0 +1,306 @@ +# 
Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Unit tests for the watcher requirer relation handler (AZ co-location logic).""" + +from unittest.mock import MagicMock, patch + +from ops import ActiveStatus, BlockedStatus, WaitingStatus + +from src.relations.watcher_requirer import WatcherRequirerHandler + + +def create_mock_charm(profile="testing"): + """Create a mock charm for watcher requirer testing.""" + mock_charm = MagicMock() + mock_charm.config = MagicMock() + mock_charm.config.profile = profile + mock_charm.unit.name = "pg-watcher/0" + return mock_charm + + +def create_mock_relation(units_with_az=None): + """Create a mock relation with units that have AZ data. + + Args: + units_with_az: Dict mapping unit names to their AZ values. + Example: {"postgresql/0": "az1", "postgresql/1": "az2"} + """ + mock_relation = MagicMock() + mock_relation.id = 42 + + if units_with_az is None: + units_with_az = {} + + mock_units = [] + mock_data = {} + for unit_name, az in units_with_az.items(): + mock_unit = MagicMock() + mock_unit.name = unit_name + mock_units.append(mock_unit) + unit_data = {} + if az is not None: + unit_data["unit-az"] = az + mock_data[mock_unit] = unit_data + + mock_relation.units = set(mock_units) + mock_relation.app = MagicMock() + mock_relation.app.name = "postgresql" + mock_data[mock_relation.app] = {} + mock_relation.data = mock_data + return mock_relation + + +class TestAZColocation: + """Tests for AZ co-location detection and enforcement.""" + + def test_check_az_colocation_no_az_set(self): + """No warning when JUJU_AVAILABILITY_ZONE is not set.""" + mock_charm = create_mock_charm() + relation = create_mock_relation({"postgresql/0": "az1"}) + + with patch.object(WatcherRequirerHandler, "__init__", return_value=None): + handler = WatcherRequirerHandler.__new__(WatcherRequirerHandler) + handler.charm = mock_charm + + with patch.dict("os.environ", {}, clear=True): + result = handler._check_az_colocation(relation) + 
assert result is None + + def test_check_az_colocation_different_az(self): + """No warning when watcher is in a different AZ.""" + mock_charm = create_mock_charm() + relation = create_mock_relation({"postgresql/0": "az1", "postgresql/1": "az2"}) + + with patch.object(WatcherRequirerHandler, "__init__", return_value=None): + handler = WatcherRequirerHandler.__new__(WatcherRequirerHandler) + handler.charm = mock_charm + + with patch.dict("os.environ", {"JUJU_AVAILABILITY_ZONE": "az3"}, clear=False): + result = handler._check_az_colocation(relation) + assert result is None + + def test_check_az_colocation_same_az(self): + """Warning returned when watcher shares AZ with a PostgreSQL unit.""" + mock_charm = create_mock_charm() + relation = create_mock_relation({"postgresql/0": "az1", "postgresql/1": "az2"}) + + with patch.object(WatcherRequirerHandler, "__init__", return_value=None): + handler = WatcherRequirerHandler.__new__(WatcherRequirerHandler) + handler.charm = mock_charm + + with patch.dict("os.environ", {"JUJU_AVAILABILITY_ZONE": "az1"}, clear=False): + result = handler._check_az_colocation(relation) + assert result is not None + assert "az1" in result + assert "postgresql/0" in result + + def test_check_az_colocation_multiple_colocated(self): + """Warning lists all co-located units.""" + mock_charm = create_mock_charm() + relation = create_mock_relation({"postgresql/0": "az1", "postgresql/1": "az1"}) + + with patch.object(WatcherRequirerHandler, "__init__", return_value=None): + handler = WatcherRequirerHandler.__new__(WatcherRequirerHandler) + handler.charm = mock_charm + + with patch.dict("os.environ", {"JUJU_AVAILABILITY_ZONE": "az1"}, clear=False): + result = handler._check_az_colocation(relation) + assert result is not None + assert "postgresql/0" in result + assert "postgresql/1" in result + + def test_check_az_colocation_pg_unit_no_az(self): + """No warning when PostgreSQL unit has no AZ set.""" + mock_charm = create_mock_charm() + relation = 
create_mock_relation({"postgresql/0": None}) + + with patch.object(WatcherRequirerHandler, "__init__", return_value=None): + handler = WatcherRequirerHandler.__new__(WatcherRequirerHandler) + handler.charm = mock_charm + + with patch.dict("os.environ", {"JUJU_AVAILABILITY_ZONE": "az1"}, clear=False): + result = handler._check_az_colocation(relation) + assert result is None + + +class TestAZProfileEnforcement: + """Tests for profile-based AZ enforcement (testing=warning, production=blocked).""" + + def _setup_handler_with_relations(self, profile, watcher_az, pg_units_az): + """Create a handler with mocked relations for update_status testing. + + Args: + profile: "testing" or "production" + watcher_az: The watcher's AZ or None + pg_units_az: Dict of unit_name -> az for PostgreSQL units + """ + mock_charm = create_mock_charm(profile=profile) + mock_relation = create_mock_relation(pg_units_az) + + with patch.object(WatcherRequirerHandler, "__init__", return_value=None): + handler = WatcherRequirerHandler.__new__(WatcherRequirerHandler) + handler.charm = mock_charm + + # Mock framework.model to make self.model work + mock_framework = MagicMock() + mock_framework.model = mock_charm.model + handler.framework = mock_framework + + # Mock model.relations + mock_charm.model.relations.get.return_value = [mock_relation] + + # Mock _get_pg_endpoints + handler._get_pg_endpoints = MagicMock(return_value=list(pg_units_az.keys())) + handler._update_unit_address_if_changed = MagicMock() + + return handler, mock_charm, watcher_az + + def test_testing_profile_same_az_sets_active_with_warning(self): + """With profile=testing and same AZ, status is Active with WARNING.""" + handler, mock_charm, _ = self._setup_handler_with_relations( + profile="testing", + watcher_az="az1", + pg_units_az={"postgresql/0": "az1", "postgresql/1": "az2"}, + ) + + with ( + patch.dict("os.environ", {"JUJU_AVAILABILITY_ZONE": "az1"}, clear=False), + patch( + 
"relations.watcher_requirer.RaftController.get_status", + return_value={"connected": True}, + ), + ): + handler._on_update_status(MagicMock()) + + status = mock_charm.unit.status + assert isinstance(status, ActiveStatus), ( + f"Expected ActiveStatus, got {type(status)}: {status}" + ) + assert "WARNING" in status.message + + def test_production_profile_same_az_sets_blocked(self): + """With profile=production and same AZ, status is Blocked.""" + handler, mock_charm, _ = self._setup_handler_with_relations( + profile="production", + watcher_az="az1", + pg_units_az={"postgresql/0": "az1", "postgresql/1": "az2"}, + ) + + with ( + patch.dict("os.environ", {"JUJU_AVAILABILITY_ZONE": "az1"}, clear=False), + patch( + "relations.watcher_requirer.RaftController.get_status", + return_value={"connected": True}, + ), + ): + handler._on_update_status(MagicMock()) + + status = mock_charm.unit.status + assert isinstance(status, BlockedStatus), ( + f"Expected BlockedStatus, got {type(status)}: {status}" + ) + assert "AZ co-location" in status.message + + def test_production_profile_different_az_sets_active(self): + """With profile=production and different AZ, status is Active (no block).""" + handler, mock_charm, _ = self._setup_handler_with_relations( + profile="production", + watcher_az="az3", + pg_units_az={"postgresql/0": "az1", "postgresql/1": "az2"}, + ) + + with ( + patch.dict("os.environ", {"JUJU_AVAILABILITY_ZONE": "az3"}, clear=False), + patch( + "relations.watcher_requirer.RaftController.get_status", + return_value={"connected": True}, + ), + ): + handler._on_update_status(MagicMock()) + + status = mock_charm.unit.status + assert isinstance(status, ActiveStatus), ( + f"Expected ActiveStatus, got {type(status)}: {status}" + ) + assert "WARNING" not in status.message + + def test_no_az_no_block(self): + """When JUJU_AVAILABILITY_ZONE is not set, no blocking regardless of profile.""" + handler, mock_charm, _ = self._setup_handler_with_relations( + profile="production", + 
watcher_az=None, + pg_units_az={"postgresql/0": "az1", "postgresql/1": "az2"}, + ) + + env = {k: v for k, v in __import__("os").environ.items() if k != "JUJU_AVAILABILITY_ZONE"} + with ( + patch.dict("os.environ", env, clear=True), + patch( + "relations.watcher_requirer.RaftController.get_status", + return_value={"connected": True}, + ), + ): + handler._on_update_status(MagicMock()) + + status = mock_charm.unit.status + assert isinstance(status, ActiveStatus), ( + f"Expected ActiveStatus, got {type(status)}: {status}" + ) + + def test_no_raft_connection_sets_waiting(self): + """When Raft is not connected, status is Waiting regardless of AZ.""" + mock_charm = create_mock_charm(profile="production") + mock_relation = create_mock_relation({"postgresql/0": "az1"}) + + with patch.object(WatcherRequirerHandler, "__init__", return_value=None): + handler = WatcherRequirerHandler.__new__(WatcherRequirerHandler) + handler.charm = mock_charm + handler._raft_controllers = {} + mock_framework = MagicMock() + mock_framework.model = mock_charm.model + handler.framework = mock_framework + mock_charm.model.relations.get.return_value = [mock_relation] + + mock_raft = MagicMock() + mock_raft.get_status.return_value = {"connected": False} + handler._raft_controllers[mock_relation.id] = mock_raft + handler._get_pg_endpoints = MagicMock(return_value=[]) + handler._update_unit_address_if_changed = MagicMock() + + with patch.dict("os.environ", {"JUJU_AVAILABILITY_ZONE": "az1"}, clear=False): + handler._on_update_status(MagicMock()) + + status = mock_charm.unit.status + assert isinstance(status, WaitingStatus) + + +class TestWatcherRelationLifecycle: + """Tests for watcher relation lifecycle cleanup.""" + + def test_relation_broken_removes_port(self): + """Relation-broken removes the Raft service and releases the allocated port.""" + mock_charm = create_mock_charm() + mock_relation = MagicMock() + mock_relation.id = 42 + mock_event = MagicMock() + mock_event.relation = mock_relation + + 
with ( + patch.object(WatcherRequirerHandler, "__init__", return_value=None), + patch("relations.watcher_requirer.RaftController.remove_service") as _remove_service, + ): + handler = WatcherRequirerHandler.__new__(WatcherRequirerHandler) + handler.charm = mock_charm + handler._release_port_for_relation = MagicMock() + + mock_framework = MagicMock() + mock_framework.model = mock_charm.model + handler.framework = mock_framework + + mock_charm.model.relations.get.return_value = [] + + handler._on_watcher_relation_broken(mock_event) + + _remove_service.assert_called_once_with() + handler._release_port_for_relation.assert_called_once_with(42) From 9c1ad78b4e885431ea235d38a98e62e5639aa751 Mon Sep 17 00:00:00 2001 From: Dragomir Penev Date: Thu, 30 Apr 2026 16:56:53 +0300 Subject: [PATCH 2/4] Fix ip change raft removal --- src/charm.py | 5 +--- src/relations/watcher.py | 25 ------------------- .../integration/ha_tests/test_stereo_mode.py | 4 +-- 3 files changed, 3 insertions(+), 31 deletions(-) diff --git a/src/charm.py b/src/charm.py index 9dd9dad044e..bf4cf3c0001 100755 --- a/src/charm.py +++ b/src/charm.py @@ -1257,7 +1257,7 @@ def _reconfigure_cluster(self, event: HookEvent | RelationEvent) -> bool: ): logger.info("Removing %s from the cluster due to IP change", ip_to_remove) try: - self._patroni.remove_raft_member(ip_to_remove) + self._patroni.remove_raft_member(f"{ip_to_remove}:{RAFT_PORT}") except RemoveRaftMemberFailedError: logger.debug("Deferring on_peer_relation_changed: failed to remove raft member") return False @@ -2223,9 +2223,6 @@ def _on_update_status(self, _) -> None: # Keep this unit data current for watcher AZ/IP checks. 
self.watcher_offer.update_unit_address() - # Ensure watcher is in Raft cluster (handles cases where relation events weren't delivered) - self.watcher_offer.ensure_watcher_in_raft() - if self.unit.is_leader() and "refresh_remove_trigger" not in self.app_peer_data: self.postgresql.drop_hba_triggers() self.app_peer_data["refresh_remove_trigger"] = "True" diff --git a/src/relations/watcher.py b/src/relations/watcher.py index aae0d2b5f5f..603d5792bdc 100644 --- a/src/relations/watcher.py +++ b/src/relations/watcher.py @@ -562,28 +562,3 @@ def update_watcher_secret(self) -> None: logger.info("Updated watcher secret with new Raft password") except SecretNotFoundError: logger.debug("Watcher secret not found, nothing to update") - - def ensure_watcher_in_raft(self) -> None: - """Ensure the connected watcher is in the Raft cluster and has fresh endpoint data. - - Called periodically from update_status to handle cases where Juju - relation events weren't delivered (e.g., when a watcher unit is replaced). - This method: - 1. Cleans up any stale watcher IPs from the Raft cluster - 2. Adds the current watcher to Raft if not present - 3. Updates the watcher relation data with fresh PostgreSQL IPs - - The last point is critical because after network disruptions that cause IP - changes, the watcher may have stale pg-endpoints and be unable to health - check the PostgreSQL nodes properly. 
- """ - if not self.charm.is_cluster_initialised or not self.is_active: - return - - # Only the leader handles Raft membership changes to avoid races - if self.charm.unit.is_leader(): - self._cleanup_old_watcher_from_raft() - - # Update watcher relation data with fresh PostgreSQL IPs - if relation := self._relation: - self._update_relation_data(relation) diff --git a/tests/integration/ha_tests/test_stereo_mode.py b/tests/integration/ha_tests/test_stereo_mode.py index 0bb361e6bf6..9cd5473911e 100644 --- a/tests/integration/ha_tests/test_stereo_mode.py +++ b/tests/integration/ha_tests/test_stereo_mode.py @@ -508,7 +508,7 @@ async def test_primary_network_isolation_with_watcher( try: # Cut network from primary (this removes the eth0 interface entirely) - cut_network_from_unit_without_ip_change(primary_machine) + cut_network_from_unit(primary_machine) # Wait for failover to happen - Patroni needs time to detect leader failure # and elect a new leader. This can take 30-90 seconds depending on TTL settings. 
@@ -526,7 +526,7 @@ async def test_primary_network_isolation_with_watcher( finally: # Restore network logger.info(f"Restoring network for {primary_machine}") - restore_network_for_unit_without_ip_change(primary_machine) + restore_network_for_unit(primary_machine) # Wait for cluster to stabilize with restored network # The old primary may take time to rejoin after getting a new IP address, From 6b37f732d30dcf6c6cfcfc1267d4abf90e64823c Mon Sep 17 00:00:00 2001 From: Dragomir Penev Date: Thu, 30 Apr 2026 17:07:01 +0300 Subject: [PATCH 3/4] Fix unit test --- tests/unit/test_charm.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index a742a27d35c..c0e7793559b 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -48,7 +48,13 @@ SwitchoverFailedError, SwitchoverNotSyncError, ) -from constants import PEER, POSTGRESQL_DATA_PATH, SECRET_INTERNAL_LABEL, UPDATE_CERTS_BIN_PATH +from constants import ( + PEER, + POSTGRESQL_DATA_PATH, + RAFT_PORT, + SECRET_INTERNAL_LABEL, + UPDATE_CERTS_BIN_PATH, +) CREATE_CLUSTER_CONF_PATH = "/etc/postgresql-common/createcluster.d/pgcharm.conf" @@ -1933,7 +1939,7 @@ def test_reconfigure_cluster(harness): relation_data = {mock_event.unit: {"ip-to-remove": ip_to_remove}} mock_event.relation.data = relation_data assert not (harness.charm._reconfigure_cluster(mock_event)) - _remove_raft_member.assert_called_once_with(ip_to_remove) + _remove_raft_member.assert_called_once_with(f"{ip_to_remove}:{RAFT_PORT}") _remove_from_members_ips.assert_not_called() _add_members.assert_not_called() @@ -1944,7 +1950,7 @@ def test_reconfigure_cluster(harness): _add_members.reset_mock() mock_event.relation.data = relation_data assert harness.charm._reconfigure_cluster(mock_event) - _remove_raft_member.assert_called_once_with(ip_to_remove) + _remove_raft_member.assert_called_once_with(f"{ip_to_remove}:{RAFT_PORT}") _remove_from_members_ips.assert_not_called() 
_add_members.assert_called_once_with(mock_event) @@ -1957,7 +1963,7 @@ def test_reconfigure_cluster(harness): rel_id, harness.charm.app.name, {"members_ips": '["' + ip_to_remove + '"]'} ) assert harness.charm._reconfigure_cluster(mock_event) - _remove_raft_member.assert_called_once_with(ip_to_remove) + _remove_raft_member.assert_called_once_with(f"{ip_to_remove}:{RAFT_PORT}") _remove_from_members_ips.assert_called_once_with(ip_to_remove) _add_members.assert_called_once_with(mock_event) From 7ef992807080070cd1ddc574a8b9f1cf1f81cf09 Mon Sep 17 00:00:00 2001 From: Dragomir Penev Date: Thu, 30 Apr 2026 17:36:42 +0300 Subject: [PATCH 4/4] Check for any disabled --- src/relations/watcher_requirer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/relations/watcher_requirer.py b/src/relations/watcher_requirer.py index 5a922a0ec56..59217d34424 100644 --- a/src/relations/watcher_requirer.py +++ b/src/relations/watcher_requirer.py @@ -349,7 +349,7 @@ def _on_update_status(self, event: UpdateStatusEvent) -> None: password = self._get_raft_password(relation) raft_controller = RaftController(self.charm, instance_id=f"rel{relation.id}") raft_status = raft_controller.get_status(port, password) - disabled = self._is_disabled(relation) + disabled = disabled or self._is_disabled(relation) connected_count += 1 if raft_status.get("connected") else 0 pg_endpoints = self._get_raft_partner_addrs(relation)