Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@
# See LICENSE file for licensing details.

options:
role:
description: |
Deployment role for this application. Set at deploy time and cannot be changed afterwards.
"postgresql" (default) runs the full PostgreSQL database server with Patroni.
"watcher" runs a lightweight Raft witness for stereo mode (2-node clusters),
providing quorum without running PostgreSQL.
type: string
default: "postgresql"
synchronous-node-count:
description: |
Sets the number of synchronous nodes to be maintained in the cluster. Should be
Expand Down
6 changes: 6 additions & 0 deletions metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,14 @@ provides:
interface: cos_agent
limit: 1
optional: true
watcher-offer:
interface: postgresql_watcher
limit: 1

requires:
watcher:
interface: postgresql_watcher
optional: true
replication:
interface: postgresql_async
limit: 1
Expand Down
367 changes: 282 additions & 85 deletions src/charm.py

Large diffs are not rendered by default.

39 changes: 34 additions & 5 deletions src/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
POSTGRESQL_CONF_PATH,
POSTGRESQL_DATA_PATH,
POSTGRESQL_LOGS_PATH,
RAFT_PARTNER_PREFIX,
TLS_CA_BUNDLE_FILE,
)
from utils import _change_owner, label2name, parallel_patroni_get_request, render_file
Expand Down Expand Up @@ -529,6 +530,28 @@ def is_member_isolated(self) -> bool:

return len(r.json()["members"]) == 0

def is_member_registered_in_cluster(self) -> bool:
    """Tell whether this unit is listed among the members reported by the cluster status.

    In Raft mode a new member can already be running and replicating while it is
    still missing from the DCS, because it has not been added to the Raft cluster yet.

    Returns:
        True when an entry whose "name" equals this member's name is present in the
        cluster status; False when it is absent, when the status is empty, or when
        the status could not be fetched.
    """
    try:
        members = self.cluster_status()
    except RetryError:
        logger.debug("Could not get cluster status to check member registration")
        return False

    if members:
        # Look the name up once, then scan the reported members for it.
        own_name = self.member_name
        for entry in members:
            if entry.get("name") == own_name:
                return True
    return False

def online_cluster_members(self) -> list[ClusterMember]:
"""Return list of online cluster members."""
try:
Expand Down Expand Up @@ -681,6 +704,9 @@ def render_patroni_yml_file(
user_databases_map=user_databases_map,
slots=slots,
instance_password_encryption=self.charm.config.instance_password_encryption,
watcher=self.charm.watcher_offer.watcher_raft_address
if self.charm.watcher_offer.is_active
else None,
Comment on lines +707 to +709
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only render the watcher if it is expected (not disabled or stopped).

)
render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o600)

Expand Down Expand Up @@ -880,7 +906,7 @@ def get_running_cluster_members(self) -> list[str]:
except Exception:
return []

def remove_raft_member(self, member_ip: str) -> None:
def remove_raft_member(self, member_address: str | None) -> None:
"""Remove a member from the raft cluster.

The raft cluster is a different cluster from the Patroni cluster.
Expand All @@ -891,6 +917,10 @@ def remove_raft_member(self, member_ip: str) -> None:
RaftMemberNotFoundError: if the member to be removed
is not part of the raft cluster.
"""
if not member_address:
logger.debug("Remove raft member: No address provided")
return

if self.charm.has_raft_keys():
logger.debug("Remove raft member: Raft already in recovery")
return
Expand All @@ -909,12 +939,12 @@ def remove_raft_member(self, member_ip: str) -> None:
raise RemoveRaftMemberFailedError() from None

# Check whether the member is still part of the raft cluster.
if not member_ip or f"partner_node_status_server_{member_ip}:2222" not in raft_status:
if f"{RAFT_PARTNER_PREFIX}{member_address}" not in raft_status:
return

# If there's no quorum and the leader left raft cluster is stuck
if not raft_status["has_quorum"] and (
not raft_status["leader"] or raft_status["leader"].host == member_ip
not raft_status["leader"] or raft_status["leader"].address == member_address
):
self.charm.set_unit_status(
BlockedStatus("Raft majority loss, run: promote-to-primary")
Expand All @@ -932,10 +962,9 @@ def remove_raft_member(self, member_ip: str) -> None:
)
return

# Suppressing since the call will be removed soon
# Remove the member from the raft cluster.
try:
result = syncobj_util.executeCommand(raft_host, ["remove", f"{member_ip}:2222"])
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Port is no longer guaranteed to be 2222.

result = syncobj_util.executeCommand(raft_host, ["remove", member_address])
except UtilityException:
logger.debug("Remove raft member: Remove call failed")
raise RemoveRaftMemberFailedError() from None
Expand Down
1 change: 1 addition & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
WATCHER_SECRET_LABEL = "watcher-secret" # noqa: S105

