Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .github/workflows/publish-discord-backup.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ name: publish-discord-backup
on:
schedule:
- cron: "*/15 * * * *"
- cron: "17 6 * * *"
workflow_dispatch:

permissions:
Expand Down Expand Up @@ -83,7 +84,12 @@ jobs:
go run ./cmd/discrawl --config "$CONFIG" update --repo "$BACKUP_REPO" --remote "$BACKUP_REMOTE"
fi
fi
go run ./cmd/discrawl --config "$CONFIG" sync --guild "$DISCRAWL_GUILD_ID" --skip-members --latest-only
sync_args=(--guild "$DISCRAWL_GUILD_ID" --skip-members --latest-only)
if [ "${{ github.event.schedule }}" = "17 6 * * *" ]; then
sync_args=(--guild "$DISCRAWL_GUILD_ID" --with-members --latest-only)
echo "Refreshing Discord member roles and profiles."
fi
go run ./cmd/discrawl --config "$CONFIG" sync "${sync_args[@]}"
git -C "$BACKUP_REPO" pull --ff-only origin main
go run ./cmd/discrawl --config "$CONFIG" publish \
--repo "$BACKUP_REPO" \
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ latest update time, latest archived message, archive totals, and day/week/month
activity. Filtered publishes skip generated README reports to avoid leaking
full-archive totals.

The backup workflows restore and save `.discrawl-ci/discrawl.db` with `actions/cache`. On a warm runner cache, scheduled publishers skip the pre-sync snapshot import and go straight to the live latest-message delta before publishing. Cache misses still import the latest published snapshot first so `--latest-only` has channel cursors to resume from.
The backup workflows restore and save `.discrawl-ci/discrawl.db` with `actions/cache`. On a warm runner cache, scheduled publishers skip the pre-sync snapshot import and go straight to the live latest-message delta before publishing. Cache misses still import the latest published snapshot first so `--latest-only` has channel cursors to resume from. The Discord backup publisher also runs a daily `--with-members` sync so archived member roles and profiles stay current without slowing every 15-minute message delta. That explicit member refresh fails the run if Discord rejects or times out the member crawl, rather than publishing a silently stale role snapshot.

### `digest`

Expand Down
3 changes: 2 additions & 1 deletion docs/commands/sync.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ discrawl sync --with-media
- `--since <RFC3339>` - limit initial history and `--full` backfill to messages at or after this timestamp
- `--concurrency <n>` - override worker count (default auto-sized: floor 8, cap 32)
- `--skip-members` - refresh guild/channel/message data without crawling members
- `--with-members` - refresh guild members even during the default latest-only sync; fail if the member crawl cannot complete
- `--with-embeddings` - also enqueue changed messages into `embedding_jobs`
- `--with-media` - after sync, download missing attachment media into `cache_dir/media`

Expand All @@ -76,7 +77,7 @@ discrawl sync --with-media
- Each channel crawl has a bounded runtime budget; pathological channels are deferred and retried next sync.
- Retryable failures and unavailable-channel markers are tracked per channel; stale unavailable markers are cleared after a later successful crawl.
- Marker cleanup is best-effort, so one missing local sync-state row cannot crash the run.
- Full sync member refresh is best-effort and gives up after five minutes without a caller-supplied deadline.
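- Member refresh gives up after five minutes without a caller-supplied deadline. It is best-effort by default; with `--with-members`, a rejected or timed-out member crawl fails the sync instead. Routine latest-only syncs skip member refresh unless `--with-members` is set.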
- When the archive is already complete, `sync --full` reuses backlog markers and limits steady-state refresh to live top-level channels plus active threads.

## See also
Expand Down
25 changes: 15 additions & 10 deletions internal/cli/admin_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (r *runtime) runSync(args []string) error {
withEmbeddings := fs.Bool("with-embeddings", false, "")
withMedia := fs.Bool("with-media", r.cfg.AttachmentMediaEnabled(), "")
skipMembers := fs.Bool("skip-members", false, "")
withMembers := fs.Bool("with-members", false, "")
latestOnly := fs.Bool("latest-only", false, "")
guildsFlag := fs.String("guilds", "", "")
guildFlag := fs.String("guild", "", "")
Expand All @@ -130,6 +131,9 @@ func (r *runtime) runSync(args []string) error {
if *noUpdate && strings.TrimSpace(*updateMode) != "" && !strings.EqualFold(strings.TrimSpace(*updateMode), string(shareUpdateNever)) {
return usageErr(errors.New("use either --no-update or --update, not both"))
}
if *skipMembers && *withMembers {
return usageErr(errors.New("use either --skip-members or --with-members, not both"))
}
if strings.TrimSpace(*updateMode) != "" {
if _, err := parseShareUpdateMode(*updateMode); err != nil {
return usageErr(err)
Expand All @@ -153,14 +157,15 @@ func (r *runtime) runSync(args []string) error {
}
defaultLatest := defaultLatestSyncMode(*full, *allChannels, *since, *channels)
opts := syncer.SyncOptions{
Full: *full,
GuildIDs: guildIDs,
ChannelIDs: csvList(*channels),
Concurrency: *concurrency,
Since: sinceTime,
Embeddings: *withEmbeddings,
SkipMembers: syncSkipsMembers(*skipMembers, defaultLatest),
LatestOnly: syncLatestOnly(*latestOnly, defaultLatest),
Full: *full,
GuildIDs: guildIDs,
ChannelIDs: csvList(*channels),
Concurrency: *concurrency,
Since: sinceTime,
Embeddings: *withEmbeddings,
SkipMembers: syncSkipsMembers(*skipMembers, *withMembers, defaultLatest),
RequireMembers: *withMembers,
LatestOnly: syncLatestOnly(*latestOnly, defaultLatest),
}
return r.withSyncLock(func() error {
return r.runSyncLocked(sources, opts, *withMedia)
Expand Down Expand Up @@ -283,8 +288,8 @@ func syncLatestOnly(explicit bool, defaultLatest bool) bool {
return explicit || defaultLatest
}

func syncSkipsMembers(explicit bool, defaultLatest bool) bool {
return explicit || defaultLatest
// syncSkipsMembers reports whether the sync should skip the member crawl.
//
// An explicit skipMembers request always wins. Otherwise withMembers forces
// the crawl to run, and in the absence of either flag the crawl is skipped
// exactly when the sync is running in the default latest-only mode.
func syncSkipsMembers(skipMembers bool, withMembers bool, defaultLatest bool) bool {
	if skipMembers {
		return true
	}
	if withMembers {
		return false
	}
	return defaultLatest
}

func parseSyncSources(raw string) (syncSources, error) {
Expand Down
24 changes: 23 additions & 1 deletion internal/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3306,6 +3306,13 @@ func TestRuntimeInitSyncTailAndDoctor(t *testing.T) {
require.True(t, fakeSync.lastSync.SkipMembers)
require.True(t, fakeSync.attachmentTextEnabled)

rt = newRuntime()
require.NoError(t, rt.withServices(true, func() error { return rt.runSync([]string{"--guilds", "g2", "--with-members"}) }))
require.Equal(t, []string{"g2"}, fakeSync.lastSync.GuildIDs)
require.True(t, fakeSync.lastSync.LatestOnly)
require.False(t, fakeSync.lastSync.SkipMembers)
require.True(t, fakeSync.lastSync.RequireMembers)

rt = newRuntime()
require.NoError(t, rt.withServices(true, func() error { return rt.runSync([]string{"--all"}) }))
require.Nil(t, fakeSync.lastSync.GuildIDs)
Expand Down Expand Up @@ -3344,8 +3351,10 @@ func TestSyncModeDefaults(t *testing.T) {
skipMembers bool
explicitLatest bool
explicitSkip bool
explicitWith bool
}{
{name: "routine", defaultLatest: true, latestOnly: true, skipMembers: true},
{name: "routine with members", defaultLatest: true, latestOnly: true, explicitWith: true},
{name: "all channels", allChannels: true},
{name: "full", full: true},
{name: "since", since: "2026-04-27T20:00:00Z"},
Expand All @@ -3361,11 +3370,24 @@ func TestSyncModeDefaults(t *testing.T) {
defaultLatest := defaultLatestSyncMode(tt.full, tt.allChannels, tt.since, tt.channels)
require.Equal(t, tt.defaultLatest, defaultLatest)
require.Equal(t, tt.latestOnly, syncLatestOnly(tt.explicitLatest, defaultLatest))
require.Equal(t, tt.skipMembers, syncSkipsMembers(tt.explicitSkip, defaultLatest))
require.Equal(t, tt.skipMembers, syncSkipsMembers(tt.explicitSkip, tt.explicitWith, defaultLatest))
})
}
}

// TestSyncRejectsConflictingMemberFlags verifies that invoking sync with both
// --skip-members and --with-members is rejected with exit code 2 and the
// "use either ... not both" message.
func TestSyncRejectsConflictingMemberFlags(t *testing.T) {
	tmp := t.TempDir()
	configPath := filepath.Join(tmp, "config.toml")

	// Write a default config pointing at a throwaway database location.
	defaults := config.Default()
	defaults.DBPath = filepath.Join(tmp, "discrawl.db")
	require.NoError(t, config.Write(configPath, defaults))

	args := []string{"--config", configPath, "sync", "--skip-members", "--with-members"}
	runErr := Run(context.Background(), args, &bytes.Buffer{}, &bytes.Buffer{})

	require.Equal(t, 2, ExitCode(runErr))
	require.ErrorContains(t, runErr, "use either --skip-members or --with-members, not both")
}

func TestDoctorChecksEnabledLocalEmbeddingProvider(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
Expand Down
62 changes: 41 additions & 21 deletions internal/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,16 @@ type Syncer struct {
}

type SyncOptions struct {
Full bool
GuildIDs []string
ChannelIDs []string
Concurrency int
Since time.Time
Embeddings bool
SkipMembers bool
LatestOnly bool
RepairReason string
Full bool
GuildIDs []string
ChannelIDs []string
Concurrency int
Since time.Time
Embeddings bool
SkipMembers bool
RequireMembers bool
LatestOnly bool
RepairReason string
}

func (s *Syncer) SetTailReadyCallback(fn func(context.Context) error) {
Expand Down Expand Up @@ -143,7 +144,11 @@ func (s *Syncer) syncGuild(ctx context.Context, guildID string, opts SyncOptions
}
if ok {
stats.add(batched)
stats.Members = s.refreshGuildMembersForSync(ctx, guildID, false, opts)
members, err := s.refreshGuildMembersForSync(ctx, guildID, false, opts)
if err != nil {
return stats, err
}
stats.Members = members
return stats, nil
}
if s.shouldUseIncrementalFullCatalog(ctx, guildID) {
Expand All @@ -158,7 +163,11 @@ func (s *Syncer) syncGuild(ctx context.Context, guildID string, opts SyncOptions
return stats, err
}

stats.Members = s.refreshGuildMembersForSync(ctx, guildID, targeted, opts)
members, err := s.refreshGuildMembersForSync(ctx, guildID, targeted, opts)
if err != nil {
return stats, err
}
stats.Members = members
messageCount, err := s.syncMessageChannels(ctx, guildID, channelList, opts)
if err != nil {
return stats, err
Expand Down Expand Up @@ -202,11 +211,21 @@ func (s *Syncer) storeChannelList(ctx context.Context, channels []*discordgo.Cha
return nil
}

func (s *Syncer) refreshGuildMembersForSync(ctx context.Context, guildID string, targeted bool, opts SyncOptions) int {
if targeted || opts.SkipMembers {
return 0
func (s *Syncer) refreshGuildMembersForSync(ctx context.Context, guildID string, targeted bool, opts SyncOptions) (int, error) {
if targeted {
if opts.RequireMembers {
return 0, errors.New("cannot require a member refresh for a targeted channel sync")
}
return 0, nil
}
if opts.SkipMembers {
return 0, nil
}
members, err := s.refreshGuildMembers(ctx, guildID, opts.RequireMembers)
if err != nil && opts.RequireMembers {
return 0, err
}
return s.refreshGuildMembers(ctx, guildID)
return members, nil
}

func (s *Syncer) syncGuildIncompleteBatches(ctx context.Context, guildID string, opts SyncOptions) (SyncStats, bool, error) {
Expand Down Expand Up @@ -249,9 +268,9 @@ func (stats *SyncStats) addChannel(record store.ChannelRecord) {
}
}

func (s *Syncer) refreshGuildMembers(ctx context.Context, guildID string) int {
if !s.shouldRefreshMembers(ctx, guildID) {
return 0
func (s *Syncer) refreshGuildMembers(ctx context.Context, guildID string, force bool) (int, error) {
if !force && !s.shouldRefreshMembers(ctx, guildID) {
return 0, nil
}
memberCtx := ctx
cancel := func() {}
Expand All @@ -276,19 +295,20 @@ func (s *Syncer) refreshGuildMembers(ctx context.Context, guildID string) int {
"elapsed", time.Since(startedAt).Round(time.Second).String(),
"timed_out", errors.Is(err, context.DeadlineExceeded),
)
return 0
return 0, fmt.Errorf("crawl guild members: %w", err)
}
converted := make([]store.MemberRecord, 0, len(members))
for _, member := range members {
converted = append(converted, toMemberRecord(guildID, member))
}
if err := s.store.ReplaceMembers(ctx, guildID, converted); err != nil {
s.logger.Warn("member replace failed", "guild_id", guildID, "err", err)
return 0
return 0, fmt.Errorf("replace guild members: %w", err)
}
if s.store != nil {
if err := s.store.SetSyncState(ctx, guildMemberSyncSuccessScope(guildID), time.Now().UTC().Format(time.RFC3339Nano)); err != nil {
s.logger.Warn("member sync state update failed", "guild_id", guildID, "err", err)
return 0, fmt.Errorf("record guild member sync: %w", err)
}
}
s.logger.Info(
Expand All @@ -297,7 +317,7 @@ func (s *Syncer) refreshGuildMembers(ctx context.Context, guildID string) int {
"members", len(converted),
"elapsed", time.Since(startedAt).Round(time.Second).String(),
)
return len(converted)
return len(converted), nil
}

func (s *Syncer) shouldUseIncrementalFullCatalog(ctx context.Context, guildID string) bool {
Expand Down
67 changes: 67 additions & 0 deletions internal/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type fakeClient struct {
messageStarted chan string
beforeErrors map[string]map[string]error
memberDelay time.Duration
memberErr error
tailCalls int
tailHandled chan struct{}
messageDelay time.Duration
Expand Down Expand Up @@ -120,6 +121,9 @@ func (f *fakeClient) GuildMembers(ctx context.Context, guildID string) ([]*disco
case <-timer.C:
}
}
if f.memberErr != nil {
return nil, f.memberErr
}
return f.members[guildID], nil
}

Expand Down Expand Up @@ -394,6 +398,69 @@ func TestSyncMemberRefreshTimeoutStillMarksSuccess(t *testing.T) {
require.NotEmpty(t, lastSync)
}

// TestSyncRequiredMemberRefreshFailsOnCrawlError checks that a sync with
// RequireMembers set returns the member crawl error (wrapped as
// "crawl guild members: ..."), reports zero members after a single crawl
// attempt, and leaves the "sync:last_success" state empty.
func TestSyncRequiredMemberRefreshFailsOnCrawlError(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	dbPath := filepath.Join(t.TempDir(), "discrawl.db")
	db, err := store.Open(ctx, dbPath)
	require.NoError(t, err)
	defer func() { _ = db.Close() }()

	// The fake client serves one guild with one text channel, and fails
	// every member crawl.
	fake := &fakeClient{
		guilds: []*discordgo.UserGuild{{ID: "g1", Name: "Guild"}},
		guildByID: map[string]*discordgo.Guild{
			"g1": {ID: "g1", Name: "Guild"},
		},
		channels: map[string][]*discordgo.Channel{
			"g1": {{ID: "c1", GuildID: "g1", Name: "general", Type: discordgo.ChannelTypeGuildText}},
		},
		memberErr: errors.New("rate limited"),
	}

	svc := New(fake, db, nil)
	options := SyncOptions{LatestOnly: true, RequireMembers: true}
	result, err := svc.Sync(ctx, options)
	require.ErrorContains(t, err, "crawl guild members: rate limited")
	require.Zero(t, result.Members)
	require.Equal(t, 1, fake.memberCalls)

	// A failed required refresh must not be recorded as a successful sync.
	marker, err := db.GetSyncState(ctx, "sync:last_success")
	require.NoError(t, err)
	require.Empty(t, marker)
}

// TestSyncRequiredMemberRefreshBypassesFreshSnapshot checks that a sync with
// RequireMembers set still crawls members when a member-sync success
// timestamp for the guild was recorded just beforehand.
func TestSyncRequiredMemberRefreshBypassesFreshSnapshot(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	dbPath := filepath.Join(t.TempDir(), "discrawl.db")
	db, err := store.Open(ctx, dbPath)
	require.NoError(t, err)
	defer func() { _ = db.Close() }()

	// Seed a member-sync success marker stamped "now" for guild g1.
	freshStamp := time.Now().UTC().Format(time.RFC3339Nano)
	require.NoError(t, db.SetSyncState(ctx, guildMemberSyncSuccessScope("g1"), freshStamp))

	fake := &fakeClient{
		guilds: []*discordgo.UserGuild{{ID: "g1", Name: "Guild"}},
		guildByID: map[string]*discordgo.Guild{
			"g1": {ID: "g1", Name: "Guild"},
		},
		channels: map[string][]*discordgo.Channel{
			"g1": {{ID: "c1", GuildID: "g1", Name: "general", Type: discordgo.ChannelTypeGuildText}},
		},
		members: map[string][]*discordgo.Member{
			"g1": {{User: &discordgo.User{ID: "u1", Username: "user"}}},
		},
	}

	svc := New(fake, db, nil)
	options := SyncOptions{LatestOnly: true, RequireMembers: true}
	result, err := svc.Sync(ctx, options)
	require.NoError(t, err)
	require.Equal(t, 1, result.Members)
	require.Equal(t, 1, fake.memberCalls)
}

func TestSyncRejectsUnknownRequestedGuild(t *testing.T) {
t.Parallel()

Expand Down