diff --git a/README.md b/README.md index e54209e..0d814c2 100644 --- a/README.md +++ b/README.md @@ -9,8 +9,8 @@ Presence is a service for getting the current Spotify/Listening status of users ## Endpoints - WebSocket stream: `WS /ws/v1/{DISCORD_USER_ID}` (personally use `websocat` to test in dev) -- REST snapshot: `GET /v1/{DISCORD_USER_ID}` (only works with pre-existing websocket subscriber, this is intentional by design) -- Check if user in server: `GET /v1/{DISCORD_USER_ID}/in_server` +- REST snapshot: `GET /v1/{DISCORD_USER_ID}` (returns the latest cached presence, served from Redis whenever the gateway has seen the user) +- Check if user in server: `GET /v1/{DISCORD_USER_ID}/in_server` (served from Redis; falls back to the Discord REST API on cache miss and writes the result back through) - Health: `GET /health` ## Usage @@ -46,6 +46,11 @@ cp .env.example .env Presence uses Redis for caching with automatic fallback to in-memory if Redis is unavailable. On startup, the app waits up to 10 seconds for Redis before falling back. +Two things are cached: + +- **Spotify presence** (`presence:{user_id}`, 5 minute TTL) — populated from gateway `presence_update` events. The REST snapshot endpoint works without an active WebSocket subscriber. +- **Guild membership** (`in_server:{user_id}`) — positive entries kept for 6 hours, negative entries for 5 minutes. Maintained in real-time by the `GUILD_MEMBERS` gateway events (`guild_member_addition` / `guild_member_removal`); a cache miss falls back to a single Discord REST lookup and writes the result back. The bot needs both the `GUILD_MEMBERS` and `GUILD_PRESENCES` privileged intents enabled in the Discord developer portal. + Check `/health` to see current Redis status: ```json {"status": "ok", "redis": true} diff --git a/rust-toolchain.toml b/rust-toolchain.toml index f19782d..1a21655 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,3 @@ [toolchain] channel = "1.92.0" +components = ["rustfmt", "clippy"] diff --git a/src/cache.rs b/src/cache.rs new file mode 100644 index 0000000..4cdc0ce --- /dev/null +++ b/src/cache.rs @@ -0,0 +1,227 @@ +//! Two-tier cache: Redis primary, in-memory fallback. +//! +//! Both the presence cache (`presence:{user_id}`) and the membership cache +//! (`in_server:{user_id}`) share the same shape: +//! +//! 1. Try Redis. A connection error / decode error falls through to the +//! in-memory map; an explicit `Ok(None)` is treated as a definitive miss. +//! 2. The in-memory map enforces its own TTL on read (Redis manages TTL +//! itself via `SET EX`). +//! +//! All of that lives behind [`Cache`] so call sites don't repeat the dance. + +use std::sync::Arc; +use std::time::Duration; + +use dashmap::DashMap; +use redis::AsyncCommands; +use redis::aio::ConnectionManager; +use tokio::sync::OnceCell; +use tracing::{info, warn}; + +use crate::PresenceData; +use crate::consts::{redis_boot, ttl}; + +static REDIS_CLIENT: OnceCell> = OnceCell::const_new(); + +pub async fn init_redis() -> bool { + let result = REDIS_CLIENT + .get_or_init(|| async { + let url = match std::env::var("REDIS_URL") { + Ok(u) => u, + Err(_) => { + info!("REDIS_URL not set, using in-memory cache"); + return None; + } + }; + + match redis::Client::open(url.as_str()) { + Ok(client) => match ConnectionManager::new(client).await { + Ok(cm) => { + info!("redis connected"); + Some(cm) + } + Err(e) => { + warn!(?e, "failed to connect to redis, using in-memory cache"); + None + } + }, + Err(e) => { + warn!(?e, "invalid redis url, using in-memory cache"); + None + } + } + }) + .await; + + result.is_some() +} + +pub fn is_redis_available() -> bool { + REDIS_CLIENT.get().map(|opt| opt.is_some()).unwrap_or(false) +} + +async fn get_redis() -> Option { + REDIS_CLIENT.get()?.clone() +} + +/// Outcome of a Redis read, before any in-memory fallback is consulted. +enum RedisRead { + /// Authoritative hit from Redis. + Hit(T), + /// Authoritative miss from Redis (key was unset, not an error). + Miss, + /// Redis didn't answer authoritatively — caller should fall back to + /// the in-memory tier. Wraps an error message for diagnostics. + Unavailable, +} + +/// Read a key from Redis and parse it via the supplied closure. +/// +/// Centralizes the "Ok(Some) -> Hit / Ok(None) -> Miss / Err -> Unavailable" +/// dance every cache lookup used to repeat. +async fn redis_read(key: &str, parse: impl FnOnce(String) -> Option) -> RedisRead { + let Some(mut redis) = get_redis().await else { + return RedisRead::Unavailable; + }; + match redis.get::<_, Option>(key).await { + Ok(Some(raw)) => match parse(raw) { + Some(v) => RedisRead::Hit(v), + None => RedisRead::Unavailable, + }, + Ok(None) => RedisRead::Miss, + Err(_) => RedisRead::Unavailable, + } +} + +/// Write a string value to Redis with a TTL. Silently swallows errors — +/// the caller is expected to mirror the same value into the in-memory tier +/// so a Redis hiccup doesn't lose data. +async fn redis_set_ex(key: &str, value: &str, ttl_secs: u64) { + if let Some(mut redis) = get_redis().await { + let _: Result<(), _> = redis.set_ex(key, value, ttl_secs).await; + } +} + +async fn redis_del(key: &str) { + if let Some(mut redis) = get_redis().await { + let _: Result<(), _> = redis.del::<_, ()>(key).await; + } +} + +#[derive(Debug, Clone, Copy)] +struct MembershipEntry { + in_server: bool, + cached_at_ms: i64, +} + +impl MembershipEntry { + fn ttl_ms(self) -> i64 { + if self.in_server { + ttl::MEMBERSHIP_POSITIVE_MS + } else { + ttl::MEMBERSHIP_NEGATIVE_MS + } + } + + fn fresh(self, now_ms: i64) -> bool { + now_ms - self.cached_at_ms <= self.ttl_ms() + } +} + +pub struct Cache { + memory_presence: Arc>, + memory_membership: Arc>, +} + +impl Cache { + pub fn new() -> Self { + Self { + memory_presence: Arc::new(DashMap::new()), + memory_membership: Arc::new(DashMap::new()), + } + } + + // -- presence ----------------------------------------------------------- + + pub async fn get(&self, user_id: &str) -> Option { + let key = presence_key(user_id); + match redis_read(&key, |raw| serde_json::from_str(&raw).ok()).await { + RedisRead::Hit(presence) => Some(presence), + RedisRead::Miss => None, + RedisRead::Unavailable => self.memory_presence.get(user_id).map(|r| r.clone()), + } + } + + pub async fn set(&self, user_id: &str, data: &PresenceData) { + let key = presence_key(user_id); + if let Ok(json) = serde_json::to_string(data) { + redis_set_ex(&key, &json, ttl::PRESENCE_SECS).await; + } + self.memory_presence + .insert(user_id.to_string(), data.clone()); + } + + pub async fn remove(&self, user_id: &str) { + redis_del(&presence_key(user_id)).await; + self.memory_presence.remove(user_id); + } + + // -- membership --------------------------------------------------------- + + pub async fn get_membership(&self, user_id: &str) -> Option { + let key = membership_key(user_id); + match redis_read(&key, |raw| Some(raw == "1")).await { + RedisRead::Hit(in_server) => Some(in_server), + RedisRead::Miss => None, + RedisRead::Unavailable => self.read_membership_memory(user_id), + } + } + + pub async fn set_membership(&self, user_id: &str, in_server: bool) { + let ttl_secs = if in_server { + ttl::MEMBERSHIP_POSITIVE_SECS + } else { + ttl::MEMBERSHIP_NEGATIVE_SECS + }; + let value = if in_server { "1" } else { "0" }; + redis_set_ex(&membership_key(user_id), value, ttl_secs).await; + self.memory_membership.insert( + user_id.to_string(), + MembershipEntry { + in_server, + cached_at_ms: chrono::Utc::now().timestamp_millis(), + }, + ); + } + + fn read_membership_memory(&self, user_id: &str) -> Option { + let entry = self.memory_membership.get(user_id).map(|r| *r)?; + let now = chrono::Utc::now().timestamp_millis(); + if entry.fresh(now) { + Some(entry.in_server) + } else { + self.memory_membership.remove(user_id); + None + } + } +} + +fn presence_key(user_id: &str) -> String { + format!("presence:{}", user_id) +} + +fn membership_key(user_id: &str) -> String { + format!("in_server:{}", user_id) +} + +pub async fn wait_for_redis(timeout: Duration) -> bool { + let start = std::time::Instant::now(); + while start.elapsed() < timeout { + if init_redis().await { + return true; + } + tokio::time::sleep(redis_boot::RETRY).await; + } + false +} diff --git a/src/consts.rs b/src/consts.rs new file mode 100644 index 0000000..6c3cd94 --- /dev/null +++ b/src/consts.rs @@ -0,0 +1,58 @@ +//! Centralized configuration constants and env-var defaults. +//! +//! Grouped into nested modules by subject so call sites read like +//! `ttl::PRESENCE_MS`, `http::LISTEN_PORT`, `ws::PING_INTERVAL`, etc. + +use std::time::Duration; + +/// Cache time-to-live tunables. +/// +/// Two flavours of cache: +/// - **Presence** — Redis-side TTL set via `SET EX`; the same value is +/// re-asserted client-side via [`PRESENCE_MS`] when serving snapshots. +/// - **Membership** — split TTL by polarity so positive results stay warm +/// (gateway events keep them fresh) and negative results re-check the +/// API soon after a miss. +pub mod ttl { + /// Presence Redis TTL. + pub const PRESENCE_SECS: u64 = 300; + /// Presence soft staleness threshold (matches Redis TTL). + pub const PRESENCE_MS: i64 = (PRESENCE_SECS as i64) * 1000; + + /// `in_server == true` Redis TTL. + pub const MEMBERSHIP_POSITIVE_SECS: u64 = 6 * 60 * 60; + /// `in_server == false` Redis TTL. + pub const MEMBERSHIP_NEGATIVE_SECS: u64 = 5 * 60; + pub const MEMBERSHIP_POSITIVE_MS: i64 = (MEMBERSHIP_POSITIVE_SECS as i64) * 1000; + pub const MEMBERSHIP_NEGATIVE_MS: i64 = (MEMBERSHIP_NEGATIVE_SECS as i64) * 1000; +} + +/// HTTP server tunables. +pub mod http { + pub const LISTEN_HOST: [u8; 4] = [0, 0, 0, 0]; + pub const LISTEN_PORT: u16 = 8787; + pub const MAX_CONNECTIONS_PER_IP: usize = 10; +} + +/// WebSocket tunables. +pub mod ws { + use super::Duration; + + pub const SEND_TIMEOUT: Duration = Duration::from_secs(5); + pub const PING_INTERVAL: Duration = Duration::from_secs(25); +} + +/// Redis bootstrap behaviour at startup. +pub mod redis_boot { + use super::Duration; + + pub const TIMEOUT: Duration = Duration::from_secs(10); + pub const RETRY: Duration = Duration::from_millis(200); +} + +/// Discord-side defaults. +pub mod discord { + /// Compile-time fallback for the `GUILD_ID` env var. Hardcoded to the + /// Antifield discord; override with the env var if needed. + pub const DEFAULT_GUILD_ID: Option = Some(982_385_887_000_272_956); +} diff --git a/src/discord.rs b/src/discord.rs index 0fdd2f4..203b3a6 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -1,7 +1,8 @@ use std::time::Duration; use serenity::all::{ - ActivityType, Client, Context, EventHandler, GatewayIntents, Presence, Ready, ResumedEvent, + ActivityType, Client, Context, EventHandler, GatewayIntents, Guild, Member, Presence, Ready, + ResumedEvent, UnavailableGuild, User, }; use serenity::async_trait; use serenity::http::Http as SerenityHttp; @@ -13,6 +14,7 @@ use crate::{PresenceCache, PresenceData, SpotifyActivity, UserWatchers}; pub struct Handler { pub cache: PresenceCache, pub watchers: UserWatchers, + pub guild_id: GuildId, } #[async_trait] @@ -25,13 +27,71 @@ impl EventHandler for Handler { info!("discord gateway resumed"); } + async fn guild_create(&self, ctx: Context, guild: Guild, _is_new: Option) { + if guild.id == self.guild_id { + return; + } + // The bot should only ever live in the configured guild. If we end up + // anywhere else (someone invited the bot to their server), leave + // immediately so we don't broadcast or cache data for users outside + // our scope and don't spend compute on guilds we don't intend to serve. + warn!( + guild_id = %guild.id, + guild_name = %guild.name, + "joined unconfigured guild, leaving" + ); + if let Err(err) = ctx.http.leave_guild(guild.id).await { + warn!(?err, guild_id = %guild.id, "failed to leave unconfigured guild"); + } + } + + async fn guild_delete( + &self, + _ctx: Context, + incomplete: UnavailableGuild, + _full: Option, + ) { + if incomplete.id == self.guild_id && !incomplete.unavailable { + warn!( + guild_id = %incomplete.id, + "bot removed from configured guild — membership cache will lazy-refill" + ); + } + } + + async fn guild_member_addition(&self, _ctx: Context, new_member: Member) { + if new_member.guild_id != self.guild_id { + return; + } + let user_id = new_member.user.id.to_string(); + debug!(user_id = %user_id, "guild_member_addition"); + self.cache.set_membership(&user_id, true).await; + } + + async fn guild_member_removal( + &self, + _ctx: Context, + guild_id: GuildId, + user: User, + _member_data_if_available: Option, + ) { + if guild_id != self.guild_id { + return; + } + let user_id = user.id.to_string(); + debug!(user_id = %user_id, "guild_member_removal"); + self.cache.set_membership(&user_id, false).await; + } + async fn presence_update(&self, _ctx: Context, new: Presence) { - let user_id = new.user.id.to_string(); + // Discord delivers presence updates for every guild the bot is in. + // Ignore anything outside the configured guild so we never cache or + // broadcast out-of-scope presence data. + if new.guild_id != Some(self.guild_id) { + return; + } - let watcher = match self.watchers.get(&user_id) { - Some(w) => w, - None => return, - }; + let user_id = new.user.id.to_string(); let raw_spotify_activity = new .activities @@ -72,15 +132,24 @@ impl EventHandler for Handler { timestamp_ms: chrono::Utc::now().timestamp_millis(), }; - if watcher.send(Some(presence.clone())).is_ok() { - self.cache.set(&user_id, &presence).await; + // Cache every presence update so the REST snapshot works without an + // active websocket subscriber. + self.cache.set(&user_id, &presence).await; + + // Receiving a presence update is positive proof the user is in the + // guild — refresh the membership cache opportunistically. + self.cache.set_membership(&user_id, true).await; + + if let Some(watcher) = self.watchers.get(&user_id) { + let _ = watcher.send(Some(presence)); } } } -pub async fn start_discord(cache: PresenceCache, watchers: UserWatchers) -> ! { +pub async fn start_discord(cache: PresenceCache, watchers: UserWatchers, guild_id: GuildId) -> ! { let token = std::env::var("DISCORD_BOT_TOKEN").expect("DISCORD_BOT_TOKEN not set"); - let intents = GatewayIntents::GUILDS | GatewayIntents::GUILD_PRESENCES; + let intents = + GatewayIntents::GUILDS | GatewayIntents::GUILD_MEMBERS | GatewayIntents::GUILD_PRESENCES; let mut attempt: u32 = 0; @@ -88,6 +157,7 @@ pub async fn start_discord(cache: PresenceCache, watchers: UserWatchers) -> ! { let handler = Handler { cache: cache.clone(), watchers: watchers.clone(), + guild_id, }; match Client::builder(&token, intents) diff --git a/src/main.rs b/src/main.rs index 9cab58d..ad197d5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,11 +8,13 @@ use serde::{Deserialize, Serialize}; use serenity::http::Http as SerenityHttp; use serenity::model::id::GuildId; use tokio::sync::watch; -use tokio::time::{Duration, Instant, interval_at, timeout}; +use tokio::time::{Instant, interval_at, timeout}; use tracing::{info, warn}; use warp::ws::{Message, WebSocket, Ws}; use warp::{Filter, Rejection, Reply, http::StatusCode}; +use crate::consts::{discord as discord_defaults, http as http_cfg, redis_boot, ttl, ws}; + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SpotifyActivity { pub track: Option, @@ -30,12 +32,7 @@ pub struct PresenceData { pub timestamp_ms: i64, } -const PRESENCE_TTL_MINUTES: i64 = 5; -const PRESENCE_TTL_MS: i64 = PRESENCE_TTL_MINUTES * 60 * 1000; -const MAX_CONNECTIONS_PER_IP: usize = 10; -const WS_SEND_TIMEOUT: Duration = Duration::from_secs(5); - -pub type PresenceCache = Arc; +pub type PresenceCache = Arc; pub type UserWatchers = Arc>>>; type ConnectionCounter = Arc>; @@ -50,7 +47,7 @@ struct AppState { fn is_presence_stale(presence: &PresenceData) -> bool { let now = chrono::Utc::now().timestamp_millis(); - now - presence.timestamp_ms > PRESENCE_TTL_MS + now - presence.timestamp_ms > ttl::PRESENCE_MS } fn validate_user_id(user_id: &str) -> bool { @@ -81,6 +78,20 @@ async fn get_presence_handler(user_id: String, state: AppState) -> Result Result { + if !validate_user_id(&user_id) { + return Ok(warp::reply::with_status( + warp::reply::json(&serde_json::json!({ "error": "invalid user id" })), + StatusCode::BAD_REQUEST, + )); + } + + if let Some(in_server) = state.cache.get_membership(&user_id).await { + return Ok(warp::reply::with_status( + warp::reply::json(&serde_json::json!({ "in_server": in_server })), + StatusCode::OK, + )); + } + let uid = match user_id.parse::() { Ok(v) => v, Err(_) => { @@ -92,10 +103,13 @@ async fn user_in_server_handler(user_id: String, state: AppState) -> Result Ok(warp::reply::with_status( - warp::reply::json(&serde_json::json!({ "in_server": in_server })), - StatusCode::OK, - )), + Ok(in_server) => { + state.cache.set_membership(&user_id, in_server).await; + Ok(warp::reply::with_status( + warp::reply::json(&serde_json::json!({ "in_server": in_server })), + StatusCode::OK, + )) + } Err(e) => Ok(warp::reply::with_status( warp::reply::json(&serde_json::json!({ "error": e })), StatusCode::INTERNAL_SERVER_ERROR, @@ -134,7 +148,7 @@ impl Drop for ConnectionGuard { fn try_acquire_connection(connections: &ConnectionCounter, ip: IpAddr) -> Option { let mut entry = connections.entry(ip).or_insert(0); - if *entry >= MAX_CONNECTIONS_PER_IP { + if *entry >= http_cfg::MAX_CONNECTIONS_PER_IP { return None; } *entry += 1; @@ -148,7 +162,6 @@ fn try_acquire_connection(connections: &ConnectionCounter, ip: IpAddr) -> Option struct WatcherGuard { watchers: UserWatchers, - memory_cache: Arc>, user_id: String, } @@ -159,7 +172,6 @@ impl Drop for WatcherGuard { { drop(watcher); self.watchers.remove(&self.user_id); - self.memory_cache.remove(&self.user_id); } } } @@ -168,7 +180,7 @@ async fn ws_send_with_timeout( ws_tx: &mut futures_util::stream::SplitSink, msg: Message, ) -> bool { - matches!(timeout(WS_SEND_TIMEOUT, ws_tx.send(msg)).await, Ok(Ok(_))) + matches!(timeout(ws::SEND_TIMEOUT, ws_tx.send(msg)).await, Ok(Ok(_))) } async fn ws_handler(ws: WebSocket, user_id: String, state: AppState, _conn_guard: ConnectionGuard) { @@ -180,7 +192,6 @@ async fn ws_handler(ws: WebSocket, user_id: String, state: AppState, _conn_guard let _watcher_guard = WatcherGuard { watchers: state.watchers.clone(), - memory_cache: state.cache.get_memory(), user_id: user_id.clone(), }; @@ -201,10 +212,7 @@ async fn ws_loop( ws_rx: &mut futures_util::stream::SplitStream, mut rx: watch::Receiver>, ) { - let mut ping_interval = interval_at( - Instant::now() + Duration::from_secs(25), - Duration::from_secs(25), - ); + let mut ping_interval = interval_at(Instant::now() + ws::PING_INTERVAL, ws::PING_INTERVAL); loop { tokio::select! { @@ -242,8 +250,9 @@ async fn ws_loop( } } +mod cache; +mod consts; mod discord; -mod redis; #[tokio::main] async fn main() { @@ -252,19 +261,16 @@ async fn main() { .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")); tracing_subscriber::fmt().with_env_filter(env_filter).init(); - let redis_available = redis::wait_for_redis(Duration::from_secs(10)).await; + let redis_available = cache::wait_for_redis(redis_boot::TIMEOUT).await; if !redis_available { warn!("redis not available after 10s, using in-memory cache"); } let token = std::env::var("DISCORD_BOT_TOKEN").expect("DISCORD_BOT_TOKEN not set"); - let guild_id: u64 = std::env::var("GUILD_ID") - .expect("GUILD_ID not set") - .parse() - .expect("GUILD_ID must be a valid u64"); + let guild_id: u64 = resolve_guild_id(); let http = Arc::new(SerenityHttp::new(&token)); - let cache = Arc::new(redis::Cache::new()); + let cache = Arc::new(cache::Cache::new()); let watchers: UserWatchers = Arc::new(DashMap::new()); let connections: ConnectionCounter = Arc::new(DashMap::new()); @@ -318,7 +324,7 @@ async fn main() { let health_route = warp::path!("health").and(warp::get()).map(|| { warp::reply::json(&serde_json::json!({ "status": "ok", - "redis": redis::is_redis_available() + "redis": cache::is_redis_available() })) }); @@ -329,10 +335,27 @@ async fn main() { .or(ws_route) .with(warp::cors().allow_any_origin()); - info!("starting http server on 0.0.0.0:8787"); + info!( + host = ?http_cfg::LISTEN_HOST, + port = http_cfg::LISTEN_PORT, + "starting http server" + ); tokio::spawn(discord::start_discord( state.cache.clone(), state.watchers.clone(), + state.guild_id, )); - warp::serve(routes).run(([0, 0, 0, 0], 8787)).await; + warp::serve(routes) + .run((http_cfg::LISTEN_HOST, http_cfg::LISTEN_PORT)) + .await; +} + +/// Resolve the configured guild id from `GUILD_ID`, falling back to +/// [`discord_defaults::DEFAULT_GUILD_ID`] (set in [`crate::consts`]). +fn resolve_guild_id() -> u64 { + match std::env::var("GUILD_ID") { + Ok(s) => s.parse().expect("GUILD_ID must be a valid u64"), + Err(_) => discord_defaults::DEFAULT_GUILD_ID + .expect("GUILD_ID not set and no DEFAULT_GUILD_ID compiled in"), + } } diff --git a/src/redis.rs b/src/redis.rs deleted file mode 100644 index 63a6d6c..0000000 --- a/src/redis.rs +++ /dev/null @@ -1,122 +0,0 @@ -use std::sync::Arc; -use std::time::Duration; - -use dashmap::DashMap; -use redis::AsyncCommands; -use redis::aio::ConnectionManager; -use tokio::sync::OnceCell; -use tracing::{info, warn}; - -use crate::PresenceData; - -const CACHE_TTL_SECS: u64 = 300; - -static REDIS_CLIENT: OnceCell> = OnceCell::const_new(); - -pub async fn init_redis() -> bool { - let result = REDIS_CLIENT - .get_or_init(|| async { - let url = match std::env::var("REDIS_URL") { - Ok(u) => u, - Err(_) => { - info!("REDIS_URL not set, using in-memory cache"); - return None; - } - }; - - match redis::Client::open(url.as_str()) { - Ok(client) => match ConnectionManager::new(client).await { - Ok(cm) => { - info!("redis connected"); - Some(cm) - } - Err(e) => { - warn!(?e, "failed to connect to redis, using in-memory cache"); - None - } - }, - Err(e) => { - warn!(?e, "invalid redis url, using in-memory cache"); - None - } - } - }) - .await; - - result.is_some() -} - -pub fn is_redis_available() -> bool { - REDIS_CLIENT.get().map(|opt| opt.is_some()).unwrap_or(false) -} - -async fn get_redis() -> Option { - REDIS_CLIENT.get()?.clone() -} - -pub struct Cache { - memory: Arc>, -} - -impl Cache { - pub fn new() -> Self { - Self { - memory: Arc::new(DashMap::new()), - } - } - - pub async fn get(&self, user_id: &str) -> Option { - if let Some(mut redis) = get_redis().await { - let key = format!("presence:{}", user_id); - match redis.get::<_, Option>(&key).await { - Ok(Some(json)) => { - if let Ok(data) = serde_json::from_str(&json) { - return Some(data); - } - } - Ok(None) => return None, - Err(_) => {} - } - } - - self.memory.get(user_id).map(|r| r.clone()) - } - - pub async fn set(&self, user_id: &str, data: &PresenceData) { - if let Some(mut redis) = get_redis().await { - let key = format!("presence:{}", user_id); - if let Ok(json) = serde_json::to_string(data) { - let _: Result<(), _> = redis.set_ex(&key, json, CACHE_TTL_SECS).await; - } - } - - self.memory.insert(user_id.to_string(), data.clone()); - } - - pub async fn remove(&self, user_id: &str) { - if let Some(mut redis) = get_redis().await { - let key = format!("presence:{}", user_id); - let _: Result<(), _> = redis.del(&key).await; - } - - self.memory.remove(user_id); - } - - pub fn get_memory(&self) -> Arc> { - self.memory.clone() - } -} - -pub async fn wait_for_redis(timeout: Duration) -> bool { - let start = std::time::Instant::now(); - let retry_delay = Duration::from_millis(200); - - while start.elapsed() < timeout { - if init_redis().await { - return true; - } - tokio::time::sleep(retry_delay).await; - } - - false -}