From 559992cf012a3d9aac1f73ddfb68626022475d6b Mon Sep 17 00:00:00 2001 From: Dragomir Penev Date: Tue, 5 May 2026 17:18:54 +0300 Subject: [PATCH 1/6] Stereo mode split charm --- metadata.yaml | 4 + poetry.lock | 14 +- pyproject.toml | 1 - src/charm.py | 79 +++- src/cluster.py | 39 +- src/constants.py | 1 + src/relations/async_replication.py | 9 + src/relations/watcher.py | 564 ++++++++++++++++++++++++++++ templates/patroni.yml.j2 | 9 +- templates/watcher.service.j2 | 19 - templates/watcher.yml.j2 | 18 - tests/unit/test_charm.py | 36 +- tests/unit/test_cluster.py | 22 +- tests/unit/test_raft_controller.py | 98 ----- tests/unit/test_watcher_relation.py | 288 ++++++++++++++ 15 files changed, 1004 insertions(+), 197 deletions(-) create mode 100644 src/relations/watcher.py delete mode 100644 templates/watcher.service.j2 delete mode 100644 templates/watcher.yml.j2 delete mode 100644 tests/unit/test_raft_controller.py create mode 100644 tests/unit/test_watcher_relation.py diff --git a/metadata.yaml b/metadata.yaml index 36fff6de212..c7dacacec05 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -45,6 +45,10 @@ provides: interface: cos_agent limit: 1 optional: true + watcher-offer: + interface: postgresql_watcher + limit: 1 + optional: true requires: replication: diff --git a/poetry.lock b/poetry.lock index 793ee0b4eac..f6a970a8c8e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -502,18 +502,6 @@ files = [ [package.dependencies] opentelemetry-api = "*" -[[package]] -name = "charmlibs-systemd" -version = "1.0.0" -description = "The charmlibs.systemd package." -optional = false -python-versions = ">=3.10" -groups = ["main"] -files = [ - {file = "charmlibs_systemd-1.0.0-py3-none-any.whl", hash = "sha256:37d4022e28f70f7a2a54fbff7c5694d25dc62dbb8680feffabde8c324a432199"}, - {file = "charmlibs_systemd-1.0.0.tar.gz", hash = "sha256:947e93b076e105509b190020ec16de051e9015c1eb12904192fb39489e0e1caa"}, -] - [[package]] name = "charset-normalizer" version = "3.4.7" @@ -3085,4 +3073,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<4.0" -content-hash = "69b6d5a79cdf00e9122647cd19f30c2cd4cb87b82d0e4443a560e5f232103ba4" +content-hash = "fe7969a457adab33f429e5d3ab60e037fe4311e7bff99cc64abf478794014e49" diff --git a/pyproject.toml b/pyproject.toml index 2d81c2dcc51..fc43efbc029 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,6 @@ psutil = "^7.2.2" charm-refresh = "^3.1.0.2" httpx = "^0.28.1" charmlibs-snap = "^1.0.1" -charmlibs-systemd = "^1.0.0" charmlibs-interfaces-tls-certificates = "^1.8.1" postgresql-charms-single-kernel = "16.1.11" diff --git a/src/charm.py b/src/charm.py index ff46a63386b..aefccc8e287 100755 --- a/src/charm.py +++ b/src/charm.py @@ -113,6 +113,7 @@ PLUGIN_OVERRIDES, POSTGRESQL_DATA_PATH, RAFT_PASSWORD_KEY, + RAFT_PORT, REPLICATION_CONSUMER_RELATION, REPLICATION_OFFER_RELATION, REPLICATION_PASSWORD_KEY, @@ -135,6 +136,7 @@ from relations.async_replication import PostgreSQLAsyncReplication from relations.postgresql_provider import PostgreSQLProvider from relations.tls import TLS +from relations.watcher import PostgreSQLWatcherRelation from rotate_logs import RotateLogs from utils import label2name, new_password, render_file @@ -353,6 +355,7 @@ def __init__(self, *args): self.tls = TLS(self, PEER) self.tls_transfer = TLSTransfer(self, PEER) self.async_replication = PostgreSQLAsyncReplication(self) + self.watcher_offer = PostgreSQLWatcherRelation(self) # self.logical_replication = PostgreSQLLogicalReplication(self) self.restart_manager = RollingOpsManager( charm=self, relation="restart", callback=self._restart @@ -424,7 +427,7 @@ def _post_snap_refresh(self, refresh: charm_refresh.Machines): logger.exception("Unable to check or update internal cert") if not self._patroni.start_patroni(): - self.set_unit_status(ops.BlockedStatus("Failed to start PostgreSQL"), refresh=refresh) + self.set_unit_status(BlockedStatus("Failed to start PostgreSQL"), refresh=refresh) return self._setup_exporter() @@ -756,7 +759,7 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: # checked for none in the early exit method departing_member = event.departing_unit.name.replace("/", "-") # type: ignore if member_ip := self._patroni.get_member_ip(departing_member): - self._patroni.remove_raft_member(member_ip) + self._patroni.remove_raft_member(f"{member_ip}:{RAFT_PORT}") except RemoveRaftMemberFailedError: logger.debug( "Deferring on_peer_relation_departed: Failed to remove member from raft cluster" @@ -900,6 +903,10 @@ def _raft_reinitialisation(self) -> None: self._patroni.remove_raft_data() logger.info(f"Stopping {self.unit.name}") self.unit_peer_data["raft_stopped"] = "True" + self.watcher_offer.disable_watcher() + if self.watcher_offer.is_active: + logger.info("waiting for RAFT watcher to disconnect.") + return if self.unit.is_leader(): self._stuck_raft_cluster_stopped_check() @@ -1000,16 +1007,56 @@ def _on_peer_relation_changed(self, event: HookEvent): event.defer() return + # In Raft mode with a watcher, ensure this member is properly registered in the DCS. + # A new member may be running but not registered if it was added to Raft after starting. + if ( + self.watcher_offer.is_watcher_connected + and not self._patroni.is_member_registered_in_cluster() + ): + logger.info("Member running but not registered in Raft cluster - restarting Patroni") + self._patroni.restart_patroni() + event.defer() + return + self._start_stop_pgbackrest_service(event) - # This is intended to be executed only when leader is reinitializing S3 connection due to the leader change. + if not self._handle_s3_initialization(event): + return + + # Update watcher relation with fresh peer IPs when peer data changes + # This ensures pg-endpoints stay current when unit IPs change + if self.unit.is_leader(): + self.watcher_offer.update_endpoints() + + self._update_new_unit_status() + + def _on_secret_changed(self, event: SecretChangedEvent) -> None: + """Handle the secret_changed event.""" + if not self.unit.is_leader(): + return + + if (admin_secret_id := self.config.system_users) and admin_secret_id == event.secret.id: + try: + self._update_admin_password(admin_secret_id) + except PostgreSQLUpdateUserPasswordError: + event.defer() + + # Split off into separate function, because of complexity _on_peer_relation_changed + def _handle_s3_initialization(self, event: HookEvent) -> bool: + """Handle S3 initialization during peer relation changes. + + Returns: + True if processing should continue, False if we should return early. + """ + # This is intended to be executed only when leader is reinitializing S3 connection + # due to the leader change. if ( "s3-initialization-start" in self.app_peer_data and "s3-initialization-done" not in self.unit_peer_data and self.is_primary and not self.backup._on_s3_credential_changed_primary(event) ): - return + return False # Clean-up unit initialization data after successful sync to the leader. if "s3-initialization-done" in self.app_peer_data and not self.unit.is_leader(): @@ -1020,18 +1067,7 @@ def _on_peer_relation_changed(self, event: HookEvent): "s3-initialization-start": "", }) - self._update_new_unit_status() - - def _on_secret_changed(self, event: SecretChangedEvent) -> None: - """Handle the secret_changed event.""" - if not self.unit.is_leader(): - return - - if (admin_secret_id := self.config.system_users) and admin_secret_id == event.secret.id: - try: - self._update_admin_password(admin_secret_id) - except PostgreSQLUpdateUserPasswordError: - event.defer() + return True # Split off into separate function, because of complexity _on_peer_relation_changed def _start_stop_pgbackrest_service(self, event: HookEvent) -> None: @@ -1058,6 +1094,8 @@ def _update_new_unit_status(self) -> None: if self.primary_endpoint: self._update_relation_endpoints() self.async_replication.handle_read_only_mode() + # Update watcher relation with current cluster endpoints + self.watcher_offer.update_endpoints() else: self.set_unit_status(WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE)) @@ -1075,7 +1113,7 @@ def _reconfigure_cluster(self, event: HookEvent | RelationEvent) -> bool: ): logger.info("Removing %s from the cluster due to IP change", ip_to_remove) try: - self._patroni.remove_raft_member(ip_to_remove) + self._patroni.remove_raft_member(f"{ip_to_remove}:{RAFT_PORT}") except RemoveRaftMemberFailedError: logger.debug("Deferring on_peer_relation_changed: failed to remove raft member") return False @@ -1110,6 +1148,10 @@ def _update_member_ip(self) -> bool: self.unit_peer_data.update({"ip": current_ip}) self._patroni.stop_patroni() self._update_certificate() + # Update watcher relation - unit address for all units, endpoints only for leader + self.watcher_offer.update_unit_address() + if self.unit.is_leader(): + self.watcher_offer.update_endpoints() return True else: self.unit_peer_data.update({"ip-to-remove": ""}) @@ -2026,6 +2068,9 @@ def _on_update_status(self, _) -> None: # Restart topology observer if it is gone self._observer.start_observer() + # Keep this unit data current for watcher AZ/IP checks. + self.watcher_offer.update_unit_address() + if self.unit.is_leader() and "refresh_remove_trigger" not in self.app_peer_data: self.postgresql.drop_hba_triggers() self.app_peer_data["refresh_remove_trigger"] = "True" diff --git a/src/cluster.py b/src/cluster.py index 24ffaaea1f6..2374b9f0d9c 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -53,6 +53,7 @@ POSTGRESQL_CONF_PATH, POSTGRESQL_DATA_PATH, POSTGRESQL_LOGS_PATH, + RAFT_PARTNER_PREFIX, TLS_CA_BUNDLE_FILE, ) from utils import _change_owner, label2name, parallel_patroni_get_request, render_file @@ -529,6 +530,28 @@ def is_member_isolated(self) -> bool: return len(r.json()["members"]) == 0 + def is_member_registered_in_cluster(self) -> bool: + """Check if this member is registered in the Raft DCS cluster. + + In Raft mode, a new member may be running and replicating but not yet + registered in the DCS if it hasn't been added to the Raft cluster. + + Returns: + True if this member appears in the /cluster endpoint, False otherwise. + """ + try: + cluster_status = self.cluster_status() + except RetryError: + logger.debug("Could not get cluster status to check member registration") + return False + + if not cluster_status: + return False + + # Check if this member's name appears in the cluster members list + member_name = self.member_name + return any(member.get("name") == member_name for member in cluster_status) + def online_cluster_members(self) -> list[ClusterMember]: """Return list of online cluster members.""" try: @@ -681,6 +704,9 @@ def render_patroni_yml_file( user_databases_map=user_databases_map, slots=slots, instance_password_encryption=self.charm.config.instance_password_encryption, + watcher=self.charm.watcher_offer.watcher_raft_address + if self.charm.watcher_offer.is_active + else None, ) render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o600) @@ -880,7 +906,7 @@ def get_running_cluster_members(self) -> list[str]: except Exception: return [] - def remove_raft_member(self, member_ip: str) -> None: + def remove_raft_member(self, member_address: str | None) -> None: """Remove a member from the raft cluster. The raft cluster is a different cluster from the Patroni cluster. @@ -891,6 +917,10 @@ def remove_raft_member(self, member_ip: str) -> None: RaftMemberNotFoundError: if the member to be removed is not part of the raft cluster. """ + if not member_address: + logger.debug("Remove raft member: No address provided") + return + if self.charm.has_raft_keys(): logger.debug("Remove raft member: Raft already in recovery") return @@ -909,12 +939,12 @@ def remove_raft_member(self, member_ip: str) -> None: raise RemoveRaftMemberFailedError() from None # Check whether the member is still part of the raft cluster. - if not member_ip or f"partner_node_status_server_{member_ip}:2222" not in raft_status: + if f"{RAFT_PARTNER_PREFIX}{member_address}" not in raft_status: return # If there's no quorum and the leader left raft cluster is stuck if not raft_status["has_quorum"] and ( - not raft_status["leader"] or raft_status["leader"].host == member_ip + not raft_status["leader"] or raft_status["leader"].address == member_address ): self.charm.set_unit_status( BlockedStatus("Raft majority loss, run: promote-to-primary") @@ -932,10 +962,9 @@ def remove_raft_member(self, member_ip: str) -> None: ) return - # Suppressing since the call will be removed soon # Remove the member from the raft cluster. try: - result = syncobj_util.executeCommand(raft_host, ["remove", f"{member_ip}:2222"]) + result = syncobj_util.executeCommand(raft_host, ["remove", member_address]) except UtilityException: logger.debug("Remove raft member: Remove call failed") raise RemoveRaftMemberFailedError() from None diff --git a/src/constants.py b/src/constants.py index 2d0bd51d9ec..78c429f733d 100644 --- a/src/constants.py +++ b/src/constants.py @@ -92,6 +92,7 @@ WATCHER_SECRET_LABEL = "watcher-secret" # noqa: S105 RAFT_PORT = 2222 +RAFT_PARTNER_PREFIX = "partner_node_status_server_" BACKUP_TYPE_OVERRIDES = {"full": "full", "differential": "diff", "incremental": "incr"} PLUGIN_OVERRIDES = {"audit": "pgaudit", "uuid_ossp": '"uuid-ossp"'} diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index b590f5442ee..f1e45230dc3 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -405,6 +405,7 @@ def _handle_database_start(self, event: RelationChangedEvent) -> None: for unit in {*self.charm._peers.units, self.charm.unit} # type: ignore ): self.charm.app_peer_data.update({"cluster_initialised": "True"}) + self.charm.watcher_offer.enable_watcher() elif self._is_following_promoted_cluster(): self.charm.set_unit_status( WaitingStatus("Waiting for the database to be started in all units") @@ -541,10 +542,14 @@ def _on_async_relation_broken(self, _) -> None: self.charm.app_peer_data.update({"promoted-cluster-counter": ""}) self.charm.update_config() + if self.charm.unit.is_leader(): + self.charm.watcher_offer.update_endpoints() + def _on_async_relation_changed(self, event: RelationChangedEvent) -> None: """Update the Patroni configuration if one of the clusters was already promoted.""" if self.charm.unit.is_leader(): self.set_app_status() + self.charm.watcher_offer.update_endpoints() primary_cluster = self._get_primary_cluster() logger.debug("Primary cluster: %s", primary_cluster) @@ -604,6 +609,9 @@ def _on_async_relation_joined(self, _) -> None: "unit-promoted-cluster-counter": highest_promoted_cluster_counter }) + if self.charm.unit.is_leader(): + self.charm.watcher_offer.update_endpoints() + def _on_create_replication(self, event: ActionEvent) -> None: """Set up asynchronous replication between two clusters.""" if self._get_primary_cluster() is not None: @@ -757,6 +765,7 @@ def _stop_database(self, event: RelationChangedEvent) -> bool: if not self.charm.unit.is_leader() and not os.path.exists(POSTGRESQL_DATA_PATH): logger.debug("Early exit on_async_relation_changed: following promoted cluster.") return False + self.charm.watcher_offer.disable_watcher() try: for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): diff --git a/src/relations/watcher.py b/src/relations/watcher.py new file mode 100644 index 00000000000..603d5792bdc --- /dev/null +++ b/src/relations/watcher.py @@ -0,0 +1,564 @@ +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""PostgreSQL Watcher Relation implementation. + +This module handles the relation between the PostgreSQL charm and a watcher/witness charm +that participates in the Raft consensus for stereo mode (2-node PostgreSQL clusters). + +The watcher provides quorum without storing data, enabling automatic failover +when one of the two PostgreSQL nodes becomes unavailable. +""" + +import contextlib +import json +import logging +import os +from functools import cached_property +from typing import TYPE_CHECKING + +from ops import ( + Object, + Relation, + RelationBrokenEvent, + RelationChangedEvent, + RelationJoinedEvent, + Secret, + SecretNotFoundError, +) +from pysyncobj.utility import TcpUtility + +from constants import ( + RAFT_PARTNER_PREFIX, + RAFT_PASSWORD_KEY, + RAFT_PORT, + REPLICATION_CONSUMER_RELATION, + REPLICATION_OFFER_RELATION, + WATCHER_OFFER_RELATION, + WATCHER_PASSWORD_KEY, + WATCHER_SECRET_LABEL, + WATCHER_USER, +) +from utils import new_password + +if TYPE_CHECKING: + from charm import PostgresqlOperatorCharm + +logger = logging.getLogger(__name__) + + +class PostgreSQLWatcherRelation(Object): + """Handles the watcher relation for stereo mode support.""" + + def __init__(self, charm: "PostgresqlOperatorCharm"): + """Initialize the watcher relation handler. + + Args: + charm: The PostgreSQL operator charm instance. + """ + super().__init__(charm, WATCHER_OFFER_RELATION) + self.charm = charm + + self.framework.observe( + self.charm.on[WATCHER_OFFER_RELATION].relation_joined, + self._on_watcher_relation_joined, + ) + self.framework.observe( + self.charm.on[WATCHER_OFFER_RELATION].relation_changed, + self._on_watcher_relation_changed, + ) + self.framework.observe( + self.charm.on[WATCHER_OFFER_RELATION].relation_broken, + self._on_watcher_relation_broken, + ) + + @cached_property + def _relation(self) -> Relation | None: + """Return the watcher relation if it exists.""" + return self.model.get_relation(WATCHER_OFFER_RELATION) + + @property + def is_watcher_connected(self) -> bool: + """Check if a watcher is connected to this cluster. + + Returns: + True if a watcher is connected, False otherwise. + """ + try: + syncobj_util = TcpUtility(password=self.charm._patroni.raft_password, timeout=3) + raft_status = syncobj_util.executeCommand(f"127.0.0.1:{RAFT_PORT}", ["status"]) + if raft_status: + # Check if watcher is in the partner_node_status entries + member_key = f"{RAFT_PARTNER_PREFIX}{self.watcher_raft_address}" + return member_key in raft_status + except Exception as e: + logger.debug(f"Error checking Raft membership: {e}") + return False + + def enable_watcher(self) -> None: + """Clear up disable flag.""" + if not self._relation or not self.charm.unit.is_leader(): + return None + + self._relation.data[self.charm.app].pop("disable-watcher", None) + self.update_watcher_secret() + + def disable_watcher(self) -> None: + """Inform watcher to stop service.""" + if not self._relation or not self.charm.unit.is_leader(): + return None + + self._relation.data[self.charm.app].update({"disable-watcher": "True"}) + try: + self.charm._patroni.remove_raft_member(self.watcher_raft_address) + except Exception as e: + logger.warning(f"Error remove Raft watcher: {e}") + + @cached_property + def is_active(self) -> bool: + """Check if the watcher should be added to peers.""" + if not self._relation: + return False + + return self._relation.data[self._relation.app].get("raft-status") == "connected" + + @cached_property + def watcher_raft_address(self) -> str | None: + """Return the watcher's Raft address for inclusion in partner_addrs. + + Returns: + The watcher's Raft address (ip:port), or None if not available. + """ + if not self._relation: + return None + + unit_address = None + port = None + # Get the watcher unit address from the relation data + for unit in self._relation.units: + if unit_address := self._relation.data[unit].get("unit-address"): + break + port_str = self._relation.data[self._relation.app].get("watcher-raft-port") + if port_str: + try: + port = int(port_str) + except ValueError: + logger.warning(f"Invalid watcher-raft-port value: {port_str}") + + if unit_address and port is not None: + return f"{unit_address}:{port}" + return None + + def _on_watcher_relation_joined(self, event: RelationJoinedEvent) -> None: + """Handle a new watcher joining the relation. + + Shares cluster information including Raft password and PostgreSQL endpoints + with the watcher charm. + + Args: + event: The relation joined event. + """ + # Every unit should publish its own per-unit data. + self.update_unit_address(event.relation) + + if not self.charm.unit.is_leader(): + return + + logger.info("Watcher relation joined, sharing cluster information") + + # Ensure watcher user exists before creating the secret, + # so both raft-password and watcher-password are included from the start + watcher_pw = self._ensure_watcher_user() + + # Create or get the watcher secret containing Raft password and watcher password + secret = self._get_or_create_watcher_secret(watcher_password=watcher_pw) + if secret is None: + logger.warning("Failed to create watcher secret, deferring event") + event.defer() + return + + # Grant the secret to the watcher application + try: + secret.grant(event.relation) + except Exception as e: + logger.warning(f"Failed to grant secret to watcher: {e}") + + # Update relation data with cluster information + self._update_relation_data(event.relation) + + def _on_watcher_relation_changed(self, event: RelationChangedEvent) -> None: + """Handle watcher relation data changes. + + Updates Patroni configuration to include the watcher in the Raft cluster. + + Args: + event: The relation changed event. + """ + # Keep this unit's relation data current on every relation-changed hook. + self.update_unit_address(event.relation) + + if not self.charm.is_cluster_initialised: + logger.debug("Cluster not initialized, deferring watcher relation changed") + event.defer() + return + + watcher_address = None + for unit in event.relation.units: + if unit_address := event.relation.data[unit].get("unit-address"): + watcher_address = unit_address + break + + if watcher_address: + logger.info(f"Watcher address updated: {watcher_address}") + # Only the leader handles Raft membership changes and user management + # to avoid race conditions between multiple PostgreSQL units + if self.charm.unit.is_leader(): + self._cleanup_old_watcher_from_raft() + self._ensure_watcher_user() + # Update Patroni configuration to include watcher in Raft + self.charm.update_config() + + # Update relation data for the watcher + if self.charm.unit.is_leader(): + self._update_relation_data(event.relation) + + def _cleanup_old_watcher_from_raft(self) -> None: + """Remove any old watcher IPs from Raft that differ from the current watcher. + + When a watcher unit is replaced (e.g., destroyed and re-deployed), it gets + a new IP address. The old IP remains in the Raft cluster membership, which + prevents the new watcher from being recognized as a valid cluster member. + This method finds and removes any such stale watcher entries. + + Args: + current_watcher_address: The current watcher's IP address. + """ + # Get all PostgreSQL unit IPs (these should stay in the cluster) + # Use _units_ips for fresh IPs from unit relation data + pg_ips = set(self.charm._units_ips) + port_postfix = str(RAFT_PORT) + + # Get Raft cluster status to find all members + try: + syncobj_util = TcpUtility(password=self.charm._patroni.raft_password, timeout=3) + if raft_status := syncobj_util.executeCommand(f"127.0.0.1:{RAFT_PORT}", ["status"]): + # Find all partner nodes in the Raft cluster + # Keys look like: partner_node_status_server_10.131.50.142:2222 + stale_members: list[str] = [] + for key in raft_status: + if ( + key.startswith(RAFT_PARTNER_PREFIX) + and not key.endswith(port_postfix) + and raft_status[key] != 2 + ): + member_addr = key.replace(RAFT_PARTNER_PREFIX, "") + member_ip = member_addr.split(":")[0] + + # Check if this is a stale watcher (not a PostgreSQL node and not current watcher) + if member_ip not in pg_ips and member_addr != self.watcher_raft_address: + stale_members.append(member_addr) + + # Remove stale watcher members + for stale_addr in stale_members: + logger.info(f"Removing stale watcher from Raft cluster: {stale_addr}") + self._remove_watcher_from_raft(stale_addr) + except Exception as e: + logger.debug(f"Error during Raft cleanup: {e}") + + def _on_watcher_relation_broken(self, event: RelationBrokenEvent) -> None: + """Handle watcher relation being broken. + + Updates Patroni configuration to remove the watcher from the Raft cluster. + + Args: + event: The relation broken event. + """ + logger.info("Watcher relation broken, updating Patroni configuration") + + if not self.charm.is_cluster_initialised: + return + + self._cleanup_old_watcher_from_raft() + # Update Patroni configuration without the watcher + self.charm.update_config() + + def _remove_watcher_from_raft(self, watcher_address: str) -> None: + """Remove the watcher from the Raft cluster. + + This is critical for maintaining correct quorum calculations. If a dead + watcher remains in the cluster membership, it counts toward the total + node count, making it harder to achieve quorum. + + Args: + watcher_address: The watcher's IP address. + """ + if self.watcher_raft_address: + logger.info(f"Removing watcher from Raft cluster: {watcher_address}") + self.charm._patroni.remove_raft_member(watcher_address) + + if self.charm.is_cluster_initialised: + self.charm.update_config() + + def _ensure_watcher_user(self) -> str | None: + """Ensure the watcher PostgreSQL user exists for health checks. + + Creates the watcher user if it doesn't exist, and updates the watcher + secret with the password so the watcher charm can authenticate. + + Returns: + The watcher password, or None if user creation failed. + """ + if not self.charm.is_cluster_initialised: + logger.debug("Cluster not initialized, cannot create watcher user") + return None + + try: + users = self.charm.postgresql.list_users() + if WATCHER_USER in users: + logger.debug(f"User {WATCHER_USER} already exists") + # Get existing password from secret if available + try: + secret = self.charm.model.get_secret(label=WATCHER_SECRET_LABEL) + content = secret.get_content(refresh=True) + existing_pw = content.get(WATCHER_PASSWORD_KEY) + if existing_pw: + return existing_pw + # Password not in secret — fall through to regenerate + except SecretNotFoundError: + # Secret doesn't exist yet, will be created below with new password + pass + + # Generate a password for the watcher user + watcher_password = new_password() + + # Create the watcher user (minimal privileges - only needs to connect and run SELECT 1) + if WATCHER_USER not in users: + logger.info(f"Creating PostgreSQL user: {WATCHER_USER}") + self.charm.postgresql.create_user(WATCHER_USER, watcher_password) + else: + # User exists but we don't have the password, update it + logger.info(f"Updating password for PostgreSQL user: {WATCHER_USER}") + self.charm.postgresql.update_user_password(WATCHER_USER, watcher_password) + + # Grant connect privilege on postgres database (for health checks) + self.charm.postgresql.grant_database_privileges_to_user( + WATCHER_USER, "postgres", ["connect"] + ) + + # Update the secret to include the watcher password + self._update_watcher_secret_with_password(watcher_password) + + return watcher_password + + except Exception as e: + logger.error(f"Failed to ensure watcher user: {e}") + return None + + def _update_watcher_secret_with_password(self, watcher_password: str) -> None: + """Update the watcher secret to include the watcher password. + + Args: + watcher_password: The password for the watcher PostgreSQL user. + """ + try: + secret = self.charm.model.get_secret(label=WATCHER_SECRET_LABEL) + content = secret.get_content(refresh=True) + content[WATCHER_PASSWORD_KEY] = watcher_password + secret.set_content(content) + logger.info("Updated watcher secret with watcher password") + except SecretNotFoundError: + logger.warning( + "Watcher secret not found, password change cannot be propagated to watcher. " + "It will be synced on next relation-changed event." + ) + except Exception as e: + logger.error(f"Failed to update watcher secret with password: {e}") + + def _get_existing_watcher_password(self) -> str | None: + """Get the watcher password from an existing secret if available.""" + try: + secret = self.charm.model.get_secret(label=WATCHER_SECRET_LABEL) + content = secret.get_content(refresh=True) + return content.get(WATCHER_PASSWORD_KEY) + except SecretNotFoundError: + return None + except Exception as e: + logger.debug(f"Failed to get existing watcher password: {e}") + return None + + def _get_or_create_watcher_secret(self, watcher_password: str | None = None) -> Secret | None: + """Get or create the secret for sharing Raft credentials with the watcher. + + Args: + watcher_password: Optional watcher password to include in the secret. + + Returns: + The Juju secret containing Raft password, or None if creation failed. + """ + try: + secret = self.charm.model.get_secret(label=WATCHER_SECRET_LABEL) + logger.debug("Found existing watcher secret") + return secret + except SecretNotFoundError: + logger.debug("No existing watcher secret found, creating new one") + + # Get the Raft password from the internal secret + try: + raft_password = self.charm._patroni.raft_password + except Exception as e: + logger.warning(f"Error getting raft_password: {e}") + raft_password = None + + if not raft_password: + logger.warning("Raft password not available, cannot create secret") + return None + + # Create a new secret with the Raft password (and watcher password if available) + try: + content = {RAFT_PASSWORD_KEY: raft_password} + # Include watcher password if provided, or look it up from existing secret + watcher_pw = watcher_password or self._get_existing_watcher_password() + if watcher_pw: + content[WATCHER_PASSWORD_KEY] = watcher_pw + secret = self.charm.model.app.add_secret( + content=content, + label=WATCHER_SECRET_LABEL, + ) + logger.info("Created watcher secret") + return secret + except Exception as e: + logger.error(f"Failed to create watcher secret: {e}") + return None + + def _update_relation_data(self, relation: Relation) -> None: + """Update the relation data with cluster information. + + Args: + relation: The watcher relation. + """ + if not self.charm.unit.is_leader(): + return + + # Get the secret ID for sharing + try: + secret = self.charm.model.get_secret(label=WATCHER_SECRET_LABEL) + secret_id = secret.id + if not secret_id: + # When a secret is retrieved by label, the ops library may lazily load the ID. + # Calling get_info() forces it to resolve. + secret_id = secret.get_info().id + if secret_id is None: + logger.warning("Watcher secret has no ID") + return + # Ensure the secret is granted to the watcher relation (handles + # cases where the secret was recreated after initial relation_joined) + with contextlib.suppress(Exception): + secret.grant(relation) + except SecretNotFoundError: + logger.warning("Watcher secret not found") + return + except Exception as e: + logger.error(f"Error getting secret: {e}") + return + + # Collect PostgreSQL unit endpoints using fresh IPs from unit relation data. + # _units_ips reads directly from unit relation data (always fresh), while + # _peer_members_ips reads from app peer data (may be stale after network disruptions). + pg_endpoints: list[str] = sorted(self.charm._units_ips) + if not pg_endpoints: + logger.warning("No PostgreSQL endpoints available") + return + + # Update relation data + relation.data[self.charm.app].update({ + "cluster-name": self.charm.cluster_name, + "raft-secret-id": secret_id, + "version": self.charm._patroni.get_postgresql_version(), + "raft-partner-addrs": json.dumps(pg_endpoints), + "raft-port": str(RAFT_PORT), + "patroni-cas": self.charm.tls.get_peer_ca_bundle(), + "standby-clusters": json.dumps(self._get_standby_clusters()), + "tls-enabled": "true" if self.charm.is_tls_enabled else "false", + }) + self.update_watcher_secret() + + # Also share this unit's per-unit data. + self.update_unit_address(relation) + + def update_unit_address(self, relation: Relation | None = None) -> None: + """Update this unit's address in the watcher relation. + + Called when the unit's IP changes (e.g., after network isolation). + This updates unit-specific data in the relation, not application data. + Can be called by any unit, not just the leader. + """ + if relation is None: + relation = self._relation + + if not relation: + return + + unit_ip = self.charm._unit_ip + if unit_ip is None: + return + + changed = False + current_address = relation.data[self.charm.unit].get("unit-address") + if current_address != unit_ip: + logger.info( + f"Updating unit-address in watcher relation from {current_address} to {unit_ip}" + ) + relation.data[self.charm.unit]["unit-address"] = unit_ip + changed = True + + unit_az = os.environ.get("JUJU_AVAILABILITY_ZONE") + current_az = relation.data[self.charm.unit].get("unit-az") + if unit_az and current_az != unit_az: + relation.data[self.charm.unit]["unit-az"] = unit_az + changed = True + + if changed: + logger.debug("Updated watcher relation unit data") + + def update_endpoints(self) -> None: + """Update the watcher with current cluster endpoints. + + Called when cluster membership changes (peer joins/departs). + Also dynamically adds new PostgreSQL peers to the running Raft cluster. + """ + if self.charm.unit.is_leader() and (relation := self._relation): + self._update_relation_data(relation) + + def _get_standby_clusters(self) -> list[str]: + """Return the names of related standby clusters.""" + standby_clusters = [] + for relation in [ + self.model.get_relation(REPLICATION_OFFER_RELATION), + self.model.get_relation(REPLICATION_CONSUMER_RELATION), + ]: + if relation is None: + continue + # We are interested in the other side's application name + if relation.app and self.charm.async_replication.is_primary_cluster(): + standby_clusters.append(relation.app.name) + return sorted(set(standby_clusters)) + + def update_watcher_secret(self) -> None: + """Update the watcher secret with current Raft password. + + Called when credentials are rotated. Preserves existing secret content + (e.g., watcher-password) while updating the Raft password. + """ + if not self.charm.unit.is_leader(): + return + + try: + if raft_password := self.charm._patroni.raft_password: + secret = self.charm.model.get_secret(label=WATCHER_SECRET_LABEL) + content = secret.get_content(refresh=True) + if content.get(RAFT_PASSWORD_KEY) != raft_password: + content[RAFT_PASSWORD_KEY] = raft_password + secret.set_content(content) + logger.info("Updated watcher secret with new Raft password") + except SecretNotFoundError: + logger.debug("Watcher secret not found, nothing to update") diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index d17fad8fe56..9414bb1cf5b 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -37,12 +37,15 @@ raft: data_dir: {{ conf_path }}/raft self_addr: '{{ self_ip }}:2222' password: {{ raft_password }} - {% if partner_addrs -%} + {% if partner_addrs or watcher -%} partner_addrs: {% endif -%} {% for partner_addr in partner_addrs -%} - {{ partner_addr }}:2222 {% endfor %} + {%- if watcher %} + - {{ watcher }} + {% endif %} bootstrap: dcs: @@ -200,6 +203,10 @@ postgresql: {%- endif %} {%- endfor %} {%- endif %} + {%- if watcher_addr %} + # Allow watcher to connect for health checks + - {{ 'hostssl' if enable_tls else 'host' }} postgres watcher {{ watcher_addr }}/32 scram-sha-256 + {%- endif %} - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.1/32 scram-sha-256 # Allow replications connections from other cluster members. {%- for endpoint in extra_replication_endpoints %} diff --git a/templates/watcher.service.j2 b/templates/watcher.service.j2 deleted file mode 100644 index 2df03728cf1..00000000000 --- a/templates/watcher.service.j2 +++ /dev/null @@ -1,19 +0,0 @@ -[Unit] -Description=PostgreSQL Watcher Raft Service (%i) -After=network.target -Wants=network.target - -[Service] -Type=simple -# charmed-postgresql.patroni-raft-controller app lacks network interfaces -# in the snap profile, so run the controller under the patroni app profile. -ExecStart=/snap/bin/charmed-postgresql.patroni-raft-controller {{ config_file }}/%i/patroni-raft.yaml -Restart=always -RestartSec=5 -TimeoutStartSec=30 -TimeoutStopSec=30 -StandardOutput=journal -StandardError=journal - -[Install] -WantedBy=multi-user.target diff --git a/templates/watcher.yml.j2 b/templates/watcher.yml.j2 deleted file mode 100644 index a1708b2ba54..00000000000 --- a/templates/watcher.yml.j2 +++ /dev/null @@ -1,18 +0,0 @@ -######################################################################################### -# [ WARNING ] -# watcher configuration file maintained by the postgres-operator -# local changes may be overwritten. -######################################################################################### -# For a complete reference of all the options for this configuration file, -# please refer to https://patroni.readthedocs.io/en/latest/SETTINGS.html. - -raft: - {% if partner_addrs -%} - partner_addrs: - {% endif -%} - {% for partner_addr in partner_addrs -%} - - {{ partner_addr }}:2222 - {% endfor %} - self_addr: '{{ self_addr }}:{{ self_port }}' - password: {{ password }} - data_dir: {{ data_dir }}/raft diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 9e042fef302..c1b85b42bd1 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -48,7 +48,13 @@ SwitchoverFailedError, SwitchoverNotSyncError, ) -from constants import PEER, POSTGRESQL_DATA_PATH, SECRET_INTERNAL_LABEL, UPDATE_CERTS_BIN_PATH +from constants import ( + PEER, + POSTGRESQL_DATA_PATH, + RAFT_PORT, + SECRET_INTERNAL_LABEL, + UPDATE_CERTS_BIN_PATH, +) CREATE_CLUSTER_CONF_PATH = "/etc/postgresql-common/createcluster.d/pgcharm.conf" @@ -1916,7 +1922,7 @@ def test_reconfigure_cluster(harness): relation_data = {mock_event.unit: {"ip-to-remove": ip_to_remove}} mock_event.relation.data = relation_data assert not (harness.charm._reconfigure_cluster(mock_event)) - _remove_raft_member.assert_called_once_with(ip_to_remove) + _remove_raft_member.assert_called_once_with(f"{ip_to_remove}:{RAFT_PORT}") _remove_from_members_ips.assert_not_called() _add_members.assert_not_called() @@ -1927,7 +1933,7 @@ def test_reconfigure_cluster(harness): _add_members.reset_mock() mock_event.relation.data = relation_data assert harness.charm._reconfigure_cluster(mock_event) - _remove_raft_member.assert_called_once_with(ip_to_remove) + _remove_raft_member.assert_called_once_with(f"{ip_to_remove}:{RAFT_PORT}") _remove_from_members_ips.assert_not_called() _add_members.assert_called_once_with(mock_event) @@ -1940,7 +1946,7 @@ def test_reconfigure_cluster(harness): rel_id, harness.charm.app.name, {"members_ips": '["' + ip_to_remove + '"]'} ) assert harness.charm._reconfigure_cluster(mock_event) - _remove_raft_member.assert_called_once_with(ip_to_remove) + _remove_raft_member.assert_called_once_with(f"{ip_to_remove}:{RAFT_PORT}") _remove_from_members_ips.assert_called_once_with(ip_to_remove) _add_members.assert_called_once_with(mock_event) @@ -1967,6 +1973,10 @@ def test_update_member_ip(harness): with ( patch("charm.PostgresqlOperatorCharm._update_certificate") as _update_certificate, patch("charm.Patroni.stop_patroni") as _stop_patroni, + patch("charm.PostgresqlOperatorCharm.update_endpoint_addresses"), + patch("charm.PostgresqlOperatorCharm.update_config"), + patch.object(harness.charm.watcher_offer, "update_unit_address"), + patch.object(harness.charm.watcher_offer, "update_endpoints"), ): rel_id = harness.model.get_relation(PEER).id # Test when the IP address of the unit hasn't changed. @@ -2557,7 +2567,7 @@ def test_on_peer_relation_departed(harness): mock_ip_address = "1.1.1.1" _get_member_ip.return_value = mock_ip_address harness.charm._on_peer_relation_departed(event) - _remove_raft_member.assert_called_once_with(mock_ip_address) + _remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_called_once() _updated_synchronous_node_count.assert_not_called() _get_ips_to_remove.assert_not_called() @@ -2572,7 +2582,7 @@ def test_on_peer_relation_departed(harness): event.defer.reset_mock() _remove_raft_member.side_effect = None harness.charm._on_peer_relation_departed(event) - _remove_raft_member.assert_called_once_with(mock_ip_address) + _remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_not_called() _updated_synchronous_node_count.assert_not_called() _get_ips_to_remove.assert_not_called() @@ -2587,7 +2597,7 @@ def test_on_peer_relation_departed(harness): with harness.hooks_disabled(): harness.set_leader() harness.charm._on_peer_relation_departed(event) - _remove_raft_member.assert_called_once_with(mock_ip_address) + _remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_called_once() _updated_synchronous_node_count.assert_not_called() _get_ips_to_remove.assert_not_called() @@ -2604,7 +2614,7 @@ def test_on_peer_relation_departed(harness): rel_id, harness.charm.app.name, {"cluster_initialised": "True"} ) harness.charm._on_peer_relation_departed(event) - _remove_raft_member.assert_called_once_with(mock_ip_address) + _remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_called_once() _updated_synchronous_node_count.assert_called_once_with() _get_ips_to_remove.assert_not_called() @@ -2619,7 +2629,7 @@ def test_on_peer_relation_departed(harness): _updated_synchronous_node_count.reset_mock() harness.add_relation_unit(rel_id, f"{harness.charm.app.name}/2") harness.charm._on_peer_relation_departed(event) - _remove_raft_member.assert_called_once_with(mock_ip_address) + _remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_called_once() _updated_synchronous_node_count.assert_called_once_with() _get_ips_to_remove.assert_not_called() @@ -2635,7 +2645,7 @@ def test_on_peer_relation_departed(harness): _updated_synchronous_node_count.reset_mock() _updated_synchronous_node_count.return_value = True harness.charm._on_peer_relation_departed(event) - _remove_raft_member.assert_called_once_with(mock_ip_address) + _remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_not_called() _updated_synchronous_node_count.assert_called_once_with() _get_ips_to_remove.assert_called_once() @@ -2653,7 +2663,7 @@ def test_on_peer_relation_departed(harness): _get_ips_to_remove.return_value = ips_to_remove _are_all_members_ready.return_value = False harness.charm._on_peer_relation_departed(event) - _remove_raft_member.assert_called_once_with(mock_ip_address) + _remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_called_once() _updated_synchronous_node_count.assert_called_once_with() _get_ips_to_remove.assert_called_once() @@ -2669,7 +2679,7 @@ def test_on_peer_relation_departed(harness): _get_ips_to_remove.reset_mock() _are_all_members_ready.return_value = True harness.charm._on_peer_relation_departed(event) - _remove_raft_member.assert_called_once_with(mock_ip_address) + _remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_not_called() _updated_synchronous_node_count.assert_called_once_with() _get_ips_to_remove.assert_called_once() @@ -2688,7 +2698,7 @@ def test_on_peer_relation_departed(harness): _update_relation_endpoints.reset_mock() _primary_endpoint.return_value = None harness.charm._on_peer_relation_departed(event) - _remove_raft_member.assert_called_once_with(mock_ip_address) + _remove_raft_member.assert_called_once_with(f"{mock_ip_address}:2222") event.defer.assert_not_called() _updated_synchronous_node_count.assert_called_once_with() _get_ips_to_remove.assert_called_once() diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py index 44ba0841275..22781b63d0e 100644 --- a/tests/unit/test_cluster.py +++ b/tests/unit/test_cluster.py @@ -672,7 +672,7 @@ def test_remove_raft_member(patroni): # Member already removed _tcp_utility.return_value.executeCommand.return_value = "Response message" - patroni.remove_raft_member("1.2.3.4") + patroni.remove_raft_member("1.2.3.4:2222") _tcp_utility.assert_called_once_with(password="fake-raft-password", timeout=3) _tcp_utility.return_value.executeCommand.assert_called_once_with( @@ -686,7 +686,7 @@ def test_remove_raft_member(patroni): "SUCCESS", ] - patroni.remove_raft_member("1.2.3.4") + patroni.remove_raft_member("1.2.3.4:2222") _tcp_utility.assert_called_once_with(password="fake-raft-password", timeout=3) assert _tcp_utility.return_value.executeCommand.call_count == 2 @@ -703,7 +703,7 @@ def test_remove_raft_member(patroni): ] with pytest.raises(RemoveRaftMemberFailedError): - patroni.remove_raft_member("1.2.3.4") + patroni.remove_raft_member("1.2.3.4:2222") assert False # Raises on remove error @@ -713,16 +713,14 @@ def test_remove_raft_member(patroni): ] with pytest.raises(RemoveRaftMemberFailedError): - patroni.remove_raft_member("1.2.3.4") + patroni.remove_raft_member("1.2.3.4:2222") assert False # Raises on status error - _tcp_utility.return_value.executeCommand.side_effect = [ - UtilityException, - ] + _tcp_utility.return_value.executeCommand.side_effect = [UtilityException] with pytest.raises(RemoveRaftMemberFailedError): - patroni.remove_raft_member("1.2.3.4") + patroni.remove_raft_member("1.2.3.4:2222") assert False @@ -745,7 +743,7 @@ def test_remove_raft_member_no_quorum(patroni, harness): "members": [{"role": "async_replica", "name": "postgresql-0"}] } - patroni.remove_raft_member("1.2.3.4") + patroni.remove_raft_member("1.2.3.4:2222") assert harness.charm.unit_peer_data == {"raft_stuck": "True"} # No health @@ -757,14 +755,14 @@ def test_remove_raft_member_no_quorum(patroni, harness): } _get.side_effect = Exception - patroni.remove_raft_member("1.2.3.4") + patroni.remove_raft_member("1.2.3.4:2222") assert harness.charm.unit_peer_data == {"raft_stuck": "True"} # Sync replica _unit_peer_data.return_value = {} leader_mock = Mock() - leader_mock.host = "1.2.3.4" + leader_mock.address = "1.2.3.4:2222" _tcp_utility.return_value.executeCommand.return_value = { "partner_node_status_server_1.2.3.4:2222": 0, "has_quorum": False, @@ -775,7 +773,7 @@ def test_remove_raft_member_no_quorum(patroni, harness): "members": [{"role": "sync_standby", "name": "postgresql-0"}] } - patroni.remove_raft_member("1.2.3.4") + patroni.remove_raft_member("1.2.3.4:2222") assert harness.charm.unit_peer_data == {"raft_stuck": "True"} diff --git a/tests/unit/test_raft_controller.py b/tests/unit/test_raft_controller.py deleted file mode 100644 index f167c6233df..00000000000 --- a/tests/unit/test_raft_controller.py +++ /dev/null @@ -1,98 +0,0 @@ -# Copyright 2026 Canonical Ltd. -# See LICENSE file for licensing details. - -from pathlib import Path -from unittest.mock import MagicMock, patch - -from charmlibs.systemd import SystemdError -from jinja2 import Template -from pytest import fixture - -from raft_controller import SERVICE_FILE, RaftController, install_service - - -@fixture -def controller(tmp_path: Path) -> RaftController: - controller = RaftController(MagicMock(), instance_id="rel42") - controller.data_dir = str(tmp_path / "watcher-raft" / "rel42") - controller.config_file = str(tmp_path / "watcher-raft" / "rel42" / "patroni-raft.yaml") - controller.service_name = "watcher-raft-rel42" - controller.service_file = str(tmp_path / "watcher-raft-rel42.service") - return controller - - -def test_configure(tmp_path: Path, controller: RaftController): - with open("templates/watcher.yml.j2") as file: - contents = file.read() - template = Template(contents) - - expected_content = template.render( - self_addr="10.0.0.1", - self_port=2222, - partner_addrs=["10.0.0.2"], - password="secret", - data_dir=f"{tmp_path}/watcher-raft/rel42", - ) - with ( - patch("raft_controller.render_file") as _render_file, - patch("raft_controller.create_directory") as _create_directory, - ): - assert controller.configure(2222, "10.0.0.1", ["10.0.0.2"], "secret") - - assert _create_directory.call_count == 2 - _create_directory.assert_any_call(f"{tmp_path}/watcher-raft/rel42", 0o700) - _create_directory.assert_any_call(f"{tmp_path}/watcher-raft/rel42/raft", 0o700) - _render_file.assert_called_once_with( - f"{tmp_path}/watcher-raft/rel42/patroni-raft.yaml", expected_content, 0o600 - ) - - -def test_remove_service_disables_unit_and_deletes_dir(tmp_path: Path, controller: RaftController): - Path(controller.service_file).write_text("[Unit]\nDescription=test\n") - - with ( - patch("raft_controller.service_running") as _service_running, - patch("raft_controller.service_stop") as _service_stop, - patch("raft_controller.service_disable") as _service_disable, - patch("raft_controller.rmtree") as _rmtree, - ): - assert controller.remove_service() - _service_running.assert_called_once_with(controller.service_name) - _service_stop.assert_called_once_with(controller.service_name) - _service_disable.assert_called_once_with(controller.service_name) - _rmtree.assert_called_once_with(controller.data_dir) - - -def test_install_service_returns_false_when_daemon_reload_fails( - tmp_path: Path, controller: RaftController -): - with ( - patch("raft_controller.daemon_reload") as _daemon_reload, - patch("raft_controller.render_file"), - patch("raft_controller.create_directory"), - ): - _daemon_reload.side_effect = SystemdError - - assert not install_service() - - -def test_install_service_uses_patroni_profile_execstart( - tmp_path: Path, controller: RaftController -): - with open("templates/watcher.service.j2") as file: - contents = file.read() - template = Template(contents) - - expected_content = template.render( - config_file="/var/snap/charmed-postgresql/common/watcher-raft" - ) - - with ( - patch("raft_controller.daemon_reload") as _daemon_reload, - patch("raft_controller.render_file") as _render_file, - patch("raft_controller.create_directory"), - ): - assert install_service() - - _render_file.assert_called_once_with(SERVICE_FILE, expected_content, 0o644, change_owner=False) - _daemon_reload.assert_called_once_with() diff --git a/tests/unit/test_watcher_relation.py b/tests/unit/test_watcher_relation.py new file mode 100644 index 00000000000..6daa309e149 --- /dev/null +++ b/tests/unit/test_watcher_relation.py @@ -0,0 +1,288 @@ +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Unit tests for the PostgreSQL watcher relation handler.""" + +from unittest.mock import MagicMock, PropertyMock, patch + +from src.relations.watcher import PostgreSQLWatcherRelation + + +def create_mock_charm(): + """Create a mock charm for testing.""" + mock_charm = MagicMock() + mock_charm.unit.is_leader.return_value = True + mock_charm.cluster_name = "postgresql" + mock_charm._unit_ip = "10.0.0.1" + mock_charm._patroni.unit_ip = "10.0.0.1" + mock_charm._patroni.peers_ips = {"10.0.0.2"} + mock_charm._patroni.raft_password = "test-raft-password" + mock_charm.is_cluster_initialised = True + mock_charm.update_config = MagicMock() + return mock_charm + + +def create_mock_relation(): + """Create a mock relation for testing.""" + mock_relation = MagicMock() + mock_relation.data = { + MagicMock(): {}, # app data + MagicMock(): {}, # unit data + } + mock_relation.units = set() + return mock_relation + + +class TestWatcherRelation: + """Tests for PostgreSQLWatcherRelation class.""" + + def test_watcher_address_no_relation(self): + """Test watcher_address returns None when no relation exists.""" + mock_charm = create_mock_charm() + + with patch.object( + PostgreSQLWatcherRelation, + "_relation", + new_callable=PropertyMock, + return_value=None, + ): + relation = PostgreSQLWatcherRelation(mock_charm) + assert relation.watcher_raft_address is None + + def test_watcher_address_with_relation(self): + """Test watcher_address returns the watcher IP when available.""" + mock_charm = create_mock_charm() + mock_relation = MagicMock() + + # Create a mock unit with unit-address + mock_unit = MagicMock() + mock_relation.units = {mock_unit} + mock_relation.data = { + mock_unit: {"unit-address": "10.0.0.10"}, + mock_relation.app: {"watcher-raft-port": "2222"}, + } + + with patch.object( + PostgreSQLWatcherRelation, + "_relation", + new_callable=PropertyMock, + return_value=mock_relation, + ): + relation = PostgreSQLWatcherRelation(mock_charm) + assert relation.watcher_raft_address == "10.0.0.10:2222" + + def test_on_watcher_relation_joined_not_leader(self): + """Test relation joined event is ignored for non-leader units.""" + mock_charm = create_mock_charm() + mock_charm.unit.is_leader.return_value = False + mock_event = MagicMock() + + relation = PostgreSQLWatcherRelation(mock_charm) + + with ( + patch.object(relation, "update_unit_address") as update_unit_address, + patch.object(relation, "_get_or_create_watcher_secret") as mock_secret, + ): + relation._on_watcher_relation_joined(mock_event) + update_unit_address.assert_called_once_with(mock_event.relation) + mock_secret.assert_not_called() + + def test_on_watcher_relation_joined_leader(self): + """Test relation joined event creates secret for leader.""" + mock_charm = create_mock_charm() + mock_event = MagicMock() + mock_secret = MagicMock() + mock_secret.id = "secret:abc123" + + relation = PostgreSQLWatcherRelation(mock_charm) + + with ( + patch.object(relation, "_get_or_create_watcher_secret", return_value=mock_secret), + patch.object(relation, "_update_relation_data") as mock_update, + ): + relation._on_watcher_relation_joined(mock_event) + mock_secret.grant.assert_called_once_with(mock_event.relation) + mock_update.assert_called_once_with(mock_event.relation) + + def test_on_watcher_relation_joined_no_secret(self): + """Test relation joined event defers when secret creation fails.""" + mock_charm = create_mock_charm() + mock_event = MagicMock() + + relation = PostgreSQLWatcherRelation(mock_charm) + + with patch.object(relation, "_get_or_create_watcher_secret", return_value=None): + relation._on_watcher_relation_joined(mock_event) + mock_event.defer.assert_called_once() + + def test_on_watcher_relation_changed_not_initialized(self): + """Test relation changed event defers when cluster not initialized.""" + mock_charm = create_mock_charm() + mock_charm.is_cluster_initialised = False + mock_event = MagicMock() + + relation = PostgreSQLWatcherRelation(mock_charm) + relation._on_watcher_relation_changed(mock_event) + + mock_event.defer.assert_called_once() + + def test_on_watcher_relation_changed_updates_config(self): + """Test relation changed event updates Patroni config.""" + mock_charm = create_mock_charm() + mock_event = MagicMock() + + # Setup mock relation with watcher unit + mock_unit = MagicMock() + mock_event.relation.units = {mock_unit} + mock_event.relation.data = { + mock_unit: {"unit-address": "10.0.0.10"}, + mock_charm.unit: {}, + } + + relation = PostgreSQLWatcherRelation(mock_charm) + + with patch.object(relation, "_update_relation_data"): + relation._on_watcher_relation_changed(mock_event) + mock_charm.update_config.assert_called_once() + + def test_update_relation_data_not_leader(self): + """Test _update_relation_data does nothing for non-leader.""" + mock_charm = create_mock_charm() + mock_charm.unit.is_leader.return_value = False + mock_relation = MagicMock() + + relation = PostgreSQLWatcherRelation(mock_charm) + relation._update_relation_data(mock_relation) + + # Should not try to update relation data + assert not mock_relation.data[mock_charm.app].update.called + + def test_update_relation_data_leader(self): + """Test _update_relation_data populates relation data correctly.""" + mock_charm = create_mock_charm() + mock_charm._units_ips = ["10.0.0.1", "10.0.0.2"] # Mock PostgreSQL endpoints + mock_charm._unit_ip = "10.0.0.1" + mock_relation = MagicMock() + mock_relation.data = { + mock_charm.app: {}, + mock_charm.unit: {}, + } + + mock_secret = MagicMock() + mock_secret.id = "secret:abc123" + + relation = PostgreSQLWatcherRelation(mock_charm) + + with ( + patch.object(mock_charm.model, "get_secret", return_value=mock_secret), + patch.object(relation, "_get_standby_clusters", return_value=[]), + ): + relation._update_relation_data(mock_relation) + + # Verify app data was updated + app_data = mock_relation.data[mock_charm.app] + assert "cluster-name" in app_data + assert app_data["cluster-name"] == "postgresql" + assert "raft-secret-id" in app_data + assert "raft-partner-addrs" in app_data + assert "raft-port" in app_data + + # Verify unit data was updated + unit_data = mock_relation.data[mock_charm.unit] + assert "unit-address" in unit_data + + def test_update_unit_address_updates_az(self): + """Test update_unit_address also publishes unit AZ.""" + mock_charm = create_mock_charm() + mock_relation = MagicMock() + mock_relation.data = { + mock_charm.unit: { + "unit-address": "10.0.0.1", + } + } + + relation = PostgreSQLWatcherRelation(mock_charm) + + with patch.dict("os.environ", {"JUJU_AVAILABILITY_ZONE": "az1"}, clear=False): + relation.update_unit_address(mock_relation) + + assert mock_relation.data[mock_charm.unit]["unit-az"] == "az1" + + def test_update_watcher_secret_not_leader(self): + """Test update_watcher_secret does nothing for non-leader.""" + mock_charm = create_mock_charm() + mock_charm.unit.is_leader.return_value = False + + relation = PostgreSQLWatcherRelation(mock_charm) + + with patch.object(mock_charm.model, "get_secret") as mock_get: + relation.update_watcher_secret() + mock_get.assert_not_called() + + def test_update_watcher_secret_leader(self): + """Test update_watcher_secret updates secret content.""" + mock_charm = create_mock_charm() + mock_secret = MagicMock() + + relation = PostgreSQLWatcherRelation(mock_charm) + + with patch.object(mock_charm.model, "get_secret", return_value=mock_secret): + relation.update_watcher_secret() + mock_secret.set_content.assert_called_once() + + +class TestWatcherRelationSecrets: + """Tests for secret management in watcher relation.""" + + def test_get_or_create_watcher_secret_existing(self): + """Test _get_or_create_watcher_secret returns existing secret.""" + mock_charm = create_mock_charm() + mock_secret = MagicMock() + + relation = PostgreSQLWatcherRelation(mock_charm) + + with patch.object(mock_charm.model, "get_secret", return_value=mock_secret): + result = relation._get_or_create_watcher_secret() + assert result == mock_secret + + def test_get_or_create_watcher_secret_creates_new(self): + """Test _get_or_create_watcher_secret creates new secret.""" + mock_charm = create_mock_charm() + mock_secret = MagicMock() + + from ops import SecretNotFoundError + + relation = PostgreSQLWatcherRelation(mock_charm) + + with ( + patch.object( + mock_charm.model, + "get_secret", + side_effect=SecretNotFoundError("not found"), + ), + patch.object( + mock_charm.model.app, + "add_secret", + return_value=mock_secret, + ), + ): + result = relation._get_or_create_watcher_secret() + assert result == mock_secret + mock_charm.model.app.add_secret.assert_called_once() + + def test_get_or_create_watcher_secret_no_raft_password(self): + """Test _get_or_create_watcher_secret returns None without password.""" + mock_charm = create_mock_charm() + mock_charm._patroni.raft_password = None + + from ops import SecretNotFoundError + + relation = PostgreSQLWatcherRelation(mock_charm) + + with patch.object( + mock_charm.model, + "get_secret", + side_effect=SecretNotFoundError("not found"), + ): + result = relation._get_or_create_watcher_secret() + assert result is None From a7ca4e733ba4c1c9ef1a5e8b560aa653b48da581 Mon Sep 17 00:00:00 2001 From: Dragomir Penev Date: Tue, 5 May 2026 19:17:03 +0300 Subject: [PATCH 2/6] Linting --- src/charm.py | 22 +-- src/raft_controller.py | 414 ----------------------------------------- 2 files changed, 11 insertions(+), 425 deletions(-) delete mode 100644 src/raft_controller.py diff --git a/src/charm.py b/src/charm.py index aefccc8e287..a1c2c5aafd3 100755 --- a/src/charm.py +++ b/src/charm.py @@ -1030,17 +1030,6 @@ def _on_peer_relation_changed(self, event: HookEvent): self._update_new_unit_status() - def _on_secret_changed(self, event: SecretChangedEvent) -> None: - """Handle the secret_changed event.""" - if not self.unit.is_leader(): - return - - if (admin_secret_id := self.config.system_users) and admin_secret_id == event.secret.id: - try: - self._update_admin_password(admin_secret_id) - except PostgreSQLUpdateUserPasswordError: - event.defer() - # Split off into separate function, because of complexity _on_peer_relation_changed def _handle_s3_initialization(self, event: HookEvent) -> bool: """Handle S3 initialization during peer relation changes. @@ -1069,6 +1058,17 @@ def _handle_s3_initialization(self, event: HookEvent) -> bool: return True + def _on_secret_changed(self, event: SecretChangedEvent) -> None: + """Handle the secret_changed event.""" + if not self.unit.is_leader(): + return + + if (admin_secret_id := self.config.system_users) and admin_secret_id == event.secret.id: + try: + self._update_admin_password(admin_secret_id) + except PostgreSQLUpdateUserPasswordError: + event.defer() + # Split off into separate function, because of complexity _on_peer_relation_changed def _start_stop_pgbackrest_service(self, event: HookEvent) -> None: # Start or stop the pgBackRest TLS server service when TLS certificate change. diff --git a/src/raft_controller.py b/src/raft_controller.py deleted file mode 100644 index ad009d97ae0..00000000000 --- a/src/raft_controller.py +++ /dev/null @@ -1,414 +0,0 @@ -# Copyright 2026 Canonical Ltd. -# See LICENSE file for licensing details. - -"""Raft controller management for PostgreSQL watcher. - -This module manages a Patroni raft_controller node that participates in -consensus without running PostgreSQL, providing the necessary third vote -for quorum in 2-node PostgreSQL clusters. - -Uses Patroni's own ``patroni_raft_controller`` from the charmed-postgresql -snap, which is the same battle-tested Raft implementation used by the -PostgreSQL nodes. This guarantees wire compatibility with Patroni's -KVStoreTTL class. - -The Raft service runs as a systemd service to ensure it persists between -charm hook invocations. -""" - -import logging -from contextlib import suppress -from ipaddress import IPv4Address -from shutil import rmtree -from typing import TYPE_CHECKING, TypedDict - -import psycopg2 -from charmlibs.systemd import ( - SystemdError, - daemon_reload, - service_disable, - service_enable, - service_restart, - service_running, - service_start, - service_stop, -) -from jinja2 import Template -from pysyncobj.utility import TcpUtility -from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed - -from cluster import ClusterMember -from constants import PATRONI_CLUSTER_STATUS_ENDPOINT -from utils import create_directory, parallel_patroni_get_request, render_file - -if TYPE_CHECKING: - from charm import PostgresqlOperatorCharm - -logger = logging.getLogger(__name__) - -# Base directory for all Raft instances. -# Must be under the snap's common path so that -# charmed-postgresql.patroni-raft-controller can access it. -RAFT_BASE_DIR = "/var/snap/charmed-postgresql/common/watcher-raft" -SERVICE_FILE = "/etc/systemd/system/watcher-raft@.service" - -# Default health check configuration -DEFAULT_RETRY_COUNT = 3 -DEFAULT_RETRY_INTERVAL_SECONDS = 7 -DEFAULT_QUERY_TIMEOUT_SECONDS = 5 -DEFAULT_CHECK_INTERVAL_SECONDS = 10 - -# TCP keepalive settings to detect dead connections quickly -TCP_KEEPALIVE_IDLE = 1 # Start keepalive probes after 1 second of idle -TCP_KEEPALIVE_INTERVAL = 1 # Send keepalive probes every 1 second -TCP_KEEPALIVE_COUNT = 3 # Consider connection dead after 3 failed probes - - -class ClusterStatus(TypedDict): - """Type definition for the cluster status mapping.""" - - running: bool - connected: bool - has_quorum: bool - leader: str | None - members: list[str] - - -def install_service() -> bool: - """Install the systemd template service for the Raft controller. - - Returns: - True if the service file was updated, False if unchanged. - """ - with open("templates/watcher.service.j2") as file: - template = Template(file.read()) - - rendered = template.render(config_file=RAFT_BASE_DIR) - render_file(SERVICE_FILE, rendered, 0o644, change_owner=False) - - # Reload systemd to pick up the new service - try: - daemon_reload() - logger.info(f"Installed systemd service {SERVICE_FILE}") - except SystemdError as e: - logger.error(f"Failed to reload systemd: {e}") - return False - - return True - - -class RaftController: - """Manages the Raft service for consensus participation. - - The Raft service runs as a systemd service to ensure it persists - between charm hook invocations. This is necessary because: - 1. Each hook invocation creates a new Python process - 2. pysyncobj requires a persistent process for Raft consensus - 3. The systemd service ensures the Raft node stays running - """ - - def __init__(self, charm: "PostgresqlOperatorCharm", instance_id: str = "default"): - """Initialize the Raft controller. - - Args: - charm: The PostgreSQL watcher charm instance. - instance_id: Unique identifier for this Raft instance. Used to - derive data directories, config files, and service names. - Defaults to "default" for backward compatibility. - - """ - self.charm = charm - self.instance_id = instance_id - - # Derive all paths from instance_id - self.data_dir = f"{RAFT_BASE_DIR}/{instance_id}" - self.config_file = f"{RAFT_BASE_DIR}/{instance_id}/patroni-raft.yaml" - self.ca_file = f"{RAFT_BASE_DIR}/{instance_id}/patroni-ca.pem" - self.service_name = f"watcher-raft@{instance_id}" - - def configure( - self, - self_port: int, - self_addr: str | None = None, - partner_addrs: list[str] | None = None, - password: str | None = None, - cas: str | None = None, - ) -> bool: - """Configure the Raft controller. - - Args: - self_port: This node's Raft port. - self_addr: This node's Raft address. - partner_addrs: List of partner Raft addresses. - password: Raft cluster password. - cas: Patroni CA bundle. - - Returns: - True if configuration changed, False if unchanged. - """ - if not partner_addrs: - partner_addrs = [] - - # Ensure data directory exists - create_directory(self.data_dir, 0o700) - create_directory(f"{self.data_dir}/raft", 0o700) - - if not self_addr or not password: - logger.warning("Cannot install service: not configured") - return False - - # Validate addresses to prevent injection into the systemd unit file - try: - IPv4Address(self_addr) - except Exception: - logger.error(f"Invalid self_addr format: {self_addr}") - return False - try: - for addr in partner_addrs: - IPv4Address(addr) - except Exception: - logger.error(f"Invalid partner address format: {addr}") - return False - - with open("templates/watcher.yml.j2") as file: - template = Template(file.read()) - - # Write Patroni-compatible YAML config (includes password) - rendered = template.render( - self_addr=self_addr, - self_port=self_port, - partner_addrs=partner_addrs, - password=password, - data_dir=self.data_dir, - ) - render_file(self.config_file, rendered, 0o600) - if cas: - render_file(self.ca_file, cas, 0o600) - - logger.info(f"Raft controller configured: self={self_addr}, partners={partner_addrs}") - return True - - def start(self) -> bool: - """Start the Raft controller service. - - Returns: - True if started successfully, False otherwise. - """ - if service_running(self.service_name): - logger.debug("Raft controller already running") - return True - - try: - # Enable and start the service - service_enable(self.service_name) - service_start(self.service_name) - logger.info(f"Started Raft controller service {self.service_name}") - return True - except SystemdError as e: - logger.error(f"Failed to start Raft controller: {e}") - return False - - def stop(self) -> bool: - """Stop the Raft controller service. - - Returns: - True if stopped successfully, False otherwise. - """ - if not service_running(self.service_name): - logger.debug("Raft controller not running") - return True - - try: - service_stop(self.service_name) - logger.info(f"Stopped Raft controller service {self.service_name}") - return True - except SystemdError as e: - logger.error(f"Failed to stop Raft controller: {e}") - return False - - def remove_service(self) -> bool: - """Disable and remove the Raft systemd service unit file.""" - if not self.stop(): - return False - - try: - service_disable(self.service_name) - except SystemdError as e: - logger.error(f"Failed to disable Raft controller service: {e}") - return False - - try: - rmtree(self.data_dir) - except Exception as e: - logger.error(f"Failed to remove Raft controller directory: {e}") - return False - - return True - - def restart(self) -> bool: - """Restart the Raft controller service. - - Returns: - True if restarted successfully, False otherwise. - """ - try: - service_restart(self.service_name) - logger.info(f"Restarted Raft controller service {self.service_name}") - return True - except SystemdError as e: - logger.error(f"Failed to restart Raft controller: {e}") - return False - - def get_status(self, self_port: int, password: str | None) -> ClusterStatus: - """Get the Raft controller status. - - Returns: - Dictionary with status information. - """ - is_running = service_running(self.service_name) - status: ClusterStatus = { - "running": is_running, - "connected": False, - "has_quorum": False, - "leader": None, - "members": [], - } - - if not password or not is_running: - return status - - # Query Raft status using pysyncobj TcpUtility - try: - utility = TcpUtility(password=password, timeout=3) - raft_status = utility.executeCommand(f"localhost:{self_port}", ["status"]) - status["connected"] = True - status["has_quorum"] = raft_status.get("has_quorum", False) - status["leader"] = ( - str(raft_status.get("leader")) if raft_status.get("leader") else None - ) - - # Extract member addresses from partner_node_status_server_* keys - prefix = "partner_node_status_server_" - members: list[str] = [str(raft_status["self"])] - for key in raft_status: - if key.startswith(prefix): - members.append(key[len(prefix) :]) - status["members"] = sorted(members) - return status - except Exception as e: - logger.debug(f"Error querying Raft status via TcpUtility: {e}") - - return status - - def check_all_endpoints(self, endpoints: list[str], password: str) -> dict[str, bool]: - """Test connectivity to all PostgreSQL endpoints. - - WARNING: This method uses blocking time.sleep() for retry intervals - (up to ~38s worst case with 2 endpoints). Only call from Juju actions, - never from hook handlers. - - Args: - endpoints: List of PostgreSQL unit IP addresses. - password: Password for the watcher user. - - Returns: - Dictionary mapping endpoint IP to health status data. - """ - results: dict[str, bool] = {} - for endpoint in endpoints: - results[endpoint] = self._check_endpoint_with_retries(endpoint, password) - - self._last_health_results = results - return results - - def _check_endpoint_with_retries(self, endpoint: str, password: str) -> bool: - """Check a single endpoint with retry logic. - - Per acceptance criteria: Repeat tests at least 3 times before - deciding that an instance is no longer reachable, waiting 7 seconds - between every try. - - Args: - endpoint: PostgreSQL endpoint IP address. - password: Password for the watcher user. - - Returns: - Dictionary with health status data. - """ - with suppress(RetryError): - for attempt in Retrying( - stop=stop_after_attempt(DEFAULT_RETRY_COUNT), - wait=wait_fixed(DEFAULT_RETRY_INTERVAL_SECONDS), - ): - with attempt: - if result := self._execute_health_query(endpoint, password): - logger.debug(f"Health check passed for {endpoint}") - return result - raise Exception(f"Cannot reach {endpoint}") - - logger.error(f"Endpoint {endpoint} unhealthy after {DEFAULT_RETRY_COUNT} attempts") - return False - - def _execute_health_query(self, endpoint: str, password: str) -> bool: - """Execute health check queries with TCP keepalive and timeout. - - Per acceptance criteria: - - Testing actual queries (SELECT 1) - - Using direct and reserved connections (no pgbouncer) - - Setting TCP keepalive to avoid hanging on dead connections - - Setting query timeout - - Args: - endpoint: PostgreSQL endpoint IP address. - password: Password for the watcher user. - - Returns: - Dictionary with health info (is_in_recovery, etc.) or None if failed. - """ - connection = None - result = False - try: - # Connect directly to PostgreSQL port 5432 (not pgbouncer 6432) - # Using the 'postgres' database which always exists - with ( - psycopg2.connect( - host=endpoint, - port=5432, - dbname="postgres", - user="watcher", - password=password, - connect_timeout=DEFAULT_QUERY_TIMEOUT_SECONDS, - # TCP keepalive settings per acceptance criteria - keepalives=1, - keepalives_idle=TCP_KEEPALIVE_IDLE, - keepalives_interval=TCP_KEEPALIVE_INTERVAL, - keepalives_count=TCP_KEEPALIVE_COUNT, - # Set options for query timeout - options=f"-c statement_timeout={DEFAULT_QUERY_TIMEOUT_SECONDS * 1000}", - ) as connection, - connection.cursor() as cursor, - ): - # Query recovery status to determine primary vs replica - cursor.execute("SELECT 1") - result = True - - except psycopg2.Error as e: - # Other database errors - logger.debug(f"Database error on {endpoint}: {e}") - finally: - if connection is not None: - try: - connection.close() - except psycopg2.Error as e: - logger.debug(f"Failed to close connection to {endpoint}: {e}") - return result - - def cluster_status(self, endpoints: list[str]) -> list[ClusterMember]: - """Query the cluster status.""" - # Request info from cluster endpoint (which returns all members of the cluster). - if response := parallel_patroni_get_request( - f"/{PATRONI_CLUSTER_STATUS_ENDPOINT}", endpoints, self.ca_file, None - ): - logger.debug("API cluster_status: %s", response["members"]) - return response["members"] - return [] From 7bee41e2a6f83caa0ede421aef0a9872655a3f5e Mon Sep 17 00:00:00 2001 From: Dragomir Penev Date: Wed, 6 May 2026 19:37:31 +0300 Subject: [PATCH 3/6] Remove stale watcher --- src/relations/watcher.py | 23 +++-------------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/src/relations/watcher.py b/src/relations/watcher.py index 603d5792bdc..6111df831e7 100644 --- a/src/relations/watcher.py +++ b/src/relations/watcher.py @@ -261,7 +261,7 @@ def _cleanup_old_watcher_from_raft(self) -> None: # Remove stale watcher members for stale_addr in stale_members: logger.info(f"Removing stale watcher from Raft cluster: {stale_addr}") - self._remove_watcher_from_raft(stale_addr) + self.charm._patroni.remove_raft_member(stale_addr) except Exception as e: logger.debug(f"Error during Raft cleanup: {e}") @@ -273,32 +273,15 @@ def _on_watcher_relation_broken(self, event: RelationBrokenEvent) -> None: Args: event: The relation broken event. """ - logger.info("Watcher relation broken, updating Patroni configuration") - if not self.charm.is_cluster_initialised: return + logger.info("Watcher relation broken, updating Patroni configuration") + self.watcher_raft_address = None self._cleanup_old_watcher_from_raft() # Update Patroni configuration without the watcher self.charm.update_config() - def _remove_watcher_from_raft(self, watcher_address: str) -> None: - """Remove the watcher from the Raft cluster. - - This is critical for maintaining correct quorum calculations. If a dead - watcher remains in the cluster membership, it counts toward the total - node count, making it harder to achieve quorum. - - Args: - watcher_address: The watcher's IP address. - """ - if self.watcher_raft_address: - logger.info(f"Removing watcher from Raft cluster: {watcher_address}") - self.charm._patroni.remove_raft_member(watcher_address) - - if self.charm.is_cluster_initialised: - self.charm.update_config() - def _ensure_watcher_user(self) -> str | None: """Ensure the watcher PostgreSQL user exists for health checks. From fc24489f5a6f07d0745fac9e8baeabe283f94a9e Mon Sep 17 00:00:00 2001 From: Dragomir Penev Date: Thu, 7 May 2026 09:45:17 +0300 Subject: [PATCH 4/6] Stereo test --- .../integration/ha_tests/test_stereo_mode.py | 844 ++++++++++++++++++ tests/spread/test_stereo_mode.py/task.yaml | 7 + 2 files changed, 851 insertions(+) create mode 100644 tests/integration/ha_tests/test_stereo_mode.py create mode 100644 tests/spread/test_stereo_mode.py/task.yaml diff --git a/tests/integration/ha_tests/test_stereo_mode.py b/tests/integration/ha_tests/test_stereo_mode.py new file mode 100644 index 00000000000..8bf236897b4 --- /dev/null +++ b/tests/integration/ha_tests/test_stereo_mode.py @@ -0,0 +1,844 @@ +#!/usr/bin/env python3 +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Integration tests for PostgreSQL stereo mode with watcher. + +Tests the deployment and failover scenarios for 2-node PostgreSQL clusters +with a watcher/witness node for quorum. + +Test scenarios from acceptance criteria: +1. Replica shutdown: clients rerouted to primary, no significant outage +2. Primary shutdown: replica promoted, old primary becomes replica when healthy +3. Watcher shutdown: no service outage +4. Network isolation variants of above +""" + +import asyncio +import logging + +import pytest +from pytest_operator.plugin import OpsTest +from tenacity import Retrying, stop_after_delay, wait_fixed +from yaml import safe_load + +from ..helpers import APPLICATION_NAME, DATABASE_APP_NAME +from .helpers import APPLICATION_NAME as TEST_APP_NAME +from .helpers import ( + are_writes_increasing, + check_writes, + cut_network_from_unit, + cut_network_from_unit_without_ip_change, + get_cluster_roles, + get_primary, + restore_network_for_unit, + restore_network_for_unit_without_ip_change, +) + +WATCHER_APP_NAME = "postgresql-watcher" + + +async def start_writes(ops_test: OpsTest) -> None: + """Start continuous writes to PostgreSQL (assumes relation already exists).""" + for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3), reraise=True): + with attempt: + action = ( + await ops_test.model + .applications[TEST_APP_NAME] + .units[0] + .run_action("start-continuous-writes") + ) + await action.wait() + assert action.results["result"] == "True", "Unable to create continuous_writes table" + + +logger = logging.getLogger(__name__) + + +async def verify_raft_cluster_health( + ops_test: OpsTest, + db_app_name: str, + watcher_app_name: str, + expected_members: int = 3, + check_watcher_ip: bool = True, +) -> None: + """Verify that the Raft cluster has the expected number of members and quorum. + + This function checks that all PostgreSQL units see the expected number of + Raft members (including the watcher) and have quorum. This is critical + after watcher re-deployment to ensure the cluster is properly formed. + + Args: + ops_test: The OpsTest instance. + db_app_name: The PostgreSQL application name. + watcher_app_name: The watcher application name. + expected_members: Expected number of Raft members (default 3 for stereo mode). + check_watcher_ip: Whether to verify the watcher IP in Raft status (default True). + Set to False after network isolation tests where watcher may have been + redeployed with a new IP that isn't yet in the Raft configuration. + + Raises: + AssertionError: If the Raft cluster is not healthy. + """ + logger.info(f"Verifying Raft cluster health with {expected_members} expected members") + + # Get watcher address for verification using juju exec to avoid cached IPs + watcher_unit = ops_test.model.applications[watcher_app_name].units[0] + return_code, watcher_ip, _ = await ops_test.juju( + "exec", "--unit", watcher_unit.name, "--", "unit-get", "private-address" + ) + assert return_code == 0, f"Failed to get watcher address from {watcher_unit.name}" + watcher_ip = watcher_ip.strip() + + for attempt in Retrying(stop=stop_after_delay(360), wait=wait_fixed(10), reraise=True): + with attempt: + for unit in ops_test.model.applications[db_app_name].units: + # Get the Raft password from Patroni config using juju exec directly + # We need to avoid shell interpretation issues with run_command_on_unit + complete_command = [ + "exec", + "--unit", + unit.name, + "--", + "cat", + "/var/snap/charmed-postgresql/current/etc/patroni/patroni.yaml", + ] + return_code, stdout, _ = await ops_test.juju(*complete_command) + assert return_code == 0, f"Failed to read patroni.yaml on {unit.name}" + + conf = safe_load(stdout) + password = conf.get("raft", {}).get("password") + assert password, f"Could not find Raft password in patroni.yaml on {unit.name}" + + # Check Raft status using the password via juju exec directly + complete_command = [ + "exec", + "--unit", + unit.name, + "--", + "charmed-postgresql.syncobj-admin", + "-conn", + conf["raft"]["self_addr"], + "-pass", + password, + "-status", + ] + return_code, output, _ = await ops_test.juju(*complete_command) + if return_code != 0: + logger.warning(f"Raft status check failed on {unit.name}: {output}") + raise AssertionError(f"Raft status check failed on {unit.name}") + logger.info(f"Raft status on {unit.name}: {output[:200]}...") + + # Verify quorum + assert "has_quorum: True" in output or "has_quorum:True" in output, ( + f"Unit {unit.name} does not have Raft quorum" + ) + + # Verify watcher is in the cluster (if requested) + # After network isolation tests, the watcher may have been redeployed + # with a new IP that isn't yet updated in the Raft configuration + if check_watcher_ip: + assert watcher_ip in output, ( + f"Watcher {watcher_ip} not found in Raft cluster on {unit.name}\n" + f"Raft output: {output}" + ) + + logger.info("Raft cluster health verified successfully") + + +@pytest.mark.abort_on_fail +async def test_build_and_deploy_stereo_mode(ops_test: OpsTest, charm) -> None: + """Build and deploy PostgreSQL in stereo mode with watcher. + + Deploys 2 PostgreSQL units and a watcher (same charm, role=watcher), + then relates them to form a 3-node Raft cluster for quorum. + """ + # Check if PostgreSQL is already deployed (e.g., from a previous test run) + # If so, verify it's in the expected state or skip deployment + if DATABASE_APP_NAME in ops_test.model.applications: + logger.info("PostgreSQL already deployed, checking state...") + pg_units = len(ops_test.model.applications[DATABASE_APP_NAME].units) + watcher_deployed = WATCHER_APP_NAME in ops_test.model.applications + test_app_deployed = APPLICATION_NAME in ops_test.model.applications + + if pg_units == 2 and watcher_deployed and test_app_deployed: + logger.info("Stereo mode already deployed with expected state, verifying...") + await ops_test.model.wait_for_idle(status="active", timeout=300) + return + + # If state is incorrect, we need to clean up and redeploy + logger.info(f"Unexpected state (pg_units={pg_units}), cleaning up...") + for app in [WATCHER_APP_NAME, WATCHER_APP_NAME, APPLICATION_NAME]: + if app in ops_test.model.applications: + await ops_test.model.remove_application(app, block_until_done=True) + + async with ops_test.fast_forward(): + # Deploy PostgreSQL with 2 units from the start + logger.info("Deploying PostgreSQL charm with 2 units...") + await ops_test.model.deploy( + charm, + application_name=DATABASE_APP_NAME, + num_units=2, + series="noble", + config={"profile": "testing", "synchronous-mode-strict": False}, + ) + # Deploy watcher using the same charm with role=watcher + logger.info("Deploying watcher (same charm, role=watcher)...") + await ops_test.model.deploy( + WATCHER_APP_NAME, + application_name=WATCHER_APP_NAME, + num_units=1, + series="noble", + config={"profile": "testing"}, + ) + logger.info("Deploying test application...") + await ops_test.model.deploy( + APPLICATION_NAME, + application_name=APPLICATION_NAME, + series="noble", + channel="edge", + ) + + # Wait for initial deployment + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, WATCHER_APP_NAME], + timeout=1200, + raise_on_error=False, # Watcher may be waiting for relation + ) + + # Relate PostgreSQL (watcher-offer) to watcher (watcher) + # The relation may already exist if deploying into a model with prior state + logger.info("Relating PostgreSQL to watcher") + try: + await ops_test.model.integrate( + f"{DATABASE_APP_NAME}:watcher-offer", f"{WATCHER_APP_NAME}:watcher" + ) + except Exception as e: + if "already exists" in str(e) or "relation" in str(e).lower(): + logger.info(f"Watcher relation already exists: {e}") + else: + raise + + # Wait for watcher to join Raft cluster + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, WATCHER_APP_NAME], + status="active", + timeout=600, + ) + + # Relate PostgreSQL to test app + try: + await ops_test.model.integrate(DATABASE_APP_NAME, f"{APPLICATION_NAME}:database") + except Exception as e: + if "already exists" in str(e) or "relation" in str(e).lower(): + logger.info(f"Database relation already exists: {e}") + else: + raise + + await ops_test.model.wait_for_idle(status="active", timeout=1800) + + # Verify deployment + assert len(ops_test.model.applications[DATABASE_APP_NAME].units) == 2 + assert len(ops_test.model.applications[WATCHER_APP_NAME].units) == 1 + + +@pytest.mark.abort_on_fail +async def test_replica_shutdown_with_watcher(ops_test: OpsTest, continuous_writes) -> None: + """Test replica shutdown with watcher providing quorum. + + Expected behavior: + - All connected clients to the primary should not be interrupted + - Clients connected to replica should be re-routed to primary + - No significant outage (less than a minute) + """ + await start_writes(ops_test) + + # Get current cluster roles + any_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + original_roles = await get_cluster_roles(ops_test, any_unit) + primary = original_roles["primaries"][0] + + # Get the replica unit + replica = None + for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + if unit.name != primary: + replica = unit.name + break + + assert replica is not None, "Could not find replica unit" + logger.info(f"Shutting down replica: {replica}") + + # Shutdown the replica + await ops_test.model.destroy_unit(replica, force=True, destroy_storage=False, max_wait=1500) + + # Wait for the cluster to stabilize after unit removal + # The primary needs time to reconfigure the cluster and update secrets + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="active", + timeout=300, + idle_period=30, + ) + + # Verify writes continue (primary should still be available) + # With watcher, we should maintain quorum + await are_writes_increasing(ops_test, down_unit=replica) + + # Wait for cluster to stabilize + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="active", + timeout=600, + idle_period=30, + ) + + # Scale back up + logger.info("Scaling back up after replica shutdown") + await ops_test.model.applications[DATABASE_APP_NAME].add_unit(count=1) + await ops_test.model.wait_for_idle(status="active", timeout=1500) + + # Wait for the new replica to become a sync_standby + # This ensures the cluster is fully ready for the next test + for attempt in Retrying(stop=stop_after_delay(180), wait=wait_fixed(10), reraise=True): + with attempt: + new_roles = await get_cluster_roles( + ops_test, ops_test.model.applications[DATABASE_APP_NAME].units[0].name + ) + logger.info(f"Cluster roles: {new_roles}") + assert len(new_roles["primaries"]) == 1, "Should have exactly one primary" + assert new_roles["primaries"][0] == primary, "Primary should not have changed" + assert len(new_roles["sync_standbys"]) == 1, "New replica should become sync_standby" + + await check_writes(ops_test) + + +@pytest.mark.abort_on_fail +async def test_primary_shutdown_with_watcher(ops_test: OpsTest, continuous_writes) -> None: + """Test primary shutdown with watcher providing quorum. + + Expected behavior: + - Old primary should be network-isolated (Patroni handles this) + - Replica should be promoted to primary + - Clients re-routed to new primary + - When old primary is healthy, it should become a replica + """ + await start_writes(ops_test) + + # Get current cluster roles + any_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + original_roles = await get_cluster_roles(ops_test, any_unit) + original_primary = original_roles["primaries"][0] + + # Get the replica - prefer sync_standby if available, otherwise any replica + # After a previous test scales up, the new unit may not yet be a sync_standby + if original_roles["sync_standbys"]: + original_replica = original_roles["sync_standbys"][0] + elif original_roles["replicas"]: + original_replica = original_roles["replicas"][0] + else: + # Fall back to finding the other unit manually + original_replica = None + for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + if unit.name != original_primary: + original_replica = unit.name + break + assert original_replica is not None, "Could not find replica unit" + + logger.info(f"Shutting down primary: {original_primary}") + + # Shutdown the primary + await ops_test.model.destroy_unit( + original_primary, force=True, destroy_storage=False, max_wait=1500 + ) + + # With watcher providing quorum, failover should happen automatically + # Wait for the model to stabilize first + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="active", + timeout=600, + idle_period=30, + ) + + # Wait for the replica to be promoted to primary + # Patroni needs time to detect leader failure and elect new leader (30-90s) + remaining_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + for attempt in Retrying(stop=stop_after_delay(180), wait=wait_fixed(10), reraise=True): + with attempt: + new_roles = await get_cluster_roles(ops_test, remaining_unit) + logger.info(f"Waiting for failover - current roles: {new_roles}") + assert len(new_roles["primaries"]) == 1, "Should have exactly one primary" + assert new_roles["primaries"][0] == original_replica, ( + f"Replica {original_replica} should have been promoted, " + f"but primary is {new_roles['primaries'][0]}" + ) + + # Wait for the charm to reconfigure after failover + # This ensures the relation endpoints are updated for the test app to reconnect + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="active", + timeout=300, + idle_period=30, + ) + + # Scale back up FIRST - with synchronous_mode_strict=true, the primary cannot + # accept writes when there's no sync_standby available. We need 2 units before + # we can verify writes are working. + logger.info("Scaling back up after primary shutdown") + await ops_test.model.applications[DATABASE_APP_NAME].add_unit(count=1) + # Wait longer for the new unit to fully join the cluster + # The new unit needs to: start PostgreSQL, join Raft cluster, become sync_standby + await ops_test.model.wait_for_idle(status="active", timeout=1800, idle_period=60) + + # Wait for the new replica to become a sync_standby + # This can take a while as the new unit needs to fully sync and be recognized + for attempt in Retrying(stop=stop_after_delay(300), wait=wait_fixed(15), reraise=True): + with attempt: + final_roles = await get_cluster_roles( + ops_test, ops_test.model.applications[DATABASE_APP_NAME].units[0].name + ) + logger.info(f"Final cluster roles: {final_roles}") + assert len(final_roles["primaries"]) == 1, "Should have exactly one primary" + assert len(final_roles["sync_standbys"]) == 1, "New replica should become sync_standby" + + # Now that we have a sync_standby, restart continuous writes and verify + # The continuous writes app caches the connection string, so we need to clear + # and restart it after failover to pick up the new primary's address. + # First clear the old writes state + action = ( + await ops_test.model + .applications[TEST_APP_NAME] + .units[0] + .run_action("clear-continuous-writes") + ) + await action.wait() + + # Then start fresh writes + await start_writes(ops_test) + + # Verify writes continue on the new primary + await are_writes_increasing(ops_test, down_unit=original_primary) + + await check_writes(ops_test) + + +@pytest.mark.abort_on_fail +async def test_watcher_shutdown_no_outage(ops_test: OpsTest, continuous_writes) -> None: + """Test watcher shutdown - should not cause service outage. + + Expected behavior: + - No outage experienced by either primary or replica + - Cluster continues to function (but loses quorum guarantee) + """ + await start_writes(ops_test) + + # Get current cluster state + any_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + original_roles = await get_cluster_roles(ops_test, any_unit) + + logger.info("Removing watcher unit") + + # Remove the watcher + watcher_unit = ops_test.model.applications[WATCHER_APP_NAME].units[0] + await ops_test.model.destroy_unit(watcher_unit.name, force=True, max_wait=300) + + # Verify writes continue without interruption + await are_writes_increasing(ops_test) + + # PostgreSQL cluster should remain active + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="active", + timeout=300, + idle_period=30, + ) + + # Verify cluster roles unchanged + new_roles = await get_cluster_roles(ops_test, any_unit) + assert new_roles["primaries"] == original_roles["primaries"] + + # Re-deploy watcher + logger.info("Re-deploying watcher") + await ops_test.model.applications[WATCHER_APP_NAME].add_unit(count=1) + await ops_test.model.wait_for_idle(status="active", timeout=600) + + # Verify the Raft cluster is properly formed with the new watcher + # This is critical - without this verification, subsequent tests might fail + # because the watcher is not actually participating in the Raft cluster + await verify_raft_cluster_health(ops_test, DATABASE_APP_NAME, WATCHER_APP_NAME) + + await check_writes(ops_test) + + +@pytest.mark.abort_on_fail +async def test_primary_network_isolation_with_watcher( + ops_test: OpsTest, continuous_writes +) -> None: + """Test network isolation of primary with watcher. + + Expected behavior: + - Isolated primary's connections terminated + - Replica promoted to primary + - When network restored, old primary becomes replica + """ + await start_writes(ops_test) + + # Get current cluster state + any_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + original_roles = await get_cluster_roles(ops_test, any_unit) + primary = original_roles["primaries"][0] + replica = original_roles["sync_standbys"][0] + + # Get primary machine name for network manipulation + primary_unit = None + for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + if unit.name == primary: + primary_unit = unit + break + + assert primary_unit is not None + primary_machine = primary_unit.machine.hostname + + logger.info(f"Isolating primary network: {primary} on {primary_machine}") + + try: + # Cut network from primary (this removes the eth0 interface entirely) + cut_network_from_unit(primary_machine) + + # Wait for failover to happen - Patroni needs time to detect leader failure + # and elect a new leader. This can take 30-90 seconds depending on TTL settings. + # Use explicit retry loop instead of just wait_for_idle. + new_primary = None + for attempt in Retrying(stop=stop_after_delay(180), wait=wait_fixed(10), reraise=True): + with attempt: + new_primary = await get_primary(ops_test, DATABASE_APP_NAME, down_unit=primary) + logger.info(f"Current primary: {new_primary}, expected: {replica}") + assert new_primary == replica, ( + f"Waiting for failover: replica {replica} should be promoted, " + f"but primary is still {new_primary}" + ) + await are_writes_increasing(ops_test, down_unit=primary_unit.name) + finally: + # Restore network + logger.info(f"Restoring network for {primary_machine}") + restore_network_for_unit(primary_machine) + + # Wait for cluster to stabilize with restored network + # The old primary may take time to rejoin after getting a new IP address, + # so we use raise_on_error=False and wait longer + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + timeout=900, + idle_period=30, + raise_on_error=False, # Old primary may be in error while rejoining + ) + + # Wait for the old primary to rejoin as replica + # This can take a while as it needs to recover with a new IP + for attempt in Retrying(stop=stop_after_delay(300), wait=wait_fixed(15), reraise=True): + with attempt: + final_roles = await get_cluster_roles(ops_test, replica) + logger.info(f"Final cluster roles: {final_roles}") + assert replica in final_roles["primaries"], ( + "Replica should remain primary after network restore" + ) + # Old primary should not be primary anymore + assert ( + primary not in final_roles["primaries"] and primary in final_roles["sync_standbys"] + ), "Old primary should now be a sync standby" + + # Use use_ip_from_inside=True because the old primary got a new IP after network restore + # and Juju's cached IP may be stale + await check_writes(ops_test, use_ip_from_inside=True) + + +@pytest.mark.abort_on_fail +async def test_replica_network_isolation_with_watcher( + ops_test: OpsTest, continuous_writes +) -> None: + """Test network isolation of replica with watcher. + + Expected behavior: + - Primary remains primary (doesn't failover) - Raft quorum maintained with watcher + - With synchronous_mode_strict=true, writes pause (no sync_standby available) + - After network restore, writes resume + - No data loss + + Note: This test uses iptables-based network isolation to preserve the replica's IP, + avoiding the complexity of IP changes when using eth0 device removal. + """ + await start_writes(ops_test) + + # Get current cluster state - use use_ip_from_inside=True because the previous test + # may have left units with stale IPs in Juju's cache after network restore + any_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + original_roles = await get_cluster_roles(ops_test, any_unit, use_ip_from_inside=True) + primary = original_roles["primaries"][0] + replica = original_roles["sync_standbys"][0] + + # Get replica machine for network manipulation + replica_unit = None + for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + if unit.name == replica: + replica_unit = unit + break + + assert replica_unit is not None + replica_machine = replica_unit.machine.hostname + + logger.info(f"Isolating replica network: {replica} on {replica_machine}") + + try: + # Cut network from replica using iptables (preserves IP) + cut_network_from_unit_without_ip_change(replica_machine) + + # Give Patroni time to detect the network isolation. + await asyncio.sleep(30) + + # Primary should remain primary (no failover should happen) + # Raft quorum is maintained with primary + watcher (2 out of 3) + current_primary = await get_primary(ops_test, DATABASE_APP_NAME, down_unit=replica) + assert current_primary == primary, "Primary should not change during replica isolation" + await are_writes_increasing(ops_test, down_unit=replica) + finally: + # Restore network + logger.info(f"Restoring network for {replica_machine}") + restore_network_for_unit_without_ip_change(replica_machine) + + # Wait for cluster to stabilize - replica should rejoin + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="active", + timeout=600, + idle_period=30, + ) + + # Verify cluster has a primary after restore (may or may not be the same one, + # since Patroni can switchover during network restore/rejoin) + final_roles = await get_cluster_roles(ops_test, any_unit, use_ip_from_inside=True) + assert len(final_roles["primaries"]) == 1, ( + "Cluster should have exactly one primary after restore" + ) + + # Verify writes continue after network restore + # Use use_ip_from_inside=True because previous tests may have caused IP changes + await are_writes_increasing(ops_test, use_ip_from_inside=True) + await check_writes(ops_test, use_ip_from_inside=True) + + +@pytest.mark.abort_on_fail +async def test_watcher_network_isolation(ops_test: OpsTest, continuous_writes) -> None: + """Test network isolation of watcher. + + Expected behavior: + - No service outage for PostgreSQL cluster + - Cluster loses quorum guarantee but continues operating + """ + await start_writes(ops_test) + + # Get watcher machine + watcher_unit = ops_test.model.applications[WATCHER_APP_NAME].units[0] + watcher_machine = watcher_unit.machine.hostname + + # Get current cluster state - use use_ip_from_inside=True because previous tests + # may have left units with stale IPs in Juju's cache after network manipulation + any_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + original_roles = await get_cluster_roles(ops_test, any_unit, use_ip_from_inside=True) + + logger.info(f"Isolating watcher network: {watcher_machine}") + + try: + # Cut network from watcher + cut_network_from_unit(watcher_machine) + + # Verify writes continue without interruption + await are_writes_increasing(ops_test, use_ip_from_inside=True) + + # Cluster roles should remain unchanged + current_roles = await get_cluster_roles(ops_test, any_unit, use_ip_from_inside=True) + assert current_roles["primaries"] == original_roles["primaries"] + + finally: + # Restore network + logger.info(f"Restoring watcher network: {watcher_machine}") + restore_network_for_unit(watcher_machine) + + # Wait for full recovery + await ops_test.model.wait_for_idle(status="active", timeout=600) + + # Use use_ip_from_inside=True because the watcher got a new IP after network restore + await check_writes(ops_test, use_ip_from_inside=True) + + +@pytest.mark.abort_on_fail +async def test_multi_cluster_watcher(ops_test: OpsTest, charm) -> None: + """Verify that a single watcher can monitor multiple PostgreSQL clusters. + + The watcher relation no longer has limit: 1, so the watcher can relate + to multiple PostgreSQL clusters simultaneously. Each relation gets its own + Raft instance with a dedicated port and data directory. + """ + second_pg_app = "postgresql-b" + + try: + # Deploy a second PostgreSQL cluster + logger.info("Deploying second PostgreSQL cluster for multi-cluster watcher test") + await ops_test.model.deploy( + charm, + application_name=second_pg_app, + num_units=2, + series="noble", + config={"profile": "testing", "synchronous-mode-strict": False}, + ) + await ops_test.model.wait_for_idle( + apps=[second_pg_app], + status="active", + timeout=1200, + ) + + # Relate the watcher to the second cluster + logger.info("Relating watcher to second PostgreSQL cluster") + await ops_test.model.integrate( + f"{second_pg_app}:watcher-offer", f"{WATCHER_APP_NAME}:watcher" + ) + + # Use fast_forward to trigger update_status quickly, which runs + # ensure_watcher_in_raft to add the watcher to the second cluster's Raft + async with ops_test.fast_forward(): + # Wait for the watcher to connect to both clusters + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, second_pg_app, WATCHER_APP_NAME], + status="active", + timeout=600, + ) + + # Verify both Raft clusters have the watcher as a member + # Check first cluster + await verify_raft_cluster_health( + ops_test, DATABASE_APP_NAME, WATCHER_APP_NAME, expected_members=3 + ) + # Check second cluster + await verify_raft_cluster_health( + ops_test, second_pg_app, WATCHER_APP_NAME, expected_members=3 + ) + + finally: + # Clean up the second cluster relation and app + if second_pg_app in ops_test.model.applications: + await ops_test.model.remove_application( + second_pg_app, block_until_done=True, force=True + ) + + # Verify original watcher is still healthy after removing the second cluster + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, WATCHER_APP_NAME], + status="active", + timeout=300, + ) + + +@pytest.mark.abort_on_fail +async def test_watcher_production_profile_az_blocked(ops_test: OpsTest, charm) -> None: + """Test watcher with profile=production blocks on AZ co-location. + + When all units are in the same availability zone (common on single-host + LXD deployments), a watcher with profile=production should enter + BlockedStatus because it shares an AZ with the PostgreSQL units. + This validates the AZ enforcement behavior. + + If JUJU_AVAILABILITY_ZONE is not set (some CI environments), the watcher + should reach active status since no AZ co-location can be detected. + + Since watcher-offer has limit: 1, we must remove the existing testing watcher + before deploying the production one, then restore it afterward. + """ + production_watcher = "pg-watcher-prod" + + # Remove existing watcher to free the watcher-offer relation slot + logger.info("Removing existing testing watcher to free relation slot") + if WATCHER_APP_NAME in ops_test.model.applications: + await ops_test.model.remove_application( + WATCHER_APP_NAME, block_until_done=True, force=True + ) + + try: + # Deploy a production-profile watcher + logger.info("Deploying watcher with profile=production") + await ops_test.model.deploy( + WATCHER_APP_NAME, + application_name=production_watcher, + num_units=1, + series="noble", + config={"profile": "production"}, + ) + + # Wait for initial install + await ops_test.model.wait_for_idle( + apps=[production_watcher], + timeout=600, + raise_on_error=False, + ) + + # Relate to the existing PostgreSQL cluster + await ops_test.model.integrate( + f"{DATABASE_APP_NAME}:watcher-offer", f"{production_watcher}:watcher" + ) + + # Wait for the watcher to settle (it may block or go active depending on AZ) + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle( + apps=[production_watcher], + timeout=600, + raise_on_error=False, + ) + + # Check the watcher's status + watcher_unit = ops_test.model.applications[production_watcher].units[0] + status = watcher_unit.workload_status + status_msg = watcher_unit.workload_status_message + + if status == "blocked": + # AZ is set and co-located — expected on single-host deployments + assert "AZ co-location" in status_msg, ( + f"Blocked status should mention AZ co-location, got: {status_msg}" + ) + logger.info(f"Production watcher correctly blocked: {status_msg}") + elif status == "active": + # AZ is not set — no co-location detected, watcher is active + logger.info("JUJU_AVAILABILITY_ZONE not set, watcher is active (no AZ enforcement)") + else: + pytest.fail( + f"Unexpected watcher status: {status} - {status_msg}. " + "Expected 'blocked' (AZ co-location) or 'active' (no AZ)." + ) + + finally: + # Clean up production watcher + if production_watcher in ops_test.model.applications: + await ops_test.model.remove_application( + production_watcher, block_until_done=True, force=True + ) + + # Restore the original testing watcher + logger.info("Restoring original testing watcher") + await ops_test.model.deploy( + WATCHER_APP_NAME, + application_name=WATCHER_APP_NAME, + num_units=1, + series="noble", + config={"profile": "testing"}, + ) + await ops_test.model.wait_for_idle( + apps=[WATCHER_APP_NAME], + timeout=600, + raise_on_error=False, + ) + await ops_test.model.integrate( + f"{DATABASE_APP_NAME}:watcher-offer", f"{WATCHER_APP_NAME}:watcher" + ) + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME, WATCHER_APP_NAME], + status="active", + timeout=600, + ) diff --git a/tests/spread/test_stereo_mode.py/task.yaml b/tests/spread/test_stereo_mode.py/task.yaml new file mode 100644 index 00000000000..65ce3cff758 --- /dev/null +++ b/tests/spread/test_stereo_mode.py/task.yaml @@ -0,0 +1,7 @@ +summary: test_stereo_mode.py +environment: + TEST_MODULE: ha_tests/test_stereo_mode.py +execute: | + tox run -e integration -- "tests/integration/$TEST_MODULE" --model testing --alluredir="$SPREAD_TASK/allure-results" +artifacts: + - allure-results From c2d02f3674c223458e0b8ef64d3d0f30a077e292 Mon Sep 17 00:00:00 2001 From: Dragomir Penev Date: Thu, 7 May 2026 11:24:36 +0300 Subject: [PATCH 5/6] Watcher channel --- tests/integration/ha_tests/test_stereo_mode.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/integration/ha_tests/test_stereo_mode.py b/tests/integration/ha_tests/test_stereo_mode.py index 8bf236897b4..c1e509d15b8 100644 --- a/tests/integration/ha_tests/test_stereo_mode.py +++ b/tests/integration/ha_tests/test_stereo_mode.py @@ -189,6 +189,7 @@ async def test_build_and_deploy_stereo_mode(ops_test: OpsTest, charm) -> None: application_name=WATCHER_APP_NAME, num_units=1, series="noble", + channel="16/edge", config={"profile": "testing"}, ) logger.info("Deploying test application...") @@ -770,6 +771,7 @@ async def test_watcher_production_profile_az_blocked(ops_test: OpsTest, charm) - application_name=production_watcher, num_units=1, series="noble", + channel="16/edge", config={"profile": "production"}, ) @@ -827,6 +829,7 @@ async def test_watcher_production_profile_az_blocked(ops_test: OpsTest, charm) - application_name=WATCHER_APP_NAME, num_units=1, series="noble", + channel="16/edge", config={"profile": "testing"}, ) await ops_test.model.wait_for_idle( From 394815a95017aced5c19a62337160522ac5c5236 Mon Sep 17 00:00:00 2001 From: Dragomir Penev Date: Thu, 7 May 2026 18:35:48 +0300 Subject: [PATCH 6/6] Disabling stereo test --- tests/spread/test_stereo_mode.py/task.yaml | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 tests/spread/test_stereo_mode.py/task.yaml diff --git a/tests/spread/test_stereo_mode.py/task.yaml b/tests/spread/test_stereo_mode.py/task.yaml deleted file mode 100644 index 65ce3cff758..00000000000 --- a/tests/spread/test_stereo_mode.py/task.yaml +++ /dev/null @@ -1,7 +0,0 @@ -summary: test_stereo_mode.py -environment: - TEST_MODULE: ha_tests/test_stereo_mode.py -execute: | - tox run -e integration -- "tests/integration/$TEST_MODULE" --model testing --alluredir="$SPREAD_TASK/allure-results" -artifacts: - - allure-results