
refactor: replace TryLock to ensure data consistency #90

Open

aby913 wants to merge 47 commits into main from refactor/sync_pipeline

Conversation

Contributor

aby913 commented Mar 6, 2026

No description provided.

aby913 and others added 30 commits February 28, 2026 20:26
- New pipeline.go: orchestrates Syncer -> Hydrator -> DataWatcherRepo -> StatusCorrectionChecker serially
- Pipeline implements HydrationNotifier interface, replaces Hydrator as CacheManager's notifier
- Hydrator: add HydrateSingleApp() public method, remove internal pipeline logic, passive mode
- Syncer: add SyncOnce() and StartWithOptions() for Pipeline-driven scheduling
- cache.go: add version filtering in AppInfoLatestPending - skip apps already in Latest with same version
- datawatcher_state.go: fix RLock -> Lock deadlock in fetchInvisibleFromAppService
- datawatcher_app.go: fix potential nil pointer panic in log, clean up commented code
- appinfomodule.go: create Pipeline, wire all components, all timers disabled (Pipeline handles scheduling)

Co-authored-by: aby913 <aby913@users.noreply.github.com>
Pipeline-internal operations must complete, not skip. Change ~15 TryLock/TryRLock
calls to blocking Lock/RLock in code that is only called from the Pipeline goroutine:

- hydrationfn/task_for_api.go: writeAppDataToCache TryLock -> Lock
- datawatcher_app.go: ProcessSingleAppToLatest TryLock -> Lock
- datawatcher_app.go: calculateAndSetUserHashDirect - simplify goroutine/channel/timeout
  pattern to direct Lock (60+ lines removed)
- status_correction_check.go: hash update TryLock -> Lock
- hydration.go: isAppInRenderFailedList, isAppInLatestQueue TryRLock -> RLock
- hydration.go: moveTaskToRenderFailed TryRLock -> RLock
- hydration.go: removeFromPendingList - simplify two-phase TryRLock+TryLock to single Lock
- syncer.go: storeDataForSource, storeDataDirectly, storeDataViaCacheManager TryLock/TryRLock -> Lock/RLock

cache.go internal methods keep TryLock (shared with DataWatcherState NATS + API handlers).

Co-authored-by: aby913 <aby913@users.noreply.github.com>
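The TryLock -> Lock change above boils down to one pattern: a non-blocking attempt that could silently drop a write becomes a blocking acquire that always completes. A minimal sketch (the `cache` type and method names are illustrative, not the PR's actual code):

```go
package main

import (
	"fmt"
	"sync"
)

type cache struct {
	mu   sync.RWMutex
	apps map[string]string
}

// Before: non-blocking attempt; on contention the write is silently lost.
func (c *cache) tryStore(name, version string) bool {
	if !c.mu.TryLock() {
		return false // caller must handle the skipped write
	}
	defer c.mu.Unlock()
	c.apps[name] = version
	return true
}

// After: blocking lock; the write always lands. Safe here because
// locked sections are short in-memory operations with no HTTP calls.
func (c *cache) store(name, version string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.apps[name] = version
}

func main() {
	c := &cache{apps: map[string]string{}}
	c.store("app1", "1.0.0")
	fmt.Println(c.apps["app1"]) // 1.0.0
}
```

Note that `sync.RWMutex.TryLock` exists since Go 1.18; the PR keeps it only where skipping on contention is the intended behavior.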
- hydration.go: markTaskCompleted/markTaskFailed taskMutex.TryLock -> Lock
- hydration.go: addToCompletedHistory/addToFailedHistory workerStatusMutex.TryLock -> Lock
- syncerfn/detail_fetch_step.go: TryRLock/TryLock -> RLock/Lock (called from Syncer steps in pipeline)

Co-authored-by: aby913 <aby913@users.noreply.github.com>
All cache.go internal methods now use blocking locks instead of non-blocking TryLock.
This is safe because:
- Pipeline goroutine: holds lock briefly for in-memory operations (no HTTP calls)
- DataWatcherState (NATS): holds lock briefly via SetAppData
- API handlers: use RLock for reads, briefly wait during writes
- No HTTP calls inside any locked section in cache.go

Removed ~120 lines of TryLock failure handling, warning logs, and complex
lock acquisition patterns (goroutine+channel+timeout for cleanup worker).

Only the TryLock()/TryRLock() public methods on CacheManager remain as part
of the CacheManagerInterface (kept for backward compatibility).

Co-authored-by: aby913 <aby913@users.noreply.github.com>
…k in appinfomodule.go

Removed 20 dead methods (~830 lines) from hydration.go that were part of the
old worker pool / pendingDataMonitor model, now replaced by Pipeline:

- EnqueueTask, worker, processTask, updateWorkerStatus
- logTaskDataBeforeStep, logTaskDataAfterStep, getMapKeys
- pendingDataMonitor, checkForPendingData
- createTasksFromPendingData, createTasksFromPendingDataMap
- isAppDataHydrationComplete, hasActiveTaskForApp, trackTask
- hasRequiredRawDataFields, looksLikeAppsMap
- ForceAddTaskFromLatestData, convertLatestDataToMap, ForceCheckPendingData

appinfomodule.go: replaced last 2 TryLock/TryRLock calls with Lock/RLock
in correctCacheWithChartRepo and GetInvalidDataReport.

Co-authored-by: aby913 <aby913@users.noreply.github.com>
Encapsulate all external lock operations on CacheManager into internal
methods, following a hierarchical accessor pattern (users -> sources -> apps).

New read methods (RLock internally):
- GetUserIDs, GetOrCreateUserIDs, IsLocalSource
- CollectAllPendingItems, FindPendingDataForApp
- IsAppInLatestQueue, IsAppInRenderFailedList
- SnapshotSourcePending, HasSourceData, IsAppInstalled
- GetSourceOthersHash, ListActiveUsers

New write methods (Lock internally):
- SetUserHash, RemoveFromPendingList
- UpsertLatestAndRemovePending, UpdateSourceOthers
- RemoveAppFromAllSources, RemoveDelistedApps
- CopyPendingVersionHistory

Removed public methods:
- Lock/Unlock/TryLock/RLock/RUnlock/TryRLock (lock internalized)
- GetCache (no direct cache access)
- GetUserDataNoLock, GetAllUsersDataWithFallback, GetUserDataWithFallback
- GetLockStats, DumpLockInfo (made private)

Updated CacheManagerInterface to expose semantic operations instead
of lock primitives. All 13 external files updated to use the new API.

Existing read methods (GetAllUsersData, GetUserData, GetSourceData)
are preserved as the hierarchical accessor layer.

Co-authored-by: aby913 <aby913@users.noreply.github.com>
Pipeline now runs purely on a fixed interval (ticker). Removed:
- Pipeline.trigger channel and Trigger()/NotifyPendingDataUpdate() methods
- HydrationNotifier interface and CacheManager.hydrationNotifier field
- SetHydrationNotifier/setHydrationNotifierInternal methods
- Notification call in setAppDataInternal after writing pending data
- Hydrator.NotifyPendingDataUpdate empty compatibility method

The trigger was redundant: it fired from within Pipeline's own run
(Syncer writes pending data → notification → trigger), causing a
duplicate run cycle immediately after each scheduled run.

Co-authored-by: aby913 <aby913@users.noreply.github.com>
…tart

- Change pipeline interval from 30s to 5min (both default and call site)
- Call p.run(ctx) before entering the ticker loop so the first cycle
  executes immediately at startup instead of waiting 5 minutes

Co-authored-by: aby913 <aby913@users.noreply.github.com>
Pipeline interval reverted to 30s so DataWatcherRepo and
StatusCorrectionChecker execute at their expected frequency.

Syncer now tracks lastSyncExecuted internally and skips execution
if less than syncInterval (5min) has elapsed, avoiding redundant
remote data fetches on every 30s pipeline tick.

Co-authored-by: aby913 <aby913@users.noreply.github.com>
- Remove scattered hash calculations from Phase 3 (DataWatcherRepo),
  Phase 4 (StatusCorrectionChecker), and event-driven paths (DataWatcherState)
- Phase 1-4 now only modify data; hash calculation and ForceSync happen
  exactly once per Pipeline cycle in Phase 5
- Add dirty users mechanism (MarkUserDirty/CollectAndClearDirtyUsers) for
  event-driven paths to defer hash calculation to the next Pipeline cycle
- DataWatcherRepo.ProcessOnce() and StatusCorrectionChecker.PerformStatusCheckOnce()
  now return affected user sets for Phase 5 to consume
- calculateAndSetUserHashDirect() no longer calls ForceSync internally
- Pipeline Phase 5 merges affected users from Phase 2/3/4 + dirty users
- ForceSync rate-limit log downgraded from Error to Warning
- Remove the '临时注释' ("temporary comment") marker from the cache.go Start method

Co-authored-by: aby913 <aby913@users.noreply.github.com>
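The dirty-users mechanism above can be sketched as a small mark-and-sweep set: event-driven paths mark a user dirty, and Phase 5 collects and clears the set once per cycle so each user's hash is recomputed exactly once. Method names come from the commit message; the implementation is an assumption:

```go
package main

import (
	"fmt"
	"sync"
)

type cacheManager struct {
	mu    sync.Mutex
	dirty map[string]struct{}
}

// MarkUserDirty records that a user's data changed via an
// event-driven path; hashing is deferred to the next cycle.
func (cm *cacheManager) MarkUserDirty(user string) {
	cm.mu.Lock()
	defer cm.mu.Unlock()
	if cm.dirty == nil {
		cm.dirty = make(map[string]struct{})
	}
	cm.dirty[user] = struct{}{}
}

// CollectAndClearDirtyUsers atomically drains the set, so each
// dirty user is hashed exactly once per pipeline cycle.
func (cm *cacheManager) CollectAndClearDirtyUsers() []string {
	cm.mu.Lock()
	defer cm.mu.Unlock()
	users := make([]string, 0, len(cm.dirty))
	for u := range cm.dirty {
		users = append(users, u)
	}
	cm.dirty = nil
	return users
}

func main() {
	cm := &cacheManager{}
	cm.MarkUserDirty("alice")
	cm.MarkUserDirty("alice") // idempotent: set semantics
	fmt.Println(len(cm.CollectAndClearDirtyUsers())) // 1
	fmt.Println(len(cm.CollectAndClearDirtyUsers())) // 0
}
```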
Two bugs were preventing newly added sources from being processed promptly:

1. SyncOnce throttle ignored source config changes: The syncer's
   SyncOnce() only checked a time-based throttle (syncInterval, default
   5min). When a new remote source was added, the syncer would not run
   until the throttle expired, leaving the new source without app data
   for up to 5 minutes. Added hasRemoteSourceConfigChanged() to detect
   when the set of remote source IDs changes and force an immediate
   sync cycle.

2. LatestData initialized as non-nil empty struct: In
   NewSyncContextWithManager, LatestData was initialized as
   &AppStoreInfoResponse{} instead of nil. When sync steps were skipped
   (hash match for existing sources), the check 'syncContext.LatestData
   != nil' still passed, causing storeDataViaCacheManager to run with
   empty data. This unnecessarily cleared AppInfoLatestPending for
   hash-matched sources. Changed initialization to nil so the store
   code only runs when DataFetchStep actually populates LatestData.

Co-authored-by: aby913 <aby913@users.noreply.github.com>
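Change detection for fix 1 can be sketched as fingerprinting the set of remote source IDs and comparing against the last observed fingerprint. The function names here are illustrative (the PR's actual function is `hasRemoteSourceConfigChanged`, later renamed); sorting makes the check order-insensitive:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

type syncer struct {
	lastFingerprint string
}

// fingerprint produces a canonical string for a set of source IDs.
func fingerprint(ids []string) string {
	s := append([]string(nil), ids...)
	sort.Strings(s)
	return strings.Join(s, ",")
}

// hasSourceConfigChanged reports whether the set of remote source
// IDs differs from the last observation, updating the baseline.
func (s *syncer) hasSourceConfigChanged(ids []string) bool {
	fp := fingerprint(ids)
	if fp == s.lastFingerprint {
		return false
	}
	s.lastFingerprint = fp
	return true
}

func main() {
	s := &syncer{}
	fmt.Println(s.hasSourceConfigChanged([]string{"market"}))           // true: first observation
	fmt.Println(s.hasSourceConfigChanged([]string{"market"}))           // false: unchanged
	fmt.Println(s.hasSourceConfigChanged([]string{"market", "custom"})) // true: source added
}
```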
Extend hasSyncRelevantConfigChanged (renamed from
hasRemoteSourceConfigChanged) to also track the user ID list via
CacheManager.GetUserIDs(). When a user is added or removed, the syncer
now detects the change on the next pipeline tick and forces an immediate
sync cycle, ensuring new users get their app data without waiting for
the full sync interval to expire.

Co-authored-by: aby913 <aby913@users.noreply.github.com>
HydrateSingleApp had several early-return paths that silently returned
false without any log output at V(2) level, making it impossible to
diagnose why pending apps were not being processed. Added V(2) log
lines for three skip conditions:

- App is in the render failed list (will retry after 5min cleanup)
- App is already in the latest queue with the same version
- convertApplicationInfoEntryToMap returned empty data

Co-authored-by: aby913 <aby913@users.noreply.github.com>
The main bottleneck in Phase 2 is the HTTP call to chart-repo
dcr/sync-app in TaskForApiStep (3s timeout). With serial processing,
N apps could block the pipeline for up to N*3 seconds.

Change phaseHydrateApps to process pending apps in concurrent batches:
- Default concurrency: 5 (configurable via PIPELINE_HYDRATION_CONCURRENCY)
- Each batch fires N goroutines running HydrateSingleApp in parallel
- WaitGroup ensures the batch completes before moving to the next
- ProcessSingleAppToLatest remains sequential per batch to avoid
  concurrent writes to the same source's AppInfoLatest slice

Cache concurrency is safe because:
- All write operations go through CacheManager.mutex.Lock (micro-second
  hold times, different apps touch different pendingData objects)
- Read-only checks (isAppInRenderFailedList, isAppInLatestQueue) use
  CacheManager.mutex.RLock which allows concurrent readers
- findPendingDataFromCache reads without lock but only accesses data
  that Phase 1 has already finished writing

Co-authored-by: aby913 <aby913@users.noreply.github.com>
…te limit

SyncOnce: when the 5-minute throttle is active, do a lightweight HTTP
probe to each remote source's hash endpoint. If any source's remote
hash differs from the locally cached Others.Hash, bypass the throttle
and run a full sync cycle immediately. Network errors during the probe
are silently ignored (conservative: don't force sync on failure).

ForceSync: remove the 1-minute rate limiter. In the Pipeline
architecture, ForceSync is called exactly once per cycle in Phase 5,
so the rate limiter is unnecessary. The Hydrator's databaseSyncMonitor
was the only other caller that could trigger it, and it was always
hitting the rate limit set by Phase 5 — producing useless error logs
every 30 seconds.

Co-authored-by: aby913 <aby913@users.noreply.github.com>
CollectAllPendingItems returns a snapshot taken before async deletions
(RemoveUserData, SyncMarketSourcesToCache) have completed. Without this
check, Phase 2 would attempt to hydrate apps for users or sources that
no longer exist, resulting in chart-repo 500 errors and useless
render-failed entries.

Before batching, verify each pending item's user+source still exists
via CacheManager.GetSourceData. Items referencing deleted users or
sources are logged and skipped.

Co-authored-by: aby913 <aby913@users.noreply.github.com>
processStateChanges wrote the last processed ID to Redis but never
updated the in-memory dwr.lastProcessedID field. On the next pipeline
cycle, fetchStateChanges used the stale in-memory ID, causing the same
batch of state changes (especially image_info_updated events) to be
fetched and re-processed every cycle.

Co-authored-by: aby913 <aby913@users.noreply.github.com>
…cted helpers

- Replace all TryLock/TryRLock with Lock/RLock across TaskModule
- Extract dequeueNextPendingTask(): atomically moves pending task to running
- Extract removeRunningTask(): atomically removes task from running map
- Extract completeRunningTask(): atomically finds, updates, and removes a running task
- Extract handleTaskFailure(): consolidates 5 identical error handling blocks in executeTask
- Move I/O operations (DB persist, NATS notify, history record) outside lock scope
  in all 7 external signal methods (InstallTaskSucceed, InstallTaskFailed, etc.)
- Fix missing RLock in checkRunningTasksStatus (was iterating runningTasks without lock)
- Simplify HasPendingOrRunningInstallTask to use blocking RLock

This eliminates:
- Task leaks in runningTasks when TryLock failed (goroutine retry could also fail)
- Lost external signals (InstallTaskSucceed etc. returning error on TryLock failure)
- Unreliable HasPendingOrRunningInstallTask results causing unnecessary message delays
- ~500 lines of duplicated TryLock retry boilerplate

Co-authored-by: aby913 <aby913@users.noreply.github.com>
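Moving I/O outside the lock scope, as described for the external signal methods, follows one shape: mutate task state under the lock, release it, then perform the slow side effects (DB persist, NATS notify). A minimal sketch with illustrative names, where `persist` stands in for the I/O:

```go
package main

import (
	"fmt"
	"sync"
)

type taskModule struct {
	mu      sync.Mutex
	running map[string]string
}

// installTaskSucceed removes the task from the running map under the
// lock, then runs I/O after unlocking so other goroutines never wait
// on a DB or NATS round-trip.
func (tm *taskModule) installTaskSucceed(id string, persist func(state string)) {
	tm.mu.Lock()
	state, ok := tm.running[id]
	if ok {
		delete(tm.running, id) // atomic state change under the lock
	}
	tm.mu.Unlock()
	if ok {
		persist(state) // slow I/O runs without holding the lock
	}
}

func main() {
	tm := &taskModule{running: map[string]string{"t1": "installing"}}
	tm.installTaskSucceed("t1", func(s string) { fmt.Println("persisted:", s) })
	fmt.Println(len(tm.running)) // 0
}
```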
TryRLock failure was silently skipping VC (verifiable credential) injection
during install/clone, which could cause payment-related issues. The write lock
holder (SetSettingsManager) only does a field assignment (nanoseconds), so
RLock wait time is negligible.

Co-authored-by: aby913 <aby913@users.noreply.github.com>
…ppinfomodule

syncer.go:
- AddStep/RemoveStep/GetSteps: TryLock/TryRLock → Lock/RLock
  (prevents silent step loss on lock contention)
- StartWithOptions/Stop: TryLock → Lock
  (Stop must not silently fail - syncer would be unstoppable)
- syncLoop defer cleanup: TryLock → Lock
  (ensures isRunning is always reset on exit)
- updateSyncSuccess/updateSyncFailure: TryLock → Lock
  (prevents lost status updates)
- SetCacheManager: TryLock → Lock
  (prevents silent configuration loss)

hydration.go:
- GetMetrics: TryRLock → RLock, simplified fallback-free path
- getRecentCompletedTasks/getRecentFailedTasks: TryRLock → RLock
- processCompletedTask/processBatchCompletions: TryRLock → RLock
- checkAndSyncToDatabase: TryRLock → RLock
- monitorMemoryUsage: TryRLock → RLock
- cleanupOldCompletedTasks/cleanupOldTasks: TryLock → Lock
  (prevents memory leak from skipped cleanup)

appinfomodule.go:
- IsStarted: TryRLock → RLock (no longer returns false on contention)
- GetModuleStatus: TryRLock → RLock (no longer returns error status)

Kept TryLock in pipeline.go:run() and datawatcher_app.go:processCompletedApps()
as timer-guard pattern (skip cycle if previous still running).

Co-authored-by: aby913 <aby913@users.noreply.github.com>
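The timer-guard pattern that intentionally keeps TryLock looks roughly like this: if the previous cycle still holds the lock, skip this tick rather than queue behind it. A sketch with illustrative names:

```go
package main

import (
	"fmt"
	"sync"
)

type guarded struct {
	mu   sync.Mutex
	runs int
}

// runCycle skips the tick when the previous cycle is still running;
// this is the one place where TryLock's skip-on-contention behavior
// is the desired semantics, not a bug.
func (g *guarded) runCycle(work func()) bool {
	if !g.mu.TryLock() {
		return false // previous cycle in progress: skip this tick
	}
	defer g.mu.Unlock()
	g.runs++
	work()
	return true
}

func main() {
	g := &guarded{}
	fmt.Println(g.runCycle(func() {})) // true: lock was free
}
```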
Add RestoreRetryableFailedToPending to CacheManager: each Pipeline cycle,
move up to 50 items from AppRenderFailed back to AppInfoLatestPending (FIFO)
so the hydrator can retry them without waiting for the 5-minute GC cleanup.

- Failed apps that timeout on dcr/sync-app (3s) are retried ~30s later
  instead of waiting 5+ minutes for ClearAppRenderFailedData
- Items are atomically removed from Failed and added to Pending under
  cm.mutex.Lock, no concurrent modification risk
- ClearAppRenderFailedData (5-min GC) remains as safety net for
  permanently failing apps
- moveTaskToRenderFailed is unchanged: if retry fails again, the app
  goes back to Failed through the normal path

Co-authored-by: aby913 <aby913@users.noreply.github.com>
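The restore operation above can be sketched as a FIFO move under a single lock: up to `limit` entries leave the failed list and join the pending list atomically. Field and method names mirror the commit message but the implementation is an assumption:

```go
package main

import (
	"fmt"
	"sync"
)

type cacheManager struct {
	mu      sync.Mutex
	failed  []string // stand-in for AppRenderFailed
	pending []string // stand-in for AppInfoLatestPending
}

// restoreRetryableFailedToPending moves up to limit items FIFO from
// failed back to pending under one Lock, so no observer can see an
// item in both lists or in neither.
func (cm *cacheManager) restoreRetryableFailedToPending(limit int) int {
	cm.mu.Lock()
	defer cm.mu.Unlock()
	n := limit
	if len(cm.failed) < n {
		n = len(cm.failed)
	}
	cm.pending = append(cm.pending, cm.failed[:n]...)
	cm.failed = cm.failed[n:]
	return n
}

func main() {
	cm := &cacheManager{failed: []string{"app1", "app2", "app3"}}
	moved := cm.restoreRetryableFailedToPending(50)
	fmt.Println(moved, cm.pending, len(cm.failed)) // 3 [app1 app2 app3] 0
}
```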
aby913 force-pushed the refactor/sync_pipeline branch from c0cfb52 to 058e5e1 on March 11, 2026 06:20