Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 36 additions & 7 deletions internal/api/nzbdav_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package api

import (
"fmt"
"log/slog"
"os"
"path/filepath"
"time"

"github.com/gofiber/fiber/v2"
"github.com/javi11/altmount/internal/database"
"github.com/javi11/altmount/internal/health"
"github.com/javi11/altmount/internal/importer"
"github.com/javi11/altmount/internal/importer/migration"
)
Expand Down Expand Up @@ -224,16 +226,43 @@ func (s *Server) handleMigrateNzbdavSymlinks(c *fiber.Ctx) error {
})
}

// Register each rewritten symlink in file_health via the scoped sync.
// This reads each file's .meta to fill in source_nzb_path / release_date
// and uses the on-disk library symlink path as library_path. Non-fatal:
// the symlink rewrites + migration-status update are already persisted;
// on failure we just log and report partial counts to the caller.
libraryFilesAttempted := 0
libraryFilesRegistered := 0
if !req.DryRun && len(report.RewrittenItems) > 0 && s.librarySyncWorker != nil {
syncItems := make([]health.FileSyncRequest, len(report.RewrittenItems))
for i, it := range report.RewrittenItems {
syncItems[i] = health.FileSyncRequest{
MountRelativePath: it.FinalPath,
LibraryPath: it.LibraryPath,
}
}
libraryFilesAttempted = len(syncItems)
registered, err := s.librarySyncWorker.SyncFiles(ctx, syncItems)
if err != nil {
slog.WarnContext(ctx, "SyncFiles failed after nzbdav symlink rewrite",
"attempted", libraryFilesAttempted,
"error", err)
}
libraryFilesRegistered = registered
}

return c.Status(200).JSON(fiber.Map{
"success": true,
"data": fiber.Map{
"scanned": report.Scanned,
"matched": report.Matched,
"rewritten": report.Rewritten,
"skipped_wrong_prefix": report.SkippedWrongPrefix,
"unmatched": report.Unmatched,
"errors": report.Errors,
"dry_run": req.DryRun,
"scanned": report.Scanned,
"matched": report.Matched,
"rewritten": report.Rewritten,
"skipped_wrong_prefix": report.SkippedWrongPrefix,
"unmatched": report.Unmatched,
"errors": report.Errors,
"dry_run": req.DryRun,
"library_files_attempted": libraryFilesAttempted,
"library_files_registered": libraryFilesRegistered,
},
})
}
Expand Down
57 changes: 40 additions & 17 deletions internal/database/symlink_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,64 @@ import (
"fmt"
"path/filepath"
"strings"

"github.com/javi11/altmount/internal/importer/migration"
)

// DBSymlinkLookup adapts ImportMigrationRepository to migration.SymlinkLookup.
//
// CommitRewrites only advances import_migrations.status to 'symlinks_migrated'.
// Registration of the rewritten files in file_health is delegated to the
// library sync worker (triggered by the handler after a successful rewrite);
// that worker walks the library, reads each .meta file, and inserts file_health
// rows with proper library_path / source_nzb_path / release_date.
type DBSymlinkLookup struct {
repo *ImportMigrationRepository
migRepo *ImportMigrationRepository
}