RAFT_PORT = 2222
RAFT_PARTNER_PREFIX = "partner_node_status_server_"

BACKUP_TYPE_OVERRIDES = {"full": "full", "differential": "diff", "incremental": "incr"}
PLUGIN_OVERRIDES = {"audit": "pgaudit", "uuid_ossp": '"uuid-ossp"'}
Expand Down
113 changes: 96 additions & 17 deletions src/raft_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import logging
from contextlib import suppress
from ipaddress import IPv4Address
from ipaddress import ip_address
from shutil import rmtree
from typing import TYPE_CHECKING, TypedDict

Expand All @@ -38,7 +38,7 @@
from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed

from cluster import ClusterMember
from constants import PATRONI_CLUSTER_STATUS_ENDPOINT
from constants import PATRONI_CLUSTER_STATUS_ENDPOINT, RAFT_PARTNER_PREFIX, RAFT_PORT
from utils import create_directory, parallel_patroni_get_request, render_file

if TYPE_CHECKING:
Expand Down Expand Up @@ -74,7 +74,7 @@ class ClusterStatus(TypedDict):
members: list[str]


def install_service() -> bool:
def install_service() -> None:
"""Install the systemd template service for the Raft controller.

Returns:
Expand All @@ -87,14 +87,8 @@ def install_service() -> bool:
render_file(SERVICE_FILE, rendered, 0o644, change_owner=False)

# Reload systemd to pick up the new service
try:
daemon_reload()
logger.info(f"Installed systemd service {SERVICE_FILE}")
except SystemdError as e:
logger.error(f"Failed to reload systemd: {e}")
return False

return True
daemon_reload()
logger.info(f"Installed systemd service {SERVICE_FILE}")


class RaftController:
Expand Down Expand Up @@ -159,13 +153,13 @@ def configure(

# Validate addresses to prevent injection into the systemd unit file
try:
IPv4Address(self_addr)
ip_address(self_addr)
except Exception:
logger.error(f"Invalid self_addr format: {self_addr}")
return False
try:
for addr in partner_addrs:
IPv4Address(addr)
ip_address(addr)
except Exception:
logger.error(f"Invalid partner address format: {addr}")
return False
Expand Down Expand Up @@ -238,7 +232,8 @@ def remove_service(self) -> bool:
return False

try:
rmtree(self.data_dir)
with suppress(FileNotFoundError):
rmtree(self.data_dir)
except Exception as e:
logger.error(f"Failed to remove Raft controller directory: {e}")
return False
Expand All @@ -252,13 +247,98 @@ def restart(self) -> bool:
True if restarted successfully, False otherwise.
"""
try:
service_enable(self.service_name)
service_restart(self.service_name)
logger.info(f"Restarted Raft controller service {self.service_name}")
return True
except SystemdError as e:
logger.error(f"Failed to restart Raft controller: {e}")
return False

def get_stale_watchers(
    self, member_address: str, raft_password: str, partner_addrs: list[str], port: int
) -> list[str]:
    """Collect stale watcher raft members.

    The current watcher (``member_address:port``) is asked for its raft status
    first, followed by every partner on ``RAFT_PORT``. The first host that returns
    a non-empty status decides the result; hosts that cannot be reached or that
    return nothing are skipped with a warning.

    Args:
        member_address: address of the current watcher, without the port.
        raft_password: password handed to the raft TCP utility.
        partner_addrs: addresses of the partner nodes, without ports.
        port: raft port of the current watcher.

    Returns:
        The ``host:port`` part of every partner key in the status that ends with
        ``:<port>`` and is not the current watcher. Empty when no host answered.
    """
    watcher_addr = f"{member_address}:{port}"
    watcher_key = f"{RAFT_PARTNER_PREFIX}{watcher_addr}"
    # Compare against ":<port>" instead of the bare digits: a plain suffix match
    # on e.g. "222" would also select partners listening on 2222 or 12222.
    port_suffix = f":{port}"

    # Get the status of the raft cluster.
    syncobj_util = TcpUtility(password=raft_password, timeout=3)

    candidates = [watcher_addr, *(f"{addr}:{RAFT_PORT}" for addr in partner_addrs)]
    for raft_host in candidates:
        try:
            raft_status = syncobj_util.executeCommand(raft_host, ["status"])
        except Exception as e:
            logger.warning(f"Collect stale addrs: Cannot connect to raft cluster: {e}")
            continue
        if not raft_status:
            logger.warning("Collect stale addrs: No raft status")
            continue
        # The first usable status is authoritative; do not query further hosts.
        return [
            key[len(RAFT_PARTNER_PREFIX) :]
            for key in raft_status
            if key.startswith(RAFT_PARTNER_PREFIX)
            and key.endswith(port_suffix)
            and key != watcher_key
        ]
    logger.warning("Collect stale addrs: No member available")
    return []

