Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,11 @@ dependencies {
// DataStore for preferences
implementation("androidx.datastore:datastore-preferences:1.0.0")

// Room local persistence for IPTV health state
implementation("androidx.room:room-runtime:2.5.2")
implementation("androidx.room:room-ktx:2.5.2")
ksp("androidx.room:room-compiler:2.5.2")

// Google Cast SDK — mobile-only at runtime (guarded by DeviceType check), harmless on TV
implementation("com.google.android.gms:play-services-cast-framework:21.4.0")
implementation("androidx.mediarouter:mediarouter:1.7.0")
Expand Down
13 changes: 13 additions & 0 deletions app/src/main/kotlin/com/arflix/tv/data/local/IptvHealthDatabase.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.arflix.tv.data.local

import androidx.room.Database
import androidx.room.RoomDatabase

@Database(
entities = [IptvStreamHealthEntity::class],
version = 1,
exportSchema = false
)
abstract class IptvHealthDatabase : RoomDatabase() {
abstract fun streamHealthDao(): IptvStreamHealthDao
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.arflix.tv.data.local

import androidx.room.Dao
import androidx.room.Insert
import androidx.room.OnConflictStrategy
import androidx.room.Query
import kotlinx.coroutines.flow.Flow

@Dao
interface IptvStreamHealthDao {
@Query("SELECT * FROM iptv_stream_health")
fun observeAll(): Flow<List<IptvStreamHealthEntity>>

@Query("SELECT * FROM iptv_stream_health WHERE channelId IN (:channelIds)")
suspend fun loadByChannelIds(channelIds: List<String>): List<IptvStreamHealthEntity>

@Insert(onConflict = OnConflictStrategy.REPLACE)
suspend fun upsert(entity: IptvStreamHealthEntity)

@Query("DELETE FROM iptv_stream_health WHERE channelId NOT IN (:channelIds)")
suspend fun deleteStaleHealthEntries(channelIds: List<String>)

@Query("DELETE FROM iptv_stream_health")
suspend fun clearAll()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.arflix.tv.data.local

import androidx.room.Entity
import androidx.room.PrimaryKey

@Entity(tableName = "iptv_stream_health")
data class IptvStreamHealthEntity(
@PrimaryKey val channelId: String,
val httpStatusCode: Int? = null,
val latencyMs: Long? = null,
val consecutiveFailureCount: Int = 0,
val lastSuccessfulAtMs: Long? = null,
val lastCheckedAtMs: Long = 0L
)
63 changes: 61 additions & 2 deletions app/src/main/kotlin/com/arflix/tv/data/model/IptvModels.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,47 @@ data class IptvChannel(
val language: String? = null,
val country: String? = null,
val qualityLabel: String? = null,
val variantKey: String? = null
val variantKey: String? = null,
val health: IptvChannelHealth = IptvChannelHealth()
)

/**
* Compact now/next program slice for a channel.
* Status of the last known health check for an IPTV stream.
*/
enum class IptvChannelHealthStatus {
UNKNOWN,
HEALTHY,
DEGRADED,
OFFLINE
}

data class IptvChannelHealth(
val httpStatusCode: Int? = null,
val latencyMs: Long? = null,
val consecutiveFailureCount: Int = 0,
val lastSuccessfulAtMs: Long? = null,
val lastCheckedAtMs: Long = 0L
) {
val status: IptvChannelHealthStatus
get() = when {
lastCheckedAtMs == 0L -> IptvChannelHealthStatus.UNKNOWN
consecutiveFailureCount >= 3 -> IptvChannelHealthStatus.OFFLINE
consecutiveFailureCount > 0 -> IptvChannelHealthStatus.DEGRADED
httpStatusCode != null && httpStatusCode in 200..299 -> IptvChannelHealthStatus.HEALTHY
else -> IptvChannelHealthStatus.UNKNOWN
}

val summaryText: String
get() = when (status) {
IptvChannelHealthStatus.HEALTHY -> "Healthy"
IptvChannelHealthStatus.DEGRADED -> "Degraded"
IptvChannelHealthStatus.OFFLINE -> "Offline"
IptvChannelHealthStatus.UNKNOWN -> "Unknown"
}
}

/**
* Loaded IPTV snapshot used by UI.
*/
data class IptvNowNext(
val now: IptvProgram? = null,
Expand Down Expand Up @@ -66,6 +102,29 @@ data class IptvSnapshot(
val loadedAt: Instant = Instant.now()
)

data class IptvHealthSummary(
val total: Int = 0,
val healthy: Int = 0,
val degraded: Int = 0,
val offline: Int = 0,
val unknown: Int = 0,
val lastCheckedAtMs: Long = 0L
) {
val summaryText: String
get() = when {
total == 0 -> "No health checks yet"
else -> listOfNotNull(
healthy.takeIf { it > 0 }?.let { "$it healthy" },
degraded.takeIf { it > 0 }?.let { "$it degraded" },
offline.takeIf { it > 0 }?.let { "$it offline" },
unknown.takeIf { it > 0 }?.let { "$it unknown" }
).joinToString(" • ")
}

val lastCheckedText: String
get() = if (lastCheckedAtMs == 0L) "" else "Last checked ${java.time.Duration.between(java.time.Instant.ofEpochMilli(lastCheckedAtMs), java.time.Instant.now()).abs().toMinutes()}m ago"
}

/**
* Lightweight helper to handle playlistId|groupName composite keys without
* unnecessary string allocations in UI loops.
Expand Down
174 changes: 173 additions & 1 deletion app/src/main/kotlin/com/arflix/tv/data/repository/IptvRepository.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ import android.security.keystore.KeyProperties
import android.util.Base64
import androidx.datastore.preferences.core.Preferences
import androidx.datastore.preferences.core.edit
import com.arflix.tv.data.local.IptvStreamHealthDao
import com.arflix.tv.data.local.IptvStreamHealthEntity
import com.arflix.tv.data.model.IptvChannel
import com.arflix.tv.data.model.IptvChannelHealth
import com.arflix.tv.data.model.IptvChannelHealthStatus
import com.arflix.tv.data.model.IptvHealthSummary
import com.arflix.tv.data.model.IptvNowNext
import com.arflix.tv.data.model.IptvProgram
import com.arflix.tv.data.model.IptvSnapshot
Expand Down Expand Up @@ -158,7 +163,8 @@ class IptvRepository @Inject constructor(
@ApplicationContext private val context: Context,
private val okHttpClient: OkHttpClient,
private val profileManager: ProfileManager,
private val invalidationBus: CloudSyncInvalidationBus
private val invalidationBus: CloudSyncInvalidationBus,
private val streamHealthDao: IptvStreamHealthDao
) {
private val gson = Gson()
private val loadMutex = Mutex()
Expand Down Expand Up @@ -325,6 +331,172 @@ class IptvRepository @Inject constructor(
.build()
}

private val iptvHealthHttpClient: OkHttpClient by lazy {
okHttpClient.newBuilder()
.connectTimeout(10, TimeUnit.SECONDS)
.readTimeout(20, TimeUnit.SECONDS)
.writeTimeout(10, TimeUnit.SECONDS)
.callTimeout(25, TimeUnit.SECONDS)
.build()
}

private val maxHealthCheckChannels = 60

private suspend fun mergeHealthIntoChannels(channels: List<IptvChannel>): List<IptvChannel> {
if (channels.isEmpty()) return channels
val healthMap = streamHealthDao.loadByChannelIds(channels.map { it.id }).associateBy { it.channelId }
return channels.map { channel ->
val health = healthMap[channel.id]?.let { entity ->
IptvChannelHealth(
httpStatusCode = entity.httpStatusCode,
latencyMs = entity.latencyMs,
consecutiveFailureCount = entity.consecutiveFailureCount,
lastSuccessfulAtMs = entity.lastSuccessfulAtMs,
lastCheckedAtMs = entity.lastCheckedAtMs
)
} ?: IptvChannelHealth()
channel.copy(health = health)
}
}

fun observeIptvHealthSummary(): Flow<IptvHealthSummary> {
return streamHealthDao.observeAll().map { entries ->
val mapped = entries.map { entity ->
val health = IptvChannelHealth(
httpStatusCode = entity.httpStatusCode,
latencyMs = entity.latencyMs,
consecutiveFailureCount = entity.consecutiveFailureCount,
lastSuccessfulAtMs = entity.lastSuccessfulAtMs,
lastCheckedAtMs = entity.lastCheckedAtMs
)
health.status
}
val healthy = mapped.count { it == IptvChannelHealthStatus.HEALTHY }
val degraded = mapped.count { it == IptvChannelHealthStatus.DEGRADED }
val offline = mapped.count { it == IptvChannelHealthStatus.OFFLINE }
val unknown = mapped.count { it == IptvChannelHealthStatus.UNKNOWN }
val lastCheckedAtMs = entries.maxOfOrNull { it.lastCheckedAtMs } ?: 0L
IptvHealthSummary(
total = entries.size,
healthy = healthy,
degraded = degraded,
offline = offline,
unknown = unknown,
lastCheckedAtMs = lastCheckedAtMs
)
}
}

suspend fun recordIptvPlaybackFailure(channelId: String, errorMessage: String?) {
val existing = streamHealthDao.loadByChannelIds(listOf(channelId)).firstOrNull()
val updated = IptvStreamHealthEntity(
channelId = channelId,
httpStatusCode = null,
latencyMs = null,
consecutiveFailureCount = (existing?.consecutiveFailureCount ?: 0) + 1,
lastSuccessfulAtMs = existing?.lastSuccessfulAtMs,
lastCheckedAtMs = System.currentTimeMillis()
)
streamHealthDao.upsert(updated)
}

suspend fun runIptvHealthChecks(onProgress: (IptvLoadProgress) -> Unit = {}) {
val config = observeConfig().first()
val playlists = activePlaylists(config)
val channels = mutableListOf<IptvChannel>()
if (config.m3uUrl.isBlank() && config.stalkerPortalUrl.isNotBlank()) {
onProgress(IptvLoadProgress("Connecting to Stalker portal...", null))
val stalker = com.arflix.tv.data.api.StalkerApi(config.stalkerPortalUrl, config.stalkerMacAddress)
if (stalker.handshake()) {
onProgress(IptvLoadProgress("Loading channels from Stalker portal...", null))
channels += stalker.getChannels()
cachedStalkerApi = stalker
}
} else {
playlists.forEachIndexed { index, playlist ->
val label = "Loading playlist ${index + 1}/${playlists.size}"
onProgress(IptvLoadProgress(label, null))
val playlistChannels = runCatching {
fetchAndParseM3uWithRetries(playlist.m3uUrl, playlist.id, playlist.epgUrls)
}.getOrElse {
emptyList()
}
channels += playlistChannels
}
}

val uniqueChannels = channels.distinctBy { it.id }
val toCheck = uniqueChannels.take(maxHealthCheckChannels)
val semaphore = Semaphore(4)
coroutineScope {
toCheck.mapIndexed { index, channel ->
async {
semaphore.withPermit {
onProgress(IptvLoadProgress("Checking ${index + 1}/${toCheck.size} ${channel.name}", null))
val streamUrl = resolveHealthCheckUrl(channel)
val now = System.currentTimeMillis()
val healthResult = runCatching {
val request = Request.Builder().url(streamUrl).head().build()
val started = System.currentTimeMillis()
iptvHealthHttpClient.newCall(request).execute().use { response ->
val latency = System.currentTimeMillis() - started
IptvStreamHealthEntity(
channelId = channel.id,
httpStatusCode = response.code,
latencyMs = latency,
consecutiveFailureCount = 0,
lastSuccessfulAtMs = now,
lastCheckedAtMs = now
)
}
}.getOrElse { error ->
val existing = streamHealthDao.loadByChannelIds(listOf(channel.id)).firstOrNull()
IptvStreamHealthEntity(
channelId = channel.id,
httpStatusCode = null,
latencyMs = null,
consecutiveFailureCount = (existing?.consecutiveFailureCount ?: 0) + 1,
lastSuccessfulAtMs = existing?.lastSuccessfulAtMs,
lastCheckedAtMs = now
)
}
streamHealthDao.upsert(healthResult)
}
}
}.awaitAll()
}
if (uniqueChannels.isNotEmpty()) {
streamHealthDao.deleteStaleHealthEntries(uniqueChannels.map { it.id })
} else {
streamHealthDao.clearAll()
}
}

private fun resolveHealthCheckUrl(channel: IptvChannel): String {
val stream = channel.streamUrl
if (stream.startsWith("ffmpeg") || (stream.startsWith("/") && !stream.startsWith("//"))) {
val stalker = cachedStalkerApi
if (stalker != null) {
val resolved = stalker.resolveStreamUrl(stream)
if (!resolved.isNullOrBlank()) {
return resolved
}
}
}
return stream
}

private fun findOrCreateStalkerApi(config: IptvConfig): com.arflix.tv.data.api.StalkerApi? {
val existing = cachedStalkerApi
if (existing != null) return existing
return if (config.stalkerPortalUrl.isNotBlank() && config.stalkerMacAddress.isNotBlank()) {
com.arflix.tv.data.api.StalkerApi(config.stalkerPortalUrl, config.stalkerMacAddress)
.also { cachedStalkerApi = it }
} else {
null
}
}

private data class IptvCachePayload(
val channels: List<IptvChannel> = emptyList(),
val nowNext: Map<String, IptvNowNext> = emptyMap(),
Expand Down
17 changes: 17 additions & 0 deletions app/src/main/kotlin/com/arflix/tv/di/AppModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import android.content.Context
import com.arflix.tv.data.api.AniSkipApi
import com.arflix.tv.data.api.ArmApi
import com.arflix.tv.data.api.IntroDbApi
import androidx.room.Room
import com.arflix.tv.data.api.StreamApi
import com.arflix.tv.data.api.SupabaseApi
import com.arflix.tv.data.api.TmdbApi
import com.arflix.tv.data.api.TraktApi
import com.arflix.tv.data.local.IptvHealthDatabase
import com.arflix.tv.data.local.IptvStreamHealthDao
import com.arflix.tv.network.OkHttpProvider
import com.arflix.tv.util.Constants
import dagger.Module
Expand All @@ -31,6 +34,20 @@ object AppModule {
return OkHttpProvider.client
}

@Provides
@Singleton
fun provideIptvHealthDatabase(@ApplicationContext context: Context): IptvHealthDatabase {
return Room.databaseBuilder(context, IptvHealthDatabase::class.java, "iptv_health.db")
.fallbackToDestructiveMigration()
.build()
}

@Provides
@Singleton
fun provideIptvStreamHealthDao(database: IptvHealthDatabase): IptvStreamHealthDao {
return database.streamHealthDao()
}

@Provides
@Singleton
fun provideTmdbApi(okHttpClient: OkHttpClient): TmdbApi {
Expand Down
Loading