// NewDBSymlinkLookup creates a new DBSymlinkLookup wrapping the given repository.
func NewDBSymlinkLookup(repo *ImportMigrationRepository) *DBSymlinkLookup {
return &DBSymlinkLookup{repo: repo}
func NewDBSymlinkLookup(migRepo *ImportMigrationRepository) *DBSymlinkLookup {
return &DBSymlinkLookup{migRepo: migRepo}
}

// LookupFinalPath returns the final AltMount path for the given source and externalID.
// Returns ("", false, nil) when no matching row exists or the row has no final_path.
// Resolve returns the migration row info for the given source and externalID.
// Returns found=false when no matching row exists or the row has no final_path.
//
// Season-pack episode rows use a "file:<episodeFilename>" relative_path to signal that
// final_path stores the season directory and the episode path must be computed by joining
// them. This keeps MarkImported simple (always stores the directory) while allowing
// per-episode resolution here.
func (l *DBSymlinkLookup) LookupFinalPath(ctx context.Context, source, externalID string) (string, bool, error) {
row, err := l.repo.LookupByExternalID(ctx, source, externalID)
// Season-pack episode rows use a "file:<episodeFilename>" relative_path to signal
// that final_path stores the season directory and the episode path must be computed
// by joining them. This keeps MarkImported simple (always stores the directory)
// while allowing per-episode resolution here.
func (l *DBSymlinkLookup) Resolve(ctx context.Context, source, externalID string) (migration.ResolvedSymlink, bool, error) {
row, err := l.migRepo.LookupByExternalID(ctx, source, externalID)
if err != nil {
return "", false, fmt.Errorf("lookup final path (source=%s, id=%s): %w", source, externalID, err)
return migration.ResolvedSymlink{}, false, fmt.Errorf("lookup final path (source=%s, id=%s): %w", source, externalID, err)
}
if row == nil || row.FinalPath == nil {
return "", false, nil
return migration.ResolvedSymlink{}, false, nil
}
finalPath := *row.FinalPath
if episodeFilename, ok := strings.CutPrefix(row.RelativePath, "file:"); ok && episodeFilename != "" {
finalPath = filepath.Join(finalPath, episodeFilename)
}
return finalPath, true, nil
return migration.ResolvedSymlink{
FinalPath: finalPath,
RowID: row.ID,
QueueItemID: row.QueueItemID,
}, true, nil
}

// MarkSymlinksMigrated sets status=symlinks_migrated for the given row IDs.
func (l *DBSymlinkLookup) MarkSymlinksMigrated(ctx context.Context, ids []int64) error {
return l.repo.MarkSymlinksMigrated(ctx, ids)
// CommitRewrites bulk-advances every matched row to status=symlinks_migrated.
// Idempotent (WHERE id IN (...)). Re-runs are safe.
func (l *DBSymlinkLookup) CommitRewrites(ctx context.Context, items []migration.RewrittenItem) error {
if len(items) == 0 {
return nil
}
ids := make([]int64, len(items))
for i, it := range items {
ids[i] = it.RowID
}
if err := l.migRepo.MarkSymlinksMigrated(ctx, ids); err != nil {
return fmt.Errorf("advance import_migrations to symlinks_migrated: %w", err)
}
return nil
}
77 changes: 77 additions & 0 deletions internal/health/library_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,83 @@ func (lsw *LibrarySyncWorker) TriggerManualSync(ctx context.Context) error {
}
}

// FileSyncRequest describes a single file that should be registered in
// file_health by SyncFiles. MountRelativePath is the canonical mount-relative
// file path (matching how SyncLibrary keys its DB rows). LibraryPath is the
// on-disk path of the library symlink / .strm / .rclonelink that points at
// the mount file — used as the file_health.library_path so arrs see the
// right path.
type FileSyncRequest struct {
MountRelativePath string
LibraryPath string
}

// SyncFiles registers a scoped set of files in file_health, mirroring what
// SyncLibrary does for an arbitrary list — without walking the library or
// running any orphan-deletion logic.
//
// For each item it reads the .meta file via processMetadataForSync, builds an
// AutomaticHealthCheckRecord, and batch-upserts the resulting set. Items
// whose .meta is unreadable are registered as corrupted (via
// RegisterCorruptedFile) and excluded from the returned count.
//
// Returns the number of items that produced a successful file_health upsert
// and any error from the batch insert. Per-item failures (corrupted meta,
// missing meta) are logged but not surfaced as the function's error — the
// caller still gets a partial count.
func (lsw *LibrarySyncWorker) SyncFiles(ctx context.Context, items []FileSyncRequest) (int, error) {
if len(items) == 0 {
return 0, nil
}

records := make([]database.AutomaticHealthCheckRecord, 0, len(items))
for _, item := range items {
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
}

libraryPath := item.LibraryPath
var libraryPathPtr *string
if libraryPath != "" {
libraryPathPtr = &libraryPath
}

record, err := lsw.processMetadataForSync(ctx, item.MountRelativePath, libraryPathPtr)
if err != nil {
slog.WarnContext(ctx, "SyncFiles: metadata read failed, registering as corrupted",
"path", item.MountRelativePath, "error", err)
if regErr := lsw.healthRepo.RegisterCorruptedFile(ctx, item.MountRelativePath, libraryPathPtr, err.Error()); regErr != nil {
slog.ErrorContext(ctx, "SyncFiles: failed to register corrupted file",
"path", item.MountRelativePath, "error", regErr)
}
continue
}
if record == nil {
// processMetadataForSync returns nil when the metadata file
// exists but is empty/unparseable into a usable form.
slog.DebugContext(ctx, "SyncFiles: metadata returned nil, skipping",
"path", item.MountRelativePath)
continue
}

records = append(records, *record)
}

if len(records) == 0 {
return 0, nil
}

if err := lsw.healthRepo.BatchAddAutomaticHealthChecks(ctx, records); err != nil {
return 0, fmt.Errorf("batch add automatic health checks: %w", err)
}

slog.InfoContext(ctx, "SyncFiles: registered files in file_health",
"requested", len(items), "registered", len(records))
return len(records), nil
}

// run is the main library sync loop
func (lsw *LibrarySyncWorker) run(ctx context.Context) {
defer func() {
Expand Down
162 changes: 162 additions & 0 deletions internal/health/library_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,165 @@ func TestFindFilesToDelete_NormalizesBackslashes(t *testing.T) {
require.Len(t, toDelete, 1)
assert.Equal(t, "complete/orphan.mkv", toDelete[0])
}

// newTestLibrarySyncWorker spins up an in-memory DB, a metadata service
// rooted at a temp dir, and a fully wired LibrarySyncWorker — the same setup
// TestSyncLibrary_WorkerPool uses. Extracted so multiple tests can reuse it.
func newTestLibrarySyncWorker(t *testing.T) (*LibrarySyncWorker, *database.HealthRepository, *metadata.MetadataService, string, func()) {
t.Helper()

tempDir, err := os.MkdirTemp("", "altmount_test_syncfiles")
require.NoError(t, err)

db, err := sql.Open("sqlite3", "file::memory:?cache=shared&mode=memory")
require.NoError(t, err)

_, err = db.Exec(`
CREATE TABLE file_health (
id INTEGER PRIMARY KEY AUTOINCREMENT,
file_path TEXT NOT NULL UNIQUE,
library_path TEXT,
status TEXT NOT NULL,
last_checked DATETIME,
last_error TEXT,
retry_count INTEGER DEFAULT 0,
max_retries INTEGER DEFAULT 3,
repair_retry_count INTEGER DEFAULT 0,
max_repair_retries INTEGER DEFAULT 3,
source_nzb_path TEXT,
error_details TEXT,
metadata TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
release_date DATETIME,
scheduled_check_at DATETIME,
streaming_failure_count INTEGER DEFAULT 0,
is_masked BOOLEAN DEFAULT FALSE,
priority INTEGER NOT NULL DEFAULT 1
);

CREATE TABLE IF NOT EXISTS system_state (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
`)
require.NoError(t, err)

healthRepo := database.NewHealthRepository(db, database.DialectSQLite)
metadataService := metadata.NewMetadataService(tempDir)

healthEnabled := true
cfg := config.DefaultConfig()
cfg.Health.Enabled = &healthEnabled
cfg.Health.LibrarySyncIntervalMinutes = 60
cfg.Metadata.RootPath = tempDir
cfg.MountPath = "/mnt/test"

configManager := config.NewManager(cfg, "")

worker := NewLibrarySyncWorker(
metadataService,
healthRepo,
configManager.GetConfig,
configManager,
&MockRcloneClient{},
)

cleanup := func() {
_ = db.Close()
_ = os.RemoveAll(tempDir)
}
return worker, healthRepo, metadataService, tempDir, cleanup
}

func TestSyncFiles_Empty(t *testing.T) {
worker, _, _, _, cleanup := newTestLibrarySyncWorker(t)
defer cleanup()

registered, err := worker.SyncFiles(context.Background(), nil)
require.NoError(t, err)
assert.Equal(t, 0, registered)
}

func TestSyncFiles_HappyPath(t *testing.T) {
worker, healthRepo, metadataService, _, cleanup := newTestLibrarySyncWorker(t)
defer cleanup()

ctx := context.Background()

// Create one valid metadata file for a mount-relative path.
mountRel := "movies/Foo (2020)/Foo.mkv"
meta := metadataService.CreateFileMetadata(
1234, "/nzbs/foo.nzb", metapb.FileStatus_FILE_STATUS_HEALTHY, nil,
metapb.Encryption_NONE, "", "", nil, nil, 0, nil, "",
)
require.NoError(t, metadataService.WriteFileMetadata(mountRel, meta))

libraryPath := "/library/Movies/Foo (2020)/Foo.mkv"
registered, err := worker.SyncFiles(ctx, []FileSyncRequest{
{MountRelativePath: mountRel, LibraryPath: libraryPath},
})
require.NoError(t, err)
assert.Equal(t, 1, registered)

// Verify the row exists with the right library_path + source_nzb_path.
row, err := healthRepo.GetFileHealth(ctx, mountRel)
require.NoError(t, err)
require.NotNil(t, row)
require.NotNil(t, row.LibraryPath)
assert.Equal(t, libraryPath, *row.LibraryPath)
require.NotNil(t, row.SourceNzbPath)
assert.Equal(t, "/nzbs/foo.nzb", *row.SourceNzbPath)
}

func TestSyncFiles_MissingMetaSkipsSilently(t *testing.T) {
worker, healthRepo, _, _, cleanup := newTestLibrarySyncWorker(t)
defer cleanup()

ctx := context.Background()

// No .meta file on disk → ReadFileMetadata returns (nil, nil) →
// processMetadataForSync returns (nil, nil) → SyncFiles skips the item.
// No file_health row should be created and no error returned.
mountRel := "movies/Missing/file.mkv"
registered, err := worker.SyncFiles(ctx, []FileSyncRequest{
{MountRelativePath: mountRel, LibraryPath: "/library/Missing/file.mkv"},
})
require.NoError(t, err)
assert.Equal(t, 0, registered)

row, err := healthRepo.GetFileHealth(ctx, mountRel)
require.NoError(t, err)
assert.Nil(t, row, "expected NO file_health row when .meta is missing")
}

func TestSyncFiles_CorruptMetaRegistersCorrupted(t *testing.T) {
worker, healthRepo, _, tempDir, cleanup := newTestLibrarySyncWorker(t)
defer cleanup()

ctx := context.Background()

// Write garbage bytes as the .meta file so proto.Unmarshal fails →
// ReadFileMetadata returns an error → SyncFiles registers as corrupted.
mountRel := "movies/Corrupt/file.mkv"
metaPath := filepath.Join(tempDir, "movies", "Corrupt", "file.mkv.meta")
require.NoError(t, os.MkdirAll(filepath.Dir(metaPath), 0o755))
require.NoError(t, os.WriteFile(metaPath, []byte("this is not a valid proto"), 0o644))

libraryPath := "/library/Corrupt/file.mkv"
registered, err := worker.SyncFiles(ctx, []FileSyncRequest{
{MountRelativePath: mountRel, LibraryPath: libraryPath},
})
require.NoError(t, err)
assert.Equal(t, 0, registered, "corrupt item must be excluded from registered count")

row, err := healthRepo.GetFileHealth(ctx, mountRel)
require.NoError(t, err)
require.NotNil(t, row, "expected a file_health row from corrupted registration")
// RegisterCorruptedFile queues the row for a near-term check with the
// original parse error preserved in last_error. Status itself stays
// pending (the checker promotes it to corrupted/repair on its next pass).
require.NotNil(t, row.LastError, "expected last_error to record the parse failure")
assert.Contains(t, *row.LastError, "proto")
}
Loading
Loading