From 46374023d4647bfedf1820d8697553800d741a65 Mon Sep 17 00:00:00 2001 From: Bhuvanesh S Date: Fri, 5 Jun 2026 19:51:01 +0530 Subject: [PATCH] feat: cloud sync reliability & conflict resolution audit --- .../tv/data/repository/CloudSyncRepository.kt | 351 ++++++++++-------- .../data/repository/WatchHistoryRepository.kt | 18 +- .../tv/data/repository/WatchlistRepository.kt | 28 +- .../tv/ui/screens/player/PlayerViewModel.kt | 5 +- 4 files changed, 237 insertions(+), 165 deletions(-) diff --git a/app/src/main/kotlin/com/arflix/tv/data/repository/CloudSyncRepository.kt b/app/src/main/kotlin/com/arflix/tv/data/repository/CloudSyncRepository.kt index abd107f7..0a304536 100644 --- a/app/src/main/kotlin/com/arflix/tv/data/repository/CloudSyncRepository.kt +++ b/app/src/main/kotlin/com/arflix/tv/data/repository/CloudSyncRepository.kt @@ -158,10 +158,11 @@ class CloudSyncRepository @Inject constructor( return isPushDirty || latestLocalDirtyAt > 0L || storedDirtyAt > 0L } - private suspend fun markCloudPayloadApplied(payload: String) { + private suspend fun markCloudPayloadApplied(payload: String, payloadHash: Int) { val cloudUpdatedAt = runCatching { JSONObject(payload).optLong("updatedAt", 0L) }.getOrDefault(0L) context.settingsDataStore.edit { prefs -> prefs[cloudSyncLastAppliedAtKey] = cloudUpdatedAt.takeIf { it > 0L } ?: System.currentTimeMillis() + prefs[androidx.datastore.preferences.core.intPreferencesKey("cloud_sync_last_applied_hash")] = payloadHash } } @@ -648,11 +649,24 @@ class CloudSyncRepository @Inject constructor( return@withLock RestoreResult.NO_BACKUP } + val prefs = context.settingsDataStore.data.first() + val lastAppliedHash = prefs[androidx.datastore.preferences.core.intPreferencesKey("cloud_sync_last_applied_hash")] + val payloadHash = payload.hashCode() + + if (lastAppliedHash == payloadHash) { + AppLogger.breadcrumb( + tag = "CloudSync", + message = "pull_skipped_identical_payload", + severity = "info" + ) + return@withLock RestoreResult.RESTORED + } + runCatching { invalidationBus.suppressDuringRemoteApply { applyCloudPayload(payload) } - markCloudPayloadApplied(payload) + markCloudPayloadApplied(payload, payloadHash) }.fold( onSuccess = { AppLogger.breadcrumb( @@ -941,207 +955,222 @@ class CloudSyncRepository @Inject constructor( authRepository.saveAutoPlayNextToProfile(fallbackAutoPlayNext) // ── Trakt tokens ── - root.optJSONObject("traktTokens")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> - val type = TypeToken.getParameterized(Map::class.java, String::class.java, TraktRepository.CloudTraktToken::class.java).type - val tokens: Map = gson.fromJson(json, type) ?: emptyMap() - traktRepository.importTokensForProfiles(tokens) - } + runCatching { + root.optJSONObject("traktTokens")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> + val type = TypeToken.getParameterized(Map::class.java, String::class.java, TraktRepository.CloudTraktToken::class.java).type + val tokens: Map = gson.fromJson(json, type) ?: emptyMap() + traktRepository.importTokensForProfiles(tokens) + } + }.onFailure { AppLogger.recordException(it, mapOf("error_area" to "CloudSync", "cloud_flow" to "apply_trakt_tokens")) } // ── Addons ── - root.optJSONObject("addonsByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> - val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, Addon::class.java).type).type - val map: Map> = gson.fromJson(json, type) ?: emptyMap() - val sharedAddons = mergeAddonsForSharedRestore(map.values) - if (sharedAddons.isNotEmpty()) { - streamRepository.replaceSharedAddonsFromCloud(sharedAddons) + runCatching { + root.optJSONObject("addonsByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> + val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, Addon::class.java).type).type + val map: Map> = gson.fromJson(json, type) ?: emptyMap() + val sharedAddons = mergeAddonsForSharedRestore(map.values) + if (sharedAddons.isNotEmpty()) { + streamRepository.replaceSharedAddonsFromCloud(sharedAddons) + } } - } - root.optJSONArray("addons")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> - if (!root.has("addonsByProfile")) { - val type = TypeToken.getParameterized(List::class.java, Addon::class.java).type - val addons: List = gson.fromJson(json, type) ?: emptyList() - if (addons.isNotEmpty()) { - streamRepository.replaceSharedAddonsFromCloud(addons) + root.optJSONArray("addons")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> + if (!root.has("addonsByProfile")) { + val type = TypeToken.getParameterized(List::class.java, Addon::class.java).type + val addons: List = gson.fromJson(json, type) ?: emptyList() + if (addons.isNotEmpty()) { + streamRepository.replaceSharedAddonsFromCloud(addons) + } } } - } + }.onFailure { AppLogger.recordException(it, mapOf("error_area" to "CloudSync", "cloud_flow" to "apply_addons")) } // ── Catalogs ── - root.optJSONObject("catalogsByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> - val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, CatalogConfig::class.java).type).type - val map: Map> = gson.fromJson(json, type) ?: emptyMap() - map.forEach { (profileId, catalogs) -> - catalogRepository.replaceCatalogsForProfile(profileId, catalogs) + runCatching { + root.optJSONObject("catalogsByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> + val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, CatalogConfig::class.java).type).type + val map: Map> = gson.fromJson(json, type) ?: emptyMap() + map.forEach { (profileId, catalogs) -> + catalogRepository.replaceCatalogsForProfile(profileId, catalogs) + } } - } - root.optJSONArray("catalogs")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> - if (!root.has("catalogsByProfile")) { - val type = TypeToken.getParameterized(List::class.java, CatalogConfig::class.java).type - val catalogs: List = gson.fromJson(json, type) ?: emptyList() - if (catalogs.isNotEmpty()) { - catalogRepository.replaceCatalogsForProfile(activeProfileId, catalogs) + root.optJSONArray("catalogs")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> + if (!root.has("catalogsByProfile")) { + val type = TypeToken.getParameterized(List::class.java, CatalogConfig::class.java).type + val catalogs: List = gson.fromJson(json, type) ?: emptyList() + if (catalogs.isNotEmpty()) { + catalogRepository.replaceCatalogsForProfile(activeProfileId, catalogs) + } } } - } + }.onFailure { AppLogger.recordException(it, mapOf("error_area" to "CloudSync", "cloud_flow" to "apply_catalogs")) } // ── Hidden preinstalled catalogs ── - root.optJSONObject("hiddenPreinstalledByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> - val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, String::class.java).type).type - val map: Map> = gson.fromJson(json, type) ?: emptyMap() - map.forEach { (profileId, hidden) -> - catalogRepository.setHiddenPreinstalledCatalogIdsForProfile(profileId, hidden) + runCatching { + root.optJSONObject("hiddenPreinstalledByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> + val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, String::class.java).type).type + val map: Map> = gson.fromJson(json, type) ?: emptyMap() + map.forEach { (profileId, hidden) -> + catalogRepository.setHiddenPreinstalledCatalogIdsForProfile(profileId, hidden) + } } - } - root.optJSONArray("hiddenPreinstalledCatalogs")?.toString()?.let { json -> - if (!root.has("hiddenPreinstalledByProfile")) { - val hidden = if (json.isBlank()) { - emptyList() - } else { - val type = TypeToken.getParameterized(List::class.java, String::class.java).type - gson.fromJson>(json, type) ?: emptyList() + root.optJSONArray("hiddenPreinstalledCatalogs")?.toString()?.let { json -> + if (!root.has("hiddenPreinstalledByProfile")) { + val hidden = if (json.isBlank()) { + emptyList() + } else { + val type = TypeToken.getParameterized(List::class.java, String::class.java).type + gson.fromJson>(json, type) ?: emptyList() + } + catalogRepository.setHiddenPreinstalledCatalogIdsForProfile(activeProfileId, hidden) } - catalogRepository.setHiddenPreinstalledCatalogIdsForProfile(activeProfileId, hidden) } - } + }.onFailure { AppLogger.recordException(it, mapOf("error_area" to "CloudSync", "cloud_flow" to "apply_hidden_preinstalled")) } // ── Hidden addon catalogs ── - root.optJSONObject("hiddenAddonByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> - val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, String::class.java).type).type - val map: Map> = gson.fromJson(json, type) ?: emptyMap() - map.forEach { (profileId, hidden) -> - catalogRepository.setHiddenAddonCatalogIdsForProfile(profileId, hidden) + runCatching { + root.optJSONObject("hiddenAddonByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> + val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, String::class.java).type).type + val map: Map> = gson.fromJson(json, type) ?: emptyMap() + map.forEach { (profileId, hidden) -> + catalogRepository.setHiddenAddonCatalogIdsForProfile(profileId, hidden) + } } - } + }.onFailure { AppLogger.recordException(it, mapOf("error_area" to "CloudSync", "cloud_flow" to "apply_hidden_addons")) } - // ── IPTV config + favorites ── - root.optJSONObject("hiddenHomeServerByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> - val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, String::class.java).type).type - val map: Map> = gson.fromJson(json, type) ?: emptyMap() - map.forEach { (profileId, hidden) -> - catalogRepository.setHiddenHomeServerCatalogIdsForProfile(profileId, hidden) + // ── Hidden Home Server catalogs ── + runCatching { + root.optJSONObject("hiddenHomeServerByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> + val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, String::class.java).type).type + val map: Map> = gson.fromJson(json, type) ?: emptyMap() + map.forEach { (profileId, hidden) -> + catalogRepository.setHiddenHomeServerCatalogIdsForProfile(profileId, hidden) + } } - } + }.onFailure { AppLogger.recordException(it, mapOf("error_area" to "CloudSync", "cloud_flow" to "apply_hidden_home_server")) } - var importedActiveProfileIptv = false - root.optJSONObject("iptvByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> - val type = TypeToken.getParameterized(Map::class.java, String::class.java, IptvCloudProfileState::class.java).type - val map: Map = gson.fromJson(json, type) ?: emptyMap() - map.forEach { (profileId, state) -> - iptvRepository.importCloudConfigForProfile(profileId, state) - if (profileId == activeProfileId) { - importedActiveProfileIptv = true + // ── IPTV config + favorites ── + runCatching { + var importedActiveProfileIptv = false + root.optJSONObject("iptvByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> + val type = TypeToken.getParameterized(Map::class.java, String::class.java, IptvCloudProfileState::class.java).type + val map: Map = gson.fromJson(json, type) ?: emptyMap() + map.forEach { (profileId, state) -> + iptvRepository.importCloudConfigForProfile(profileId, state) + if (profileId == activeProfileId) { + importedActiveProfileIptv = true + } } } - } - // Legacy IPTV flat fields (only used if iptvByProfile is absent) - val cloudHasIptvKeys = root.has("iptvM3uUrl") || root.has("iptvEpgUrl") || - root.has("iptvFavoriteGroups") || root.has("iptvFavoriteChannels") - val m3u = root.optString("iptvM3uUrl") - val epg = root.optString("iptvEpgUrl") - val favorites = root.optJSONArray("iptvFavoriteGroups")?.toString().orEmpty().let { j -> - if (j.isBlank()) emptyList() else { - val type = TypeToken.getParameterized(List::class.java, String::class.java).type - gson.fromJson>(j, type) ?: emptyList() + // Legacy IPTV flat fields (only used if iptvByProfile is absent) + val cloudHasIptvKeys = root.has("iptvM3uUrl") || root.has("iptvEpgUrl") || + root.has("iptvFavoriteGroups") || root.has("iptvFavoriteChannels") + val m3u = root.optString("iptvM3uUrl") + val epg = root.optString("iptvEpgUrl") + val favorites = root.optJSONArray("iptvFavoriteGroups")?.toString().orEmpty().let { j -> + if (j.isBlank()) emptyList() else { + val type = TypeToken.getParameterized(List::class.java, String::class.java).type + gson.fromJson>(j, type) ?: emptyList() + } } - } - val favoriteChannels = root.optJSONArray("iptvFavoriteChannels")?.toString().orEmpty().let { j -> - if (j.isBlank()) emptyList() else { - val type = TypeToken.getParameterized(List::class.java, String::class.java).type - gson.fromJson>(j, type) ?: emptyList() + val favoriteChannels = root.optJSONArray("iptvFavoriteChannels")?.toString().orEmpty().let { j -> + if (j.isBlank()) emptyList() else { + val type = TypeToken.getParameterized(List::class.java, String::class.java).type + gson.fromJson>(j, type) ?: emptyList() + } + } + val localIptv = iptvRepository.observeConfig().first() + val cloudHasIptvData = m3u.isNotBlank() || epg.isNotBlank() || favorites.isNotEmpty() || favoriteChannels.isNotEmpty() + val localHasIptvData = localIptv.m3uUrl.isNotBlank() || localIptv.epgUrl.isNotBlank() + var importedLegacyIptv = false + if (!root.has("iptvByProfile") && cloudHasIptvKeys && (cloudHasIptvData || !localHasIptvData)) { + iptvRepository.importCloudConfig(m3u, epg, favorites, favoriteChannels) + importedLegacyIptv = true } - } - val localIptv = iptvRepository.observeConfig().first() - val cloudHasIptvData = m3u.isNotBlank() || epg.isNotBlank() || favorites.isNotEmpty() || favoriteChannels.isNotEmpty() - val localHasIptvData = localIptv.m3uUrl.isNotBlank() || localIptv.epgUrl.isNotBlank() - var importedLegacyIptv = false - if (!root.has("iptvByProfile") && cloudHasIptvKeys && (cloudHasIptvData || !localHasIptvData)) { - iptvRepository.importCloudConfig(m3u, epg, favorites, favoriteChannels) - importedLegacyIptv = true - } - if (importedActiveProfileIptv || importedLegacyIptv) { - runCatching { - iptvRepository.invalidateCache() + if (importedActiveProfileIptv || importedLegacyIptv) { + runCatching { + iptvRepository.invalidateCache() + } } - } + }.onFailure { AppLogger.recordException(it, mapOf("error_area" to "CloudSync", "cloud_flow" to "apply_iptv")) } // ── Watchlist ── - root.optJSONObject("watchlistByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> - val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, LocalWatchlistItem::class.java).type).type - val map: Map> = gson.fromJson(json, type) ?: emptyMap() - map.forEach { (profileId, items) -> - // Restore the cloud mirror for every profile, including Trakt profiles. - // Trakt remains the source of truth after a successful live sync, but - // skipping this cache made fresh installs show an empty watchlist while - // auth/network refresh was still settling or failed. - watchlistRepository.importWatchlistForProfile(profileId, items) + runCatching { + root.optJSONObject("watchlistByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> + val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, LocalWatchlistItem::class.java).type).type + val map: Map> = gson.fromJson(json, type) ?: emptyMap() + map.forEach { (profileId, items) -> + // Restore the cloud mirror for every profile, including Trakt profiles. + // Trakt remains the source of truth after a successful live sync, but + // skipping this cache made fresh installs show an empty watchlist while + // auth/network refresh was still settling or failed. + watchlistRepository.importWatchlistForProfile(profileId, items) + } } - } + }.onFailure { AppLogger.recordException(it, mapOf("error_area" to "CloudSync", "cloud_flow" to "apply_watchlist")) } // ── Dismissed Continue Watching ── - root.optJSONObject("dismissedContinueWatchingByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> - val type = TypeToken.getParameterized(Map::class.java, String::class.java, String::class.java).type - val map: Map = gson.fromJson(json, type) ?: emptyMap() - traktRepository.importDismissedContinueWatchingForProfiles(map) - } + runCatching { + root.optJSONObject("dismissedContinueWatchingByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> + val type = TypeToken.getParameterized(Map::class.java, String::class.java, String::class.java).type + val map: Map = gson.fromJson(json, type) ?: emptyMap() + traktRepository.importDismissedContinueWatchingForProfiles(map) + } + }.onFailure { AppLogger.recordException(it, mapOf("error_area" to "CloudSync", "cloud_flow" to "apply_dismissed_cw")) } - // Only import local CW for profiles that DON'T have Trakt connected. - // For Trakt profiles, CW is sourced exclusively from Trakt's progress API. - // The previous code imported local CW unconditionally, which meant every - // show ever partially watched in ARVIO (written to cloud by - // saveLocalContinueWatching during playback) got restored to local DataStore - // on cloud pull — polluting the CW row with non-Trakt items that persisted - // even after app reinstall. - // Only import local CW for profiles that DON'T have Trakt connected. - // For Trakt profiles, CW is sourced exclusively from Trakt's progress API. - // Only import local CW for profiles that DON'T have Trakt connected. - // For Trakt profiles, CW is sourced exclusively from Trakt's progress API. - root.optJSONObject("localContinueWatchingByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> - val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, ContinueWatchingItem::class.java).type).type - val map: Map> = gson.fromJson(json, type) ?: emptyMap() - val traktProfiles = mutableSetOf() - - val traktTokenType = TypeToken.getParameterized(Map::class.java, String::class.java, TraktRepository.CloudTraktToken::class.java).type - val traktTokens = root.optJSONObject("traktTokens") - ?.toString() - ?.takeIf { it.isNotBlank() } - ?.let { tokenJson -> - runCatching { - gson.fromJson>(tokenJson, traktTokenType) - }.getOrNull() + // ── Local Continue Watching ── + runCatching { + // Only import local CW for profiles that DON'T have Trakt connected. + // For Trakt profiles, CW is sourced exclusively from Trakt's progress API. + root.optJSONObject("localContinueWatchingByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> + val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, ContinueWatchingItem::class.java).type).type + val map: Map> = gson.fromJson(json, type) ?: emptyMap() + val traktProfiles = mutableSetOf() + + val traktTokenType = TypeToken.getParameterized(Map::class.java, String::class.java, TraktRepository.CloudTraktToken::class.java).type + val traktTokens = root.optJSONObject("traktTokens") + ?.toString() + ?.takeIf { it.isNotBlank() } + ?.let { tokenJson -> + runCatching { + gson.fromJson>(tokenJson, traktTokenType) + }.getOrNull() + } + .orEmpty() + + traktTokens.forEach { (profileId, token) -> + if (profileId.isNotBlank() && token.accessToken.isNotBlank()) { + traktProfiles.add(profileId) + } } - .orEmpty() - traktTokens.forEach { (profileId, token) -> - if (profileId.isNotBlank() && token.accessToken.isNotBlank()) { - traktProfiles.add(profileId) + val isActiveProfileTrakt = runCatching { traktRepository.hasTrakt() }.getOrDefault(false) + val activeProfileIdLocal = profileManager.getProfileIdSync().ifBlank { null } + if (isActiveProfileTrakt && activeProfileIdLocal != null) { + traktProfiles.add(activeProfileIdLocal) } - } - val isActiveProfileTrakt = runCatching { traktRepository.hasTrakt() }.getOrDefault(false) - val activeProfileId = profileManager.getProfileIdSync().ifBlank { null } - if (isActiveProfileTrakt && activeProfileId != null) { - traktProfiles.add(activeProfileId) + val nonTraktOnly = map.filterKeys { it !in traktProfiles } + if (nonTraktOnly.isNotEmpty()) { + traktRepository.importLocalContinueWatchingForProfiles(nonTraktOnly) + } } + }.onFailure { AppLogger.recordException(it, mapOf("error_area" to "CloudSync", "cloud_flow" to "apply_local_cw")) } - val nonTraktOnly = map.filterKeys { it !in traktProfiles } - if (nonTraktOnly.isNotEmpty()) { - traktRepository.importLocalContinueWatchingForProfiles(nonTraktOnly) + runCatching { + root.optJSONObject("localWatchedMoviesByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> + val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, Int::class.javaObjectType).type).type + val map: Map> = gson.fromJson(json, type) ?: emptyMap() + traktRepository.importLocalWatchedMoviesForProfiles(map) } - } - root.optJSONObject("localWatchedMoviesByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> - val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, Int::class.javaObjectType).type).type - val map: Map> = gson.fromJson(json, type) ?: emptyMap() - traktRepository.importLocalWatchedMoviesForProfiles(map) - } - - root.optJSONObject("localWatchedEpisodesByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> - val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, String::class.java).type).type - val map: Map> = gson.fromJson(json, type) ?: emptyMap() - traktRepository.importLocalWatchedEpisodesForProfiles(map) - } + root.optJSONObject("localWatchedEpisodesByProfile")?.toString()?.takeIf { it.isNotBlank() }?.let { json -> + val type = TypeToken.getParameterized(Map::class.java, String::class.java, TypeToken.getParameterized(List::class.java, String::class.java).type).type + val map: Map> = gson.fromJson(json, type) ?: emptyMap() + traktRepository.importLocalWatchedEpisodesForProfiles(map) + } + }.onFailure { AppLogger.recordException(it, mapOf("error_area" to "CloudSync", "cloud_flow" to "apply_local_watched")) } traktRepository.clearAllProfileCaches() watchHistoryRepository.clearProfileCaches() diff --git a/app/src/main/kotlin/com/arflix/tv/data/repository/WatchHistoryRepository.kt b/app/src/main/kotlin/com/arflix/tv/data/repository/WatchHistoryRepository.kt index dc104ee2..3832a48b 100644 --- a/app/src/main/kotlin/com/arflix/tv/data/repository/WatchHistoryRepository.kt +++ b/app/src/main/kotlin/com/arflix/tv/data/repository/WatchHistoryRepository.kt @@ -125,10 +125,26 @@ class WatchHistoryRepository @Inject constructor( position: Long, streamKey: String? = null, streamAddonId: String? = null, - streamTitle: String? = null + streamTitle: String? = null, + sessionStartTime: Long = 0L ) { val userId = authRepositoryProvider.get().getCurrentUserId() ?: return + if (sessionStartTime > 0L) { + val existingProgress = getProgress(mediaType, tmdbId, season, episode) + if (existingProgress != null) { + val cloudUpdatedAt = parseEpoch(existingProgress.updated_at) + if (cloudUpdatedAt > sessionStartTime + 10_000L) { + com.arflix.tv.util.AppLogger.breadcrumb( + tag = "WatchHistory", + message = "Rejected stale push for $tmdbId. Cloud updated $cloudUpdatedAt, session start $sessionStartTime", + severity = "warning" + ) + return + } + } + } + val entry = WatchHistoryEntry( user_id = userId, profile_id = currentProfileId(), diff --git a/app/src/main/kotlin/com/arflix/tv/data/repository/WatchlistRepository.kt b/app/src/main/kotlin/com/arflix/tv/data/repository/WatchlistRepository.kt index 0d615f80..11f8bef7 100644 --- a/app/src/main/kotlin/com/arflix/tv/data/repository/WatchlistRepository.kt +++ b/app/src/main/kotlin/com/arflix/tv/data/repository/WatchlistRepository.kt @@ -331,9 +331,33 @@ class WatchlistRepository @Inject constructor( } } - suspend fun importWatchlistForProfile(profileId: String, items: List) { + suspend fun importWatchlistForProfile(profileId: String, cloudItems: List) { val safeProfileId = profileId.trim().ifBlank { "default" } - val json = runCatching { gson.toJson(items) }.getOrDefault("[]") + + // Union merge local and cloud items to prevent offline additions from being wiped + val localJson = runCatching { context.traktDataStore.data.first()[watchlistKeyFor(safeProfileId)] }.getOrNull() + val type = TypeToken.getParameterized(MutableList::class.java, LocalWatchlistItem::class.java).type + val localItems: List = if (localJson != null) { + runCatching { gson.fromJson>(localJson, type) }.getOrDefault(emptyList()) ?: emptyList() + } else { + emptyList() + } + + val combinedMap = mutableMapOf() + cloudItems.forEach { item -> + combinedMap["${item.mediaType}:${item.tmdbId}"] = item + } + localItems.forEach { item -> + val key = "${item.mediaType}:${item.tmdbId}" + val existing = combinedMap[key] + if (existing == null || item.addedAt > existing.addedAt) { + combinedMap[key] = item + } + } + + val mergedList = combinedMap.values.sortedWith(compareBy { it.sourceOrder }.thenByDescending { it.addedAt }) + val json = runCatching { gson.toJson(mergedList) }.getOrDefault("[]") + context.traktDataStore.edit { prefs -> prefs[watchlistKeyFor(safeProfileId)] = json } diff --git a/app/src/main/kotlin/com/arflix/tv/ui/screens/player/PlayerViewModel.kt b/app/src/main/kotlin/com/arflix/tv/ui/screens/player/PlayerViewModel.kt index e6dfd884..599cf49b 100644 --- a/app/src/main/kotlin/com/arflix/tv/ui/screens/player/PlayerViewModel.kt +++ b/app/src/main/kotlin/com/arflix/tv/ui/screens/player/PlayerViewModel.kt @@ -174,6 +174,7 @@ class PlayerViewModel @Inject constructor( private var lastIsPlaying: Boolean = false private var hasMarkedWatched: Boolean = false private var hasManualSubtitleSelection: Boolean = false + private var playbackSessionStartTime: Long = 0L // AI subtitle settings (read once per video load) private var aiSubtitleEnabled = false @@ -314,6 +315,7 @@ class PlayerViewModel @Inject constructor( currentPreferredAddonId = preferredAddonId?.trim()?.takeIf { it.isNotBlank() } currentPreferredSourceName = preferredSourceName?.trim()?.takeIf { it.isNotBlank() } currentPreferredBingeGroup = preferredBingeGroup?.trim()?.takeIf { it.isNotBlank() } + playbackSessionStartTime = System.currentTimeMillis() playbackDiag( "loadMedia type=$mediaType id=$mediaId season=$seasonNumber episode=$episodeNumber " + "providedUrl=${!providedStreamUrl.isNullOrBlank()} preferredAddon=${currentPreferredAddonId.orEmpty()} " + @@ -2533,7 +2535,8 @@ class PlayerViewModel @Inject constructor( position = positionSeconds, streamKey = streamKey, streamAddonId = streamAddonId, - streamTitle = streamTitle + streamTitle = streamTitle, + sessionStartTime = playbackSessionStartTime ) // Also save to local Continue Watching (profile-scoped, for profiles without Trakt).