Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,24 @@ jobs:
- name: Disable Cephfs Mirror daemons
run: ~/actionutils.sh remote_disable_mirror_daemon "cephfs-mirror"

- name: Print logs for failure
if: failure()
run: |
for node in node-wrk0 node-wrk1 node-wrk2 ; do
echo "=== $node: snap logs microceph ==="
lxc exec $node -- sh -c "snap logs microceph -n 1000" || true
echo "=== $node: snap logs cephfs-mirror ==="
lxc exec $node -- sh -c "snap logs microceph.cephfs-mirror -n 1000" || true
echo "=== $node: ceph -s ==="
lxc exec $node -- sh -c "microceph.ceph -s" || true
echo "=== $node: fs snapshot mirror daemon status ==="
lxc exec $node -- sh -c "microceph.ceph fs snapshot mirror daemon status" || true
echo "=== $node: replication status cephfs vol ==="
lxc exec $node -- sh -c "microceph replication status cephfs vol --json" || true
echo "=== $node: ceph.conf ==="
lxc exec $node -- sh -c "cat /var/snap/microceph/current/conf/ceph.conf" || true
done

nfs-test:
name: Test MicroCeph NFS feature
runs-on: ubuntu-22.04
Expand Down
117 changes: 112 additions & 5 deletions microceph/ceph/replication_cephfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import (
"fmt"
"slices"
"strings"
"time"

"github.com/Rican7/retry"
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/canonical/microceph/microceph/api/types"
"github.com/canonical/microceph/microceph/constants"
"github.com/canonical/microceph/microceph/database"
Expand All @@ -15,6 +19,22 @@ import (
"github.com/tidwall/gjson"
)

// cephFSMirrorStabilityAttempts and cephFSMirrorStabilityBackoff bound the
// pre-disable wait that ensures every mirror path is idle before removal.
// Declared as vars (not consts) so unit tests can shrink them.
var (
cephFSMirrorStabilityAttempts uint = 12
cephFSMirrorStabilityBackoff = 5 * time.Second
cephFSDisableRemoveAttempts uint = 10
cephFSDisableRemoveBackoff = 5 * time.Second
)

const cephFSMirrorIdleState = "idle"

// peerStatusFetcher fetches the per-path mirror status for one peer of a volume.
// Extracted as a type so tests can stub the live admin-socket call.
type peerStatusFetcher func(ctx context.Context, peerID string) (types.CephFsReplicationMirrorStatusMap, error)

// CephFSSnapshotMirrorDaemonStatus is the abstraction for storing
type CephFSSnapshotMirrorDaemonStatus []struct {
DaemonID int `json:"daemon_id"`
Expand Down Expand Up @@ -181,7 +201,11 @@ func (rh *CephfsReplicationHandler) DisableHandler(ctx context.Context, args ...
if len(rh.Request.SubvolumeGroup) != 0 {
return fmt.Errorf("Disable not supported for subvolumegroup. Provide a subvolume name to proceed.")
}
err = disableCephFSVolumeMirror(ctx, rh.Request, rh.MirrorList)
fetchers, ferr := rh.buildPeerStatusFetchers()
if ferr != nil {
return fmt.Errorf("REPCFS: failed to build peer status fetchers: %w", ferr)
}
err = disableCephFSVolumeMirror(ctx, rh.Request, rh.MirrorList, fetchers)
case types.CephfsResourceSubvolume:
err = cephFSSnapshotMirrorRemovePath(ctx, rh.Request.Volume, GetCephFSSubvolumePath(rh.Request.SubvolumeGroup, rh.Request.Subvolume))
case types.CephfsResourceDirectory:
Expand Down Expand Up @@ -291,6 +315,29 @@ func (rh *CephfsReplicationHandler) DemoteHandler(ctx context.Context, args ...a
return fmt.Errorf("%s not implemented for cephfs", types.DemoteReplicationRequest)
}

// buildPeerStatusFetchers resolves volumeID + peers + admin socket once, and returns
// one peerStatusFetcher closure per peer. Returns an empty map when the volume has no
// peers yet so the caller can skip pre-disable wait.
func (rh *CephfsReplicationHandler) buildPeerStatusFetchers() (map[string]peerStatusFetcher, error) {
volumeID, peers := GetCephFsMirrorVolumeAndPeersId(rh)
if volumeID < 0 || len(peers) == 0 {
return map[string]peerStatusFetcher{}, nil
}

adminSock, err := FindCephFsMirrorAdminSockPath()
if err != nil || len(adminSock) == 0 {
return nil, fmt.Errorf("failed to find CephFS mirror admin socket: %w", err)
}

fetchers := make(map[string]peerStatusFetcher, len(peers))
for _, peer := range peers {
fetchers[peer] = func(ctx context.Context, peerID string) (types.CephFsReplicationMirrorStatusMap, error) {
return GetCephFsMirrorPeerStatus(ctx, adminSock, rh.Request.Volume, volumeID, peerID)
}
}
return fetchers, nil
}

// #### CephFS Mirroring Specific Helpers ####

// GetCephFSMirrorStatus fetches the mirroring status of a filesystem for all requested peers.
Expand Down Expand Up @@ -470,17 +517,77 @@ func enableCephFSResourceMirror(ctx context.Context, request types.CephfsReplica
}

// disableCephFSVolumeMirror iterates over all paths enabled for mirroring in a volume and disables them.
func disableCephFSVolumeMirror(ctx context.Context, request types.CephfsReplicationRequest, mirrorPathList []string) error {
// Before issuing the removes it waits until every path reports an idle mirror state across
// all peers so the cephfs-mirror daemon does not race with an in-flight snapshot sync;
// each per-path remove is then retried with linear backoff to absorb transient daemon errors.
func disableCephFSVolumeMirror(ctx context.Context, request types.CephfsReplicationRequest, mirrorPathList []string, fetchers map[string]peerStatusFetcher) error {
if !request.IsForceOp {
err := fmt.Errorf("Disabling it for the volume (%s) may result in data-loss, please use appropriate parmaters", request.Volume)
return err
}

if err := cephFSWaitForMirrorPathsIdle(ctx, mirrorPathList, fetchers); err != nil {
return err
}

for _, mirrorPath := range mirrorPathList {
err := cephFSSnapshotMirrorRemovePath(ctx, request.Volume, mirrorPath)
if err != nil {
logger.Errorf("Failed to remove mirror path %s on CephFS volume %s: %v", mirrorPath, request.Volume, err)
path := mirrorPath
err := retry.Retry(func(i uint) error {
err := cephFSSnapshotMirrorRemovePath(ctx, request.Volume, path)
if err != nil {
logger.Errorf("REPCFS: attempt %d: failed to remove mirror path %s on CephFS volume %s: %v", i, path, request.Volume, err)
}
return err
},
strategy.Limit(cephFSDisableRemoveAttempts),
strategy.Backoff(backoff.Linear(cephFSDisableRemoveBackoff)),
)
if err != nil {
return fmt.Errorf("Failed to remove mirror path %s on CephFS volume %s: %w", path, request.Volume, err)
}
}

return nil
}

// cephFSWaitForMirrorPathsIdle polls each peer's mirror status and waits until every path
// reports the idle state. Returns "mirror path not stable: <path>" if any path remains
// non-idle after cephFSMirrorStabilityAttempts probes (linear backoff).
func cephFSWaitForMirrorPathsIdle(ctx context.Context, mirrorPathList []string, fetchers map[string]peerStatusFetcher) error {
if len(mirrorPathList) == 0 || len(fetchers) == 0 {
return nil
}

for _, mirrorPath := range mirrorPathList {
path := mirrorPath
var lastState string
err := retry.Retry(func(i uint) error {
for peerID, fetch := range fetchers {
status, err := fetch(ctx, peerID)
if err != nil {
return fmt.Errorf("failed to fetch mirror status for peer %s: %w", peerID, err)
}

pathStatus, ok := status[path]
if !ok {
// Path not yet reported by this peer; treat as not-yet-stable.
lastState = "missing"
return fmt.Errorf("path %s not reported by peer %s", path, peerID)
}

if pathStatus.State != cephFSMirrorIdleState {
lastState = pathStatus.State
return fmt.Errorf("path %s on peer %s is in state %q", path, peerID, pathStatus.State)
}
}
return nil
},
strategy.Limit(cephFSMirrorStabilityAttempts),
strategy.Backoff(backoff.Linear(cephFSMirrorStabilityBackoff)),
)
if err != nil {
logger.Errorf("REPCFS: mirror path %s did not stabilise (last state: %s): %v", path, lastState, err)
return fmt.Errorf("mirror path not stable: %s", path)
}
}

Expand Down
141 changes: 141 additions & 0 deletions microceph/ceph/replication_cephfs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package ceph

import (
"context"
"fmt"
"testing"
"time"

"github.com/canonical/microceph/microceph/api/types"
"github.com/stretchr/testify/assert"
)

// shrinkStabilityRetries lowers the wait loop's attempts/backoff so unit tests
// don't sleep for tens of seconds, and restores them on cleanup.
func shrinkStabilityRetries(t *testing.T, attempts uint, backoff time.Duration) {
t.Helper()
origAttempts := cephFSMirrorStabilityAttempts
origBackoff := cephFSMirrorStabilityBackoff
cephFSMirrorStabilityAttempts = attempts
cephFSMirrorStabilityBackoff = backoff
t.Cleanup(func() {
cephFSMirrorStabilityAttempts = origAttempts
cephFSMirrorStabilityBackoff = origBackoff
})
}

func TestCephFSWaitForMirrorPathsIdle_AllIdle(t *testing.T) {
shrinkStabilityRetries(t, 3, time.Millisecond)

paths := []string{"/dir1", "/dir2"}
fetchers := map[string]peerStatusFetcher{
"peer-a": func(_ context.Context, _ string) (types.CephFsReplicationMirrorStatusMap, error) {
return types.CephFsReplicationMirrorStatusMap{
"/dir1": {State: cephFSMirrorIdleState},
"/dir2": {State: cephFSMirrorIdleState},
}, nil
},
}

err := cephFSWaitForMirrorPathsIdle(context.Background(), paths, fetchers)
assert.NoError(t, err)
}

func TestCephFSWaitForMirrorPathsIdle_PathNeverStabilises(t *testing.T) {
shrinkStabilityRetries(t, 3, time.Millisecond)

paths := []string{"/dir1", "/dir2"}
fetchers := map[string]peerStatusFetcher{
"peer-a": func(_ context.Context, _ string) (types.CephFsReplicationMirrorStatusMap, error) {
return types.CephFsReplicationMirrorStatusMap{
"/dir1": {State: cephFSMirrorIdleState},
"/dir2": {State: "syncing"},
}, nil
},
}

err := cephFSWaitForMirrorPathsIdle(context.Background(), paths, fetchers)
assert.ErrorContains(t, err, "mirror path not stable: /dir2")
}

func TestCephFSWaitForMirrorPathsIdle_PathMissingFromPeer(t *testing.T) {
shrinkStabilityRetries(t, 2, time.Millisecond)

paths := []string{"/dir1"}
fetchers := map[string]peerStatusFetcher{
"peer-a": func(_ context.Context, _ string) (types.CephFsReplicationMirrorStatusMap, error) {
return types.CephFsReplicationMirrorStatusMap{}, nil
},
}

err := cephFSWaitForMirrorPathsIdle(context.Background(), paths, fetchers)
assert.ErrorContains(t, err, "mirror path not stable: /dir1")
}

func TestCephFSWaitForMirrorPathsIdle_StabilisesOnSecondAttempt(t *testing.T) {
shrinkStabilityRetries(t, 5, time.Millisecond)

paths := []string{"/dir1"}
calls := 0
fetchers := map[string]peerStatusFetcher{
"peer-a": func(_ context.Context, _ string) (types.CephFsReplicationMirrorStatusMap, error) {
calls++
state := "syncing"
if calls >= 2 {
state = cephFSMirrorIdleState
}
return types.CephFsReplicationMirrorStatusMap{
"/dir1": {State: state},
}, nil
},
}

err := cephFSWaitForMirrorPathsIdle(context.Background(), paths, fetchers)
assert.NoError(t, err)
assert.GreaterOrEqual(t, calls, 2)
}

func TestCephFSWaitForMirrorPathsIdle_FetcherError(t *testing.T) {
shrinkStabilityRetries(t, 2, time.Millisecond)

paths := []string{"/dir1"}
fetchers := map[string]peerStatusFetcher{
"peer-a": func(_ context.Context, _ string) (types.CephFsReplicationMirrorStatusMap, error) {
return nil, fmt.Errorf("admin socket gone")
},
}

err := cephFSWaitForMirrorPathsIdle(context.Background(), paths, fetchers)
assert.ErrorContains(t, err, "mirror path not stable: /dir1")
}

func TestCephFSWaitForMirrorPathsIdle_NoPathsNoFetchers(t *testing.T) {
assert.NoError(t, cephFSWaitForMirrorPathsIdle(context.Background(), nil, nil))
assert.NoError(t, cephFSWaitForMirrorPathsIdle(context.Background(), []string{"/x"}, nil))
assert.NoError(t, cephFSWaitForMirrorPathsIdle(context.Background(), nil, map[string]peerStatusFetcher{
"peer-a": func(_ context.Context, _ string) (types.CephFsReplicationMirrorStatusMap, error) {
return nil, nil
},
}))
}

func TestCephFSWaitForMirrorPathsIdle_MultiplePeersOneNonIdle(t *testing.T) {
shrinkStabilityRetries(t, 2, time.Millisecond)

paths := []string{"/dir1"}
fetchers := map[string]peerStatusFetcher{
"peer-a": func(_ context.Context, _ string) (types.CephFsReplicationMirrorStatusMap, error) {
return types.CephFsReplicationMirrorStatusMap{
"/dir1": {State: cephFSMirrorIdleState},
}, nil
},
"peer-b": func(_ context.Context, _ string) (types.CephFsReplicationMirrorStatusMap, error) {
return types.CephFsReplicationMirrorStatusMap{
"/dir1": {State: "syncing"},
}, nil
},
}

err := cephFSWaitForMirrorPathsIdle(context.Background(), paths, fetchers)
assert.ErrorContains(t, err, "mirror path not stable: /dir1")
}
7 changes: 7 additions & 0 deletions microceph/cmd/microceph/replication_disable_cephfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ func (c *cmdReplicationDisableCephFS) Run(cmd *cobra.Command, args []string) err
return err
}

// Volume-level force disable waits for every mirror path to become idle
// before issuing removes (see disableCephFSVolumeMirror); surface that so
// the operator does not perceive the call as hung.
if c.flagForce && len(c.subvolume) == 0 && len(c.dirpath) == 0 {
fmt.Println("Checking mirror path stability before disabling replication; this may take up to a minute...")
}

_, err = client.SendReplicationRequest(context.Background(), cli, payload)
return err
}
Expand Down
Loading