Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
6f560da
refactor: syncer pipeline
aby913 Feb 28, 2026
e87f00d
refactor: extract serial pipeline into standalone component
cursoragent Feb 28, 2026
3dd0fe9
refactor: replace TryLock with blocking Lock on pipeline execution path
cursoragent Feb 28, 2026
6371316
refactor: replace remaining TryLock with Lock on pipeline execution path
cursoragent Feb 28, 2026
c5d6636
refactor: replace all TryLock/TryRLock with Lock/RLock in cache.go
cursoragent Feb 28, 2026
329d528
refactor: remove dead code from hydration.go and fix remaining TryLoc…
cursoragent Feb 28, 2026
70d857b
refactor: encapsulate cache lock operations into CacheManager methods
cursoragent Mar 2, 2026
d5c5608
refactor: remove pipeline trigger mechanism
cursoragent Mar 2, 2026
69d93c4
refactor: set pipeline interval to 5 minutes and run immediately on s…
cursoragent Mar 2, 2026
2cc5b37
refactor: pipeline 30s cycle with syncer self-throttling to 5min
cursoragent Mar 2, 2026
d71a0b8
refactor: centralize hash calculation and ForceSync to Pipeline Phase 5
cursoragent Mar 3, 2026
b0b939c
fix: ensure pipeline syncs data for newly added sources
cursoragent Mar 4, 2026
177b245
fix: also detect user list changes in SyncOnce throttle bypass
cursoragent Mar 4, 2026
180ae71
fix: add V(2) diagnostic logs for silently skipped hydration apps
cursoragent Mar 4, 2026
aeb7473
perf: batch-parallel hydration in Pipeline Phase 2
cursoragent Mar 4, 2026
8232ddb
perf: bypass sync throttle on remote hash change; remove ForceSync ra…
cursoragent Mar 4, 2026
c7cf9e2
refactor: add logs
aby913 Mar 4, 2026
6689b2e
fix: log when Phase 2 has no pending apps to process
cursoragent Mar 4, 2026
d2e7df9
fix: skip pending items from deleted users/sources in Phase 2
cursoragent Mar 4, 2026
77c4d24
fix: update in-memory lastProcessedID after processing state changes
cursoragent Mar 4, 2026
0f22ce4
refactor: improve pipeline log
aby913 Mar 5, 2026
d240ab8
refactor: add logs
aby913 Mar 5, 2026
432da81
refactor: replace TryLock with Lock in TaskModule, extract lock-prote…
cursoragent Mar 5, 2026
fa557a9
refactor: replace TryRLock with RLock in app_install and app_clone
cursoragent Mar 5, 2026
062c769
refactor: improve logs
aby913 Mar 5, 2026
d82d54a
refactor: replace TryLock with Lock/RLock in syncer, hydration, and a…
cursoragent Mar 5, 2026
cd72531
refactor: ClearAppRenderFailedData
aby913 Mar 6, 2026
df83c40
fix: format system status allusers
aby913 Mar 6, 2026
15ba600
feat: restore retryable failed apps to pending queue each pipeline cycle
cursoragent Mar 6, 2026
da7efd0
refactor: improve logs, remove TryLock
aby913 Mar 6, 2026
058e5e1
fix: adjust app operator logs
aby913 Mar 9, 2026
2472d43
refactor: compare NATS and local appStateLatest, fix time parse layout
aby913 Mar 11, 2026
39fb00e
refactor: update user cache on add/delete operations
aby913 Mar 11, 2026
fbde378
fix: add parsing for rawAppName field
aby913 Mar 11, 2026
b7ee8e2
refactor: add state api for settings
aby913 Mar 11, 2026
449e1f6
fix: add parsing for Title field
aby913 Mar 11, 2026
3060c25
Fix pending/failed overlap and deduplicate failed entries
cursoragent Mar 13, 2026
9a8b334
Allow version upgrades past failed-list gating
cursoragent Mar 13, 2026
2bfe9f1
Merge pull request #91 from beclab/cursor/-bc-c174acf8-21e1-419f-89af…
aby913 Mar 13, 2026
5b8333a
refactor: add more logs
aby913 Mar 13, 2026
6e991fb
fix: no push when Resuming app
aby913 Mar 13, 2026
6e967e3
fix: add nats message field
aby913 Mar 18, 2026
5d5e53a
fix: add pendingCanceled status check when deleting app
aby913 Mar 20, 2026
4e83cbf
fix: complete rawAppName field content
aby913 Mar 20, 2026
c449fd9
fix: add logs
aby913 Mar 23, 2026
1816ebc
fix: separate param for different local sources
dkeven Mar 20, 2026
ed0a9aa
Merge branch 'main' into refactor/sync_pipeline
aby913 Mar 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 61 additions & 66 deletions internal/v2/appinfo/appinfomodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ type AppInfoModule struct {
redisClient *RedisClient
syncer *Syncer
hydrator *Hydrator
pipeline *Pipeline
dataWatcher *DataWatcher
dataWatcherState *DataWatcherState
dataWatcherUser *DataWatcherUser
dataWatcherRepo *DataWatcherRepo // Add DataWatcherRepo for image info updates
dataWatcherRepo *DataWatcherRepo
dataSender *DataSender
statusCorrectionChecker *StatusCorrectionChecker
settingsManager *settings.SettingsManager
Expand Down Expand Up @@ -223,10 +224,29 @@ func (m *AppInfoModule) Start() error {
}
}

