From 5d9f75eff509ea61910d574a68ecbb16a7a53711 Mon Sep 17 00:00:00 2001 From: Tanut Lertwarachai Date: Fri, 13 Feb 2026 17:35:39 +0700 Subject: [PATCH 01/12] add forex --- Cargo.lock | 6 +- bothan-api/server-cli/config.example.toml | 6 + bothan-api/server-cli/src/commands/start.rs | 69 ++++++++--- bothan-api/server/src/api/server.rs | 8 +- bothan-api/server/src/api/utils.rs | 2 +- bothan-api/server/src/config/manager.rs | 5 + .../server/src/config/manager/forex_info.rs | 37 ++++++ .../src/config/manager/forex_info/sources.rs | 42 +++++++ bothan-core/Cargo.toml | 1 + bothan-core/src/ipfs/client.rs | 1 + bothan-core/src/manager.rs | 4 +- .../{crypto_asset_info.rs => asset_info.rs} | 9 +- bothan-core/src/manager/asset_info/crypto.rs | 1 + .../crypto}/worker.rs | 7 +- .../crypto}/worker/opts.rs | 0 .../error.rs | 0 bothan-core/src/manager/asset_info/forex.rs | 1 + .../src/manager/asset_info/forex/worker.rs | 71 ++++++++++++ .../manager/asset_info/forex/worker/opts.rs | 20 ++++ .../manager.rs | 108 +++++++++++++----- .../price.rs | 0 .../price/cache.rs | 2 +- .../price/error.rs | 0 .../price/tasks.rs | 6 +- .../signal_ids.rs | 22 +++- .../types.rs | 15 ++- 26 files changed, 372 insertions(+), 71 deletions(-) create mode 100644 bothan-api/server/src/config/manager/forex_info.rs create mode 100644 bothan-api/server/src/config/manager/forex_info/sources.rs rename bothan-core/src/manager/{crypto_asset_info.rs => asset_info.rs} (58%) create mode 100644 bothan-core/src/manager/asset_info/crypto.rs rename bothan-core/src/manager/{crypto_asset_info => asset_info/crypto}/worker.rs (94%) rename bothan-core/src/manager/{crypto_asset_info => asset_info/crypto}/worker/opts.rs (100%) rename bothan-core/src/manager/{crypto_asset_info => asset_info}/error.rs (100%) create mode 100644 bothan-core/src/manager/asset_info/forex.rs create mode 100644 bothan-core/src/manager/asset_info/forex/worker.rs create mode 100644 bothan-core/src/manager/asset_info/forex/worker/opts.rs rename bothan-core/src/manager/{crypto_asset_info => asset_info}/manager.rs (69%) rename bothan-core/src/manager/{crypto_asset_info => asset_info}/price.rs (100%) rename bothan-core/src/manager/{crypto_asset_info => asset_info}/price/cache.rs (97%) rename bothan-core/src/manager/{crypto_asset_info => asset_info}/price/error.rs (100%) rename bothan-core/src/manager/{crypto_asset_info => asset_info}/price/tasks.rs (99%) rename bothan-core/src/manager/{crypto_asset_info => asset_info}/signal_ids.rs (85%) rename bothan-core/src/manager/{crypto_asset_info => asset_info}/types.rs (81%) diff --git a/Cargo.lock b/Cargo.lock index 47363319..04265e06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -680,6 +680,7 @@ dependencies = [ "semver", "serde", "serde_json", + "strum_macros", "thiserror 2.0.12", "tokio", "tracing", @@ -3531,14 +3532,13 @@ checksum = "f64def088c51c9510a8579e3c5d67c65349dcf755e5479ad3d010aa6454e2c32" [[package]] name = "strum_macros" -version = "0.27.1" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c77a8c5abcaf0f9ce05d62342b7d298c346515365c36b673df4ebe3ced01fde8" +checksum = "7695ce3845ea4b33927c055a39dc438a45b059f7c1b3d91d38d10355fb8cbca7" dependencies = [ "heck", "proc-macro2", "quote", - "rustversion", "syn 2.0.104", ] diff --git a/bothan-api/server-cli/config.example.toml b/bothan-api/server-cli/config.example.toml index 51c3c3be..7c616bef 100644 --- a/bothan-api/server-cli/config.example.toml +++ b/bothan-api/server-cli/config.example.toml @@ -114,6 +114,12 @@ url = "https://macaw.bandchain.org" # Update interval for Band Macaw source update_interval = "1m" +[manager.forex.source.band_kiki] +# URL for Band Macaw source +url = "https://macaw.bandchain.org" +# Update interval for Band Macaw source +update_interval = "1m" + # Telemetry configuration [telemetry] # Enable or disable telemetry. diff --git a/bothan-api/server-cli/src/commands/start.rs b/bothan-api/server-cli/src/commands/start.rs index afe33027..aefef2c5 100644 --- a/bothan-api/server-cli/src/commands/start.rs +++ b/bothan-api/server-cli/src/commands/start.rs @@ -15,11 +15,13 @@ use bothan_api::api::BothanServer; use bothan_api::config::AppConfig; use bothan_api::config::ipfs::IpfsAuthentication; use bothan_api::config::manager::crypto_info::sources::CryptoSourceConfigs; +use bothan_api::config::manager::forex_info::sources::ForexSourceConfigs; use bothan_api::proto::bothan::v1::{BothanServiceServer, FILE_DESCRIPTOR_SET}; use bothan_api::{REGISTRY_REQUIREMENT, VERSION}; use bothan_core::ipfs::{IpfsClient, IpfsClientBuilder}; -use bothan_core::manager::CryptoAssetInfoManager; -use bothan_core::manager::crypto_asset_info::CryptoAssetWorkerOpts; +use bothan_core::manager::AssetInfoManager; +use bothan_core::manager::asset_info::CryptoAssetWorkerOpts; +use bothan_core::manager::asset_info::forex::worker::opts::ForexAssetWorkerOpts; use bothan_core::monitoring::{Client as MonitoringClient, Signer}; use bothan_core::store::rocksdb::RocksDbStore; use bothan_core::telemetry; @@ -193,15 +195,24 @@ async fn init_bothan_server( let registry_version_requirement = VersionReq::from_str(REGISTRY_REQUIREMENT) .with_context(|| "Failed to parse registry version requirement")?; - let opts = match init_crypto_opts(&config.manager.crypto.source).await { + let crypto_opts = match init_crypto_opts(&config.manager.crypto.source).await { Ok(workers) => workers, Err(e) => { bail!("failed to initialize workers: {:?}", e); } }; - let manager = match CryptoAssetInfoManager::build( + + let forex_opts = match init_forex_opts(&config.manager.forex.source).await { + Ok(workers) => workers, + Err(e) => { + bail!("failed to initialize workers: {:?}", e); + } + }; + + let manager = match AssetInfoManager::build( store, - opts, + crypto_opts, + forex_opts, ipfs_client, stale_threshold, bothan_version, @@ -242,22 +253,22 @@ async fn init_crypto_opts( ) -> Result, AssetWorkerError> { let mut worker_opts = HashMap::new(); - add_worker_opts(&mut worker_opts, &source.binance).await?; - add_worker_opts(&mut worker_opts, &source.bitfinex).await?; - add_worker_opts(&mut worker_opts, &source.bybit).await?; - add_worker_opts(&mut worker_opts, &source.coinbase).await?; - add_worker_opts(&mut worker_opts, &source.coingecko).await?; - add_worker_opts(&mut worker_opts, &source.coinmarketcap).await?; - add_worker_opts(&mut worker_opts, &source.htx).await?; - add_worker_opts(&mut worker_opts, &source.kraken).await?; - add_worker_opts(&mut worker_opts, &source.okx).await?; - add_worker_opts(&mut worker_opts, &source.band_kiwi).await?; - add_worker_opts(&mut worker_opts, &source.band_macaw).await?; + add_crypto_worker_opts(&mut worker_opts, &source.binance).await?; + add_crypto_worker_opts(&mut worker_opts, &source.bitfinex).await?; + add_crypto_worker_opts(&mut worker_opts, &source.bybit).await?; + add_crypto_worker_opts(&mut worker_opts, &source.coinbase).await?; + add_crypto_worker_opts(&mut worker_opts, &source.coingecko).await?; + add_crypto_worker_opts(&mut worker_opts, &source.coinmarketcap).await?; + add_crypto_worker_opts(&mut worker_opts, &source.htx).await?; + add_crypto_worker_opts(&mut worker_opts, &source.kraken).await?; + add_crypto_worker_opts(&mut worker_opts, &source.okx).await?; + add_crypto_worker_opts(&mut worker_opts, &source.band_kiwi).await?; + add_crypto_worker_opts(&mut worker_opts, &source.band_macaw).await?; Ok(worker_opts) } -async fn add_worker_opts>( +async fn add_crypto_worker_opts>( workers_opts: &mut HashMap, opts: &Option, ) -> Result<(), AssetWorkerError> { @@ -271,6 +282,30 @@ async fn add_worker_opts>( Ok(()) } +async fn init_forex_opts( + source: &ForexSourceConfigs, +) -> Result, AssetWorkerError> { + let mut worker_opts = HashMap::new(); + + add_forex_worker_opts(&mut worker_opts, &source.band_kiwi2).await?; + + Ok(worker_opts) +} + +async fn add_forex_worker_opts>( + workers_opts: &mut HashMap, + opts: &Option, +) -> Result<(), AssetWorkerError> { + if let Some(opts) = opts { + let worker_opts = opts.clone().into(); + let worker_name = worker_opts.name(); + info!("{} worker is enabled", worker_name); + workers_opts.insert(worker_name.to_string(), worker_opts); + } + + Ok(()) +} + async fn init_telemetry_server(config: &AppConfig) -> anyhow::Result<()> { if config.telemetry.enabled { let registry = telemetry::init_telemetry_registry()?; diff --git a/bothan-api/server/src/api/server.rs b/bothan-api/server/src/api/server.rs index 1432f9c5..b5680af6 100644 --- a/bothan-api/server/src/api/server.rs +++ b/bothan-api/server/src/api/server.rs @@ -16,8 +16,8 @@ use std::sync::Arc; use std::time::Instant; -use bothan_core::manager::CryptoAssetInfoManager; -use bothan_core::manager::crypto_asset_info::error::{PushMonitoringRecordError, SetRegistryError}; +use bothan_core::manager::AssetInfoManager; +use bothan_core::manager::asset_info::error::{PushMonitoringRecordError, SetRegistryError}; use bothan_lib::metrics::server::{Metrics, ServiceName}; use bothan_lib::store::Store; use semver::Version; @@ -35,13 +35,13 @@ pub const PRECISION: u32 = 9; /// The `BothanServer` struct represents a server that implements the `BothanService` trait. pub struct BothanServer { - manager: Arc>, + manager: Arc>, metrics: Metrics, } impl BothanServer { /// Creates a new `BothanServer` instance. - pub fn new(manager: Arc>, metrics: Metrics) -> Self { + pub fn new(manager: Arc>, metrics: Metrics) -> Self { BothanServer { manager, metrics } } } diff --git a/bothan-api/server/src/api/utils.rs b/bothan-api/server/src/api/utils.rs index f80aca2b..0b20b1e0 100644 --- a/bothan-api/server/src/api/utils.rs +++ b/bothan-api/server/src/api/utils.rs @@ -7,7 +7,7 @@ //! //! - `parse_price_state`: Converts a `PriceState` to a `Price` API response. -use bothan_core::manager::crypto_asset_info::types::PriceState; +use bothan_core::manager::asset_info::types::PriceState; use rust_decimal::prelude::Zero; use tracing::error; diff --git a/bothan-api/server/src/config/manager.rs b/bothan-api/server/src/config/manager.rs index 2f01cfd6..695bceb1 100644 --- a/bothan-api/server/src/config/manager.rs +++ b/bothan-api/server/src/config/manager.rs @@ -10,14 +10,19 @@ //! ``` use crypto_info::CryptoInfoManagerConfig; +use forex_info::ForexInfoManagerConfig; use serde::{Deserialize, Serialize}; /// Crypto info manager configuration module. pub mod crypto_info; +/// Forex info manager configuration module. +pub mod forex_info; /// The configuration for all bothan-api's manager. #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct ManagerConfig { /// The configuration for the crypto info manager. pub crypto: CryptoInfoManagerConfig, + /// The configuration for the forex info manager. + pub forex: ForexInfoManagerConfig, } diff --git a/bothan-api/server/src/config/manager/forex_info.rs b/bothan-api/server/src/config/manager/forex_info.rs new file mode 100644 index 00000000..f4286053 --- /dev/null +++ b/bothan-api/server/src/config/manager/forex_info.rs @@ -0,0 +1,37 @@ +//! Bothan API server forex info manager configuration. +//! +//! Settings for forex asset info sources and staleness threshold. + +use serde::{Deserialize, Serialize}; + +use crate::config::manager::forex_info::sources::ForexSourceConfigs; + +/// Forex info source configuration module. +pub mod sources; + +/// Configuration for the Bothan API Server's forex asset info manager. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ForexInfoManagerConfig { + /// The source configuration for the forex asset info manager. + pub source: ForexSourceConfigs, + /// The stale threshold for the forex asset info (in seconds). + /// Any source that has not been updated in this amount of time + /// relative to the call will be considered stale. + #[serde(default = "default_stale_threshold")] + pub stale_threshold: i64, +} + +/// Returns the default stale threshold (in seconds). +fn default_stale_threshold() -> i64 { + 300 +} + +impl Default for ForexInfoManagerConfig { + /// Creates a new `ForexInfoManagerConfig` with default values. + fn default() -> Self { + ForexInfoManagerConfig { + source: ForexSourceConfigs::default(), + stale_threshold: default_stale_threshold(), + } + } +} diff --git a/bothan-api/server/src/config/manager/forex_info/sources.rs b/bothan-api/server/src/config/manager/forex_info/sources.rs new file mode 100644 index 00000000..43248f50 --- /dev/null +++ b/bothan-api/server/src/config/manager/forex_info/sources.rs @@ -0,0 +1,42 @@ +//! Bothan API server forex source configuration. +//! +//! Worker options for supported forex data sources. + +use serde::{Deserialize, Serialize}; + +/// Configuration for the worker sources for forex asset info. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ForexSourceConfigs { + /// Band/kiwi worker options. + #[serde(default, deserialize_with = "de_kiwi2")] + pub band_kiwi2: Option, +} + +// Macro to generate deserialization functions for Band workers with preset names. +macro_rules! de_band_named { + ($fn_name:ident, $name:expr) => { + fn $fn_name<'de, D>(d: D) -> Result, D::Error> + where + D: serde::Deserializer<'de>, + { + let v = Option::::deserialize(d)?; + let v = v.map(|w| bothan_band::WorkerOpts::new($name, &w.url, Some(w.update_interval))); + Ok(v) + } + }; +} + +const BAND1_WORKER_NAME: &str = "band/kiwi2"; +de_band_named!(de_kiwi2, BAND1_WORKER_NAME); + +impl Default for ForexSourceConfigs { + fn default() -> Self { + ForexSourceConfigs { + band_kiwi2: Some(bothan_band::WorkerOpts::new( + "band/kiwi2", + "https://kiwi.bandchain.org", + None, + )), + } + } +} diff --git a/bothan-core/Cargo.toml b/bothan-core/Cargo.toml index d7e6d514..3361e29e 100644 --- a/bothan-core/Cargo.toml +++ b/bothan-core/Cargo.toml @@ -47,6 +47,7 @@ opentelemetry-prometheus = "0.28.0" opentelemetry_sdk = "0.28.0" prometheus = "0.13.4" rust-rocksdb = { version = "0.36.0", optional = true } +strum_macros = "0.27.2" [features] default = [] diff --git a/bothan-core/src/ipfs/client.rs b/bothan-core/src/ipfs/client.rs index 6a5b698c..cd4fb6d5 100644 --- a/bothan-core/src/ipfs/client.rs +++ b/bothan-core/src/ipfs/client.rs @@ -6,6 +6,7 @@ use reqwest::{Client, StatusCode}; use crate::ipfs::error::Error; +#[derive(Clone)] pub struct IpfsClient { url: String, client: Client, diff --git a/bothan-core/src/manager.rs b/bothan-core/src/manager.rs index 4908c9f1..6c2234cc 100644 --- a/bothan-core/src/manager.rs +++ b/bothan-core/src/manager.rs @@ -2,6 +2,6 @@ //! //! Provides the crypto asset info manager and related types. -pub use crypto_asset_info::CryptoAssetInfoManager; +pub use asset_info::AssetInfoManager; -pub mod crypto_asset_info; +pub mod asset_info; diff --git a/bothan-core/src/manager/crypto_asset_info.rs b/bothan-core/src/manager/asset_info.rs similarity index 58% rename from bothan-core/src/manager/crypto_asset_info.rs rename to bothan-core/src/manager/asset_info.rs index 18099013..7eb59d22 100644 --- a/bothan-core/src/manager/crypto_asset_info.rs +++ b/bothan-core/src/manager/asset_info.rs @@ -2,13 +2,14 @@ //! //! Provides types and logic for managing crypto asset information. -pub use manager::CryptoAssetInfoManager; -pub use worker::CryptoAssetWorker; -pub use worker::opts::CryptoAssetWorkerOpts; +pub use crypto::worker::CryptoAssetWorker; +pub use crypto::worker::opts::CryptoAssetWorkerOpts; +pub use manager::AssetInfoManager; +pub mod crypto; pub mod error; +pub mod forex; pub(super) mod manager; pub(super) mod price; pub(super) mod signal_ids; pub mod types; -pub(super) mod worker; diff --git a/bothan-core/src/manager/asset_info/crypto.rs b/bothan-core/src/manager/asset_info/crypto.rs new file mode 100644 index 00000000..2c8b8399 --- /dev/null +++ b/bothan-core/src/manager/asset_info/crypto.rs @@ -0,0 +1 @@ +pub mod worker; diff --git a/bothan-core/src/manager/crypto_asset_info/worker.rs b/bothan-core/src/manager/asset_info/crypto/worker.rs similarity index 94% rename from bothan-core/src/manager/crypto_asset_info/worker.rs rename to bothan-core/src/manager/asset_info/crypto/worker.rs index 252b449b..906b48a5 100644 --- a/bothan-core/src/manager/crypto_asset_info/worker.rs +++ b/bothan-core/src/manager/asset_info/crypto/worker.rs @@ -11,8 +11,9 @@ use bothan_lib::worker::error::AssetWorkerError; use derive_more::From; use tracing::{error, info}; -use crate::manager::crypto_asset_info::signal_ids::get_source_batched_query_ids; -use crate::manager::crypto_asset_info::worker::opts::CryptoAssetWorkerOpts; +use crate::manager::asset_info::crypto::worker::opts::CryptoAssetWorkerOpts; +use crate::manager::asset_info::signal_ids::get_source_batched_query_ids; +use crate::manager::asset_info::types::AssetType; #[derive(From)] pub enum CryptoAssetWorker { @@ -93,7 +94,7 @@ pub async fn build_workers( store: S, ) -> Vec { let mut workers = Vec::with_capacity(opts.len()); - for (source_id, query_id) in get_source_batched_query_ids(registry).drain() { + for (source_id, query_id) in get_source_batched_query_ids(registry, AssetType::Crypto).drain() { match opts.get(&source_id) { Some(opts) => { let ids = query_id.into_iter().collect(); diff --git a/bothan-core/src/manager/crypto_asset_info/worker/opts.rs b/bothan-core/src/manager/asset_info/crypto/worker/opts.rs similarity index 100% rename from bothan-core/src/manager/crypto_asset_info/worker/opts.rs rename to bothan-core/src/manager/asset_info/crypto/worker/opts.rs diff --git a/bothan-core/src/manager/crypto_asset_info/error.rs b/bothan-core/src/manager/asset_info/error.rs similarity index 100% rename from bothan-core/src/manager/crypto_asset_info/error.rs rename to bothan-core/src/manager/asset_info/error.rs diff --git a/bothan-core/src/manager/asset_info/forex.rs b/bothan-core/src/manager/asset_info/forex.rs new file mode 100644 index 00000000..2c8b8399 --- /dev/null +++ b/bothan-core/src/manager/asset_info/forex.rs @@ -0,0 +1 @@ +pub mod worker; diff --git a/bothan-core/src/manager/asset_info/forex/worker.rs b/bothan-core/src/manager/asset_info/forex/worker.rs new file mode 100644 index 00000000..6be78210 --- /dev/null +++ b/bothan-core/src/manager/asset_info/forex/worker.rs @@ -0,0 +1,71 @@ +//! Worker trait and implementations for forex asset sources. + +pub mod opts; + +use std::collections::HashMap; + +use bothan_lib::registry::{Registry, Valid}; +use bothan_lib::store::Store; +use bothan_lib::worker::AssetWorker; +use bothan_lib::worker::error::AssetWorkerError; +use derive_more::From; +use tracing::{error, info}; + +use crate::manager::asset_info::forex::worker::opts::ForexAssetWorkerOpts; +use crate::manager::asset_info::signal_ids::get_source_batched_query_ids; +use crate::manager::asset_info::types::AssetType; + +#[derive(From)] +pub enum ForexAssetWorker { + Band(bothan_band::Worker), +} + +#[async_trait::async_trait] +impl AssetWorker for ForexAssetWorker { + type Opts = ForexAssetWorkerOpts; + + fn name(&self) -> &'static str { + match self { + ForexAssetWorker::Band(w) => w.name(), + } + } + + async fn build( + opts: Self::Opts, + store: &S, + ids: Vec, + ) -> Result { + Ok(match opts { + ForexAssetWorkerOpts::Band(opts) => { + ForexAssetWorker::from(bothan_band::Worker::build(opts, store, ids).await?) + } + }) + } +} + +pub async fn build_workers( + registry: &Registry, + opts: &HashMap, + store: S, +) -> Vec { + let mut workers = Vec::with_capacity(opts.len()); + for (source_id, query_id) in get_source_batched_query_ids(registry, AssetType::Forex).drain() { + match opts.get(&source_id) { + Some(opts) => { + let ids = query_id.into_iter().collect(); + let builder_callable = ForexAssetWorker::build(opts.clone(), &store, ids); + let worker = match builder_callable.await { + Ok(worker) => worker, + Err(e) => { + error!("failed to build worker {}: {}", source_id, e); + continue; + } + }; + workers.push(worker); + } + None => info!("worker {} not activated", source_id), + } + } + + workers +} diff --git a/bothan-core/src/manager/asset_info/forex/worker/opts.rs b/bothan-core/src/manager/asset_info/forex/worker/opts.rs new file mode 100644 index 00000000..6706599f --- /dev/null +++ b/bothan-core/src/manager/asset_info/forex/worker/opts.rs @@ -0,0 +1,20 @@ +//! Worker options for configuring crypto asset source workers. + +#[derive(Clone)] +pub enum ForexAssetWorkerOpts { + Band(bothan_band::WorkerOpts), +} + +impl ForexAssetWorkerOpts { + pub fn name(&self) -> &str { + match self { + ForexAssetWorkerOpts::Band(opts) => opts.name(), + } + } +} + +impl From for ForexAssetWorkerOpts { + fn from(value: bothan_band::WorkerOpts) -> Self { + ForexAssetWorkerOpts::Band(value) + } +} diff --git a/bothan-core/src/manager/crypto_asset_info/manager.rs b/bothan-core/src/manager/asset_info/manager.rs similarity index 69% rename from bothan-core/src/manager/crypto_asset_info/manager.rs rename to bothan-core/src/manager/asset_info/manager.rs index a8b6d6a2..2b045d11 100644 --- a/bothan-core/src/manager/crypto_asset_info/manager.rs +++ b/bothan-core/src/manager/asset_info/manager.rs @@ -19,21 +19,29 @@ use tokio::time::sleep; use crate::ipfs::IpfsClient; use crate::ipfs::error::Error as IpfsError; -use crate::manager::crypto_asset_info::error::{ +use crate::manager::asset_info::crypto::worker::opts::CryptoAssetWorkerOpts; +use crate::manager::asset_info::crypto::worker::{ + CryptoAssetWorker, build_workers as build_crypto_workers, +}; +use crate::manager::asset_info::error::{ PostHeartbeatError, PushMonitoringRecordError, SetRegistryError, }; -use crate::manager::crypto_asset_info::price::tasks::get_signal_price_states; -use crate::manager::crypto_asset_info::types::{ - CryptoAssetManagerInfo, MONITORING_TTL, PriceSignalComputationRecord, PriceState, +use crate::manager::asset_info::forex::worker::opts::ForexAssetWorkerOpts; +use crate::manager::asset_info::forex::worker::{ + ForexAssetWorker, build_workers as build_forex_workers, +}; +use crate::manager::asset_info::price::tasks::get_signal_price_states; +use crate::manager::asset_info::types::{ + AssetManagerInfo, MONITORING_TTL, PriceSignalComputationRecord, PriceState, }; -use crate::manager::crypto_asset_info::worker::opts::CryptoAssetWorkerOpts; -use crate::manager::crypto_asset_info::worker::{CryptoAssetWorker, build_workers}; use crate::monitoring::{Client as MonitoringClient, create_uuid}; -pub struct CryptoAssetInfoManager { +pub struct AssetInfoManager { store: S, - opts: HashMap, - workers: Mutex>, + crypto_opts: HashMap, + forex_opts: HashMap, + crypto_workers: Mutex>, + forex_workers: Mutex>, stale_threshold: i64, ipfs_client: IpfsClient, bothan_version: Version, @@ -43,11 +51,13 @@ pub struct CryptoAssetInfoManager { metrics: Metrics, } -impl CryptoAssetInfoManager { - /// builds a new `CryptoAssetInfoManager`. +impl AssetInfoManager { + /// builds a new `AssetInfoManager`. + #[allow(clippy::too_many_arguments)] pub async fn build( store: S, - opts: HashMap, + crypto_opts: HashMap, + forex_opts: HashMap, ipfs_client: IpfsClient, stale_threshold: i64, bothan_version: Version, @@ -60,14 +70,19 @@ impl CryptoAssetInfoManager { let registry = store.get_registry().await?; - let workers = Mutex::new(build_workers(®istry, &opts, store.clone()).await); + let crypto_workers = + Mutex::new(build_crypto_workers(®istry, &crypto_opts, store.clone()).await); + let forex_workers = + Mutex::new(build_forex_workers(®istry, &forex_opts, store.clone()).await); let metrics = Metrics::new(); - let manager = CryptoAssetInfoManager { + let manager = AssetInfoManager { store, - opts, - workers, + crypto_opts, + forex_opts, + crypto_workers, + forex_workers, stale_threshold, ipfs_client, bothan_version, @@ -80,8 +95,8 @@ impl CryptoAssetInfoManager { Ok(manager) } - /// Gets the `CryptoAssetManagerInfo`. - pub async fn get_info(&self) -> Result { + /// Gets the `AssetManagerInfo`. + pub async fn get_info(&self) -> Result { let bothan_version = self.bothan_version.to_string(); let registry_hash = self .store @@ -89,15 +104,28 @@ impl CryptoAssetInfoManager { .await? .unwrap_or(String::new()); // If value doesn't exist, return an empty string let registry_version_requirement = self.registry_version_requirement.to_string(); - let active_sources = self - .workers + let crypto_active_sources = self + .crypto_workers .lock() .await .iter() .map(|w| w.name().to_string()) - .collect(); + .collect::>(); - Ok(CryptoAssetManagerInfo::new( + let forex_active_sources = self + .forex_workers + .lock() + .await + .iter() + .map(|w| w.name().to_string()) + .collect::>(); + + let active_sources = crypto_active_sources + .into_iter() + .chain(forex_active_sources) + .collect::>(); + + Ok(AssetManagerInfo::new( bothan_version, registry_hash, registry_version_requirement, @@ -115,13 +143,27 @@ impl CryptoAssetInfoManager { let uuid = create_uuid(); - let active_sources = self - .workers + let crypto_active_sources = self + .crypto_workers + .lock() + .await + .iter() + .map(|w| w.name().to_string()) + .collect::>(); + + let forex_active_sources = self + .forex_workers .lock() .await .iter() .map(|w| w.name().to_string()) - .collect(); + .collect::>(); + + let active_sources = crypto_active_sources + .into_iter() + .chain(forex_active_sources) + .collect::>(); + let bothan_version = self.bothan_version.clone(); let registry_hash = self .store @@ -243,14 +285,24 @@ impl CryptoAssetInfoManager { .await .map_err(|_| SetRegistryError::FailedToSetRegistry)?; - // drop old workers to kill connection - let mut locked_workers = self.workers.lock().await; + // drop old crypto workers to kill connection + let mut locked_workers = self.crypto_workers.lock().await; + locked_workers.clear(); + + // TODO: find method to wait for connections to clear up thats better than sleeping for 1 second + sleep(Duration::from_secs(1)).await; + + let workers = build_crypto_workers(®istry, &self.crypto_opts, self.store.clone()).await; + *locked_workers = workers; + + // drop old forex workers to kill connection + let mut locked_workers = self.forex_workers.lock().await; locked_workers.clear(); // TODO: find method to wait for connections to clear up thats better than sleeping for 1 second sleep(Duration::from_secs(1)).await; - let workers = build_workers(®istry, &self.opts, self.store.clone()).await; + let workers = build_forex_workers(®istry, &self.forex_opts, self.store.clone()).await; *locked_workers = workers; Ok(()) diff --git a/bothan-core/src/manager/crypto_asset_info/price.rs b/bothan-core/src/manager/asset_info/price.rs similarity index 100% rename from bothan-core/src/manager/crypto_asset_info/price.rs rename to bothan-core/src/manager/asset_info/price.rs diff --git a/bothan-core/src/manager/crypto_asset_info/price/cache.rs b/bothan-core/src/manager/asset_info/price/cache.rs similarity index 97% rename from bothan-core/src/manager/crypto_asset_info/price/cache.rs rename to bothan-core/src/manager/asset_info/price/cache.rs index b3aac3c8..967c3863 100644 --- a/bothan-core/src/manager/crypto_asset_info/price/cache.rs +++ b/bothan-core/src/manager/asset_info/price/cache.rs @@ -8,7 +8,7 @@ use std::hash::Hash; use rust_decimal::Decimal; -use crate::manager::crypto_asset_info::types::PriceState; +use crate::manager::asset_info::types::PriceState; /// In-memory cache for storing `PriceState` values keyed by asset or signal ID. pub struct PriceCache { diff --git a/bothan-core/src/manager/crypto_asset_info/price/error.rs b/bothan-core/src/manager/asset_info/price/error.rs similarity index 100% rename from bothan-core/src/manager/crypto_asset_info/price/error.rs rename to bothan-core/src/manager/asset_info/price/error.rs diff --git a/bothan-core/src/manager/crypto_asset_info/price/tasks.rs b/bothan-core/src/manager/asset_info/price/tasks.rs similarity index 99% rename from bothan-core/src/manager/crypto_asset_info/price/tasks.rs rename to bothan-core/src/manager/asset_info/price/tasks.rs index e89ec102..89cb9740 100644 --- a/bothan-core/src/manager/crypto_asset_info/price/tasks.rs +++ b/bothan-core/src/manager/asset_info/price/tasks.rs @@ -18,9 +18,9 @@ use num_traits::Zero; use rust_decimal::Decimal; use tracing::{debug, error, info, warn}; -use crate::manager::crypto_asset_info::price::cache::PriceCache; -use crate::manager::crypto_asset_info::price::error::{Error, MissingPrerequisiteError}; -use crate::manager::crypto_asset_info::types::{PriceSignalComputationRecord, PriceState}; +use crate::manager::asset_info::price::cache::PriceCache; +use crate::manager::asset_info::price::error::{Error, MissingPrerequisiteError}; +use crate::manager::asset_info::types::{PriceSignalComputationRecord, PriceState}; use crate::monitoring::types::{OperationRecord, ProcessRecord, SourceRecord}; // TODO: Allow records to be Option diff --git a/bothan-core/src/manager/crypto_asset_info/signal_ids.rs b/bothan-core/src/manager/asset_info/signal_ids.rs similarity index 85% rename from bothan-core/src/manager/crypto_asset_info/signal_ids.rs rename to bothan-core/src/manager/asset_info/signal_ids.rs index 111d80e9..9abd2f62 100644 --- a/bothan-core/src/manager/crypto_asset_info/signal_ids.rs +++ b/bothan-core/src/manager/asset_info/signal_ids.rs @@ -2,6 +2,8 @@ use std::collections::{HashMap, HashSet, VecDeque}; use bothan_lib::registry::{Registry, Valid}; +use crate::manager::asset_info::types::AssetType; + // Returns a mapping of source_id to a set of query_ids that are batched together // This is used to determine which queries should be batched together // e.g. if we have a registry with the following entries: @@ -55,12 +57,15 @@ use bothan_lib::registry::{Registry, Valid}; // } pub fn get_source_batched_query_ids( registry: &Registry, + asset_type: AssetType, ) -> HashMap> { let mut source_query_ids: HashMap> = HashMap::new(); // Seen signal_ids let mut seen = HashSet::::new(); - let mut queue = VecDeque::from_iter(registry.signal_ids()); + let signal_ids = get_signal_ids_by_asset_type(registry.signal_ids(), asset_type); + + let mut queue = VecDeque::from_iter(signal_ids); while let Some(signal_id) = queue.pop_front() { if seen.contains(signal_id) { continue; @@ -88,6 +93,19 @@ pub fn get_source_batched_query_ids( source_query_ids } +fn get_signal_ids_by_asset_type<'a>( + signal_ids: impl Iterator, + asset_type: AssetType, +) -> HashSet<&'a String> { + let mut result = HashSet::new(); + for signal_id in signal_ids { + if signal_id.starts_with(asset_type.as_ref()) { + result.insert(signal_id); + } + } + result +} + #[cfg(test)] mod tests { use bothan_lib::registry::Invalid; @@ -103,7 +121,7 @@ mod tests { fn test_get_source_batched_query_ids() { let registry = mock_registry().validate().unwrap(); - let diff = get_source_batched_query_ids(®istry); + let diff = get_source_batched_query_ids(®istry, AssetType::Crypto); let expected = HashMap::from_iter([ ( "binance".to_string(), diff --git a/bothan-core/src/manager/crypto_asset_info/types.rs b/bothan-core/src/manager/asset_info/types.rs similarity index 81% rename from bothan-core/src/manager/crypto_asset_info/types.rs rename to bothan-core/src/manager/asset_info/types.rs index 00025df8..3dd9c041 100644 --- a/bothan-core/src/manager/crypto_asset_info/types.rs +++ b/bothan-core/src/manager/asset_info/types.rs @@ -4,6 +4,7 @@ use std::time::Duration; use bothan_lib::types::AssetInfo; use rust_decimal::Decimal; +use strum_macros::AsRefStr; use crate::monitoring::types::SignalComputationRecord; @@ -19,7 +20,15 @@ pub enum PriceState { Unsupported, } -pub struct CryptoAssetManagerInfo { +#[derive(Debug, AsRefStr)] +pub enum AssetType { + #[strum(serialize = "CS")] + Crypto, + #[strum(serialize = "FS")] + Forex, +} + +pub struct AssetManagerInfo { pub bothan_version: String, pub registry_hash: String, pub registry_version_requirement: String, @@ -27,7 +36,7 @@ pub struct CryptoAssetManagerInfo { pub monitoring_enabled: bool, } -impl CryptoAssetManagerInfo { +impl AssetManagerInfo { pub fn new( bothan_version: String, registry_hash: String, @@ -35,7 +44,7 @@ impl CryptoAssetManagerInfo { active_sources: Vec, monitoring_enabled: bool, ) -> Self { - CryptoAssetManagerInfo { + AssetManagerInfo { bothan_version, registry_hash, registry_version_requirement, From a17fe05eb6a8d89656258ba41853fe58c3864f28 Mon Sep 17 00:00:00 2001 From: Tanut Lertwarachai Date: Mon, 2 Mar 2026 15:15:47 +0700 Subject: [PATCH 02/12] fix copilot comment --- bothan-api/server-cli/config.example.toml | 6 +++--- bothan-core/src/manager/asset_info/forex/worker/opts.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/bothan-api/server-cli/config.example.toml b/bothan-api/server-cli/config.example.toml index 7c616bef..3cbbb501 100644 --- a/bothan-api/server-cli/config.example.toml +++ b/bothan-api/server-cli/config.example.toml @@ -115,9 +115,9 @@ url = "https://macaw.bandchain.org" update_interval = "1m" [manager.forex.source.band_kiki] -# URL for Band Macaw source -url = "https://macaw.bandchain.org" -# Update interval for Band Macaw source +# URL for Band Kiki source +url = "https://kiki.bandchain.org" +# Update interval for Band Kiki source update_interval = "1m" # Telemetry configuration diff --git a/bothan-core/src/manager/asset_info/forex/worker/opts.rs b/bothan-core/src/manager/asset_info/forex/worker/opts.rs index 6706599f..1110d468 100644 --- a/bothan-core/src/manager/asset_info/forex/worker/opts.rs +++ b/bothan-core/src/manager/asset_info/forex/worker/opts.rs @@ -1,4 +1,4 @@ -//! Worker options for configuring crypto asset source workers. +//! Worker options for configuring forex asset source workers. #[derive(Clone)] pub enum ForexAssetWorkerOpts { From b1e5404775000104e418c5598f92f21199adf155 Mon Sep 17 00:00:00 2001 From: Tanut Lertwarachai Date: Fri, 6 Mar 2026 16:01:57 +0700 Subject: [PATCH 03/12] add name --- README.md | 8 ++- bothan-api/server-cli/config.example.toml | 20 +++++-- bothan-api/server-cli/src/commands/query.rs | 57 ++++++++++++++----- bothan-api/server-cli/src/commands/start.rs | 4 +- .../src/config/manager/crypto_info/sources.rs | 1 + .../src/config/manager/forex_info/sources.rs | 38 ++++++++++--- docs/architecture.md | 2 +- 7 files changed, 102 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 4238f903..2c1c01a1 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ This project comprises primarily of 6 main components: - `bothan-{exchange}` - Exchange-specific implementations - [`proto`](proto/) - Protocol buffer definitions -## Supported Data Sources +## Supported Crypto Data Sources - [Binance](bothan-binance) - [Bitfinex](bothan-bitfinex) @@ -38,6 +38,12 @@ This project comprises primarily of 6 main components: - [Band/kiwi](bothan-band) - [Band/macaw](bothan-band) +## Supported Forex Data Sources + +- [Band/owlet](bothan-band) +- [Band/fieldfare](bothan-band) +- [Band/xenops](bothan-band) + ## Features - **Unified API**: Consistent interface across all supported exchanges diff --git a/bothan-api/server-cli/config.example.toml b/bothan-api/server-cli/config.example.toml index 3cbbb501..1521fffa 100644 --- a/bothan-api/server-cli/config.example.toml +++ b/bothan-api/server-cli/config.example.toml @@ -114,10 +114,22 @@ url = "https://macaw.bandchain.org" # Update interval for Band Macaw source update_interval = "1m" -[manager.forex.source.band_kiki] -# URL for Band Kiki source -url = "https://kiki.bandchain.org" -# Update interval for Band Kiki source +[manager.forex.source.band_owlet] +# URL for Band Owlet source +url = "https://owlet.bandchain.org" +# Update interval for Band Owlet source +update_interval = "1m" + +[manager.forex.source.band_fieldfare] +# URL for Band Fieldfare source +url = "https://fieldfare.bandchain.org" +# Update interval for Band Fieldfare source +update_interval = "1m" + +[manager.forex.source.band_xenops] +# URL for Band Xenops source +url = "https://xenops.bandchain.org" +# Update interval for Band Xenops source update_interval = "1m" # Telemetry configuration diff --git a/bothan-api/server-cli/src/commands/query.rs b/bothan-api/server-cli/src/commands/query.rs index 9072f824..90734f2b 100644 --- a/bothan-api/server-cli/src/commands/query.rs +++ b/bothan-api/server-cli/src/commands/query.rs @@ -6,7 +6,7 @@ //! //! ## Features //! -//! - Query prices from Binance, Bitfinex, Bybit, Coinbase, CoinGecko, CoinMarketCap, HTX, Kraken, OKX +//! - Query prices from Binance, Bitfinex, Bybit, Coinbase, CoinGecko, CoinMarketCap, HTX, Kraken, OKX, Band (Kiwi, Macaw, Owlet, Fieldfare, Xenops) //! - Customizable timeout and query IDs //! - Pretty-printed table output //! @@ -124,55 +124,86 @@ pub enum QuerySubCommand { #[clap(flatten)] args: QueryArgs, }, + /// Query Band/owlet prices + #[clap(name = "band/owlet")] + BandOwlet { + #[clap(flatten)] + args: QueryArgs, + }, + /// Query Band/fieldfare prices + #[clap(name = "band/fieldfare")] + BandFieldfare { + #[clap(flatten)] + args: QueryArgs, + }, + /// Query Band/xenops prices + #[clap(name = "band/xenops")] + BandXenops { + #[clap(flatten)] + args: QueryArgs, + }, } impl QueryCli { pub async fn run(&self, app_config: AppConfig) -> anyhow::Result<()> { - let source_config = app_config.manager.crypto.source; + let crypto_config = app_config.manager.crypto.source; + let forex_config = app_config.manager.forex.source; let config_err = anyhow!("Config is missing. Please check your config.toml."); match &self.subcommand { QuerySubCommand::Binance { args } => { - let opts = source_config.binance.ok_or(config_err)?; + let opts = crypto_config.binance.ok_or(config_err)?; query_binance(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::Bitfinex { args } => { - let opts = source_config.bitfinex.ok_or(config_err)?; + let opts = crypto_config.bitfinex.ok_or(config_err)?; query_bitfinex(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::Bybit { args } => { - let opts = source_config.bybit.ok_or(config_err)?; + let opts = crypto_config.bybit.ok_or(config_err)?; query_bybit(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::Coinbase { args } => { - let opts = source_config.coinbase.ok_or(config_err)?; + let opts = crypto_config.coinbase.ok_or(config_err)?; query_coinbase(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::CoinGecko { args } => { - let opts = source_config.coingecko.ok_or(config_err)?; + let opts = crypto_config.coingecko.ok_or(config_err)?; query_coingecko(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::CoinMarketCap { args } => { - let opts = source_config.coinmarketcap.ok_or(config_err)?; + let opts = crypto_config.coinmarketcap.ok_or(config_err)?; query_coinmarketcap(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::Htx { args } => { - let opts = source_config.htx.ok_or(config_err)?; + let opts = crypto_config.htx.ok_or(config_err)?; query_htx(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::Kraken { args } => { - let opts = source_config.kraken.ok_or(config_err)?; + let opts = crypto_config.kraken.ok_or(config_err)?; query_kraken(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::Okx { args } => { - let opts = source_config.okx.ok_or(config_err)?; + let opts = crypto_config.okx.ok_or(config_err)?; query_okx(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::BandKiwi { args } => { - let opts = source_config.band_kiwi.ok_or(config_err)?; + let opts = crypto_config.band_kiwi.ok_or(config_err)?; query_band(opts, &args.query_ids, args.timeout).await?; } QuerySubCommand::BandMacaw { args } => { - let opts = source_config.band_macaw.ok_or(config_err)?; + let opts = crypto_config.band_macaw.ok_or(config_err)?; + query_band(opts, &args.query_ids, args.timeout).await?; + } + QuerySubCommand::BandOwlet { args } => { + let opts = forex_config.band_owlet.ok_or(config_err)?; + query_band(opts, &args.query_ids, args.timeout).await?; + } + QuerySubCommand::BandFieldfare { args } => { + let opts = forex_config.band_fieldfare.ok_or(config_err)?; + query_band(opts, &args.query_ids, args.timeout).await?; + } + QuerySubCommand::BandXenops { args } => { + let opts = forex_config.band_xenops.ok_or(config_err)?; query_band(opts, &args.query_ids, args.timeout).await?; } } diff --git a/bothan-api/server-cli/src/commands/start.rs b/bothan-api/server-cli/src/commands/start.rs index aefef2c5..a17f0f9c 100644 --- a/bothan-api/server-cli/src/commands/start.rs +++ b/bothan-api/server-cli/src/commands/start.rs @@ -287,7 +287,9 @@ async fn init_forex_opts( ) -> Result, AssetWorkerError> { let mut worker_opts = HashMap::new(); - add_forex_worker_opts(&mut worker_opts, &source.band_kiwi2).await?; + add_forex_worker_opts(&mut worker_opts, &source.band_owlet).await?; + add_forex_worker_opts(&mut worker_opts, &source.band_fieldfare).await?; + add_forex_worker_opts(&mut worker_opts, &source.band_xenops).await?; Ok(worker_opts) } diff --git a/bothan-api/server/src/config/manager/crypto_info/sources.rs b/bothan-api/server/src/config/manager/crypto_info/sources.rs index 85c8ab83..6fc52559 100644 --- a/bothan-api/server/src/config/manager/crypto_info/sources.rs +++ b/bothan-api/server/src/config/manager/crypto_info/sources.rs @@ -65,6 +65,7 @@ macro_rules! de_band_named { const BAND1_WORKER_NAME: &str = "band/kiwi"; de_band_named!(de_kiwi, BAND1_WORKER_NAME); + const BAND2_WORKER_NAME: &str = "band/macaw"; de_band_named!(de_macaw, BAND2_WORKER_NAME); diff --git a/bothan-api/server/src/config/manager/forex_info/sources.rs b/bothan-api/server/src/config/manager/forex_info/sources.rs index 43248f50..05b4cf29 100644 --- a/bothan-api/server/src/config/manager/forex_info/sources.rs +++ b/bothan-api/server/src/config/manager/forex_info/sources.rs @@ -7,9 +7,15 @@ use serde::{Deserialize, Serialize}; /// Configuration for the worker sources for forex asset info. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ForexSourceConfigs { - /// Band/kiwi worker options. - #[serde(default, deserialize_with = "de_kiwi2")] - pub band_kiwi2: Option, + /// Band/owlet worker options. + #[serde(default, deserialize_with = "de_owlet")] + pub band_owlet: Option, + /// Band/fieldfare worker options. + #[serde(default, deserialize_with = "de_fieldfare")] + pub band_fieldfare: Option, + /// Band/xenops worker options. + #[serde(default, deserialize_with = "de_xenops")] + pub band_xenops: Option, } // Macro to generate deserialization functions for Band workers with preset names. @@ -26,15 +32,31 @@ macro_rules! de_band_named { }; } -const BAND1_WORKER_NAME: &str = "band/kiwi2"; -de_band_named!(de_kiwi2, BAND1_WORKER_NAME); +const BAND1_WORKER_NAME: &str = "band/owlet"; +de_band_named!(de_owlet, BAND1_WORKER_NAME); + +const BAND2_WORKER_NAME: &str = "band/fieldfare"; +de_band_named!(de_fieldfare, BAND2_WORKER_NAME); + +const BAND3_WORKER_NAME: &str = "band/xenops"; +de_band_named!(de_xenops, BAND3_WORKER_NAME); impl Default for ForexSourceConfigs { fn default() -> Self { ForexSourceConfigs { - band_kiwi2: Some(bothan_band::WorkerOpts::new( - "band/kiwi2", - "https://kiwi.bandchain.org", + band_owlet: Some(bothan_band::WorkerOpts::new( + "band/owlet", + "https://owlet.bandchain.org", + None, + )), + band_fieldfare: Some(bothan_band::WorkerOpts::new( + "band/fieldfare", + "https://fieldfare.bandchain.org", + None, + )), + band_xenops: Some(bothan_band::WorkerOpts::new( + "band/xenops", + "https://xenops.bandchain.org", None, )), } diff --git a/docs/architecture.md b/docs/architecture.md index ae658c3f..4da88634 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -82,7 +82,7 @@ Each provider has its own dedicated module: - `bothan-htx`: Integration with HTX - `bothan-kraken`: Integration with Kraken - `bothan-okx`: Integration with OKX -- `bothan-band`: Integration with Band sources (e.g. band/kiwi, band/macaw) +- `bothan-band`: Integration with Band sources (e.g. band/kiwi, band/macaw, band/owlet, band/fieldfare, band/xenops) These modules implement provider-specific logic while conforming to common interfaces defined in the core components. From 2465273d30285d6d0131ec33a9051adf7e87a32b Mon Sep 17 00:00:00 2001 From: Tanut Lertwarachai Date: Mon, 9 Mar 2026 00:05:58 +0700 Subject: [PATCH 04/12] add forex stale time --- bothan-api/server-cli/config.example.toml | 5 + bothan-api/server-cli/src/commands/start.rs | 75 ++++++----- .../server/src/config/manager/forex_info.rs | 2 +- bothan-core/src/manager/asset_info.rs | 7 +- .../src/manager/asset_info/crypto/worker.rs | 116 ------------------ .../manager/asset_info/crypto/worker/opts.rs | 92 -------------- .../src/manager/asset_info/forex/worker.rs | 71 ----------- .../manager/asset_info/forex/worker/opts.rs | 20 --- bothan-core/src/manager/asset_info/manager.rs | 93 +++----------- .../src/manager/asset_info/price/tasks.rs | 51 ++++++-- .../src/manager/asset_info/signal_ids.rs | 22 +--- bothan-core/src/manager/asset_info/worker.rs | 115 +++++++++++++++++ .../src/manager/asset_info/worker/opts.rs | 97 +++++++++++++++ bothan-htx/src/api/error.rs | 1 + bothan-htx/src/api/websocket.rs | 2 + 15 files changed, 321 insertions(+), 448 deletions(-) delete mode 100644 bothan-core/src/manager/asset_info/crypto/worker.rs delete mode 100644 bothan-core/src/manager/asset_info/crypto/worker/opts.rs delete mode 100644 bothan-core/src/manager/asset_info/forex/worker.rs delete mode 100644 bothan-core/src/manager/asset_info/forex/worker/opts.rs create mode 100644 bothan-core/src/manager/asset_info/worker.rs create mode 100644 bothan-core/src/manager/asset_info/worker/opts.rs diff --git a/bothan-api/server-cli/config.example.toml b/bothan-api/server-cli/config.example.toml index 1521fffa..96d3f1f7 100644 --- a/bothan-api/server-cli/config.example.toml +++ b/bothan-api/server-cli/config.example.toml @@ -41,6 +41,11 @@ enabled = false # The threshold in seconds after which data is considered stale. stale_threshold = 300 +# Manager configuration for handling forex data sources +[manager.forex] +# The threshold in seconds after which data is considered stale. +stale_threshold = 3600 + # Configuration for the data sources that the manager will use. # If any of these [manager.crypto.source] sections (e.g., section manager.crypto.source.binance) are removed, # that specific source will not be used in bothan. diff --git a/bothan-api/server-cli/src/commands/start.rs b/bothan-api/server-cli/src/commands/start.rs index a17f0f9c..4d684592 100644 --- a/bothan-api/server-cli/src/commands/start.rs +++ b/bothan-api/server-cli/src/commands/start.rs @@ -20,8 +20,7 @@ use bothan_api::proto::bothan::v1::{BothanServiceServer, FILE_DESCRIPTOR_SET}; use bothan_api::{REGISTRY_REQUIREMENT, VERSION}; use bothan_core::ipfs::{IpfsClient, IpfsClientBuilder}; use bothan_core::manager::AssetInfoManager; -use bothan_core::manager::asset_info::CryptoAssetWorkerOpts; -use bothan_core::manager::asset_info::forex::worker::opts::ForexAssetWorkerOpts; +use bothan_core::manager::asset_info::AssetWorkerOpts; use bothan_core::monitoring::{Client as MonitoringClient, Signer}; use bothan_core::store::rocksdb::RocksDbStore; use bothan_core::telemetry; @@ -189,7 +188,10 @@ async fn init_bothan_server( ipfs_client: IpfsClient, monitoring_client: Option>, ) -> anyhow::Result>> { - let stale_threshold = config.manager.crypto.stale_threshold; + let prefix_stale_thresholds = init_prefix_stale_thresholds( + config.manager.crypto.stale_threshold, + config.manager.forex.stale_threshold, + ); let bothan_version = Version::from_str(VERSION).with_context(|| "Failed to parse bothan version")?; let registry_version_requirement = VersionReq::from_str(REGISTRY_REQUIREMENT) @@ -209,12 +211,13 @@ async fn init_bothan_server( } }; + let worker_opts = crypto_opts.into_iter().chain(forex_opts).collect(); + let manager = match AssetInfoManager::build( store, - crypto_opts, - forex_opts, + worker_opts, ipfs_client, - stale_threshold, + prefix_stale_thresholds, bothan_version, registry_version_requirement, monitoring_client, @@ -248,54 +251,50 @@ async fn init_bothan_server( Ok(Arc::new(BothanServer::new(manager, metrics))) } +fn init_prefix_stale_thresholds( + crypto_stale_threshold: i64, + forex_stale_threshold: i64, +) -> HashMap { + HashMap::from([ + ("CS".to_string(), crypto_stale_threshold), + ("FS".to_string(), forex_stale_threshold), + ]) +} + async fn init_crypto_opts( source: &CryptoSourceConfigs, -) -> Result, AssetWorkerError> { +) -> Result, AssetWorkerError> { let mut worker_opts = HashMap::new(); - add_crypto_worker_opts(&mut worker_opts, &source.binance).await?; - add_crypto_worker_opts(&mut worker_opts, &source.bitfinex).await?; - add_crypto_worker_opts(&mut worker_opts, &source.bybit).await?; - add_crypto_worker_opts(&mut worker_opts, &source.coinbase).await?; - add_crypto_worker_opts(&mut worker_opts, &source.coingecko).await?; - add_crypto_worker_opts(&mut worker_opts, &source.coinmarketcap).await?; - add_crypto_worker_opts(&mut worker_opts, &source.htx).await?; - add_crypto_worker_opts(&mut worker_opts, &source.kraken).await?; - add_crypto_worker_opts(&mut worker_opts, &source.okx).await?; - add_crypto_worker_opts(&mut worker_opts, &source.band_kiwi).await?; - add_crypto_worker_opts(&mut worker_opts, &source.band_macaw).await?; + add_worker_opts(&mut worker_opts, &source.binance).await?; + add_worker_opts(&mut worker_opts, &source.bitfinex).await?; + add_worker_opts(&mut worker_opts, &source.bybit).await?; + add_worker_opts(&mut worker_opts, &source.coinbase).await?; + add_worker_opts(&mut worker_opts, &source.coingecko).await?; + add_worker_opts(&mut worker_opts, &source.coinmarketcap).await?; + add_worker_opts(&mut worker_opts, &source.htx).await?; + add_worker_opts(&mut worker_opts, &source.kraken).await?; + add_worker_opts(&mut worker_opts, &source.okx).await?; + add_worker_opts(&mut worker_opts, &source.band_kiwi).await?; + add_worker_opts(&mut worker_opts, &source.band_macaw).await?; Ok(worker_opts) } -async fn add_crypto_worker_opts>( - workers_opts: &mut HashMap, - opts: &Option, -) -> Result<(), AssetWorkerError> { - if let Some(opts) = opts { - let worker_opts = opts.clone().into(); - let worker_name = worker_opts.name(); - info!("{} worker is enabled", worker_name); - workers_opts.insert(worker_name.to_string(), worker_opts); - } - - Ok(()) -} - async fn init_forex_opts( source: &ForexSourceConfigs, -) -> Result, AssetWorkerError> { +) -> Result, AssetWorkerError> { let mut worker_opts = HashMap::new(); - add_forex_worker_opts(&mut worker_opts, &source.band_owlet).await?; - add_forex_worker_opts(&mut worker_opts, &source.band_fieldfare).await?; - add_forex_worker_opts(&mut worker_opts, &source.band_xenops).await?; + add_worker_opts(&mut worker_opts, &source.band_owlet).await?; + add_worker_opts(&mut worker_opts, &source.band_fieldfare).await?; + add_worker_opts(&mut worker_opts, &source.band_xenops).await?; Ok(worker_opts) } -async fn add_forex_worker_opts>( - workers_opts: &mut HashMap, +async fn add_worker_opts>( + workers_opts: &mut HashMap, opts: &Option, ) -> Result<(), AssetWorkerError> { if let Some(opts) = opts { diff --git a/bothan-api/server/src/config/manager/forex_info.rs b/bothan-api/server/src/config/manager/forex_info.rs index f4286053..277d4d7b 100644 --- a/bothan-api/server/src/config/manager/forex_info.rs +++ b/bothan-api/server/src/config/manager/forex_info.rs @@ -23,7 +23,7 @@ pub struct ForexInfoManagerConfig { /// Returns the default stale threshold (in seconds). fn default_stale_threshold() -> i64 { - 300 + 3600 } impl Default for ForexInfoManagerConfig { diff --git a/bothan-core/src/manager/asset_info.rs b/bothan-core/src/manager/asset_info.rs index 7eb59d22..8aa59f93 100644 --- a/bothan-core/src/manager/asset_info.rs +++ b/bothan-core/src/manager/asset_info.rs @@ -2,14 +2,13 @@ //! //! Provides types and logic for managing crypto asset information. -pub use crypto::worker::CryptoAssetWorker; -pub use crypto::worker::opts::CryptoAssetWorkerOpts; pub use manager::AssetInfoManager; +pub use worker::AnyAssetWorker; +pub use worker::opts::AssetWorkerOpts; -pub mod crypto; pub mod error; -pub mod forex; pub(super) mod manager; pub(super) mod price; pub(super) mod signal_ids; pub mod types; +pub mod worker; diff --git a/bothan-core/src/manager/asset_info/crypto/worker.rs b/bothan-core/src/manager/asset_info/crypto/worker.rs deleted file mode 100644 index 906b48a5..00000000 --- a/bothan-core/src/manager/asset_info/crypto/worker.rs +++ /dev/null @@ -1,116 +0,0 @@ -//! Worker trait and implementations for crypto asset sources. - -pub mod opts; - -use std::collections::HashMap; - -use bothan_lib::registry::{Registry, Valid}; -use bothan_lib::store::Store; -use bothan_lib::worker::AssetWorker; -use bothan_lib::worker::error::AssetWorkerError; -use derive_more::From; -use tracing::{error, info}; - -use crate::manager::asset_info::crypto::worker::opts::CryptoAssetWorkerOpts; -use crate::manager::asset_info::signal_ids::get_source_batched_query_ids; -use crate::manager::asset_info::types::AssetType; - -#[derive(From)] -pub enum CryptoAssetWorker { - Binance(bothan_binance::Worker), - Bitfinex(bothan_bitfinex::Worker), - Bybit(bothan_bybit::Worker), - Coinbase(bothan_coinbase::Worker), - CoinGecko(bothan_coingecko::Worker), - CoinMarketCap(bothan_coinmarketcap::Worker), - Htx(bothan_htx::Worker), - Kraken(bothan_kraken::Worker), - Okx(bothan_okx::Worker), - Band(bothan_band::Worker), -} - -#[async_trait::async_trait] -impl AssetWorker for CryptoAssetWorker { - type Opts = CryptoAssetWorkerOpts; - - fn name(&self) -> &'static str { - match self { - CryptoAssetWorker::Binance(w) => w.name(), - CryptoAssetWorker::Bitfinex(w) => w.name(), - CryptoAssetWorker::Bybit(w) => w.name(), - CryptoAssetWorker::Coinbase(w) => w.name(), - CryptoAssetWorker::CoinGecko(w) => w.name(), - CryptoAssetWorker::CoinMarketCap(w) => w.name(), - CryptoAssetWorker::Htx(w) => w.name(), - CryptoAssetWorker::Kraken(w) => w.name(), - CryptoAssetWorker::Okx(w) => w.name(), - CryptoAssetWorker::Band(w) => w.name(), - } - } - - async fn build( - opts: Self::Opts, - store: &S, - ids: Vec, - ) -> Result { - Ok(match opts { - CryptoAssetWorkerOpts::Binance(opts) => { - CryptoAssetWorker::from(bothan_binance::Worker::build(opts, store, ids).await?) - } - CryptoAssetWorkerOpts::Bitfinex(opts) => { - CryptoAssetWorker::from(bothan_bitfinex::Worker::build(opts, store, ids).await?) - } - CryptoAssetWorkerOpts::Bybit(opts) => { - CryptoAssetWorker::from(bothan_bybit::Worker::build(opts, store, ids).await?) - } - CryptoAssetWorkerOpts::Coinbase(opts) => { - CryptoAssetWorker::from(bothan_coinbase::Worker::build(opts, store, ids).await?) - } - CryptoAssetWorkerOpts::CoinGecko(opts) => { - CryptoAssetWorker::from(bothan_coingecko::Worker::build(opts, store, ids).await?) - } - CryptoAssetWorkerOpts::CoinMarketCap(opts) => CryptoAssetWorker::from( - bothan_coinmarketcap::Worker::build(opts, store, ids).await?, - ), - CryptoAssetWorkerOpts::Htx(opts) => { - CryptoAssetWorker::from(bothan_htx::Worker::build(opts, store, ids).await?) - } - CryptoAssetWorkerOpts::Kraken(opts) => { - CryptoAssetWorker::from(bothan_kraken::Worker::build(opts, store, ids).await?) - } - CryptoAssetWorkerOpts::Okx(opts) => { - CryptoAssetWorker::from(bothan_okx::Worker::build(opts, store, ids).await?) - } - CryptoAssetWorkerOpts::Band(opts) => { - CryptoAssetWorker::from(bothan_band::Worker::build(opts, store, ids).await?) - } - }) - } -} - -pub async fn build_workers( - registry: &Registry, - opts: &HashMap, - store: S, -) -> Vec { - let mut workers = Vec::with_capacity(opts.len()); - for (source_id, query_id) in get_source_batched_query_ids(registry, AssetType::Crypto).drain() { - match opts.get(&source_id) { - Some(opts) => { - let ids = query_id.into_iter().collect(); - let builder_callable = CryptoAssetWorker::build(opts.clone(), &store, ids); - let worker = match builder_callable.await { - Ok(worker) => worker, - Err(e) => { - error!("failed to build worker {}: {}", source_id, e); - continue; - } - }; - workers.push(worker); - } - None => info!("worker {} not activated", source_id), - } - } - - workers -} diff --git a/bothan-core/src/manager/asset_info/crypto/worker/opts.rs b/bothan-core/src/manager/asset_info/crypto/worker/opts.rs deleted file mode 100644 index 8f36016f..00000000 --- a/bothan-core/src/manager/asset_info/crypto/worker/opts.rs +++ /dev/null @@ -1,92 +0,0 @@ -//! Worker options for configuring crypto asset source workers. - -#[derive(Clone)] -pub enum CryptoAssetWorkerOpts { - Binance(bothan_binance::WorkerOpts), - Bitfinex(bothan_bitfinex::WorkerOpts), - Bybit(bothan_bybit::WorkerOpts), - Coinbase(bothan_coinbase::WorkerOpts), - CoinGecko(bothan_coingecko::WorkerOpts), - CoinMarketCap(bothan_coinmarketcap::WorkerOpts), - Htx(bothan_htx::WorkerOpts), - Kraken(bothan_kraken::WorkerOpts), - Okx(bothan_okx::WorkerOpts), - Band(bothan_band::WorkerOpts), -} - -impl CryptoAssetWorkerOpts { - pub fn name(&self) -> &str { - match self { - CryptoAssetWorkerOpts::Binance(_) => "binance", - CryptoAssetWorkerOpts::Bitfinex(_) => "bitfinex", - CryptoAssetWorkerOpts::Bybit(_) => "bybit", - CryptoAssetWorkerOpts::Coinbase(_) => "coinbase", - CryptoAssetWorkerOpts::CoinGecko(_) => "coingecko", - CryptoAssetWorkerOpts::CoinMarketCap(_) => "coinmarketcap", - CryptoAssetWorkerOpts::Htx(_) => "htx", - CryptoAssetWorkerOpts::Kraken(_) => "kraken", - CryptoAssetWorkerOpts::Okx(_) => "okx", - CryptoAssetWorkerOpts::Band(opts) => opts.name(), - } - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_binance::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::Binance(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_bitfinex::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::Bitfinex(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_bybit::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::Bybit(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_coinbase::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::Coinbase(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_coingecko::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::CoinGecko(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_coinmarketcap::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::CoinMarketCap(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_htx::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::Htx(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_kraken::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::Kraken(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_okx::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::Okx(value) - } -} - -impl From for CryptoAssetWorkerOpts { - fn from(value: bothan_band::WorkerOpts) -> Self { - CryptoAssetWorkerOpts::Band(value) - } -} diff --git a/bothan-core/src/manager/asset_info/forex/worker.rs b/bothan-core/src/manager/asset_info/forex/worker.rs deleted file mode 100644 index 6be78210..00000000 --- a/bothan-core/src/manager/asset_info/forex/worker.rs +++ /dev/null @@ -1,71 +0,0 @@ -//! Worker trait and implementations for forex asset sources. - -pub mod opts; - -use std::collections::HashMap; - -use bothan_lib::registry::{Registry, Valid}; -use bothan_lib::store::Store; -use bothan_lib::worker::AssetWorker; -use bothan_lib::worker::error::AssetWorkerError; -use derive_more::From; -use tracing::{error, info}; - -use crate::manager::asset_info::forex::worker::opts::ForexAssetWorkerOpts; -use crate::manager::asset_info::signal_ids::get_source_batched_query_ids; -use crate::manager::asset_info::types::AssetType; - -#[derive(From)] -pub enum ForexAssetWorker { - Band(bothan_band::Worker), -} - -#[async_trait::async_trait] -impl AssetWorker for ForexAssetWorker { - type Opts = ForexAssetWorkerOpts; - - fn name(&self) -> &'static str { - match self { - ForexAssetWorker::Band(w) => w.name(), - } - } - - async fn build( - opts: Self::Opts, - store: &S, - ids: Vec, - ) -> Result { - Ok(match opts { - ForexAssetWorkerOpts::Band(opts) => { - ForexAssetWorker::from(bothan_band::Worker::build(opts, store, ids).await?) - } - }) - } -} - -pub async fn build_workers( - registry: &Registry, - opts: &HashMap, - store: S, -) -> Vec { - let mut workers = Vec::with_capacity(opts.len()); - for (source_id, query_id) in get_source_batched_query_ids(registry, AssetType::Forex).drain() { - match opts.get(&source_id) { - Some(opts) => { - let ids = query_id.into_iter().collect(); - let builder_callable = ForexAssetWorker::build(opts.clone(), &store, ids); - let worker = match builder_callable.await { - Ok(worker) => worker, - Err(e) => { - error!("failed to build worker {}: {}", source_id, e); - continue; - } - }; - workers.push(worker); - } - None => info!("worker {} not activated", source_id), - } - } - - workers -} diff --git a/bothan-core/src/manager/asset_info/forex/worker/opts.rs b/bothan-core/src/manager/asset_info/forex/worker/opts.rs deleted file mode 100644 index 1110d468..00000000 --- a/bothan-core/src/manager/asset_info/forex/worker/opts.rs +++ /dev/null @@ -1,20 +0,0 @@ -//! Worker options for configuring forex asset source workers. - -#[derive(Clone)] -pub enum ForexAssetWorkerOpts { - Band(bothan_band::WorkerOpts), -} - -impl ForexAssetWorkerOpts { - pub fn name(&self) -> &str { - match self { - ForexAssetWorkerOpts::Band(opts) => opts.name(), - } - } -} - -impl From for ForexAssetWorkerOpts { - fn from(value: bothan_band::WorkerOpts) -> Self { - ForexAssetWorkerOpts::Band(value) - } -} diff --git a/bothan-core/src/manager/asset_info/manager.rs b/bothan-core/src/manager/asset_info/manager.rs index 2b045d11..140c58ec 100644 --- a/bothan-core/src/manager/asset_info/manager.rs +++ b/bothan-core/src/manager/asset_info/manager.rs @@ -10,7 +10,7 @@ use std::time::Duration; use bothan_lib::metrics::store::Metrics; use bothan_lib::registry::{Invalid, Registry}; use bothan_lib::store::Store; -use bothan_lib::worker::AssetWorker; +use bothan_lib::worker::AssetWorker as AssetWorkerTrait; use mini_moka::sync::Cache; use semver::{Version, VersionReq}; use serde_json::from_str; @@ -19,30 +19,22 @@ use tokio::time::sleep; use crate::ipfs::IpfsClient; use crate::ipfs::error::Error as IpfsError; -use crate::manager::asset_info::crypto::worker::opts::CryptoAssetWorkerOpts; -use crate::manager::asset_info::crypto::worker::{ - CryptoAssetWorker, build_workers as build_crypto_workers, -}; use crate::manager::asset_info::error::{ PostHeartbeatError, PushMonitoringRecordError, SetRegistryError, }; -use crate::manager::asset_info::forex::worker::opts::ForexAssetWorkerOpts; -use crate::manager::asset_info::forex::worker::{ - ForexAssetWorker, build_workers as build_forex_workers, -}; use crate::manager::asset_info::price::tasks::get_signal_price_states; use crate::manager::asset_info::types::{ AssetManagerInfo, MONITORING_TTL, PriceSignalComputationRecord, PriceState, }; +use crate::manager::asset_info::worker::opts::AssetWorkerOpts; +use crate::manager::asset_info::worker::{AnyAssetWorker, build_workers}; use crate::monitoring::{Client as MonitoringClient, create_uuid}; pub struct AssetInfoManager { store: S, - crypto_opts: HashMap, - forex_opts: HashMap, - crypto_workers: Mutex>, - forex_workers: Mutex>, - stale_threshold: i64, + worker_opts: HashMap, + workers: Mutex>, + prefix_stale_thresholds: HashMap, ipfs_client: IpfsClient, bothan_version: Version, registry_version_requirement: VersionReq, @@ -56,10 +48,9 @@ impl AssetInfoManager { #[allow(clippy::too_many_arguments)] pub async fn build( store: S, - crypto_opts: HashMap, - forex_opts: HashMap, + worker_opts: HashMap, ipfs_client: IpfsClient, - stale_threshold: i64, + prefix_stale_thresholds: HashMap, bothan_version: Version, registry_version_requirement: VersionReq, monitoring_client: Option>, @@ -70,20 +61,15 @@ impl AssetInfoManager { let registry = store.get_registry().await?; - let crypto_workers = - Mutex::new(build_crypto_workers(®istry, &crypto_opts, store.clone()).await); - let forex_workers = - Mutex::new(build_forex_workers(®istry, &forex_opts, store.clone()).await); + let workers = Mutex::new(build_workers(®istry, &worker_opts, store.clone()).await); let metrics = Metrics::new(); let manager = AssetInfoManager { store, - crypto_opts, - forex_opts, - crypto_workers, - forex_workers, - stale_threshold, + worker_opts, + workers, + prefix_stale_thresholds, ipfs_client, bothan_version, registry_version_requirement, @@ -104,27 +90,14 @@ impl AssetInfoManager { .await? .unwrap_or(String::new()); // If value doesn't exist, return an empty string let registry_version_requirement = self.registry_version_requirement.to_string(); - let crypto_active_sources = self - .crypto_workers + let active_sources = self + .workers .lock() .await .iter() .map(|w| w.name().to_string()) .collect::>(); - let forex_active_sources = self - .forex_workers - .lock() - .await - .iter() - .map(|w| w.name().to_string()) - .collect::>(); - - let active_sources = crypto_active_sources - .into_iter() - .chain(forex_active_sources) - .collect::>(); - Ok(AssetManagerInfo::new( bothan_version, registry_hash, @@ -143,27 +116,14 @@ impl AssetInfoManager { let uuid = create_uuid(); - let crypto_active_sources = self - .crypto_workers - .lock() - .await - .iter() - .map(|w| w.name().to_string()) - .collect::>(); - - let forex_active_sources = self - .forex_workers + let active_sources = self + .workers .lock() .await .iter() .map(|w| w.name().to_string()) .collect::>(); - let active_sources = crypto_active_sources - .into_iter() - .chain(forex_active_sources) - .collect::>(); - let bothan_version = self.bothan_version.clone(); let registry_hash = self .store @@ -187,16 +147,13 @@ impl AssetInfoManager { ) -> Result<(String, Vec), S::Error> { let registry = self.store.get_registry().await?; - let current_time = chrono::Utc::now().timestamp(); - let stale_cutoff = current_time - self.stale_threshold; - let mut records = Vec::new(); let price_states = get_signal_price_states( ids, &self.store, ®istry, - stale_cutoff, + &self.prefix_stale_thresholds, &mut records, &self.metrics, ) @@ -285,24 +242,14 @@ impl AssetInfoManager { .await .map_err(|_| SetRegistryError::FailedToSetRegistry)?; - // drop old crypto workers to kill connection - let mut locked_workers = self.crypto_workers.lock().await; - locked_workers.clear(); - - // TODO: find method to wait for connections to clear up thats better than sleeping for 1 second - sleep(Duration::from_secs(1)).await; - - let workers = build_crypto_workers(®istry, &self.crypto_opts, self.store.clone()).await; - *locked_workers = workers; - - // drop old forex workers to kill connection - let mut locked_workers = self.forex_workers.lock().await; + // drop old workers to kill connection + let mut locked_workers = self.workers.lock().await; locked_workers.clear(); // TODO: find method to wait for connections to clear up thats better than sleeping for 1 second sleep(Duration::from_secs(1)).await; - let workers = build_forex_workers(®istry, &self.forex_opts, self.store.clone()).await; + let workers = build_workers(®istry, &self.worker_opts, self.store.clone()).await; *locked_workers = workers; Ok(()) diff --git a/bothan-core/src/manager/asset_info/price/tasks.rs b/bothan-core/src/manager/asset_info/price/tasks.rs index 89cb9740..f06c31e4 100644 --- a/bothan-core/src/manager/asset_info/price/tasks.rs +++ b/bothan-core/src/manager/asset_info/price/tasks.rs @@ -5,7 +5,7 @@ //! processor and post-processors, and handles any missing prerequisites by recursively //! fetching the required data. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::time::Instant; use bothan_lib::metrics::store::{Metrics, Operation, OperationStatus}; @@ -29,12 +29,14 @@ pub async fn get_signal_price_states( ids: Vec, store: &S, registry: &Registry, - stale_cutoff: i64, + prefix_stale_thresholds: &HashMap, records: &mut Vec, metrics: &Metrics, ) -> Vec { let mut cache = PriceCache::new(); + let current_time = chrono::Utc::now().timestamp(); + let mut queue = ids.clone(); while let Some(id) = queue.pop() { if cache.contains(&id) { @@ -46,7 +48,8 @@ pub async fn get_signal_price_states( &id, store, registry, - stale_cutoff, + current_time, + prefix_stale_thresholds, &cache, &mut record, metrics, @@ -91,17 +94,22 @@ pub async fn get_signal_price_states( .collect() } +#[allow(clippy::too_many_arguments)] async fn compute_signal_result( id: &str, store: &S, registry: &Registry, - stale_cutoff: i64, + current_time: i64, + prefix_stale_thresholds: &HashMap, cache: &PriceCache, record: &mut PriceSignalComputationRecord, metrics: &Metrics, ) -> Result { - match registry.get(id) { - Some(signal) => { + match ( + registry.get(id), + get_stale_cutoff(id, current_time, prefix_stale_thresholds), + ) { + (Some(signal), Some(stale_cutoff)) => { let source_results = compute_source_result(signal, store, cache, stale_cutoff, record, metrics).await?; @@ -136,10 +144,21 @@ async fn compute_signal_result( Ok(processed_signal) } - None => Err(Error::InvalidSignal), + _ => Err(Error::InvalidSignal), } } +fn get_stale_cutoff( + id: &str, + current_time: i64, + prefix_stale_thresholds: &HashMap, +) -> Option { + let prefix = id.split(':').next()?; + prefix_stale_thresholds + .get(prefix) + .map(|threshold| current_time - threshold) +} + async fn compute_source_result( signal: &Signal, store: &S, @@ -400,7 +419,9 @@ mod tests { ); let registry = mock_registry(); - let stale_cutoff = 0; + let current_timestamp = chrono::Utc::now().timestamp(); + let mut prefix_stale_thresholds = HashMap::new(); + prefix_stale_thresholds.insert("CS".to_string(), current_timestamp); let mut records = Vec::new(); let metrics = Metrics::new(); @@ -408,7 +429,7 @@ mod tests { ids, &mock_store, ®istry, - stale_cutoff, + &prefix_stale_thresholds, &mut records, &metrics, ) @@ -484,14 +505,16 @@ mod tests { let ids = vec!["CS:BTC-USD".to_string(), "CS:USDT-USD".to_string()]; let mock_store = MockStore::default(); let registry = mock_registry(); - let stale_cutoff = 0; + let current_timestamp = chrono::Utc::now().timestamp(); + let mut prefix_stale_thresholds = HashMap::new(); + prefix_stale_thresholds.insert("CS".to_string(), current_timestamp); let mut records = Vec::new(); let metrics = Metrics::new(); let res = get_signal_price_states( ids, &mock_store, ®istry, - stale_cutoff, + &prefix_stale_thresholds, &mut records, &metrics, ) @@ -569,7 +592,9 @@ mod tests { ); let registry = mock_registry(); - let stale_cutoff = 10000; + let current_timestamp = chrono::Utc::now().timestamp(); + let mut prefix_stale_thresholds = HashMap::new(); + prefix_stale_thresholds.insert("CS".to_string(), current_timestamp - 10000); let mut records = Vec::new(); let metrics = Metrics::new(); @@ -577,7 +602,7 @@ mod tests { ids, &mock_store, ®istry, - stale_cutoff, + &prefix_stale_thresholds, &mut records, &metrics, ) diff --git a/bothan-core/src/manager/asset_info/signal_ids.rs b/bothan-core/src/manager/asset_info/signal_ids.rs index 9abd2f62..111d80e9 100644 --- a/bothan-core/src/manager/asset_info/signal_ids.rs +++ b/bothan-core/src/manager/asset_info/signal_ids.rs @@ -2,8 +2,6 @@ use std::collections::{HashMap, HashSet, VecDeque}; use bothan_lib::registry::{Registry, Valid}; -use crate::manager::asset_info::types::AssetType; - // Returns a mapping of source_id to a set of query_ids that are batched together // This is used to determine which queries should be batched together // e.g. if we have a registry with the following entries: @@ -57,15 +55,12 @@ use crate::manager::asset_info::types::AssetType; // } pub fn get_source_batched_query_ids( registry: &Registry, - asset_type: AssetType, ) -> HashMap> { let mut source_query_ids: HashMap> = HashMap::new(); // Seen signal_ids let mut seen = HashSet::::new(); - let signal_ids = get_signal_ids_by_asset_type(registry.signal_ids(), asset_type); - - let mut queue = VecDeque::from_iter(signal_ids); + let mut queue = VecDeque::from_iter(registry.signal_ids()); while let Some(signal_id) = queue.pop_front() { if seen.contains(signal_id) { continue; @@ -93,19 +88,6 @@ pub fn get_source_batched_query_ids( source_query_ids } -fn get_signal_ids_by_asset_type<'a>( - signal_ids: impl Iterator, - asset_type: AssetType, -) -> HashSet<&'a String> { - let mut result = HashSet::new(); - for signal_id in signal_ids { - if signal_id.starts_with(asset_type.as_ref()) { - result.insert(signal_id); - } - } - result -} - #[cfg(test)] mod tests { use bothan_lib::registry::Invalid; @@ -121,7 +103,7 @@ mod tests { fn test_get_source_batched_query_ids() { let registry = mock_registry().validate().unwrap(); - let diff = get_source_batched_query_ids(®istry, AssetType::Crypto); + let diff = get_source_batched_query_ids(®istry); let expected = HashMap::from_iter([ ( "binance".to_string(), diff --git a/bothan-core/src/manager/asset_info/worker.rs b/bothan-core/src/manager/asset_info/worker.rs new file mode 100644 index 00000000..b59158e7 --- /dev/null +++ b/bothan-core/src/manager/asset_info/worker.rs @@ -0,0 +1,115 @@ +//! Worker trait and implementations for asset sources. + +pub mod opts; + +use std::collections::HashMap; + +use bothan_lib::registry::{Registry, Valid}; +use bothan_lib::store::Store; +use bothan_lib::worker::AssetWorker; +use bothan_lib::worker::error::AssetWorkerError; +use derive_more::From; +use tracing::{error, info}; + +use crate::manager::asset_info::signal_ids::get_source_batched_query_ids; +use crate::manager::asset_info::worker::opts::AssetWorkerOpts; + +#[derive(From)] +pub enum AnyAssetWorker { + Binance(bothan_binance::Worker), + Bitfinex(bothan_bitfinex::Worker), + Bybit(bothan_bybit::Worker), + Coinbase(bothan_coinbase::Worker), + CoinGecko(bothan_coingecko::Worker), + CoinMarketCap(bothan_coinmarketcap::Worker), + Htx(bothan_htx::Worker), + Kraken(bothan_kraken::Worker), + Okx(bothan_okx::Worker), + Band(bothan_band::Worker), +} + +#[async_trait::async_trait] +impl AssetWorker for AnyAssetWorker { + type Opts = AssetWorkerOpts; + + fn name(&self) -> &'static str { + match self { + AnyAssetWorker::Binance(w) => w.name(), + AnyAssetWorker::Bitfinex(w) => w.name(), + AnyAssetWorker::Bybit(w) => w.name(), + AnyAssetWorker::Coinbase(w) => w.name(), + AnyAssetWorker::CoinGecko(w) => w.name(), + AnyAssetWorker::CoinMarketCap(w) => w.name(), + AnyAssetWorker::Htx(w) => w.name(), + AnyAssetWorker::Kraken(w) => w.name(), + AnyAssetWorker::Okx(w) => w.name(), + AnyAssetWorker::Band(w) => w.name(), + } + } + + async fn build( + opts: Self::Opts, + store: &S, + ids: Vec, + ) -> Result { + Ok(match opts { + AssetWorkerOpts::Binance(opts) => { + AnyAssetWorker::from(bothan_binance::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::Bitfinex(opts) => { + AnyAssetWorker::from(bothan_bitfinex::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::Bybit(opts) => { + AnyAssetWorker::from(bothan_bybit::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::Coinbase(opts) => { + AnyAssetWorker::from(bothan_coinbase::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::CoinGecko(opts) => { + AnyAssetWorker::from(bothan_coingecko::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::CoinMarketCap(opts) => { + AnyAssetWorker::from(bothan_coinmarketcap::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::Htx(opts) => { + AnyAssetWorker::from(bothan_htx::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::Kraken(opts) => { + AnyAssetWorker::from(bothan_kraken::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::Okx(opts) => { + AnyAssetWorker::from(bothan_okx::Worker::build(opts, store, ids).await?) + } + AssetWorkerOpts::Band(opts) => { + AnyAssetWorker::from(bothan_band::Worker::build(opts, store, ids).await?) + } + }) + } +} + +pub async fn build_workers( + registry: &Registry, + opts: &HashMap, + store: S, +) -> Vec { + let mut workers = Vec::with_capacity(opts.len()); + for (source_id, query_id) in get_source_batched_query_ids(registry).drain() { + match opts.get(&source_id) { + Some(opts) => { + let ids = query_id.into_iter().collect(); + let builder_callable = AnyAssetWorker::build(opts.clone(), &store, ids); + let worker = match builder_callable.await { + Ok(worker) => worker, + Err(e) => { + error!("failed to build worker {}: {}", source_id, e); + continue; + } + }; + workers.push(worker); + } + None => info!("worker {} not activated", source_id), + } + } + + workers +} diff --git a/bothan-core/src/manager/asset_info/worker/opts.rs b/bothan-core/src/manager/asset_info/worker/opts.rs new file mode 100644 index 00000000..ec0df44c --- /dev/null +++ b/bothan-core/src/manager/asset_info/worker/opts.rs @@ -0,0 +1,97 @@ +//! Worker options for configuring asset source workers. + +use { + bothan_band, bothan_binance, bothan_bitfinex, bothan_bybit, bothan_coinbase, bothan_coingecko, + bothan_coinmarketcap, bothan_htx, bothan_kraken, bothan_okx, +}; + +#[derive(Clone)] +pub enum AssetWorkerOpts { + Binance(bothan_binance::WorkerOpts), + Bitfinex(bothan_bitfinex::WorkerOpts), + Bybit(bothan_bybit::WorkerOpts), + Coinbase(bothan_coinbase::WorkerOpts), + CoinGecko(bothan_coingecko::WorkerOpts), + CoinMarketCap(bothan_coinmarketcap::WorkerOpts), + Htx(bothan_htx::WorkerOpts), + Kraken(bothan_kraken::WorkerOpts), + Okx(bothan_okx::WorkerOpts), + Band(bothan_band::WorkerOpts), +} + +impl AssetWorkerOpts { + pub fn name(&self) -> &str { + match self { + AssetWorkerOpts::Binance(_) => "binance", + AssetWorkerOpts::Bitfinex(_) => "bitfinex", + AssetWorkerOpts::Bybit(_) => "bybit", + AssetWorkerOpts::Coinbase(_) => "coinbase", + AssetWorkerOpts::CoinGecko(_) => "coingecko", + AssetWorkerOpts::CoinMarketCap(_) => "coinmarketcap", + AssetWorkerOpts::Htx(_) => "htx", + AssetWorkerOpts::Kraken(_) => "kraken", + AssetWorkerOpts::Okx(_) => "okx", + AssetWorkerOpts::Band(opts) => opts.name(), + } + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_binance::WorkerOpts) -> Self { + AssetWorkerOpts::Binance(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_bitfinex::WorkerOpts) -> Self { + AssetWorkerOpts::Bitfinex(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_bybit::WorkerOpts) -> Self { + AssetWorkerOpts::Bybit(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_coinbase::WorkerOpts) -> Self { + AssetWorkerOpts::Coinbase(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_coingecko::WorkerOpts) -> Self { + AssetWorkerOpts::CoinGecko(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_coinmarketcap::WorkerOpts) -> Self { + AssetWorkerOpts::CoinMarketCap(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_htx::WorkerOpts) -> Self { + AssetWorkerOpts::Htx(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_kraken::WorkerOpts) -> Self { + AssetWorkerOpts::Kraken(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_okx::WorkerOpts) -> Self { + AssetWorkerOpts::Okx(value) + } +} + +impl From for AssetWorkerOpts { + fn from(value: bothan_band::WorkerOpts) -> Self { + AssetWorkerOpts::Band(value) + } +} diff --git a/bothan-htx/src/api/error.rs b/bothan-htx/src/api/error.rs index 50fa7039..721b4891 100644 --- a/bothan-htx/src/api/error.rs +++ b/bothan-htx/src/api/error.rs @@ -50,5 +50,6 @@ pub enum ListeningError { /// Indicates a failure to send a pong response to a ping message. #[error("failed to pong")] + #[allow(clippy::result_large_err)] PongFailed(#[from] tungstenite::Error), } diff --git a/bothan-htx/src/api/websocket.rs b/bothan-htx/src/api/websocket.rs index 09be7416..04ba1690 100644 --- a/bothan-htx/src/api/websocket.rs +++ b/bothan-htx/src/api/websocket.rs @@ -393,6 +393,8 @@ impl AssetInfoProvider for WebSocketConnection { /// Returns a `ListeningError` if: /// - The channel ID cannot be extracted from the channel name /// - The price data contains invalid values (NaN) +/// +#[allow(clippy::result_large_err)] fn parse_data(data: super::types::Data) -> Result { let ch = data.ch; let id = ch From f1a11f0a0a221b249bb858946340c5cdbe0800b9 Mon Sep 17 00:00:00 2001 From: Tanut Lertwarachai Date: Tue, 10 Mar 2026 12:20:46 +0700 Subject: [PATCH 05/12] fix copilot comment --- Cargo.lock | 1 - Cargo.toml | 4 ---- bothan-core/Cargo.toml | 1 - bothan-core/src/manager.rs | 4 ++-- bothan-core/src/manager/asset_info/types.rs | 9 --------- bothan-htx/src/api/error.rs | 1 - 6 files changed, 2 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 04265e06..f407068e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -680,7 +680,6 @@ dependencies = [ "semver", "serde", "serde_json", - "strum_macros", "thiserror 2.0.12", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 2e9336be..9f650d91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,8 +31,6 @@ bothan-htx = { path = "bothan-htx", version = "0.1.0" } bothan-kraken = { path = "bothan-kraken", version = "0.1.0" } bothan-okx = { path = "bothan-okx", version = "0.1.0" } bothan-band = { path = "bothan-band", version = "0.1.0" } - -anyhow = "1.0.86" async-trait = "0.1.77" bincode = "2.0.1" chrono = "0.4.39" @@ -46,8 +44,6 @@ mockito = "1.4.0" num-traits = "0.2.19" opentelemetry = { version = "0.28.0", features = ["metrics"] } prost = "0.13.1" -protoc-gen-prost = "0.4.0" -protoc-gen-tonic = "0.4.1" rand = "0.8.5" reqwest = { version = "0.12.3", features = ["json"] } rust_decimal = "1.10.2" diff --git a/bothan-core/Cargo.toml b/bothan-core/Cargo.toml index 3361e29e..d7e6d514 100644 --- a/bothan-core/Cargo.toml +++ b/bothan-core/Cargo.toml @@ -47,7 +47,6 @@ opentelemetry-prometheus = "0.28.0" opentelemetry_sdk = "0.28.0" prometheus = "0.13.4" rust-rocksdb = { version = "0.36.0", optional = true } -strum_macros = "0.27.2" [features] default = [] diff --git a/bothan-core/src/manager.rs b/bothan-core/src/manager.rs index 6c2234cc..c226a301 100644 --- a/bothan-core/src/manager.rs +++ b/bothan-core/src/manager.rs @@ -1,6 +1,6 @@ -//! Manager module for crypto asset information and workers. +//! Manager module for crypto and forex asset information and workers. //! -//! Provides the crypto asset info manager and related types. +//! Provides the crypto and forex asset info manager and related types. pub use asset_info::AssetInfoManager; diff --git a/bothan-core/src/manager/asset_info/types.rs b/bothan-core/src/manager/asset_info/types.rs index 3dd9c041..4b298e9b 100644 --- a/bothan-core/src/manager/asset_info/types.rs +++ b/bothan-core/src/manager/asset_info/types.rs @@ -4,7 +4,6 @@ use std::time::Duration; use bothan_lib::types::AssetInfo; use rust_decimal::Decimal; -use strum_macros::AsRefStr; use crate::monitoring::types::SignalComputationRecord; @@ -20,14 +19,6 @@ pub enum PriceState { Unsupported, } -#[derive(Debug, AsRefStr)] -pub enum AssetType { - #[strum(serialize = "CS")] - Crypto, - #[strum(serialize = "FS")] - Forex, -} - pub struct AssetManagerInfo { pub bothan_version: String, pub registry_hash: String, diff --git a/bothan-htx/src/api/error.rs b/bothan-htx/src/api/error.rs index 721b4891..50fa7039 100644 --- a/bothan-htx/src/api/error.rs +++ b/bothan-htx/src/api/error.rs @@ -50,6 +50,5 @@ pub enum ListeningError { /// Indicates a failure to send a pong response to a ping message. #[error("failed to pong")] - #[allow(clippy::result_large_err)] PongFailed(#[from] tungstenite::Error), } From b5afb9ffd95f196829d58483afd97c946420a867 Mon Sep 17 00:00:00 2001 From: Tanut Lertwarachai Date: Thu, 19 Mar 2026 14:14:51 +0700 Subject: [PATCH 06/12] add serde default to forex config --- bothan-api/server/src/config/manager.rs | 2 ++ bothan-core/src/manager/asset_info/crypto.rs | 1 - bothan-core/src/manager/asset_info/forex.rs | 1 - bothan-htx/src/api/websocket.rs | 1 - 4 files changed, 2 insertions(+), 3 deletions(-) delete mode 100644 bothan-core/src/manager/asset_info/crypto.rs delete mode 100644 bothan-core/src/manager/asset_info/forex.rs diff --git a/bothan-api/server/src/config/manager.rs b/bothan-api/server/src/config/manager.rs index 695bceb1..f88931fa 100644 --- a/bothan-api/server/src/config/manager.rs +++ b/bothan-api/server/src/config/manager.rs @@ -24,5 +24,7 @@ pub struct ManagerConfig { /// The configuration for the crypto info manager. pub crypto: CryptoInfoManagerConfig, /// The configuration for the forex info manager. + /// Note: This field is optional and will be set to default if not provided. + #[serde(default)] pub forex: ForexInfoManagerConfig, } diff --git a/bothan-core/src/manager/asset_info/crypto.rs b/bothan-core/src/manager/asset_info/crypto.rs deleted file mode 100644 index 2c8b8399..00000000 --- a/bothan-core/src/manager/asset_info/crypto.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod worker; diff --git a/bothan-core/src/manager/asset_info/forex.rs b/bothan-core/src/manager/asset_info/forex.rs deleted file mode 100644 index 2c8b8399..00000000 --- a/bothan-core/src/manager/asset_info/forex.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod worker; diff --git a/bothan-htx/src/api/websocket.rs b/bothan-htx/src/api/websocket.rs index 04ba1690..d04a2c2d 100644 --- a/bothan-htx/src/api/websocket.rs +++ b/bothan-htx/src/api/websocket.rs @@ -375,7 +375,6 @@ impl AssetInfoProvider for WebSocketConnection { } /// Parses market data from the HTX WebSocket API into `AssetInfo`. -/// /// This function extracts the asset identifier from the channel name and creates /// an `AssetInfo` instance with the last price and timestamp from the ticker data. /// From b3e2568fae2e12b6ced9d73bb06ad5d8ed13e166 Mon Sep 17 00:00:00 2001 From: Tanut Lertwarachai Date: Wed, 25 Mar 2026 13:27:43 +0700 Subject: [PATCH 07/12] add macro deserialize comment --- .../server/src/config/manager/forex_info/sources.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/bothan-api/server/src/config/manager/forex_info/sources.rs b/bothan-api/server/src/config/manager/forex_info/sources.rs index 05b4cf29..7568bdb4 100644 --- a/bothan-api/server/src/config/manager/forex_info/sources.rs +++ b/bothan-api/server/src/config/manager/forex_info/sources.rs @@ -8,17 +8,29 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ForexSourceConfigs { /// Band/owlet worker options. + /// + /// NOTE: The `name` field in `WorkerOpts` is marked with `#[serde(skip)]`, so deserialized instances + /// will have an empty/default name. The custom deserializer `de_owlet` reconstructs the options #[serde(default, deserialize_with = "de_owlet")] pub band_owlet: Option, /// Band/fieldfare worker options. + /// + /// NOTE: The `name` field in `WorkerOpts` is marked with `#[serde(skip)]`, so deserialized instances + /// will have an empty/default name. The custom deserializer `de_fieldfare` reconstructs the options #[serde(default, deserialize_with = "de_fieldfare")] pub band_fieldfare: Option, /// Band/xenops worker options. + /// + /// NOTE: The `name` field in `WorkerOpts` is marked with `#[serde(skip)]`, so deserialized instances + /// will have an empty/default name. The custom deserializer `de_xenops` reconstructs the options #[serde(default, deserialize_with = "de_xenops")] pub band_xenops: Option, } // Macro to generate deserialization functions for Band workers with preset names. +// This macro defines a function that: +// - Deserializes an Option, +// - If present, creates a new WorkerOpts with the given name and original URL/update_interval. macro_rules! de_band_named { ($fn_name:ident, $name:expr) => { fn $fn_name<'de, D>(d: D) -> Result, D::Error> From 773a5eee20886c4fab6a1ed4de089732bfdfacd3 Mon Sep 17 00:00:00 2001 From: Tanut Lertwarachai Date: Thu, 26 Mar 2026 14:11:38 +0700 Subject: [PATCH 08/12] refactor macro --- bothan-api/server/src/config/manager.rs | 2 ++ .../server/src/config/manager/band_serde.rs | 19 +++++++++++++++++++ .../src/config/manager/crypto_info/sources.rs | 19 ++----------------- .../src/config/manager/forex_info/sources.rs | 19 ++----------------- 4 files changed, 25 insertions(+), 34 deletions(-) create mode 100644 bothan-api/server/src/config/manager/band_serde.rs diff --git a/bothan-api/server/src/config/manager.rs b/bothan-api/server/src/config/manager.rs index f88931fa..51d887ff 100644 --- a/bothan-api/server/src/config/manager.rs +++ b/bothan-api/server/src/config/manager.rs @@ -13,6 +13,8 @@ use crypto_info::CryptoInfoManagerConfig; use forex_info::ForexInfoManagerConfig; use serde::{Deserialize, Serialize}; +/// Shared Band worker serde helpers. +pub(crate) mod band_serde; /// Crypto info manager configuration module. pub mod crypto_info; /// Forex info manager configuration module. diff --git a/bothan-api/server/src/config/manager/band_serde.rs b/bothan-api/server/src/config/manager/band_serde.rs new file mode 100644 index 00000000..1008476a --- /dev/null +++ b/bothan-api/server/src/config/manager/band_serde.rs @@ -0,0 +1,19 @@ +// Macro to generate deserialization functions for Band workers with preset names. +// This macro defines a function that: +// - Deserializes an Option, +// - If present, creates a new WorkerOpts with the given name and original URL/update_interval. +macro_rules! de_band_named { + ($fn_name:ident, $name:expr) => { + fn $fn_name<'de, D>(d: D) -> Result, D::Error> + where + D: serde::Deserializer<'de>, + { + use serde::Deserialize; + let v = Option::::deserialize(d)?; + let v = v.map(|w| bothan_band::WorkerOpts::new($name, &w.url, Some(w.update_interval))); + Ok(v) + } + }; +} + +pub(crate) use de_band_named; diff --git a/bothan-api/server/src/config/manager/crypto_info/sources.rs b/bothan-api/server/src/config/manager/crypto_info/sources.rs index 6fc52559..52163c64 100644 --- a/bothan-api/server/src/config/manager/crypto_info/sources.rs +++ b/bothan-api/server/src/config/manager/crypto_info/sources.rs @@ -11,6 +11,8 @@ use serde::{Deserialize, Serialize}; +use crate::config::manager::band_serde::de_band_named; + /// Configuration for the worker sources for crypto asset info. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct CryptoSourceConfigs { @@ -46,23 +48,6 @@ pub struct CryptoSourceConfigs { pub band_macaw: Option, } -// Macro to generate deserialization functions for Band workers with preset names. -// This macro defines a function that: -// - Deserializes an Option, -// - If present, creates a new WorkerOpts with the given name and original URL/update_interval. -macro_rules! de_band_named { - ($fn_name:ident, $name:expr) => { - fn $fn_name<'de, D>(d: D) -> Result, D::Error> - where - D: serde::Deserializer<'de>, - { - let v = Option::::deserialize(d)?; - let v = v.map(|w| bothan_band::WorkerOpts::new($name, &w.url, Some(w.update_interval))); - Ok(v) - } - }; -} - const BAND1_WORKER_NAME: &str = "band/kiwi"; de_band_named!(de_kiwi, BAND1_WORKER_NAME); diff --git a/bothan-api/server/src/config/manager/forex_info/sources.rs b/bothan-api/server/src/config/manager/forex_info/sources.rs index 7568bdb4..a0a331e8 100644 --- a/bothan-api/server/src/config/manager/forex_info/sources.rs +++ b/bothan-api/server/src/config/manager/forex_info/sources.rs @@ -4,6 +4,8 @@ use serde::{Deserialize, Serialize}; +use crate::config::manager::band_serde::de_band_named; + /// Configuration for the worker sources for forex asset info. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ForexSourceConfigs { @@ -27,23 +29,6 @@ pub struct ForexSourceConfigs { pub band_xenops: Option, } -// Macro to generate deserialization functions for Band workers with preset names. -// This macro defines a function that: -// - Deserializes an Option, -// - If present, creates a new WorkerOpts with the given name and original URL/update_interval. -macro_rules! de_band_named { - ($fn_name:ident, $name:expr) => { - fn $fn_name<'de, D>(d: D) -> Result, D::Error> - where - D: serde::Deserializer<'de>, - { - let v = Option::::deserialize(d)?; - let v = v.map(|w| bothan_band::WorkerOpts::new($name, &w.url, Some(w.update_interval))); - Ok(v) - } - }; -} - const BAND1_WORKER_NAME: &str = "band/owlet"; de_band_named!(de_owlet, BAND1_WORKER_NAME); From 497bbb2ac485a969b035f0d3c5318546a6af14d9 Mon Sep 17 00:00:00 2001 From: Tanut Lertwarachai Date: Fri, 17 Apr 2026 13:17:59 +0700 Subject: [PATCH 09/12] update stale_threshold --- bothan-api/server-cli/config.example.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bothan-api/server-cli/config.example.toml b/bothan-api/server-cli/config.example.toml index 96d3f1f7..5f53f266 100644 --- a/bothan-api/server-cli/config.example.toml +++ b/bothan-api/server-cli/config.example.toml @@ -44,7 +44,7 @@ stale_threshold = 300 # Manager configuration for handling forex data sources [manager.forex] # The threshold in seconds after which data is considered stale. -stale_threshold = 3600 +stale_threshold = 7200 # Configuration for the data sources that the manager will use. # If any of these [manager.crypto.source] sections (e.g., section manager.crypto.source.binance) are removed, From 48c431de3d934d683673bd069c102a1d4b5874c0 Mon Sep 17 00:00:00 2001 From: Tanut Lertwarachai Date: Fri, 17 Apr 2026 15:41:06 +0700 Subject: [PATCH 10/12] fix test failed --- bothan-bitfinex/src/api/rest.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bothan-bitfinex/src/api/rest.rs b/bothan-bitfinex/src/api/rest.rs index 2cf87c99..b00b6311 100644 --- a/bothan-bitfinex/src/api/rest.rs +++ b/bothan-bitfinex/src/api/rest.rs @@ -86,7 +86,7 @@ impl RestApi { /// /// # Examples /// - /// ```rust + /// ```rust,no_run /// use bothan_bitfinex::api::rest::RestApi; /// use reqwest::Client; /// use url::Url; From 8976ed03272ab541a05f67b9e4b23ff4c60594e1 Mon Sep 17 00:00:00 2001 From: Tanut Lertwarachai Date: Fri, 17 Apr 2026 16:33:39 +0700 Subject: [PATCH 11/12] change mod --- bothan-core/src/manager/asset_info.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bothan-core/src/manager/asset_info.rs b/bothan-core/src/manager/asset_info.rs index 8aa59f93..d9adfa2e 100644 --- a/bothan-core/src/manager/asset_info.rs +++ b/bothan-core/src/manager/asset_info.rs @@ -11,4 +11,4 @@ pub(super) mod manager; pub(super) mod price; pub(super) mod signal_ids; pub mod types; -pub mod worker; +pub(super) mod worker; From 87d5f9c50529df4415544d6452f1c7be00deabf2 Mon Sep 17 00:00:00 2001 From: Tanut Lertwarachai Date: Fri, 17 Apr 2026 17:53:39 +0700 Subject: [PATCH 12/12] change prefix logic --- bothan-api/server-cli/src/commands/start.rs | 7 +--- bothan-core/src/manager/asset_info/manager.rs | 4 +- .../src/manager/asset_info/price/tasks.rs | 40 ++++++++++--------- 3 files changed, 25 insertions(+), 26 deletions(-) diff --git a/bothan-api/server-cli/src/commands/start.rs b/bothan-api/server-cli/src/commands/start.rs index 4d684592..6cf8f039 100644 --- a/bothan-api/server-cli/src/commands/start.rs +++ b/bothan-api/server-cli/src/commands/start.rs @@ -254,11 +254,8 @@ async fn init_bothan_server( fn init_prefix_stale_thresholds( crypto_stale_threshold: i64, forex_stale_threshold: i64, -) -> HashMap { - HashMap::from([ - ("CS".to_string(), crypto_stale_threshold), - ("FS".to_string(), forex_stale_threshold), - ]) +) -> HashMap { + HashMap::from([('C', crypto_stale_threshold), ('F', forex_stale_threshold)]) } async fn init_crypto_opts( diff --git a/bothan-core/src/manager/asset_info/manager.rs b/bothan-core/src/manager/asset_info/manager.rs index 140c58ec..a1f1191f 100644 --- a/bothan-core/src/manager/asset_info/manager.rs +++ b/bothan-core/src/manager/asset_info/manager.rs @@ -34,7 +34,7 @@ pub struct AssetInfoManager { store: S, worker_opts: HashMap, workers: Mutex>, - prefix_stale_thresholds: HashMap, + prefix_stale_thresholds: HashMap, ipfs_client: IpfsClient, bothan_version: Version, registry_version_requirement: VersionReq, @@ -50,7 +50,7 @@ impl AssetInfoManager { store: S, worker_opts: HashMap, ipfs_client: IpfsClient, - prefix_stale_thresholds: HashMap, + prefix_stale_thresholds: HashMap, bothan_version: Version, registry_version_requirement: VersionReq, monitoring_client: Option>, diff --git a/bothan-core/src/manager/asset_info/price/tasks.rs b/bothan-core/src/manager/asset_info/price/tasks.rs index f06c31e4..b9ede272 100644 --- a/bothan-core/src/manager/asset_info/price/tasks.rs +++ b/bothan-core/src/manager/asset_info/price/tasks.rs @@ -23,13 +23,15 @@ use crate::manager::asset_info::price::error::{Error, MissingPrerequisiteError}; use crate::manager::asset_info::types::{PriceSignalComputationRecord, PriceState}; use crate::monitoring::types::{OperationRecord, ProcessRecord, SourceRecord}; +const DEFAULT_STALE_TIME: i64 = 600; // 10 minutes + // TODO: Allow records to be Option /// Computes the price states for a list of signal ids. pub async fn get_signal_price_states( ids: Vec, store: &S, registry: &Registry, - prefix_stale_thresholds: &HashMap, + prefix_stale_thresholds: &HashMap, records: &mut Vec, metrics: &Metrics, ) -> Vec { @@ -100,16 +102,14 @@ async fn compute_signal_result( store: &S, registry: &Registry, current_time: i64, - prefix_stale_thresholds: &HashMap, + prefix_stale_thresholds: &HashMap, cache: &PriceCache, record: &mut PriceSignalComputationRecord, metrics: &Metrics, ) -> Result { - match ( - registry.get(id), - get_stale_cutoff(id, current_time, prefix_stale_thresholds), - ) { - (Some(signal), Some(stale_cutoff)) => { + match registry.get(id) { + Some(signal) => { + let stale_cutoff = current_time - get_stale_time(id, prefix_stale_thresholds); let source_results = compute_source_result(signal, store, cache, stale_cutoff, record, metrics).await?; @@ -148,15 +148,17 @@ async fn compute_signal_result( } } -fn get_stale_cutoff( - id: &str, - current_time: i64, - prefix_stale_thresholds: &HashMap, -) -> Option { - let prefix = id.split(':').next()?; - prefix_stale_thresholds - .get(prefix) - .map(|threshold| current_time - threshold) +fn get_stale_time(id: &str, prefix_stale_thresholds: &HashMap) -> i64 { + match id.split(':').next() { + Some(prefix) => match prefix.chars().next() { + Some(first_letter) => prefix_stale_thresholds + .get(&first_letter) + .cloned() + .unwrap_or(DEFAULT_STALE_TIME), + None => DEFAULT_STALE_TIME, + }, + None => DEFAULT_STALE_TIME, + } } async fn compute_source_result( @@ -421,7 +423,7 @@ mod tests { let registry = mock_registry(); let current_timestamp = chrono::Utc::now().timestamp(); let mut prefix_stale_thresholds = HashMap::new(); - prefix_stale_thresholds.insert("CS".to_string(), current_timestamp); + prefix_stale_thresholds.insert('C', current_timestamp); let mut records = Vec::new(); let metrics = Metrics::new(); @@ -507,7 +509,7 @@ mod tests { let registry = mock_registry(); let current_timestamp = chrono::Utc::now().timestamp(); let mut prefix_stale_thresholds = HashMap::new(); - prefix_stale_thresholds.insert("CS".to_string(), current_timestamp); + prefix_stale_thresholds.insert('C', current_timestamp); let mut records = Vec::new(); let metrics = Metrics::new(); let res = get_signal_price_states( @@ -594,7 +596,7 @@ mod tests { let registry = mock_registry(); let current_timestamp = chrono::Utc::now().timestamp(); let mut prefix_stale_thresholds = HashMap::new(); - prefix_stale_thresholds.insert("CS".to_string(), current_timestamp - 10000); + prefix_stale_thresholds.insert('C', current_timestamp - 10000); let mut records = Vec::new(); let metrics = Metrics::new();