Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ provides:
interface: cos_agent
limit: 1
optional: true
watcher-offer:
Comment thread
taurus-forever marked this conversation as resolved.
interface: postgresql_watcher
limit: 1
optional: true

requires:
replication:
Expand Down
14 changes: 1 addition & 13 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ psutil = "^7.2.2"
charm-refresh = "^3.1.0.2"
httpx = "^0.28.1"
charmlibs-snap = "^1.0.1"
charmlibs-systemd = "^1.0.0"
charmlibs-interfaces-tls-certificates = "^1.8.1"
postgresql-charms-single-kernel = "16.1.11"

Expand Down
57 changes: 51 additions & 6 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
PLUGIN_OVERRIDES,
POSTGRESQL_DATA_PATH,
RAFT_PASSWORD_KEY,
RAFT_PORT,
REPLICATION_CONSUMER_RELATION,
REPLICATION_OFFER_RELATION,
REPLICATION_PASSWORD_KEY,
Expand All @@ -135,6 +136,7 @@
from relations.async_replication import PostgreSQLAsyncReplication
from relations.postgresql_provider import PostgreSQLProvider
from relations.tls import TLS
from relations.watcher import PostgreSQLWatcherRelation
from rotate_logs import RotateLogs
from utils import label2name, new_password, render_file

Expand Down Expand Up @@ -353,6 +355,7 @@ def __init__(self, *args):
self.tls = TLS(self, PEER)
self.tls_transfer = TLSTransfer(self, PEER)
self.async_replication = PostgreSQLAsyncReplication(self)
self.watcher_offer = PostgreSQLWatcherRelation(self)
# self.logical_replication = PostgreSQLLogicalReplication(self)
self.restart_manager = RollingOpsManager(
charm=self, relation="restart", callback=self._restart
Expand Down Expand Up @@ -424,7 +427,7 @@ def _post_snap_refresh(self, refresh: charm_refresh.Machines):
logger.exception("Unable to check or update internal cert")

if not self._patroni.start_patroni():
self.set_unit_status(ops.BlockedStatus("Failed to start PostgreSQL"), refresh=refresh)
self.set_unit_status(BlockedStatus("Failed to start PostgreSQL"), refresh=refresh)
return

self._setup_exporter()
Expand Down Expand Up @@ -756,7 +759,7 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
# checked for none in the early exit method
departing_member = event.departing_unit.name.replace("/", "-") # type: ignore
if member_ip := self._patroni.get_member_ip(departing_member):
self._patroni.remove_raft_member(member_ip)
self._patroni.remove_raft_member(f"{member_ip}:{RAFT_PORT}")
except RemoveRaftMemberFailedError:
logger.debug(
"Deferring on_peer_relation_departed: Failed to remove member from raft cluster"
Expand Down Expand Up @@ -900,6 +903,10 @@ def _raft_reinitialisation(self) -> None:
self._patroni.remove_raft_data()
logger.info(f"Stopping {self.unit.name}")
self.unit_peer_data["raft_stopped"] = "True"
self.watcher_offer.disable_watcher()
if self.watcher_offer.is_active:
logger.info("waiting for RAFT watcher to disconnect.")
return

if self.unit.is_leader():
self._stuck_raft_cluster_stopped_check()
Expand Down Expand Up @@ -1000,16 +1007,45 @@ def _on_peer_relation_changed(self, event: HookEvent):
event.defer()
return

# In Raft mode with a watcher, ensure this member is properly registered in the DCS.
# A new member may be running but not registered if it was added to Raft after starting.
if (
self.watcher_offer.is_watcher_connected
and not self._patroni.is_member_registered_in_cluster()
):
logger.info("Member running but not registered in Raft cluster - restarting Patroni")
self._patroni.restart_patroni()
event.defer()
return

Comment thread
taurus-forever marked this conversation as resolved.
self._start_stop_pgbackrest_service(event)

# This is intended to be executed only when leader is reinitializing S3 connection due to the leader change.
if not self._handle_s3_initialization(event):
return

# Update watcher relation with fresh peer IPs when peer data changes
# This ensures pg-endpoints stay current when unit IPs change
if self.unit.is_leader():
self.watcher_offer.update_endpoints()

self._update_new_unit_status()

# Split off into separate function, because of complexity _on_peer_relation_changed
def _handle_s3_initialization(self, event: HookEvent) -> bool:
"""Handle S3 initialization during peer relation changes.

Returns:
True if processing should continue, False if we should return early.
"""
# This is intended to be executed only when leader is reinitializing S3 connection
# due to the leader change.
if (
"s3-initialization-start" in self.app_peer_data
and "s3-initialization-done" not in self.unit_peer_data
and self.is_primary
and not self.backup._on_s3_credential_changed_primary(event)
):
return
return False

# Clean-up unit initialization data after successful sync to the leader.
if "s3-initialization-done" in self.app_peer_data and not self.unit.is_leader():
Expand All @@ -1020,7 +1056,7 @@ def _on_peer_relation_changed(self, event: HookEvent):
"s3-initialization-start": "",
})

self._update_new_unit_status()
return True

def _on_secret_changed(self, event: SecretChangedEvent) -> None:
"""Handle the secret_changed event."""
Expand Down Expand Up @@ -1058,6 +1094,8 @@ def _update_new_unit_status(self) -> None:
if self.primary_endpoint:
self._update_relation_endpoints()
self.async_replication.handle_read_only_mode()
# Update watcher relation with current cluster endpoints
self.watcher_offer.update_endpoints()
else:
self.set_unit_status(WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE))