// Set up hydration notifier connection if both cache and hydrator are enabled
if m.config.EnableCache && m.config.EnableHydrator && m.cacheManager != nil && m.hydrator != nil {
m.cacheManager.SetHydrationNotifier(m.hydrator)
glog.Infof("Hydration notifier connection established between cache manager and hydrator")
// Create and start Pipeline to orchestrate all components serially
if m.config.EnableHydrator && m.cacheManager != nil {
p := NewPipeline(m.cacheManager, m.cacheManager.cache, 30*time.Second)
if m.syncer != nil {
p.SetSyncer(m.syncer)
}
if m.hydrator != nil {
p.SetHydrator(m.hydrator)
}
if m.dataWatcher != nil {
p.SetDataWatcher(m.dataWatcher)
}
if m.dataWatcherRepo != nil {
p.SetDataWatcherRepo(m.dataWatcherRepo)
}
if m.statusCorrectionChecker != nil {
p.SetStatusCorrectionChecker(m.statusCorrectionChecker)
}
if err := p.Start(m.ctx); err != nil {
return fmt.Errorf("failed to start Pipeline: %w", err)
}
m.pipeline = p
glog.Infof("Pipeline started, all components orchestrated serially")
}

m.isStarted = true
Expand All @@ -245,7 +265,11 @@ func (m *AppInfoModule) Stop() error {

glog.V(3).Info("Stopping AppInfo module...")

// Stop components in reverse order
// Stop Pipeline first (it orchestrates other components)
if m.pipeline != nil {
m.pipeline.Stop()
}

if m.hydrator != nil {
m.hydrator.Stop()
}
Expand Down Expand Up @@ -378,25 +402,14 @@ func (m *AppInfoModule) GetRedisConfig() *RedisConfig {

// IsStarted returns whether the module is currently running
func (m *AppInfoModule) IsStarted() bool {
// Boolean read is atomic, but we need to ensure consistency with Start/Stop operations
if !m.mutex.TryRLock() {
glog.Warning("[TryRLock] AppInfoModule.IsStarted: Read lock not available, returning false")
return false
}
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.isStarted
}

// GetModuleStatus returns the current status of the module and all components
func (m *AppInfoModule) GetModuleStatus() map[string]interface{} {
// Need read lock to ensure consistent snapshot of all component states
if !m.mutex.TryRLock() {
glog.Warning("[TryRLock] AppInfoModule.GetModuleStatus: Read lock not available, returning error status")
return map[string]interface{}{
"error": "lock not available",
"status": "unknown",
}
}
m.mutex.RLock()
defer m.mutex.RUnlock()

status := map[string]interface{}{
Expand Down Expand Up @@ -517,12 +530,12 @@ func (m *AppInfoModule) initSyncer() error {
glog.V(3).Info("Cache manager reference set in syncer for hydration notifications")
}

// Start syncer
if err := m.syncer.Start(m.ctx); err != nil {
// Start syncer in passive mode (Pipeline handles scheduling)
if err := m.syncer.StartWithOptions(m.ctx, false); err != nil {
return fmt.Errorf("failed to start syncer: %w", err)
}

glog.V(2).Info("Syncer initialized successfully")
glog.V(2).Info("Syncer initialized (passive mode, Pipeline handles scheduling)")
return nil
}

Expand Down Expand Up @@ -583,12 +596,11 @@ func (m *AppInfoModule) initDataWatcher() error {
// Create DataWatcher instance
m.dataWatcher = NewDataWatcher(m.cacheManager, m.hydrator, m.dataSender)

// Start DataWatcher
if err := m.dataWatcher.Start(m.ctx); err != nil {
if err := m.dataWatcher.StartWithOptions(m.ctx, false); err != nil {
return fmt.Errorf("failed to start DataWatcher: %w", err)
}

glog.V(2).Info("DataWatcher initialized successfully")
glog.V(2).Info("DataWatcher initialized (passive mode)")
return nil
}

Expand Down Expand Up @@ -634,7 +646,7 @@ func (m *AppInfoModule) initDataWatcherUser() error {

// initDataWatcherRepo initializes the DataWatcherRepo
func (m *AppInfoModule) initDataWatcherRepo() error {
glog.V(3).Info("Initializing DataWatcherRepo...")
glog.V(2).Info("Initializing DataWatcherRepo...")

if m.redisClient == nil {
return fmt.Errorf("redis client is required for DataWatcherRepo")
Expand All @@ -647,31 +659,29 @@ func (m *AppInfoModule) initDataWatcherRepo() error {
// Create DataWatcherRepo instance
m.dataWatcherRepo = NewDataWatcherRepo(m.redisClient, m.cacheManager, m.dataWatcher, m.dataSender)

// Start DataWatcherRepo
if err := m.dataWatcherRepo.Start(); err != nil {
if err := m.dataWatcherRepo.StartWithOptions(false); err != nil {
return fmt.Errorf("failed to start DataWatcherRepo: %w", err)
}

glog.V(2).Info("DataWatcherRepo initialized successfully")
glog.V(2).Info("DataWatcherRepo initialized (passive mode)")
return nil
}

// initStatusCorrectionChecker initializes the StatusCorrectionChecker
func (m *AppInfoModule) initStatusCorrectionChecker() error {
glog.V(3).Info("Initializing StatusCorrectionChecker...")
glog.V(2).Info("Initializing StatusCorrectionChecker...")

if m.cacheManager == nil {
return fmt.Errorf("cache manager is required for StatusCorrectionChecker")
}

m.statusCorrectionChecker = NewStatusCorrectionChecker(m.cacheManager)

// Start StatusCorrectionChecker
if err := m.statusCorrectionChecker.Start(); err != nil {
if err := m.statusCorrectionChecker.StartWithOptions(false); err != nil {
return fmt.Errorf("failed to start StatusCorrectionChecker: %w", err)
}

glog.V(2).Info("StatusCorrectionChecker initialized successfully")
glog.V(2).Info("StatusCorrectionChecker initialized (passive mode)")
return nil
}

Expand Down Expand Up @@ -750,17 +760,11 @@ func (m *AppInfoModule) correctCacheWithChartRepo() error {
return fmt.Errorf("cache manager not available")
}

// Add detailed lock logs for diagnosis
glog.V(3).Infof("[LOCK] m.cacheManager.mutex.TryLock() @appinfomodule:cleanup Start")
if !m.cacheManager.mutex.TryLock() {
glog.Warning("[TryLock] AppInfoModule cleanup: CacheManager write lock not available, skipping cleanup")
return nil
}
defer m.cacheManager.mutex.Unlock()
removedCount := 0
for userID, userData := range m.cacheManager.cache.Users {
// Build the set of delisted app IDs (apps NOT in validApps)
delistedAppIDs := make(map[string]bool)
allUsersData := m.cacheManager.GetAllUsersData() // ~ correctCacheWithChartRepo
for _, userData := range allUsersData {
for sourceID, sourceData := range userData.Sources {
newLatest := sourceData.AppInfoLatest[:0]
for _, app := range sourceData.AppInfoLatest {
var appID string
if app != nil && app.RawData != nil {
Expand All @@ -772,22 +776,19 @@ func (m *AppInfoModule) correctCacheWithChartRepo() error {
appID = app.RawData.Name
}
}
if appID != "" && validApps[sourceID] != nil {
if _, ok := validApps[sourceID][appID]; ok {
newLatest = append(newLatest, app)
} else {
removedCount++
glog.V(3).Infof("Removed app from cache: user=%s source=%s appID=%s", userID, sourceID, appID)
}
} else {
// If appID is empty, treat as invalid and remove
removedCount++
glog.V(3).Infof("Removed app from cache (empty appID): user=%s source=%s", userID, sourceID)
if appID == "" {
continue
}
if validApps[sourceID] == nil {
delistedAppIDs[appID] = true
} else if _, ok := validApps[sourceID][appID]; !ok {
delistedAppIDs[appID] = true
}
}
sourceData.AppInfoLatest = newLatest
}
}

removedCount := m.cacheManager.RemoveDelistedApps(delistedAppIDs)
glog.V(2).Infof("Cache correction finished, removed %d apps not in chart repo", removedCount)
return nil
}
Expand Down Expand Up @@ -1109,7 +1110,7 @@ func (m *AppInfoModule) SetAppData(userID, sourceID string, dataType AppDataType
if !m.isStarted || m.cacheManager == nil {
return fmt.Errorf("module is not started or cache manager is not available")
}
return m.cacheManager.SetAppData(userID, sourceID, dataType, data)
return m.cacheManager.SetAppData(userID, sourceID, dataType, data, "AppInfoModule")
}

// GetAppData is a convenience function to get app data
Expand Down Expand Up @@ -1271,6 +1272,7 @@ func (m *AppInfoModule) SyncUserListToCache() error {
}

// RefreshUserDataStructures ensures all configured users have proper data structures
// not used
func (m *AppInfoModule) RefreshUserDataStructures() error {
// Check isStarted without lock since it's only read
if !m.isStarted {
Expand Down Expand Up @@ -1316,7 +1318,7 @@ func (m *AppInfoModule) GetCachedUsers() []string {
return []string{}
}

allUsersData := m.cacheManager.GetAllUsersData()
allUsersData := m.cacheManager.GetAllUsersData() // not used
users := make([]string, 0, len(allUsersData))
for userID := range allUsersData {
users = append(users, userID)
Expand Down Expand Up @@ -1355,21 +1357,14 @@ func (m *AppInfoModule) GetInvalidDataReport() map[string]interface{} {
},
}

if !m.cacheManager.mutex.TryRLock() {
glog.Warning("[TryRLock] AppInfoModule: CacheManager read lock not available, skipping operation")
return map[string]interface{}{
"error": "lock not available",
"status": "unknown",
}
}
defer m.cacheManager.mutex.RUnlock()
allUsersForReport := m.cacheManager.GetAllUsersData() // not used

totalUsers := 0
totalSources := 0
totalPendingData := 0
totalInvalidData := 0

for userID, userData := range m.cacheManager.cache.Users {
for userID, userData := range allUsersForReport {
totalUsers++
userReport := map[string]interface{}{
"sources": make(map[string]interface{}),
Expand Down
Loading
Loading