def remove_raft_member(
self, member_address: str, raft_password: str, partner_addrs: list[str]
) -> None:
"""Remove a member from the raft cluster.

The raft cluster is a different cluster from the Patroni cluster.
It is responsible for defining which Patroni member can update
the primary member in the DCS.

Raises:
RaftMemberNotFoundError: if the member to be removed
is not part of the raft cluster.
"""
if not member_address:
logger.debug("Remove raft member: No address provided")
return

# Get the status of the raft cluster.
syncobj_util = TcpUtility(password=raft_password, timeout=3)

for raft_host in [f"{addr}:{RAFT_PORT}" for addr in partner_addrs]:
try:
raft_status = syncobj_util.executeCommand(raft_host, ["status"])
except Exception as e:
logger.warning(f"Remove raft watcher: Cannot connect to raft cluster: {e}")
continue
if not raft_status:
logger.warning("Remove raft watcher: No raft status")
continue

# Check whether the member is still part of the raft cluster.
if f"{RAFT_PARTNER_PREFIX}{member_address}" not in raft_status:
return

# If there's no quorum and the leader left raft cluster is stuck
if not raft_status["has_quorum"] or not raft_status["leader"]:
logger.warning("Remove raft watcher: No quorum or leader")
continue
Comment on lines +325 to +328
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this condition use OR here, or should it be AND?

There may be no leader during a re-election while quorum still exists because the majority is up, no?


# Remove the member from the raft cluster.
try:
result = syncobj_util.executeCommand(raft_host, ["remove", member_address])
except Exception as e:
logger.debug(f"Remove raft watcher: Remove call failed {e}")
continue

if not result or not result.startswith("SUCCESS"):
logger.debug(f"Remove raft watcher: Remove call not successful with {result}")
continue
return

def get_status(self, self_port: int, password: str | None) -> ClusterStatus:
"""Get the Raft controller status.

Expand Down Expand Up @@ -288,11 +368,10 @@ def get_status(self, self_port: int, password: str | None) -> ClusterStatus:
)

# Extract member addresses from partner_node_status_server_* keys
prefix = "partner_node_status_server_"
members: list[str] = [str(raft_status["self"])]
for key in raft_status:
if key.startswith(prefix):
members.append(key[len(prefix) :])
if key.startswith(RAFT_PARTNER_PREFIX):
members.append(key[len(RAFT_PARTNER_PREFIX) :])
status["members"] = sorted(members)
return status
except Exception as e:
Expand Down
9 changes: 9 additions & 0 deletions src/relations/async_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ def _handle_database_start(self, event: RelationChangedEvent) -> None:
for unit in {*self.charm._peers.units, self.charm.unit} # type: ignore
):
self.charm.app_peer_data.update({"cluster_initialised": "True"})
self.charm.watcher_offer.enable_watcher()
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once the new RAFT is set up, a clean watcher can be started.

elif self._is_following_promoted_cluster():
self.charm.set_unit_status(
WaitingStatus("Waiting for the database to be started in all units")
Expand Down Expand Up @@ -541,10 +542,14 @@ def _on_async_relation_broken(self, _) -> None:
self.charm.app_peer_data.update({"promoted-cluster-counter": ""})
self.charm.update_config()

if self.charm.unit.is_leader():
self.charm.watcher_offer.update_endpoints()

def _on_async_relation_changed(self, event: RelationChangedEvent) -> None:
"""Update the Patroni configuration if one of the clusters was already promoted."""
if self.charm.unit.is_leader():
self.set_app_status()
self.charm.watcher_offer.update_endpoints()

primary_cluster = self._get_primary_cluster()
logger.debug("Primary cluster: %s", primary_cluster)
Expand Down Expand Up @@ -604,6 +609,9 @@ def _on_async_relation_joined(self, _) -> None:
"unit-promoted-cluster-counter": highest_promoted_cluster_counter
})

if self.charm.unit.is_leader():
self.charm.watcher_offer.update_endpoints()

def _on_create_replication(self, event: ActionEvent) -> None:
"""Set up asynchronous replication between two clusters."""
if self._get_primary_cluster() is not None:
Expand Down Expand Up @@ -757,6 +765,7 @@ def _stop_database(self, event: RelationChangedEvent) -> bool:
if not self.charm.unit.is_leader() and not os.path.exists(POSTGRESQL_DATA_PATH):
logger.debug("Early exit on_async_relation_changed: following promoted cluster.")
return False
self.charm.watcher_offer.disable_watcher()
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RAFT is being re-initialised, so the watcher should be stopped to ensure that the new RAFT doesn't sync from the watcher.


try:
for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)):
Expand Down
Loading
Loading