Expand All @@ -1075,7 +1113,7 @@ def _reconfigure_cluster(self, event: HookEvent | RelationEvent) -> bool:
):
logger.info("Removing %s from the cluster due to IP change", ip_to_remove)
try:
self._patroni.remove_raft_member(ip_to_remove)
self._patroni.remove_raft_member(f"{ip_to_remove}:{RAFT_PORT}")
except RemoveRaftMemberFailedError:
logger.debug("Deferring on_peer_relation_changed: failed to remove raft member")
return False
Expand Down Expand Up @@ -1110,6 +1148,10 @@ def _update_member_ip(self) -> bool:
self.unit_peer_data.update({"ip": current_ip})
self._patroni.stop_patroni()
self._update_certificate()
# Update watcher relation - unit address for all units, endpoints only for leader
self.watcher_offer.update_unit_address()
if self.unit.is_leader():
self.watcher_offer.update_endpoints()
return True
else:
self.unit_peer_data.update({"ip-to-remove": ""})
Expand Down Expand Up @@ -2026,6 +2068,9 @@ def _on_update_status(self, _) -> None:
# Restart topology observer if it is gone
self._observer.start_observer()

# Keep this unit data current for watcher AZ/IP checks.
self.watcher_offer.update_unit_address()

if self.unit.is_leader() and "refresh_remove_trigger" not in self.app_peer_data:
self.postgresql.drop_hba_triggers()
self.app_peer_data["refresh_remove_trigger"] = "True"
Expand Down
39 changes: 34 additions & 5 deletions src/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
POSTGRESQL_CONF_PATH,
POSTGRESQL_DATA_PATH,
POSTGRESQL_LOGS_PATH,
RAFT_PARTNER_PREFIX,
TLS_CA_BUNDLE_FILE,
)
from utils import _change_owner, label2name, parallel_patroni_get_request, render_file
Expand Down Expand Up @@ -529,6 +530,28 @@ def is_member_isolated(self) -> bool:

return len(r.json()["members"]) == 0

def is_member_registered_in_cluster(self) -> bool:
"""Check if this member is registered in the Raft DCS cluster.

In Raft mode, a new member may be running and replicating but not yet
registered in the DCS if it hasn't been added to the Raft cluster.

Returns:
True if this member appears in the /cluster endpoint, False otherwise.
"""
try:
cluster_status = self.cluster_status()
except RetryError:
logger.debug("Could not get cluster status to check member registration")
return False

if not cluster_status:
return False

# Check if this member's name appears in the cluster members list
member_name = self.member_name
return any(member.get("name") == member_name for member in cluster_status)

def online_cluster_members(self) -> list[ClusterMember]:
"""Return list of online cluster members."""
try:
Expand Down Expand Up @@ -681,6 +704,9 @@ def render_patroni_yml_file(
user_databases_map=user_databases_map,
slots=slots,
instance_password_encryption=self.charm.config.instance_password_encryption,
watcher=self.charm.watcher_offer.watcher_raft_address
if self.charm.watcher_offer.is_active
else None,
)
render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o600)

Expand Down Expand Up @@ -880,7 +906,7 @@ def get_running_cluster_members(self) -> list[str]:
except Exception:
return []

def remove_raft_member(self, member_ip: str) -> None:
def remove_raft_member(self, member_address: str | None) -> None:
"""Remove a member from the raft cluster.

The raft cluster is a different cluster from the Patroni cluster.
Expand All @@ -891,6 +917,10 @@ def remove_raft_member(self, member_ip: str) -> None:
RaftMemberNotFoundError: if the member to be removed
is not part of the raft cluster.
"""
if not member_address:
logger.debug("Remove raft member: No address provided")
return

if self.charm.has_raft_keys():
logger.debug("Remove raft member: Raft already in recovery")
return
Expand All @@ -909,12 +939,12 @@ def remove_raft_member(self, member_ip: str) -> None:
raise RemoveRaftMemberFailedError() from None

# Check whether the member is still part of the raft cluster.
if not member_ip or f"partner_node_status_server_{member_ip}:2222" not in raft_status:
if f"{RAFT_PARTNER_PREFIX}{member_address}" not in raft_status:
return

# If there's no quorum and the leader left raft cluster is stuck
if not raft_status["has_quorum"] and (
not raft_status["leader"] or raft_status["leader"].host == member_ip
not raft_status["leader"] or raft_status["leader"].address == member_address
):
self.charm.set_unit_status(
BlockedStatus("Raft majority loss, run: promote-to-primary")
Expand All @@ -932,10 +962,9 @@ def remove_raft_member(self, member_ip: str) -> None:
)
return

# Suppressing since the call will be removed soon
# Remove the member from the raft cluster.
try:
result = syncobj_util.executeCommand(raft_host, ["remove", f"{member_ip}:2222"])
result = syncobj_util.executeCommand(raft_host, ["remove", member_address])
except UtilityException:
logger.debug("Remove raft member: Remove call failed")
raise RemoveRaftMemberFailedError() from None
Expand Down
1 change: 1 addition & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
WATCHER_SECRET_LABEL = "watcher-secret" # noqa: S105

RAFT_PORT = 2222
RAFT_PARTNER_PREFIX = "partner_node_status_server_"

BACKUP_TYPE_OVERRIDES = {"full": "full", "differential": "diff", "incremental": "incr"}
PLUGIN_OVERRIDES = {"audit": "pgaudit", "uuid_ossp": '"uuid-ossp"'}
Expand Down
Loading
Loading