Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ A separate MCP tool, `find_similar_messages`, returns nearest neighbors for a se

> **Run only one embedding process at a time.** Don't run `msgvault embeddings build`/`resume` or `repair-encoding` concurrently with a `msgvault serve` daemon — they write the same embedding state, and concurrent writers are not coordinated across processes.

Large archives can scope an embedding generation with `[vector.embed.scope] message_types = ["sms", "mms"]`. Scoped vector and hybrid searches must include a matching `message_type` filter so a partial index is never used as if it covered the whole archive.

## Importing from MBOX or Apple Mail

Import email from providers that offer MBOX exports or from a local Apple Mail data directory:
Expand Down
51 changes: 30 additions & 21 deletions cmd/msgvault/cmd/add_synctech_sms_drive.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,15 @@ func runConfiguredSynctechSMSSource(ctx context.Context, src config.SynctechSMSS
return err
}
defer func() { _ = st.Close() }()

return runConfiguredSynctechSMSSourceWithStore(ctx, st, src)
}

func runConfiguredSynctechSMSSourceWithStore(ctx context.Context, st *store.Store, src config.SynctechSMSSource) error {
return runConfiguredSynctechSMSSourceWithStoreDriveClient(ctx, st, src, nil)
}

func runConfiguredSynctechSMSSourceWithStoreDriveClient(ctx context.Context, st *store.Store, src config.SynctechSMSSource, driveClient synctechsms.DriveClient) error {
opts := synctechImportOptions(src)
if opts.OwnerPhone == "" {
return fmt.Errorf("synctech-sms source %q owner_phone is required", src.Name)
Expand All @@ -128,7 +133,11 @@ func runConfiguredSynctechSMSSourceWithStore(ctx context.Context, st *store.Stor
}
_, err = synctechsms.NewImporter(st, opts).ImportPath(src.Path)
case "drive":
err = runSynctechSMSDriveSource(ctx, st, src, opts)
if driveClient != nil {
_, err = runSynctechSMSDriveSourceWithClient(ctx, st, src, opts, driveClient)
} else {
_, err = runSynctechSMSDriveSource(ctx, st, src, opts)
}
default:
return fmt.Errorf("unsupported synctech-sms backend %q", src.Backend)
}
Expand Down Expand Up @@ -167,28 +176,28 @@ func validateSynctechSMSDriveSource(src config.SynctechSMSSource) error {
return nil
}

func runSynctechSMSDriveSource(ctx context.Context, st *store.Store, src config.SynctechSMSSource, opts synctechsms.ImportOptions) error {
func runSynctechSMSDriveSource(ctx context.Context, st *store.Store, src config.SynctechSMSSource, opts synctechsms.ImportOptions) (synctechsms.ImportSummary, error) {
if err := validateSynctechSMSDriveSource(src); err != nil {
return err
return synctechsms.ImportSummary{}, err
}
client, err := newSynctechSMSDriveClient(ctx, src)
if err != nil {
return err
return synctechsms.ImportSummary{}, err
}
return runSynctechSMSDriveSourceWithClient(ctx, st, src, opts, client)
}

func runSynctechSMSDriveSourceWithClient(ctx context.Context, st *store.Store, src config.SynctechSMSSource, opts synctechsms.ImportOptions, client synctechsms.DriveClient) (retErr error) {
func runSynctechSMSDriveSourceWithClient(ctx context.Context, st *store.Store, src config.SynctechSMSSource, opts synctechsms.ImportOptions, client synctechsms.DriveClient) (summary synctechsms.ImportSummary, retErr error) {
if err := validateSynctechSMSDriveSource(src); err != nil {
return err
return summary, err
}
source, err := ensureConfiguredSynctechSMSSource(st, src, opts)
if err != nil {
return err
return summary, err
}
syncID, err := st.StartSync(source.ID, synctechsms.AdapterName)
if err != nil {
return fmt.Errorf("start sync: %w", err)
return summary, fmt.Errorf("start sync: %w", err)
}
completed := false
defer func() {
Expand All @@ -204,55 +213,55 @@ func runSynctechSMSDriveSourceWithClient(ctx context.Context, st *store.Store, s
}()
files, err := client.ListBackupFiles(ctx, src.FolderID)
if err != nil {
return fmt.Errorf("list Drive backup files: %w", err)
return summary, fmt.Errorf("list Drive backup files: %w", err)
}
imported, err := st.ListImportedSourceItemChecksums(source.ID, "drive")
if err != nil {
return fmt.Errorf("list imported Drive checksums: %w", err)
return summary, fmt.Errorf("list imported Drive checksums: %w", err)
}
stableAfter, err := time.ParseDuration(src.StableAfter)
if err != nil {
return fmt.Errorf("parse stable_after: %w", err)
return summary, fmt.Errorf("parse stable_after: %w", err)
}
selected := synctechsms.SelectStableDriveFiles(files, time.Now(), stableAfter, imported)
stagingDir := filepath.Join(cfg.Data.DataDir, "imports", "synctech-sms", src.Name)
if err := os.MkdirAll(stagingDir, 0o700); err != nil {
return fmt.Errorf("create staging directory: %w", err)
return summary, fmt.Errorf("create staging directory: %w", err)
}
imp := synctechsms.NewImporter(st, opts)
var summary synctechsms.ImportSummary
for _, file := range selected {
fileSummary, err := importOneDriveBackup(ctx, st, imp, client, source.ID, file, stagingDir)
if err != nil {
return err
}
summary.FilesSeen += fileSummary.FilesSeen
summary.FilesImported += fileSummary.FilesImported
summary.SMSImported += fileSummary.SMSImported
summary.MMSImported += fileSummary.MMSImported
summary.CallsImported += fileSummary.CallsImported
summary.AttachmentsImported += fileSummary.AttachmentsImported
summary.MessageIDs = append(summary.MessageIDs, fileSummary.MessageIDs...)
if err != nil {
return summary, err
}
}
if summary.FilesImported > 0 {
if err := st.RecomputeConversationStats(source.ID); err != nil {
return fmt.Errorf("recompute conversation stats: %w", err)
return summary, fmt.Errorf("recompute conversation stats: %w", err)
}
}
totalRecords := int64(summary.SMSImported + summary.MMSImported + summary.CallsImported)
if err := st.UpdateSyncCheckpoint(syncID, &store.Checkpoint{
MessagesProcessed: totalRecords,
MessagesAdded: totalRecords,
}); err != nil {
return fmt.Errorf("update sync checkpoint: %w", err)
return summary, fmt.Errorf("update sync checkpoint: %w", err)
}
if err := st.TouchSourceLastSyncAt(source.ID); err != nil {
return fmt.Errorf("touch source last sync: %w", err)
return summary, fmt.Errorf("touch source last sync: %w", err)
}
if err := st.CompleteSync(syncID, ""); err != nil {
return fmt.Errorf("complete sync: %w", err)
return summary, fmt.Errorf("complete sync: %w", err)
}
completed = true
return nil
return summary, nil
}

func importOneDriveBackup(ctx context.Context, st *store.Store, imp *synctechsms.Importer, client synctechsms.DriveClient, sourceID int64, file synctechsms.DriveFile, stagingDir string) (synctechsms.ImportSummary, error) {
Expand Down
137 changes: 125 additions & 12 deletions cmd/msgvault/cmd/add_synctech_sms_drive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ func TestSynctechSMSDriveRunUsesSingleOuterSyncRun(t *testing.T) {
},
}

err := runSynctechSMSDriveSourceWithClient(context.Background(), f.Store, src, synctechImportOptions(src), client)
summary, err := runSynctechSMSDriveSourceWithClient(context.Background(), f.Store, src, synctechImportOptions(src), client)
require.NoError(err, "runSynctechSMSDriveSourceWithClient")
require.Len(summary.MessageIDs, 1, "summary message IDs")

source := getSynctechSource(t, f.Store, src.OwnerPhone)
assert.Equal(1, countSyncRuns(t, f.Store, source.ID), "sync run count")
Expand All @@ -86,7 +87,7 @@ func TestSynctechSMSDriveRunUsesSingleOuterSyncRun(t *testing.T) {
assert.Equal(int64(1), run.MessagesAdded, "messages added")
assert.True(getSynctechSource(t, f.Store, src.OwnerPhone).LastSyncAt.Valid, "last_sync_at should be touched")

item := getSourceImportItem(t, f.Store, source.ID, "drive", "backup-1")
item := getDriveSourceImportItem(t, f.Store, source.ID, "backup-1")
assert.Equal("imported", item.Status, "source import status")
assert.Equal(1, item.RecordsImported, "records imported")
assert.False(item.ErrorMessage.Valid, "source import error")
Expand All @@ -113,7 +114,7 @@ func TestSynctechSMSDriveRunSetsUpIdentityAndPostSourceMigration(t *testing.T) {
src := synctechDriveTestSource()
client := fakeSynctechDriveClient{}

err = runSynctechSMSDriveSourceWithClient(context.Background(), st, src, synctechImportOptions(src), client)
_, err = runSynctechSMSDriveSourceWithClient(context.Background(), st, src, synctechImportOptions(src), client)
require.NoError(err, "runSynctechSMSDriveSourceWithClient")

synctechSource := getSynctechSource(t, st, src.OwnerPhone)
Expand Down Expand Up @@ -154,7 +155,7 @@ func TestSynctechSMSDriveRunRecordsZeroSelectedPoll(t *testing.T) {
}},
}

err := runSynctechSMSDriveSourceWithClient(context.Background(), f.Store, src, synctechImportOptions(src), client)
_, err := runSynctechSMSDriveSourceWithClient(context.Background(), f.Store, src, synctechImportOptions(src), client)
require.NoError(err, "runSynctechSMSDriveSourceWithClient")

source := getSynctechSource(t, f.Store, src.OwnerPhone)
Expand Down Expand Up @@ -188,7 +189,7 @@ func TestSynctechSMSDriveRunMarksOuterSyncFailedOnDownloadError(t *testing.T) {
downloadErr: downloadErr,
}

err := runSynctechSMSDriveSourceWithClient(context.Background(), f.Store, src, synctechImportOptions(src), client)
_, err := runSynctechSMSDriveSourceWithClient(context.Background(), f.Store, src, synctechImportOptions(src), client)
require.ErrorIs(err, downloadErr, "runSynctechSMSDriveSourceWithClient")

source := getSynctechSource(t, f.Store, src.OwnerPhone)
Expand All @@ -198,17 +199,126 @@ func TestSynctechSMSDriveRunMarksOuterSyncFailedOnDownloadError(t *testing.T) {
require.True(run.ErrorMessage.Valid, "sync error_message")
assert.Contains(run.ErrorMessage.String, downloadErr.Error(), "sync error_message")

item := getSourceImportItem(t, f.Store, source.ID, "drive", "backup-1")
item := getDriveSourceImportItem(t, f.Store, source.ID, "backup-1")
assert.Equal("failed", item.Status, "source import status")
require.True(item.ErrorMessage.Valid, "source import error")
assert.Contains(item.ErrorMessage.String, downloadErr.Error(), "source import error")
}

func TestSynctechSMSDrivePartialFailureEnqueuesImportedMessages(t *testing.T) {
require := requirepkg.New(t)
assert := assertpkg.New(t)
home := t.TempDir()
cfg = config.NewDefaultConfig()
cfg.HomeDir = home
cfg.Data.DataDir = home
f := storetest.New(t)
src := synctechDriveTestSource()
client := fakeSynctechDriveClient{
files: []synctechsms.DriveFile{
{
ID: "backup-1",
Name: "sms-1.xml",
Checksum: "sum-1",
Size: 128,
ModifiedTime: time.Now().Add(-30 * time.Minute),
},
{
ID: "backup-2",
Name: "sms-2.xml",
Checksum: "sum-2",
Size: 128,
ModifiedTime: time.Now().Add(-30 * time.Minute),
},
},
downloads: map[string]string{
"backup-1": `<smses count="1">
<sms address="+15551234567" date="1717214400000" type="1" body="hello before failure" read="1" status="-1" contact_name="Alice" />
</smses>`,
"backup-2": `<smses count="1">
<sms address="+15557654321" date="1717214460000" type="1" body="durable before malformed tail" read="1" status="-1" contact_name="Bob" />
<sms`,
},
}
err := runConfiguredSynctechSMSSourceWithStoreDriveClient(
context.Background(), f.Store, src, client)
require.Error(err, "runConfiguredSynctechSMSSourceWithStoreDriveClient")
assert.Contains(err.Error(), "import backup file", "partial parse error")

source := getSynctechSource(t, f.Store, src.OwnerPhone)
assertSourceMessageCount(t, f.Store, source.ID, 2)
assert.Equal(1, countSyncRuns(t, f.Store, source.ID), "sync run count")
run := getOnlySyncRun(t, f.Store, source.ID)
assert.Equal(store.SyncStatusFailed, run.Status, "sync status")

imported := getDriveSourceImportItem(t, f.Store, source.ID, "backup-1")
assert.Equal("imported", imported.Status, "first source import status")
failed := getDriveSourceImportItem(t, f.Store, source.ID, "backup-2")
assert.Equal("failed", failed.Status, "second source import status")

var unstamped int
require.NoError(f.Store.DB().QueryRow(
f.Store.Rebind(`SELECT COUNT(*) FROM messages WHERE source_id = ? AND embed_gen IS NULL`),
source.ID,
).Scan(&unstamped), "count unstamped messages")
assert.Equal(2, unstamped, "imported messages remain discoverable by scan-and-fill")
}

func TestRunConfiguredSynctechSMSSourceLeavesManualSyncMessagesUnstamped(t *testing.T) {
require := requirepkg.New(t)
assert := assertpkg.New(t)
ctx := context.Background()
home := t.TempDir()
savedCfg := cfg
t.Cleanup(func() {
cfg = savedCfg
})
cfg = config.NewDefaultConfig()
cfg.HomeDir = home
cfg.Data.DataDir = home
cfg.Vector.Enabled = true
cfg.Vector.Embeddings.Endpoint = "http://127.0.0.1:1"
cfg.Vector.Embeddings.Model = "fake"
cfg.Vector.Embeddings.Dimension = 4

importDir := filepath.Join(home, "synctech-local")
require.NoError(os.MkdirAll(importDir, 0o700), "create import dir")
require.NoError(os.WriteFile(filepath.Join(importDir, "messages.xml"), []byte(`<smses count="1">
<sms address="+15551234567" date="1717214400000" type="1" body="manual sync should enqueue" read="1" status="-1" contact_name="Alice" />
</smses>`), 0o600), "write backup")

st, err := store.Open(cfg.DatabaseDSN())
require.NoError(err, "open store")
require.NoError(st.InitSchema(), "InitSchema")
require.NoError(st.Close(), "close store")

src := config.SynctechSMSSource{
Name: "pixel-local",
Backend: "local",
Path: importDir,
OwnerPhone: "+15550000001",
IncludeSMS: true,
}
require.NoError(runConfiguredSynctechSMSSource(ctx, src), "runConfiguredSynctechSMSSource")

st, err = store.Open(cfg.DatabaseDSN())
require.NoError(err, "reopen store")
t.Cleanup(func() { _ = st.Close() })
source := getSynctechSource(t, st, src.OwnerPhone)
var unstamped int
require.NoError(st.DB().QueryRowContext(ctx,
st.Rebind(`SELECT COUNT(*) FROM messages WHERE source_id = ? AND embed_gen IS NULL`),
source.ID,
).Scan(&unstamped), "count unstamped messages")
assert.Equal(1, unstamped, "manual sync message remains discoverable by scan-and-fill")
}

type fakeSynctechDriveClient struct {
files []synctechsms.DriveFile
downloads map[string]string
listErr error
downloadErr error
files []synctechsms.DriveFile
downloads map[string]string
listErr error
downloadErr error
downloadErrByID map[string]error
}

func (f fakeSynctechDriveClient) ListBackupFiles(context.Context, string) ([]synctechsms.DriveFile, error) {
Expand All @@ -219,6 +329,9 @@ func (f fakeSynctechDriveClient) ListBackupFiles(context.Context, string) ([]syn
}

func (f fakeSynctechDriveClient) DownloadToFile(_ context.Context, fileID, path string) error {
if err := f.downloadErrByID[fileID]; err != nil {
return err
}
if f.downloadErr != nil {
return f.downloadErr
}
Expand Down Expand Up @@ -280,9 +393,9 @@ func getOnlySyncRun(t *testing.T, st *store.Store, sourceID int64) store.SyncRun
return run
}

func getSourceImportItem(t *testing.T, st *store.Store, sourceID int64, provider, providerID string) *store.SourceImportItem {
func getDriveSourceImportItem(t *testing.T, st *store.Store, sourceID int64, providerID string) *store.SourceImportItem {
t.Helper()
item, err := st.GetSourceImportItem(sourceID, provider, providerID)
item, err := st.GetSourceImportItem(sourceID, "drive", providerID)
requirepkg.NoError(t, err, "GetSourceImportItem")
return item
}
Expand Down
28 changes: 28 additions & 0 deletions cmd/msgvault/cmd/embed_manage_sqlitevec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package cmd
import (
"context"
"database/sql"
"path/filepath"
"testing"

_ "github.com/mattn/go-sqlite3"
Expand Down Expand Up @@ -48,3 +49,30 @@ func TestRunEmbeddingsRetire_ForceActive(t *testing.T) {
row := mustGetEmbeddingGeneration(t.Context(), t, db, 1)
assert.Equal(vector.GenerationRetired, row.State)
}

func TestFillFullCoverageUsesEmbeddingScopeForEmbeddedCount(t *testing.T) {
require := requirepkg.New(t)
assert := assertpkg.New(t)
ctx := context.Background()
dataDir := t.TempDir()
dbPath := newEmbeddingMetadataTestDBFileAt(t, filepath.Join(dataDir, "vectors.db"))
seedMainDBWithScopedFullCoverageMessages(t, dataDir)
withEmbeddingCommandConfigDataDir(t, dbPath, dataDir)
cfg.Vector.Embed.Scope.MessageTypes = []string{"sms"}

backend, closeBackend, err := openEmbeddingsBackend(ctx)
require.NoError(err, "open embeddings backend")
t.Cleanup(closeBackend)
require.NoError(backend.Upsert(ctx, 2, []vector.Chunk{
{MessageID: 1, Vector: []float32{1, 0, 0, 0}},
{MessageID: 2, Vector: []float32{0, 1, 0, 0}},
}), "upsert in-scope and out-of-scope vectors")

row := embeddingGenerationRow{ID: 2}
require.NoError(fillFullCoverage(ctx, backend, &row))

assert.Equal(int64(1), row.LiveCount, "only sms is in scope")
assert.Equal(int64(1), row.EmbeddedCount, "out-of-scope email vector is excluded")
assert.Equal(int64(0), row.BlankCount)
assert.Equal(int64(0), row.MissingCount)
}
Loading